Skip to content

Commit

Permalink
depth: fix early snapshot id checking
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Nov 16, 2024
1 parent 40d0b59 commit b2b363b
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 25 deletions.
20 changes: 7 additions & 13 deletions pkg/depth/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package depth
import (
"fmt"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -41,24 +40,21 @@ type Buffer struct {
updateTimeout time.Duration

// bufferingPeriod is used to buffer the update message before we get the full depth
bufferingPeriod atomic.Value
bufferingPeriod time.Duration
}

func NewBuffer(fetcher SnapshotFetcher) *Buffer {
func NewBuffer(fetcher SnapshotFetcher, bufferingPeriod time.Duration) *Buffer {
return &Buffer{
fetcher: fetcher,
resetC: make(chan struct{}, 1),
fetcher: fetcher,
resetC: make(chan struct{}, 1),
bufferingPeriod: bufferingPeriod,
}
}

func (b *Buffer) SetUpdateTimeout(d time.Duration) {
b.updateTimeout = d
}

func (b *Buffer) SetBufferingPeriod(d time.Duration) {
b.bufferingPeriod.Store(d)
}

func (b *Buffer) resetSnapshot() {
b.snapshot = nil
b.finalUpdateID = 0
Expand Down Expand Up @@ -151,7 +147,7 @@ func (b *Buffer) fetchAndPush() error {

if len(b.buffer) > 0 {
// the snapshot is too early
if finalUpdateID < b.buffer[0].FirstUpdateID {
if finalUpdateID < b.buffer[0].FirstUpdateID-1 {
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
Expand Down Expand Up @@ -197,9 +193,7 @@ func (b *Buffer) fetchAndPush() error {

func (b *Buffer) tryFetch() {
for {
if period := b.bufferingPeriod.Load(); period != nil {
<-time.After(period.(time.Duration))
}
<-time.After(b.bufferingPeriod)

err := b.fetchAndPush()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/depth/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func TestDepthBuffer_ReadyState(t *testing.T) {
{Price: itov(99), Volume: itov(1)},
},
}, 33, nil
})
buf.SetBufferingPeriod(time.Millisecond * 5)
}, time.Millisecond*5)

readyC := make(chan struct{})
buf.OnReady(func(snapshot types.SliceOrderBook, updates []Update) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
log.Infof("fetching %s depth...", e.Symbol)
return ex.QueryDepth(context.Background(), e.Symbol)
})
f.SetBufferingPeriod(time.Second)
}, 3*time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
stream.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/kucoin/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {
} else {
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return s.exchange.QueryDepth(context.Background(), e.Symbol)
})
}, 3*time.Second)
s.depthBuffers[e.Symbol] = f
f.SetBufferingPeriod(time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, error: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/exchange/max/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
log.Infof("fetching %s depth with depth = %d...", e.Market, bookDepth)
// the depth of websocket orderbook event is 50 by default, so we use 50 as limit here
return ex.QueryDepth(context.Background(), e.Market, bookDepth)
})
f.SetBufferingPeriod(3 * time.Second)
}, 3*time.Second)
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
s.EmitBookSnapshot(snapshot)
for _, u := range updates {
Expand Down
16 changes: 12 additions & 4 deletions pkg/strategy/xdepthmaker/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ type Strategy struct {
StopHedgeQuoteBalance fixedpoint.Value `json:"stopHedgeQuoteBalance"`
StopHedgeBaseBalance fixedpoint.Value `json:"stopHedgeBaseBalance"`

SkipCleanUpOpenOrders bool `json:"skipCleanUpOpenOrders"`

// Quantity is used for fixed quantity of the first layer
Quantity fixedpoint.Value `json:"quantity"`

Expand Down Expand Up @@ -410,9 +412,11 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
fullReplenishTicker := time.NewTicker(timejitter.Milliseconds(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()

// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Warnf("error cleaning up open orders")
// clean up the previous open orders before starting the quote worker
if !s.SkipCleanUpOpenOrders {
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Warnf("error cleaning up open orders")
}
}

s.updateQuote(ctx, 0)
Expand Down Expand Up @@ -966,10 +970,14 @@ func (s *Strategy) generateMakerOrders(
for _, side := range []types.SideType{types.SideTypeBuy, types.SideTypeSell} {
sideBook := dupPricingBook.SideBook(side)
if sideBook.Len() == 0 {
log.Warnf("orderbook %s side is empty", side)
s.logger.Warnf("orderbook %s side is empty", side)
continue
}

if sideBook.Len() < 5 {
s.logger.Warnf("order book %s side is too thin, size: %d, levels: %+v", side, sideBook.Len(), sideBook)
}

availableSideBalance, ok := availableBalances[side]
if !ok {
log.Warnf("no available balance for side %s side", side)
Expand Down
1 change: 1 addition & 0 deletions pkg/types/price_volume_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (slice PriceVolumeSlice) ElemOrLast(i int) (PriceVolume, bool) {
return slice[i], true
}

// IndexByQuoteVolumeDepth returns the index of the price volume slice by the required quote volume depth
func (slice PriceVolumeSlice) IndexByQuoteVolumeDepth(requiredQuoteVolume fixedpoint.Value) int {
var totalQuoteVolume = fixedpoint.Zero
for x, pv := range slice {
Expand Down

0 comments on commit b2b363b

Please sign in to comment.