Skip to content

Commit

Permalink
Merge pull request #48 from EspressoSystems/rm/topic-version-vectors
Browse files Browse the repository at this point in the history
Make topic subscriptions version vectored
  • Loading branch information
rob-maron authored Jun 19, 2024
2 parents 7bf490b + 1bc0742 commit f3ed67d
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 139 deletions.
15 changes: 14 additions & 1 deletion cdn-broker/src/connections/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,27 @@ mod relational_map;
use std::collections::HashSet;

use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier, message::Topic};
use relational_map::RelationalMap;
use rkyv::{Archive, Deserialize, Serialize};

use self::relational_map::RelationalMap;
use super::versioned_map::VersionedMap;

#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Archive, Clone)]
#[archive(check_bytes)]
pub enum SubscriptionStatus {
Subscribed,
Unsubscribed,
}

pub type TopicSyncMap = VersionedMap<Topic, SubscriptionStatus, u32>;

/// Our broadcast map is just two associative (bidirectional, multi) maps:
/// one for brokers and one for users.
pub struct BroadcastMap {
pub users: RelationalMap<UserPublicKey, Topic>,
pub brokers: RelationalMap<BrokerIdentifier, Topic>,

pub topic_sync_map: TopicSyncMap,
pub previous_subscribed_topics: HashSet<Topic>,
}

