From 6050c88b531746237e69574dcabdbe278108234c Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Mon, 4 Mar 2024 18:18:44 +1000 Subject: [PATCH 1/3] feat: position websocket Over this websocket we are sending updates about new trades. This means, if a user opens a new position or closes a position, an update is sent out over this websocket. The update includes an incremenatl change but also the total position state as reference. A client listening on this can then reason on this data. --- coordinator/src/bin/coordinator.rs | 14 +- coordinator/src/dlc_protocol.rs | 35 +++- coordinator/src/node.rs | 8 + coordinator/src/routes.rs | 9 ++ coordinator/src/trade/mod.rs | 1 + coordinator/src/trade/websocket.rs | 251 +++++++++++++++++++++++++++++ crates/commons/src/message.rs | 51 ++++++ 7 files changed, 363 insertions(+), 6 deletions(-) create mode 100644 coordinator/src/trade/websocket.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 336e59220..a91e3b735 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -24,6 +24,7 @@ use coordinator::run_migration; use coordinator::scheduler::NotificationScheduler; use coordinator::settings::Settings; use coordinator::storage::CoordinatorTenTenOneStorage; +use coordinator::trade::websocket::InternalPositionUpdateMessage; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::PgConnection; @@ -148,7 +149,17 @@ async fn main() -> Result<()> { let event_handler = CoordinatorEventHandler::new(node.clone(), None); let running = node.start(event_handler, false)?; - let node = Node::new(node, running, pool.clone(), settings.to_node_settings()); + + // an internal channel to send updates about our position + let (tx_position_feed, _rx) = broadcast::channel::(100); + + let node = Node::new( + node, + running, + pool.clone(), + settings.to_node_settings(), + tx_position_feed.clone(), + ); // TODO: Pass the tokio metrics into Prometheus if let Some(interval) = opts.tokio_metrics_interval_seconds { @@ -284,6 +295,7 @@ async fn main() -> Result<()> { NODE_ALIAS, trading_sender, tx_price_feed, + tx_position_feed, tx_user_feed, auth_users_notifier.clone(), notification_service.get_sender(), diff --git a/coordinator/src/dlc_protocol.rs b/coordinator/src/dlc_protocol.rs index 7099c22d2..c91a2b505 100644 --- a/coordinator/src/dlc_protocol.rs +++ b/coordinator/src/dlc_protocol.rs @@ -1,6 +1,7 @@ use crate::db; use crate::position::models::PositionState; use crate::trade::models::NewTrade; +use crate::trade::websocket::InternalPositionUpdateMessage; use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; @@ -20,6 +21,7 @@ use std::fmt::Display; use std::fmt::Formatter; use std::str::from_utf8; use time::OffsetDateTime; +use tokio::sync::broadcast::Sender; use trade::cfd::calculate_margin; use trade::cfd::calculate_pnl; use trade::Direction; @@ -231,12 +233,12 @@ impl DlcProtocolExecutor { trader_id: &PublicKey, contract_id: Option, channel_id: &DlcChannelId, + tx_position_feed: Sender, ) -> Result<()> { let mut conn = self.pool.get()?; + let dlc_protocol = db::dlc_protocols::get_dlc_protocol(&mut conn, protocol_id)?; conn.transaction(|conn| { - let dlc_protocol = db::dlc_protocols::get_dlc_protocol(conn, protocol_id)?; - - match dlc_protocol.protocol_type { + match &dlc_protocol.protocol_type { DlcProtocolType::Open { trade_params } | DlcProtocolType::Renew { trade_params } => { let contract_id = contract_id @@ -282,6 +284,29 @@ impl DlcProtocolExecutor { } })?; + match &dlc_protocol.protocol_type { + DlcProtocolType::Open { trade_params } + | DlcProtocolType::Renew { trade_params } + | DlcProtocolType::Settle { trade_params } => { + if let Err(e) = { + tx_position_feed.send(InternalPositionUpdateMessage::NewTrade { + quantity: if trade_params.direction == Direction::Short { + trade_params.quantity + } else { + // We want to reflect the quantity as seen by the coordinator + trade_params.quantity * -1.0 + }, + average_entry_price: trade_params.average_price, + }) + } { + tracing::error!("Could not notify channel about finished trade {e:#}"); + } + } + _ => { + // a trade only happens in Open, Renew and Settle + } + } + Ok(()) } @@ -294,7 +319,7 @@ impl DlcProtocolExecutor { fn finish_close_trade_dlc_protocol( &self, conn: &mut PgConnection, - trade_params: TradeParams, + trade_params: &TradeParams, protocol_id: ProtocolId, settled_contract: &ContractId, channel_id: &DlcChannelId, @@ -391,7 +416,7 @@ impl DlcProtocolExecutor { fn finish_open_trade_dlc_protocol( &self, conn: &mut PgConnection, - trade_params: TradeParams, + trade_params: &TradeParams, protocol_id: ProtocolId, contract_id: &ContractId, channel_id: &DlcChannelId, diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index ef4175b99..13214a710 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -5,6 +5,7 @@ use crate::message::OrderbookMessage; use crate::node::storage::NodeStorage; use crate::position::models::PositionState; use crate::storage::CoordinatorTenTenOneStorage; +use crate::trade::websocket::InternalPositionUpdateMessage; use crate::trade::TradeExecutor; use anyhow::bail; use anyhow::Context; @@ -32,6 +33,7 @@ use ln_dlc_node::node::dlc_message_name; use ln_dlc_node::node::event::NodeEvent; use ln_dlc_node::node::RunningNode; use std::sync::Arc; +use tokio::sync::broadcast::Sender; use tokio::sync::mpsc; use tokio::sync::RwLock; @@ -59,6 +61,7 @@ pub struct Node { _running: Arc, pub pool: Pool>, settings: Arc>, + tx_position_feed: Sender, } impl Node { @@ -73,12 +76,14 @@ impl Node { running: RunningNode, pool: Pool>, settings: NodeSettings, + tx_position_feed: Sender, ) -> Self { Self { inner, pool, settings: Arc::new(RwLock::new(settings)), _running: Arc::new(running), + tx_position_feed, } } @@ -281,6 +286,7 @@ impl Node { &node_id, channel.get_contract_id(), channel_id, + self.tx_position_feed.clone(), )?; } ChannelMessage::SettleFinalize(SettleFinalize { @@ -320,6 +326,7 @@ impl Node { // the settled signed channel does not have a contract None, channel_id, + self.tx_position_feed.clone(), )?; } ChannelMessage::CollaborativeCloseOffer(close_offer) => { @@ -377,6 +384,7 @@ impl Node { &node_id, channel.get_contract_id(), &channel_id, + self.tx_position_feed.clone(), )?; } ChannelMessage::Reject(reject) => { diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index d828a0835..2a2c00dde 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -35,6 +35,7 @@ use crate::orderbook::trading::NewOrderMessage; use crate::parse_dlc_channel_id; use crate::settings::Settings; use crate::settings::SettingsFile; +use crate::trade::websocket::InternalPositionUpdateMessage; use crate::AppError; use axum::extract::ConnectInfo; use axum::extract::DefaultBodyLimit; @@ -96,6 +97,8 @@ pub struct AppState { pub node: Node, // Channel used to send messages to all connected clients. pub tx_price_feed: broadcast::Sender, + /// A channel used to send messages about position updates + pub tx_position_feed: broadcast::Sender, pub tx_user_feed: broadcast::Sender, pub trading_sender: mpsc::Sender, pub pool: Pool>, @@ -119,6 +122,7 @@ pub fn router( node_alias: &str, trading_sender: mpsc::Sender, tx_price_feed: broadcast::Sender, + tx_position_feed: broadcast::Sender, tx_user_feed: broadcast::Sender, auth_users_notifier: mpsc::Sender, notification_sender: mpsc::Sender, @@ -131,6 +135,7 @@ pub fn router( pool, settings: RwLock::new(settings), tx_price_feed, + tx_position_feed, tx_user_feed, trading_sender, exporter, @@ -199,6 +204,10 @@ pub fn router( .route("/metrics", get(get_metrics)) .route("/health", get(get_health)) .route("/api/leaderboard", get(get_leaderboard)) + .route( + "/api/admin/trade/websocket", + get(crate::trade::websocket::websocket_handler), + ) .layer(DefaultBodyLimit::disable()) .layer(DefaultBodyLimit::max(50 * 1024)) .with_state(app_state) diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index 9c57a0a66..b691c6c3c 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -57,6 +57,7 @@ use trade::Direction; use uuid::Uuid; pub mod models; +pub mod websocket; pub enum TradeAction { OpenDlcChannel, diff --git a/coordinator/src/trade/websocket.rs b/coordinator/src/trade/websocket.rs new file mode 100644 index 000000000..e7a715caf --- /dev/null +++ b/coordinator/src/trade/websocket.rs @@ -0,0 +1,251 @@ +use crate::db; +use crate::position::models::Position; +use crate::routes::AppState; +use axum::extract::ws::Message as WebsocketMessage; +use axum::extract::ws::WebSocket; +use axum::extract::State; +use axum::extract::WebSocketUpgrade; +use axum::response::IntoResponse; +use commons::create_sign_message; +use commons::PositionMessage; +use commons::PositionMessageRequest; +use commons::AUTH_SIGN_MESSAGE; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::PooledConnection; +use diesel::PgConnection; +use futures::SinkExt; +use futures::StreamExt; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::mpsc; +use trade::Direction; + +#[derive(Clone)] +pub enum InternalPositionUpdateMessage { + NewTrade { + /// As seen from the coordinator, i.e. if quantity is < 0 then coordinator is short, if > + /// 0, then coordinator is long + quantity: f32, + average_entry_price: f32, + }, +} + +const WEBSOCKET_SEND_TIMEOUT: Duration = Duration::from_secs(5); + +pub async fn websocket_handler( + ws: WebSocketUpgrade, + State(state): State>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| websocket_connection(socket, state)) +} + +// This function deals with a single websocket connection, i.e., a single +// connected client / user, for which we will spawn two independent tasks (for +// receiving / sending messages). +pub async fn websocket_connection(stream: WebSocket, state: Arc) { + // By splitting, we can send and receive at the same time. + let (mut sender, mut receiver) = stream.split(); + + let mut feed = state.tx_position_feed.subscribe(); + + let (local_sender, mut local_receiver) = mpsc::channel::(100); + + let mut local_recv_task = tokio::spawn(async move { + while let Some(local_msg) = local_receiver.recv().await { + match serde_json::to_string(&local_msg) { + Ok(msg) => { + if let Err(err) = tokio::time::timeout( + WEBSOCKET_SEND_TIMEOUT, + sender.send(WebsocketMessage::Text(msg.clone())), + ) + .await + { + tracing::error!("Could not forward message {msg} : {err:#}"); + return; + } + } + Err(error) => { + tracing::warn!("Could not deserialize message {error:#}"); + } + } + } + }); + + // Spawn the first task that will receive broadcast messages and send + // messages over the websocket to our client. + let mut send_task = { + let local_sender = local_sender.clone(); + let pool = state.pool.clone(); + tokio::spawn(async move { + loop { + match feed.recv().await.clone() { + Ok(position_update) => match position_update { + InternalPositionUpdateMessage::NewTrade { + quantity, + average_entry_price, + } => { + if let Err(error) = { + let mut conn = match pool.get() { + Ok(conn) => conn, + Err(err) => { + tracing::error!( + "Could not get connection to db pool {err:#}" + ); + return; + } + }; + + let (total_average_entry_price, total_quantity) = + calculate_position_stats(&mut conn); + local_sender.send(PositionMessage::NewTrade { + total_quantity, + total_average_entry_price, + new_trade_quantity: quantity, + new_trade_average_entry_price: average_entry_price, + }) + } + .await + { + tracing::error!("Could not send message {error:#}"); + return; + } + } + }, + Err(RecvError::Closed) => { + tracing::error!("position feed sender died! Channel closed."); + break; + } + Err(RecvError::Lagged(skip)) => tracing::warn!(%skip, + "Lagging behind on position feed." + ), + } + } + }) + }; + + // Spawn a task that takes messages from the websocket + let local_sender = local_sender.clone(); + let pool = state.pool.clone(); + let mut recv_task = tokio::spawn(async move { + while let Some(Ok(WebsocketMessage::Text(text))) = receiver.next().await { + match serde_json::from_str(text.as_str()) { + Ok(PositionMessageRequest::Authenticate { signature }) => { + let msg = create_sign_message(AUTH_SIGN_MESSAGE.to_vec()); + // TODO(bonomat): in the future we could add authorization as well to only allow + // particular pubkeys get updates + let user_id = signature.pubkey; + let signature = signature.signature; + + let mut conn = match pool.get() { + Ok(conn) => conn, + Err(err) => { + tracing::error!("Could not get connection to db pool {err:#}"); + return; + } + }; + + match signature.verify(&msg, &user_id) { + Ok(_) => { + if let Err(e) = local_sender.send(PositionMessage::Authenticated).await + { + tracing::error!(%user_id, "Could not respond to user {e:#}"); + return; + } + + let (average_entry_price, total_quantity) = + calculate_position_stats(&mut conn); + + if let Err(e) = local_sender + .send(PositionMessage::CurrentPosition { + quantity: total_quantity, + average_entry_price, + }) + .await + { + tracing::error!(%user_id, "Failed to send all open positions to user {e:#}"); + } + } + Err(err) => { + if let Err(er) = local_sender + .send(PositionMessage::InvalidAuthentication(format!( + "Could not authenticate {err:#}" + ))) + .await + { + tracing::error!( + %user_id, "Failed to notify user about invalid authentication: {er:#}" + ); + return; + } + } + } + } + Err(err) => { + tracing::trace!("Could not deserialize msg: {text} {err:#}"); + } + } + } + }); + + // If any one of the tasks run to completion, we abort the other. + tokio::select! { + _ = (&mut send_task) => { + recv_task.abort(); + local_recv_task.abort() + }, + _ = (&mut recv_task) => { + send_task.abort(); + local_recv_task.abort() + }, + _ = (&mut local_recv_task) => { + recv_task.abort(); + send_task.abort(); + }, + }; +} + +/// Calculates position stats and returns as a tuple (`average_entry_price`,`total_quantity`) +fn calculate_position_stats( + conn: &mut PooledConnection>, +) -> (f32, f32) { + let positions = db::positions::Position::get_all_open_positions(conn).unwrap_or_default(); + + let average_entry_price = average_entry_price(&positions); + let total_quantity = positions + .iter() + .map(|pos| { + if pos.trader_direction == Direction::Short { + pos.quantity + } else { + // we want to see the quantity as seen from the coordinator + pos.quantity * -1.0 + } + }) + .sum(); + (average_entry_price, total_quantity) +} + +/// calculates the average execution price for inverse contracts +/// +/// The average execution price follows a simple formula: +/// `total_order_quantity / (quantity_position_0 / execution_price_position_0 + quantity_position_1 +/// / execution_price_position_1 )` +pub fn average_entry_price(positions: &[Position]) -> f32 { + if positions.is_empty() { + return 0.0; + } + if positions.len() == 1 { + return positions + .first() + .expect("to be exactly one") + .average_entry_price; + } + let sum_quantity = positions.iter().fold(0.0, |acc, m| acc + m.quantity); + + let nominal_prices = positions + .iter() + .fold(0.0, |acc, m| acc + (m.quantity / m.average_entry_price)); + + sum_quantity / nominal_prices +} diff --git a/crates/commons/src/message.rs b/crates/commons/src/message.rs index 43f494638..70075a819 100644 --- a/crates/commons/src/message.rs +++ b/crates/commons/src/message.rs @@ -123,3 +123,54 @@ impl Display for Message { } } } + +/// All values are from the perspective of the coordinator +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum PositionMessage { + /// The current position as seen from the coordinator + CurrentPosition { + /// if quantity is < 0 then coordinator is short, if > 0, then coordinator is long + quantity: f32, + average_entry_price: f32, + }, + /// A new trade which was executed successfully + NewTrade { + /// The coordinator's total position + /// + /// if quantity is < 0 then coordinator is short, if > 0, then coordinator is long + total_quantity: f32, + /// The average entry price of the total position + total_average_entry_price: f32, + /// The quantity of the new trade + /// + /// if quantity is < 0 then coordinator is short, if > 0, then coordinator is long + new_trade_quantity: f32, + /// The average entry price of the new trade + new_trade_average_entry_price: f32, + }, + Authenticated, + InvalidAuthentication(String), +} + +impl TryFrom for tungstenite::Message { + type Error = anyhow::Error; + + fn try_from(request: PositionMessage) -> Result { + let msg = serde_json::to_string(&request)?; + Ok(tungstenite::Message::Text(msg)) + } +} + +impl TryFrom for tungstenite::Message { + type Error = anyhow::Error; + + fn try_from(request: PositionMessageRequest) -> Result { + let msg = serde_json::to_string(&request)?; + Ok(tungstenite::Message::Text(msg)) + } +} + +#[derive(Serialize, Clone, Deserialize, Debug)] +pub enum PositionMessageRequest { + Authenticate { signature: Signature }, +} From 26690aeb01501897fefd91dc31e64c07a42af6cd Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Sun, 10 Mar 2024 12:30:46 +0800 Subject: [PATCH 2/3] fix: on update, only keep orders which are open When receiving an order update, we only want to retain `open` orders. Expired, deleted, matched, etc are not of interested in the app anymore. --- mobile/native/src/orderbook.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 107ac4a71..011948e15 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -18,6 +18,7 @@ use bitcoin::secp256k1::SECP256K1; use commons::best_current_price; use commons::Message; use commons::Order; +use commons::OrderState; use commons::OrderbookRequest; use commons::Prices; use commons::Signature; @@ -298,7 +299,9 @@ async fn handle_orderbook_message( tracing::warn!(?updated_order, "Update without prior knowledge of order"); } - orders.push(updated_order); + if updated_order.order_state == OrderState::Open { + orders.push(updated_order); + } update_prices_if_needed(cached_best_price, &orders); } From d552ea2b1ff5a1d12dcfe988a22dfb6c332bea00 Mon Sep 17 00:00:00 2001 From: Philipp Hoenisch Date: Sun, 10 Mar 2024 12:33:54 +0800 Subject: [PATCH 3/3] feat: add possibility to delete orders --- .../down.sql | 2 ++ .../up.sql | 1 + coordinator/src/orderbook/db/custom_types.rs | 4 ++++ coordinator/src/orderbook/db/orders.rs | 7 +++++++ coordinator/src/orderbook/routes.rs | 14 ++++++++++++++ coordinator/src/routes.rs | 3 ++- crates/commons/src/order.rs | 1 + 7 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/down.sql create mode 100644 coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/up.sql diff --git a/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/down.sql b/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/down.sql new file mode 100644 index 000000000..fd4907362 --- /dev/null +++ b/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/down.sql @@ -0,0 +1,2 @@ +-- We need to do something, or the migration fails +select 1; diff --git a/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/up.sql b/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/up.sql new file mode 100644 index 000000000..f1a29c6fa --- /dev/null +++ b/coordinator/migrations/2024-03-10-042249_add_delete_state_to_order/up.sql @@ -0,0 +1 @@ +ALTER TYPE "OrderState_Type" ADD VALUE IF NOT EXISTS 'Deleted'; diff --git a/coordinator/src/orderbook/db/custom_types.rs b/coordinator/src/orderbook/db/custom_types.rs index 2476c1e20..b546c7d64 100644 --- a/coordinator/src/orderbook/db/custom_types.rs +++ b/coordinator/src/orderbook/db/custom_types.rs @@ -102,6 +102,8 @@ pub(crate) enum OrderState { Failed, /// The order expired. Expired, + /// The order was manually deleted by the trader + Deleted, } impl QueryId for OrderStateType { @@ -121,6 +123,7 @@ impl ToSql for OrderState { OrderState::Taken => out.write_all(b"Taken")?, OrderState::Failed => out.write_all(b"Failed")?, OrderState::Expired => out.write_all(b"Expired")?, + OrderState::Deleted => out.write_all(b"Deleted")?, } Ok(IsNull::No) } @@ -134,6 +137,7 @@ impl FromSql for OrderState { b"Taken" => Ok(OrderState::Taken), b"Failed" => Ok(OrderState::Failed), b"Expired" => Ok(OrderState::Expired), + b"Deleted" => Ok(OrderState::Deleted), _ => Err("Unrecognized enum variant".into()), } } diff --git a/coordinator/src/orderbook/db/orders.rs b/coordinator/src/orderbook/db/orders.rs index 48a09a19d..adaefe627 100644 --- a/coordinator/src/orderbook/db/orders.rs +++ b/coordinator/src/orderbook/db/orders.rs @@ -66,6 +66,7 @@ impl From for OrderBookOrderState { OrderState::Taken => OrderBookOrderState::Taken, OrderState::Failed => OrderBookOrderState::Failed, OrderState::Expired => OrderBookOrderState::Expired, + OrderState::Deleted => OrderBookOrderState::Deleted, } } } @@ -78,6 +79,7 @@ impl From for OrderState { OrderBookOrderState::Taken => OrderState::Taken, OrderBookOrderState::Failed => OrderState::Failed, OrderBookOrderState::Expired => OrderState::Expired, + OrderBookOrderState::Deleted => OrderState::Deleted, } } } @@ -266,6 +268,11 @@ pub fn set_is_taken( } } +/// Updates the order state to `Deleted` +pub fn delete(conn: &mut PgConnection, id: Uuid) -> QueryResult { + set_order_state(conn, id, commons::OrderState::Deleted) +} + /// Returns the number of affected rows: 1. pub fn set_order_state( conn: &mut PgConnection, diff --git a/coordinator/src/orderbook/routes.rs b/coordinator/src/orderbook/routes.rs index af21d8d58..7e1046b82 100644 --- a/coordinator/src/orderbook/routes.rs +++ b/coordinator/src/orderbook/routes.rs @@ -154,6 +154,20 @@ pub async fn put_order( Ok(Json(order)) } +#[instrument(skip_all, err(Debug))] +pub async fn delete_order( + Path(order_id): Path, + State(state): State>, +) -> Result, AppError> { + let mut conn = get_db_connection(&state)?; + let order = orderbook::db::orders::delete(&mut conn, order_id) + .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; + let sender = state.tx_price_feed.clone(); + update_pricefeed(Message::Update(order.clone()), sender); + + Ok(Json(order)) +} + pub async fn websocket_handler( ws: WebSocketUpgrade, State(state): State>, diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 2a2c00dde..fa55e3d03 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -26,6 +26,7 @@ use crate::message::NewUserMessage; use crate::message::OrderbookMessage; use crate::node::Node; use crate::notifications::Notification; +use crate::orderbook::routes::delete_order; use crate::orderbook::routes::get_order; use crate::orderbook::routes::get_orders; use crate::orderbook::routes::post_order; @@ -162,7 +163,7 @@ pub fn router( .route("/api/orderbook/orders", get(get_orders).post(post_order)) .route( "/api/orderbook/orders/:order_id", - get(get_order).put(put_order), + get(get_order).put(put_order).delete(delete_order), ) .route("/api/orderbook/websocket", get(websocket_handler)) .route("/api/trade", post(post_trade)) diff --git a/crates/commons/src/order.rs b/crates/commons/src/order.rs index b31ebc0ff..81e377dfc 100644 --- a/crates/commons/src/order.rs +++ b/crates/commons/src/order.rs @@ -111,6 +111,7 @@ pub enum OrderState { Taken, Failed, Expired, + Deleted, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]