diff --git a/client/cmd/testbinance/main.go b/client/cmd/testbinance/main.go index fa977a6f77..d5cc39e79a 100644 --- a/client/cmd/testbinance/main.go +++ b/client/cmd/testbinance/main.go @@ -51,6 +51,7 @@ var ( walkingSpeedAdj float64 gapRange float64 + flappyWS bool xcInfo = &bntypes.ExchangeInfo{ Timezone: "UTC", @@ -155,6 +156,7 @@ func main() { flag.Float64Var(&gapRange, "gaprange", 0.04, "a ratio of how much the gap can vary. default is 0.04 => 4%") flag.BoolVar(&logDebug, "debug", false, "use debug logging") flag.BoolVar(&logTrace, "trace", false, "use trace logging") + flag.BoolVar(&flappyWS, "flappyws", false, "periodically drop websocket clients and delete subscriptions") flag.Parse() switch { @@ -455,6 +457,35 @@ func (f *fakeBinance) run(ctx context.Context) { } }() + if flappyWS { + go func() { + tick := func() <-chan time.Time { + const minDelay = time.Minute + const delayRange = time.Minute * 5 + return time.After(minDelay + time.Duration(rand.Float64()*float64(delayRange))) + } + for { + select { + case <-tick(): + f.marketsMtx.Lock() + for addr, sub := range f.marketSubscribers { + sub.Disconnect() + delete(f.marketSubscribers, addr) + } + f.marketsMtx.Unlock() + f.accountSubscribersMtx.Lock() + for apiKey, sub := range f.accountSubscribers { + sub.Disconnect() + delete(f.accountSubscribers, apiKey) + } + f.accountSubscribersMtx.Unlock() + case <-ctx.Done(): + return + } + } + }() + } + f.srv.Run(ctx) } @@ -507,6 +538,11 @@ func (f *fakeBinance) handleAccountSubscription(w http.ResponseWriter, r *http.R }() } +type listSubsResp struct { + ID uint64 `json:"id"` + Result []string `json:"result"` +} + func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) { streamsStr := r.URL.Query().Get("streams") if streamsStr == "" { @@ -556,6 +592,25 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) f.cleanMarkets() } + listSubscriptions := func(id uint64) { + f.marketsMtx.Lock() + defer f.marketsMtx.Unlock() + var streams []string + for mktID := range cl.markets { + streams = append(streams, fmt.Sprintf("%s@depth", mktID)) + } + resp := listSubsResp{ + ID: id, + Result: streams, + } + b, err := json.Marshal(resp) + if err != nil { + log.Errorf("LIST_SUBSCRIBE marshal error: %v", err) + } + cl.WSLink.SendRaw(b) + f.cleanMarkets() + } + conn, cm := f.newWSLink(w, r, func(b []byte) { var req bntypes.StreamSubscription if err := json.Unmarshal(b, &req); err != nil { @@ -567,6 +622,8 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) subscribe(req.Params) case "UNSUBSCRIBE": unsubscribe(req.Params) + case "LIST_SUBSCRIPTIONS": + listSubscriptions(req.ID) } }) if conn == nil { diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 3e8fcc7967..c91664148c 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -69,6 +69,8 @@ type binanceOrderBook struct { baseConversionFactor uint64 quoteConversionFactor uint64 log dex.Logger + + connectedChan chan bool } func newBinanceOrderBook( @@ -86,6 +88,7 @@ func newBinanceOrderBook( quoteConversionFactor: quoteConversionFactor, log: log, getSnapshot: getSnapshot, + connectedChan: make(chan bool), } } @@ -288,6 +291,13 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error if retry != nil { // don't hammer continue } + case connected := <-b.connectedChan: + if !connected { + b.log.Debugf("Unsyncing %s orderbook due to disconnect.", b.mktID, retryFrequency) + desync() + retry = nil + continue + } case <-ctx.Done(): return } @@ -296,7 +306,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error b.log.Infof("Synced %s orderbook", b.mktID) retry = nil } else { - b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) + b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency) desync() // Clears the syncCache retry = time.After(retryFrequency) } @@ -469,6 +479,9 @@ type binance struct { marketStreamMtx sync.RWMutex marketStream comms.WsConn + marketStreamRespsMtx sync.Mutex + marketStreamResps map[uint64]chan<- []string + booksMtx sync.RWMutex books map[string]*binanceOrderBook @@ -476,6 +489,9 @@ type binance struct { tradeInfo map[string]*tradeInfo tradeUpdaters map[int]chan *Trade tradeUpdateCounter int + + listenKey atomic.Value // string + reconnectChan chan struct{} } var _ CEX = (*binance)(nil) @@ -521,9 +537,12 @@ func newBinance(cfg *CEXConfig, binanceUS bool) *binance { tradeInfo: make(map[string]*tradeInfo), tradeUpdaters: make(map[int]chan *Trade), tradeIDNoncePrefix: encode.RandomBytes(10), + reconnectChan: make(chan struct{}), + marketStreamResps: make(map[uint64]chan<- []string), } bnc.markets.Store(make(map[string]*bntypes.Market)) + bnc.listenKey.Store("") return bnc } @@ -1358,6 +1377,13 @@ func (bnc *binance) handleExecutionReport(update *bntypes.StreamUpdate) { } } +func (bnc *binance) handleListenKeyExpired(update *bntypes.StreamUpdate) { + bnc.log.Debugf("Received listenKeyExpired: %+v", update) + expireTime := time.Unix(update.E/1000, 0) + bnc.log.Errorf("Listen key %v expired at %v. Attempting to reconnect and get a new one.", update.ListenKey, expireTime) + bnc.reconnectChan <- struct{}{} +} + func (bnc *binance) handleUserDataStreamUpdate(b []byte) { bnc.log.Tracef("Received user data stream update: %s", string(b)) @@ -1372,19 +1398,23 @@ func (bnc *binance) handleUserDataStreamUpdate(b []byte) { bnc.handleOutboundAccountPosition(msg) case "executionReport": bnc.handleExecutionReport(msg) + case "listenKeyExpired": + bnc.handleListenKeyExpired(msg) } } func (bnc *binance) getListenID(ctx context.Context) (string, error) { var resp *bntypes.DataStreamKey - return resp.ListenKey, bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp) + if err := bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp); err != nil { + return "", err + } + bnc.listenKey.Store(resp.ListenKey) + return resp.ListenKey, nil } func (bnc *binance) getUserDataStream(ctx context.Context) (err error) { - var listenKey string - newConn := func() (*dex.ConnectionMaster, error) { - listenKey, err = bnc.getListenID(ctx) + listenKey, err := bnc.getListenID(ctx) if err != nil { return nil, err } @@ -1426,37 +1456,58 @@ func (bnc *binance) getUserDataStream(ctx context.Context) (err error) { keepAlive := time.NewTicker(time.Minute * 30) defer keepAlive.Stop() + retryKeepAlive := make(<-chan time.Time) + connected := true // do not keep alive on a failed connection + + doReconnect := func() { + if cm != nil { + cm.Disconnect() + } + cm, err = newConn() + if err != nil { + connected = false + bnc.log.Errorf("Error reconnecting: %v", err) + reconnect = time.After(time.Second * 30) + } else { + connected = true + reconnect = time.After(time.Hour * 12) + } + } + + doKeepAlive := func() { + if !connected { + bnc.log.Warn("Cannot keep binance connection alive because we are disconnected. Trying again in 10 seconds.") + retryKeepAlive = time.After(time.Second * 10) + return + } + q := make(url.Values) + q.Add("listenKey", bnc.listenKey.Load().(string)) + // Doing a PUT on a listenKey will extend its validity for 60 minutes. + req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false) + if err != nil { + bnc.log.Errorf("Error generating keep-alive request: %v. Trying again in 10 seconds.", err) + retryKeepAlive = time.After(time.Second * 10) + return + } + if err := requestInto(req, nil); err != nil { + bnc.log.Errorf("Error sending keep-alive request: %v. Trying again in 10 seconds", err) + retryKeepAlive = time.After(time.Second * 10) + return + } + bnc.log.Debug("Binance connection keep alive sent successfully.") + } + for { select { + case <-bnc.reconnectChan: + doReconnect() case <-reconnect: - if cm != nil { - cm.Disconnect() - } - cm, err = newConn() - if err != nil { - connected = false - bnc.log.Errorf("Error reconnecting: %v", err) - reconnect = time.After(time.Second * 30) - } else { - connected = true - reconnect = time.After(time.Hour * 12) - } + doReconnect() + case <-retryKeepAlive: + doKeepAlive() case <-keepAlive.C: - if !connected { - continue - } - q := make(url.Values) - q.Add("listenKey", listenKey) - // Doing a PUT on a listenKey will extend its validity for 60 minutes. - req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false) - if err != nil { - bnc.log.Errorf("Error generating keep-alive request: %v", err) - continue - } - if err := requestInto(req, nil); err != nil { - bnc.log.Errorf("Error sending keep-alive request: %v", err) - } + doKeepAlive() case <-ctx.Done(): return } @@ -1510,8 +1561,27 @@ func (bnc *binance) handleMarketDataNote(b []byte) { bnc.log.Errorf("Error unmarshaling book note: %v", err) return } - if note == nil || note.Data == nil { - bnc.log.Debugf("No data in market data update: %s", string(b)) + if note == nil { + bnc.log.Debugf("Market data update does not parse to a note: %s", string(b)) + return + } + + if note.Data == nil { + var waitingResp bool + bnc.marketStreamRespsMtx.Lock() + if ch, exists := bnc.marketStreamResps[note.ID]; exists { + waitingResp = true + timeout := time.After(time.Second * 5) + select { + case ch <- note.Result: + case <-timeout: + bnc.log.Errorf("Noone waiting for market stream result id %d", note.ID) + } + } + bnc.marketStreamRespsMtx.Unlock() + if !waitingResp { + bnc.log.Debugf("No data in market data update: %s", string(b)) + } return } @@ -1579,22 +1649,125 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b return nil } +func (bnc *binance) streams() []string { + bnc.booksMtx.RLock() + defer bnc.booksMtx.RUnlock() + streamNames := make([]string, 0, len(bnc.books)) + for mktID := range bnc.books { + streamNames = append(streamNames, marketDataStreamID(mktID)) + } + return streamNames +} + +// checkSubs will query binance for current market subscriptions and compare +// that to what subscriptions we should have. If there is a discrepancy a +// warning is logged and the market subbed or unsubbed. +func (bnc *binance) checkSubs(ctx context.Context) error { + bnc.marketStreamMtx.Lock() + defer bnc.marketStreamMtx.Unlock() + streams := bnc.streams() + if len(streams) == 0 { + return nil + } + + method := "LIST_SUBSCRIPTIONS" + id := atomic.AddUint64(&subscribeID, 1) + + resp := make(chan []string, 1) + bnc.marketStreamRespsMtx.Lock() + bnc.marketStreamResps[id] = resp + bnc.marketStreamRespsMtx.Unlock() + + defer func() { + bnc.marketStreamRespsMtx.Lock() + delete(bnc.marketStreamResps, id) + bnc.marketStreamRespsMtx.Unlock() + }() + + req := &bntypes.StreamSubscription{ + Method: method, + ID: id, + } + + b, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("error marshaling subscription stream request: %w", err) + } + + bnc.log.Debugf("Sending %v", method) + if err := bnc.marketStream.SendRaw(b); err != nil { + return fmt.Errorf("error sending subscription stream request: %w", err) + } + + timeout := time.After(time.Second * 5) + var subs []string + select { + case subs = <-resp: + case <-timeout: + return fmt.Errorf("market stream result id %d did not come.", id) + case <-ctx.Done(): + return nil + } + + var sub []string + unsub := make([]string, len(subs)) + for i, s := range subs { + unsub[i] = strings.ToLower(s) + } + +out: + for _, us := range streams { + for i, them := range unsub { + if us == them { + unsub[i] = unsub[len(unsub)-1] + unsub = unsub[:len(unsub)-1] + continue out + } + } + sub = append(sub, us) + } + + for _, s := range sub { + bnc.log.Warnf("Subbing to previously unsubbed stream %s", s) + if err := bnc.subUnsubDepth(true, s); err != nil { + bnc.log.Errorf("Error subscribing to %s: %v", s, err) + } + } + + for _, s := range unsub { + bnc.log.Warnf("Unsubbing to previously subbed stream %s", s) + if err := bnc.subUnsubDepth(false, s); err != nil { + bnc.log.Errorf("Error unsubscribing to %s: %v", s, err) + } + } + + return nil +} + // connectToMarketDataStream is called when the first market is subscribed to. // It creates a connection to the market data stream and starts a goroutine // to reconnect every 12 hours, as Binance will close the stream every 24 // hours. Additional markets are subscribed to by calling // subscribeToAdditionalMarketDataStream. func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quoteID uint32) error { - newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { - bnc.booksMtx.Lock() - streamNames := make([]string, 0, len(bnc.books)) - for mktID := range bnc.books { - streamNames = append(streamNames, marketDataStreamID(mktID)) - } - bnc.booksMtx.Unlock() + reconnectC := make(chan struct{}) - addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(streamNames, "/")) + newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { + addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) // Need to send key but not signature + connectEventFunc := func(cs comms.ConnectionStatus) { + if cs != comms.Disconnected && cs != comms.Connected { + return + } + // If disconnected, set all books to unsynced so bots + // will not place new orders. + connected := cs == comms.Connected + bnc.booksMtx.RLock() + defer bnc.booksMtx.RLock() + for _, b := range bnc.books { + b.connectedChan <- connected + } + } conn, err := comms.NewWsConn(&comms.WsCfg{ URL: addr, // Binance Docs: The websocket server will send a ping frame every 3 @@ -1604,8 +1777,12 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote PingWait: time.Minute * 4, ReconnectSync: func() { bnc.log.Debugf("Binance reconnected") + select { + case reconnectC <- struct{}{}: + default: + } }, - ConnectEventFunc: func(cs comms.ConnectionStatus) {}, + ConnectEventFunc: connectEventFunc, Logger: bnc.log.SubLogger("BNCBOOK"), RawHandler: bnc.handleMarketDataNote, }) @@ -1666,17 +1843,33 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote return nil } + checkSubsInterval := time.Minute + checkSubs := time.After(checkSubsInterval) reconnectTimer := time.After(time.Hour * 12) for { select { - case <-reconnectTimer: - err = reconnect() - if err != nil { + case <-reconnectC: + if err := reconnect(); err != nil { bnc.log.Errorf("Error reconnecting: %v", err) reconnectTimer = time.After(time.Second * 30) + checkSubs = make(<-chan time.Time) + continue + } + checkSubs = time.After(checkSubsInterval) + case <-reconnectTimer: + if err := reconnect(); err != nil { + bnc.log.Errorf("Error refreshing connection: %v", err) + reconnectTimer = time.After(time.Second * 30) + checkSubs = make(<-chan time.Time) continue } reconnectTimer = time.After(time.Hour * 12) + checkSubs = time.After(checkSubsInterval) + case <-checkSubs: + if err := bnc.checkSubs(ctx); err != nil { + bnc.log.Errorf("Error checking subscriptions: %v", err) + } + checkSubs = time.After(checkSubsInterval) case <-ctx.Done(): bnc.marketStreamMtx.Lock() bnc.marketStream = nil diff --git a/client/mm/libxc/bntypes/types.go b/client/mm/libxc/bntypes/types.go index 1f4515fde2..75c4150322 100644 --- a/client/mm/libxc/bntypes/types.go +++ b/client/mm/libxc/bntypes/types.go @@ -77,6 +77,8 @@ type BookUpdate struct { type BookNote struct { StreamName string `json:"stream"` Data *BookUpdate `json:"data"` + ID uint64 `json:"id"` + Result []string `json:"result"` } type WSBalance struct { @@ -86,18 +88,19 @@ type WSBalance struct { } type StreamUpdate struct { - Asset string `json:"a"` - EventType string `json:"e"` - ClientOrderID string `json:"c"` - CurrentOrderStatus string `json:"X"` - Balances []*WSBalance `json:"B"` - BalanceDelta float64 `json:"d,string"` - Filled float64 `json:"z,string"` - QuoteFilled float64 `json:"Z,string"` - OrderQty float64 `json:"q,string"` - QuoteOrderQty float64 `json:"Q,string"` - CancelledOrderID string `json:"C"` - E json.RawMessage `json:"E"` + Asset string `json:"a"` + EventType string `json:"e"` + ClientOrderID string `json:"c"` + CurrentOrderStatus string `json:"X"` + Balances []*WSBalance `json:"B"` + BalanceDelta float64 `json:"d,string"` + Filled float64 `json:"z,string"` + QuoteFilled float64 `json:"Z,string"` + OrderQty float64 `json:"q,string"` + QuoteOrderQty float64 `json:"Q,string"` + CancelledOrderID string `json:"C"` + E int64 `json:"E"` + ListenKey string `json:"listenKey"` } type RateLimit struct {