Skip to content

Commit

Permalink
add concurrency to the order book processing in active orders (#470) (#…
Browse files Browse the repository at this point in the history
…476)

* fix: active orders overflow bug

* chore: active orders observability TODOs

* updates

* lint

* add concurrency to the order book processing in active orders

* chore: active orders observability TODOs

Introduce Prometheus error counters

---------

Co-authored-by: Deividas Petraitis <[email protected]>
(cherry picked from commit 37f4424)

# Conflicts:
#	orderbook/telemetry/telemetry.go
#	orderbook/usecase/orderbook_usecase.go

Co-authored-by: Roman <[email protected]>
Co-authored-by: Deividas Petraitis <[email protected]>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent a51e6dd commit babc32e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 54 deletions.
14 changes: 7 additions & 7 deletions orderbook/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package telemetry
import "github.com/prometheus/client_golang/prometheus"

var (
// sqs_orderbook_usecase_get_active_orders_error_total
// sqs_orderbook_usecase_processing_orderbook_active_orders_error_total
//
// counter that measures the number of errors that occur during getting active orders in orderbook usecase
// counter that measures the number of errors that occur during processing active orders in orderbook usecase
//
// Has the following labels:
// * contract - the address of the orderbook contract
// * address - address of the user wallet
// * err - the error message occurred
GetActiveOrdersErrorMetricName = "sqs_orderbook_usecase_get_active_orders_error_total"
ProcessingOrderbookActiveOrdersErrorMetricName = "sqs_orderbook_usecase_processing_orderbook_active_orders_error_total"

// sqs_orderbook_usecase_get_tick_by_id_not_found_total
//
Expand All @@ -27,10 +27,10 @@ var (
// * err - the error message occurred
CreateLimitOrderErrorMetricName = "sqs_orderbook_usecase_create_limit_order_error_total"

GetActiveOrdersErrorCounter = prometheus.NewCounter(
ProcessingOrderbookActiveOrdersErrorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: GetActiveOrdersErrorMetricName,
Help: "counter that measures the number of errors that occur during retrieving active orders from orderbook contract",
Name: ProcessingOrderbookActiveOrdersErrorMetricName,
Help: "counter that measures the number of errors that occur during processing active orders of from orderbook contract",
},
)

Expand All @@ -50,7 +50,7 @@ var (
)

func init() {
prometheus.MustRegister(GetActiveOrdersErrorCounter)
prometheus.MustRegister(ProcessingOrderbookActiveOrdersErrorCounter)
prometheus.MustRegister(GetTickByIDNotFoundCounter)
prometheus.MustRegister(CreateLimitOrderErrorCounter)
}
146 changes: 99 additions & 47 deletions orderbook/usecase/orderbook_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/osmosis-labs/osmosis/osmomath"
cwpoolmodel "github.com/osmosis-labs/osmosis/v25/x/cosmwasmpool/model"
"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/domain/mvc"
orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook"
orderbookgrpcclientdomain "github.com/osmosis-labs/sqs/domain/orderbook/grpcclient"
Expand Down Expand Up @@ -128,69 +129,120 @@ func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
return nil, fmt.Errorf("failed to get all canonical orderbook pool IDs: %w", err)
}

var results []orderbookdomain.LimitOrder
type orderbookResult struct {
orderbookID uint64
limitOrders []orderbookdomain.LimitOrder
err error
}

results := make(chan orderbookResult, len(orderbooks))
defer close(results)

// Process orderbooks concurrently
for _, orderbook := range orderbooks {
orders, count, err := o.orderBookClient.GetActiveOrders(context.TODO(), orderbook.ContractAddress, address)
if err != nil {
telemetry.GetActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.GetActiveOrdersErrorMetricName, zap.Any("contract", orderbook.ContractAddress), zap.Any("contract", address), zap.Any("err", err))
continue
}
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, err := o.processOrderBookActiveOrders(ctx, orderbook, address)

// There are orders to process for given orderbook
if count == 0 {
continue
results <- orderbookResult{
orderbookID: orderbook.PoolID,
limitOrders: limitOrders,
err: err,
}
}(orderbook)
}

