Skip to content

Commit

Permalink
Use dedicated Queues for each from-to pair
Browse files Browse the repository at this point in the history
Prevents one Processor's message from halting the entire pipeline.
  • Loading branch information
kayabaNerve committed Sep 27, 2023
1 parent 269db1c commit 40b7bc5
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 125 deletions.
12 changes: 6 additions & 6 deletions coordinator/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ impl<D: Db> MainDb<D> {
D::key(b"coordinator_main", dst, key)
}

fn handled_message_key(id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", id.to_le_bytes())
fn handled_message_key(network: NetworkId, id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", (network, id).encode())
}
pub fn save_handled_message(txn: &mut D::Transaction<'_>, id: u64) {
txn.put(Self::handled_message_key(id), []);
pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) {
txn.put(Self::handled_message_key(network, id), []);
}
pub fn handled_message<G: Get>(getter: &G, id: u64) -> bool {
getter.get(Self::handled_message_key(id)).is_some()
pub fn handled_message<G: Get>(getter: &G, network: NetworkId, id: u64) -> bool {
getter.get(Self::handled_message_key(network, id)).is_some()
}

fn acive_tributaries_key() -> Vec<u8> {
Expand Down
81 changes: 24 additions & 57 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,19 +533,20 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
tributary: ActiveTributary<D, P>,
mut recv: mpsc::UnboundedReceiver<processors::Message>,
network: NetworkId,
mut recv: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
) {
let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn
let pub_key = Ristretto::generator() * key.deref();

let ActiveTributary { spec, tributary } = tributary;
let ActiveTributary { spec, tributary } = recv.recv().await.unwrap();
let genesis = spec.genesis();

loop {
let msg: processors::Message = recv.recv().await.unwrap();
// TODO: Check this ID is sane (last handled ID or expected next ID)
let msg = processors.recv(network).await;

if !MainDb::<D>::handled_message(&db, msg.id) {
if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
let mut txn = db.txn();

// TODO: We probably want to NOP here, not panic?
Expand Down Expand Up @@ -817,7 +818,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}
}

MainDb::<D>::save_handled_message(&mut txn, msg.id);
MainDb::<D>::save_handled_message(&mut txn, msg.network, msg.id);
txn.commit();
}

Expand All @@ -829,61 +830,27 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
processors: Pro,
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) {
let channels = Arc::new(RwLock::new(HashMap::new()));
// Listen to new tributary events
tokio::spawn({
let db = db.clone();
let processors = processors.clone();
let channels = channels.clone();
async move {
loop {
let channels = channels.clone();
let tributary = new_tributary.recv().await.unwrap();

let (send, recv) = mpsc::unbounded_channel();
// TODO: Support multisig rotation (not per-Tributary yet per-network?)
channels.write().await.insert(tributary.spec.set().network, send);

// For each new tributary, spawn a dedicated task to handle its messages from the processor
// TODO: Redo per network, not per tributary
tokio::spawn(handle_processor_messages(
db.clone(),
key.clone(),
serai.clone(),
processors.clone(),
tributary,
recv,
));
}
}
});
let mut channels = HashMap::new();
for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] {
let (send, recv) = mpsc::unbounded_channel();
tokio::spawn(handle_processor_messages(
db.clone(),
key.clone(),
serai.clone(),
processors.clone(),
network,
recv,
));
channels.insert(network, send);
}

// Dispatch task
let mut last_msg = None;
// Listen to new tributary events
loop {
// TODO: We dispatch this to an async task per-processor, yet we don't move to the next message
// yet as all processor messages are shoved into a global queue.
// Modify message-queue to offer per-sender queues, not per-receiver.
// Alternatively, a peek method with local delineation of handled messages would work.

let msg = processors.recv().await;
// TODO: Check this ID is sane (last handled ID or expected next ID)
if last_msg == Some(msg.id) {
sleep(Duration::from_secs(1)).await;
continue;
}
last_msg = Some(msg.id);

// TODO: Race conditions with above tributary availability?
// TODO: How does this hold up to multisig rotation?
if let Some(channel) = channels.read().await.get(&msg.network) {
channel.send(msg).unwrap();
} else {
log::warn!("received processor message for network we don't have a channel for");
}
let tributary = new_tributary.recv().await.unwrap();
channels[&tributary.spec.set().network].send(tributary).unwrap();
}
}

