Skip to content

Commit

Permalink
active order fault tolerance via "best-effort" boolean return (#471) (#…
Browse files Browse the repository at this point in the history
…478)

* fix: active orders overflow bug

* chore: active orders observability TODOs

* updates

* lint

* add concurrency to the order book processing in active orders

* active order fault tolerance

---------

Co-authored-by: Deividas Petraitis <[email protected]>
(cherry picked from commit bc570d6)

Co-authored-by: Roman <[email protected]>
  • Loading branch information
mergify[bot] and p0mvn authored Aug 27, 2024
1 parent babc32e commit 6f32d9f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 42 deletions.
2 changes: 1 addition & 1 deletion domain/mvc/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ type OrderBookUsecase interface {
GetAllTicks(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool)

// GetOrder returns all active orderbook orders for a given address.
GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, error)
GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)
}
8 changes: 5 additions & 3 deletions orderbook/types/get_orders_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func defaultSortOrder(orderA, orderB orderbookdomain.LimitOrder) int {

// GetActiveOrdersResponse represents the response for the /pools/all-orders endpoint.
type GetActiveOrdersResponse struct {
Orders []orderbookdomain.LimitOrder `json:"orders"`
Orders []orderbookdomain.LimitOrder `json:"orders"`
IsBestEffort bool `json:"is_best_effort"`
}

// NewGetAllOrderResponse creates a new GetActiveOrdersResponse.
func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder) *GetActiveOrdersResponse {
func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder, isBestEffort bool) *GetActiveOrdersResponse {
sort.Slice(orders, func(i, j int) bool {
return defaultSortOrder(orders[i], orders[j]) < 0
})
Expand All @@ -100,6 +101,7 @@ func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder) *GetActiveOrder
}

return &GetActiveOrdersResponse{
Orders: orders,
Orders: orders,
IsBestEffort: isBestEffort,
}
}
82 changes: 46 additions & 36 deletions orderbook/usecase/orderbook_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,17 @@ func (o *orderbookUseCaseImpl) ProcessPool(ctx context.Context, pool sqsdomain.P
}

// GetActiveOrders implements mvc.OrderBookUsecase.
func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, error) {
func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) {
orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs()
if err != nil {
return nil, fmt.Errorf("failed to get all canonical orderbook pool IDs: %w", err)
return nil, false, fmt.Errorf("failed to get all canonical orderbook pool IDs: %w", err)
}

type orderbookResult struct {
orderbookID uint64
limitOrders []orderbookdomain.LimitOrder
err error
isBestEffort bool
orderbookID uint64
limitOrders []orderbookdomain.LimitOrder
err error
}

results := make(chan orderbookResult, len(orderbooks))
Expand All @@ -141,33 +142,39 @@ func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
// Process orderbooks concurrently
for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, err := o.processOrderBookActiveOrders(ctx, orderbook, address)
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)

results <- orderbookResult{
orderbookID: orderbook.PoolID,
limitOrders: limitOrders,
err: err,
isBestEffort: isBestEffort,
orderbookID: orderbook.PoolID,
limitOrders: limitOrders,
err: err,
}
}(orderbook)
}

// Collect results
finalResults := []orderbookdomain.LimitOrder{}
isBestEffort := false

for i := 0; i < len(orderbooks); i++ {
select {
case result := <-results:
if result.err != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("orderbook_id", result.orderbookID), zap.Any("err", result.err))
return nil, result.err
return nil, false, result.err
}

isBestEffort = isBestEffort || result.isBestEffort

finalResults = append(finalResults, result.limitOrders...)
case <-ctx.Done():
return nil, ctx.Err()
return nil, false, ctx.Err()
}
}

return finalResults, nil
return finalResults, isBestEffort, nil
}