// Collect results
finalResults := []orderbookdomain.LimitOrder{}
for i := 0; i < len(orderbooks); i++ {
select {
case result := <-results:
if result.err != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("orderbook_id", result.orderbookID), zap.Any("err", result.err))
return nil, result.err
}
finalResults = append(finalResults, result.limitOrders...)
case <-ctx.Done():
return nil, ctx.Err()
}
}

o.logger.Info("Active orders", zap.Any("orders", orders), zap.Any("count", count), zap.Any("err", err))
return finalResults, nil
}

quoteToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderbook.Quote)
if err != nil {
o.logger.Error("failed to get token metadata for quote", zap.Any("quote", orderbook.Quote), zap.Error(err))
continue
// processOrderBookActiveOrders fetches and processes the active orders for a given orderbook.
// It returns the active formatted limit orders and an error if any.
// Errors if:
// - failed to fetch active orders
// - failed to fetch metadata by chain denom
// - failed to create limit order
//
// For every order, if an error occurs processing the order, it is skipped rather than failing the entire process.
// This is a best-effort process.
func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, error) {
orders, count, err := o.orderBookClient.GetActiveOrders(ctx, orderBook.ContractAddress, ownerAddress)
if err != nil {
return nil, err
}

// There are orders to process for given orderbook
if count == 0 {
return nil, nil
}

quoteToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Quote)
if err != nil {
return nil, err
}

baseToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Base)
if err != nil {
return nil, err
}

// Create a slice to store the results
results := make([]orderbookdomain.LimitOrder, 0, len(orders))

for _, order := range orders {
tickForOrder, ok := o.orderbookRepository.GetTickByID(orderBook.PoolID, order.TickId)
if !ok {
// Do not return error, just log and continue for fault tolerance
telemetry.GetTickByIDNotFoundCounter.Inc()
o.logger.Info(telemetry.GetTickByIDNotFoundMetricName, zap.Any("contract", orderBook.ContractAddress), zap.Any("ticks", order.TickId), zap.Any("ok", ok))

// Note: initialize empty tick for fault-
tickForOrder = orderbookdomain.OrderbookTick{}
}

baseToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderbook.Base)
// create limit order
result, err := o.createLimitOrder(
order,
tickForOrder.TickState,
tickForOrder.UnrealizedCancels,
orderbookdomain.Asset{
Symbol: quoteToken.CoinMinimalDenom,
Decimals: quoteToken.Precision,
},
orderbookdomain.Asset{
Symbol: baseToken.CoinMinimalDenom,
Decimals: baseToken.Precision,
},
orderBook.ContractAddress,
)
if err != nil {
o.logger.Error("failed to get token metadata for base", zap.Any("base", orderbook.Base), zap.Error(err))
o.logger.Error("failed to create limit order", zap.Any("order", order), zap.Any("err", err))
telemetry.CreateLimitOrderErrorCounter.Inc()
o.logger.Error(telemetry.CreateLimitOrderErrorMetricName, zap.Any("order", order), zap.Any("err", err))
continue
}

for _, order := range orders {
repositoryTick, ok := o.orderbookRepository.GetTickByID(orderbook.PoolID, order.TickId)
if !ok {
telemetry.GetTickByIDNotFoundCounter.Inc()
o.logger.Info(telemetry.GetTickByIDNotFoundMetricName, zap.Any("contract", orderbook.ContractAddress), zap.Any("ticks", order.TickId), zap.Any("ok", ok))
}

result, err := o.createLimitOrder(
order,
repositoryTick.TickState,
repositoryTick.UnrealizedCancels,
orderbookdomain.Asset{
Symbol: quoteToken.CoinMinimalDenom,
Decimals: quoteToken.Precision,
},
orderbookdomain.Asset{
Symbol: baseToken.CoinMinimalDenom,
Decimals: baseToken.Precision,
},
orderbook.ContractAddress,
)
if err != nil {
telemetry.CreateLimitOrderErrorCounter.Inc()
o.logger.Error(telemetry.CreateLimitOrderErrorMetricName, zap.Any("order", order), zap.Any("err", err))
continue
}

results = append(results, result)
}
results = append(results, result)
}

return results, nil
}

// TransformOrder transforms an order into a mapped limit order.
// createLimitOrder creates a limit order from the orderbook order.
func (o *orderbookUseCaseImpl) createLimitOrder(
order orderbookdomain.Order,
tickState orderbookdomain.TickState,
Expand Down

0 comments on commit babc32e

Please sign in to comment.