Expand Down
13 changes: 5 additions & 8 deletions coordinator/src/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Message {
#[async_trait::async_trait]
pub trait Processors: 'static + Send + Sync + Clone {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage);
async fn recv(&mut self) -> Message;
async fn recv(&mut self, network: NetworkId) -> Message;
async fn ack(&mut self, msg: Message);
}

Expand All @@ -27,13 +27,10 @@ impl Processors for Arc<MessageQueue> {
let msg = serde_json::to_string(&msg).unwrap();
self.queue(metadata, msg.into_bytes()).await;
}
async fn recv(&mut self) -> Message {
let msg = self.next().await;
async fn recv(&mut self, network: NetworkId) -> Message {
let msg = self.next(Service::Processor(network)).await;
assert_eq!(msg.from, Service::Processor(network));

let network = match msg.from {
Service::Processor(network) => network,
Service::Coordinator => panic!("coordinator received coordinator message"),
};
let id = msg.id;

// Deserialize it into a ProcessorMessage
Expand All @@ -43,6 +40,6 @@ impl Processors for Arc<MessageQueue> {
return Message { id, network, msg };
}
async fn ack(&mut self, msg: Message) {
MessageQueue::ack(self, msg.id).await
MessageQueue::ack(self, Service::Processor(msg.network), msg.id).await
}
}
2 changes: 1 addition & 1 deletion coordinator/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Processors for MemProcessors {
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg);
}
async fn recv(&mut self) -> Message {
async fn recv(&mut self, _: NetworkId) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {
Expand Down
10 changes: 5 additions & 5 deletions message-queue/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ impl MessageQueue {
}
}

pub async fn next(&self) -> QueuedMessage {
pub async fn next(&self, from: Service) -> QueuedMessage {
loop {
let json = self.json_call("next", serde_json::json!([self.service])).await;
let json = self.json_call("next", serde_json::json!([from, self.service])).await;

// Convert from a Value to a type via reserialization
let msg: Option<QueuedMessage> = serde_json::from_str(
Expand Down Expand Up @@ -179,18 +179,18 @@ impl MessageQueue {
}
}

pub async fn ack(&self, id: u64) {
pub async fn ack(&self, from: Service, id: u64) {
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign(
&self.priv_key,
nonce,
ack_challenge(self.service, self.pub_key, id, nonce_pub),
ack_challenge(self.service, self.pub_key, from, id, nonce_pub),
)
.serialize();

let json = self.json_call("ack", serde_json::json!([self.service, id, sig])).await;
let json = self.json_call("ack", serde_json::json!([from, self.service, id, sig])).await;
if json.get("result") != Some(&serde_json::Value::Bool(true)) {
panic!("failed to ack message {id}: {json}");
}
Expand Down
71 changes: 47 additions & 24 deletions message-queue/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ mod binaries {

pub(crate) type Db = serai_db::RocksDB;

lazy_static::lazy_static! {
pub(crate) static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> =
Arc::new(RwLock::new(HashMap::new()));
pub(crate) static ref QUEUES: Arc<RwLock<HashMap<Service, RwLock<Queue<Db>>>>> =
Arc::new(RwLock::new(HashMap::new()));
#[allow(clippy::type_complexity)]
mod clippy {
use super::*;
lazy_static::lazy_static! {
pub(crate) static ref KEYS: Arc<RwLock<HashMap<Service, <Ristretto as Ciphersuite>::G>>> =
Arc::new(RwLock::new(HashMap::new()));
pub(crate) static ref QUEUES: Arc<RwLock<HashMap<(Service, Service), RwLock<Queue<Db>>>>> =
Arc::new(RwLock::new(HashMap::new()));
}
}
pub(crate) use self::clippy::*;

// queue RPC method
/*
Expand Down Expand Up @@ -71,24 +76,25 @@ mod binaries {
fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
[&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat()
}
fn intent_key(from: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", bincode::serialize(&(from, intent)).unwrap())
fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec<u8> {
key(b"intent_seen", bincode::serialize(&(from, to, intent)).unwrap())
}
let mut db = db.write().unwrap();
let mut txn = db.txn();
let intent_key = intent_key(meta.from, &meta.intent);
let intent_key = intent_key(meta.from, meta.to, &meta.intent);
if Get::get(&txn, &intent_key).is_some() {
log::warn!(
"Prior queued message attempted to be queued again. From: {:?} Intent: {}",
"Prior queued message attempted to be queued again. From: {:?} To: {:?} Intent: {}",
meta.from,
meta.to,
hex::encode(&meta.intent)
);
return;
}
DbTxn::put(&mut txn, intent_key, []);

// Queue it
let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
&mut txn,
QueuedMessage {
from: meta.from,
Expand All @@ -105,15 +111,15 @@ mod binaries {

// next RPC method
/*
Gets the next message in queue for this service.
Gets the next message in queue for the named services.
This is not authenticated due to the fact every nonce would have to be saved to prevent
replays, or a challenge-response protocol implemented. Neither are worth doing when there
should be no sensitive data on this server.
*/
pub(crate) fn get_next_message(service: Service) -> Option<QueuedMessage> {
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
let queue_outer = (*QUEUES).read().unwrap();
let queue = queue_outer[&service].read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
queue.get_message(next)
}
Expand All @@ -123,10 +129,10 @@ mod binaries {
Acknowledges a message as received and handled, meaning it'll no longer be returned as the next
message.
*/
pub(crate) fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{
let from = (*KEYS).read().unwrap()[&service];
assert!(sig.verify(from, ack_challenge(service, from, id, sig.R)));
let to_key = (*KEYS).read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
}

// Is it:
Expand All @@ -136,9 +142,9 @@ mod binaries {
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked

log::info!("{:?} is acknowledging {}", service, id);
log::info!("{:?} is acknowledging {:?} {}", from, to, id);

(*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id)
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
}
}

Expand Down Expand Up @@ -177,13 +183,29 @@ async fn main() {
Some(<Ristretto as Ciphersuite>::G::from_bytes(&repr).unwrap())
};

const ALL_EXT_NETWORKS: [NetworkId; 3] =
[NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero];

let register_service = |service, key| {
(*KEYS).write().unwrap().insert(service, key);
(*QUEUES).write().unwrap().insert(service, RwLock::new(Queue(db.clone(), service)));
let mut queues = (*QUEUES).write().unwrap();
if service == Service::Coordinator {
for network in ALL_EXT_NETWORKS {
queues.insert(
(service, Service::Processor(network)),
RwLock::new(Queue(db.clone(), service, Service::Processor(network))),
);
}
} else {
queues.insert(
(service, Service::Coordinator),
RwLock::new(Queue(db.clone(), service, Service::Coordinator)),
);
}
};

// Make queues for each NetworkId, other than Serai
for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] {
for network in ALL_EXT_NETWORKS {
// Use a match so we error if the list of NetworkIds changes
let Some(key) = read_key(match network {
NetworkId::Serai => unreachable!(),
Expand Down Expand Up @@ -224,17 +246,18 @@ async fn main() {
.unwrap();
module
.register_method("next", |args, _| {
let args = args.parse::<Service>().unwrap();
Ok(get_next_message(args))
let (from, to) = args.parse::<(Service, Service)>().unwrap();
Ok(get_next_message(from, to))
})
.unwrap();
module
.register_method("ack", |args, _| {
let args = args.parse::<(Service, u64, Vec<u8>)>().unwrap();
let args = args.parse::<(Service, Service, u64, Vec<u8>)>().unwrap();
ack_message(
args.0,
args.1,
SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(),
args.2,
SchnorrSignature::<Ristretto>::read(&mut args.3.as_slice()).unwrap(),
);
Ok(true)
})
Expand Down
6 changes: 4 additions & 2 deletions message-queue/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,17 @@ pub fn message_challenge(
}

pub fn ack_challenge(
to: Service,
to_key: <Ristretto as Ciphersuite>::G,
from: Service,
from_key: <Ristretto as Ciphersuite>::G,
id: u64,
nonce: <Ristretto as Ciphersuite>::G,
) -> <Ristretto as Ciphersuite>::F {
let mut transcript = RecommendedTranscript::new(b"Serai Message Queue v0.1 Ackowledgement");
transcript.domain_separate(b"metadata");
transcript.append_message(b"to", bincode::serialize(&to).unwrap());
transcript.append_message(b"to_key", to_key.to_bytes());
transcript.append_message(b"from", bincode::serialize(&from).unwrap());
transcript.append_message(b"from_key", from_key.to_bytes());
transcript.domain_separate(b"message");
transcript.append_message(b"id", id.to_le_bytes());
transcript.domain_separate(b"signature");
Expand Down
Loading

0 comments on commit 40b7bc5

Please sign in to comment.