From ab20260a2fd27eed7a8223937901e2d723ddfb4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bogdan-=C8=98tefan=20Neac=C5=9Fu?= Date: Fri, 29 Nov 2024 14:56:39 +0200 Subject: [PATCH] Guard storage access with cache (#5193) * Guard storage access with cache * Do the sync way less freq * Change sync behaviour for bandwidth too * Use bigger delta --- Cargo.lock | 1 + .../src/bandwidth_storage_manager.rs | 2 +- .../src/client_bandwidth.rs | 4 +- common/wireguard/Cargo.toml | 1 + common/wireguard/src/lib.rs | 3 +- common/wireguard/src/peer_controller.rs | 24 ++- common/wireguard/src/peer_handle.rs | 30 ++-- common/wireguard/src/peer_storage_manager.rs | 138 ++++++++++++++++++ 8 files changed, 184 insertions(+), 19 deletions(-) create mode 100644 common/wireguard/src/peer_storage_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 3be4b4751b2..6557b2c99ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6749,6 +6749,7 @@ dependencies = [ "nym-task", "nym-wireguard-types", "thiserror", + "time", "tokio", "tokio-stream", "x25519-dalek", diff --git a/common/credential-verification/src/bandwidth_storage_manager.rs b/common/credential-verification/src/bandwidth_storage_manager.rs index 3e35fd9eb2b..7c3136e91ca 100644 --- a/common/credential-verification/src/bandwidth_storage_manager.rs +++ b/common/credential-verification/src/bandwidth_storage_manager.rs @@ -111,7 +111,7 @@ impl BandwidthStorageManager { } #[instrument(level = "trace", skip_all)] - async fn sync_storage_bandwidth(&mut self) -> Result<()> { + pub async fn sync_storage_bandwidth(&mut self) -> Result<()> { trace!("syncing client bandwidth with the underlying storage"); let updated = self .storage diff --git a/common/credential-verification/src/client_bandwidth.rs b/common/credential-verification/src/client_bandwidth.rs index 9b764714e88..d98f89b5117 100644 --- a/common/credential-verification/src/client_bandwidth.rs +++ b/common/credential-verification/src/client_bandwidth.rs @@ -8,8 +8,8 @@ use std::time::Duration; use time::OffsetDateTime; use tokio::sync::RwLock; -const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5); -const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB +const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_secs(5 * 60); // 5 minutes +const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 5 * 1024 * 1024; // 5MB #[derive(Debug, Clone, Copy)] pub struct BandwidthFlushingBehaviourConfig { diff --git a/common/wireguard/Cargo.toml b/common/wireguard/Cargo.toml index f20651acbd6..c594e6b8911 100644 --- a/common/wireguard/Cargo.toml +++ b/common/wireguard/Cargo.toml @@ -26,6 +26,7 @@ log.workspace = true thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] } tokio-stream = { workspace = true } +time = { workspace = true } nym-authenticator-requests = { path = "../authenticator-requests" } nym-credential-verification = { path = "../credential-verification" } diff --git a/common/wireguard/src/lib.rs b/common/wireguard/src/lib.rs index ecd15769264..938c44e6508 100644 --- a/common/wireguard/src/lib.rs +++ b/common/wireguard/src/lib.rs @@ -20,6 +20,7 @@ const WG_TUN_NAME: &str = "nymwg"; pub(crate) mod error; pub mod peer_controller; pub mod peer_handle; +pub mod peer_storage_manager; pub struct WgApiWrapper { inner: WGApi, @@ -118,7 +119,7 @@ pub async fn start_wireguard storage .insert_wireguard_peer(peer, bandwidth_manager.is_some()) .await?; - peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager); + peer_bandwidth_managers.insert(peer.public_key.clone(), (bandwidth_manager, peer.clone())); } wg_api.create_interface()?; let interface_config = InterfaceConfiguration { diff --git a/common/wireguard/src/peer_controller.rs b/common/wireguard/src/peer_controller.rs index 8c7d94784ae..321b15462f9 100644 --- a/common/wireguard/src/peer_controller.rs +++ b/common/wireguard/src/peer_controller.rs @@ -20,9 +20,9 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use crate::peer_handle::PeerHandle; use crate::WgApiWrapper; use crate::{error::Error, peer_handle::SharedBandwidthStorageManager}; +use crate::{peer_handle::PeerHandle, peer_storage_manager::PeerStorageManager}; pub enum PeerControlRequest { AddPeer { @@ -79,7 +79,7 @@ impl PeerController { storage: St, wg_api: Arc, initial_host_information: Host, - bw_storage_managers: HashMap>>, + bw_storage_managers: HashMap>, Peer)>, request_tx: mpsc::Sender, request_rx: mpsc::Receiver, task_client: nym_task::TaskClient, @@ -88,11 +88,16 @@ impl PeerController { tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK), ); let host_information = Arc::new(RwLock::new(initial_host_information)); - for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() { - let mut handle = PeerHandle::new( + for (public_key, (bandwidth_storage_manager, peer)) in bw_storage_managers.iter() { + let peer_storage_manager = PeerStorageManager::new( storage.clone(), + peer.clone(), + bandwidth_storage_manager.is_some(), + ); + let mut handle = PeerHandle::new( public_key.clone(), host_information.clone(), + peer_storage_manager, bandwidth_storage_manager.clone(), request_tx.clone(), &task_client, @@ -103,6 +108,10 @@ impl PeerController { } }); } + let bw_storage_managers = bw_storage_managers + .into_iter() + .map(|(k, (m, _))| (k, m)) + .collect(); PeerController { storage, @@ -184,10 +193,15 @@ impl PeerController { Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key) .await? .map(|bw_m| Arc::new(RwLock::new(bw_m))); - let mut handle = PeerHandle::new( + let peer_storage_manager = PeerStorageManager::new( self.storage.clone(), + peer.clone(), + bandwidth_storage_manager.is_some(), + ); + let mut handle = PeerHandle::new( peer.public_key.clone(), self.host_information.clone(), + peer_storage_manager, bandwidth_storage_manager.clone(), self.request_tx.clone(), &self.task_client, diff --git a/common/wireguard/src/peer_handle.rs b/common/wireguard/src/peer_handle.rs index 71fa06f8474..a6e5f9e5026 100644 --- a/common/wireguard/src/peer_handle.rs +++ b/common/wireguard/src/peer_handle.rs @@ -3,6 +3,7 @@ use crate::error::Error; use crate::peer_controller::PeerControlRequest; +use crate::peer_storage_manager::PeerStorageManager; use defguard_wireguard_rs::host::Peer; use defguard_wireguard_rs::{host::Host, key::Key}; use futures::channel::oneshot; @@ -21,9 +22,9 @@ pub(crate) type SharedBandwidthStorageManager = Arc { - storage: St, public_key: Key, host_information: Arc>, + peer_storage_manager: PeerStorageManager, bandwidth_storage_manager: Option>, request_tx: mpsc::Sender, timeout_check_interval: IntervalStream, @@ -33,9 +34,9 @@ pub struct PeerHandle { impl PeerHandle { pub fn new( - storage: St, public_key: Key, host_information: Arc>, + peer_storage_manager: PeerStorageManager, bandwidth_storage_manager: Option>, request_tx: mpsc::Sender, task_client: &TaskClient, @@ -46,9 +47,9 @@ impl PeerHandle { let mut task_client = task_client.fork(format!("peer-{public_key}")); task_client.disarm(); PeerHandle { - storage, public_key, host_information, + peer_storage_manager, bandwidth_storage_manager, request_tx, timeout_check_interval, @@ -84,16 +85,19 @@ impl PeerHandle { .ok_or(Error::InconsistentConsumedBytes)? .try_into() .map_err(|_| Error::InconsistentConsumedBytes)?; - if spent_bandwidth > 0 - && bandwidth_manager + if spent_bandwidth > 0 { + self.peer_storage_manager.update_trx(kernel_peer); + if bandwidth_manager .write() .await .try_use_bandwidth(spent_bandwidth) .await .is_err() - { - let success = self.remove_peer().await?; - return Ok(!success); + { + let success = self.remove_peer().await?; + self.peer_storage_manager.remove_peer(); + return Ok(!success); + } } } else { if SystemTime::now().duration_since(self.startup_timestamp)? >= AUTO_REMOVE_AFTER { @@ -132,7 +136,7 @@ impl PeerHandle { // the host information hasn't beed updated yet continue; }; - let Some(storage_peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else { + let Some(storage_peer) = self.peer_storage_manager.get_peer() else { log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key); return Ok(()); }; @@ -141,12 +145,18 @@ impl PeerHandle { return Ok(()); } else { // Update storage values - self.storage.insert_wireguard_peer(&kernel_peer, self.bandwidth_storage_manager.is_some()).await?; + self.peer_storage_manager.sync_storage_peer().await?; } } _ = self.task_client.recv() => { log::trace!("PeerHandle: Received shutdown"); + if let Some(bandwidth_manager) = &self.bandwidth_storage_manager { + if let Err(e) = bandwidth_manager.write().await.sync_storage_bandwidth().await { + log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed"); + } + } + log::trace!("PeerHandle: Finished shutdown"); } } } diff --git a/common/wireguard/src/peer_storage_manager.rs b/common/wireguard/src/peer_storage_manager.rs new file mode 100644 index 00000000000..f7992b8bb0a --- /dev/null +++ b/common/wireguard/src/peer_storage_manager.rs @@ -0,0 +1,138 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use crate::error::Error; +use defguard_wireguard_rs::host::Peer; +use nym_gateway_storage::models::WireguardPeer; +use nym_gateway_storage::Storage; +use std::time::Duration; +use time::OffsetDateTime; + +const DEFAULT_PEER_MAX_FLUSHING_RATE: Duration = Duration::from_secs(60 * 60 * 24); // 24h +const DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT: u64 = 512 * 1024 * 1024; // 512MB + +#[derive(Debug, Clone, Copy)] +pub struct PeerFlushingBehaviourConfig { + /// Defines maximum delay between peer information being flushed to the persistent storage. + pub peer_max_flushing_rate: Duration, + + /// Defines a maximum change in peer before it gets flushed to the persistent storage. + pub peer_max_delta_flushing_amount: u64, +} + +impl Default for PeerFlushingBehaviourConfig { + fn default() -> Self { + Self { + peer_max_flushing_rate: DEFAULT_PEER_MAX_FLUSHING_RATE, + peer_max_delta_flushing_amount: DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT, + } + } +} + +pub struct PeerStorageManager { + pub(crate) storage: S, + pub(crate) peer_information: Option, + pub(crate) cfg: PeerFlushingBehaviourConfig, + pub(crate) with_client_id: bool, +} + +impl PeerStorageManager { + pub(crate) fn new(storage: S, peer: Peer, with_client_id: bool) -> Self { + let peer_information = Some(PeerInformation::new(peer)); + Self { + storage, + peer_information, + cfg: PeerFlushingBehaviourConfig::default(), + with_client_id, + } + } + + pub(crate) fn get_peer(&self) -> Option { + self.peer_information + .as_ref() + .map(|p| p.peer.clone().into()) + } + + pub(crate) fn remove_peer(&mut self) { + self.peer_information = None; + } + + pub(crate) fn update_trx(&mut self, kernel_peer: &Peer) { + if let Some(peer_information) = self.peer_information.as_mut() { + peer_information.update_trx_bytes(kernel_peer.tx_bytes, kernel_peer.rx_bytes); + } + } + + pub(crate) async fn sync_storage_peer(&mut self) -> Result<(), Error> { + let Some(peer_information) = self.peer_information.as_mut() else { + return Ok(()); + }; + if !peer_information.should_sync(self.cfg) { + return Ok(()); + } + if self + .storage + .get_wireguard_peer(&peer_information.peer().public_key.to_string()) + .await? + .is_none() + { + self.peer_information = None; + return Ok(()); + } + self.storage + .insert_wireguard_peer(peer_information.peer(), self.with_client_id) + .await?; + + peer_information.resync_peer_with_storage(); + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct PeerInformation { + pub(crate) peer: Peer, + pub(crate) last_synced: OffsetDateTime, + + pub(crate) bytes_delta_since_sync: u64, +} + +impl PeerInformation { + pub fn new(peer: Peer) -> PeerInformation { + PeerInformation { + peer, + last_synced: OffsetDateTime::now_utc(), + bytes_delta_since_sync: 0, + } + } + + pub(crate) fn should_sync(&self, cfg: PeerFlushingBehaviourConfig) -> bool { + if self.bytes_delta_since_sync >= cfg.peer_max_delta_flushing_amount { + return true; + } + + if self.last_synced + cfg.peer_max_flushing_rate < OffsetDateTime::now_utc() + && self.bytes_delta_since_sync != 0 + { + return true; + } + + false + } + + pub(crate) fn peer(&self) -> &Peer { + &self.peer + } + + pub(crate) fn update_trx_bytes(&mut self, tx_bytes: u64, rx_bytes: u64) { + self.bytes_delta_since_sync += tx_bytes.saturating_sub(self.peer.tx_bytes) + + rx_bytes.saturating_sub(self.peer.rx_bytes); + self.peer.tx_bytes = tx_bytes; + self.peer.rx_bytes = rx_bytes; + } + + pub(crate) fn resync_peer_with_storage(&mut self) { + self.bytes_delta_since_sync = 0; + self.last_synced = OffsetDateTime::now_utc(); + } +}