Skip to content

Commit

Permalink
feat: position websocket
Browse files Browse the repository at this point in the history
Over this websocket we are sending updates about new trades. This means, if a user opens a new position or closes a position, an update is sent out over this websocket. The update includes an incremenatl change but also the total position state as reference. A client listening on this can then reason on this data.
  • Loading branch information
bonomat committed Mar 5, 2024
1 parent 08dea3c commit 7a5361c
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 6 deletions.
14 changes: 13 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use coordinator::run_migration;
use coordinator::scheduler::NotificationScheduler;
use coordinator::settings::Settings;
use coordinator::storage::CoordinatorTenTenOneStorage;
use coordinator::trade::websocket::InternalPositionUpdateMessage;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
Expand Down Expand Up @@ -148,7 +149,17 @@ async fn main() -> Result<()> {

let event_handler = CoordinatorEventHandler::new(node.clone(), None);
let running = node.start(event_handler, false)?;
let node = Node::new(node, running, pool.clone(), settings.to_node_settings());

// an internal channel to send updates about our position
let (tx_position_feed, _rx) = broadcast::channel::<InternalPositionUpdateMessage>(100);

let node = Node::new(
node,
running,
pool.clone(),
settings.to_node_settings(),
tx_position_feed.clone(),
);

// TODO: Pass the tokio metrics into Prometheus
if let Some(interval) = opts.tokio_metrics_interval_seconds {
Expand Down Expand Up @@ -284,6 +295,7 @@ async fn main() -> Result<()> {
NODE_ALIAS,
trading_sender,
tx_price_feed,
tx_position_feed,
tx_user_feed,
auth_users_notifier.clone(),
notification_service.get_sender(),
Expand Down
35 changes: 30 additions & 5 deletions coordinator/src/dlc_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db;
use crate::trade::models::NewTrade;
use crate::trade::websocket::InternalPositionUpdateMessage;
use anyhow::Context;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
Expand All @@ -20,6 +21,7 @@ use std::fmt::Display;
use std::fmt::Formatter;
use std::str::from_utf8;
use time::OffsetDateTime;
use tokio::sync::broadcast::Sender;
use trade::cfd::calculate_margin;
use trade::cfd::calculate_pnl;
use trade::Direction;
Expand Down Expand Up @@ -231,12 +233,12 @@ impl DlcProtocolExecutor {
trader_id: &PublicKey,
contract_id: Option<ContractId>,
channel_id: &DlcChannelId,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
) -> Result<()> {
let mut conn = self.pool.get()?;
let dlc_protocol = db::dlc_protocols::get_dlc_protocol(&mut conn, protocol_id)?;
conn.transaction(|conn| {
let dlc_protocol = db::dlc_protocols::get_dlc_protocol(conn, protocol_id)?;

match dlc_protocol.protocol_type {
match &dlc_protocol.protocol_type {
DlcProtocolType::Open { trade_params }
| DlcProtocolType::Renew { trade_params } => {
let contract_id = contract_id
Expand Down Expand Up @@ -282,6 +284,29 @@ impl DlcProtocolExecutor {
}
})?;

match &dlc_protocol.protocol_type {
DlcProtocolType::Open { trade_params }
| DlcProtocolType::Renew { trade_params }
| DlcProtocolType::Settle { trade_params } => {
if let Err(e) = {
tx_position_feed.send(InternalPositionUpdateMessage::NewTrade {
quantity: if trade_params.direction == Direction::Short {
trade_params.quantity
} else {
// We want to reflect the quantity as seen by the coordinator
trade_params.quantity * -1.0
},
average_entry_price: trade_params.average_price,
})
} {
tracing::error!("Could not notify channel about finished trade {e:#}");
}
}
_ => {
// a trade only happens in Open, Renew and Settle
}
}

Ok(())
}

Expand All @@ -294,7 +319,7 @@ impl DlcProtocolExecutor {
fn finish_close_trade_dlc_protocol(
&self,
conn: &mut PgConnection,
trade_params: TradeParams,
trade_params: &TradeParams,
protocol_id: ProtocolId,
settled_contract: &ContractId,
channel_id: &DlcChannelId,
Expand Down Expand Up @@ -391,7 +416,7 @@ impl DlcProtocolExecutor {
fn finish_open_trade_dlc_protocol(
&self,
conn: &mut PgConnection,
trade_params: TradeParams,
trade_params: &TradeParams,
protocol_id: ProtocolId,
contract_id: &ContractId,
channel_id: &DlcChannelId,
Expand Down
8 changes: 8 additions & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::dlc_protocol::ProtocolId;
use crate::message::OrderbookMessage;
use crate::node::storage::NodeStorage;
use crate::storage::CoordinatorTenTenOneStorage;
use crate::trade::websocket::InternalPositionUpdateMessage;
use crate::trade::TradeExecutor;
use anyhow::Context;
use anyhow::Result;
Expand Down Expand Up @@ -31,6 +32,7 @@ use ln_dlc_node::node::dlc_message_name;
use ln_dlc_node::node::event::NodeEvent;
use ln_dlc_node::node::RunningNode;
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -58,6 +60,7 @@ pub struct Node {
_running: Arc<RunningNode>,
pub pool: Pool<ConnectionManager<PgConnection>>,
settings: Arc<RwLock<NodeSettings>>,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
}

impl Node {
Expand All @@ -72,12 +75,14 @@ impl Node {
running: RunningNode,
pool: Pool<ConnectionManager<PgConnection>>,
settings: NodeSettings,
tx_position_feed: Sender<InternalPositionUpdateMessage>,
) -> Self {
Self {
inner,
pool,
settings: Arc::new(RwLock::new(settings)),
_running: Arc::new(running),
tx_position_feed,
}
}

Expand Down Expand Up @@ -278,6 +283,7 @@ impl Node {
&node_id,
channel.get_contract_id(),
channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::SettleFinalize(SettleFinalize {
Expand Down Expand Up @@ -317,6 +323,7 @@ impl Node {
// the settled signed channel does not have a contract
None,
channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::CollaborativeCloseOffer(close_offer) => {
Expand Down Expand Up @@ -374,6 +381,7 @@ impl Node {
&node_id,
channel.get_contract_id(),
&channel_id,
self.tx_position_feed.clone(),
)?;
}
ChannelMessage::Reject(reject) => {
Expand Down
9 changes: 9 additions & 0 deletions coordinator/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::orderbook::trading::NewOrderMessage;
use crate::parse_dlc_channel_id;
use crate::settings::Settings;
use crate::settings::SettingsFile;
use crate::trade::websocket::InternalPositionUpdateMessage;
use crate::AppError;
use axum::extract::ConnectInfo;
use axum::extract::DefaultBodyLimit;
Expand Down Expand Up @@ -92,6 +93,8 @@ pub struct AppState {
pub node: Node,
// Channel used to send messages to all connected clients.
pub tx_price_feed: broadcast::Sender<Message>,
/// A channel used to send messages about position updates
pub tx_position_feed: broadcast::Sender<InternalPositionUpdateMessage>,
pub tx_user_feed: broadcast::Sender<NewUserMessage>,
pub trading_sender: mpsc::Sender<NewOrderMessage>,
pub pool: Pool<ConnectionManager<PgConnection>>,
Expand All @@ -115,6 +118,7 @@ pub fn router(
node_alias: &str,
trading_sender: mpsc::Sender<NewOrderMessage>,
tx_price_feed: broadcast::Sender<Message>,
tx_position_feed: broadcast::Sender<InternalPositionUpdateMessage>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
auth_users_notifier: mpsc::Sender<OrderbookMessage>,
notification_sender: mpsc::Sender<Notification>,
Expand All @@ -127,6 +131,7 @@ pub fn router(
pool,
settings: RwLock::new(settings),
tx_price_feed,
tx_position_feed,
tx_user_feed,
trading_sender,
exporter,
Expand Down Expand Up @@ -195,6 +200,10 @@ pub fn router(
.route("/metrics", get(get_metrics))
.route("/health", get(get_health))
.route("/api/leaderboard", get(get_leaderboard))
.route(
"/api/admin/trade/websocket",
get(crate::trade::websocket::websocket_handler),
)
.layer(DefaultBodyLimit::disable())
.layer(DefaultBodyLimit::max(50 * 1024))
.with_state(app_state)
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/trade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use trade::Direction;
use uuid::Uuid;

pub mod models;
pub mod websocket;

pub enum TradeAction {
OpenDlcChannel,
Expand Down
Loading

0 comments on commit 7a5361c

Please sign in to comment.