Skip to content

Commit

Permalink
removed overly complex logic for requesting mutex permits for packet …
Browse files Browse the repository at this point in the history
…processing
  • Loading branch information
jstuczyn committed Dec 4, 2024
1 parent d0f7815 commit 0f0bea7
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 153 deletions.
9 changes: 9 additions & 0 deletions common/crypto/src/asymmetric/identity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ed25519_dalek::{Signer, SigningKey};
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
Expand Down Expand Up @@ -122,6 +123,14 @@ impl PemStorableKeyPair for KeyPair {
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct PublicKey(ed25519_dalek::VerifyingKey);

impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
// each public key has unique bytes representation which can be used
// for the hash implementation
self.to_bytes().hash(state)
}
}

impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.to_base58_string(), f)
Expand Down
1 change: 1 addition & 0 deletions common/node-tester-utils/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct TestPacketProcessor<T, R: MessageReceiver = SphinxMessageReceiver> {
ack_key: Arc<AckKey>,

/// Structure responsible for decrypting and recovering plaintext message from received ciphertexts.
// message_receiver: Mutex<R>,
message_receiver: R,

_ext_phantom: PhantomData<T>,
Expand Down
20 changes: 9 additions & 11 deletions nym-api/src/network_monitor/gateways_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only

use futures::Stream;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{ed25519, identity};
use nym_gateway_client::{AcknowledgementReceiver, MixnetMessageReceiver};
use nym_mixnet_contract_common::IdentityKey;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::StreamMap;
Expand All @@ -15,8 +14,8 @@ pub(crate) enum GatewayMessages {
}

pub(crate) struct GatewaysReader {
ack_map: StreamMap<IdentityKey, AcknowledgementReceiver>,
stream_map: StreamMap<IdentityKey, MixnetMessageReceiver>,
ack_map: StreamMap<ed25519::PublicKey, AcknowledgementReceiver>,
stream_map: StreamMap<ed25519::PublicKey, MixnetMessageReceiver>,
}

impl GatewaysReader {
Expand All @@ -33,19 +32,18 @@ impl GatewaysReader {
message_receiver: MixnetMessageReceiver,
ack_receiver: AcknowledgementReceiver,
) {
let channel_id = id.to_string();
self.stream_map.insert(channel_id.clone(), message_receiver);
self.ack_map.insert(channel_id, ack_receiver);
self.stream_map.insert(id, message_receiver);
self.ack_map.insert(id, ack_receiver);
}

pub fn remove_receivers(&mut self, id: &str) {
self.stream_map.remove(id);
self.ack_map.remove(id);
pub fn remove_receivers(&mut self, id: ed25519::PublicKey) {
self.stream_map.remove(&id);
self.ack_map.remove(&id);
}
}

impl Stream for GatewaysReader {
type Item = (IdentityKey, GatewayMessages);
type Item = (ed25519::PublicKey, GatewayMessages);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.ack_map).poll_next(cx) {
Expand Down
8 changes: 4 additions & 4 deletions nym-api/src/network_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) async fn build<R: MessageReceiver + Send + 'static>(
pub(crate) async fn build<R: MessageReceiver + Send + Sync + 'static>(
self,
) -> NetworkMonitorRunnables<R> {
// TODO: those keys change constant throughout the whole execution of the monitor.
Expand Down Expand Up @@ -154,12 +154,12 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + 'static> {
pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + Sync + 'static> {
monitor: Monitor<R>,
packet_receiver: PacketReceiver,
}

impl<R: MessageReceiver + Send + 'static> NetworkMonitorRunnables<R> {
impl<R: MessageReceiver + Send + Sync + 'static> NetworkMonitorRunnables<R> {
// TODO: note, that is not exactly doing what we want, because when
// `ReceivedProcessor` is constructed, it already spawns a future
// this needs to be refactored!
Expand Down Expand Up @@ -236,7 +236,7 @@ fn new_packet_receiver(

// TODO: 1) does it still have to have separate builder or could we get rid of it now?
// TODO: 2) how do we make it non-async as other 'start' methods?
pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
pub(crate) async fn start<R: MessageReceiver + Send + Sync + 'static>(
config: &config::NetworkMonitor,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
Expand Down
21 changes: 15 additions & 6 deletions nym-api/src/network_monitor/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ use crate::network_monitor::test_packet::NodeTestMessage;
use crate::network_monitor::test_route::TestRoute;
use crate::storage::NymApiStorage;
use crate::support::config;
use dashmap::DashMap;
use nym_crypto::asymmetric::ed25519;
use nym_gateway_client::SharedGatewayKey;
use nym_mixnet_contract_common::NodeId;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::MessageReceiver;
use nym_task::TaskClient;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, error, info, trace};

Expand All @@ -25,7 +29,14 @@ pub(crate) mod receiver;
pub(crate) mod sender;
pub(crate) mod summary_producer;

pub(super) struct Monitor<R: MessageReceiver + Send + 'static> {
// if we already used particular gateway before, keep its shared keys so that we would not need
// to go through the whole handshake again
// #[derive(Clone)]
// pub(crate) struct SharedKeysCache {
// keys: Arc<DashMap<ed25519::PublicKey, Arc<SharedGatewayKey>>>,
// }

pub(super) struct Monitor<R: MessageReceiver + Send + Sync + 'static> {
test_nonce: u64,
packet_preparer: PacketPreparer,
packet_sender: PacketSender,
Expand All @@ -49,7 +60,7 @@ pub(super) struct Monitor<R: MessageReceiver + Send + 'static> {
packet_type: PacketType,
}

impl<R: MessageReceiver + Send> Monitor<R> {
impl<R: MessageReceiver + Send + Sync> Monitor<R> {
pub(super) fn new(
config: &config::NetworkMonitor,
packet_preparer: PacketPreparer,
Expand Down Expand Up @@ -135,7 +146,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
packets.push(gateway_packets);
}

self.received_processor.set_route_test_nonce().await;
self.received_processor.set_route_test_nonce();
self.packet_sender.send_packets(packets).await;

// give the packets some time to traverse the network
Expand Down Expand Up @@ -247,9 +258,7 @@ impl<R: MessageReceiver + Send> Monitor<R> {
.flat_map(|packets| packets.packets.iter())
.count();

self.received_processor
.set_new_test_nonce(self.test_nonce)
.await;
self.received_processor.set_new_test_nonce(self.test_nonce);

info!("Sending packets to all gateways...");
self.packet_sender
Expand Down
Loading

0 comments on commit 0f0bea7

Please sign in to comment.