Skip to content

Commit

Permalink
add tests for topic sync
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Jun 20, 2024
1 parent f3ed67d commit 1c572e7
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 67 deletions.
232 changes: 211 additions & 21 deletions cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,22 @@ mod broadcast;
mod direct;
mod versioned_map;

/// A broker connection along with the topic sync information and
/// the task handle for the connection handler.
pub struct Broker {
pub connection: Connection,
pub topic_sync_map: TopicSyncMap,
pub handle: AbortHandle,
}

pub struct Connections {
// Our identity. Used for versioned vector conflict resolution.
identity: BrokerIdentifier,

// The current users connected to us, along with their running tasks
users: HashMap<UserPublicKey, (Connection, AbortHandle)>,
// The current brokers connected to us, along with their running tasks
brokers: HashMap<BrokerIdentifier, (Connection, AbortHandle)>,
// The current brokers connected to us
brokers: HashMap<BrokerIdentifier, Broker>,

// The versioned vector for looking up where direct messages should go
direct_map: DirectMap,
Expand Down Expand Up @@ -61,7 +69,9 @@ impl Connections {
&self,
broker_identifier: &BrokerIdentifier,
) -> Option<Connection> {
self.brokers.get(broker_identifier).map(|(c, _)| c.clone())
self.brokers
.get(broker_identifier)
.map(|b| b.connection.clone())
}

/// Get the connection for a given user public key (cloned)
Expand Down Expand Up @@ -114,14 +124,23 @@ impl Connections {

/// Get the full versioned vector map of user -> broker.
/// We send this to other brokers so they can merge it.
pub fn get_full_user_sync(&self) -> DirectMap {
self.direct_map.get_full()
pub fn get_full_user_sync(&self) -> Option<DirectMap> {
if self.direct_map.underlying_map.is_empty() {
None
} else {
Some(self.direct_map.clone())
}
}

/// Get the differences in the versioned vector map of user -> broker
/// We send this to other brokers so they can merge it.
pub fn get_partial_user_sync(&mut self) -> DirectMap {
self.direct_map.diff()
pub fn get_partial_user_sync(&mut self) -> Option<DirectMap> {
let diff = self.direct_map.diff();
if diff.is_empty() {
None
} else {
Some(diff)
}
}

/// Apply a received user sync map. Overwrites our values if they are old.
Expand All @@ -131,24 +150,48 @@ impl Connections {
let users_to_remove = self.direct_map.merge(map);

// We should remove the users that are different, if they exist locally.
for user in users_to_remove {
for (user, _new_broker) in users_to_remove {
self.remove_user(user, "user connected elsewhere");
}
}

/// Apply a received topic sync map. Overwrites our values if they are old.
pub fn apply_topic_sync(
&mut self,
broker_identifier: &BrokerIdentifier,
remote_map: TopicSyncMap,
) {
// Get the local map by broker identifier
let local_map = if let Some(broker) = self.brokers.get_mut(broker_identifier) {
&mut broker.topic_sync_map
} else {
self.remove_broker(broker_identifier, "broker did not exist");
return;
};

// Merge the topic sync maps
let changed_topics = local_map.merge(remote_map);

// For each key changed,
for (topic, status) in changed_topics {
// If the value is `Subscribed`, add the broker to the topic
if status == Some(SubscriptionStatus::Subscribed) {
self.subscribe_broker_to(broker_identifier, vec![topic]);
} else {
// Otherwise, remove the broker from the topic
self.unsubscribe_broker_from(broker_identifier, &[topic]);
}
}
}

/// Get the full list of topics that we are interested in.
/// We send this to new brokers when they start.
pub fn get_full_topic_sync(&self) -> TopicSyncMap {
// Create an empty map
let mut map = TopicSyncMap::new(0);

// Add all topics we are subscribed to.
// The initial version will be 0.
for topic in self.broadcast_map.users.get_values() {
map.insert(topic, SubscriptionStatus::Subscribed);
pub fn get_full_topic_sync(&self) -> Option<TopicSyncMap> {
if self.broadcast_map.topic_sync_map.underlying_map.is_empty() {
None
} else {
Some(self.broadcast_map.topic_sync_map.clone())
}

map
}

/// Get the partial list of topics that we are interested in.
Expand Down Expand Up @@ -207,7 +250,15 @@ impl Connections {
// Remove the old broker if it exists
self.remove_broker(&broker_identifier, "already existed");

self.brokers.insert(broker_identifier, (connection, handle));
// Insert into our map with a new topic sync map
self.brokers.insert(
broker_identifier,
Broker {
connection,
handle,
topic_sync_map: TopicSyncMap::new(0),
},
);
}

/// Insert a user into our map. Updates the versioned vector that
Expand Down Expand Up @@ -244,13 +295,13 @@ impl Connections {
/// from our broadcast map, in case they were subscribed to any topics.
pub fn remove_broker(&mut self, broker_identifier: &BrokerIdentifier, reason: &str) {
// Remove from broker list, cancelling the previous task if it exists
if let Some((_, task)) = self.brokers.remove(broker_identifier) {
if let Some(broker) = self.brokers.remove(broker_identifier) {
// Decrement the metric for the number of brokers connected
metrics::NUM_BROKERS_CONNECTED.dec();
error!(id = %broker_identifier, reason = reason, "broker disconnected");

// Cancel the task
task.abort();
broker.handle.abort();
};

// Remove from all topics
Expand Down Expand Up @@ -323,3 +374,142 @@ impl Connections {
.dissociate_keys_from_value(user_public_key, topics);
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use cdn_proto::{
connection::protocols::Connection, def::TestTopic, discovery::BrokerIdentifier,
};
use tokio::spawn;

use super::Connections;

/// Create a new broker identifier for testing
fn new_broker_identifier(namespace: &str) -> BrokerIdentifier {
format!("test/{namespace}")
.try_into()
.expect("failed to create broker identifier")
}

/// Test that subscribing and unsubscribing works as expected through use
/// of the topic sync map.
#[tokio::test]
async fn test_topic_sync() {
// The identifiers for the local and remote brokers
let local_broker_identifier: BrokerIdentifier = new_broker_identifier("local");
let remote_broker_identifier: BrokerIdentifier = new_broker_identifier("remote");

// Create the local map that needs to stay in sync
let mut local_map = Connections::new(local_broker_identifier.clone());
let connection = Connection::new_test();
let handle = spawn(async move {}).abort_handle();
local_map.add_broker(remote_broker_identifier.clone(), connection, handle);

// Create the remote map that will be having changes applied to it
let mut remote_map = Connections::new(remote_broker_identifier.clone());
let connection = Connection::new_test();
let handle = spawn(async move {}).abort_handle();
remote_map.add_broker(local_broker_identifier, connection, handle);

// Subscribe a user to topics `Global` and `DA` in the remote map
remote_map.subscribe_user_to(
&Arc::from(vec![1]),
vec![TestTopic::Global.into(), TestTopic::DA.into()],
);

// Get the full sync and make sure it is `None`
let full_sync = remote_map.get_full_topic_sync();
assert!(full_sync.is_none());

// Get a partial sync from remote and apply it locally
let partial_sync = remote_map.get_partial_topic_sync();
local_map.apply_topic_sync(&remote_broker_identifier, partial_sync.unwrap());

// Make sure we are subscribed to the Global topic
let (brokers, _) =
local_map.get_interested_by_topic(&vec![TestTopic::Global.into()], false);
assert!(brokers.len() == 1);
assert!(brokers.contains(&remote_broker_identifier));

// Make sure we are subscribed to the DA topic
let (brokers, _) = local_map.get_interested_by_topic(&vec![TestTopic::DA.into()], false);
assert!(brokers.len() == 1);
assert!(brokers.contains(&remote_broker_identifier));

// Unsubscribe the remote user from the Global topic
remote_map.unsubscribe_user_from(&Arc::from(vec![1]), &[TestTopic::Global.into()]);

// Perform another partial sync from remote -> local
let partial_sync = remote_map.get_partial_topic_sync();
local_map.apply_topic_sync(&remote_broker_identifier, partial_sync.unwrap());

// Make sure we are no longer subscribed to the Global topic
let (brokers, _) =
local_map.get_interested_by_topic(&vec![TestTopic::Global.into()], false);
assert!(brokers.is_empty());

// Make sure we are still subscribed to the DA topic
let (brokers, _) = local_map.get_interested_by_topic(&vec![TestTopic::DA.into()], false);
assert!(brokers.len() == 1);
assert!(brokers.contains(&remote_broker_identifier));
}

/// Test that subscribing and unsubscribing works as expected even
/// if messages are received and processed out of order.
#[tokio::test]
async fn test_topic_sync_out_of_order() {
// The identifiers for the local and remote brokers
let local_broker_identifier: BrokerIdentifier = new_broker_identifier("local");
let remote_broker_identifier: BrokerIdentifier = new_broker_identifier("remote");

// Create the local map that needs to stay in sync
let mut local_map = Connections::new(local_broker_identifier.clone());
let connection = Connection::new_test();
let handle = spawn(async move {}).abort_handle();
local_map.add_broker(remote_broker_identifier.clone(), connection, handle);

// Create the remote map that will be having changes applied to it
let mut remote_map = Connections::new(remote_broker_identifier.clone());
let connection = Connection::new_test();
let handle = spawn(async move {}).abort_handle();
remote_map.add_broker(local_broker_identifier, connection, handle);

// Subscribe a user to topics `Global` and `DA` in the remote map
remote_map.subscribe_user_to(
&Arc::from(vec![1]),
vec![TestTopic::Global.into(), TestTopic::DA.into()],
);

// Do a partial sync but don't apply it
let _partial_sync = remote_map.get_partial_topic_sync();

// Unsuscribe the user from both topics
remote_map.unsubscribe_user_from(&Arc::from(vec![1]), &[TestTopic::Global.into()]);
remote_map.unsubscribe_user_from(&Arc::from(vec![1]), &[TestTopic::DA.into()]);

// Do another partial sync and apply it
let partial_sync = remote_map.get_partial_topic_sync();
local_map.apply_topic_sync(&remote_broker_identifier, partial_sync.unwrap());

// Subscribe the user to the DA topic
remote_map.subscribe_user_to(&Arc::from(vec![1]), vec![TestTopic::DA.into()]);
let partial_sync = remote_map.get_partial_topic_sync();
local_map.apply_topic_sync(&remote_broker_identifier, partial_sync.unwrap());

// Perform a full sync
let full_sync = remote_map.get_full_topic_sync();
local_map.apply_topic_sync(&remote_broker_identifier, full_sync.unwrap());

// Make sure we are no longer subscribed to the Global topic
let (brokers, _) =
local_map.get_interested_by_topic(&vec![TestTopic::Global.into()], false);
assert!(brokers.is_empty());

// Make sure we are still subscribed to the DA topic
let (brokers, _) = local_map.get_interested_by_topic(&vec![TestTopic::DA.into()], false);
assert!(brokers.len() == 1);
assert!(brokers.contains(&remote_broker_identifier));
}
}
37 changes: 24 additions & 13 deletions cdn-broker/src/connections/versioned_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ type Tombstone<T> = Option<T>;

/// A `VersionedValue` defines a value with a global version that
/// we use for syncing purposes.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Archive)]
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Archive, Debug)]
#[archive(check_bytes)]
pub struct VersionedValue<T> {
version: u64,
value: Tombstone<T>,
}

#[derive(Clone, Archive, Serialize, Deserialize, Derivative)]
#[derive(Clone, Archive, Serialize, Deserialize, Derivative, Debug)]
#[archive(check_bytes)]
#[derivative(PartialEq)]
/// A data structure responsible for remaining eventually consistent. It does this by
Expand Down Expand Up @@ -60,6 +60,11 @@ impl<
}
}

/// Check if the map is empty.
pub fn is_empty(&self) -> bool {
self.underlying_map.is_empty()
}

/// Get a value from the underlying map, returning it as an optional reference. This maintains parity with
/// `HashMap`.
pub fn get(&self, k: &K) -> Option<&V> {
Expand Down Expand Up @@ -183,10 +188,13 @@ impl<
}

/// Merge the changes from two `VersionedMap`s, keeping only the newest changes. On a conflict,
/// use the `conflict_identity` to figure out who should get the value.
pub fn merge(&mut self, remote: Self) -> Vec<K> {
/// uses the `conflict_identity` to figure out who should get the value.
///
/// Returns a vector of `(K, Option<V>)` pairs that represent the changes that were made.
/// TODO: `Rc` or `Arc` the cloned values?
pub fn merge(&mut self, remote: Self) -> Vec<(K, Option<V>)> {
// We want to return the changes
let mut changes: Vec<K> = Vec::new();
let mut changes: Vec<(K, Option<V>)> = Vec::new();

// For each `(k,v)` pair that has allegedly changed,
for (remote_key, remote_value) in remote.underlying_map {
Expand All @@ -198,7 +206,7 @@ impl<
Ordering::Greater => {
if remote_value.value.is_some() {
// Update our value if it is something.
local_value.value = remote_value.value;
local_value.value.clone_from(&remote_value.value);
local_value.version = remote_value.version;
} else {
// Remove if they sent us a tombstone.
Expand All @@ -208,8 +216,8 @@ impl<
// Remove it from our locally modified keys, in case we also tried to update it.
self.locally_modified_keys.remove(&remote_key);

// Push to our changes that we return.
changes.push(remote_key);
// Push to the changes that we return.
changes.push((remote_key, remote_value.value));
}

// If the remote value is equal to our value,
Expand All @@ -220,7 +228,7 @@ impl<
// TODO: duplicate code here and above. macro it?
if remote_value.value.is_some() {
// Update our value
local_value.value = remote_value.value;
local_value.value.clone_from(&remote_value.value);
local_value.version = remote_value.version;
} else {
// Remove the value if it wa snothing
Expand All @@ -229,8 +237,8 @@ impl<

// Remove it from our locally modified keys, in case we also tried to update it.
self.locally_modified_keys.remove(&remote_key);
// Push to our changes that we return.
changes.push(remote_key);
// Push to the changes that we return.
changes.push((remote_key, remote_value.value));
}
}

Expand All @@ -241,8 +249,11 @@ impl<
// If we don't have a local value for it already,
if remote_value.value.is_some() {
// If the value is something, insert it
self.underlying_map.insert(remote_key.clone(), remote_value);
changes.push(remote_key);
self.underlying_map
.insert(remote_key.clone(), remote_value.clone());

// Push it to the changes that we return
changes.push((remote_key, remote_value.value));
}
};
}
Expand Down
Loading

0 comments on commit 1c572e7

Please sign in to comment.