Skip to content

Commit

Permalink
Merge pull request #2143 from get10101/feat/positions-websocket
Browse files Browse the repository at this point in the history
feat: websocket for sending position/trade updates
  • Loading branch information
bonomat authored Mar 10, 2024
2 parents 80c315e + d552ea2 commit df45d39
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- We need to do something, or the migration fails
select 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE "OrderState_Type" ADD VALUE IF NOT EXISTS 'Deleted';
14 changes: 13 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use coordinator::run_migration;
use coordinator::scheduler::NotificationScheduler;
use coordinator::settings::Settings;
use coordinator::storage::CoordinatorTenTenOneStorage;
use coordinator::trade::websocket::InternalPositionUpdateMessage;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
Expand Down Expand Up @@ -148,7 +149,17 @@ async fn main() -> Result<()> {

let event_handler = CoordinatorEventHandler::new(node.clone(), None);
let running = node.start(event_handler, false)?;
let node = Node::new(node, running, pool.clone(), settings.to_node_settings());

// an internal channel to send updates about our position
let (tx_position_feed, _rx) = broadcast::channel::<InternalPositionUpdateMessage>(100);

let node = Node::new(
node,
running,
pool.clone(),
settings.to_node_settings(),
tx_position_feed.clone(),
);

// TODO: Pass the tokio metrics into Prometheus
if let Some(interval) = opts.tokio_metrics_interval_seconds {
Expand Down Expand Up @@ -284,6 +295,7 @@ async fn main() -> Result<()> {
NODE_ALIAS,
trading_sender,
tx_price_feed,
tx_position_feed,
tx_user_feed,
auth_users_notifier.clone(),
notification_service.get_sender(),
Expand Down
35 changes: 30 additions & 5 deletions coordinator/src/dlc_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::db;
use crate::position::models::PositionState;
use crate::trade::models::NewTrade;
use crate::trade::websocket::InternalPositionUpdateMessage;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
Expand All @@ -20,6 +21,7 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::str::from_utf8;
use time::OffsetDateTime;
use tokio::sync::broadcast::Sender;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_pnl;
use trade::Direction;
Expand Down Expand Up @@ -231,12 +233,12 @@ impl DlcProtocolExecutor {
trader_id: &PublicKey,
contract_id: Option<ContractId>,
channel_id: &DlcChannelId,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
) -> Result<()> {
let mut conn = self.pool.get()?;
let dlc_protocol = db::dlc_protocols::get_dlc_protocol(&mut conn, protocol_id)?;
conn.transaction(|conn| {
let dlc_protocol = db::dlc_protocols::get_dlc_protocol(conn, protocol_id)?;

match dlc_protocol.protocol_type {
match &dlc_protocol.protocol_type {
DlcProtocolType::Open { trade_params }
| DlcProtocolType::Renew { trade_params } => {
let contract_id = contract_id
Expand Down Expand Up @@ -282,6 +284,29 @@ impl DlcProtocolExecutor {
}
})?;

match &dlc_protocol.protocol_type {
DlcProtocolType::Open { trade_params }
| DlcProtocolType::Renew { trade_params }
| DlcProtocolType::Settle { trade_params } => {
if let Err(e) = {
tx_position_feed.send(InternalPositionUpdateMessage::NewTrade {
quantity: if trade_params.direction == Direction::Short {
trade_params.quantity
} else {
// We want to reflect the quantity as seen by the coordinator
trade_params.quantity * -1.0
},
average_entry_price: trade_params.average_price,
})
} {
tracing::error!("Could not notify channel about finished trade {e:#}");
}
}
_ => {
// a trade only happens in Open, Renew and Settle
}
}

Ok(())
}

Expand All @@ -294,7 +319,7 @@ impl DlcProtocolExecutor {
fn finish_close_trade_dlc_protocol(
&self,
conn: &mut PgConnection,
trade_params: TradeParams,
trade_params: &TradeParams,
protocol_id: ProtocolId,
settled_contract: &ContractId,
channel_id: &DlcChannelId,
Expand Down Expand Up @@ -391,7 +416,7 @@ impl DlcProtocolExecutor {
fn finish_open_trade_dlc_protocol(
&self,
conn: &mut PgConnection,
trade_params: TradeParams,
trade_params: &TradeParams,
protocol_id: ProtocolId,
contract_id: &ContractId,
channel_id: &DlcChannelId,
Expand Down
8 changes: 8 additions & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::message::OrderbookMessage;
use crate::node::storage::NodeStorage;
use crate::position::models::PositionState;
use crate::storage::CoordinatorTenTenOneStorage;
use crate::trade::websocket::InternalPositionUpdateMessage;
use crate::trade::TradeExecutor;
use anyhow::bail;
use anyhow::Context;
Expand Down Expand Up @@ -32,6 +33,7 @@ use ln_dlc_node::node::dlc_message_name;
use ln_dlc_node::node::event::NodeEvent;
use ln_dlc_node::node::RunningNode;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -59,6 +61,7 @@ pub struct Node {
_running: Arc<RunningNode>,
pub pool: Pool<ConnectionManager<PgConnection>>,
settings: Arc<RwLock<NodeSettings>>,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
}

impl Node {
Expand All @@ -73,12 +76,14 @@ impl Node {
running: RunningNode,
pool: Pool<ConnectionManager<PgConnection>>,
settings: NodeSettings,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
) -> Self {
Self {
inner,
pool,
settings: Arc::new(RwLock::new(settings)),
_running: Arc::new(running),
tx_position_feed,
}
}

Expand Down Expand Up @@ -281,6 +286,7 @@ impl Node {
&node_id,
channel.get_contract_id(),
channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::SettleFinalize(SettleFinalize {
Expand Down Expand Up @@ -320,6 +326,7 @@ impl Node {
// the settled signed channel does not have a contract
None,
channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::CollaborativeCloseOffer(close_offer) => {
Expand Down Expand Up @@ -377,6 +384,7 @@ impl Node {
&node_id,
channel.get_contract_id(),
&channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::Reject(reject) => {
Expand Down
4 changes: 4 additions & 0 deletions coordinator/src/orderbook/db/custom_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub(crate) enum OrderState {
Failed,
/// The order expired.
Expired,
/// The order was manually deleted by the trader
Deleted,
}

impl QueryId for OrderStateType {
Expand All @@ -121,6 +123,7 @@ impl ToSql<OrderStateType, Pg> for OrderState {
OrderState::Taken => out.write_all(b"Taken")?,
OrderState::Failed => out.write_all(b"Failed")?,
OrderState::Expired => out.write_all(b"Expired")?,
OrderState::Deleted => out.write_all(b"Deleted")?,
}
Ok(IsNull::No)
}
Expand All @@ -134,6 +137,7 @@ impl FromSql<OrderStateType, Pg> for OrderState {
b"Taken" => Ok(OrderState::Taken),
b"Failed" => Ok(OrderState::Failed),
b"Expired" => Ok(OrderState::Expired),
b"Deleted" => Ok(OrderState::Deleted),
_ => Err("Unrecognized enum variant".into()),
}
}
Expand Down
7 changes: 7 additions & 0 deletions coordinator/src/orderbook/db/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl From<OrderState> for OrderBookOrderState {
OrderState::Taken => OrderBookOrderState::Taken,
OrderState::Failed => OrderBookOrderState::Failed,
OrderState::Expired => OrderBookOrderState::Expired,
OrderState::Deleted => OrderBookOrderState::Deleted,
}
}
}
Expand All @@ -78,6 +79,7 @@ impl From<OrderBookOrderState> for OrderState {
OrderBookOrderState::Taken => OrderState::Taken,
OrderBookOrderState::Failed => OrderState::Failed,
OrderBookOrderState::Expired => OrderState::Expired,
OrderBookOrderState::Deleted => OrderState::Deleted,
}
}
}
Expand Down Expand Up @@ -266,6 +268,11 @@ pub fn set_is_taken(
}
}

/// Updates the order state to `Deleted`
pub fn delete(conn: &mut PgConnection, id: Uuid) -> QueryResult<OrderbookOrder> {
set_order_state(conn, id, commons::OrderState::Deleted)
}

/// Returns the number of affected rows: 1.
pub fn set_order_state(
conn: &mut PgConnection,
Expand Down
14 changes: 14 additions & 0 deletions coordinator/src/orderbook/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ pub async fn put_order(
Ok(Json(order))
}

#[instrument(skip_all, err(Debug))]
pub async fn delete_order(
Path(order_id): Path<Uuid>,
State(state): State<Arc<AppState>>,
) -> Result<Json<Order>, AppError> {
let mut conn = get_db_connection(&state)?;
let order = orderbook::db::orders::delete(&mut conn, order_id)
.map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?;
let sender = state.tx_price_feed.clone();
update_pricefeed(Message::Update(order.clone()), sender);

Ok(Json(order))
}

pub async fn websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
Expand Down
12 changes: 11 additions & 1 deletion coordinator/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::message::NewUserMessage;
use crate::message::OrderbookMessage;
use crate::node::Node;
use crate::notifications::Notification;
use crate::orderbook::routes::delete_order;
use crate::orderbook::routes::get_order;
use crate::orderbook::routes::get_orders;
use crate::orderbook::routes::post_order;
Expand All @@ -35,6 +36,7 @@ use crate::orderbook::trading::NewOrderMessage;
use crate::parse_dlc_channel_id;
use crate::settings::Settings;
use crate::settings::SettingsFile;
use crate::trade::websocket::InternalPositionUpdateMessage;
use crate::AppError;
use axum::extract::ConnectInfo;
use axum::extract::DefaultBodyLimit;
Expand Down Expand Up @@ -96,6 +98,8 @@ pub struct AppState {
pub node: Node,
// Channel used to send messages to all connected clients.
pub tx_price_feed: broadcast::Sender<Message>,
/// A channel used to send messages about position updates
pub tx_position_feed: broadcast::Sender<InternalPositionUpdateMessage>,
pub tx_user_feed: broadcast::Sender<NewUserMessage>,
pub trading_sender: mpsc::Sender<NewOrderMessage>,
pub pool: Pool<ConnectionManager<PgConnection>>,
Expand All @@ -119,6 +123,7 @@ pub fn router(
node_alias: &str,
trading_sender: mpsc::Sender<NewOrderMessage>,
tx_price_feed: broadcast::Sender<Message>,
tx_position_feed: broadcast::Sender<InternalPositionUpdateMessage>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
auth_users_notifier: mpsc::Sender<OrderbookMessage>,
notification_sender: mpsc::Sender<Notification>,
Expand All @@ -131,6 +136,7 @@ pub fn router(
pool,
settings: RwLock::new(settings),
tx_price_feed,
tx_position_feed,
tx_user_feed,
trading_sender,
exporter,
Expand All @@ -157,7 +163,7 @@ pub fn router(
.route("/api/orderbook/orders", get(get_orders).post(post_order))
.route(
"/api/orderbook/orders/:order_id",
get(get_order).put(put_order),
get(get_order).put(put_order).delete(delete_order),
)
.route("/api/orderbook/websocket", get(websocket_handler))
.route("/api/trade", post(post_trade))
Expand Down Expand Up @@ -199,6 +205,10 @@ pub fn router(
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
.route("/api/leaderboard", get(get_leaderboard))
.route(
"/api/admin/trade/websocket",
get(crate::trade::websocket::websocket_handler),
)
.layer(DefaultBodyLimit::disable())
.layer(DefaultBodyLimit::max(50 * 1024))
.with_state(app_state)
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/trade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use trade::Direction;
use uuid::Uuid;

pub mod models;
pub mod websocket;

pub enum TradeAction {
OpenDlcChannel,
Expand Down
Loading

0 comments on commit df45d39

Please sign in to comment.