Skip to content

Commit

Permalink
refactor(trade_executor): Add error handling to trade executor impl
Browse files Browse the repository at this point in the history
  • Loading branch information
holzeis committed Feb 15, 2024
1 parent 499555a commit f0517f6
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 63 deletions.
63 changes: 3 additions & 60 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
use crate::db;
use crate::node::storage::NodeStorage;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use crate::position::models::Position;
use crate::position::models::PositionState;
use crate::storage::CoordinatorTenTenOneStorage;
use crate::trade::TradeExecutor;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use commons::MatchState;
use commons::OrderState;
use commons::TradeAndChannelParams;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::Connection;
use diesel::PgConnection;
use dlc_manager::channel::signed_channel::SignedChannel;
use dlc_manager::channel::signed_channel::SignedChannelState;
Expand All @@ -39,7 +33,6 @@ use tokio::sync::RwLock;
use tracing::instrument;
use trade::cfd::calculate_pnl;
use trade::Direction;
use uuid::Uuid;

pub mod connection;
pub mod expired_positions;
Expand Down Expand Up @@ -126,45 +119,12 @@ impl Node {
!usable_channels.is_empty()
}

pub async fn trade(&self, params: &TradeAndChannelParams) -> Result<()> {
let mut connection = self.pool.get()?;

let order_id = params.trade_params.filled_with.order_id;
let trader_id = params.trade_params.pubkey;

pub async fn trade(&self, params: TradeAndChannelParams) -> Result<()> {
let trade_executor =
TradeExecutor::new(self.inner.clone(), self.pool.clone(), self.settings.clone());
match trade_executor.execute(params).await {
Ok(()) => {
tracing::info!(
%trader_id,
%order_id,
"Successfully processed match, setting match to Filled"
);

update_order_and_match(
&mut connection,
order_id,
MatchState::Filled,
OrderState::Taken,
)?;
Ok(())
}
Err(e) => {
if let Err(e) = update_order_and_match(
&mut connection,
order_id,
MatchState::Failed,
OrderState::Failed,
) {
tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}");
};
trade_executor.execute(&params).await;

Err(e).with_context(|| {
format!("Failed to trade with peer {trader_id} for order {order_id}")
})
}
}
Ok(())
}

#[instrument(fields(position_id = position.id, trader_id = position.trader.to_string()),skip(self, conn, position))]
Expand Down Expand Up @@ -478,20 +438,3 @@ impl Node {
Ok(())
}
}

fn update_order_and_match(
connection: &mut PgConnection,
order_id: Uuid,
match_state: MatchState,
order_state: OrderState,
) -> Result<()> {
connection
.transaction(|connection| {
matches::set_match_state(connection, order_id, match_state)?;

orders::set_order_state(connection, order_id, order_state)?;

diesel::result::QueryResult::Ok(())
})
.map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}"))
}
2 changes: 1 addition & 1 deletion coordinator/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ pub async fn post_trade(
State(state): State<Arc<AppState>>,
params: Json<TradeAndChannelParams>,
) -> Result<(), AppError> {
state.node.trade(&params.0).await.map_err(|e| {
state.node.trade(params.0).await.map_err(|e| {
AppError::InternalServerError(format!("Could not handle trade request: {e:#}"))
})
}
Expand Down
57 changes: 55 additions & 2 deletions coordinator/src/trade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,29 @@ use crate::db;
use crate::decimal_from_f32;
use crate::node::storage::NodeStorage;
use crate::node::NodeSettings;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use crate::payout_curve;
use crate::position::models::NewPosition;
use crate::position::models::Position;
use crate::position::models::PositionState;
use crate::storage::CoordinatorTenTenOneStorage;
use crate::trade::models::NewTrade;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::ensure;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use commons::order_matching_fee_taker;
use commons::MatchState;
use commons::OrderState;
use commons::TradeAndChannelParams;
use commons::TradeParams;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::Connection;
use diesel::PgConnection;
use dlc_manager::channel::signed_channel::SignedChannel;
use dlc_manager::channel::signed_channel::SignedChannelState;
Expand All @@ -43,6 +47,7 @@ use trade::cfd::calculate_long_liquidation_price;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_short_liquidation_price;
use trade::Direction;
use uuid::Uuid;

pub mod models;

Expand Down Expand Up @@ -80,6 +85,37 @@ impl TradeExecutor {
}
}

pub async fn execute(&self, params: &TradeAndChannelParams) {
let trader_id = params.trade_params.pubkey;
let order_id = params.trade_params.filled_with.order_id;

match self.execute_internal(params).await {
Ok(()) => {
tracing::info!(
%trader_id,
%order_id,
"Successfully processed match, setting match to Filled"
);

if let Err(e) =
self.update_order_and_match(order_id, MatchState::Filled, OrderState::Taken)
{
tracing::error!(%trader_id,
%order_id,"Failed to update order and match state. Error: {e:#}");
}
}
Err(e) => {
tracing::error!(%trader_id, %order_id,"Failed to execute trade. Error: {e:#}");

if let Err(e) =
self.update_order_and_match(order_id, MatchState::Failed, OrderState::Failed)
{
tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}");
};
}
};
}

/// Execute a trade action according to the coordinator's current trading status with the
/// trader.
///
Expand All @@ -92,7 +128,7 @@ impl TradeExecutor {
/// 2. If no position is found, we open a position.
///
/// 3. If a position of differing quantity is found, we resize the position.
pub async fn execute(&self, params: &TradeAndChannelParams) -> Result<()> {
async fn execute_internal(&self, params: &TradeAndChannelParams) -> Result<()> {
let mut connection = self.pool.get()?;

let order_id = params.trade_params.filled_with.order_id;
Expand Down Expand Up @@ -155,7 +191,6 @@ impl TradeExecutor {
.start_closing_position(&mut connection, &position, closing_price, channel_id)
.await
.with_context(|| format!("Failed at closing position {}", position.id))?,

TradeAction::ResizePosition => unimplemented!(),
};

Expand Down Expand Up @@ -548,6 +583,24 @@ impl TradeExecutor {
)
}

fn update_order_and_match(
&self,
order_id: Uuid,
match_state: MatchState,
order_state: OrderState,
) -> Result<()> {
let mut connection = self.pool.get()?;
connection
.transaction(|connection| {
matches::set_match_state(connection, order_id, match_state)?;

orders::set_order_state(connection, order_id, order_state)?;

diesel::result::QueryResult::Ok(())
})
.map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}"))
}

fn coordinator_leverage_for_trade(&self, _counterparty_peer_id: &PublicKey) -> Result<f32> {
// TODO(bonomat): we will need to configure the leverage on the coordinator differently now
// let channel_details = self.get_counterparty_channel(*counterparty_peer_id)?;
Expand Down

0 comments on commit f0517f6

Please sign in to comment.