From f0517f657e3423395de0c47ec3f4ff1eef9ab799 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Wed, 14 Feb 2024 13:49:17 +0100 Subject: [PATCH] refactor(trade_executor): Add error handling to trade executor impl --- coordinator/src/node.rs | 63 ++---------------------------------- coordinator/src/routes.rs | 2 +- coordinator/src/trade/mod.rs | 57 ++++++++++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 63 deletions(-) diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 33ab858c6..1cea3aa07 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -1,23 +1,17 @@ use crate::db; use crate::node::storage::NodeStorage; -use crate::orderbook::db::matches; -use crate::orderbook::db::orders; use crate::position::models::Position; use crate::position::models::PositionState; use crate::storage::CoordinatorTenTenOneStorage; use crate::trade::TradeExecutor; -use anyhow::anyhow; use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; -use commons::MatchState; -use commons::OrderState; use commons::TradeAndChannelParams; use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; -use diesel::Connection; use diesel::PgConnection; use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; @@ -39,7 +33,6 @@ use tokio::sync::RwLock; use tracing::instrument; use trade::cfd::calculate_pnl; use trade::Direction; -use uuid::Uuid; pub mod connection; pub mod expired_positions; @@ -126,45 +119,12 @@ impl Node { !usable_channels.is_empty() } - pub async fn trade(&self, params: &TradeAndChannelParams) -> Result<()> { - let mut connection = self.pool.get()?; - - let order_id = params.trade_params.filled_with.order_id; - let trader_id = params.trade_params.pubkey; - + pub async fn trade(&self, params: TradeAndChannelParams) -> Result<()> { let trade_executor = TradeExecutor::new(self.inner.clone(), self.pool.clone(), self.settings.clone()); - match trade_executor.execute(params).await { - Ok(()) => { - tracing::info!( - %trader_id, - %order_id, - "Successfully processed match, setting match to Filled" - ); - - update_order_and_match( - &mut connection, - order_id, - MatchState::Filled, - OrderState::Taken, - )?; - Ok(()) - } - Err(e) => { - if let Err(e) = update_order_and_match( - &mut connection, - order_id, - MatchState::Failed, - OrderState::Failed, - ) { - tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}"); - }; + trade_executor.execute(¶ms).await; - Err(e).with_context(|| { - format!("Failed to trade with peer {trader_id} for order {order_id}") - }) - } - } + Ok(()) } #[instrument(fields(position_id = position.id, trader_id = position.trader.to_string()),skip(self, conn, position))] @@ -478,20 +438,3 @@ impl Node { Ok(()) } } - -fn update_order_and_match( - connection: &mut PgConnection, - order_id: Uuid, - match_state: MatchState, - order_state: OrderState, -) -> Result<()> { - connection - .transaction(|connection| { - matches::set_match_state(connection, order_id, match_state)?; - - orders::set_order_state(connection, order_id, order_state)?; - - diesel::result::QueryResult::Ok(()) - }) - .map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}")) -} diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 095f56acd..0716ee428 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -368,7 +368,7 @@ pub async fn post_trade( State(state): State>, params: Json, ) -> Result<(), AppError> { - state.node.trade(¶ms.0).await.map_err(|e| { + state.node.trade(params.0).await.map_err(|e| { AppError::InternalServerError(format!("Could not handle trade request: {e:#}")) }) } diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index be49ba821..9c815882e 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -3,6 +3,7 @@ use crate::db; use crate::decimal_from_f32; use crate::node::storage::NodeStorage; use crate::node::NodeSettings; +use crate::orderbook::db::matches; use crate::orderbook::db::orders; use crate::payout_curve; use crate::position::models::NewPosition; @@ -10,6 +11,7 @@ use crate::position::models::Position; use crate::position::models::PositionState; use crate::storage::CoordinatorTenTenOneStorage; use crate::trade::models::NewTrade; +use anyhow::anyhow; use anyhow::bail; use anyhow::ensure; use anyhow::Context; @@ -17,11 +19,13 @@ use anyhow::Result; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use commons::order_matching_fee_taker; +use commons::MatchState; use commons::OrderState; use commons::TradeAndChannelParams; use commons::TradeParams; use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; +use diesel::Connection; use diesel::PgConnection; use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; @@ -43,6 +47,7 @@ use trade::cfd::calculate_long_liquidation_price; use trade::cfd::calculate_margin; use trade::cfd::calculate_short_liquidation_price; use trade::Direction; +use uuid::Uuid; pub mod models; @@ -80,6 +85,37 @@ impl TradeExecutor { } } + pub async fn execute(&self, params: &TradeAndChannelParams) { + let trader_id = params.trade_params.pubkey; + let order_id = params.trade_params.filled_with.order_id; + + match self.execute_internal(params).await { + Ok(()) => { + tracing::info!( + %trader_id, + %order_id, + "Successfully processed match, setting match to Filled" + ); + + if let Err(e) = + self.update_order_and_match(order_id, MatchState::Filled, OrderState::Taken) + { + tracing::error!(%trader_id, + %order_id,"Failed to update order and match state. Error: {e:#}"); + } + } + Err(e) => { + tracing::error!(%trader_id, %order_id,"Failed to execute trade. Error: {e:#}"); + + if let Err(e) = + self.update_order_and_match(order_id, MatchState::Failed, OrderState::Failed) + { + tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}"); + }; + } + }; + } + /// Execute a trade action according to the coordinator's current trading status with the /// trader. /// @@ -92,7 +128,7 @@ impl TradeExecutor { /// 2. If no position is found, we open a position. /// /// 3. If a position of differing quantity is found, we resize the position. - pub async fn execute(&self, params: &TradeAndChannelParams) -> Result<()> { + async fn execute_internal(&self, params: &TradeAndChannelParams) -> Result<()> { let mut connection = self.pool.get()?; let order_id = params.trade_params.filled_with.order_id; @@ -155,7 +191,6 @@ impl TradeExecutor { .start_closing_position(&mut connection, &position, closing_price, channel_id) .await .with_context(|| format!("Failed at closing position {}", position.id))?, - TradeAction::ResizePosition => unimplemented!(), }; @@ -548,6 +583,24 @@ impl TradeExecutor { ) } + fn update_order_and_match( + &self, + order_id: Uuid, + match_state: MatchState, + order_state: OrderState, + ) -> Result<()> { + let mut connection = self.pool.get()?; + connection + .transaction(|connection| { + matches::set_match_state(connection, order_id, match_state)?; + + orders::set_order_state(connection, order_id, order_state)?; + + diesel::result::QueryResult::Ok(()) + }) + .map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}")) + } + fn coordinator_leverage_for_trade(&self, _counterparty_peer_id: &PublicKey) -> Result { // TODO(bonomat): we will need to configure the leverage on the coordinator differently now // let channel_details = self.get_counterparty_channel(*counterparty_peer_id)?;