From 40b7bc59d06f5cfbaba692a0377c1551a2cba33e Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 27 Sep 2023 12:20:57 -0400 Subject: [PATCH] Use dedicated Queues for each from-to pair Prevents one Processor's message from halting the entire pipeline. --- coordinator/src/db.rs | 12 ++--- coordinator/src/main.rs | 81 ++++++++++------------------------ coordinator/src/processors.rs | 13 +++--- coordinator/src/tests/mod.rs | 2 +- message-queue/src/client.rs | 10 ++--- message-queue/src/main.rs | 71 +++++++++++++++++++---------- message-queue/src/messages.rs | 6 ++- message-queue/src/queue.rs | 8 ++-- processor/src/coordinator.rs | 4 +- tests/coordinator/src/lib.rs | 8 ++-- tests/message-queue/src/lib.rs | 40 ++++++++++++++--- tests/processor/src/lib.rs | 12 ++--- 12 files changed, 142 insertions(+), 125 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 5019b6c01..0704df8f6 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -15,14 +15,14 @@ impl MainDb { D::key(b"coordinator_main", dst, key) } - fn handled_message_key(id: u64) -> Vec { - Self::main_key(b"handled_message", id.to_le_bytes()) + fn handled_message_key(network: NetworkId, id: u64) -> Vec { + Self::main_key(b"handled_message", (network, id).encode()) } - pub fn save_handled_message(txn: &mut D::Transaction<'_>, id: u64) { - txn.put(Self::handled_message_key(id), []); + pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) { + txn.put(Self::handled_message_key(network, id), []); } - pub fn handled_message(getter: &G, id: u64) -> bool { - getter.get(Self::handled_message_key(id)).is_some() + pub fn handled_message(getter: &G, network: NetworkId, id: u64) -> bool { + getter.get(Self::handled_message_key(network, id)).is_some() } fn acive_tributaries_key() -> Vec { diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b95a7df19..c54929481 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -533,19 +533,20 @@ async fn handle_processor_messages( key: Zeroizing<::F>, serai: Arc, mut processors: Pro, - tributary: ActiveTributary, - mut recv: mpsc::UnboundedReceiver, + network: NetworkId, + mut recv: mpsc::UnboundedReceiver>, ) { let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn let pub_key = Ristretto::generator() * key.deref(); - let ActiveTributary { spec, tributary } = tributary; + let ActiveTributary { spec, tributary } = recv.recv().await.unwrap(); let genesis = spec.genesis(); loop { - let msg: processors::Message = recv.recv().await.unwrap(); + // TODO: Check this ID is sane (last handled ID or expected next ID) + let msg = processors.recv(network).await; - if !MainDb::::handled_message(&db, msg.id) { + if !MainDb::::handled_message(&db, msg.network, msg.id) { let mut txn = db.txn(); // TODO: We probably want to NOP here, not panic? @@ -817,7 +818,7 @@ async fn handle_processor_messages( } } - MainDb::::save_handled_message(&mut txn, msg.id); + MainDb::::save_handled_message(&mut txn, msg.network, msg.id); txn.commit(); } @@ -829,61 +830,27 @@ pub async fn handle_processors( db: D, key: Zeroizing<::F>, serai: Arc, - mut processors: Pro, + processors: Pro, mut new_tributary: broadcast::Receiver>, ) { - let channels = Arc::new(RwLock::new(HashMap::new())); - // Listen to new tributary events - tokio::spawn({ - let db = db.clone(); - let processors = processors.clone(); - let channels = channels.clone(); - async move { - loop { - let channels = channels.clone(); - let tributary = new_tributary.recv().await.unwrap(); - - let (send, recv) = mpsc::unbounded_channel(); - // TODO: Support multisig rotation (not per-Tributary yet per-network?) - channels.write().await.insert(tributary.spec.set().network, send); - - // For each new tributary, spawn a dedicated task to handle its messages from the processor - // TODO: Redo per network, not per tributary - tokio::spawn(handle_processor_messages( - db.clone(), - key.clone(), - serai.clone(), - processors.clone(), - tributary, - recv, - )); - } - } - }); + let mut channels = HashMap::new(); + for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] { + let (send, recv) = mpsc::unbounded_channel(); + tokio::spawn(handle_processor_messages( + db.clone(), + key.clone(), + serai.clone(), + processors.clone(), + network, + recv, + )); + channels.insert(network, send); + } - // Dispatch task - let mut last_msg = None; + // Listen to new tributary events loop { - // TODO: We dispatch this to an async task per-processor, yet we don't move to the next message - // yet as all processor messages are shoved into a global queue. - // Modify message-queue to offer per-sender queues, not per-receiver. - // Alternatively, a peek method with local delineation of handled messages would work. - - let msg = processors.recv().await; - // TODO: Check this ID is sane (last handled ID or expected next ID) - if last_msg == Some(msg.id) { - sleep(Duration::from_secs(1)).await; - continue; - } - last_msg = Some(msg.id); - - // TODO: Race conditions with above tributary availability? - // TODO: How does this hold up to multisig rotation? - if let Some(channel) = channels.read().await.get(&msg.network) { - channel.send(msg).unwrap(); - } else { - log::warn!("received processor message for network we don't have a channel for"); - } + let tributary = new_tributary.recv().await.unwrap(); + channels[&tributary.spec.set().network].send(tributary).unwrap(); } } diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs index 7b6c6146b..da31d4b60 100644 --- a/coordinator/src/processors.rs +++ b/coordinator/src/processors.rs @@ -15,7 +15,7 @@ pub struct Message { #[async_trait::async_trait] pub trait Processors: 'static + Send + Sync + Clone { async fn send(&self, network: NetworkId, msg: CoordinatorMessage); - async fn recv(&mut self) -> Message; + async fn recv(&mut self, network: NetworkId) -> Message; async fn ack(&mut self, msg: Message); } @@ -27,13 +27,10 @@ impl Processors for Arc { let msg = serde_json::to_string(&msg).unwrap(); self.queue(metadata, msg.into_bytes()).await; } - async fn recv(&mut self) -> Message { - let msg = self.next().await; + async fn recv(&mut self, network: NetworkId) -> Message { + let msg = self.next(Service::Processor(network)).await; + assert_eq!(msg.from, Service::Processor(network)); - let network = match msg.from { - Service::Processor(network) => network, - Service::Coordinator => panic!("coordinator received coordinator message"), - }; let id = msg.id; // Deserialize it into a ProcessorMessage @@ -43,6 +40,6 @@ impl Processors for Arc { return Message { id, network, msg }; } async fn ack(&mut self, msg: Message) { - MessageQueue::ack(self, msg.id).await + MessageQueue::ack(self, Service::Processor(msg.network), msg.id).await } } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 748ac4a67..4e766ff33 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -35,7 +35,7 @@ impl Processors for MemProcessors { let processor = processors.entry(network).or_insert_with(VecDeque::new); processor.push_back(msg); } - async fn recv(&mut self) -> Message { + async fn recv(&mut self, _: NetworkId) -> Message { todo!() } async fn ack(&mut self, _: Message) { diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index f0f263305..f1bf29d0b 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -140,9 +140,9 @@ impl MessageQueue { } } - pub async fn next(&self) -> QueuedMessage { + pub async fn next(&self, from: Service) -> QueuedMessage { loop { - let json = self.json_call("next", serde_json::json!([self.service])).await; + let json = self.json_call("next", serde_json::json!([from, self.service])).await; // Convert from a Value to a type via reserialization let msg: Option = serde_json::from_str( @@ -179,18 +179,18 @@ impl MessageQueue { } } - pub async fn ack(&self, id: u64) { + pub async fn ack(&self, from: Service, id: u64) { // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. let nonce = Zeroizing::new(::F::random(&mut OsRng)); let nonce_pub = Ristretto::generator() * nonce.deref(); let sig = SchnorrSignature::::sign( &self.priv_key, nonce, - ack_challenge(self.service, self.pub_key, id, nonce_pub), + ack_challenge(self.service, self.pub_key, from, id, nonce_pub), ) .serialize(); - let json = self.json_call("ack", serde_json::json!([self.service, id, sig])).await; + let json = self.json_call("ack", serde_json::json!([from, self.service, id, sig])).await; if json.get("result") != Some(&serde_json::Value::Bool(true)) { panic!("failed to ack message {id}: {json}"); } diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index dcf862252..e7707ed89 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -25,12 +25,17 @@ mod binaries { pub(crate) type Db = serai_db::RocksDB; - lazy_static::lazy_static! { - pub(crate) static ref KEYS: Arc::G>>> = - Arc::new(RwLock::new(HashMap::new())); - pub(crate) static ref QUEUES: Arc>>>> = - Arc::new(RwLock::new(HashMap::new())); + #[allow(clippy::type_complexity)] + mod clippy { + use super::*; + lazy_static::lazy_static! { + pub(crate) static ref KEYS: Arc::G>>> = + Arc::new(RwLock::new(HashMap::new())); + pub(crate) static ref QUEUES: Arc>>>> = + Arc::new(RwLock::new(HashMap::new())); + } } + pub(crate) use self::clippy::*; // queue RPC method /* @@ -71,16 +76,17 @@ mod binaries { fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() } - fn intent_key(from: Service, intent: &[u8]) -> Vec { - key(b"intent_seen", bincode::serialize(&(from, intent)).unwrap()) + fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec { + key(b"intent_seen", bincode::serialize(&(from, to, intent)).unwrap()) } let mut db = db.write().unwrap(); let mut txn = db.txn(); - let intent_key = intent_key(meta.from, &meta.intent); + let intent_key = intent_key(meta.from, meta.to, &meta.intent); if Get::get(&txn, &intent_key).is_some() { log::warn!( - "Prior queued message attempted to be queued again. From: {:?} Intent: {}", + "Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}", meta.from, + meta.to, hex::encode(&meta.intent) ); return; @@ -88,7 +94,7 @@ mod binaries { DbTxn::put(&mut txn, intent_key, []); // Queue it - let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message( + let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message( &mut txn, QueuedMessage { from: meta.from, @@ -105,15 +111,15 @@ mod binaries { // next RPC method /* - Gets the next message in queue for this service. + Gets the next message in queue for the named services. This is not authenticated due to the fact every nonce would have to be saved to prevent replays, or a challenge-response protocol implemented. Neither are worth doing when there should be no sensitive data on this server. */ - pub(crate) fn get_next_message(service: Service) -> Option { + pub(crate) fn get_next_message(from: Service, to: Service) -> Option { let queue_outer = (*QUEUES).read().unwrap(); - let queue = queue_outer[&service].read().unwrap(); + let queue = queue_outer[&(from, to)].read().unwrap(); let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); queue.get_message(next) } @@ -123,10 +129,10 @@ mod binaries { Acknowledges a message as received and handled, meaning it'll no longer be returned as the next message. */ - pub(crate) fn ack_message(service: Service, id: u64, sig: SchnorrSignature) { + pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature) { { - let from = (*KEYS).read().unwrap()[&service]; - assert!(sig.verify(from, ack_challenge(service, from, id, sig.R))); + let to_key = (*KEYS).read().unwrap()[&to]; + assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R))); } // Is it: @@ -136,9 +142,9 @@ mod binaries { // It's the second if we acknowledge messages before saving them as acknowledged // TODO: Check only a proper message is being acked - log::info!("{:?} is acknowledging {}", service, id); + log::info!("{:?} is acknowledging {:?} {}", from, to, id); - (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id) + (*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id) } } @@ -177,13 +183,29 @@ async fn main() { Some(::G::from_bytes(&repr).unwrap()) }; + const ALL_EXT_NETWORKS: [NetworkId; 3] = + [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero]; + let register_service = |service, key| { (*KEYS).write().unwrap().insert(service, key); - (*QUEUES).write().unwrap().insert(service, RwLock::new(Queue(db.clone(), service))); + let mut queues = (*QUEUES).write().unwrap(); + if service == Service::Coordinator { + for network in ALL_EXT_NETWORKS { + queues.insert( + (service, Service::Processor(network)), + RwLock::new(Queue(db.clone(), service, Service::Processor(network))), + ); + } + } else { + queues.insert( + (service, Service::Coordinator), + RwLock::new(Queue(db.clone(), service, Service::Coordinator)), + ); + } }; // Make queues for each NetworkId, other than Serai - for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] { + for network in ALL_EXT_NETWORKS { // Use a match so we error if the list of NetworkIds changes let Some(key) = read_key(match network { NetworkId::Serai => unreachable!(), @@ -224,17 +246,18 @@ async fn main() { .unwrap(); module .register_method("next", |args, _| { - let args = args.parse::().unwrap(); - Ok(get_next_message(args)) + let (from, to) = args.parse::<(Service, Service)>().unwrap(); + Ok(get_next_message(from, to)) }) .unwrap(); module .register_method("ack", |args, _| { - let args = args.parse::<(Service, u64, Vec)>().unwrap(); + let args = args.parse::<(Service, Service, u64, Vec)>().unwrap(); ack_message( args.0, args.1, - SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), + args.2, + SchnorrSignature::::read(&mut args.3.as_slice()).unwrap(), ); Ok(true) }) diff --git a/message-queue/src/messages.rs b/message-queue/src/messages.rs index 9ad393dea..89557ad7f 100644 --- a/message-queue/src/messages.rs +++ b/message-queue/src/messages.rs @@ -48,15 +48,17 @@ pub fn message_challenge( } pub fn ack_challenge( + to: Service, + to_key: ::G, from: Service, - from_key: ::G, id: u64, nonce: ::G, ) -> ::F { let mut transcript = RecommendedTranscript::new(b"Serai Message Queue v0.1 Ackowledgement"); transcript.domain_separate(b"metadata"); + transcript.append_message(b"to", bincode::serialize(&to).unwrap()); + transcript.append_message(b"to_key", to_key.to_bytes()); transcript.append_message(b"from", bincode::serialize(&from).unwrap()); - transcript.append_message(b"from_key", from_key.to_bytes()); transcript.domain_separate(b"message"); transcript.append_message(b"id", id.to_le_bytes()); transcript.domain_separate(b"signature"); diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs index 282734103..aa9e99cd6 100644 --- a/message-queue/src/queue.rs +++ b/message-queue/src/queue.rs @@ -3,14 +3,14 @@ use serai_db::{DbTxn, Db}; use crate::messages::*; #[derive(Clone, Debug)] -pub(crate) struct Queue(pub(crate) D, pub(crate) Service); +pub(crate) struct Queue(pub(crate) D, pub(crate) Service, pub(crate) Service); impl Queue { fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() } fn message_count_key(&self) -> Vec { - Self::key(b"message_count", serde_json::to_vec(&self.1).unwrap()) + Self::key(b"message_count", bincode::serialize(&(self.1, self.2)).unwrap()) } pub(crate) fn message_count(&self) -> u64 { self @@ -21,7 +21,7 @@ impl Queue { } fn last_acknowledged_key(&self) -> Vec { - Self::key(b"last_acknowledged", serde_json::to_vec(&self.1).unwrap()) + Self::key(b"last_acknowledged", bincode::serialize(&(self.1, self.2)).unwrap()) } pub(crate) fn last_acknowledged(&self) -> Option { self @@ -31,7 +31,7 @@ impl Queue { } fn message_key(&self, id: u64) -> Vec { - Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) + Self::key(b"message", bincode::serialize(&(self.1, self.2, id)).unwrap()) } // TODO: This is fine as-used, yet gets from the DB while having a txn. It should get from the // txn diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index fb8e20f16..294c273ca 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -25,7 +25,7 @@ impl Coordinator for MessageQueue { } async fn recv(&mut self) -> Message { - let msg = self.next().await; + let msg = self.next(Service::Coordinator).await; let id = msg.id; @@ -37,6 +37,6 @@ impl Coordinator for MessageQueue { } async fn ack(&mut self, msg: Message) { - MessageQueue::ack(self, msg.id).await + MessageQueue::ack(self, Service::Coordinator, msg.id).await } } diff --git a/tests/coordinator/src/lib.rs b/tests/coordinator/src/lib.rs index c68bdb577..afad025e2 100644 --- a/tests/coordinator/src/lib.rs +++ b/tests/coordinator/src/lib.rs @@ -200,7 +200,7 @@ impl Processor { Serai::new(&self.serai_rpc).await.unwrap() } - /// Send a message to a processor as its coordinator. + /// Send a message to the coordinator as a processor. pub async fn send_message(&mut self, msg: impl Into) { let msg: ProcessorMessage = msg.into(); self @@ -217,14 +217,14 @@ impl Processor { self.next_send_id += 1; } - /// Receive a message from a processor as its coordinator. + /// Receive a message from the coordinator as a processor. pub async fn recv_message(&mut self) -> CoordinatorMessage { - let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(self.next_recv_id)) + let msg = tokio::time::timeout(Duration::from_secs(10), self.queue.next(Service::Coordinator)) .await .unwrap(); assert_eq!(msg.from, Service::Coordinator); assert_eq!(msg.id, self.next_recv_id); - self.queue.ack(self.next_recv_id).await; + self.queue.ack(Service::Coordinator, msg.id).await; self.next_recv_id += 1; serde_json::from_slice(&msg.msg).unwrap() } diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs index 34f49af79..567cb244c 100644 --- a/tests/message-queue/src/lib.rs +++ b/tests/message-queue/src/lib.rs @@ -106,28 +106,54 @@ fn basic_functionality() { // Successfully get it let bitcoin = MessageQueue::new( Service::Processor(NetworkId::Bitcoin), - rpc, + rpc.clone(), Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), ); - let msg = bitcoin.next(0).await; + let msg = bitcoin.next(Service::Coordinator).await; assert_eq!(msg.from, Service::Coordinator); assert_eq!(msg.id, 0); assert_eq!(&msg.msg, b"Hello, World!"); // If we don't ack it, it should continue to be returned - assert_eq!(msg, bitcoin.next(0).await); + assert_eq!(msg, bitcoin.next(Service::Coordinator).await); // Acknowledging it should yield the next message - bitcoin.ack(0).await; + bitcoin.ack(Service::Coordinator, 0).await; - let next_msg = bitcoin.next(1).await; + let next_msg = bitcoin.next(Service::Coordinator).await; assert!(msg != next_msg); assert_eq!(next_msg.from, Service::Coordinator); assert_eq!(next_msg.id, 1); assert_eq!(&next_msg.msg, b"Hello, World, again!"); - bitcoin.ack(1).await; + bitcoin.ack(Service::Coordinator, 1).await; // No further messages should be available - tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(2)).await.unwrap_err(); + tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator)) + .await + .unwrap_err(); + + // Queueing to a distinct processor should work, with a unique ID + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Monero), + // Intents should be per-from-to, making this valid + intent: b"intent".to_vec(), + }, + b"Hello, World!".to_vec(), + ) + .await; + + let monero = MessageQueue::new( + Service::Processor(NetworkId::Monero), + rpc, + Zeroizing::new(priv_keys[&NetworkId::Monero]), + ); + assert_eq!(monero.next(Service::Coordinator).await.id, 0); + monero.ack(Service::Coordinator, 0).await; + tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator)) + .await + .unwrap_err(); }); } diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index a0112429d..b924acdf2 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -222,13 +222,15 @@ impl Coordinator { /// Receive a message from a processor as its coordinator. pub async fn recv_message(&mut self) -> ProcessorMessage { - let msg = - tokio::time::timeout(core::time::Duration::from_secs(10), self.queue.next(self.next_recv_id)) - .await - .unwrap(); + let msg = tokio::time::timeout( + core::time::Duration::from_secs(10), + self.queue.next(Service::Processor(self.network)), + ) + .await + .unwrap(); assert_eq!(msg.from, Service::Processor(self.network)); assert_eq!(msg.id, self.next_recv_id); - self.queue.ack(self.next_recv_id).await; + self.queue.ack(Service::Processor(self.network), msg.id).await; self.next_recv_id += 1; serde_json::from_slice(&msg.msg).unwrap() }