Skip to content

Commit

Permalink
dont keep persistent gateway connections. instead make them on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
jstuczyn committed Dec 4, 2024
1 parent 0f0bea7 commit e90a01f
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 544 deletions.
4 changes: 3 additions & 1 deletion common/client-libs/gateway-client/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ impl ClientBandwidth {

if remaining < 0 {
tracing::warn!("OUT OF BANDWIDTH. remaining: {remaining_bi2}");
} else {
} else if remaining < 1_000_000 {
tracing::info!("remaining bandwidth: {remaining_bi2}");
} else {
tracing::debug!("remaining bandwidth: {remaining_bi2}");
}

self.inner
Expand Down
24 changes: 5 additions & 19 deletions common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity
}

pub fn shared_key(&self) -> Option<Arc<SharedGatewayKey>> {
self.shared_key.clone()
}

pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => ws_fd(conn.as_ref()),
Expand Down Expand Up @@ -402,7 +406,7 @@ impl<C, St> GatewayClient<C, St> {
}

Some(_) => {
info!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
debug!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
Ok(())
}
}
Expand Down Expand Up @@ -976,24 +980,6 @@ impl<C, St> GatewayClient<C, St> {
}
Ok(())
}

#[deprecated(note = "this method does not deal with upgraded keys for legacy clients")]
pub async fn authenticate_and_start(
&mut self,
) -> Result<AuthenticationResponse, GatewayClientError>
where
C: DkgQueryClient + Send + Sync,
St: CredentialStorage,
<St as CredentialStorage>::StorageError: Send + Sync + 'static,
{
let shared_key = self.perform_initial_authentication().await?;
self.claim_initial_bandwidth().await?;

// this call is NON-blocking
self.start_listening_for_mixnet_messages()?;

Ok(shared_key)
}
}

// type alias for an ease of use
Expand Down
5 changes: 5 additions & 0 deletions common/client-libs/gateway-client/src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ impl PartiallyDelegatedRouter {
}
};

if self.stream_return.is_canceled() {
// nothing to do, receiver has been dropped
return;
}

let return_res = match ret {
Err(err) => self.stream_return.send(Err(err)),
Ok(_) => {
Expand Down
1 change: 0 additions & 1 deletion common/node-tester-utils/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct TestPacketProcessor<T, R: MessageReceiver = SphinxMessageReceiver> {
ack_key: Arc<AckKey>,

/// Structure responsible for decrypting and recovering plaintext message from received ciphertexts.
// message_receiver: Mutex<R>,
message_receiver: R,

_ext_phantom: PhantomData<T>,
Expand Down
38 changes: 18 additions & 20 deletions nym-api/src/network_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::storage::NymApiStorage;
use crate::support::caching::cache::SharedCache;
use crate::support::{config, nyxd};
use crate::support::config::Config;
use crate::support::nyxd;
use futures::channel::mpsc;
use nym_bandwidth_controller::BandwidthController;
use nym_credential_storage::persistent_storage::PersistentStorage;
Expand All @@ -36,7 +37,7 @@ pub(crate) mod test_route;
pub(crate) const ROUTE_TESTING_TEST_NONCE: u64 = 0;

pub(crate) fn setup<'a>(
config: &'a config::NetworkMonitor,
config: &'a Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand All @@ -54,7 +55,7 @@ pub(crate) fn setup<'a>(
}

pub(crate) struct NetworkMonitorBuilder<'a> {
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand All @@ -64,7 +65,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> {

impl<'a> NetworkMonitorBuilder<'a> {
pub(crate) fn new(
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand Down Expand Up @@ -101,7 +102,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
self.contract_cache,
self.described_cache,
self.node_status_cache,
self.config.debug.per_node_test_packets,
self.config.network_monitor.debug.per_node_test_packets,
Arc::clone(&ack_key),
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
Expand All @@ -110,35 +111,38 @@ impl<'a> NetworkMonitorBuilder<'a> {
let bandwidth_controller = {
BandwidthController::new(
nym_credential_storage::initialise_persistent_storage(
&self.config.storage_paths.credentials_database_path,
&self
.config
.network_monitor
.storage_paths
.credentials_database_path,
)
.await,
self.nyxd_client.clone(),
)
};

let packet_sender = new_packet_sender(
self.config,
&self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
self.config.debug.gateway_sending_rate,
bandwidth_controller,
self.config.debug.disabled_credentials_mode,
);

let received_processor = new_received_processor(
received_processor_receiver_channel,
Arc::clone(&encryption_keypair),
ack_key,
);
let summary_producer = new_summary_producer(self.config.debug.per_node_test_packets);
let summary_producer =
new_summary_producer(self.config.network_monitor.debug.per_node_test_packets);
let packet_receiver = new_packet_receiver(
gateway_status_update_receiver,
received_processor_sender_channel,
);

let monitor = Monitor::new(
self.config,
&self.config.network_monitor,
packet_preparer,
packet_sender,
received_processor,
Expand Down Expand Up @@ -194,22 +198,16 @@ fn new_packet_preparer(
}

fn new_packet_sender(
config: &config::NetworkMonitor,
config: &Config,
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
) -> PacketSender {
PacketSender::new(
config,
gateways_status_updater,
local_identity,
config.debug.gateway_response_timeout,
config.debug.gateway_connection_timeout,
config.debug.max_concurrent_gateway_clients,
max_sending_rate,
bandwidth_controller,
disabled_credentials_mode,
)
}

Expand Down Expand Up @@ -237,7 +235,7 @@ fn new_packet_receiver(
// TODO: 1) does it still have to have separate builder or could we get rid of it now?
// TODO: 2) how do we make it non-async as other 'start' methods?
pub(crate) async fn start<R: MessageReceiver + Send + Sync + 'static>(
config: &config::NetworkMonitor,
config: &Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand Down
54 changes: 54 additions & 0 deletions nym-api/src/network_monitor/monitor/gateway_client_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender};
use crate::support::nyxd;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_gateway_client::GatewayClient;
use std::ops::{Deref, DerefMut};
use tracing::warn;

pub(crate) struct GatewayClientHandle {
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
}

impl GatewayClientHandle {
pub(crate) fn new(
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
) -> Self {
GatewayClientHandle {
client,
gateways_status_updater,
}
}
}

impl Drop for GatewayClientHandle {
fn drop(&mut self) {
if self
.gateways_status_updater
.unbounded_send(GatewayClientUpdate::Disconnect(
self.client.gateway_identity(),
))
.is_err()
{
warn!("fail to cleanly shutdown gateway connection")
}
}
}

impl Deref for GatewayClientHandle {
type Target = GatewayClient<nyxd::Client, PersistentStorage>;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl DerefMut for GatewayClientHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
102 changes: 0 additions & 102 deletions nym-api/src/network_monitor/monitor/gateway_clients_cache.rs

This file was deleted.

Loading

0 comments on commit e90a01f

Please sign in to comment.