diff --git a/example.js b/example.js new file mode 100644 index 00000000..ca292f29 --- /dev/null +++ b/example.js @@ -0,0 +1,99 @@ +'use strict' + +const Ws = require('ws') +const ws = new Ws('ws://localhost:45000') + +ws.on('open', () => { + console.log('open') + + msgs.forEach((msg, i) => { + ws.send( + JSON.stringify(msg) + ) + console.log('sent', i) + }) +}) + +ws.on('message', (data) => { + // console.log(data) +}) + +const msgs = [ + [ + 'subscribe', + 'bitfinex', + [ + 'ticker', + { + exchange: 'bitfinex', + quote: 'USD', + base: 'ADA', + wsID: 'tADAUSD', + restID: 'tADAUSD', + uiID: 'ADA/USD', + contexts: [ + 'e', + 'm' + ] + } + ] + ], + [ + 'get.candles', + 'bitfinex', + { + exchange: 'bitfinex', + quote: 'USD', + base: 'ADA', + wsID: 'tADAUSD', + restID: 'tADAUSD', + uiID: 'ADA/USD', + contexts: [ + 'e', + 'm' + ] + }, + '1m', + 1605122077205, + 1605182077205 + ], + [ + 'subscribe', + 'bitfinex', + [ + 'candles', + '1m', + { + exchange: 'bitfinex', + quote: 'USD', + base: 'ADA', + wsID: 'tADAUSD', + restID: 'tADAUSD', + uiID: 'ADA/USD', + contexts: [ + 'e', + 'm' + ] + } + ] + ], + [ + 'subscribe', + 'bitfinex', + [ + 'trades', + { + exchange: 'bitfinex', + quote: 'USD', + base: 'ADA', + wsID: 'tADAUSD', + restID: 'tADAUSD', + uiID: 'ADA/USD', + contexts: [ + 'e', + 'm' + ] + } + ] + ] +] diff --git a/lib/exchange_clients/bitfinex/index.js b/lib/exchange_clients/bitfinex/index.js index cb8df0c6..eb5c8369 100644 --- a/lib/exchange_clients/bitfinex/index.js +++ b/lib/exchange_clients/bitfinex/index.js @@ -16,7 +16,7 @@ const getMarkets = require('./get_markets') const unsubscribe = require('./unsubscribe') const subscribe = require('./subscribe') -class BitfinexEchangeConnection { +class BitfinexExchangeConnection { constructor (opts) { const { wsURL, restURL } = opts @@ -50,22 +50,80 @@ class BitfinexEchangeConnection { } } - reconnect () { - this.ws.reconnect() - } - openWS (args = {}) { const opts = { url: this.wsURL, - autoReconnect: true, - reconnectDelay: 10 * 1000, ...args } this.ws = new WS2Manager(opts, this.authArgs) - this.ws.on('message', (msg) => recvMessage(this, msg)) + this.ws.on('message', this.onWSMessage.bind(this)) this.ws.on('error', this.onWSError.bind(this)) this.ws.on('auth', this.onWSAuth.bind(this)) + this.ws.on('close', this.onWSClose.bind(this)) + this.ws.on('open', this.onWSOpen.bind(this)) + + this.reconnectTimeout = null + this.sendHeartbeatTimeout = null + this.firstConnect = true + } + + scheduleReconnect () { + this.reconnectTimeout = setTimeout(() => { + this.reconnect() + }, 10000) + } + + sendHeartbeat () { + clearTimeout(this.sendHeartbeatTimeout) + + this.sendHeartbeatTimeout = setTimeout(() => { + const socket = this.ws._sockets[0] + if (socket.ws._isOpen && !socket.ws._isClosing) { + socket.ws.send({ event: 'ping' }) + } + + this.sendHeartbeat() + }, 5000) + } + + reconnect () { + this.ws.reconnect() + } + + async onWSOpen () { + this.scheduleReconnect() + this.sendHeartbeat() + + if (this.firstConnect === false) { + this.resubscribe() + } + + this.firstConnect = false + } + + async resubscribe () { + if (!this.subscriptions) return + if (!this.subscriptions.length) return + + const unsubs = [] + for (const data of this.subscriptions) { + await this.unsubscribe(data) + unsubs.push(data) + } + + for (const data of unsubs) { + this.subscribe(data) + } + } + + onWSClose () {} + + onWSMessage (msg) { + clearTimeout(this.reconnectTimeout) + this.scheduleReconnect() + + recvMessage(this, msg) } onWSAuth () { @@ -73,19 +131,22 @@ class BitfinexEchangeConnection { } openSocket () { - if (this.ws) { - this.ws.openSocket() - } else { - debug('ws not initialized, cannot open socket') + if (!this.ws) { + return } + + this.ws.openSocket() + this.scheduleReconnect() } - close () { - if (this.ws) { - this.ws.close() - } else { - debug('ws not initialized, cannot close sockets') + async close () { + if (!this.ws) { + return } + + try { + await this.ws.close() + } catch (e) {} } on (event, handler) { @@ -166,7 +227,7 @@ class BitfinexEchangeConnection { } static async registerUIDefs (algoOrders, rest) { - const timeframes = BitfinexEchangeConnection.getCandleTimeFrames() + const timeframes = BitfinexExchangeConnection.getCandleTimeFrames() const aoUIDefs = algoOrders.filter((ao) => { const { meta = {} } = ao const { getUIDef } = meta @@ -205,6 +266,6 @@ class BitfinexEchangeConnection { } } -BitfinexEchangeConnection.id = 'bitfinex' +BitfinexExchangeConnection.id = 'bitfinex' -module.exports = BitfinexEchangeConnection +module.exports = BitfinexExchangeConnection diff --git a/lib/exchange_clients/bitfinex/subscribe.js b/lib/exchange_clients/bitfinex/subscribe.js index 1cad3920..480e35fb 100644 --- a/lib/exchange_clients/bitfinex/subscribe.js +++ b/lib/exchange_clients/bitfinex/subscribe.js @@ -13,6 +13,8 @@ module.exports = async (exa, channelData) => { return subs[cdKey] // return existing chanId } + d('subscribing to channel %j', channelData) + const [type] = channelData const filter = chanDataToSubscribePacket(channelData) @@ -20,8 +22,6 @@ module.exports = async (exa, channelData) => { throw new Error(`unknown channel type: ${type}`) } - d('subscribing to channel %j', channelData) - const p = new Promise((resolve) => { pendingSubs[cdKey] = [_last(channelData), resolve] }) diff --git a/lib/exchange_clients/bitfinex/unsubscribe.js b/lib/exchange_clients/bitfinex/unsubscribe.js index b5818a4b..d75e6b55 100644 --- a/lib/exchange_clients/bitfinex/unsubscribe.js +++ b/lib/exchange_clients/bitfinex/unsubscribe.js @@ -1,7 +1,6 @@ 'use strict' const chanDataToKey = require('../../util/chan_data_to_key') -const chanDataToSubscribePacket = require('./util/chan_data_to_subscribe_packet') module.exports = (exa, channelData) => { const { d, ws, subs, channelMap } = exa @@ -9,32 +8,31 @@ module.exports = (exa, channelData) => { const cdKey = chanDataToKey(channelData) const chanID = subs[cdKey] - if (!chanID) { - throw new Error('tried to unsub from unknown channel') - } - d('unsubscribing from channel %s', chanID) - const filter = chanDataToSubscribePacket(channelData) + if (!chanID) { + d('error: channel %s not found', chanID) + return + } switch (channelData[0]) { case 'candles': { - ws.managedUnsubscribe('candles', filter) + ws.unsubscribe(chanID) break } case 'ticker': { - ws.managedUnsubscribe('ticker', filter) + ws.unsubscribe(chanID) break } case 'trades': { - ws.managedUnsubscribe('trades', filter) + ws.unsubscribe(chanID) break } case 'book': { - ws.managedUnsubscribe('book', filter[0]) + ws.unsubscribe(chanID) break } diff --git a/lib/ws_servers/api/handlers/on_subscribe.js b/lib/ws_servers/api/handlers/on_subscribe.js index d5c2f6ca..a737876c 100644 --- a/lib/ws_servers/api/handlers/on_subscribe.js +++ b/lib/ws_servers/api/handlers/on_subscribe.js @@ -17,10 +17,8 @@ module.exports = async (server, ws, msg) => { return } - if (!ws.subscriptions) ws.subscriptions = {} - if (!ws.subscriptions[exID]) ws.subscriptions[exID] = [] - - ws.subscriptions[exID].push(channelData) + if (!exchangeClient.subscriptions) exchangeClient.subscriptions = [] + exchangeClient.subscriptions.push(channelData) const chanID = await exchangeClient.subscribe(channelData) send(ws, ['subscribed', exID, chanID, channelData]) diff --git a/lib/ws_servers/api/handlers/on_unsubscribe.js b/lib/ws_servers/api/handlers/on_unsubscribe.js index 559c0086..012f95c1 100644 --- a/lib/ws_servers/api/handlers/on_unsubscribe.js +++ b/lib/ws_servers/api/handlers/on_unsubscribe.js @@ -19,14 +19,14 @@ module.exports = (server, ws, msg) => { const { d } = server - if (!(ws.subscriptions || {})[exID]) { + if (!exchangeClient.subscriptions) { return d( 'warning: client %s tried to unsub from channel %j when not on exchange %s', ws.id, channelData, exID ) } - const subIndex = ws.subscriptions[exID].findIndex(cd => ( + const subIndex = exchangeClient.subscriptions.findIndex(cd => ( _isEqual(cd, channelData) )) @@ -38,5 +38,5 @@ module.exports = (server, ws, msg) => { } exchangeClient.unsubscribe(channelData) - ws.subscriptions[exID].splice(subIndex, 1) + exchangeClient.subscriptions.splice(subIndex, 1) } diff --git a/lib/ws_servers/api/index.js b/lib/ws_servers/api/index.js index 8ecb4a81..67ceeb7a 100644 --- a/lib/ws_servers/api/index.js +++ b/lib/ws_servers/api/index.js @@ -131,15 +131,17 @@ module.exports = class APIWSServer extends WSServer { ws.aoc.close() } - const subExchanges = Object.keys(ws.subscriptions || {}) - - subExchanges.forEach((exID) => { - ws.subscriptions[exID].forEach((channelData) => { - this.exchangeClient.unsubscribe(channelData) - }) - }) - ws.clients = {} ws.user = null + + let subs = this.exchangeClient.subscriptions + if (!subs) return + + subs = subs.reduce((acc, channelData) => { + console.log('unsub ', channelData) + this.exchangeClient.unsubscribe(channelData) + + return acc + }, []) } }