diff --git a/common/client-libs/gateway-client/src/bandwidth.rs b/common/client-libs/gateway-client/src/bandwidth.rs index 25e9a44394d..9fd43765bdd 100644 --- a/common/client-libs/gateway-client/src/bandwidth.rs +++ b/common/client-libs/gateway-client/src/bandwidth.rs @@ -87,8 +87,10 @@ impl ClientBandwidth { if remaining < 0 { tracing::warn!("OUT OF BANDWIDTH. remaining: {remaining_bi2}"); - } else { + } else if remaining < 1_000_000 { tracing::info!("remaining bandwidth: {remaining_bi2}"); + } else { + tracing::debug!("remaining bandwidth: {remaining_bi2}"); } self.inner diff --git a/common/client-libs/gateway-client/src/client/mod.rs b/common/client-libs/gateway-client/src/client/mod.rs index 58e061887e9..cde63a391f8 100644 --- a/common/client-libs/gateway-client/src/client/mod.rs +++ b/common/client-libs/gateway-client/src/client/mod.rs @@ -133,6 +133,10 @@ impl GatewayClient { self.gateway_identity } + pub fn shared_key(&self) -> Option> { + self.shared_key.clone() + } + pub fn ws_fd(&self) -> Option { match &self.connection { SocketState::Available(conn) => ws_fd(conn.as_ref()), @@ -402,7 +406,7 @@ impl GatewayClient { } Some(_) => { - info!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!"); + debug!("the gateway is using exactly the same (or older) protocol version as we are. 
We're good to continue!"); Ok(()) } } @@ -976,24 +980,6 @@ impl GatewayClient { } Ok(()) } - - #[deprecated(note = "this method does not deal with upgraded keys for legacy clients")] - pub async fn authenticate_and_start( - &mut self, - ) -> Result - where - C: DkgQueryClient + Send + Sync, - St: CredentialStorage, - ::StorageError: Send + Sync + 'static, - { - let shared_key = self.perform_initial_authentication().await?; - self.claim_initial_bandwidth().await?; - - // this call is NON-blocking - self.start_listening_for_mixnet_messages()?; - - Ok(shared_key) - } } // type alias for an ease of use diff --git a/common/client-libs/gateway-client/src/socket_state.rs b/common/client-libs/gateway-client/src/socket_state.rs index 942f6506148..7bc0e8d1d05 100644 --- a/common/client-libs/gateway-client/src/socket_state.rs +++ b/common/client-libs/gateway-client/src/socket_state.rs @@ -110,6 +110,11 @@ impl PartiallyDelegatedRouter { } }; + if self.stream_return.is_canceled() { + // nothing to do, receiver has been dropped + return; + } + let return_res = match ret { Err(err) => self.stream_return.send(Err(err)), Ok(_) => { diff --git a/common/node-tester-utils/src/processor.rs b/common/node-tester-utils/src/processor.rs index 0975e7935db..0146c252586 100644 --- a/common/node-tester-utils/src/processor.rs +++ b/common/node-tester-utils/src/processor.rs @@ -35,7 +35,6 @@ pub struct TestPacketProcessor { ack_key: Arc, /// Structure responsible for decrypting and recovering plaintext message from received ciphertexts. 
- // message_receiver: Mutex, message_receiver: R, _ext_phantom: PhantomData, diff --git a/nym-api/src/network_monitor/mod.rs b/nym-api/src/network_monitor/mod.rs index 4cb05f296ab..557b9fdf412 100644 --- a/nym-api/src/network_monitor/mod.rs +++ b/nym-api/src/network_monitor/mod.rs @@ -16,7 +16,8 @@ use crate::node_status_api::NodeStatusCache; use crate::nym_contract_cache::cache::NymContractCache; use crate::storage::NymApiStorage; use crate::support::caching::cache::SharedCache; -use crate::support::{config, nyxd}; +use crate::support::config::Config; +use crate::support::nyxd; use futures::channel::mpsc; use nym_bandwidth_controller::BandwidthController; use nym_credential_storage::persistent_storage::PersistentStorage; @@ -36,7 +37,7 @@ pub(crate) mod test_route; pub(crate) const ROUTE_TESTING_TEST_NONCE: u64 = 0; pub(crate) fn setup<'a>( - config: &'a config::NetworkMonitor, + config: &'a Config, nym_contract_cache: &NymContractCache, described_cache: SharedCache, node_status_cache: NodeStatusCache, @@ -54,7 +55,7 @@ pub(crate) fn setup<'a>( } pub(crate) struct NetworkMonitorBuilder<'a> { - config: &'a config::NetworkMonitor, + config: &'a Config, nyxd_client: nyxd::Client, node_status_storage: NymApiStorage, contract_cache: NymContractCache, @@ -64,7 +65,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> { impl<'a> NetworkMonitorBuilder<'a> { pub(crate) fn new( - config: &'a config::NetworkMonitor, + config: &'a Config, nyxd_client: nyxd::Client, node_status_storage: NymApiStorage, contract_cache: NymContractCache, @@ -101,7 +102,7 @@ impl<'a> NetworkMonitorBuilder<'a> { self.contract_cache, self.described_cache, self.node_status_cache, - self.config.debug.per_node_test_packets, + self.config.network_monitor.debug.per_node_test_packets, Arc::clone(&ack_key), *identity_keypair.public_key(), *encryption_keypair.public_key(), @@ -110,7 +111,11 @@ impl<'a> NetworkMonitorBuilder<'a> { let bandwidth_controller = { BandwidthController::new( 
nym_credential_storage::initialise_persistent_storage( - &self.config.storage_paths.credentials_database_path, + &self + .config + .network_monitor + .storage_paths + .credentials_database_path, ) .await, self.nyxd_client.clone(), @@ -118,12 +123,10 @@ impl<'a> NetworkMonitorBuilder<'a> { }; let packet_sender = new_packet_sender( - self.config, + &self.config, gateway_status_update_sender, Arc::clone(&identity_keypair), - self.config.debug.gateway_sending_rate, bandwidth_controller, - self.config.debug.disabled_credentials_mode, ); let received_processor = new_received_processor( @@ -131,14 +134,15 @@ impl<'a> NetworkMonitorBuilder<'a> { Arc::clone(&encryption_keypair), ack_key, ); - let summary_producer = new_summary_producer(self.config.debug.per_node_test_packets); + let summary_producer = + new_summary_producer(self.config.network_monitor.debug.per_node_test_packets); let packet_receiver = new_packet_receiver( gateway_status_update_receiver, received_processor_sender_channel, ); let monitor = Monitor::new( - self.config, + &self.config.network_monitor, packet_preparer, packet_sender, received_processor, @@ -194,22 +198,16 @@ fn new_packet_preparer( } fn new_packet_sender( - config: &config::NetworkMonitor, + config: &Config, gateways_status_updater: GatewayClientUpdateSender, local_identity: Arc, - max_sending_rate: usize, bandwidth_controller: BandwidthController, - disabled_credentials_mode: bool, ) -> PacketSender { PacketSender::new( + config, gateways_status_updater, local_identity, - config.debug.gateway_response_timeout, - config.debug.gateway_connection_timeout, - config.debug.max_concurrent_gateway_clients, - max_sending_rate, bandwidth_controller, - disabled_credentials_mode, ) } @@ -237,7 +235,7 @@ fn new_packet_receiver( // TODO: 1) does it still have to have separate builder or could we get rid of it now? // TODO: 2) how do we make it non-async as other 'start' methods? 
pub(crate) async fn start( - config: &config::NetworkMonitor, + config: &Config, nym_contract_cache: &NymContractCache, described_cache: SharedCache, node_status_cache: NodeStatusCache, diff --git a/nym-api/src/network_monitor/monitor/gateway_client_handle.rs b/nym-api/src/network_monitor/monitor/gateway_client_handle.rs new file mode 100644 index 00000000000..6e3f157efb9 --- /dev/null +++ b/nym-api/src/network_monitor/monitor/gateway_client_handle.rs @@ -0,0 +1,54 @@ +// Copyright 2021 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; +use crate::support::nyxd; +use nym_credential_storage::persistent_storage::PersistentStorage; +use nym_gateway_client::GatewayClient; +use std::ops::{Deref, DerefMut}; +use tracing::warn; + +pub(crate) struct GatewayClientHandle { + client: GatewayClient, + gateways_status_updater: GatewayClientUpdateSender, +} + +impl GatewayClientHandle { + pub(crate) fn new( + client: GatewayClient, + gateways_status_updater: GatewayClientUpdateSender, + ) -> Self { + GatewayClientHandle { + client, + gateways_status_updater, + } + } +} + +impl Drop for GatewayClientHandle { + fn drop(&mut self) { + if self + .gateways_status_updater + .unbounded_send(GatewayClientUpdate::Disconnect( + self.client.gateway_identity(), + )) + .is_err() + { + warn!("fail to cleanly shutdown gateway connection") + } + } +} + +impl Deref for GatewayClientHandle { + type Target = GatewayClient; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +impl DerefMut for GatewayClientHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.client + } +} diff --git a/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs b/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs deleted file mode 100644 index 4a321d57cd7..00000000000 --- a/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs +++ /dev/null @@ -1,102 +0,0 @@ 
-// Copyright 2021 - Nym Technologies SA -// SPDX-License-Identifier: GPL-3.0-only - -use crate::support::nyxd; -use nym_credential_storage::persistent_storage::PersistentStorage; -use nym_crypto::asymmetric::identity::PUBLIC_KEY_LENGTH; -use nym_gateway_client::GatewayClient; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard, TryLockError}; - -pub(crate) struct GatewayClientHandle(Arc); - -struct GatewayClientHandleInner { - client: Mutex>>, - raw_identity: [u8; PUBLIC_KEY_LENGTH], -} - -pub(crate) struct UnlockedGatewayClientHandle<'a>( - MutexGuard<'a, Option>>, -); - -impl GatewayClientHandle { - pub(crate) fn new(gateway_client: GatewayClient) -> Self { - GatewayClientHandle(Arc::new(GatewayClientHandleInner { - raw_identity: gateway_client.gateway_identity().to_bytes(), - client: Mutex::new(Some(gateway_client)), - })) - } - - pub(crate) fn ptr_eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } - - // this could have also been achieved with a normal #[derive(Clone)] but I prefer to be explicit about it, - // because clippy would suggest some potentially confusing 'simplifications' regarding clone - pub(crate) fn clone_data_pointer(&self) -> Self { - GatewayClientHandle(Arc::clone(&self.0)) - } - - pub(crate) fn raw_identity(&self) -> [u8; PUBLIC_KEY_LENGTH] { - self.0.raw_identity - } - - pub(crate) async fn is_invalid(&self) -> bool { - self.0.client.lock().await.is_none() - } - - pub(crate) async fn lock_client(&self) -> UnlockedGatewayClientHandle<'_> { - UnlockedGatewayClientHandle(self.0.client.lock().await) - } - - pub(crate) fn lock_client_unchecked(&self) -> UnlockedGatewayClientHandle<'_> { - UnlockedGatewayClientHandle(self.0.client.try_lock().unwrap()) - } - - pub(crate) fn try_lock_client(&self) -> Result, TryLockError> { - self.0.client.try_lock().map(UnlockedGatewayClientHandle) - } -} - -impl<'a> UnlockedGatewayClientHandle<'a> { - pub(crate) fn get_mut_unchecked( - &mut self, - ) -> 
&mut GatewayClient { - self.0.as_mut().unwrap() - } - - pub(crate) fn inner_mut( - &mut self, - ) -> Option<&mut GatewayClient> { - self.0.as_mut() - } - - pub(crate) fn invalidate(&mut self) { - *self.0 = None - } -} - -pub(crate) type GatewayClientsMap = HashMap<[u8; PUBLIC_KEY_LENGTH], GatewayClientHandle>; - -#[derive(Clone)] -pub(crate) struct ActiveGatewayClients { - // there is no point in using an RwLock here as there will only ever be two readers here and both - // potentially need write access. - // A BiLock would have been slightly better than a normal Mutex since it's optimised for two - // owners, but it's behind `unstable` feature flag in futures and it would be a headache if the API - // changed. - inner: Arc>, -} - -impl ActiveGatewayClients { - pub(crate) fn new() -> Self { - ActiveGatewayClients { - inner: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub(crate) async fn lock(&self) -> MutexGuard<'_, GatewayClientsMap> { - self.inner.lock().await - } -} diff --git a/nym-api/src/network_monitor/monitor/gateways_pinger.rs b/nym-api/src/network_monitor/monitor/gateways_pinger.rs deleted file mode 100644 index ed09c2d0906..00000000000 --- a/nym-api/src/network_monitor/monitor/gateways_pinger.rs +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2021 - Nym Technologies SA -// SPDX-License-Identifier: GPL-3.0-only - -use crate::network_monitor::monitor::gateway_clients_cache::ActiveGatewayClients; -use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; -use nym_crypto::asymmetric::identity; -use nym_crypto::asymmetric::identity::PUBLIC_KEY_LENGTH; -use nym_task::TaskClient; -use std::time::Duration; -use tokio::time::{sleep, Instant}; -use tracing::{debug, info, trace, warn}; - -// TODO: should it perhaps be moved to config along other timeout values? 
-const PING_TIMEOUT: Duration = Duration::from_secs(3); - -pub(crate) struct GatewayPinger { - gateway_clients: ActiveGatewayClients, - gateways_status_updater: GatewayClientUpdateSender, - pinging_interval: Duration, -} - -impl GatewayPinger { - pub(crate) fn new( - gateway_clients: ActiveGatewayClients, - gateways_status_updater: GatewayClientUpdateSender, - pinging_interval: Duration, - ) -> Self { - GatewayPinger { - gateway_clients, - gateways_status_updater, - pinging_interval, - } - } - - fn notify_connection_failure(&self, raw_gateway_id: [u8; PUBLIC_KEY_LENGTH]) { - // if this unwrap failed it means something extremely weird is going on - // and we got some solar flare bitflip type of corruption - let gateway_key = identity::PublicKey::from_bytes(&raw_gateway_id) - .expect("failed to recover gateways public key from valid bytes"); - - // remove the gateway listener channels - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::Failure(gateway_key)) - .expect("packet receiver seems to have died!"); - } - - async fn ping_and_cleanup_all_gateways(&self) { - info!("Pinging all active gateways"); - - let lock_acquire_start = Instant::now(); - let active_gateway_clients_guard = self.gateway_clients.lock().await; - trace!( - "Acquiring lock took {:?}", - Instant::now().duration_since(lock_acquire_start) - ); - - if active_gateway_clients_guard.is_empty() { - debug!("no gateways to ping"); - return; - } - - // don't keep the guard the entire time - clone all Arcs and drop it - // - // this clippy warning is a false positive as we cannot get rid of the collect by moving - // everything into a single iterator as it would require us to hold the lock the entire time - // and that is exactly what we want to avoid - #[allow(clippy::needless_collect)] - let active_gateway_clients = active_gateway_clients_guard - .iter() - .map(|(_, handle)| handle.clone_data_pointer()) - .collect::>(); - drop(active_gateway_clients_guard); - - let ping_start = 
Instant::now(); - - let mut clients_to_purge = Vec::new(); - - // since we don't need to wait for response, we can just ping all gateways sequentially - // if it becomes problem later on, we can adjust it. - for client_handle in active_gateway_clients.into_iter() { - trace!( - "Pinging: {}", - identity::PublicKey::from_bytes(&client_handle.raw_identity()) - .unwrap() - .to_base58_string() - ); - // if we fail to obtain the lock it means the client is being currently used to send messages - // and hence we don't need to ping it to keep connection alive - if let Ok(mut unlocked_handle) = client_handle.try_lock_client() { - if let Some(active_client) = unlocked_handle.inner_mut() { - match tokio::time::timeout(PING_TIMEOUT, active_client.send_ping_message()) - .await - { - Err(_timeout) => { - warn!( - "we timed out trying to ping {} - assuming the connection is dead.", - active_client.gateway_identity().to_base58_string(), - ); - clients_to_purge.push(client_handle.raw_identity()); - } - Ok(Err(err)) => { - warn!( - "failed to send ping message to gateway {} - {} - assuming the connection is dead.", - active_client.gateway_identity().to_base58_string(), - err, - ); - clients_to_purge.push(client_handle.raw_identity()); - } - _ => {} - } - } else { - clients_to_purge.push(client_handle.raw_identity()); - } - } - } - - info!( - "Purging {} gateways, acquiring lock", - clients_to_purge.len() - ); - // purge all dead connections - // reacquire the guard - let lock_acquire_start = Instant::now(); - let mut active_gateway_clients_guard = self.gateway_clients.lock().await; - info!( - "Acquiring lock took {:?}", - Instant::now().duration_since(lock_acquire_start) - ); - - for gateway_id in clients_to_purge.into_iter() { - if let Some(removed_handle) = active_gateway_clients_guard.remove(&gateway_id) { - if !removed_handle.is_invalid().await { - info!("Handle is invalid, purging"); - // it was not invalidated by the packet sender meaning it probably was some unbonded node - 
// that was never cleared - self.notify_connection_failure(gateway_id); - } - info!("Handle is not invalid, not purged") - } - } - - let ping_end = Instant::now(); - let time_taken = ping_end.duration_since(ping_start); - info!("Pinging all active gateways took {:?}", time_taken); - } - - pub(crate) async fn run(&self, mut shutdown: TaskClient) { - while !shutdown.is_shutdown() { - tokio::select! { - _ = sleep(self.pinging_interval) => { - tokio::select! { - biased; - _ = shutdown.recv() => { - trace!("GatewaysPinger: Received shutdown"); - } - _ = self.ping_and_cleanup_all_gateways() => (), - } - } - _ = shutdown.recv() => { - trace!("GatewaysPinger: Received shutdown"); - } - } - } - } -} diff --git a/nym-api/src/network_monitor/monitor/mod.rs b/nym-api/src/network_monitor/monitor/mod.rs index 23bb48d7c6d..21f6f30c19f 100644 --- a/nym-api/src/network_monitor/monitor/mod.rs +++ b/nym-api/src/network_monitor/monitor/mod.rs @@ -9,33 +9,21 @@ use crate::network_monitor::test_packet::NodeTestMessage; use crate::network_monitor::test_route::TestRoute; use crate::storage::NymApiStorage; use crate::support::config; -use dashmap::DashMap; -use nym_crypto::asymmetric::ed25519; -use nym_gateway_client::SharedGatewayKey; use nym_mixnet_contract_common::NodeId; use nym_sphinx::params::PacketType; use nym_sphinx::receiver::MessageReceiver; use nym_task::TaskClient; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use tokio::time::{sleep, Duration, Instant}; use tracing::{debug, error, info, trace}; -pub(crate) mod gateway_clients_cache; -pub(crate) mod gateways_pinger; +pub(crate) mod gateway_client_handle; pub(crate) mod preparer; pub(crate) mod processor; pub(crate) mod receiver; pub(crate) mod sender; pub(crate) mod summary_producer; -// if we already used particular gateway before, keep its shared keys so that we would not need -// to go through the whole handshake again -// #[derive(Clone)] -// pub(crate) struct SharedKeysCache { -// keys: Arc>>, -// } - 
pub(super) struct Monitor { test_nonce: u64, packet_preparer: PacketPreparer, @@ -44,7 +32,6 @@ pub(super) struct Monitor { summary_producer: SummaryProducer, node_status_storage: NymApiStorage, run_interval: Duration, - gateway_ping_interval: Duration, packet_delivery_timeout: Duration, /// Number of test packets sent via each "random" route to verify whether they work correctly. @@ -78,7 +65,6 @@ impl Monitor { summary_producer, node_status_storage, run_interval: config.debug.run_interval, - gateway_ping_interval: config.debug.gateway_ping_interval, packet_delivery_timeout: config.debug.packet_delivery_timeout, route_test_packets: config.debug.route_test_packets, test_routes: config.debug.test_routes, @@ -147,11 +133,14 @@ impl Monitor { } self.received_processor.set_route_test_nonce(); - self.packet_sender.send_packets(packets).await; + let gateway_clients = self.packet_sender.send_packets(packets).await; // give the packets some time to traverse the network sleep(self.packet_delivery_timeout).await; + // start all the disconnections in the background + drop(gateway_clients); + let received = self.received_processor.return_received().await; let mut results = self.analyse_received_test_route_packets(&received); @@ -261,7 +250,8 @@ impl Monitor { self.received_processor.set_new_test_nonce(self.test_nonce); info!("Sending packets to all gateways..."); - self.packet_sender + let gateway_clients = self + .packet_sender .send_packets(prepared_packets.packets) .await; @@ -273,6 +263,9 @@ impl Monitor { // give the packets some time to traverse the network sleep(self.packet_delivery_timeout).await; + // start all the disconnections in the background + drop(gateway_clients); + let received = self.received_processor.return_received().await; let total_received = received.len(); info!("Test routes: {:#?}", routes); @@ -320,9 +313,6 @@ impl Monitor { .wait_for_validator_cache_initial_values(self.minimum_test_routes) .await; - self.packet_sender - 
.spawn_gateways_pinger(self.gateway_ping_interval, shutdown.clone()); - let mut run_interval = tokio::time::interval(self.run_interval); while !shutdown.is_shutdown() { tokio::select! { diff --git a/nym-api/src/network_monitor/monitor/receiver.rs b/nym-api/src/network_monitor/monitor/receiver.rs index 6924e4e8dae..91a8c678110 100644 --- a/nym-api/src/network_monitor/monitor/receiver.rs +++ b/nym-api/src/network_monitor/monitor/receiver.rs @@ -14,7 +14,7 @@ pub(crate) type GatewayClientUpdateSender = mpsc::UnboundedSender; pub(crate) enum GatewayClientUpdate { - Failure(identity::PublicKey), + Disconnect(identity::PublicKey), New( identity::PublicKey, (MixnetMessageReceiver, AcknowledgementReceiver), @@ -45,7 +45,7 @@ impl PacketReceiver { self.gateways_reader .add_receivers(id, message_receiver, ack_receiver); } - GatewayClientUpdate::Failure(id) => { + GatewayClientUpdate::Disconnect(id) => { self.gateways_reader.remove_receivers(id); } } diff --git a/nym-api/src/network_monitor/monitor/sender.rs b/nym-api/src/network_monitor/monitor/sender.rs index 0b371f10031..36e6ec7cb60 100644 --- a/nym-api/src/network_monitor/monitor/sender.rs +++ b/nym-api/src/network_monitor/monitor/sender.rs @@ -1,35 +1,34 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use crate::network_monitor::monitor::gateway_clients_cache::{ - ActiveGatewayClients, GatewayClientHandle, -}; -use crate::network_monitor::monitor::gateways_pinger::GatewayPinger; +use crate::network_monitor::monitor::gateway_client_handle::GatewayClientHandle; use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; +use crate::support::config::Config; use crate::support::nyxd; +use dashmap::DashMap; use futures::channel::mpsc; use futures::stream::{self, FuturesUnordered, StreamExt}; use futures::task::Context; use futures::{Future, Stream}; use nym_bandwidth_controller::BandwidthController; use 
nym_credential_storage::persistent_storage::PersistentStorage; -use nym_crypto::asymmetric::identity::{self, PUBLIC_KEY_LENGTH}; +use nym_crypto::asymmetric::ed25519; use nym_gateway_client::client::config::GatewayClientConfig; use nym_gateway_client::client::GatewayConfig; use nym_gateway_client::error::GatewayClientError; use nym_gateway_client::{ - AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver, PacketRouter, + AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver, PacketRouter, SharedGatewayKey, }; use nym_sphinx::forwarding::packet::MixPacket; -use nym_task::TaskClient; use pin_project::pin_project; use std::mem; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use tracing::{debug, info, trace, warn}; +use tokio::time::timeout; +use tracing::{debug, error, info, trace, warn}; const TIME_CHUNK_SIZE: Duration = Duration::from_millis(50); @@ -39,7 +38,7 @@ pub(crate) struct GatewayPackets { pub(crate) clients_address: String, /// Public key of the target gateway. - pub(crate) pub_key: identity::PublicKey, + pub(crate) pub_key: ed25519::PublicKey, /// All the packets that are going to get sent to the gateway. 
pub(crate) packets: Vec, @@ -48,7 +47,7 @@ pub(crate) struct GatewayPackets { impl GatewayPackets { pub(crate) fn new( clients_address: String, - pub_key: identity::PublicKey, + pub_key: ed25519::PublicKey, packets: Vec, ) -> Self { GatewayPackets { @@ -66,7 +65,7 @@ impl GatewayPackets { } } - pub(crate) fn empty(clients_address: String, pub_key: identity::PublicKey) -> Self { + pub(crate) fn empty(clients_address: String, pub_key: ed25519::PublicKey) -> Self { GatewayPackets { clients_address, pub_key, @@ -89,96 +88,63 @@ impl GatewayPackets { // struct consisting of all external data required to construct a fresh gateway client struct FreshGatewayClientData { gateways_status_updater: GatewayClientUpdateSender, - local_identity: Arc, + local_identity: Arc, gateway_response_timeout: Duration, bandwidth_controller: BandwidthController, disabled_credentials_mode: bool, + gateways_key_cache: DashMap>, } impl FreshGatewayClientData { - fn notify_connection_failure( - self: Arc, - raw_gateway_id: [u8; PUBLIC_KEY_LENGTH], - ) { - // if this unwrap failed it means something extremely weird is going on - // and we got some solar flare bitflip type of corruption - let gateway_key = identity::PublicKey::from_bytes(&raw_gateway_id) - .expect("failed to recover gateways public key from valid bytes"); - - // remove the gateway listener channels - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::Failure(gateway_key)) - .expect("packet receiver seems to have died!"); - } - fn notify_new_connection( self: Arc, - gateway_id: identity::PublicKey, - gateway_channels: Option<(MixnetMessageReceiver, AcknowledgementReceiver)>, + gateway_id: ed25519::PublicKey, + gateway_channels: (MixnetMessageReceiver, AcknowledgementReceiver), ) { - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::New( - gateway_id, - gateway_channels.expect("we created a new client, yet the channels are a None!"), - )) - .expect("packet receiver seems to have died!") + if self + 
.gateways_status_updater + .unbounded_send(GatewayClientUpdate::New(gateway_id, gateway_channels)) + .is_err() + { + error!("packet receiver seems to have died!") + } } } pub(crate) struct PacketSender { - // TODO: this has a potential long-term issue. If we keep those clients cached between runs, - // malicious gateways could figure out which traffic comes from the network monitor and always - // forward that traffic while dropping the rest. However, at the current stage such sophisticated - // behaviour is unlikely. - active_gateway_clients: ActiveGatewayClients, - fresh_gateway_client_data: Arc, gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, max_concurrent_clients: usize, max_sending_rate: usize, } impl PacketSender { - // at this point I'm not entirely sure how to deal with this warning without - // some considerable refactoring - #[allow(clippy::too_many_arguments)] pub(crate) fn new( + config: &Config, gateways_status_updater: GatewayClientUpdateSender, - local_identity: Arc, - gateway_response_timeout: Duration, - gateway_connection_timeout: Duration, - max_concurrent_clients: usize, - max_sending_rate: usize, + local_identity: Arc, bandwidth_controller: BandwidthController, - disabled_credentials_mode: bool, ) -> Self { PacketSender { - active_gateway_clients: ActiveGatewayClients::new(), fresh_gateway_client_data: Arc::new(FreshGatewayClientData { gateways_status_updater, local_identity, - gateway_response_timeout, + gateway_response_timeout: config.network_monitor.debug.gateway_response_timeout, bandwidth_controller, - disabled_credentials_mode, + disabled_credentials_mode: config.network_monitor.debug.disabled_credentials_mode, + gateways_key_cache: Default::default(), }), - gateway_connection_timeout, - max_concurrent_clients, - max_sending_rate, + gateway_connection_timeout: config.network_monitor.debug.gateway_connection_timeout, + gateway_bandwidth_claim_timeout: config + .network_monitor + .debug + 
.gateway_bandwidth_claim_timeout, + max_concurrent_clients: config.network_monitor.debug.max_concurrent_gateway_clients, + max_sending_rate: config.network_monitor.debug.gateway_sending_rate, } } - pub(crate) fn spawn_gateways_pinger(&self, pinging_interval: Duration, shutdown: TaskClient) { - let gateway_pinger = GatewayPinger::new( - self.active_gateway_clients.clone(), - self.fresh_gateway_client_data - .gateways_status_updater - .clone(), - pinging_interval, - ); - - tokio::spawn(async move { gateway_pinger.run(shutdown).await }); - } - fn new_gateway_client_handle( config: GatewayConfig, fresh_gateway_client_data: &FreshGatewayClientData, @@ -190,8 +156,6 @@ impl PacketSender { let task_client = nym_task::TaskClient::dummy().named(format!("gateway-{}", config.gateway_identity)); - // TODO: future optimization: if we're remaking client for a gateway to which we used to be connected in the past, - // use old shared keys let (message_sender, message_receiver) = mpsc::unbounded(); // currently we do not care about acks at all, but we must keep the channel alive @@ -204,20 +168,28 @@ impl PacketSender { task_client.fork("packet-router"), ); + let shared_keys = fresh_gateway_client_data + .gateways_key_cache + .get(&config.gateway_identity) + .map(|k| k.value().clone()); + let gateway_client = GatewayClient::new( GatewayClientConfig::new_default() .with_disabled_credentials_mode(fresh_gateway_client_data.disabled_credentials_mode) .with_response_timeout(fresh_gateway_client_data.gateway_response_timeout), config, Arc::clone(&fresh_gateway_client_data.local_identity), - None, + shared_keys, gateway_packet_router, Some(fresh_gateway_client_data.bandwidth_controller.clone()), task_client, ); ( - GatewayClientHandle::new(gateway_client), + GatewayClientHandle::new( + gateway_client, + fresh_gateway_client_data.gateways_status_updater.clone(), + ), (message_receiver, ack_receiver), ) } @@ -227,11 +199,11 @@ impl PacketSender { mut mix_packets: Vec, max_sending_rate: 
usize, ) -> Result<(), GatewayClientError> { - let gateway_id = client.gateway_identity().to_base58_string(); + let gateway_id = client.gateway_identity(); + info!( - "Got {} packets to send to gateway {}", + "Got {} packets to send to gateway {gateway_id}", mix_packets.len(), - gateway_id ); if mix_packets.len() <= max_sending_rate { @@ -281,47 +253,79 @@ impl PacketSender { Ok(()) } + async fn client_startup( + connection_timeout: Duration, + bandwidth_claim_timeout: Duration, + client: &mut GatewayClientHandle, + ) -> Option> { + let gateway_identity = client.gateway_identity(); + + // 1. attempt to authenticate + let shared_key = + match timeout(connection_timeout, client.perform_initial_authentication()).await { + Err(_timeout) => { + warn!("timed out while trying to authenticate with gateway {gateway_identity}"); + return None; + } + Ok(Err(err)) => { + warn!("failed to authenticate with gateway ({gateway_identity}): {err}"); + return None; + } + Ok(Ok(res)) => res.initial_shared_key, + }; + + // 2. maybe claim bandwidth + match timeout(bandwidth_claim_timeout, client.claim_initial_bandwidth()).await { + Err(_timeout) => { + warn!("timed out while trying to claim initial bandwidth with gateway {gateway_identity}"); + return None; + } + Ok(Err(err)) => { + warn!("failed to claim bandwidth with gateway ({gateway_identity}): {err}"); + return None; + } + Ok(Ok(_)) => (), + } + + // 3. 
start internal listener + if let Err(err) = client.start_listening_for_mixnet_messages() { + warn!("failed to start message listener for {gateway_identity}: {err}"); + return None; + } + + Some(shared_key) + } + async fn create_new_gateway_client_handle_and_authenticate( config: GatewayConfig, fresh_gateway_client_data: &FreshGatewayClientData, gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, ) -> Option<( GatewayClientHandle, (MixnetMessageReceiver, AcknowledgementReceiver), )> { let gateway_identity = config.gateway_identity; - let (new_client, (message_receiver, ack_receiver)) = + let (mut new_client, (message_receiver, ack_receiver)) = Self::new_gateway_client_handle(config, fresh_gateway_client_data); - // Put this in timeout in case the gateway has incorrectly set their ulimit and our connection - // gets stuck in their TCP queue and just hangs on our end but does not terminate - // (an actual bug we experienced) - // - // Note: locking the client in unchecked manner is fine here as we just created the lock - // and it wasn't shared with anyone, therefore we're the only one holding reference to it - // and hence it's impossible to fail to obtain the permit. 
- let mut unlocked_client = new_client.lock_client_unchecked(); - - // SAFETY: it's fine to use the deprecated method here as we're creating brand new clients each time, - // and there's no need to deal with any key upgrades - #[allow(deprecated)] - match tokio::time::timeout( + match Self::client_startup( gateway_connection_timeout, - unlocked_client.get_mut_unchecked().authenticate_and_start(), + gateway_bandwidth_claim_timeout, + &mut new_client, ) .await { - Ok(Ok(_)) => { - drop(unlocked_client); + Some(shared_key) => { + fresh_gateway_client_data + .gateways_key_cache + .insert(gateway_identity, shared_key); Some((new_client, (message_receiver, ack_receiver))) } - Ok(Err(err)) => { - warn!("failed to authenticate with new gateway ({gateway_identity}): {err}",); - // we failed to create a client, can't do much here - None - } - Err(_) => { - warn!("timed out while trying to authenticate with new gateway {gateway_identity}",); + None => { + fresh_gateway_client_data + .gateways_key_cache + .remove(&gateway_identity); None } } @@ -344,123 +348,63 @@ impl PacketSender { // than just concurrently? async fn send_gateway_packets( gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, packets: GatewayPackets, fresh_gateway_client_data: Arc, - client: Option, max_sending_rate: usize, ) -> Option { - let existing_client = client.is_some(); - - // Note that in the worst case scenario we will only wait for a second or two to obtain the lock - // as other possibly entity holding the lock (the gateway pinger) is attempting to send - // the ping messages with a maximum timeout. 
- let (client, gateway_channels) = if let Some(client) = client { - if client.is_invalid().await { - warn!("Our existing client was invalid - two test runs happened back to back without cleanup"); - return None; - } - (client, None) - } else { - let (client, gateway_channels) = - Self::create_new_gateway_client_handle_and_authenticate( - packets.gateway_config(), - &fresh_gateway_client_data, - gateway_connection_timeout, - ) - .await?; - (client, Some(gateway_channels)) - }; + let (mut client, gateway_channels) = + Self::create_new_gateway_client_handle_and_authenticate( + packets.gateway_config(), + &fresh_gateway_client_data, + gateway_connection_timeout, + gateway_bandwidth_claim_timeout, + ) + .await?; + + let identity = client.gateway_identity(); let estimated_time = Duration::from_secs_f64(packets.packets.len() as f64 / max_sending_rate as f64); // give some leeway let timeout = estimated_time * 3; - let mut guard = client.lock_client().await; - let unwrapped_client = guard.get_mut_unchecked(); - - if let Err(err) = Self::check_remaining_bandwidth(unwrapped_client).await { - warn!( - "Failed to claim additional bandwidth for {} - {err}", - unwrapped_client.gateway_identity().to_base58_string(), - ); - if existing_client { - guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + if let Err(err) = Self::check_remaining_bandwidth(&mut client).await { + warn!("Failed to claim additional bandwidth for {identity}: {err}",); return None; } match tokio::time::timeout( timeout, - Self::attempt_to_send_packets(unwrapped_client, packets.packets, max_sending_rate), + Self::attempt_to_send_packets(&mut client, packets.packets, max_sending_rate), ) .await { Err(_timeout) => { - warn!( - "failed to send packets to {} - we timed out", - packets.pub_key.to_base58_string(), - ); - // if this was a fresh client, there's no need to do anything as it was never - // registered to get read - if existing_client { - 
guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + warn!("failed to send packets to {identity} - we timed out",); return None; } Ok(Err(err)) => { - warn!( - "failed to send packets to {} - {:?}", - packets.pub_key.to_base58_string(), - err - ); - // if this was a fresh client, there's no need to do anything as it was never - // registered to get read - if existing_client { - guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + warn!("failed to send packets to {identity}: {err}",); return None; } Ok(Ok(_)) => { - if !existing_client { - fresh_gateway_client_data - .notify_new_connection(packets.pub_key, gateway_channels); - } + fresh_gateway_client_data.notify_new_connection(identity, gateway_channels) } } - drop(guard); Some(client) } - // point of this is to basically insert handles of fresh clients that didn't exist here before - async fn merge_client_handles(&self, handles: Vec) { - let mut guard = self.active_gateway_clients.lock().await; - for handle in handles { - let raw_identity = handle.raw_identity(); - if let Some(existing) = guard.get(&raw_identity) { - if !handle.ptr_eq(existing) { - panic!("Duplicate client detected!") - } - - if handle.is_invalid().await { - guard.remove(&raw_identity); - } - } else { - // client never existed -> just insert it - guard.insert(raw_identity, handle); - } - } - } - - pub(super) async fn send_packets(&mut self, packets: Vec) { + pub(super) async fn send_packets( + &mut self, + packets: Vec, + ) -> Vec { // we know that each of the elements in the packets array will only ever access a single, // unique element from the existing clients let gateway_connection_timeout = self.gateway_connection_timeout; + let gateway_bandwidth_claim_timeout = self.gateway_bandwidth_claim_timeout; let max_concurrent_clients = if self.max_concurrent_clients > 0 { Some(self.max_concurrent_clients) } else { @@ -468,41 +412,22 @@ 
impl PacketSender { }; let max_sending_rate = self.max_sending_rate; - let guard = self.active_gateway_clients.lock().await; - // this clippy warning is a false positive as we cannot get rid of the collect by moving - // everything into a single iterator as it would require us to hold the lock the entire time - // and that is exactly what we want to avoid - #[allow(clippy::needless_collect)] let stream_data = packets .into_iter() - .map(|packets| { - let existing_client = guard - .get(&packets.pub_key.to_bytes()) - .map(|client| client.clone_data_pointer()); - ( - packets, - Arc::clone(&self.fresh_gateway_client_data), - existing_client, - ) - }) + .map(|packets| (packets, Arc::clone(&self.fresh_gateway_client_data))) .collect::>(); - // drop the guard immediately so that the other task (gateway pinger) would not need to wait until - // we're done sending packets (note: without this drop, we wouldn't be able to ping gateways that - // we're not interacting with right now) - drop(guard); - // can't chain it all nicely together as there's no adapter method defined on Stream directly // for ForEachConcurrentClientUse let used_clients = ForEachConcurrentClientUse::new( stream::iter(stream_data.into_iter()), max_concurrent_clients, - |(packets, fresh_data, client)| async move { + |(packets, fresh_data)| async move { Self::send_gateway_packets( gateway_connection_timeout, + gateway_bandwidth_claim_timeout, packets, fresh_data, - client, max_sending_rate, ) .await @@ -513,7 +438,8 @@ impl PacketSender { .flatten() .collect(); - self.merge_client_handles(used_clients).await; + // we need to keep clients alive until the test finishes so that we could keep receiving + used_clients } } diff --git a/nym-api/src/support/cli/run.rs b/nym-api/src/support/cli/run.rs index 417710013de..10aec32f5f8 100644 --- a/nym-api/src/support/cli/run.rs +++ b/nym-api/src/support/cli/run.rs @@ -257,7 +257,7 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result( - 
&config.network_monitor, + &config, &nym_contract_cache_state, described_nodes_cache.clone(), node_status_cache_state.clone(), diff --git a/nym-api/src/support/config/mod.rs b/nym-api/src/support/config/mod.rs index edb56934eec..8e891f05079 100644 --- a/nym-api/src/support/config/mod.rs +++ b/nym-api/src/support/config/mod.rs @@ -38,13 +38,12 @@ const DEFAULT_GATEWAY_SENDING_RATE: usize = 200; const DEFAULT_MAX_CONCURRENT_GATEWAY_CLIENTS: usize = 50; const DEFAULT_PACKET_DELIVERY_TIMEOUT: Duration = Duration::from_secs(20); const DEFAULT_MONITOR_RUN_INTERVAL: Duration = Duration::from_secs(15 * 60); -const DEFAULT_GATEWAY_PING_INTERVAL: Duration = Duration::from_secs(60); // Set this to a high value for now, so that we don't risk sporadic timeouts that might cause // bought bandwidth tokens to not have time to be spent; Once we remove the gateway from the // bandwidth bridging protocol, we can come back to a smaller timeout value const DEFAULT_GATEWAY_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5 * 60); -// This timeout value should be big enough to accommodate an initial bandwidth acquirement -const DEFAULT_GATEWAY_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2 * 60); +const DEFAULT_GATEWAY_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); +const DEFAULT_GATEWAY_BANDWIDTH_CLAIM_TIMEOUT: Duration = Duration::from_secs(2 * 60); const DEFAULT_TEST_ROUTES: usize = 3; const DEFAULT_MINIMUM_TEST_ROUTES: usize = 1; @@ -320,11 +319,6 @@ pub struct NetworkMonitorDebug { #[serde(with = "humantime_serde")] pub run_interval: Duration, - /// Specifies interval at which we should be sending ping packets to all active gateways - /// in order to keep the websocket connections alive. 
- #[serde(with = "humantime_serde")] - pub gateway_ping_interval: Duration, - /// Specifies maximum rate (in packets per second) of test packets being sent to gateway pub gateway_sending_rate: usize, @@ -340,6 +334,10 @@ pub struct NetworkMonitorDebug { #[serde(with = "humantime_serde")] pub gateway_connection_timeout: Duration, + /// Maximum allowed time for the gateway bandwidth claim to get resolved + #[serde(with = "humantime_serde")] + pub gateway_bandwidth_claim_timeout: Duration, + /// Specifies the duration the monitor is going to wait after sending all measurement /// packets before declaring nodes unreachable. #[serde(with = "humantime_serde")] @@ -367,11 +365,11 @@ impl Default for NetworkMonitorDebug { min_gateway_reliability: DEFAULT_MIN_GATEWAY_RELIABILITY, disabled_credentials_mode: true, run_interval: DEFAULT_MONITOR_RUN_INTERVAL, - gateway_ping_interval: DEFAULT_GATEWAY_PING_INTERVAL, gateway_sending_rate: DEFAULT_GATEWAY_SENDING_RATE, max_concurrent_gateway_clients: DEFAULT_MAX_CONCURRENT_GATEWAY_CLIENTS, gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT, gateway_connection_timeout: DEFAULT_GATEWAY_CONNECTION_TIMEOUT, + gateway_bandwidth_claim_timeout: DEFAULT_GATEWAY_BANDWIDTH_CLAIM_TIMEOUT, packet_delivery_timeout: DEFAULT_PACKET_DELIVERY_TIMEOUT, test_routes: DEFAULT_TEST_ROUTES, minimum_test_routes: DEFAULT_MINIMUM_TEST_ROUTES,