// processOrderBookActiveOrders fetches and processes the active orders for a given orderbook.
Expand All @@ -179,46 +186,39 @@ func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
//
// For every order, if an error occurs processing the order, it is skipped rather than failing the entire process.
// This is a best-effort process.
func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, error) {
func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) {
orders, count, err := o.orderBookClient.GetActiveOrders(ctx, orderBook.ContractAddress, ownerAddress)
if err != nil {
return nil, err
return nil, false, err
}

// There are orders to process for given orderbook
if count == 0 {
return nil, nil
return nil, false, nil
}

quoteToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Quote)
if err != nil {
return nil, err
return nil, false, err
}

baseToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Base)
if err != nil {
return nil, err
return nil, false, err
}

// Create a slice to store the results
results := make([]orderbookdomain.LimitOrder, 0, len(orders))

for _, order := range orders {
tickForOrder, ok := o.orderbookRepository.GetTickByID(orderBook.PoolID, order.TickId)
if !ok {
// Do not return error, just log and continue for fault tolerance
telemetry.GetTickByIDNotFoundCounter.Inc()
o.logger.Info(telemetry.GetTickByIDNotFoundMetricName, zap.Any("contract", orderBook.ContractAddress), zap.Any("ticks", order.TickId), zap.Any("ok", ok))

// Note: initialize empty tick for fault-
tickForOrder = orderbookdomain.OrderbookTick{}
}
// If we encounter
isBestEffort := false

// For each order, create a formatted limit order
for _, order := range orders {
// create limit order
result, err := o.createLimitOrder(
result, err := o.createFormattedLimitOrder(
orderBook.PoolID,
order,
tickForOrder.TickState,
tickForOrder.UnrealizedCancels,
orderbookdomain.Asset{
Symbol: quoteToken.CoinMinimalDenom,
Decimals: quoteToken.Precision,
Expand All @@ -230,27 +230,37 @@ func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context,
orderBook.ContractAddress,
)
if err != nil {
o.logger.Error("failed to create limit order", zap.Any("order", order), zap.Any("err", err))
telemetry.CreateLimitOrderErrorCounter.Inc()
o.logger.Error(telemetry.CreateLimitOrderErrorMetricName, zap.Any("order", order), zap.Any("err", err))

isBestEffort = true

continue
}

results = append(results, result)
}

return results, nil
return results, isBestEffort, nil
}

// createLimitOrder creates a limit order from the orderbook order.
func (o *orderbookUseCaseImpl) createLimitOrder(
// createFormattedLimitOrder creates a limit order from the orderbook order.
func (o *orderbookUseCaseImpl) createFormattedLimitOrder(
poolID uint64,
order orderbookdomain.Order,
tickState orderbookdomain.TickState,
unrealizedCancels orderbookdomain.UnrealizedCancels,
quoteAsset orderbookdomain.Asset,
baseAsset orderbookdomain.Asset,
orderbookAddress string,
) (orderbookdomain.LimitOrder, error) {
tickForOrder, ok := o.orderbookRepository.GetTickByID(poolID, order.TickId)
if !ok {
telemetry.GetTickByIDNotFoundCounter.Inc()
return orderbookdomain.LimitOrder{}, fmt.Errorf("tick not found %s, %d", orderbookAddress, order.TickId)
}

tickState := tickForOrder.TickState
unrealizedCancels := tickForOrder.UnrealizedCancels

// Parse quantity as int64
quantity, err := strconv.ParseInt(order.Quantity, 10, 64)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions passthrough/delivery/http/passthrough_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ func (a *PassthroughHandler) GetActiveOrders(c echo.Context) (err error) {
return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()})
}

orders, err := a.OUsecase.GetActiveOrders(ctx, req.UserOsmoAddress)
orders, isBestEffort, err := a.OUsecase.GetActiveOrders(ctx, req.UserOsmoAddress)
if err != nil {
return c.JSON(http.StatusInternalServerError, domain.ResponseError{Message: err.Error()})
}

resp := types.NewGetAllOrderResponse(orders)
resp := types.NewGetAllOrderResponse(orders, isBestEffort)

return c.JSON(http.StatusOK, resp)
}

0 comments on commit 6f32d9f

Please sign in to comment.