diff --git a/CHANGELOG.md b/CHANGELOG.md index a5cde1549..bf36545b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Chore: don't allow multiple dlc-channels per user - Feat: show dlc-channel opening transaction in transaction history - Feat: allow force-close a DLC channel +- Feat: resend last outbound dlc message on connect ## [1.7.4] - 2023-12-20 diff --git a/coordinator/migrations/2024-01-10-105231_add_dlc_messages/down.sql b/coordinator/migrations/2024-01-10-105231_add_dlc_messages/down.sql new file mode 100644 index 000000000..c5f672274 --- /dev/null +++ b/coordinator/migrations/2024-01-10-105231_add_dlc_messages/down.sql @@ -0,0 +1,7 @@ +-- This file should undo anything in `up.sql` +DROP TABLE "last_outbound_dlc_messages"; +DROP TABLE "dlc_messages"; + +DROP TYPE "Message_Sub_Type_Type"; +DROP TYPE "Message_Type_Type"; + diff --git a/coordinator/migrations/2024-01-10-105231_add_dlc_messages/up.sql b/coordinator/migrations/2024-01-10-105231_add_dlc_messages/up.sql new file mode 100644 index 000000000..4d52890e7 --- /dev/null +++ b/coordinator/migrations/2024-01-10-105231_add_dlc_messages/up.sql @@ -0,0 +1,39 @@ +CREATE TYPE "Message_Type_Type" AS ENUM ( + 'OnChain', + 'Channel' +); + +CREATE TYPE "Message_Sub_Type_Type" AS ENUM ( + 'Offer', + 'Accept', + 'Sign', + 'SettleOffer', + 'SettleAccept', + 'SettleConfirm', + 'SettleFinalize', + 'RenewOffer', + 'RenewAccept', + 'RenewConfirm', + 'RenewFinalize', + 'RenewRevoke', + 'CollaborativeCloseOffer', + 'Reject' +); + +CREATE TABLE "dlc_messages" ( + -- We need to store the hash as TEXT as the BIGINT type overflows on some u64 values breaking the hash value. + message_hash TEXT PRIMARY KEY NOT NULL, + inbound BOOLEAN NOT NULL, + peer_id TEXT NOT NULL, + message_type "Message_Type_Type" NOT NULL, + message_sub_type "Message_Sub_Type_Type" NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE "last_outbound_dlc_messages" ( + peer_id TEXT PRIMARY KEY NOT NULL, + message_hash TEXT REFERENCES dlc_messages(message_hash) NOT NULL, + message TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 0749c4846..b048a77f0 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -3,6 +3,8 @@ use anyhow::Result; use bitcoin::XOnlyPublicKey; use coordinator::backup::SledBackup; use coordinator::cli::Opts; +use coordinator::dlc_handler; +use coordinator::dlc_handler::DlcHandler; use coordinator::logger; use coordinator::message::spawn_delivering_messages_to_authenticated_users; use coordinator::message::NewUserMessage; @@ -28,6 +30,7 @@ use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::PgConnection; use lightning::events::Event; +use ln_dlc_node::node::event::NodeEventHandler; use ln_dlc_node::scorer; use ln_dlc_node::seed::Bip39Seed; use ln_dlc_node::CoordinatorEventHandler; @@ -139,6 +142,15 @@ async fn main() -> Result<()> { node_event_handler.clone(), )?); + let dlc_handler = DlcHandler::new(pool.clone(), node.clone()); + let _handle = + // this handles sending outbound dlc messages as well as keeping track of what + // dlc messages have already been processed and what was the last outbound dlc message + // so it can be resend on reconnect. + // + // this does not handle the incoming dlc messages! 
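+        // incoming dlc messages are processed in `coordinator/src/node.rs`, which calls `DlcHandler::start_dlc_message_step` before `on_dlc_message` and `DlcMessageStep::finish` afterwards.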
+ dlc_handler::spawn_handling_dlc_messages(dlc_handler, node_event_handler.subscribe()); + let event_handler = CoordinatorEventHandler::new(node.clone(), Some(node_event_sender)); let running = node.start(event_handler, false)?; let node = Node::new(node, running, pool.clone(), settings.to_node_settings()); diff --git a/coordinator/src/db/custom_types.rs b/coordinator/src/db/custom_types.rs index 87441d212..e719d90b0 100644 --- a/coordinator/src/db/custom_types.rs +++ b/coordinator/src/db/custom_types.rs @@ -3,10 +3,14 @@ use crate::db::payments::HtlcStatus; use crate::db::payments::PaymentFlow; use crate::db::positions::ContractSymbol; use crate::db::positions::PositionState; +use crate::db::dlc_messages::MessageSubType; +use crate::db::dlc_messages::MessageType; use crate::schema::sql_types::ChannelStateType; use crate::schema::sql_types::ContractSymbolType; use crate::schema::sql_types::DirectionType; use crate::schema::sql_types::HtlcStatusType; +use crate::schema::sql_types::MessageSubTypeType; +use crate::schema::sql_types::MessageTypeType; use crate::schema::sql_types::PaymentFlowType; use crate::schema::sql_types::PositionStateType; use diesel::deserialize; @@ -159,3 +163,67 @@ impl FromSql for Direction { } } } + +impl ToSql for MessageType { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + MessageType::OnChain => out.write_all(b"OnChain")?, + MessageType::Channel => out.write_all(b"Channel")?, + } + Ok(IsNull::No) + } +} + +impl FromSql for MessageType { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"OnChain" => Ok(MessageType::OnChain), + b"Channel" => Ok(MessageType::Channel), + _ => Err("Unrecognized enum variant".into()), + } + } +} + +impl ToSql for MessageSubType { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { + match *self { + MessageSubType::Offer => out.write_all(b"Offer")?, + MessageSubType::Accept => out.write_all(b"Accept")?, + MessageSubType::Sign => out.write_all(b"Sign")?, + MessageSubType::SettleOffer => out.write_all(b"SettleOffer")?, + MessageSubType::SettleAccept => out.write_all(b"SettleAccept")?, + MessageSubType::SettleConfirm => out.write_all(b"SettleConfirm")?, + MessageSubType::SettleFinalize => out.write_all(b"SettleFinalize")?, + MessageSubType::RenewOffer => out.write_all(b"RenewOffer")?, + MessageSubType::RenewAccept => out.write_all(b"RenewAccept")?, + MessageSubType::RenewConfirm => out.write_all(b"RenewConfirm")?, + MessageSubType::RenewFinalize => out.write_all(b"RenewFinalize")?, + MessageSubType::RenewRevoke => out.write_all(b"RenewRevoke")?, + MessageSubType::CollaborativeCloseOffer => out.write_all(b"CollaborativeCloseOffer")?, + MessageSubType::Reject => out.write_all(b"Reject")?, + } + Ok(IsNull::No) + } +} + +impl FromSql for MessageSubType { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"Offer" => Ok(MessageSubType::Offer), + b"Accept" => Ok(MessageSubType::Accept), + b"Sign" => Ok(MessageSubType::Sign), + b"SettleOffer" => Ok(MessageSubType::SettleOffer), + b"SettleAccept" => Ok(MessageSubType::SettleAccept), + b"SettleConfirm" => Ok(MessageSubType::SettleConfirm), + b"SettleFinalize" => Ok(MessageSubType::SettleFinalize), + b"RenewOffer" => Ok(MessageSubType::RenewOffer), + b"RenewAccept" => Ok(MessageSubType::RenewAccept), + b"RenewConfirm" => Ok(MessageSubType::RenewConfirm), + b"RenewFinalize" => Ok(MessageSubType::RenewFinalize), + b"RenewRevoke" => 
Ok(MessageSubType::RenewRevoke), + b"CollaborativeCloseOffer" => Ok(MessageSubType::CollaborativeCloseOffer), + b"Reject" => Ok(MessageSubType::Reject), + _ => Err("Unrecognized enum variant".into()), + } + } +} diff --git a/coordinator/src/db/dlc_messages.rs b/coordinator/src/db/dlc_messages.rs new file mode 100644 index 000000000..9250aab60 --- /dev/null +++ b/coordinator/src/db/dlc_messages.rs @@ -0,0 +1,209 @@ +use crate::schema; +use crate::schema::dlc_messages; +use crate::schema::sql_types::MessageSubTypeType; +use crate::schema::sql_types::MessageTypeType; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::query_builder::QueryId; +use diesel::AsChangeset; +use diesel::AsExpression; +use diesel::ExpressionMethods; +use diesel::FromSqlRow; +use diesel::Insertable; +use diesel::OptionalExtension; +use diesel::PgConnection; +use diesel::QueryDsl; +use diesel::QueryResult; +use diesel::Queryable; +use diesel::QueryableByName; +use diesel::RunQueryDsl; +use std::any::TypeId; +use std::str::FromStr; +use time::OffsetDateTime; + +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] +#[diesel(sql_type = MessageTypeType)] +pub(crate) enum MessageType { + OnChain, + Channel, +} + +impl QueryId for MessageTypeType { + type QueryId = MessageTypeType; + const HAS_STATIC_QUERY_ID: bool = false; + + fn query_id() -> Option { + None + } +} + +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] +#[diesel(sql_type = MessageSubTypeType)] +pub(crate) enum MessageSubType { + Offer, + Accept, + Sign, + SettleOffer, + SettleAccept, + SettleConfirm, + SettleFinalize, + RenewOffer, + RenewAccept, + RenewConfirm, + RenewFinalize, + RenewRevoke, + CollaborativeCloseOffer, + Reject, +} + +impl QueryId for MessageSubTypeType { + type QueryId = MessageSubTypeType; + const HAS_STATIC_QUERY_ID: bool = false; + + fn query_id() -> Option { + None + } +} + +#[derive(Insertable, QueryableByName, Queryable, Debug, Clone, PartialEq, AsChangeset)] +#[diesel(table_name = dlc_messages)] +pub(crate) struct DlcMessage { + pub message_hash: String, + pub inbound: bool, + pub peer_id: String, + pub message_type: MessageType, + pub message_sub_type: MessageSubType, + pub timestamp: OffsetDateTime, +} + +pub(crate) fn get(conn: &mut PgConnection, message_hash: u64) -> QueryResult> { + dlc_messages::table + .filter(dlc_messages::message_hash.eq(message_hash.to_string())) + .first::(conn) + .optional() +} + +pub(crate) fn insert( + conn: &mut PgConnection, + dlc_message: ln_dlc_node::dlc_message::DlcMessage, +) -> Result<()> { + let affected_rows = diesel::insert_into(schema::dlc_messages::table) + .values(DlcMessage::from(dlc_message)) + .execute(conn)?; + + ensure!(affected_rows > 0, "Could not insert dlc message"); + + Ok(()) +} + +impl From for DlcMessage { + fn from(value: ln_dlc_node::dlc_message::DlcMessage) -> Self { + Self { + message_hash: value.message_hash.to_string(), + peer_id: value.peer_id.to_string(), + message_type: MessageType::from(value.clone().message_type), + message_sub_type: MessageSubType::from(value.message_type), + timestamp: value.timestamp, + inbound: value.inbound, + } + } +} + +impl From for MessageType { + fn from(value: ln_dlc_node::dlc_message::DlcMessageType) -> Self { + match value { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(_) => Self::OnChain, + ln_dlc_node::dlc_message::DlcMessageType::Channel(_) => Self::Channel, + } + } +} + +impl From for MessageSubType { + fn from(value: 
ln_dlc_node::dlc_message::DlcMessageType) -> Self { + let message_sub_type = match value { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(message_sub_type) => message_sub_type, + ln_dlc_node::dlc_message::DlcMessageType::Channel(message_sub_type) => message_sub_type, + }; + MessageSubType::from(message_sub_type) + } +} + +impl From for MessageSubType { + fn from(value: ln_dlc_node::dlc_message::DlcMessageSubType) -> Self { + match value { + ln_dlc_node::dlc_message::DlcMessageSubType::Offer => Self::Offer, + ln_dlc_node::dlc_message::DlcMessageSubType::Accept => Self::Accept, + ln_dlc_node::dlc_message::DlcMessageSubType::Sign => Self::Sign, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleOffer => Self::SettleOffer, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleAccept => Self::SettleAccept, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleConfirm => Self::SettleConfirm, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleFinalize => Self::SettleFinalize, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewOffer => Self::RenewOffer, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewAccept => Self::RenewAccept, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewConfirm => Self::RenewConfirm, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewFinalize => Self::RenewFinalize, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewRevoke => Self::RenewRevoke, + ln_dlc_node::dlc_message::DlcMessageSubType::CollaborativeCloseOffer => { + Self::CollaborativeCloseOffer + } + ln_dlc_node::dlc_message::DlcMessageSubType::Reject => Self::Reject, + } + } +} + +impl From for ln_dlc_node::dlc_message::DlcMessage { + fn from(value: DlcMessage) -> Self { + let dlc_message_sub_type = + ln_dlc_node::dlc_message::DlcMessageSubType::from(value.clone().message_sub_type); + let dlc_message_type = match &value.message_type { + MessageType::OnChain => { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(dlc_message_sub_type) + } + MessageType::Channel => { + ln_dlc_node::dlc_message::DlcMessageType::Channel(dlc_message_sub_type) + } + }; + + Self { + message_hash: u64::from_str(&value.message_hash).expect("valid u64"), + inbound: value.inbound, + message_type: dlc_message_type, + peer_id: PublicKey::from_str(&value.peer_id).expect("valid public key"), + timestamp: value.timestamp, + } + } +} + +impl From for ln_dlc_node::dlc_message::DlcMessageSubType { + fn from(value: MessageSubType) -> Self { + match value { + MessageSubType::Offer => ln_dlc_node::dlc_message::DlcMessageSubType::Offer, + MessageSubType::Accept => ln_dlc_node::dlc_message::DlcMessageSubType::Accept, + MessageSubType::Sign => ln_dlc_node::dlc_message::DlcMessageSubType::Sign, + MessageSubType::SettleOffer => ln_dlc_node::dlc_message::DlcMessageSubType::SettleOffer, + MessageSubType::SettleAccept => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleAccept + } + MessageSubType::SettleConfirm => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleConfirm + } + MessageSubType::SettleFinalize => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleFinalize + } + MessageSubType::RenewOffer => ln_dlc_node::dlc_message::DlcMessageSubType::RenewOffer, + MessageSubType::RenewAccept => ln_dlc_node::dlc_message::DlcMessageSubType::RenewAccept, + MessageSubType::RenewConfirm => { + ln_dlc_node::dlc_message::DlcMessageSubType::RenewConfirm + } + MessageSubType::RenewFinalize => { + ln_dlc_node::dlc_message::DlcMessageSubType::RenewFinalize + } + MessageSubType::RenewRevoke => 
ln_dlc_node::dlc_message::DlcMessageSubType::RenewRevoke, + MessageSubType::CollaborativeCloseOffer => { + ln_dlc_node::dlc_message::DlcMessageSubType::CollaborativeCloseOffer + } + MessageSubType::Reject => ln_dlc_node::dlc_message::DlcMessageSubType::Reject, + } + } +} diff --git a/coordinator/src/db/last_outbound_dlc_message.rs b/coordinator/src/db/last_outbound_dlc_message.rs new file mode 100644 index 000000000..078defbbe --- /dev/null +++ b/coordinator/src/db/last_outbound_dlc_message.rs @@ -0,0 +1,97 @@ +use crate::db::dlc_messages::MessageSubType; +use crate::db::dlc_messages::MessageType; +use crate::schema; +use crate::schema::dlc_messages; +use crate::schema::last_outbound_dlc_messages; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::AsChangeset; +use diesel::ExpressionMethods; +use diesel::Insertable; +use diesel::JoinOnDsl; +use diesel::OptionalExtension; +use diesel::PgConnection; +use diesel::QueryDsl; +use diesel::QueryResult; +use diesel::Queryable; +use diesel::QueryableByName; +use diesel::RunQueryDsl; +use ln_dlc_node::dlc_message::SerializedDlcMessage; +use time::OffsetDateTime; + +#[derive(Insertable, QueryableByName, Queryable, Debug, Clone, PartialEq, AsChangeset)] +#[diesel(table_name = last_outbound_dlc_messages)] +pub(crate) struct LastOutboundDlcMessage { + pub peer_id: String, + pub message_hash: String, + pub message: String, + pub timestamp: OffsetDateTime, +} + +pub(crate) fn get( + conn: &mut PgConnection, + peer_id: &PublicKey, +) -> QueryResult> { + let last_outbound_dlc_message = last_outbound_dlc_messages::table + .inner_join( + dlc_messages::table + .on(dlc_messages::message_hash.eq(last_outbound_dlc_messages::message_hash)), + ) + .filter(last_outbound_dlc_messages::peer_id.eq(peer_id.to_string())) + .select(( + dlc_messages::message_type, + dlc_messages::message_sub_type, + last_outbound_dlc_messages::message, + )) + .first::<(MessageType, MessageSubType, String)>(conn) + .optional()?; + + let serialized_dlc_message = match last_outbound_dlc_message { + Some((message_type, message_sub_type, message)) => { + let dlc_message_sub_type = + ln_dlc_node::dlc_message::DlcMessageSubType::from(message_sub_type); + let message_type = match &message_type { + MessageType::OnChain => { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(dlc_message_sub_type) + } + MessageType::Channel => { + ln_dlc_node::dlc_message::DlcMessageType::Channel(dlc_message_sub_type) + } + }; + + Some(SerializedDlcMessage { + message, + message_type, + }) + } + None => None, + }; + + Ok(serialized_dlc_message) +} + +pub(crate) fn upsert( + conn: &mut PgConnection, + peer_id: &PublicKey, + sdm: SerializedDlcMessage, +) -> Result<()> { + let values = ( + last_outbound_dlc_messages::peer_id.eq(peer_id.to_string()), + last_outbound_dlc_messages::message_hash.eq(sdm.generate_hash().to_string()), + last_outbound_dlc_messages::message.eq(sdm.message), + ); + let affected_rows = diesel::insert_into(last_outbound_dlc_messages::table) + .values(&values.clone()) + .on_conflict(schema::last_outbound_dlc_messages::peer_id) + .do_update() + .set(values) + .execute(conn)?; + + ensure!( + affected_rows > 0, + "Could not upsert last outbound dlc messages" + ); + + Ok(()) +} diff --git a/coordinator/src/db/mod.rs b/coordinator/src/db/mod.rs index bd82bb0b8..348f13a8f 100644 --- a/coordinator/src/db/mod.rs +++ b/coordinator/src/db/mod.rs @@ -1,6 +1,8 @@ pub mod channels; pub mod collaborative_reverts; pub mod custom_types; +pub mod dlc_messages; +pub mod 
last_outbound_dlc_message; pub mod liquidity; pub mod liquidity_options; pub mod payments; diff --git a/coordinator/src/dlc_handler.rs b/coordinator/src/dlc_handler.rs new file mode 100644 index 000000000..1faecad91 --- /dev/null +++ b/coordinator/src/dlc_handler.rs @@ -0,0 +1,171 @@ +use crate::db; +use crate::node::storage::NodeStorage; +use crate::storage::CoordinatorTenTenOneStorage; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; +use dlc_messages::Message; +use futures::future::RemoteHandle; +use futures::FutureExt; +use ln_dlc_node::dlc_message::DlcMessage; +use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::node::dlc_channel::send_dlc_message; +use ln_dlc_node::node::event::NodeEvent; +use ln_dlc_node::node::Node; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; + +/// The DlcHandler is responsible for sending dlc messages and marking received ones as +/// processed. It's main purpose is to ensure the following. +/// +/// 1. Mark all received inbound messages as processed. +/// 2. Save the last outbound dlc message, so it can be resend on the next reconnect. +/// 3. Check if a receive message has already been processed and if so inform to skip the message. + +#[derive(Clone)] +pub struct DlcHandler { + node: Arc>, + pool: Pool>, +} + +impl DlcHandler { + pub fn new( + pool: Pool>, + node: Arc>, + ) -> Self { + DlcHandler { node, pool } + } +} + +pub fn spawn_handling_dlc_messages( + dlc_handler: DlcHandler, + mut receiver: broadcast::Receiver, +) -> RemoteHandle<()> { + let (fut, remote_handle) = async move { + loop { + match receiver.recv().await { + Ok(NodeEvent::Connected { peer }) => { + if let Err(e) = dlc_handler.on_connect(peer) { + tracing::error!(peer=%peer, "Failed to process on connect event. {e:#}"); + } + } + Ok(NodeEvent::SendDlcMessage { peer, msg }) => { + if let Err(e) = dlc_handler.send_dlc_message(peer, msg) { + tracing::error!(peer=%peer, "Failed to process end dlc message event. 
{e:#}"); + } + } + Err(RecvError::Lagged(skipped)) => { + tracing::warn!("Skipped {skipped} messages"); + } + Err(RecvError::Closed) => { + tracing::error!("Lost connection to sender!"); + break; + } + } + } + }.remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +impl DlcHandler { + pub fn send_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + let mut conn = self.pool.get()?; + + let serialized_outbound_message = SerializedDlcMessage::try_from(&msg)?; + let outbound_msg = DlcMessage::new(peer, serialized_outbound_message.clone(), false)?; + + db::dlc_messages::insert(&mut conn, outbound_msg)?; + db::last_outbound_dlc_message::upsert(&mut conn, &peer, serialized_outbound_message)?; + + send_dlc_message( + &self.node.dlc_message_handler, + &self.node.peer_manager, + peer, + msg, + ); + + Ok(()) + } + + pub fn on_connect(&self, peer: PublicKey) -> Result<()> { + tracing::debug!(%peer, "Connected to peer, resending last dlc message!"); + + let mut conn = self.pool.get()?; + + let last_serialized_message = db::last_outbound_dlc_message::get(&mut conn, &peer)?; + + if let Some(last_serialized_message) = last_serialized_message { + tracing::debug!(%peer, ?last_serialized_message.message_type, "Sending last dlc message"); + + let message = Message::try_from(&last_serialized_message)?; + send_dlc_message( + &self.node.dlc_message_handler, + &self.node.peer_manager, + peer, + message, + ) + } else { + tracing::debug!(%peer, "No last dlc message found. Nothing todo."); + } + + Ok(()) + } + + // Returns either the dlc message step or return none, if the dlc message has already been + // processed. + pub fn start_dlc_message_step( + conn: &mut PgConnection, + msg: &Message, + peer_id: PublicKey, + ) -> Result> { + let serialized_inbound_message = SerializedDlcMessage::try_from(msg)?; + let inbound_msg = DlcMessage::new(peer_id, serialized_inbound_message, true)?; + + let dlc_message_step = match db::dlc_messages::get(conn, inbound_msg.message_hash)? { + Some(_) => None, // the dlc message has already been processed, no step necessary. + None => Some(DlcMessageStep { + peer_id, + inbound_msg, + }), + }; + + Ok(dlc_message_step) + } +} + +pub struct DlcMessageStep { + pub peer_id: PublicKey, + pub inbound_msg: DlcMessage, +} + +impl DlcMessageStep { + /// Finishes the current dlc step by storing the received inbound message as processed and + /// caching the last outbound dlc message (if any) into the database. 
+ pub fn finish(&self, conn: &mut PgConnection, response: &Option) -> Result<()> { + tracing::debug!("Marking the received message as processed"); + + db::dlc_messages::insert(conn, self.inbound_msg.clone())?; + + if let Some(resp) = response { + tracing::debug!("Persisting last outbound dlc message"); + let serialized_outbound_message = SerializedDlcMessage::try_from(resp)?; + let outbound_msg = + DlcMessage::new(self.peer_id, serialized_outbound_message.clone(), false)?; + + db::dlc_messages::insert(conn, outbound_msg)?; + db::last_outbound_dlc_message::upsert( + conn, + &self.peer_id, + serialized_outbound_message, + )?; + } + + Ok(()) + } +} diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index ff9299ae6..c538b9e23 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -31,6 +31,7 @@ pub mod node; pub mod notifications; pub mod orderbook; pub mod position; +pub mod dlc_handler; pub mod routes; pub mod routing_fee; pub mod scheduler; diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index 7c29aa16c..51a0fce0d 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -1,6 +1,7 @@ use crate::compute_relative_contracts; use crate::db; use crate::decimal_from_f32; +use crate::dlc_handler::DlcHandler; use crate::node::storage::NodeStorage; use crate::orderbook::db::matches; use crate::orderbook::db::orders; @@ -599,16 +600,39 @@ impl Node { ); let resp = match &msg { - Message::OnChain(_) | Message::Channel(_) => self - .inner - .dlc_manager - .on_dlc_message(&msg, node_id) - .with_context(|| { - format!( - "Failed to handle {} dlc message from {node_id}", - dlc_message_name(&msg) - ) - })?, + Message::OnChain(_) | Message::Channel(_) => { + let dlc_message_step = { + let mut conn = self.pool.get()?; + let dlc_message_step = + DlcHandler::start_dlc_message_step(&mut conn, &msg, node_id)?; + + match dlc_message_step { + Some(dlc_message_step) => dlc_message_step, + None => { + tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping."); + return Ok(()); + } + } + }; + + let resp = self + .inner + .dlc_manager + .on_dlc_message(&msg, node_id) + .with_context(|| { + format!( + "Failed to handle {} dlc message from {node_id}", + dlc_message_name(&msg) + ) + })?; + + { + let mut conn = self.pool.get()?; + dlc_message_step.finish(&mut conn, &resp)?; + } + + resp + } Message::SubChannel(msg) => self .inner .sub_channel_manager @@ -625,6 +649,7 @@ impl Node { // TODO(holzeis): It would be nice if dlc messages are also propagated via events, so the // receiver can decide what events to process and we can skip this component specific logic // here. + // Note: The NodeEventHandler could be used for that! if let Message::Channel(ChannelMessage::RenewFinalize(r)) = &msg { self.finalize_rollover(&r.channel_id)?; } diff --git a/coordinator/src/schema.rs b/coordinator/src/schema.rs index aad226fbe..dd720b14c 100644 --- a/coordinator/src/schema.rs +++ b/coordinator/src/schema.rs @@ -21,6 +21,14 @@ pub mod sql_types { #[diesel(postgres_type(name = "MatchState_Type"))] pub struct MatchStateType; + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "Message_Sub_Type_Type"))] + pub struct MessageSubTypeType; + + #[derive(diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "Message_Type_Type"))] + pub struct MessageTypeType; + #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "OrderReason_Type"))] pub struct OrderReasonType; @@ -76,6 +84,30 @@ diesel::table! 
{ } } +diesel::table! { + use diesel::sql_types::*; + use super::sql_types::MessageTypeType; + use super::sql_types::MessageSubTypeType; + + dlc_messages (message_hash) { + message_hash -> Text, + inbound -> Bool, + peer_id -> Text, + message_type -> MessageTypeType, + message_sub_type -> MessageSubTypeType, + timestamp -> Timestamptz, + } +} + +diesel::table! { + last_outbound_dlc_messages (peer_id) { + peer_id -> Text, + message_hash -> Text, + message -> Text, + timestamp -> Timestamptz, + } +} + diesel::table! { liquidity_options (id) { id -> Int4, @@ -262,12 +294,15 @@ diesel::table! { } } +diesel::joinable!(last_outbound_dlc_messages -> dlc_messages (message_hash)); diesel::joinable!(liquidity_request_logs -> liquidity_options (liquidity_option)); diesel::joinable!(trades -> positions (position_id)); diesel::allow_tables_to_appear_in_same_query!( channels, collaborative_reverts, + dlc_messages, + last_outbound_dlc_messages, liquidity_options, liquidity_request_logs, matches, diff --git a/crates/ln-dlc-node/src/dlc_message.rs b/crates/ln-dlc-node/src/dlc_message.rs new file mode 100644 index 000000000..2b4c21665 --- /dev/null +++ b/crates/ln-dlc-node/src/dlc_message.rs @@ -0,0 +1,245 @@ +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use dlc_messages::ChannelMessage; +use dlc_messages::Message; +use dlc_messages::OnChainMessage; +use std::collections::hash_map::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; +use time::OffsetDateTime; +use ureq::serde_json; + +#[derive(Clone)] +pub struct DlcMessage { + pub message_hash: u64, + pub inbound: bool, + pub peer_id: PublicKey, + pub message_type: DlcMessageType, + pub timestamp: OffsetDateTime, +} + +impl DlcMessage { + pub fn new( + peer_id: PublicKey, + serialized_message: SerializedDlcMessage, + inbound: bool, + ) -> Result { + let message_hash = serialized_message.generate_hash(); + + Ok(Self { + message_hash, + inbound, + peer_id, + message_type: serialized_message.message_type, + timestamp: OffsetDateTime::now_utc(), + }) + } +} + +#[derive(Hash, Clone, Debug)] +pub struct SerializedDlcMessage { + pub message: String, + pub message_type: DlcMessageType, +} + +impl SerializedDlcMessage { + pub fn generate_hash(&self) -> u64 { + let mut hasher = DefaultHasher::new(); + self.hash(&mut hasher); + hasher.finish() + } +} + +#[derive(Hash, Clone, Debug)] +pub enum DlcMessageType { + OnChain(DlcMessageSubType), + Channel(DlcMessageSubType), +} + +#[derive(Hash, Clone, Debug)] +pub enum DlcMessageSubType { + Offer, + Accept, + Sign, + SettleOffer, + SettleAccept, + SettleConfirm, + SettleFinalize, + RenewOffer, + RenewAccept, + RenewConfirm, + RenewFinalize, + RenewRevoke, + CollaborativeCloseOffer, + Reject, +} + +impl TryFrom<&SerializedDlcMessage> for Message { + type Error = anyhow::Error; + + fn try_from(serialized_msg: &SerializedDlcMessage) -> Result { + let message = match serialized_msg.clone().message_type { + DlcMessageType::OnChain(serialized_onchain_message_type) => { + match serialized_onchain_message_type { + DlcMessageSubType::Offer => Message::OnChain(OnChainMessage::Offer( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::Accept => Message::OnChain(OnChainMessage::Accept( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::Sign => Message::OnChain(OnChainMessage::Sign( + serde_json::from_str(&serialized_msg.message)?, + )), + _ => unreachable!(), + } + } + DlcMessageType::Channel(serialized_channel_message_type) => { + match 
serialized_channel_message_type { + DlcMessageSubType::Offer => Message::Channel(ChannelMessage::Offer( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::Accept => Message::Channel(ChannelMessage::Accept( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::Sign => Message::Channel(ChannelMessage::Sign( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::SettleOffer => Message::Channel( + ChannelMessage::SettleOffer(serde_json::from_str(&serialized_msg.message)?), + ), + DlcMessageSubType::SettleAccept => { + Message::Channel(ChannelMessage::SettleAccept(serde_json::from_str( + &serialized_msg.message, + )?)) + } + DlcMessageSubType::SettleConfirm => { + Message::Channel(ChannelMessage::SettleConfirm(serde_json::from_str( + &serialized_msg.message, + )?)) + } + DlcMessageSubType::SettleFinalize => { + Message::Channel(ChannelMessage::SettleFinalize(serde_json::from_str( + &serialized_msg.message, + )?)) + } + DlcMessageSubType::RenewOffer => Message::Channel(ChannelMessage::RenewOffer( + serde_json::from_str(&serialized_msg.message)?, + )), + DlcMessageSubType::RenewAccept => Message::Channel( + ChannelMessage::RenewAccept(serde_json::from_str(&serialized_msg.message)?), + ), + DlcMessageSubType::RenewConfirm => { + Message::Channel(ChannelMessage::RenewConfirm(serde_json::from_str( + &serialized_msg.message, + )?)) + } + DlcMessageSubType::RenewFinalize => { + Message::Channel(ChannelMessage::RenewFinalize(serde_json::from_str( + &serialized_msg.message, + )?)) + } + DlcMessageSubType::RenewRevoke => Message::Channel( + ChannelMessage::RenewRevoke(serde_json::from_str(&serialized_msg.message)?), + ), + DlcMessageSubType::CollaborativeCloseOffer => { + Message::Channel(ChannelMessage::CollaborativeCloseOffer( + serde_json::from_str(&serialized_msg.message)?, + )) + } + DlcMessageSubType::Reject => Message::Channel(ChannelMessage::Reject( + serde_json::from_str(&serialized_msg.message)?, + )), + } + } + }; + + Ok(message) + } +} + +impl TryFrom<&Message> for SerializedDlcMessage { + type Error = anyhow::Error; + + fn try_from(msg: &Message) -> Result { + let (message, message_type) = match &msg { + Message::OnChain(message) => match message { + OnChainMessage::Offer(offer) => ( + serde_json::to_string(&offer)?, + DlcMessageType::OnChain(DlcMessageSubType::Offer), + ), + OnChainMessage::Accept(accept) => ( + serde_json::to_string(&accept)?, + DlcMessageType::OnChain(DlcMessageSubType::Accept), + ), + OnChainMessage::Sign(sign) => ( + serde_json::to_string(&sign)?, + DlcMessageType::OnChain(DlcMessageSubType::Sign), + ), + }, + Message::Channel(message) => match message { + ChannelMessage::Offer(offer) => ( + serde_json::to_string(&offer)?, + DlcMessageType::Channel(DlcMessageSubType::Offer), + ), + ChannelMessage::Accept(accept) => ( + serde_json::to_string(&accept)?, + DlcMessageType::Channel(DlcMessageSubType::Accept), + ), + ChannelMessage::Sign(sign) => ( + serde_json::to_string(&sign)?, + DlcMessageType::Channel(DlcMessageSubType::Sign), + ), + ChannelMessage::SettleOffer(settle_offer) => ( + serde_json::to_string(&settle_offer)?, + DlcMessageType::Channel(DlcMessageSubType::SettleOffer), + ), + ChannelMessage::SettleAccept(settle_accept) => ( + serde_json::to_string(&settle_accept)?, + DlcMessageType::Channel(DlcMessageSubType::SettleAccept), + ), + ChannelMessage::SettleConfirm(settle_confirm) => ( + serde_json::to_string(&settle_confirm)?, + DlcMessageType::Channel(DlcMessageSubType::SettleConfirm), + ), + 
ChannelMessage::SettleFinalize(settle_finalize) => ( + serde_json::to_string(&settle_finalize)?, + DlcMessageType::Channel(DlcMessageSubType::SettleFinalize), + ), + ChannelMessage::RenewOffer(renew_offer) => ( + serde_json::to_string(&renew_offer)?, + DlcMessageType::Channel(DlcMessageSubType::RenewOffer), + ), + ChannelMessage::RenewAccept(renew_accept) => ( + serde_json::to_string(&renew_accept)?, + DlcMessageType::Channel(DlcMessageSubType::RenewAccept), + ), + ChannelMessage::RenewConfirm(renew_confirm) => ( + serde_json::to_string(&renew_confirm)?, + DlcMessageType::Channel(DlcMessageSubType::RenewConfirm), + ), + ChannelMessage::RenewFinalize(renew_finalize) => ( + serde_json::to_string(&renew_finalize)?, + DlcMessageType::Channel(DlcMessageSubType::RenewFinalize), + ), + ChannelMessage::RenewRevoke(renew_revoke) => ( + serde_json::to_string(&renew_revoke)?, + DlcMessageType::Channel(DlcMessageSubType::RenewRevoke), + ), + ChannelMessage::CollaborativeCloseOffer(collaborative_close_offer) => ( + serde_json::to_string(&collaborative_close_offer)?, + DlcMessageType::Channel(DlcMessageSubType::CollaborativeCloseOffer), + ), + ChannelMessage::Reject(reject) => ( + serde_json::to_string(&reject)?, + DlcMessageType::Channel(DlcMessageSubType::Reject), + ), + }, + _ => unreachable!(), + }; + + Ok(Self { + message, + message_type, + }) + } +} diff --git a/crates/ln-dlc-node/src/lib.rs b/crates/ln-dlc-node/src/lib.rs index 944ab44d6..63a71c39c 100644 --- a/crates/ln-dlc-node/src/lib.rs +++ b/crates/ln-dlc-node/src/lib.rs @@ -36,6 +36,7 @@ pub mod channel; pub mod config; pub mod ln; pub mod node; +pub mod dlc_message; pub mod scorer; pub mod seed; pub mod storage; diff --git a/crates/ln-dlc-node/src/node/dlc_channel.rs b/crates/ln-dlc-node/src/node/dlc_channel.rs index 5f89bdf1d..d5b059802 100644 --- a/crates/ln-dlc-node/src/node/dlc_channel.rs +++ b/crates/ln-dlc-node/src/node/dlc_channel.rs @@ -1,3 +1,4 @@ +use crate::node::event::NodeEvent; use crate::node::Node; use crate::node::Storage as LnDlcStorage; use crate::storage::TenTenOneStorage; @@ -57,8 +58,7 @@ impl Nod let sub_channel_manager = self.sub_channel_manager.clone(); let oracles = contract_input.contract_infos[0].oracles.clone(); let event_id = oracles.event_id; - let dlc_message_handler = self.dlc_message_handler.clone(); - let peer_manager = self.peer_manager.clone(); + let event_handler = self.event_handler.clone(); move || { let announcements: Vec<_> = p2pd_oracles .into_iter() @@ -76,12 +76,13 @@ impl Nod .offer_channel(&contract_input, counterparty)?; let temporary_contract_id = sub_channel_offer.temporary_contract_id; - send_dlc_message( - &dlc_message_handler, - &peer_manager, - counterparty, - Message::Channel(ChannelMessage::Offer(sub_channel_offer)), - ); + + if let Err(e) = event_handler.publish(NodeEvent::SendDlcMessage { + peer: counterparty, + msg: Message::Channel(ChannelMessage::Offer(sub_channel_offer)), + }) { + tracing::error!("Failed to publish send dlc message node event! 
{e:#}"); + } Ok(temporary_contract_id) } @@ -100,19 +101,17 @@ impl Nod tracing::info!(channel_id = %hex::encode(dlc_channel_id), "Proposing a DLC channel update"); spawn_blocking({ let dlc_manager = self.dlc_manager.clone(); - let dlc_message_handler = self.dlc_message_handler.clone(); - let peer_manager = self.peer_manager.clone(); let dlc_channel_id = *dlc_channel_id; + let event_handler = self.event_handler.clone(); move || { let (renew_offer, counterparty_pubkey) = dlc_manager.renew_offer(&dlc_channel_id, payout_amount, &contract_input)?; - send_dlc_message( - &dlc_message_handler, - &peer_manager, - counterparty_pubkey, - Message::Channel(ChannelMessage::RenewOffer(renew_offer)), - ); + event_handler.publish(NodeEvent::SendDlcMessage { + peer: counterparty_pubkey, + msg: Message::Channel(ChannelMessage::RenewOffer(renew_offer)), + })?; + Ok(()) } }) @@ -148,12 +147,10 @@ impl Nod let (msg, _channel_id, _contract_id, counter_party) = self.dlc_manager.accept_channel(channel_id)?; - send_dlc_message( - &self.dlc_message_handler, - &self.peer_manager, - counter_party, - Message::Channel(ChannelMessage::Accept(msg)), - ); + self.event_handler.publish(NodeEvent::SendDlcMessage { + peer: counter_party, + msg: Message::Channel(ChannelMessage::Accept(msg)), + })?; Ok(()) } @@ -203,8 +200,7 @@ impl Nod SignedChannelState::Settled { .. } | SignedChannelState::RenewFinalized { .. } => { spawn_blocking({ let dlc_manager = self.dlc_manager.clone(); - let dlc_message_handler = self.dlc_message_handler.clone(); - let peer_manager = self.peer_manager.clone(); + let event_handler = self.event_handler.clone(); move || { let settle_offer = dlc_manager .offer_collaborative_close( @@ -215,12 +211,12 @@ impl Nod "Could not propose to collaboratively close the dlc channel.", )?; - send_dlc_message( - &dlc_message_handler, - &peer_manager, - counterparty, - Message::Channel(ChannelMessage::CollaborativeCloseOffer(settle_offer)), - ); + event_handler.publish(NodeEvent::SendDlcMessage { + peer: counterparty, + msg: Message::Channel(ChannelMessage::CollaborativeCloseOffer( + settle_offer, + )), + })?; anyhow::Ok(()) } @@ -252,18 +248,15 @@ impl Nod spawn_blocking({ let dlc_manager = self.dlc_manager.clone(); - let dlc_message_handler = self.dlc_message_handler.clone(); - let peer_manager = self.peer_manager.clone(); + let event_handler = self.event_handler.clone(); move || { let (settle_offer, counterparty) = dlc_manager.settle_offer(&channel_id, accept_settlement_amount)?; - send_dlc_message( - &dlc_message_handler, - &peer_manager, - counterparty, - Message::Channel(ChannelMessage::SettleOffer(settle_offer)), - ); + event_handler.publish(NodeEvent::SendDlcMessage { + peer: counterparty, + msg: Message::Channel(ChannelMessage::SettleOffer(settle_offer)), + })?; Ok(()) } @@ -291,16 +284,12 @@ impl Nod tracing::info!(channel_id = %channel_id_hex, "Accepting DLC channel collaborative settlement"); let dlc_manager = self.dlc_manager.clone(); - let dlc_message_handler = self.dlc_message_handler.clone(); - let peer_manager = self.peer_manager.clone(); let (settle_offer, counterparty_pk) = dlc_manager.accept_settle_offer(&channel_id)?; - send_dlc_message( - &dlc_message_handler, - &peer_manager, - counterparty_pk, - Message::Channel(ChannelMessage::SettleAccept(settle_offer)), - ); + self.event_handler.publish(NodeEvent::SendDlcMessage { + peer: counterparty_pk, + msg: Message::Channel(ChannelMessage::SettleAccept(settle_offer)), + })?; Ok(()) } diff --git a/crates/ln-dlc-node/src/node/event.rs 
b/crates/ln-dlc-node/src/node/event.rs index 203e332c3..d64ee8030 100644 --- a/crates/ln-dlc-node/src/node/event.rs +++ b/crates/ln-dlc-node/src/node/event.rs @@ -8,6 +8,7 @@ use tokio::sync::broadcast::Receiver; #[derive(Clone, Debug)] pub enum NodeEvent { Connected { peer: PublicKey }, + SendDlcMessage { peer: PublicKey, msg: Message }, } #[derive(Clone)] diff --git a/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/down.sql b/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/down.sql new file mode 100644 index 000000000..c5a84b55d --- /dev/null +++ b/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +DROP TABLE "last_outbound_dlc_messages"; +DROP TABLE "dlc_messages"; diff --git a/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/up.sql b/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/up.sql new file mode 100644 index 000000000..29d48d63e --- /dev/null +++ b/mobile/native/migrations/2024-01-10-105231_add_dlc_messages/up.sql @@ -0,0 +1,17 @@ +CREATE TABLE "dlc_messages" ( + -- We need to store the hash as TEXT as the BIGINT type overflows on some u64 values breaking the hash value. + message_hash TEXT PRIMARY KEY NOT NULL, + inbound BOOLEAN NOT NULL, + peer_id TEXT NOT NULL, + message_type TEXT NOT NULL, + message_sub_type TEXT NOT NULL, + timestamp BIGINT NOT NULL +); + +CREATE TABLE "last_outbound_dlc_messages" ( + peer_id TEXT PRIMARY KEY NOT NULL, + message_hash TEXT REFERENCES dlc_messages(message_hash) NOT NULL, + message TEXT NOT NULL, + timestamp BIGINT NOT NULL +); + diff --git a/mobile/native/src/db/custom_types.rs b/mobile/native/src/db/custom_types.rs index e4bcad081..72cbb15ce 100644 --- a/mobile/native/src/db/custom_types.rs +++ b/mobile/native/src/db/custom_types.rs @@ -1,3 +1,5 @@ +use crate::db::dlc_messages::MessageSubType; +use crate::db::dlc_messages::MessageType; use crate::db::models::ChannelState; use crate::db::models::ContractSymbol; use crate::db::models::Direction; @@ -288,6 +290,76 @@ impl FromSql for ChannelState { } } +impl ToSql for MessageType { + fn to_sql(&self, out: &mut Output) -> serialize::Result { + let text = match *self { + MessageType::OnChain => "OnChain", + MessageType::Channel => "Channel", + }; + out.set_value(text); + Ok(IsNull::No) + } +} + +impl FromSql for MessageType { + fn from_sql(bytes: backend::RawValue) -> deserialize::Result { + let string = >::from_sql(bytes)?; + + return match string.as_str() { + "OnChain" => Ok(MessageType::OnChain), + "Channel" => Ok(MessageType::Channel), + _ => Err("Unrecognized enum variant".into()), + }; + } +} + +impl ToSql for MessageSubType { + fn to_sql(&self, out: &mut Output) -> serialize::Result { + let text = match *self { + MessageSubType::Offer => "Offer", + MessageSubType::Accept => "Accept", + MessageSubType::Sign => "Sign", + MessageSubType::SettleOffer => "SettleOffer", + MessageSubType::SettleAccept => "SettleAccept", + MessageSubType::SettleConfirm => "SettleConfirm", + MessageSubType::SettleFinalize => "SettleFinalize", + MessageSubType::RenewOffer => "RenewOffer", + MessageSubType::RenewAccept => "RenewAccept", + MessageSubType::RenewConfirm => "RenewConfirm", + MessageSubType::RenewFinalize => "RenewFinalize", + MessageSubType::RenewRevoke => "RenewRevoke", + MessageSubType::CollaborativeCloseOffer => "CollaborativeCloseOffer", + MessageSubType::Reject => "Reject", + }; + out.set_value(text); + Ok(IsNull::No) + } +} + +impl FromSql for MessageSubType { + fn 
from_sql(bytes: backend::RawValue) -> deserialize::Result { + let string = >::from_sql(bytes)?; + + return match string.as_str() { + "Offer" => Ok(MessageSubType::Offer), + "Accept" => Ok(MessageSubType::Accept), + "Sign" => Ok(MessageSubType::Sign), + "SettleOffer" => Ok(MessageSubType::SettleOffer), + "SettleAccept" => Ok(MessageSubType::SettleAccept), + "SettleConfirm" => Ok(MessageSubType::SettleConfirm), + "SettleFinalize" => Ok(MessageSubType::SettleFinalize), + "RenewOffer" => Ok(MessageSubType::RenewOffer), + "RenewAccept" => Ok(MessageSubType::RenewAccept), + "RenewConfirm" => Ok(MessageSubType::RenewConfirm), + "RenewFinalize" => Ok(MessageSubType::RenewFinalize), + "RenewRevoke" => Ok(MessageSubType::RenewRevoke), + "CollaborativeCloseOffer" => Ok(MessageSubType::CollaborativeCloseOffer), + "Reject" => Ok(MessageSubType::Reject), + _ => Err("Unrecognized enum variant".into()), + }; + } +} + #[cfg(test)] mod tests { use crate::db::custom_types::tests::customstruct::id; diff --git a/mobile/native/src/db/dlc_messages.rs b/mobile/native/src/db/dlc_messages.rs new file mode 100644 index 000000000..ae5e26ceb --- /dev/null +++ b/mobile/native/src/db/dlc_messages.rs @@ -0,0 +1,195 @@ +use crate::schema; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::prelude::*; +use diesel::sql_types::Text; +use diesel::AsChangeset; +use diesel::AsExpression; +use diesel::FromSqlRow; +use diesel::Insertable; +use diesel::OptionalExtension; +use diesel::QueryResult; +use diesel::Queryable; +use diesel::QueryableByName; +use diesel::RunQueryDsl; +use diesel::SqliteConnection; +use schema::dlc_messages; +use std::str::FromStr; +use time::OffsetDateTime; + +#[derive(Insertable, QueryableByName, Queryable, Debug, Clone, PartialEq, AsChangeset)] +#[diesel(table_name = dlc_messages)] +pub(crate) struct DlcMessage { + pub message_hash: String, + pub inbound: bool, + pub peer_id: String, + pub message_type: MessageType, + pub message_sub_type: MessageSubType, + pub timestamp: i64, +} + +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] +#[diesel(sql_type = Text)] +pub enum MessageType { + OnChain, + Channel, +} + +#[derive(Debug, Clone, Copy, PartialEq, FromSqlRow, AsExpression)] +#[diesel(sql_type = Text)] +pub enum MessageSubType { + Offer, + Accept, + Sign, + SettleOffer, + SettleAccept, + SettleConfirm, + SettleFinalize, + RenewOffer, + RenewAccept, + RenewConfirm, + RenewFinalize, + RenewRevoke, + CollaborativeCloseOffer, + Reject, +} + +impl DlcMessage { + pub(crate) fn get( + conn: &mut SqliteConnection, + message_hash: u64, + ) -> QueryResult> { + let result = schema::dlc_messages::table + .filter(schema::dlc_messages::message_hash.eq(message_hash.to_string())) + .first::(conn) + .optional()?; + + Ok(result.map(|q| q.into())) + } + + pub(crate) fn insert( + conn: &mut SqliteConnection, + dlc_message: ln_dlc_node::dlc_message::DlcMessage, + ) -> Result<()> { + let affected_rows = diesel::insert_into(schema::dlc_messages::table) + .values(DlcMessage::from(dlc_message)) + .execute(conn)?; + + ensure!(affected_rows > 0, "Could not insert queue"); + + Ok(()) + } +} + +impl From for DlcMessage { + fn from(value: ln_dlc_node::dlc_message::DlcMessage) -> Self { + Self { + message_hash: value.message_hash.to_string(), + peer_id: value.peer_id.to_string(), + message_type: MessageType::from(value.clone().message_type), + message_sub_type: MessageSubType::from(value.message_type), + timestamp: value.timestamp.unix_timestamp(), + inbound: value.inbound, + } 
+ } +} + +impl From for MessageType { + fn from(value: ln_dlc_node::dlc_message::DlcMessageType) -> Self { + match value { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(_) => Self::OnChain, + ln_dlc_node::dlc_message::DlcMessageType::Channel(_) => Self::Channel, + } + } +} + +impl From for MessageSubType { + fn from(value: ln_dlc_node::dlc_message::DlcMessageType) -> Self { + let message_sub_type = match value { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(message_sub_type) => message_sub_type, + ln_dlc_node::dlc_message::DlcMessageType::Channel(message_sub_type) => message_sub_type, + }; + MessageSubType::from(message_sub_type) + } +} + +impl From for MessageSubType { + fn from(value: ln_dlc_node::dlc_message::DlcMessageSubType) -> Self { + match value { + ln_dlc_node::dlc_message::DlcMessageSubType::Offer => Self::Offer, + ln_dlc_node::dlc_message::DlcMessageSubType::Accept => Self::Accept, + ln_dlc_node::dlc_message::DlcMessageSubType::Sign => Self::Sign, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleOffer => Self::SettleOffer, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleAccept => Self::SettleAccept, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleConfirm => Self::SettleConfirm, + ln_dlc_node::dlc_message::DlcMessageSubType::SettleFinalize => Self::SettleFinalize, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewOffer => Self::RenewOffer, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewAccept => Self::RenewAccept, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewConfirm => Self::RenewConfirm, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewFinalize => Self::RenewFinalize, + ln_dlc_node::dlc_message::DlcMessageSubType::RenewRevoke => Self::RenewRevoke, + ln_dlc_node::dlc_message::DlcMessageSubType::CollaborativeCloseOffer => { + Self::CollaborativeCloseOffer + } + ln_dlc_node::dlc_message::DlcMessageSubType::Reject => Self::Reject, + } + } +} + +impl From for ln_dlc_node::dlc_message::DlcMessage { + fn from(value: DlcMessage) -> Self { + let dlc_message_sub_type = + ln_dlc_node::dlc_message::DlcMessageSubType::from(value.clone().message_sub_type); + let dlc_message_type = match &value.message_type { + MessageType::OnChain => { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(dlc_message_sub_type) + } + MessageType::Channel => { + ln_dlc_node::dlc_message::DlcMessageType::Channel(dlc_message_sub_type) + } + }; + + Self { + message_hash: u64::from_str(&value.message_hash).expect("valid u64"), + inbound: value.inbound, + message_type: dlc_message_type, + peer_id: PublicKey::from_str(&value.peer_id).expect("valid public key"), + timestamp: OffsetDateTime::from_unix_timestamp(value.timestamp) + .expect("valid timestamp"), + } + } +} + +impl From for ln_dlc_node::dlc_message::DlcMessageSubType { + fn from(value: MessageSubType) -> Self { + match value { + MessageSubType::Offer => ln_dlc_node::dlc_message::DlcMessageSubType::Offer, + MessageSubType::Accept => ln_dlc_node::dlc_message::DlcMessageSubType::Accept, + MessageSubType::Sign => ln_dlc_node::dlc_message::DlcMessageSubType::Sign, + MessageSubType::SettleOffer => ln_dlc_node::dlc_message::DlcMessageSubType::SettleOffer, + MessageSubType::SettleAccept => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleAccept + } + MessageSubType::SettleConfirm => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleConfirm + } + MessageSubType::SettleFinalize => { + ln_dlc_node::dlc_message::DlcMessageSubType::SettleFinalize + } + MessageSubType::RenewOffer => 
ln_dlc_node::dlc_message::DlcMessageSubType::RenewOffer, + MessageSubType::RenewAccept => ln_dlc_node::dlc_message::DlcMessageSubType::RenewAccept, + MessageSubType::RenewConfirm => { + ln_dlc_node::dlc_message::DlcMessageSubType::RenewConfirm + } + MessageSubType::RenewFinalize => { + ln_dlc_node::dlc_message::DlcMessageSubType::RenewFinalize + } + MessageSubType::RenewRevoke => ln_dlc_node::dlc_message::DlcMessageSubType::RenewRevoke, + MessageSubType::CollaborativeCloseOffer => { + ln_dlc_node::dlc_message::DlcMessageSubType::CollaborativeCloseOffer + } + MessageSubType::Reject => ln_dlc_node::dlc_message::DlcMessageSubType::Reject, + } + } +} diff --git a/mobile/native/src/db/last_outbound_dlc_messages.rs b/mobile/native/src/db/last_outbound_dlc_messages.rs new file mode 100644 index 000000000..c6b581616 --- /dev/null +++ b/mobile/native/src/db/last_outbound_dlc_messages.rs @@ -0,0 +1,99 @@ +use crate::db::dlc_messages::MessageSubType; +use crate::db::dlc_messages::MessageType; +use crate::schema; +use crate::schema::dlc_messages; +use crate::schema::last_outbound_dlc_messages; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::AsChangeset; +use diesel::ExpressionMethods; +use diesel::Insertable; +use diesel::JoinOnDsl; +use diesel::OptionalExtension; +use diesel::QueryDsl; +use diesel::QueryResult; +use diesel::Queryable; +use diesel::RunQueryDsl; +use diesel::SqliteConnection; +use ln_dlc_node::dlc_message::SerializedDlcMessage; +use time::OffsetDateTime; + +#[derive(Insertable, Queryable, Debug, Clone, PartialEq, AsChangeset)] +#[diesel(table_name = last_outbound_dlc_messages)] +pub(crate) struct LastOutboundDlcMessage { + pub peer_id: String, + pub message_hash: String, + pub message: String, + pub timestamp: i64, +} + +impl LastOutboundDlcMessage { + pub(crate) fn get( + conn: &mut SqliteConnection, + peer_id: &PublicKey, + ) -> QueryResult> { + let last_outbound_dlc_message = last_outbound_dlc_messages::table + .inner_join( + dlc_messages::table + .on(dlc_messages::message_hash.eq(last_outbound_dlc_messages::message_hash)), + ) + .filter(last_outbound_dlc_messages::peer_id.eq(peer_id.to_string())) + .select(( + dlc_messages::message_type, + dlc_messages::message_sub_type, + last_outbound_dlc_messages::message, + )) + .first::<(MessageType, MessageSubType, String)>(conn) + .optional()?; + + let serialized_dlc_message = match last_outbound_dlc_message { + Some((message_type, message_sub_type, message)) => { + let dlc_message_sub_type = + ln_dlc_node::dlc_message::DlcMessageSubType::from(message_sub_type); + let message_type = match &message_type { + MessageType::OnChain => { + ln_dlc_node::dlc_message::DlcMessageType::OnChain(dlc_message_sub_type) + } + MessageType::Channel => { + ln_dlc_node::dlc_message::DlcMessageType::Channel(dlc_message_sub_type) + } + }; + + Some(SerializedDlcMessage { + message, + message_type, + }) + } + None => None, + }; + + Ok(serialized_dlc_message) + } + + pub(crate) fn upsert( + conn: &mut SqliteConnection, + peer_id: &PublicKey, + sdm: SerializedDlcMessage, + ) -> Result<()> { + let values = ( + last_outbound_dlc_messages::peer_id.eq(peer_id.to_string()), + last_outbound_dlc_messages::message_hash.eq(sdm.generate_hash().to_string()), + last_outbound_dlc_messages::message.eq(sdm.message), + last_outbound_dlc_messages::timestamp.eq(OffsetDateTime::now_utc().unix_timestamp()), + ); + let affected_rows = diesel::insert_into(last_outbound_dlc_messages::table) + .values(&values.clone()) + 
.on_conflict(schema::last_outbound_dlc_messages::peer_id) + .do_update() + .set(values) + .execute(conn)?; + + ensure!( + affected_rows > 0, + "Could not upsert last outbound dlc messages" + ); + + Ok(()) + } +} diff --git a/mobile/native/src/db/mod.rs b/mobile/native/src/db/mod.rs index 4eea7921d..c42bd5ca4 100644 --- a/mobile/native/src/db/mod.rs +++ b/mobile/native/src/db/mod.rs @@ -42,6 +42,8 @@ use time::OffsetDateTime; use uuid::Uuid; mod custom_types; +pub mod dlc_messages; +pub mod last_outbound_dlc_messages; pub mod models; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); diff --git a/mobile/native/src/dlc_handler.rs b/mobile/native/src/dlc_handler.rs new file mode 100644 index 000000000..a74a76593 --- /dev/null +++ b/mobile/native/src/dlc_handler.rs @@ -0,0 +1,158 @@ +use crate::db; +use crate::ln_dlc::node::NodeStorage; +use crate::storage::TenTenOneNodeStorage; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use diesel::SqliteConnection; +use dlc_messages::Message; +use ln_dlc_node::dlc_message::DlcMessage; +use ln_dlc_node::dlc_message::SerializedDlcMessage; +use ln_dlc_node::node::dlc_channel::send_dlc_message; +use ln_dlc_node::node::event::NodeEvent; +use ln_dlc_node::node::Node; +use std::sync::Arc; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; + +/// The DlcHandler is responsible for sending dlc messages and marking received ones as +/// processed. It's main purpose is to ensure the following. +/// +/// 1. Mark all received inbound messages as processed. +/// 2. Save the last outbound dlc message, so it can be resend on the next reconnect. +/// 3. Check if a receive message has already been processed and if so inform to skip the message. + +#[derive(Clone)] +pub struct DlcHandler { + node: Arc>, +} + +impl DlcHandler { + pub fn new(node: Arc>) -> Self { + DlcHandler { node } + } +} +pub async fn handle_dlc_messages( + dlc_handler: DlcHandler, + mut receiver: broadcast::Receiver, +) { + loop { + match receiver.recv().await { + Ok(NodeEvent::Connected { peer }) => { + if let Err(e) = dlc_handler.on_connect(peer) { + tracing::error!(peer=%peer, "Failed to process on connect event. {e:#}"); + } + } + Ok(NodeEvent::SendDlcMessage { peer, msg }) => { + if let Err(e) = dlc_handler.send_dlc_message(peer, msg) { + tracing::error!(peer=%peer, "Failed to process end dlc message event. 
{e:#}"); + } + } + Err(RecvError::Lagged(skipped)) => { + tracing::warn!("Skipped {skipped} messages"); + } + Err(RecvError::Closed) => { + tracing::error!("Lost connection to sender!"); + break; + } + } + } +} + +impl DlcHandler { + pub fn send_dlc_message(&self, peer: PublicKey, msg: Message) -> Result<()> { + let mut conn = db::connection()?; + + let serialized_outbound_message = SerializedDlcMessage::try_from(&msg)?; + let outbound_msg = DlcMessage::new(peer, serialized_outbound_message.clone(), false)?; + + db::dlc_messages::DlcMessage::insert(&mut conn, outbound_msg)?; + db::last_outbound_dlc_messages::LastOutboundDlcMessage::upsert( + &mut conn, + &peer, + serialized_outbound_message, + )?; + + send_dlc_message( + &self.node.dlc_message_handler, + &self.node.peer_manager, + peer, + msg, + ); + + Ok(()) + } + + pub fn on_connect(&self, peer: PublicKey) -> Result<()> { + let mut conn = db::connection()?; + let last_outbound_serialized_dlc_message = + db::last_outbound_dlc_messages::LastOutboundDlcMessage::get(&mut conn, &peer)?; + + if let Some(last_outbound_serialized_dlc_message) = last_outbound_serialized_dlc_message { + tracing::debug!(%peer, ?last_outbound_serialized_dlc_message.message_type, "Sending last dlc message"); + + let message = Message::try_from(&last_outbound_serialized_dlc_message)?; + send_dlc_message( + &self.node.dlc_message_handler, + &self.node.peer_manager, + peer, + message, + ); + } else { + tracing::debug!(%peer, "No last dlc message found. Nothing todo."); + } + + Ok(()) + } + // Returns either the dlc message step or return none, if the dlc message has already been + // processed. + pub fn start_dlc_message_step( + conn: &mut SqliteConnection, + msg: &Message, + peer_id: PublicKey, + ) -> Result> { + let serialized_inbound_message = SerializedDlcMessage::try_from(msg)?; + let inbound_msg = DlcMessage::new(peer_id, serialized_inbound_message, true)?; + + let dlc_message_step = + match db::dlc_messages::DlcMessage::get(conn, inbound_msg.message_hash)? { + Some(_) => None, // the dlc message has already been processed, no step necessary. + None => Some(DlcMessageStep { + inbound_msg, + peer_id, + }), + }; + + Ok(dlc_message_step) + } +} + +pub struct DlcMessageStep { + pub peer_id: PublicKey, + pub inbound_msg: DlcMessage, +} + +impl DlcMessageStep { + /// Finishes the current dlc step by storing the received inbound message as processed and + /// caching the last outbound dlc message (if any) into the database. 
+    pub fn finish(&self, conn: &mut SqliteConnection, response: &Option<Message>) -> Result<()> {
+        tracing::debug!("Marking the received message as processed");
+
+        db::dlc_messages::DlcMessage::insert(conn, self.inbound_msg.clone())?;
+
+        if let Some(resp) = response {
+            tracing::debug!("Persisting last outbound dlc message");
+            let serialized_outbound_message = SerializedDlcMessage::try_from(resp)?;
+            let outbound_msg =
+                DlcMessage::new(self.peer_id, serialized_outbound_message.clone(), false)?;
+
+            db::dlc_messages::DlcMessage::insert(conn, outbound_msg)?;
+            db::last_outbound_dlc_messages::LastOutboundDlcMessage::upsert(
+                conn,
+                &self.peer_id,
+                serialized_outbound_message,
+            )?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/mobile/native/src/lib.rs b/mobile/native/src/lib.rs
index e149dfee2..e1496481f 100644
--- a/mobile/native/src/lib.rs
+++ b/mobile/native/src/lib.rs
@@ -25,4 +25,5 @@ mod orderbook;
 mod bridge_generated;
 mod cipher;
 mod destination;
+mod dlc_handler;
 mod storage;
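The handler above deduplicates messages by the hash of their serialized form (the `message_hash` column, stored as `Text` in the new schema further down). A minimal, self-contained sketch of that idea using only the standard library; `SerializedMessage` and `message_hash` here are illustrative stand-ins, not the actual `ln_dlc_node::dlc_message` types:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::Hash;
use std::hash::Hasher;

/// Illustrative stand-in for a serialized DLC message: its kind plus the raw bytes.
#[derive(Hash)]
struct SerializedMessage {
    kind: String,
    bytes: Vec<u8>,
}

/// Hash the serialized message and render the u64 as a `String`, matching the `Text`
/// `message_hash` column in the new schema.
fn message_hash(msg: &SerializedMessage) -> String {
    let mut hasher = DefaultHasher::new();
    msg.hash(&mut hasher);
    hasher.finish().to_string()
}

fn main() {
    // In the real handler this set is the `dlc_messages` table, so it survives restarts.
    let mut processed: HashSet<String> = HashSet::new();

    let inbound = SerializedMessage {
        kind: "SettleOffer".to_string(),
        bytes: vec![0x01, 0x02, 0x03],
    };

    // `insert` returns false if the hash was already present, i.e. the message was
    // already processed and can be skipped.
    if processed.insert(message_hash(&inbound)) {
        println!("new message, process it");
    } else {
        println!("already processed, skip it");
    }
}
```

Because the set of seen hashes is persisted in the database rather than kept in memory, deduplication survives restarts and reconnects.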
diff --git a/mobile/native/src/ln_dlc/mod.rs b/mobile/native/src/ln_dlc/mod.rs
index 63b0b3164..e52d29dee 100644
--- a/mobile/native/src/ln_dlc/mod.rs
+++ b/mobile/native/src/ln_dlc/mod.rs
@@ -10,6 +10,7 @@ use crate::commons::reqwest_client;
 use crate::config;
 use crate::config::get_rgs_server_url;
 use crate::db;
+use crate::dlc_handler;
 use crate::event;
 use crate::event::EventInternal;
 use crate::ln_dlc::channel_status::track_channel_status;
@@ -104,6 +105,7 @@ mod sync_position_to_subchannel;
 
 pub mod channel_status;
 
+use crate::dlc_handler::DlcHandler;
 use crate::storage::TenTenOneNodeStorage;
 pub use channel_status::ChannelStatus;
 use ln_dlc_node::node::event::NodeEventHandler;
@@ -321,6 +323,16 @@ pub fn run(seed_dir: String, runtime: &Runtime) -> Result<()> {
     )?;
     let node = Arc::new(node);
 
+    let dlc_handler = DlcHandler::new(node.clone());
+    runtime.spawn(async move {
+        // This handles sending outbound dlc messages and keeps track of which dlc messages
+        // have already been processed and what the last outbound dlc message was, so that
+        // it can be resent on reconnect.
+        //
+        // This does not handle incoming dlc messages!
+        dlc_handler::handle_dlc_messages(dlc_handler, node_event_handler.subscribe()).await
+    });
+
     let event_handler = AppEventHandler::new(node.clone(), Some(event_sender));
     let _running = node.start(event_handler, true)?;
     let node = Arc::new(Node::new(node, _running));
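The `node.rs` changes below follow a two-phase pattern: `start_dlc_message_step` is called before the message is handed to the DLC manager, and `finish` only after it has been handled, so a failure in between leads to a retry rather than a message that is marked as processed but never acted on. A schematic sketch of that pattern with simplified, hypothetical types (an in-memory `HashSet` standing in for the `dlc_messages` table):

```rust
use std::collections::HashSet;

/// Illustrative inbound message: a hash plus a human-readable kind.
struct Inbound {
    hash: String,
    kind: String,
}

/// In-memory stand-in for the `dlc_messages` table.
struct ProcessedStore {
    seen: HashSet<String>,
}

/// Token returned by phase one; it must be finished once the message has been handled.
struct Step {
    hash: String,
}

impl ProcessedStore {
    /// Phase one: return `None` if the message was already handled, otherwise a step token.
    fn start_step(&self, msg: &Inbound) -> Option<Step> {
        if self.seen.contains(&msg.hash) {
            None
        } else {
            Some(Step {
                hash: msg.hash.clone(),
            })
        }
    }

    /// Phase two: only record the message after it has actually been handled, so a failure
    /// in between leads to a retry instead of a silently dropped message.
    fn finish_step(&mut self, step: Step) {
        self.seen.insert(step.hash);
    }
}

fn main() {
    let mut store = ProcessedStore {
        seen: HashSet::new(),
    };
    let msg = Inbound {
        hash: "abc".to_string(),
        kind: "SettleOffer".to_string(),
    };

    match store.start_step(&msg) {
        Some(step) => {
            println!("processing {}", msg.kind); // hand the message to the DLC manager here
            store.finish_step(step);
        }
        None => println!("already processed, skipping"),
    }
}
```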
diff --git a/mobile/native/src/ln_dlc/node.rs b/mobile/native/src/ln_dlc/node.rs
index d86e4082f..fe1efecf0 100644
--- a/mobile/native/src/ln_dlc/node.rs
+++ b/mobile/native/src/ln_dlc/node.rs
@@ -1,4 +1,5 @@
 use crate::db;
+use crate::dlc_handler::DlcHandler;
 use crate::event;
 use crate::event::BackgroundTask;
 use crate::event::EventInternal;
@@ -150,96 +151,139 @@ impl Node {
         );
 
         let resp = match &msg {
-            Message::OnChain(_) => self
-                .inner
-                .dlc_manager
-                .on_dlc_message(&msg, node_id)
-                .with_context(|| {
-                    format!(
-                        "Failed to handle {} message from {node_id}",
-                        dlc_message_name(&msg)
-                    )
-                })?,
-            Message::SubChannel(ref msg) => {
+            Message::OnChain(_) => {
+                let dlc_message_step = {
+                    let mut conn = db::connection()?;
+                    let dlc_message_step =
+                        DlcHandler::start_dlc_message_step(&mut conn, &msg, node_id)?;
+
+                    match dlc_message_step {
+                        Some(dlc_message_step) => dlc_message_step,
+                        None => {
+                            tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping.");
+                            return Ok(());
+                        }
+                    }
+                };
+
                 let resp = self
                     .inner
-                    .sub_channel_manager
-                    .on_sub_channel_message(msg, &node_id)
+                    .dlc_manager
+                    .on_dlc_message(&msg, node_id)
                     .with_context(|| {
                         format!(
                             "Failed to handle {} message from {node_id}",
-                            sub_channel_message_name(msg)
+                            dlc_message_name(&msg)
                         )
-                    })?
-                    .map(Message::SubChannel);
+                    })?;
 
-                // Some incoming messages require extra action from our part for the protocol to
-                // continue
-                match msg {
-                    SubChannelMessage::Offer(offer) => {
-                        // TODO: We should probably verify that: (1) the counterparty is the
-                        // coordinator and (2) the DLC channel offer is expected and correct.
+                {
+                    let mut conn = db::connection()?;
+                    dlc_message_step.finish(&mut conn, &resp)?;
+                }
+
+                resp
+            }
+            Message::Channel(channel_msg) => {
+                let dlc_message_step = {
+                    let mut conn = db::connection()?;
+                    let dlc_message_step =
+                        DlcHandler::start_dlc_message_step(&mut conn, &msg, node_id)?;
+
+                    match dlc_message_step {
+                        Some(dlc_message_step) => dlc_message_step,
+                        None => {
+                            tracing::debug!(%node_id, kind=%dlc_message_name(&msg), "Received message that has already been processed, skipping.");
+                            return Ok(());
+                        }
+                    }
+                };
+
+                let resp = self
+                    .inner
+                    .dlc_manager
+                    .on_dlc_message(&msg, node_id)
+                    .with_context(|| {
+                        format!(
+                            "Failed to handle {} message from {node_id}",
+                            dlc_message_name(&msg)
+                        )
+                    })?;
+
+                match channel_msg {
+                    ChannelMessage::Offer(offer) => {
                         let action = decide_subchannel_offer_action(
                             OffsetDateTime::from_unix_timestamp(
                                 offer.contract_info.get_closest_maturity_date() as i64,
                             )
                             .expect("A contract should always have a valid maturity timestamp."),
                         );
-                        self.process_subchannel_offer(offer.channel_id, action)?;
+                        self.process_dlc_channel_offer(offer.temporary_channel_id, action)?;
                     }
-                    SubChannelMessage::CloseOffer(offer) => {
-                        let channel_id = offer.channel_id;
-
-                        // TODO: We should probably verify that: (1) the counterparty is the
-                        // coordinator and (2) the DLC channel close offer is expected and correct.
+                    ChannelMessage::SettleOffer(offer) => {
                         self.inner
-                            .accept_sub_channel_collaborative_settlement(&channel_id)
+                            .accept_dlc_channel_collaborative_settlement(offer.channel_id)
                             .with_context(|| {
                                 format!(
-                                    "Failed to accept sub channel close offer for channel {}",
-                                    hex::encode(channel_id.0)
+                                    "Failed to accept DLC channel close offer for channel {}",
+                                    hex::encode(offer.channel_id)
                                 )
                             })?;
                     }
                     _ => (),
-                };
+                }
+
+                {
+                    let mut conn = db::connection()?;
+                    dlc_message_step.finish(&mut conn, &resp)?;
+                }
 
                 resp
             }
-            Message::Channel(channel_msg) => {
+            Message::SubChannel(ref msg) => {
                 let resp = self
                     .inner
-                    .dlc_manager
-                    .on_dlc_message(&msg, node_id)
+                    .sub_channel_manager
+                    .on_sub_channel_message(msg, &node_id)
                     .with_context(|| {
                         format!(
                             "Failed to handle {} message from {node_id}",
-                            dlc_message_name(&msg)
+                            sub_channel_message_name(msg)
                         )
-                    })?;
+                    })?
+                    .map(Message::SubChannel);
 
-                match channel_msg {
-                    ChannelMessage::Offer(offer) => {
+                // Some incoming messages require extra action from our part for the protocol to
+                // continue
+                match msg {
+                    SubChannelMessage::Offer(offer) => {
+                        // TODO: We should probably verify that: (1) the counterparty is the
+                        // coordinator and (2) the DLC channel offer is expected and correct.
                         let action = decide_subchannel_offer_action(
                             OffsetDateTime::from_unix_timestamp(
                                 offer.contract_info.get_closest_maturity_date() as i64,
                             )
                             .expect("A contract should always have a valid maturity timestamp."),
                         );
-                        self.process_dlc_channel_offer(offer.temporary_channel_id, action)?;
+                        self.process_subchannel_offer(offer.channel_id, action)?;
                     }
-                    ChannelMessage::SettleOffer(offer) => {
+                    SubChannelMessage::CloseOffer(offer) => {
+                        let channel_id = offer.channel_id;
+
+                        // TODO: We should probably verify that: (1) the counterparty is the
+                        // coordinator and (2) the DLC channel close offer is expected and correct.
                         self.inner
-                            .accept_dlc_channel_collaborative_settlement(offer.channel_id)
+                            .accept_sub_channel_collaborative_settlement(&channel_id)
                             .with_context(|| {
                                 format!(
-                                    "Failed to accept DLC channel close offer for channel {}",
-                                    hex::encode(offer.channel_id)
+                                    "Failed to accept sub channel close offer for channel {}",
+                                    hex::encode(channel_id.0)
                                 )
                             })?;
                     }
                     _ => (),
-                }
+                };
+
                 resp
             }
         };
diff --git a/mobile/native/src/schema.rs b/mobile/native/src/schema.rs
index 81448021d..ce6145bee 100644
--- a/mobile/native/src/schema.rs
+++ b/mobile/native/src/schema.rs
@@ -17,6 +17,26 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    dlc_messages (message_hash) {
+        message_hash -> Text,
+        inbound -> Bool,
+        peer_id -> Text,
+        message_type -> Text,
+        message_sub_type -> Text,
+        timestamp -> BigInt,
+    }
+}
+
+diesel::table! {
+    last_outbound_dlc_messages (peer_id) {
+        peer_id -> Text,
+        message_hash -> Text,
+        message -> Text,
+        timestamp -> BigInt,
+    }
+}
+
 diesel::table! {
     orders (id) {
         id -> Text,
@@ -104,8 +124,12 @@ diesel::table! {
     }
 }
 
+diesel::joinable!(last_outbound_dlc_messages -> dlc_messages (message_hash));
+
 diesel::allow_tables_to_appear_in_same_query!(
     channels,
+    dlc_messages,
+    last_outbound_dlc_messages,
     orders,
     payments,
     positions,