Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: concurrency insert between pending and delivered (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd authored Mar 19, 2024
1 parent a5299c8 commit bd5e3f5
Show file tree
Hide file tree
Showing 27 changed files with 352 additions and 52 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/docker_build_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
workflow_file_name: topos:integration-tests.yml
ref: main
wait_interval: 60
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }'
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", "topos-smart-contracts-docker-tag": "3.2.0" }'

frontend-erc20-e2e:
runs-on: ubuntu-latest
Expand All @@ -59,4 +59,4 @@ jobs:
workflow_file_name: frontend:erc20-messaging.yml
ref: main
wait_interval: 60
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}" }'
client_payload: '{ "topos-docker-tag": "${{ env.docker_tag }}", "topos-smart-contracts-docker-tag": "3.2.0" }'
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ hyper.workspace = true
prost.workspace = true

topos-core = { path = "../topos-core/" }
ip_network = "0.4.1"

[dev-dependencies]
libp2p-swarm-test = "0.3.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl NetworkBehaviour for Behaviour {
_ => {}
},
gossipsub::Event::Subscribed { peer_id, topic } => {
debug!("Subscribed to {:?} with {peer_id}", topic);
debug!("{peer_id} subscribed to {:?}", topic);

// If the behaviour isn't already healthy we check if this event
// triggers a switch to healthy
Expand All @@ -268,7 +268,7 @@ impl NetworkBehaviour for Behaviour {
}
}
gossipsub::Event::Unsubscribed { peer_id, topic } => {
debug!("Unsubscribed from {:?} with {peer_id}", topic);
debug!("{peer_id} unsubscribed from {:?}", topic);
}
gossipsub::Event::GossipsubNotSupported { peer_id } => {
debug!("Gossipsub not supported by {:?}", peer_id);
Expand Down
3 changes: 1 addition & 2 deletions crates/topos-p2p/src/behaviour/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ pub struct PeerInfoBehaviour {

impl PeerInfoBehaviour {
pub(crate) fn new(identify_protocol: &'static str, peer_key: &Keypair) -> PeerInfoBehaviour {
let ident_config = IdentifyConfig::new(identify_protocol.to_string(), peer_key.public())
.with_push_listen_addr_updates(true);
let ident_config = IdentifyConfig::new(identify_protocol.to_string(), peer_key.public());

let identify = Identify::new(ident_config);

Expand Down
2 changes: 2 additions & 0 deletions crates/topos-p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub struct NetworkConfig {
pub discovery: DiscoveryConfig,
pub yamux_max_buffer_size: usize,
pub yamux_window_size: Option<u32>,
pub allow_private_ip: bool,
}

impl Default for NetworkConfig {
Expand All @@ -16,6 +17,7 @@ impl Default for NetworkConfig {
discovery: Default::default(),
yamux_max_buffer_size: usize::MAX,
yamux_window_size: None,
allow_private_ip: false,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ impl<'a> NetworkBuilder<'a> {
self
}

#[doc(hidden)]
pub fn allow_private_ip(mut self, allow_private_ip: bool) -> Self {
self.config.allow_private_ip = allow_private_ip;

self
}

pub fn store(mut self, store: MemoryStore) -> Self {
self.store = Some(store);

Expand Down
1 change: 1 addition & 0 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
let behaviour = self.swarm.behaviour();

if let Some(event) = self.healthy_status_changed() {
debug!("Healthy status changed: {:?}", event);
_ = self.event_sender.send(event).await;
}

Expand Down
36 changes: 26 additions & 10 deletions crates/topos-p2p/src/runtime/handle_event/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use libp2p::identify::{Event as IdentifyEvent, Info as IdentifyInfo};
use ip_network::IpNetwork;
use libp2p::{
identify::{Event as IdentifyEvent, Info as IdentifyInfo},
multiaddr::Protocol,
Multiaddr,
};
use tracing::info;

use crate::{constants::PEER_INFO_PROTOCOL, Runtime};
Expand All @@ -22,19 +27,30 @@ impl EventHandler<Box<IdentifyEvent>> for Runtime {
{
self.peer_set.insert(peer_id);
for addr in listen_addrs {
info!(
"Adding self-reported address {} from {} to Kademlia DHT.",
addr, peer_id
);
self.swarm
.behaviour_mut()
.discovery
.inner
.add_address(&peer_id, addr);
if self.config.allow_private_ip || is_global_addr(&addr) {
info!(
"Adding self-reported address {} from {} to Kademlia DHT.",
addr, peer_id
);
self.swarm
.behaviour_mut()
.discovery
.inner
.add_address(&peer_id, addr);
}
}
}
}

Ok(())
}
}

pub fn is_global_addr(addr: &Multiaddr) -> bool {
match addr.iter().next() {
Some(Protocol::Dns(_)) | Some(Protocol::Dns4(_)) | Some(Protocol::Dns6(_)) => true,
Some(Protocol::Ip4(ip)) => IpNetwork::from(ip).is_global(),
Some(Protocol::Ip6(ip)) => IpNetwork::from(ip).is_global(),
_ => false,
}
}
2 changes: 1 addition & 1 deletion crates/topos-tce-api/src/graphql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl ServerBuilder {
.take()
.expect("Cannot build GraphQL server without a FullNode store");

let fullnode_store = store.get_fullnode_store();
let fullnode_store = store.fullnode_store();
let runtime = self
.runtime
.take()
Expand Down
4 changes: 4 additions & 0 deletions crates/topos-tce-api/tests/grpc/certificate_precedence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ async fn fetch_latest_pending_certificates() {

assert!(validator_store
.insert_pending_certificate(&certificates[1].certificate)
.await
.unwrap()
.is_none());

assert!(validator_store
.insert_pending_certificate(&certificates[0].certificate)
.await
.unwrap()
.is_some());

Expand Down Expand Up @@ -82,12 +84,14 @@ async fn fetch_latest_pending_certificates_with_conflicts() {
for certificate in certificates.iter().skip(1) {
assert!(validator_store
.insert_pending_certificate(&certificate.certificate)
.await
.unwrap()
.is_none());
}

assert!(validator_store
.insert_pending_certificate(&certificates[0].certificate)
.await
.unwrap()
.is_some());

Expand Down
8 changes: 6 additions & 2 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ async fn get_pending_pool(
create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
_ = store
.insert_pending_certificate(&certificate.certificate)
.await;
}

let storage_client = StorageClient::new(store.clone());
Expand Down Expand Up @@ -800,7 +802,9 @@ async fn check_precedence(
create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
_ = store
.insert_pending_certificate(&certificate.certificate)
.await;
}

let storage_client = StorageClient::new(store.clone());
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc<ValidatorStore>
for cert in &certificates {
_ = validator_store
.insert_pending_certificate(&cert.certificate)
.await
.unwrap();
}

Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::constants::*;
use topos_test_sdk::storage::create_validator_store;

mod task;
mod task_manager;

const CHANNEL_SIZE: usize = 10;
Expand Down Expand Up @@ -189,6 +190,7 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams)
_ = ctx
.validator_store
.insert_pending_certificate(&dummy_cert)
.await
.unwrap();

assert!(matches!(
Expand Down Expand Up @@ -243,6 +245,7 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) {
_ = ctx
.validator_store
.insert_pending_certificate(&dummy_cert)
.await
.unwrap();

assert!(matches!(
Expand Down
84 changes: 84 additions & 0 deletions crates/topos-tce-broadcast/src/tests/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::{future::IntoFuture, sync::Arc, time::Duration};

use rstest::rstest;
use tokio::{
spawn,
sync::{broadcast, mpsc},
};
use topos_core::uci::Certificate;
use topos_crypto::{messages::MessageSigner, validator_id::ValidatorId};
use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::{
certificates::create_certificate_chain,
constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1},
crypto::message_signer,
storage::create_validator_store,
};

use crate::{
double_echo::broadcast_state::BroadcastState, event::ProtocolEvents,
sampler::SubscriptionsView, task_manager::task::Task,
};

#[rstest]
#[test_log::test(tokio::test)]
#[timeout(Duration::from_secs(1))]
async fn start_with_ungossiped_cert(
#[future(awt)]
#[from(create_validator_store)]
validatore_store: Arc<ValidatorStore>,
message_signer: Arc<MessageSigner>,
) {
let certificate = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 1)
.pop()
.unwrap()
.certificate;
let certificate_id = certificate.id;
let validator_id = ValidatorId::default();
let thresholds = topos_config::tce::broadcast::ReliableBroadcastParams {
echo_threshold: 1,
ready_threshold: 1,
delivery_threshold: 1,
};
let (event_sender, mut event_receiver) = mpsc::channel(2);
let (broadcast_sender, _) = broadcast::channel(1);
let need_gossip = true;
let subscriptions = SubscriptionsView::default();

let broadcast_state = BroadcastState::new(
certificate,
validator_id,
thresholds.echo_threshold,
thresholds.ready_threshold,
thresholds.delivery_threshold,
event_sender,
subscriptions,
need_gossip,
message_signer,
);

let (task, _ctx) = Task::new(
certificate_id,
broadcast_state,
validatore_store,
broadcast_sender,
);

let _handle = spawn(task.into_future());

let event = event_receiver.recv().await;
assert!(matches!(
event,
Some(ProtocolEvents::Broadcast {
certificate_id: id
}) if id == certificate_id
));

let event = event_receiver.recv().await;
assert!(matches!(
event,
Some(ProtocolEvents::Gossip {
cert: Certificate { id, .. }
}) if id == certificate_id
));
}
16 changes: 8 additions & 8 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{str::FromStr, sync::Arc};
use std::sync::Arc;

use rstest::rstest;
use tokio::{
Expand All @@ -12,15 +12,20 @@ use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::{
certificates::create_certificate_chain,
constants::{SOURCE_SUBNET_ID_1, TARGET_SUBNET_ID_1},
crypto::message_signer,
storage::create_validator_store,
};

use crate::{sampler::SubscriptionsView, task_manager::TaskManager};

#[rstest]
#[tokio::test]
async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
let validator_store = create_validator_store.await;
async fn can_start(
#[future(awt)]
#[from(create_validator_store)]
validator_store: Arc<ValidatorStore>,
message_signer: Arc<MessageSigner>,
) {
let (message_sender, message_receiver) = mpsc::channel(1);
let (event_sender, _) = mpsc::channel(1);
let (broadcast_sender, _) = broadcast::channel(1);
Expand All @@ -32,11 +37,6 @@ async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
delivery_threshold: 1,
};

let message_signer = Arc::new(
MessageSigner::from_str("122f3ae6ade1fd136b292cea4f6243c7811160352c8821528547a1fe7c459daf")
.unwrap(),
);

let manager = TaskManager::new(
message_receiver,
SubscriptionsView::default(),
Expand Down
Loading

0 comments on commit bd5e3f5

Please sign in to comment.