Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: reconnect / resubscribe on network disconnect #56

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
'use strict'

const Ws = require('ws')
const ws = new Ws('ws://localhost:45000')

ws.on('open', () => {
console.log('open')

msgs.forEach((msg, i) => {
ws.send(
JSON.stringify(msg)
)
console.log('sent', i)
})
})

ws.on('message', (data) => {
// console.log(data)
})

const msgs = [
[
'subscribe',
'bitfinex',
[
'ticker',
{
exchange: 'bitfinex',
quote: 'USD',
base: 'ADA',
wsID: 'tADAUSD',
restID: 'tADAUSD',
uiID: 'ADA/USD',
contexts: [
'e',
'm'
]
}
]
],
[
'get.candles',
'bitfinex',
{
exchange: 'bitfinex',
quote: 'USD',
base: 'ADA',
wsID: 'tADAUSD',
restID: 'tADAUSD',
uiID: 'ADA/USD',
contexts: [
'e',
'm'
]
},
'1m',
1605122077205,
1605182077205
],
[
'subscribe',
'bitfinex',
[
'candles',
'1m',
{
exchange: 'bitfinex',
quote: 'USD',
base: 'ADA',
wsID: 'tADAUSD',
restID: 'tADAUSD',
uiID: 'ADA/USD',
contexts: [
'e',
'm'
]
}
]
],
[
'subscribe',
'bitfinex',
[
'trades',
{
exchange: 'bitfinex',
quote: 'USD',
base: 'ADA',
wsID: 'tADAUSD',
restID: 'tADAUSD',
uiID: 'ADA/USD',
contexts: [
'e',
'm'
]
}
]
]
]
101 changes: 81 additions & 20 deletions lib/exchange_clients/bitfinex/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const getMarkets = require('./get_markets')
const unsubscribe = require('./unsubscribe')
const subscribe = require('./subscribe')

