Skip to content

Commit

Permalink
Merge pull request #42 from EspressoSystems/rm/move-topics-to-app-layer
Browse files Browse the repository at this point in the history
Move topics to application layer
  • Loading branch information
rob-maron authored May 6, 2024
2 parents 2d93e79 + 6d8c530 commit 966cdde
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 328 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() {
private_key,
})
// Subscribe to the global consensus topic
.subscribed_topics(vec![Topic::Global])
.subscribed_topics(vec![TestTopic::Global as u8])
.build()
.unwrap();

Expand Down Expand Up @@ -70,7 +70,7 @@ async fn main() {

// Send a broadcast message to the global topic
client
.send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec())
.send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec())
.await
.unwrap();

Expand All @@ -84,7 +84,7 @@ async fn main() {
assert!(
message
== Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"hello broadcast".to_vec()
})
);
Expand Down
14 changes: 9 additions & 5 deletions cdn-broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::time::Duration;
use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::message::{Broadcast, Message, Topic};
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Broadcast, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};

/// The function under bench for broadcasting a message to two users.
async fn broadcast_user(run: &TestRun) {
// Allocate a rather large message
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: vec![0; 10000],
});

Expand All @@ -28,7 +29,7 @@ async fn broadcast_user(run: &TestRun) {
async fn broadcast_broker(run: &TestRun) {
// Allocate a rather large message
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: vec![0; 10000],
});

Expand All @@ -48,7 +49,7 @@ fn bench_broadcast_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -71,7 +72,10 @@ fn bench_broadcast_broker(c: &mut Criterion) {
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![]],
connected_brokers: vec![(vec![], vec![Topic::Global]), (vec![], vec![Topic::Global])],
connected_brokers: vec![
(vec![], vec![TestTopic::Global as u8]),
(vec![], vec![TestTopic::Global as u8]),
],
};

run_definition.into_run().await
Expand Down
15 changes: 8 additions & 7 deletions cdn-broker/benches/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::time::Duration;
use cdn_broker::reexports::tests::{TestDefinition, TestRun};
use cdn_broker::{assert_received, send_message_as};
use cdn_proto::connection::{protocols::Connection as _, Bytes};
use cdn_proto::message::{Direct, Message, Topic};
use cdn_proto::def::TestTopic;
use cdn_proto::message::{Direct, Message};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};

Expand Down Expand Up @@ -75,7 +76,7 @@ fn bench_direct_user_to_self(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -98,7 +99,7 @@ fn bench_direct_user_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![],
};

Expand All @@ -121,8 +122,8 @@ fn bench_direct_user_to_broker(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_brokers: vec![(vec![2], vec![Topic::Global])],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
};

run_definition.into_run().await
Expand All @@ -144,8 +145,8 @@ fn bench_direct_broker_to_user(c: &mut Criterion) {
// Set up our broker under test
let run = benchmark_runtime.block_on(async move {
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global]],
connected_brokers: vec![(vec![2], vec![Topic::Global])],
connected_users: vec![vec![TestTopic::Global as u8], vec![TestTopic::Global as u8]],
connected_brokers: vec![(vec![2], vec![TestTopic::Global as u8])],
};

run_definition.into_run().await
Expand Down
2 changes: 1 addition & 1 deletion cdn-broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl<Def: RunDef> Connections<Def> {
let removed = previous.difference(&now);

// Clone them
let differences = (added.cloned().collect(), removed.cloned().collect());
let differences = (added.copied().collect(), removed.copied().collect());

// Set the previous to the new one
*previous = now.clone();
Expand Down
31 changes: 16 additions & 15 deletions cdn-broker/src/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;

use cdn_proto::{
connection::{protocols::Connection, Bytes},
message::{Broadcast, Message, Topic},
def::TestTopic,
message::{Broadcast, Message},
};
use tokio::time::{sleep, timeout};

Expand All @@ -21,13 +22,13 @@ async fn test_broadcast_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![Topic::Global, Topic::DA],
vec![Topic::DA],
vec![Topic::Global],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![4], vec![Topic::Global, Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
],
};
Expand All @@ -40,7 +41,7 @@ async fn test_broadcast_user() {

// Create a broadcast message with the global topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"test broadcast global".to_vec(),
});

Expand All @@ -59,7 +60,7 @@ async fn test_broadcast_user() {

// Now we test the DA topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::DA],
topics: vec![TestTopic::DA as u8],
message: b"test broadcast DA".to_vec(),
});

Expand Down Expand Up @@ -87,13 +88,13 @@ async fn test_broadcast_broker() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![
vec![Topic::Global, Topic::DA],
vec![Topic::DA],
vec![Topic::Global],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
vec![TestTopic::DA as u8],
vec![TestTopic::Global as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![4], vec![Topic::Global, Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![4], vec![TestTopic::Global as u8, TestTopic::DA as u8]),
(vec![5], vec![]),
],
};
Expand All @@ -106,7 +107,7 @@ async fn test_broadcast_broker() {

// Create a broadcast message with the global topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"test broadcast global".to_vec(),
});

