Skip to content

Commit

Permalink
assert topics were included in application layer
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed May 7, 2024
1 parent 166e68d commit 3e7960d
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 18 deletions.
57 changes: 57 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 15 additions & 10 deletions cdn-broker/src/tasks/broker/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{sync::Arc, time::Duration};
use cdn_proto::{
authenticate_with_broker, bail,
connection::{auth::broker::BrokerAuth, protocols::Connection as _, Bytes, UserPublicKey},
def::{Connection, RunDef},
def::{Connection, RunDef, Topic as _},
discovery::BrokerIdentifier,
error::{Error, Result},
message::{Message, Topic},
Expand Down Expand Up @@ -131,20 +131,28 @@ impl<Def: RunDef> Inner<Def> {

// If we receive a broadcast message from a broker, we want to send it to all interested users
Message::Broadcast(ref broadcast) => {
let topics = broadcast.topics.clone();
// Get and prune the topics
let mut topics = broadcast.topics.clone();
Def::Topic::prune(&mut topics)?;

self.handle_broadcast_message(topics, &raw_message, true);
self.handle_broadcast_message(&topics, &raw_message, true);
}

// If we receive a subscribe message from a broker, we add them as "interested" locally.
Message::Subscribe(subscribe) => {
Message::Subscribe(mut subscribe) => {
// Prune the topics
Def::Topic::prune(&mut subscribe)?;

self.connections
.write()
.subscribe_broker_to(broker_identifier, subscribe);
}

// If we receive a subscribe message from a broker, we remove them as "interested" locally.
Message::Unsubscribe(unsubscribe) => {
Message::Unsubscribe(mut unsubscribe) => {
// Prune the topics
Def::Topic::prune(&mut unsubscribe)?;

self.connections
.write()
.unsubscribe_broker_from(broker_identifier, &unsubscribe);
Expand Down Expand Up @@ -212,18 +220,15 @@ impl<Def: RunDef> Inner<Def> {
/// This function handles broadcast messages from users and brokers.
pub fn handle_broadcast_message(
self: &Arc<Self>,
mut topics: Vec<Topic>,
topics: &[Topic],
message: &Bytes,
to_users_only: bool,
) {
// Deduplicate topics
topics.dedup();

// Get the list of actors interested in the topics
let (interested_brokers, interested_users) = self
.connections
.read()
.get_interested_by_topic(&topics, to_users_only);
.get_interested_by_topic(&topics.to_vec(), to_users_only);

// Debug log the broadcast
debug!(
Expand Down
27 changes: 21 additions & 6 deletions cdn-broker/src/tasks/user/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;

use cdn_proto::connection::{protocols::Connection as _, UserPublicKey};
use cdn_proto::def::{Connection, RunDef};
use cdn_proto::def::{Connection, RunDef, Topic as _};
use cdn_proto::error::{Error, Result};
use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message, mnemonic};
use tokio::spawn;
Expand All @@ -18,7 +18,8 @@ impl<Def: RunDef> Inner<Def> {
/// This function handles a user (public) connection.
pub async fn handle_user_connection(self: Arc<Self>, connection: Connection<Def::User>) {
// Verify (authenticate) the connection. Needs to happen within 5 seconds
let Ok(Ok((public_key, topics))) = timeout(
// TODO: make this stateless (e.g. separate subscribe message on connect)
let Ok(Ok((public_key, mut topics))) = timeout(
Duration::from_secs(5),
BrokerAuth::<Def>::verify_user(
&connection,
Expand All @@ -32,6 +33,12 @@ impl<Def: RunDef> Inner<Def> {
return;
};

// Prune the supplied topics
//
// We don't care about the error because we want to allow users
// to connect without subscribing to any topics.
let _ = Def::Topic::prune(&mut topics);

// Create a human-readable user identifier (by public key)
let public_key = UserPublicKey::from(public_key);
let user_identifier = mnemonic(&public_key);
Expand Down Expand Up @@ -100,21 +107,29 @@ impl<Def: RunDef> Inner<Def> {

// If we get a broadcast message from a user, send it to both brokers and users.
Message::Broadcast(ref broadcast) => {
let topics = broadcast.topics.clone();
// Get and prune the topics
let mut topics = broadcast.topics.clone();
Def::Topic::prune(&mut topics)?;

self.handle_broadcast_message(topics, &raw_message, false);
self.handle_broadcast_message(&topics, &raw_message, false);
}

// Subscribe messages from users will just update the state locally
Message::Subscribe(subscribe) => {
Message::Subscribe(mut subscribe) => {
// Prune the topics
Def::Topic::prune(&mut subscribe)?;

// TODO: add handle functions for this to make it easier to read
self.connections
.write()
.subscribe_user_to(public_key, subscribe);
}

// Unsubscribe messages from users will just update the state locally
Message::Unsubscribe(unsubscribe) => {
Message::Unsubscribe(mut unsubscribe) => {
// Prune the topics
Def::Topic::prune(&mut unsubscribe)?;

self.connections
.write()
.unsubscribe_user_from(public_key, &unsubscribe);
Expand Down
1 change: 1 addition & 0 deletions cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ rkyv.workspace = true
mnemonic = "1"
rcgen.workspace = true
derivative.workspace = true
num_enum = "0.7"
31 changes: 30 additions & 1 deletion cdn-proto/src/def.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Compile-time run configuration for all CDN components.

use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS;
use num_enum::{IntoPrimitive, TryFromPrimitive};

use crate::connection::middleware::{
Middleware as MiddlewareType, NoMiddleware, TrustedMiddleware, UntrustedMiddleware,
Expand All @@ -10,19 +11,45 @@ use crate::connection::protocols::{quic::Quic, tcp::Tcp, Protocol as ProtocolTyp
use crate::crypto::signature::SignatureScheme;
use crate::discovery::embedded::Embedded;
use crate::discovery::{redis::Redis, DiscoveryClient};
use crate::error::{Error, Result};

/// The test topics for the CDN.
/// An implementation of `Topic` for testing purposes.
#[repr(u8)]
#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)]
pub enum TestTopic {
Global = 0,
DA = 1,
}

/// Defines the topic type for CDN messages
pub trait Topic: Into<u8> + TryFrom<u8> + Clone + Send + Sync {
/// Prunes the topics to only include valid topics.
///
/// # Errors
/// - If no valid topics are supplied
fn prune(topics: &mut Vec<u8>) -> Result<()> {
// Deduplicate the topics
topics.dedup();

// Retain only the topics that can be converted to the desired type
topics.retain(|topic| Self::try_from(*topic).is_ok());

// Make sure we have at least one topic
if topics.is_empty() {
Err(Error::Parse("supplied no valid topics".to_string()))
} else {
Ok(())
}
}
}
impl Topic for TestTopic {}

/// This trait defines the run configuration for all CDN components.
pub trait RunDef: 'static {
type Broker: ConnectionDef;
type User: ConnectionDef;
type DiscoveryClientType: DiscoveryClient;
type Topic: Topic;
}

/// This trait defines the connection configuration for a single CDN component.
Expand All @@ -39,6 +66,7 @@ impl RunDef for ProductionRunDef {
type Broker = ProductionBrokerConnection;
type User = ProductionUserConnection;
type DiscoveryClientType = Redis;
type Topic = TestTopic;
}

/// The production broker connection configuration.
Expand Down Expand Up @@ -77,6 +105,7 @@ impl RunDef for TestingRunDef {
type Broker = TestingConnection;
type User = TestingConnection;
type DiscoveryClientType = Embedded;
type Topic = TestTopic;
}

/// The testing connection configuration.
Expand Down
58 changes: 57 additions & 1 deletion tests/src/tests/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use cdn_proto::{
def::TestTopic,
message::{Broadcast, Message},
};
use tokio::time::timeout;
use tokio::time::{sleep, timeout};

use super::*;

Expand Down Expand Up @@ -107,3 +107,59 @@ async fn test_subscribe() {
.await
.is_err());
}

/// Test that subscribing to an invalid topic kills the connection.
#[tokio::test]
async fn test_invalid_subscribe() {
// Get a temporary path for the discovery endpoint
let discovery_endpoint = get_temp_db_path();

// Create and start a new broker
new_broker(0, "8098", "8099", &discovery_endpoint).await;

// Create and start a new marshal
new_marshal("8100", &discovery_endpoint).await;

// Create and get the handle to a new client subscribed to an invalid topic
let client = new_client(0, vec![99], "8100");

// Ensure the connection is open
let Ok(()) = timeout(Duration::from_secs(1), client.ensure_initialized()).await else {
panic!("client failed to connect");
};

// Subscribe to an invalid topic
let _ = client.subscribe(vec![99]).await;

// Sleep for a bit to allow the client to disconnect
sleep(Duration::from_millis(50)).await;

// Assert we can't send a message (as we are disconnected)
assert!(
client
.send_broadcast_message(vec![1], b"hello invalid".to_vec())
.await
.is_err(),
"sent message but should've been disconnected"
);

// Reinitialize the connection
let Ok(()) = timeout(Duration::from_secs(4), client.ensure_initialized()).await else {
panic!("client failed to connect");
};

// Unsubscribe from the invalid topic
let _ = client.unsubscribe(vec![99]).await;

// Sleep for a bit to allow the client to disconnect
sleep(Duration::from_millis(50)).await;

// Assert we can't send a message (as we are disconnected)
assert!(
client
.send_broadcast_message(vec![1], b"hello invalid".to_vec())
.await
.is_err(),
"sent message but should've been disconnected"
);
}

0 comments on commit 3e7960d

Please sign in to comment.