class BitfinexEchangeConnection {
class BitfinexExchangeConnection {
constructor (opts) {
const { wsURL, restURL } = opts

Expand Down Expand Up @@ -50,42 +50,103 @@ class BitfinexEchangeConnection {
}
}

reconnect () {
this.ws.reconnect()
}

openWS (args = {}) {
const opts = {
url: this.wsURL,
autoReconnect: true,
reconnectDelay: 10 * 1000,
...args
}

this.ws = new WS2Manager(opts, this.authArgs)
this.ws.on('message', (msg) => recvMessage(this, msg))
this.ws.on('message', this.onWSMessage.bind(this))
this.ws.on('error', this.onWSError.bind(this))
this.ws.on('auth', this.onWSAuth.bind(this))
this.ws.on('close', this.onWSClose.bind(this))
this.ws.on('open', this.onWSOpen.bind(this))

this.reconnectTimeout = null
this.sendHeartbeatTimeout = null
this.firstConnect = true
}

scheduleReconnect () {
this.reconnectTimeout = setTimeout(() => {
this.reconnect()
}, 10000)
}

sendHeartbeat () {
clearTimeout(this.sendHeartbeatTimeout)

this.sendHeartbeatTimeout = setTimeout(() => {
const socket = this.ws._sockets[0]
if (socket.ws._isOpen && !socket.ws._isClosing) {
socket.ws.send({ event: 'ping' })
}

this.sendHeartbeat()
}, 5000)
}

reconnect () {
this.ws.reconnect()
}

async onWSOpen () {
this.scheduleReconnect()
this.sendHeartbeat()

if (this.firstConnect === false) {
this.resubscribe()
}

this.firstConnect = false
}

async resubscribe () {
if (!this.subscriptions) return
if (!this.subscriptions.length) return

const unsubs = []
for (const data of this.subscriptions) {
await this.unsubscribe(data)
unsubs.push(data)
}

for (const data of unsubs) {
this.subscribe(data)
}
}

onWSClose () {}

onWSMessage (msg) {
clearTimeout(this.reconnectTimeout)
this.scheduleReconnect()

recvMessage(this, msg)
}

onWSAuth () {
this.channelMap['0'] = { channel: 'auth' }
}

openSocket () {
if (this.ws) {
this.ws.openSocket()
} else {
debug('ws not initialized, cannot open socket')
if (!this.ws) {
return
}

this.ws.openSocket()
this.scheduleReconnect()
}

close () {
if (this.ws) {
this.ws.close()
} else {
debug('ws not initialized, cannot close sockets')
async close () {
if (!this.ws) {
return
}

try {
await this.ws.close()
} catch (e) {}
}

on (event, handler) {
Expand Down Expand Up @@ -166,7 +227,7 @@ class BitfinexEchangeConnection {
}

static async registerUIDefs (algoOrders, rest) {
const timeframes = BitfinexEchangeConnection.getCandleTimeFrames()
const timeframes = BitfinexExchangeConnection.getCandleTimeFrames()
const aoUIDefs = algoOrders.filter((ao) => {
const { meta = {} } = ao
const { getUIDef } = meta
Expand Down Expand Up @@ -205,6 +266,6 @@ class BitfinexEchangeConnection {
}
}

BitfinexEchangeConnection.id = 'bitfinex'
BitfinexExchangeConnection.id = 'bitfinex'

module.exports = BitfinexEchangeConnection
module.exports = BitfinexExchangeConnection
4 changes: 2 additions & 2 deletions lib/exchange_clients/bitfinex/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ module.exports = async (exa, channelData) => {
return subs[cdKey] // return existing chanId
}

d('subscribing to channel %j', channelData)

const [type] = channelData
const filter = chanDataToSubscribePacket(channelData)

if (filter === null) {
throw new Error(`unknown channel type: ${type}`)
}

d('subscribing to channel %j', channelData)

const p = new Promise((resolve) => {
pendingSubs[cdKey] = [_last(channelData), resolve]
})
Expand Down
18 changes: 8 additions & 10 deletions lib/exchange_clients/bitfinex/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,38 @@
'use strict'

const chanDataToKey = require('../../util/chan_data_to_key')
const chanDataToSubscribePacket = require('./util/chan_data_to_subscribe_packet')

module.exports = (exa, channelData) => {
const { d, ws, subs, channelMap } = exa

const cdKey = chanDataToKey(channelData)
const chanID = subs[cdKey]

if (!chanID) {
throw new Error('tried to unsub from unknown channel')
}

d('unsubscribing from channel %s', chanID)

const filter = chanDataToSubscribePacket(channelData)
if (!chanID) {
d('error: channel %s not found', chanID)
return
}

switch (channelData[0]) {
case 'candles': {
ws.managedUnsubscribe('candles', filter)
ws.unsubscribe(chanID)
break
}

case 'ticker': {
ws.managedUnsubscribe('ticker', filter)
ws.unsubscribe(chanID)
break
}

case 'trades': {
ws.managedUnsubscribe('trades', filter)
ws.unsubscribe(chanID)
break
}

case 'book': {
ws.managedUnsubscribe('book', filter[0])
ws.unsubscribe(chanID)
break
}

Expand Down
6 changes: 2 additions & 4 deletions lib/ws_servers/api/handlers/on_subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ module.exports = async (server, ws, msg) => {
return
}

if (!ws.subscriptions) ws.subscriptions = {}
if (!ws.subscriptions[exID]) ws.subscriptions[exID] = []

ws.subscriptions[exID].push(channelData)
if (!exchangeClient.subscriptions) exchangeClient.subscriptions = []
exchangeClient.subscriptions.push(channelData)

const chanID = await exchangeClient.subscribe(channelData)
send(ws, ['subscribed', exID, chanID, channelData])
Expand Down
6 changes: 3 additions & 3 deletions lib/ws_servers/api/handlers/on_unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ module.exports = (server, ws, msg) => {

const { d } = server

if (!(ws.subscriptions || {})[exID]) {
if (!exchangeClient.subscriptions) {
return d(
'warning: client %s tried to unsub from channel %j when not on exchange %s',
ws.id, channelData, exID
)
}

const subIndex = ws.subscriptions[exID].findIndex(cd => (
const subIndex = exchangeClient.subscriptions.findIndex(cd => (
_isEqual(cd, channelData)
))

Expand All @@ -38,5 +38,5 @@ module.exports = (server, ws, msg) => {
}

exchangeClient.unsubscribe(channelData)
ws.subscriptions[exID].splice(subIndex, 1)
exchangeClient.subscriptions.splice(subIndex, 1)
}
18 changes: 10 additions & 8 deletions lib/ws_servers/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,17 @@ module.exports = class APIWSServer extends WSServer {
ws.aoc.close()
}

const subExchanges = Object.keys(ws.subscriptions || {})

subExchanges.forEach((exID) => {
ws.subscriptions[exID].forEach((channelData) => {
this.exchangeClient.unsubscribe(channelData)
})
})

ws.clients = {}
ws.user = null

let subs = this.exchangeClient.subscriptions
if (!subs) return

subs = subs.reduce((acc, channelData) => {
console.log('unsub ', channelData)
this.exchangeClient.unsubscribe(channelData)

return acc
}, [])
}
}