Expand All @@ -124,7 +125,7 @@ async fn test_broadcast_broker() {

// Now we test the DA topic
let message = Message::Broadcast(Broadcast {
topics: vec![Topic::DA],
topics: vec![TestTopic::DA as u8],
message: b"test broadcast DA.".to_vec(),
});

Expand Down
24 changes: 17 additions & 7 deletions cdn-broker/src/tests/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::time::Duration;

use cdn_proto::{
connection::{protocols::Connection, Bytes},
message::{Direct, Message, Topic},
def::TestTopic,
message::{Direct, Message},
};
use tokio::time::{sleep, timeout};

Expand All @@ -21,9 +22,12 @@ use crate::{assert_received, send_message_as};
async fn test_direct_user_to_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![2], vec![Topic::DA]),
(vec![2], vec![TestTopic::DA as u8]),
(vec![3], vec![]),
(vec![4], vec![]),
],
Expand Down Expand Up @@ -73,9 +77,12 @@ async fn test_direct_user_to_user() {
async fn test_direct_user_to_broker() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![2], vec![]),
(vec![4], vec![]),
],
Expand Down Expand Up @@ -109,9 +116,12 @@ async fn test_direct_user_to_broker() {
async fn test_direct_broker_to_user() {
// This run definition: 3 brokers, 6 users
let run_definition = TestDefinition {
connected_users: vec![vec![Topic::Global], vec![Topic::Global, Topic::DA]],
connected_users: vec![
vec![TestTopic::Global as u8],
vec![TestTopic::Global as u8, TestTopic::DA as u8],
],
connected_brokers: vec![
(vec![3], vec![Topic::DA]),
(vec![3], vec![TestTopic::DA as u8]),
(vec![2], vec![]),
(vec![4], vec![]),
],
Expand Down
7 changes: 5 additions & 2 deletions cdn-client/src/binaries/bad-connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use std::time::Duration;

use cdn_client::{Client, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic};
use cdn_proto::{
crypto::signature::KeyPair,
def::{ProductionClientConnection, TestTopic},
};
use clap::Parser;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
Expand Down Expand Up @@ -52,7 +55,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down
9 changes: 6 additions & 3 deletions cdn-client/src/binaries/bad-sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
//! This is useful for testing the broker's ability to handle many messages.

use cdn_client::{Client, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionClientConnection, message::Topic};
use cdn_proto::{
crypto::signature::KeyPair,
def::{ProductionClientConnection, TestTopic},
};
use clap::Parser;
use jf_primitives::signatures::{
bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme,
Expand Down Expand Up @@ -48,7 +51,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down Expand Up @@ -77,7 +80,7 @@ async fn main() {

// Send a direct message to ourselves
if let Err(e) = client
.send_broadcast_message(vec![Topic::Global], message.clone())
.send_broadcast_message(vec![TestTopic::Global as u8], message.clone())
.await
{
println!("failed to send broadcast message: {e:?}");
Expand Down
10 changes: 5 additions & 5 deletions cdn-client/src/binaries/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::time::Duration;
use cdn_client::{Client, Config};
use cdn_proto::{
crypto::signature::{KeyPair, Serializable},
def::ProductionClientConnection,
message::{Broadcast, Direct, Message, Topic},
def::{ProductionClientConnection, TestTopic},
message::{Broadcast, Direct, Message},
};
use clap::Parser;
use jf_primitives::signatures::{
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn main() {
public_key,
private_key,
},
subscribed_topics: vec![Topic::Global],
subscribed_topics: vec![TestTopic::Global as u8],
use_local_authority: true,
};

Expand Down Expand Up @@ -91,7 +91,7 @@ async fn main() {

// Send a broadcast message to the global topic
client
.send_broadcast_message(vec![Topic::Global], b"hello broadcast".to_vec())
.send_broadcast_message(vec![TestTopic::Global as u8], b"hello broadcast".to_vec())
.await
.expect("failed to send message");
info!("broadcasted \"hello broadcast\" to ourselves");
Expand All @@ -106,7 +106,7 @@ async fn main() {
assert!(
message
== Message::Broadcast(Broadcast {
topics: vec![Topic::Global],
topics: vec![TestTopic::Global as u8],
message: b"hello broadcast".to_vec()
})
);
Expand Down
1 change: 1 addition & 0 deletions cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ description = "Contains the common protocol definition and common code for the b

[build-dependencies]
rcgen.workspace = true
capnpc = "0.19"

[features]
metrics = ["dep:prometheus"]
Expand Down
Loading

0 comments on commit 966cdde

Please sign in to comment.