From 602eca1a78b0778672444e49f6a20fc7e46ad763 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 4 Jun 2024 12:48:52 +0200 Subject: [PATCH] fixup! feat: Rewrite order matching strategy Do not block on database interactions TODO: match market order --- coordinator/src/orderbook/mod.rs | 81 ++++++++++++++-------------- coordinator/src/orderbook/trading.rs | 33 +++--------- crates/xxi-node/src/commons/order.rs | 41 ++++++++++++++ crates/xxi-node/src/commons/trade.rs | 2 +- 4 files changed, 90 insertions(+), 67 deletions(-) diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index c73c113b8..6a3c5b9f9 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -74,12 +74,19 @@ impl OrderbookSide { entry.push(order); } - /// removes the order by the given id from the orderbook. - pub fn remove_order(&mut self, order_id: Uuid) { - for vec in self.orders.values_mut() { - vec.retain(|order| order.id != order_id); + /// removes the order by the given id from the orderbook. Returns true if the order was found, + /// returns false otherwise. + fn remove_order(&mut self, order_id: Uuid) -> bool { + for (_price, orders) in self.orders.iter_mut() { + if let Some(pos) = orders.iter().position(|order| order.id == order_id) { + orders.remove(pos); + if orders.is_empty() { + self.orders.retain(|_price, orders| !orders.is_empty()); + } + return true; + } } - self.orders.retain(|_, vec| !vec.is_empty()); + false } /// returns the a sorted vec orders of the orderbook side with the best price at first. @@ -204,24 +211,25 @@ impl Orderbook { } /// Adds a limit order to the orderbook. - async fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Result { - let order = spawn_blocking({ - let mut conn = self.pool.clone().get()?; + fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Message { + spawn_blocking({ + let pool = self.pool.clone(); move || { + let mut conn = pool.clone().get()?; let order = orders::insert_limit_order(&mut conn, new_order, OrderReason::Manual) .map_err(|e| anyhow!(e)) .context("Failed to insert new order into DB")?; anyhow::Ok(order) } - }) - .await??; + }); + let order: Order = new_order.into(); match order.direction { commons::Direction::Short => self.shorts.add_order(order), commons::Direction::Long => self.longs.add_order(order), } - Ok(Message::NewOrder(order)) + Message::NewOrder(order) } /// Matches a market order against the orderbook. Will fail if the market order can't be fully @@ -235,7 +243,7 @@ impl Orderbook { // TODO(holzeis): We might want to delay that action in a regular task allowing us to // process orderbook updates much faster in memory and postpone the expensive database // transactions. - let order = spawn_blocking({ + spawn_blocking({ let mut conn = self.pool.clone().get()?; move || { let order = orders::insert_market_order(&mut conn, new_order, order_reason) @@ -244,8 +252,9 @@ impl Orderbook { anyhow::Ok(order) } - }) - .await??; + }); + + let order: Order = new_order.into(); // find a match for the market order. let matched_orders = match order.direction.opposite() { @@ -274,7 +283,7 @@ impl Orderbook { "Couldn't match order due to insufficient liquidity in the orderbook." ); - spawn_blocking(fail_order).await??; + spawn_blocking(fail_order); bail!("No match found."); } @@ -359,8 +368,8 @@ impl Orderbook { /// Updates a limit order in the orderbook. If the order id couldn't be found the update will be /// ignored. - async fn update_limit_order(&mut self, order: Order) -> Result { - self.remove_order(order.id).await?; + fn update_limit_order(&mut self, order: Order) -> Message { + self.remove_order(order.id); self.add_limit_order(NewLimitOrder { id: order.id, contract_symbol: order.contract_symbol, @@ -371,10 +380,9 @@ impl Orderbook { leverage: Decimal::try_from(order.leverage).expect("to fit"), expiry: order.expiry, stable: false, - }) - .await?; + }); - Ok(Message::Update(order)) + Message::Update(order) } /// Sets the order and the corresponding match to failed by the given order id @@ -424,13 +432,11 @@ impl Orderbook { /// Removes a limit order from the orderbook. Ignores removing market orders, since they aren't /// stored into the orderbook. - async fn remove_order(&mut self, order_id: Uuid) -> Result> { - // TODO(holzeis): We might want to delay that action in a regular task allowing us to - // process orderbook updates much faster in memory and postpone the expensive database - // transactions. - let order = spawn_blocking({ - let mut conn = self.pool.clone().get()?; + fn remove_order(&mut self, order_id: Uuid) -> Option { + spawn_blocking({ + let pool = self.pool.clone(); move || { + let mut conn = pool.clone().get()?; let matches = matches::get_matches_by_order_id(&mut conn, order_id)?; let order = if !matches.is_empty() { @@ -444,22 +450,17 @@ impl Orderbook { anyhow::Ok(order) } - }) - .await??; + }); - let message = match (order.order_type, order.direction) { - (OrderType::Limit, commons::Direction::Short) => { - self.shorts.remove_order(order_id); - Some(DeleteOrder(order_id)) - } - (OrderType::Limit, commons::Direction::Long) => { - self.longs.remove_order(order_id); - Some(DeleteOrder(order_id)) - } - (OrderType::Market, _) => None, - }; + if self.shorts.remove_order(order_id) { + return Some(DeleteOrder(order_id)); + } + + if self.longs.remove_order(order_id) { + return Some(DeleteOrder(order_id)); + } - Ok(message) + None } } diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 2dfbdc385..ece330bfb 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -118,38 +118,19 @@ pub fn spawn_orderbook( new_order: NewOrder::Limit(new_order), .. } => { - let trader_pubkey = new_order.trader_id; - let order_id = new_order.id; - match orderbook.add_limit_order(new_order).await { - Ok(message) => Some(message), - Err(e) => { - tracing::error!(%trader_pubkey, %order_id, "Failed to process new limit order. Error: {e:#}"); - continue; - } - } - } - OrderbookMessage::DeleteOrder(order_id) => { - match orderbook.remove_order(order_id).await { - Ok(message) => message, - Err(e) => { - tracing::error!(%order_id, "Failed to process remove order. Error: {e:#}"); - continue; - } - } + let message = orderbook.add_limit_order(new_order); + Some(message) } + OrderbookMessage::DeleteOrder(order_id) => orderbook.remove_order(order_id), OrderbookMessage::Update( order @ Order { order_type: commons::OrderType::Limit, .. }, - ) => match orderbook.update_limit_order(order).await { - Ok(message) => Some(message), - Err(e) => { - tracing::error!(trader_pubkey=%order.trader_id, order_id=%order.id, - "Failed to process update order. Error: {e:#}"); - continue; - } - }, + ) => { + let message = orderbook.update_limit_order(order); + Some(message) + } OrderbookMessage::Update(Order { order_type: commons::OrderType::Market, .. diff --git a/crates/xxi-node/src/commons/order.rs b/crates/xxi-node/src/commons/order.rs index 849ffec73..5f3a19654 100644 --- a/crates/xxi-node/src/commons/order.rs +++ b/crates/xxi-node/src/commons/order.rs @@ -4,6 +4,7 @@ use anyhow::Result; use bitcoin::hashes::sha256; use bitcoin::secp256k1::PublicKey; use bitcoin::Amount; +use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use secp256k1::ecdsa::Signature; use secp256k1::Message; @@ -114,6 +115,26 @@ pub struct NewLimitOrder { pub stable: bool, } +impl From for Order { + fn from(value: NewLimitOrder) -> Self { + Self { + id: value.id, + price: value.price, + leverage: value.leverage.to_f32().expect("to fit"), + contract_symbol: value.contract_symbol, + trader_id: value.trader_id, + direction: value.direction, + quantity: value.quantity, + order_type: OrderType::Limit, + timestamp: OffsetDateTime::now_utc(), + expiry: value.expiry, + order_state: OrderState::Open, + order_reason: OrderReason::Manual, + stable: false, + } + } +} + impl NewLimitOrder { pub fn message(&self) -> Message { let mut vec: Vec = vec![]; @@ -144,6 +165,26 @@ impl NewLimitOrder { } } +impl From for Order { + fn from(value: NewMarketOrder) -> Self { + Self { + id: value.id, + price: Decimal::ZERO, // market orders don't have a price. + leverage: value.leverage.to_f32().expect("to fit"), + contract_symbol: value.contract_symbol, + trader_id: value.trader_id, + direction: value.direction, + quantity: value.quantity, + order_type: OrderType::Market, + timestamp: OffsetDateTime::now_utc(), + expiry: value.expiry, + order_state: OrderState::Open, + order_reason: OrderReason::Manual, + stable: false, + } + } +} + impl NewMarketOrder { pub fn message(&self) -> Message { let mut vec: Vec = vec![]; diff --git a/crates/xxi-node/src/commons/trade.rs b/crates/xxi-node/src/commons/trade.rs index 8da735f99..6fa6aa15d 100644 --- a/crates/xxi-node/src/commons/trade.rs +++ b/crates/xxi-node/src/commons/trade.rs @@ -43,7 +43,7 @@ pub struct TradeParams { /// The filling information from the orderbook /// /// This is used by the coordinator to be able to make sure both trading parties are acting. - /// The `quantity` has to match the cummed up quantities of the matches in `filled_with`. + /// The `quantity` has to match the summed up quantities of the matches in `filled_with`. pub filled_with: FilledWith, }