Expand All @@ -24,6 +36,7 @@ impl Default for BroadcastMap {
users: RelationalMap::new(),
brokers: RelationalMap::new(),
previous_subscribed_topics: HashSet::new(),
topic_sync_map: TopicSyncMap::new(0),
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions cdn-broker/src/connections/direct/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
//! This is where we define routing for direct messages.

mod versioned_map;

use cdn_proto::{connection::UserPublicKey, discovery::BrokerIdentifier};

use self::versioned_map::VersionedMap;
use super::versioned_map::VersionedMap;

/// We define the direct map as just a type alias of a `VersionedMap`, which
// deals with version vectors.
Expand Down
58 changes: 42 additions & 16 deletions cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@

use std::collections::{HashMap, HashSet};

use broadcast::BroadcastMap;
use cdn_proto::{
connection::{protocols::Connection, UserPublicKey},
discovery::BrokerIdentifier,
message::Topic,
mnemonic,
};
pub use direct::DirectMap;
use tokio::task::AbortHandle;
use tracing::{error, info, warn};
pub use {
broadcast::{SubscriptionStatus, TopicSyncMap},
direct::DirectMap,
};

use crate::metrics;

use self::broadcast::BroadcastMap;

mod broadcast;
mod direct;
mod versioned_map;

pub struct Connections {
// Our identity. Used for versioned vector conflict resolution.
Expand Down Expand Up @@ -135,30 +138,53 @@ impl Connections {

/// Get the full list of topics that we are interested in.
/// We send this to new brokers when they start.
pub fn get_full_topic_sync(&self) -> Vec<Topic> {
self.broadcast_map.users.get_values()
pub fn get_full_topic_sync(&self) -> TopicSyncMap {
// Create an empty map
let mut map = TopicSyncMap::new(0);

// Add all topics we are subscribed to.
// The initial version will be 0.
for topic in self.broadcast_map.users.get_values() {
map.insert(topic, SubscriptionStatus::Subscribed);
}

map
}

/// Get the partial list of topics that we are interested in. Returns the
/// additions and removals as a tuple `(a, r)` in that order. We send this
/// to other brokers whenever there are changes.
pub fn get_partial_topic_sync(&mut self) -> (Vec<Topic>, Vec<Topic>) {
/// Get the partial list of topics that we are interested in.
/// We send this to existing brokers every so often.
pub fn get_partial_topic_sync(&mut self) -> Option<TopicSyncMap> {
// Lock the maps
let previous = &mut self.broadcast_map.previous_subscribed_topics;
let now = HashSet::from_iter(self.broadcast_map.users.get_values());

// Calculate additions and removals
let added = now.difference(previous);
let removed = previous.difference(&now);
let added: Vec<u8> = now.difference(previous).copied().collect();
let removed: Vec<u8> = previous.difference(&now).copied().collect();

// Clone them
let differences = (added.copied().collect(), removed.copied().collect());
// If there are no changes, return `None`
if added.is_empty() && removed.is_empty() {
return None;
}

// Set the previous to the new one
// Set the previous to the new values
previous.clone_from(&now);

// Return the differences
differences
// Update the topic sync map
for topic in added {
self.broadcast_map
.topic_sync_map
.insert(topic, SubscriptionStatus::Subscribed);
}

for topic in removed {
self.broadcast_map
.topic_sync_map
.insert(topic, SubscriptionStatus::Unsubscribed);
}

// Return the partial map
Some(self.broadcast_map.topic_sync_map.diff())
}

/// Get all the brokers we are connected to. We use this to forward
Expand Down
File renamed without changes.
88 changes: 56 additions & 32 deletions cdn-broker/src/tasks/broker/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use cdn_proto::{
use tokio::{spawn, time::timeout};
use tracing::{debug, error};

use crate::{connections::DirectMap, Inner};
use crate::{
connections::{DirectMap, SubscriptionStatus, TopicSyncMap},
Inner,
};

impl<Def: RunDef> Inner<Def> {
/// This function is the callback for handling a broker (private) connection.
Expand Down Expand Up @@ -79,33 +82,32 @@ impl<Def: RunDef> Inner<Def> {
})
.abort_handle();

// Add to our list of connections, removing the old one if it exists
self.connections
.write()
.add_broker(broker_identifier.clone(), connection, receive_handle);

// Send a full topic sync
if let Err(err) = self.full_topic_sync(&broker_identifier).await {
error!("failed to perform full topic sync: {err}");

// Remove the broker if we fail the initial sync
self.connections
.write()
.remove_broker(&broker_identifier, "failed to send full topic sync");

return;
};

// Send a full user sync
if let Err(err) = self.full_user_sync(&broker_identifier).await {
error!("failed to perform full user sync: {err}");
return;
};

// If we have `strong-consistency` enabled, send partials
#[cfg(feature = "strong-consistency")]
{
if let Err(err) = self.partial_topic_sync().await {
error!("failed to perform partial topic sync: {err}");
}
if let Err(err) = self.partial_user_sync().await {
error!("failed to perform partial user sync: {err}");
}
}

// Add to our broker and remove the old one if it exists
self.connections
.write()
.add_broker(broker_identifier, connection, receive_handle);
// Remove the broker if we fail the initial sync
self.connections
.write()
.remove_broker(&broker_identifier, "failed to send full user sync");
};
}

/// This is the default loop for handling broker connections
Expand All @@ -114,6 +116,9 @@ impl<Def: RunDef> Inner<Def> {
broker_identifier: &BrokerIdentifier,
connection: Connection,
) -> Result<()> {
// The broker's topic sync map
let mut topic_sync_map = TopicSyncMap::new(0);

loop {
// Receive a message from the broker
let raw_message = connection.recv_message_raw().await?;
Expand All @@ -138,20 +143,6 @@ impl<Def: RunDef> Inner<Def> {
.await;
}

// If we receive a subscribe message from a broker, we add them as "interested" locally.
Message::Subscribe(subscribe) => {
self.connections
.write()
.subscribe_broker_to(broker_identifier, subscribe);
}

// If we receive a subscribe message from a broker, we remove them as "interested" locally.
Message::Unsubscribe(unsubscribe) => {
self.connections
.write()
.unsubscribe_broker_from(broker_identifier, &unsubscribe);
}

// If we receive a `UserSync` message, we want to sync with our map
Message::UserSync(user_sync) => {
// Deserialize via `rkyv`
Expand All @@ -164,6 +155,39 @@ impl<Def: RunDef> Inner<Def> {
self.connections.write().apply_user_sync(user_sync);
}

// If we receive a `TopicSync` message, we want to sync with our version of their map
Message::TopicSync(topic_sync) => {
// Deserialize via `rkyv`
let topic_sync: TopicSyncMap = bail!(
rkyv::from_bytes(&topic_sync),
Deserialize,
"failed to deserialize topic sync message"
);

// Merge the topic sync maps
let changed_topics = topic_sync_map.merge(topic_sync);

// For each key changed,
for topic in changed_topics {
// Get the actual value
let Some(value) = topic_sync_map.get(&topic) else {
return Err(Error::Parse("desynchronized topic sync map".to_string()));
};

// If the value is `Subscribed`, add the broker to the topic
if *value == SubscriptionStatus::Subscribed {
self.connections
.write()
.subscribe_broker_to(broker_identifier, vec![topic]);
} else {
// Otherwise, remove the broker from the topic
self.connections
.write()
.unsubscribe_broker_from(broker_identifier, &[topic]);
}
}
}

// Do nothing if we receive an unexpected message
_ => {}
}
Expand Down
75 changes: 27 additions & 48 deletions cdn-broker/src/tasks/broker/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ use tracing::error;
use crate::Inner;

macro_rules! prepare_sync_message {
($map: expr) => {{
($map: expr, $ty: expr) => {{
// Serialize the map using `rkyv`
let message = bail!(
rkyv::to_bytes::<_, 2048>(&$map),
Serialize,
"failed to serialize full user sync map"
"failed to serialize user sync map"
);

// Wrap the message in `UserSync` and serialize it
Bytes::from_unchecked(bail!(
Message::UserSync(message.to_vec()).serialize(),
$ty(message.to_vec()).serialize(),
Serialize,
"failed to serialize full user sync map"
"failed to serialize user sync map"
))
}};
}
Expand All @@ -45,8 +45,11 @@ impl<Def: RunDef> Inner<Def> {
let full_sync_map = self.connections.read().get_full_user_sync();

// Serialize and send the message to the broker
self.try_send_to_broker(broker, prepare_sync_message!(full_sync_map))
.await;
self.try_send_to_broker(
broker,
prepare_sync_message!(full_sync_map, Message::UserSync),
)
.await;

Ok(())
}
Expand All @@ -66,7 +69,7 @@ impl<Def: RunDef> Inner<Def> {
}

// Serialize the message
let raw_message = prepare_sync_message!(partial_sync_map);
let raw_message = prepare_sync_message!(partial_sync_map, Message::UserSync);

// Send to all brokers
self.try_send_to_brokers(&raw_message).await;
Expand All @@ -79,21 +82,14 @@ impl<Def: RunDef> Inner<Def> {
///
/// # Errors
/// - if we fail to serialize the message
pub async fn full_topic_sync(
self: &Arc<Self>,
broker_identifier: &BrokerIdentifier,
) -> Result<()> {
// Get full list of topics
let topics = self.connections.read().get_full_topic_sync();

// Send to broker
pub async fn full_topic_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
// Get full topic sync map
let full_sync_map = self.connections.read().get_full_topic_sync();

// Serialize and send the message to the broker
self.try_send_to_broker(
broker_identifier,
Bytes::from_unchecked(bail!(
Message::Subscribe(topics).serialize(),
Serialize,
"failed to serialize topics"
)),
broker,
prepare_sync_message!(full_sync_map, Message::TopicSync),
)
.await;

Expand All @@ -106,34 +102,17 @@ impl<Def: RunDef> Inner<Def> {
/// # Errors
/// - If we fail to serialize the message
pub async fn partial_topic_sync(self: &Arc<Self>) -> Result<()> {
// Get partial list of topics
let (additions, removals) = self.connections.write().get_partial_topic_sync();

// If we have some additions,
if !additions.is_empty() {
// Serialize the subscribe message
let raw_subscribe_message = Bytes::from_unchecked(bail!(
Message::Subscribe(additions).serialize(),
Serialize,
"failed to serialize topics"
));

// Send to all brokers
self.try_send_to_brokers(&raw_subscribe_message).await;
}
// Get partial topic sync map
let Some(partial_sync_map) = self.connections.write().get_partial_topic_sync() else {
// Return if we haven't had any changes
return Ok(());
};

// If we have some removals,
if !removals.is_empty() {
// Serialize the unsubscribe message
let raw_unsubscribe_message = Bytes::from_unchecked(bail!(
Message::Unsubscribe(removals).serialize(),
Serialize,
"failed to serialize topics"
));

// Send to all brokers
self.try_send_to_brokers(&raw_unsubscribe_message).await;
}
// Serialize the message
let raw_message = prepare_sync_message!(partial_sync_map, Message::TopicSync);

// Send to all brokers
self.try_send_to_brokers(&raw_message).await;

Ok(())
}
Expand Down
Loading

0 comments on commit f3ed67d

Please sign in to comment.