Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitfinex: Fix WS trade processing #1754

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ issues:
- text: "Expect WriteFile permissions to be 0600 or less"
linters:
- gosec
- text: 'shadow: declaration of "(err|ctx)" shadows declaration at'
linters: [ govet ]


exclude-dirs:
- vendor
Expand Down
135 changes: 76 additions & 59 deletions exchanges/bitfinex/bitfinex_test.go

Large diffs are not rendered by default.

25 changes: 14 additions & 11 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package bitfinex

import (
"encoding/json"
"errors"
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/types"
)

var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
Expand Down Expand Up @@ -488,16 +489,18 @@ type WebsocketBook struct {
Period int64
}

// WebsocketTrade holds trade information
type WebsocketTrade struct {
// wsTrade holds trade information
type wsTrade struct {
ID int64
Timestamp int64
Price float64
Timestamp types.Time
Amount float64
// Funding rate of the trade
Rate float64
// Funding offer period in days
Period int64
Price float64
Period int64 // Funding offer period in days
}

// UnmarshalJSON unmarshals json bytes into a wsTrade
func (t *wsTrade) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &[]any{&t.ID, &t.Timestamp, &t.Amount, &t.Price, &t.Period})
}

// Candle holds OHLC data
Expand Down Expand Up @@ -625,7 +628,7 @@ const (
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeExecutionUpdate = "tu"
wsTradeUpdated = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
Expand All @@ -636,7 +639,7 @@ const (
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdate = "ftu"
wsFundingTradeUpdated = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
Expand Down
222 changes: 93 additions & 129 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"net/http"
"sort"
"strconv"
Expand Down Expand Up @@ -33,6 +34,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var (
errParsingWSField = errors.New("error parsing WS field")
)
Comment on lines +37 to +39
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var (
errParsingWSField = errors.New("error parsing WS field")
)
var errParsingWSField = errors.New("error parsing WS field")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so convinced, oddly, since it feels like this is a block of error vars, that happens to only contain one.
I want devs to add to it, essentially.

Did linter or editor highlight this?


var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
Expand Down Expand Up @@ -162,8 +167,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
eventType, hasEventType := d[1].(string)

if chanID != 0 {
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
if s := b.Websocket.GetSubscription(chanID); s != nil {
return b.handleWSChannelUpdate(s, respRaw, eventType, d)
}
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw)
Expand Down Expand Up @@ -201,8 +206,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return b.handleWSPositionSnapshot(d)
case wsPositionNew, wsPositionUpdate, wsPositionClose:
return b.handleWSPositionUpdate(d)
case wsTradeExecuted, wsTradeExecutionUpdate:
return b.handleWSTradeUpdate(d, eventType)
case wsTradeExecuted, wsTradeUpdated:
return b.handleWSMyTradeUpdate(d, eventType)
case wsFundingOfferSnapshot:
if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 {
if _, ok := snapBundle[0].([]interface{}); ok {
Expand Down Expand Up @@ -398,7 +403,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
b.Websocket.DataHandler <- fundingInfo
}
}
case wsFundingTradeExecuted, wsFundingTradeUpdate:
case wsFundingTradeExecuted, wsFundingTradeUpdated:
if data, ok := d[2].([]interface{}); ok && len(data) > 0 {
var wsFundingTrade WsFundingTrade
tradeID, ok := data[0].(float64)
Expand Down Expand Up @@ -544,16 +549,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}

if eventType == wsChecksum {
switch eventType {
case wsChecksum:
return b.handleWSChecksum(s, d)
}

if eventType == wsHeartbeat {
case wsHeartbeat:
return nil
}

Expand All @@ -569,7 +573,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
return b.handleWSAllTrades(s, respRaw)
}

return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
Expand Down Expand Up @@ -869,139 +873,99 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf
return nil
}

func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSAllTrades(s *subscription.Subscription, respRaw []byte) error {
feedEnabled := b.IsTradeFeedEnabled()
if !feedEnabled && !b.IsSaveTradeDataEnabled() {
return nil
}
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
if !b.IsSaveTradeDataEnabled() {
return nil
}
if c.Asset == asset.MarginFunding {
return nil
_, valueType, _, err := jsonparser.Get(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err)
}
var tradeHolder []WebsocketTrade
switch len(d) {
case 2:
snapshot, ok := d[1].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot data")
}
for i := range snapshot {
elem, ok := snapshot[i].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot element data")
}
tradeID, ok := elem[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
}
timestamp, ok := elem[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
}
amount, ok := elem[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
}
if len(elem) == 5 {
rate, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
wsTrade.Rate = rate
period, ok := elem[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Period = int64(period)
} else {
price, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
}
wsTrade.Rate = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
case 3:
if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate {
return fmt.Errorf("unhandled WS trade update event: %s", eventType)
var wsTrades []*wsTrade
switch valueType {
case jsonparser.String:
if t, err := b.handleWSPublicTradeUpdate(respRaw); err != nil {
return err
} else {
wsTrades = []*wsTrade{t}
}
data, ok := d[2].([]interface{})
if !ok {
return errors.New("trade data type assertion error")
case jsonparser.Array:
if wsTrades, err = b.handleWSPublicTradesSnapshot(respRaw); err != nil {
return err
}

tradeID, ok := data[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
default:
return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
trades := make([]trade.Data, len(wsTrades))
for i, w := range wsTrades {
t := trade.Data{
Exchange: b.Name,
AssetType: s.Asset,
CurrencyPair: s.Pairs[0],
TID: strconv.FormatInt(w.ID, 10),
Timestamp: w.Timestamp.Time().UTC(),
Side: order.Buy,
Amount: w.Amount,
Price: w.Price,
}
timestamp, ok := data[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
if w.Period != 0 {
t.AssetType = asset.MarginFunding
}
amount, ok := data[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
if t.Amount < 0 {
t.Side = order.Sell
t.Amount = math.Abs(t.Amount)
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
if feedEnabled {
b.Websocket.DataHandler <- &t
}
if len(data) == 5 {
rate, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
period, ok := data[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Rate = rate
wsTrade.Period = int64(period)
trades[i] = t
}
if b.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(b.GetName(), trades...)
}
return err
}

func (b *Bitfinex) handleWSPublicTradesSnapshot(respRaw []byte) (trades []*wsTrade, errs error) {
handleTrade := func(v []byte, valueType jsonparser.ValueType, _ int, _ error) {
if valueType != jsonparser.Array {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesSnapshot[1][*]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType))
} else {
price, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
t := &wsTrade{}
if err := json.Unmarshal(v, t); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesSnapshot[1][*]`: %w", errParsingWSField, err))
} else {
trades = append(trades, t)
}
wsTrade.Price = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
}
}

return b.AddTradesToBuffer(trades...)
if _, err := jsonparser.ArrayEach(respRaw, handleTrade, "[1]"); err != nil {
errs = common.AppendError(errs, err)
}
return
}

func (b *Bitfinex) handleWSPublicTradeUpdate(respRaw []byte) (*wsTrade, error) {
v, valueType, _, err := jsonparser.Get(respRaw, "[2]")
if err != nil {
return nil, fmt.Errorf("%w `tradesUpdate[2]`: %w", errParsingWSField, err)
}
if valueType != jsonparser.Array {
return nil, fmt.Errorf("%w `tradesUpdate[2]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
t := &wsTrade{}
if err := json.Unmarshal(v, t); err != nil {
return nil, fmt.Errorf("%w `tradeUpdate[2]`: %w", errParsingWSField, err)
}
return t, nil
}

func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error {
Expand Down Expand Up @@ -1173,7 +1137,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error {
return nil
}

func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error {
func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error {
tradeData, ok := d[2].([]interface{})
if !ok {
return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate")
Expand Down
5 changes: 5 additions & 0 deletions exchanges/bitfinex/testdata/wsAllTrades.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[18788,[[412685577,1580268444802,11.1998,176.3],[412685578,1580268484802,-5,176.29952759],[412685579,1580269005757,4.2,0.1244,12]],1]
[18788,"te",[5690221201,1734237017719,0.00991467,102570],2]
[18788,"tu",[5690221202,1734237017704,-0.01925285,102560],3]
[18788,"fte",[5690221203,1734237018019,0.00991467,102550,30],4]
[18788,"ftu",[5690221204,1734237018094,-0.01925285,102540,30],5]
2 changes: 1 addition & 1 deletion exchanges/btse/btse_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,7 @@ func (b *BTSE) UpdateOrderExecutionLimits(ctx context.Context, a asset.Item) err
var errs error
limits := make([]order.MinMaxLevel, 0, len(summary))
for _, marketInfo := range summary {
p, err := marketInfo.Pair() //nolint:govet // Deliberately shadow err
p, err := marketInfo.Pair()
if err != nil {
errs = common.AppendError(err, fmt.Errorf("%s: %w", p, err))
continue
Expand Down
Loading
Loading