Skip to content

Commit

Permalink
Guard storage access with cache
Browse files Browse the repository at this point in the history
  • Loading branch information
neacsu committed Nov 29, 2024
1 parent 889d464 commit 6155398
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/wireguard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ log.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "io-util"] }
tokio-stream = { workspace = true }
time = { workspace = true }

nym-authenticator-requests = { path = "../authenticator-requests" }
nym-credential-verification = { path = "../credential-verification" }
Expand Down
3 changes: 2 additions & 1 deletion common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
pub(crate) mod error;
pub mod peer_controller;
pub mod peer_handle;
pub mod peer_storage_manager;

pub struct WgApiWrapper {
inner: WGApi,
Expand Down Expand Up @@ -118,7 +119,7 @@ pub async fn start_wireguard<St: nym_gateway_storage::Storage + Clone + 'static>
storage
.insert_wireguard_peer(peer, bandwidth_manager.is_some())
.await?;
peer_bandwidth_managers.insert(peer.public_key.clone(), bandwidth_manager);
peer_bandwidth_managers.insert(peer.public_key.clone(), (bandwidth_manager, peer.clone()));
}
wg_api.create_interface()?;
let interface_config = InterfaceConfiguration {
Expand Down
24 changes: 19 additions & 5 deletions common/wireguard/src/peer_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::{wrappers::IntervalStream, StreamExt};

use crate::peer_handle::PeerHandle;
use crate::WgApiWrapper;
use crate::{error::Error, peer_handle::SharedBandwidthStorageManager};
use crate::{peer_handle::PeerHandle, peer_storage_manager::PeerStorageManager};

pub enum PeerControlRequest {
AddPeer {
Expand Down Expand Up @@ -79,7 +79,7 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
storage: St,
wg_api: Arc<WgApiWrapper>,
initial_host_information: Host,
bw_storage_managers: HashMap<Key, Option<SharedBandwidthStorageManager<St>>>,
bw_storage_managers: HashMap<Key, (Option<SharedBandwidthStorageManager<St>>, Peer)>,
request_tx: mpsc::Sender<PeerControlRequest>,
request_rx: mpsc::Receiver<PeerControlRequest>,
task_client: nym_task::TaskClient,
Expand All @@ -88,11 +88,16 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
tokio::time::interval(DEFAULT_PEER_TIMEOUT_CHECK),
);
let host_information = Arc::new(RwLock::new(initial_host_information));
for (public_key, bandwidth_storage_manager) in bw_storage_managers.iter() {
let mut handle = PeerHandle::new(
for (public_key, (bandwidth_storage_manager, peer)) in bw_storage_managers.iter() {
let peer_storage_manager = PeerStorageManager::new(
storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
let mut handle = PeerHandle::new(
public_key.clone(),
host_information.clone(),
peer_storage_manager,
bandwidth_storage_manager.clone(),
request_tx.clone(),
&task_client,
Expand All @@ -103,6 +108,10 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
}
});
}
let bw_storage_managers = bw_storage_managers
.into_iter()
.map(|(k, (m, _))| (k, m))
.collect();

PeerController {
storage,
Expand Down Expand Up @@ -184,10 +193,15 @@ impl<St: Storage + Clone + 'static> PeerController<St> {
Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key)
.await?
.map(|bw_m| Arc::new(RwLock::new(bw_m)));
let mut handle = PeerHandle::new(
let peer_storage_manager = PeerStorageManager::new(
self.storage.clone(),
peer.clone(),
bandwidth_storage_manager.is_some(),
);
let mut handle = PeerHandle::new(
peer.public_key.clone(),
self.host_information.clone(),
peer_storage_manager,
bandwidth_storage_manager.clone(),
self.request_tx.clone(),
&self.task_client,
Expand Down
24 changes: 14 additions & 10 deletions common/wireguard/src/peer_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::error::Error;
use crate::peer_controller::PeerControlRequest;
use crate::peer_storage_manager::PeerStorageManager;
use defguard_wireguard_rs::host::Peer;
use defguard_wireguard_rs::{host::Host, key::Key};
use futures::channel::oneshot;
Expand All @@ -21,9 +22,9 @@ pub(crate) type SharedBandwidthStorageManager<St> = Arc<RwLock<BandwidthStorageM
const AUTO_REMOVE_AFTER: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 30 days

pub struct PeerHandle<St> {
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager<St>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
timeout_check_interval: IntervalStream,
Expand All @@ -33,9 +34,9 @@ pub struct PeerHandle<St> {

impl<St: Storage + Clone + 'static> PeerHandle<St> {
pub fn new(
storage: St,
public_key: Key,
host_information: Arc<RwLock<Host>>,
peer_storage_manager: PeerStorageManager<St>,
bandwidth_storage_manager: Option<SharedBandwidthStorageManager<St>>,
request_tx: mpsc::Sender<PeerControlRequest>,
task_client: &TaskClient,
Expand All @@ -46,9 +47,9 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
let mut task_client = task_client.fork(format!("peer-{public_key}"));
task_client.disarm();
PeerHandle {
storage,
public_key,
host_information,
peer_storage_manager,
bandwidth_storage_manager,
request_tx,
timeout_check_interval,
Expand Down Expand Up @@ -84,16 +85,19 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
.ok_or(Error::InconsistentConsumedBytes)?
.try_into()
.map_err(|_| Error::InconsistentConsumedBytes)?;
if spent_bandwidth > 0
&& bandwidth_manager
if spent_bandwidth > 0 {
self.peer_storage_manager.update_trx(kernel_peer);
if bandwidth_manager
.write()
.await
.try_use_bandwidth(spent_bandwidth)
.await
.is_err()
{
let success = self.remove_peer().await?;
return Ok(!success);
{
let success = self.remove_peer().await?;
self.peer_storage_manager.remove_peer();
return Ok(!success);
}
}
} else {
if SystemTime::now().duration_since(self.startup_timestamp)? >= AUTO_REMOVE_AFTER {
Expand Down Expand Up @@ -132,7 +136,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
// the host information hasn't beed updated yet
continue;
};
let Some(storage_peer) = self.storage.get_wireguard_peer(&self.public_key.to_string()).await? else {
let Some(storage_peer) = self.peer_storage_manager.get_peer() else {
log::debug!("Peer {:?} not in storage anymore, shutting down handle", self.public_key);
return Ok(());
};
Expand All @@ -141,7 +145,7 @@ impl<St: Storage + Clone + 'static> PeerHandle<St> {
return Ok(());
} else {
// Update storage values
self.storage.insert_wireguard_peer(&kernel_peer, self.bandwidth_storage_manager.is_some()).await?;
self.peer_storage_manager.sync_storage_peer().await?;
}
}

Expand Down
138 changes: 138 additions & 0 deletions common/wireguard/src/peer_storage_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::error::Error;
use defguard_wireguard_rs::host::Peer;
use nym_gateway_storage::models::WireguardPeer;
use nym_gateway_storage::Storage;
use std::time::Duration;
use time::OffsetDateTime;

const DEFAULT_PEER_MAX_FLUSHING_RATE: Duration = Duration::from_secs(5);
const DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT: u64 = 512 * 1024; // 512kB

#[derive(Debug, Clone, Copy)]
pub struct PeerFlushingBehaviourConfig {
/// Defines maximum delay between peer information being flushed to the persistent storage.
pub peer_max_flushing_rate: Duration,

/// Defines a maximum change in peer before it gets flushed to the persistent storage.
pub peer_max_delta_flushing_amount: u64,
}

impl Default for PeerFlushingBehaviourConfig {
fn default() -> Self {
Self {
peer_max_flushing_rate: DEFAULT_PEER_MAX_FLUSHING_RATE,
peer_max_delta_flushing_amount: DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT,
}
}
}

pub struct PeerStorageManager<S> {
pub(crate) storage: S,
pub(crate) peer_information: Option<PeerInformation>,
pub(crate) cfg: PeerFlushingBehaviourConfig,
pub(crate) with_client_id: bool,
}

impl<S: Storage + Clone + 'static> PeerStorageManager<S> {
pub(crate) fn new(storage: S, peer: Peer, with_client_id: bool) -> Self {
let peer_information = Some(PeerInformation::new(peer));
Self {
storage,
peer_information,
cfg: PeerFlushingBehaviourConfig::default(),
with_client_id,
}
}

pub(crate) fn get_peer(&self) -> Option<WireguardPeer> {
self.peer_information
.as_ref()
.map(|p| p.peer.clone().into())
}

pub(crate) fn remove_peer(&mut self) {
self.peer_information = None;
}

pub(crate) fn update_trx(&mut self, kernel_peer: &Peer) {
if let Some(peer_information) = self.peer_information.as_mut() {
peer_information.update_trx_bytes(kernel_peer.tx_bytes, kernel_peer.rx_bytes);
}
}

pub(crate) async fn sync_storage_peer(&mut self) -> Result<(), Error> {
let Some(peer_information) = self.peer_information.as_mut() else {
return Ok(());
};
if !peer_information.should_sync(self.cfg) {
return Ok(());
}
if self
.storage
.get_wireguard_peer(&peer_information.peer().public_key.to_string())
.await?
.is_none()
{
self.peer_information = None;
return Ok(());
}
self.storage
.insert_wireguard_peer(peer_information.peer(), self.with_client_id)
.await?;

peer_information.resync_peer_with_storage();

Ok(())
}
}

#[derive(Clone, Debug)]
pub(crate) struct PeerInformation {
pub(crate) peer: Peer,
pub(crate) last_synced: OffsetDateTime,

pub(crate) bytes_delta_since_sync: u64,
}

impl PeerInformation {
pub fn new(peer: Peer) -> PeerInformation {
PeerInformation {
peer,
last_synced: OffsetDateTime::now_utc(),
bytes_delta_since_sync: 0,
}
}

pub(crate) fn should_sync(&self, cfg: PeerFlushingBehaviourConfig) -> bool {
if self.bytes_delta_since_sync >= cfg.peer_max_delta_flushing_amount {
return true;
}

if self.last_synced + cfg.peer_max_flushing_rate < OffsetDateTime::now_utc()
&& self.bytes_delta_since_sync != 0
{
return true;
}

false
}

pub(crate) fn peer(&self) -> &Peer {
&self.peer
}

pub(crate) fn update_trx_bytes(&mut self, tx_bytes: u64, rx_bytes: u64) {
self.bytes_delta_since_sync += tx_bytes.saturating_sub(self.peer.tx_bytes)
+ rx_bytes.saturating_sub(self.peer.rx_bytes);
self.peer.tx_bytes = tx_bytes;
self.peer.rx_bytes = rx_bytes;
}

pub(crate) fn resync_peer_with_storage(&mut self) {
self.bytes_delta_since_sync = 0;
self.last_synced = OffsetDateTime::now_utc();
}
}

0 comments on commit 6155398

Please sign in to comment.