Add network-activity trigger ability for batcher
Add triggering ability to the batcher so it can
evaluate deadlines and thresholds on demand.

The approach is simple: any activity on sent
or received data on any peer triggers the batcher.

This is built on the assumption that triggering on incoming
data would sync the batchers between two devices. However,
triggering only between two devices would leave other devices
unsynced, so the simplified approach works even better
to "sync the clocks" across all the nodes.

Side effects:
consider three interconnected peers: A, B and C.
Peer C is idling while A streams data to B.
Now A and B are each triggered on every packet and in turn
send premature keepalives to C at T_new = T_orig - threshold.
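
For example, with T_orig = 100s and threshold = 50s, a trigger
at t = 60s fires a keepalive early, since its deadline
(t = 100s) already falls within the threshold window
(100 <= 60 + 50); a steady stream of triggers pulls every
emission back to roughly T_orig - threshold.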

Triggering will happen mostly on wg_consolidate(), which runs
every second.

Signed-off-by: Lukas Pukenis <[email protected]>
LukasPukenis committed Oct 10, 2024
1 parent e584bfe commit a8a34d8
Showing 7 changed files with 290 additions and 85 deletions.
Empty file.
178 changes: 153 additions & 25 deletions crates/telio-batcher/src/batcher.rs
@@ -3,15 +3,28 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::{collections::HashMap, sync::Arc};

use telio_utils::{telio_log_debug, telio_log_warn};
use tokio;
use tokio::time::sleep_until;

type Action<V, R = std::result::Result<(), ()>> =
    Arc<dyn for<'a> Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>;

/// Guards against triggers that happened a long time ago
const TRIGGER_EFFECTIVE_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(5);

/// Batcher holds (actions, interval, threshold). When polled, the batcher
/// returns a list of actions that should be executed now.
pub struct Batcher<K, V> {
    actions: HashMap<K, (BatchEntry, Action<V>)>,

    /// A newly added action must be returned immediately from `get_actions`
    /// if polled right away. In case we're already polling, we need to notify
    /// the tokio::select! about such an event.
    notify_add: tokio::sync::Notify,

    /// Triggering the batcher must be handled inside of tokio::select!
    notify_trigger_timestamp: Option<tokio::time::Instant>,
    notify_trigger: tokio::sync::Notify,
}

struct BatchEntry {
@@ -37,56 +50,77 @@ where
        Self {
            actions: HashMap::new(),
            notify_add: tokio::sync::Notify::new(),
            notify_trigger: tokio::sync::Notify::new(),
            notify_trigger_timestamp: None,
        }
    }

    /// Batcher works in a polling manner, meaning the call site must invoke the actions.
    /// When polled, the batcher will await until the nearest deadline, a new action
    /// addition, or a trigger. Once resolved, it will try to batch all the actions
    /// that are within their respective thresholds.
    pub async fn get_actions(&mut self) -> Vec<(K, Action<V>)> {
        let mut batched_actions: Vec<(K, Action<V>)> = vec![];

        loop {
            if !self.actions.is_empty() {
                let actions = &mut self.actions;

                let active_trigger = self
                    .notify_trigger_timestamp
                    .take()
                    .map_or(false, |ts| ts.elapsed() < TRIGGER_EFFECTIVE_DURATION);

                if !active_trigger {
                    if let Some(closest_entry) =
                        actions.values().min_by_key(|entry| entry.0.deadline)
                    {
                        tokio::select! {
                            _ = self.notify_add.notified() => {
                                telio_log_debug!("New item added");
                            }
                            _ = sleep_until(closest_entry.0.deadline) => {
                                telio_log_debug!("Action deadline reached");
                            }
                            _ = self.notify_trigger.notified() => {
                                telio_log_debug!("Trigger received");
                            }
                        }
                    }
                }

                // At this point we're at the earliest spot for batching; an action
                // fires early if its deadline is within `threshold` from now.
                let now = tokio::time::Instant::now();
                for (key, action) in actions.iter_mut() {
                    let adjusted_deadline = now + action.0.threshold;

                    if action.0.deadline <= adjusted_deadline {
                        action.0.deadline = now + action.0.interval;
                        batched_actions.push((key.clone(), action.1.clone()));
                    }
                }

                return batched_actions;
            } else {
                _ = self.notify_add.notified().await;
            }
        }
    }

    /// Remove batcher action. Action is no longer eligible for batching.
    pub fn remove(&mut self, key: &K) {
        telio_log_debug!("Removing item from batcher with key({:?})", key);
        self.actions.remove(key);
    }

    /// Due to the async nature of the batcher it will await until an action becomes available.
    /// This function allows for premature evaluation of actions.
    /// Calling this function in a tight loop will result in actions
    /// being returned at T-threshold time.
    pub fn trigger(&mut self) {
        telio_log_debug!("Triggering batcher");
        self.notify_trigger_timestamp = Some(tokio::time::Instant::now());
        self.notify_trigger.notify_waiters();
    }

    /// Add batcher action. Batcher itself doesn't run the tasks and depends
    /// on actions being manually invoked. Adding an action immediately triggers it,
    /// thus if the call site awaits the future it will resolve immediately after this
@@ -99,11 +133,25 @@
        action: Action<V>,
    ) {
        telio_log_debug!(
            "Adding item to batcher with key({:?}), interval({:?}), threshold({:?})",
            key,
            interval,
            threshold,
        );

        let threshold = {
            if threshold >= interval {
                let capped_threshold = interval / 2;
                telio_log_warn!(
                    "Threshold should not be bigger than the interval. Overriding to ({:?})",
                    capped_threshold
                );
                capped_threshold
            } else {
                threshold
            }
        };

        let entry = BatchEntry {
            deadline: tokio::time::Instant::now(),
            interval,
@@ -133,6 +181,86 @@ mod tests {

    use crate::batcher::Batcher;

    #[tokio::test(start_paused = true)]
    async fn batch_and_trigger() {
        let start_time = tokio::time::Instant::now();
        let mut batcher = Batcher::<String, TestChecker>::new();

        batcher.add(
            "key0".to_owned(),
            Duration::from_secs(100),
            Duration::from_secs(50),
            Arc::new(|s: _| {
                Box::pin(async move {
                    s.values
                        .push(("key0".to_owned(), tokio::time::Instant::now()));
                    Ok(())
                })
            }),
        );

        let mut test_checker = TestChecker { values: Vec::new() };

        // pick up the immediate fire
        for ac in batcher.get_actions().await {
            ac.1(&mut test_checker).await.unwrap();
        }
        assert!(test_checker.values.len() == 1);

        let create_time_checkpoint =
            |add: u64| tokio::time::Instant::now() + tokio::time::Duration::from_secs(add);

        let mut trigger_timepoints = vec![
            create_time_checkpoint(10),
            create_time_checkpoint(20),
            create_time_checkpoint(60),
            create_time_checkpoint(90),
            create_time_checkpoint(200),
            create_time_checkpoint(270),
            create_time_checkpoint(280),
            create_time_checkpoint(730),
            create_time_checkpoint(1000),
        ];

        use tokio::time::sleep_until;
        loop {
            tokio::select! {
                _ = sleep_until(trigger_timepoints[0]) => {
                    batcher.trigger();
                    trigger_timepoints.remove(0);
                    if trigger_timepoints.is_empty() {
                        break
                    }
                }

                actions = batcher.get_actions() => {
                    for ac in &actions {
                        ac.1(&mut test_checker).await.unwrap();
                    }
                }
            }
        }

        let key0_entries: Vec<tokio::time::Duration> = test_checker
            .values
            .iter()
            .filter(|e| e.0 == "key0")
            .map(|e| e.1.duration_since(start_time))
            .collect();

        let expected_diff_values: Vec<Duration> =
            vec![0, 60, 160, 260, 360, 460, 560, 660, 730, 830, 930]
                .iter()
                .map(|v| tokio::time::Duration::from_secs(*v))
                .collect();
        assert!(
            key0_entries == expected_diff_values,
            "expected: {:?}, got: {:?}",
            expected_diff_values,
            key0_entries
        );
    }
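
    // Note on the expected values above: with interval = 100s and
    // threshold = 50s, a trigger fires the action early only when its next
    // deadline is at most 50s away. The triggers at t=10s and t=20s are too
    // early to matter; t=60s pulls the second emission forward from t=100s
    // (100 <= 60 + 50), after which deadlines continue at 160s, 260s, ...;
    // t=730s pulls the t=760s deadline forward, giving
    // [0, 60, 160, 260, 360, 460, 560, 660, 730, 830, 930].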

    #[tokio::test(start_paused = true)]
    async fn batch_one() {
        let start_time = tokio::time::Instant::now();
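
A minimal usage sketch of the batcher API added above; the names
Keepalives, send_keepalive and drive, the key "peer-a" and the 25s/5s
values are illustrative, not taken from the codebase:

use std::sync::Arc;
use tokio::time::Duration;

struct Keepalives; // stand-in for the caller's mutable state (V)

// Hypothetical action body; the real crate pings peers instead.
async fn send_keepalive(_state: &mut Keepalives) -> Result<(), ()> {
    Ok(())
}

async fn drive(mut batcher: Batcher<String, Keepalives>) {
    let mut state = Keepalives;

    // interval = 25s, threshold = 5s: the action may fire up to 5s early
    // whenever a deadline, a new action, or a trigger wakes the batcher.
    batcher.add(
        "peer-a".to_owned(),
        Duration::from_secs(25),
        Duration::from_secs(5),
        Arc::new(|s: _| Box::pin(async move { send_keepalive(s).await })),
    );

    loop {
        // Resolves on the nearest deadline, a newly added action, or an
        // external trigger() call.
        for (_key, action) in batcher.get_actions().await {
            let _ = action(&mut state).await;
        }
    }
}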
44 changes: 39 additions & 5 deletions crates/telio-traversal/src/session_keeper.rs
@@ -15,7 +15,7 @@ use telio_task::{task_exec, BoxAction, Runtime, Task};
use telio_utils::{
    dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions,
};
use telio_wg::NetworkActivityGetter;
const PING_PAYLOAD_SIZE: usize = 56;

/// Possible [SessionKeeper] errors.
Expand Down Expand Up @@ -62,7 +62,10 @@ pub struct SessionKeeper {
}

impl SessionKeeper {
    pub fn start(
        sock_pool: Arc<SocketPool>,
        network_activity_getter: Option<Arc<dyn NetworkActivityGetter>>,
    ) -> Result<Self> {
        let (client_v4, client_v6) = (
            PingerClient::new(&Self::make_builder(ICMP::V4).build())
                .map_err(|e| Error::PingerCreationError(ICMP::V4, e))?,
@@ -81,6 +84,9 @@ impl SessionKeeper {
                },
                batched_actions: Batcher::new(),
                nonbatched_actions: RepeatedActions::default(),
                network_activity_getter,
                last_tx_ts: None,
                last_rx_ts: None,
            }),
        })
    }
@@ -128,14 +134,19 @@ async fn ping(pingers: &Pingers, targets: (&PublicKey, &DualTarget)) -> Result<(
    let (primary, secondary) = targets.1.get_targets()?;
    let public_key = targets.0;

    let primary_client = match primary {
        IpAddr::V4(_) => &pingers.pinger_client_v4,
        IpAddr::V6(_) => &pingers.pinger_client_v6,
    };

    let ping_id = PingIdentifier(rand::random());

    telio_log_debug!(
        "Pinging primary target {:?} on {:?} with ping_id: {:?}",
        public_key,
        primary,
        ping_id
    );
    if let Err(e) = primary_client
        .pinger(primary, ping_id)
        .await
@@ -264,7 +275,11 @@ struct State {
    pingers: Pingers,
    batched_actions: Batcher<PublicKey, Self>,
    nonbatched_actions: RepeatedActions<PublicKey, Self, Result<()>>,
    network_activity_getter: Option<Arc<dyn NetworkActivityGetter>>,
    last_tx_ts: Option<tokio::time::Instant>,
    last_rx_ts: Option<tokio::time::Instant>,
}

#[async_trait]
impl Runtime for State {
    const NAME: &'static str = "SessionKeeper";
@@ -274,6 +289,25 @@
    where
        F: Future<Output = BoxAction<Self, std::result::Result<(), Self::Err>>> + Send,
    {
        let mut tx_has_changed = false;
        let mut rx_has_changed = false;

        // We just care about any network activity, thus no per-peer filtering.
        if let Some(wg) = self.network_activity_getter.as_ref() {
            if let Ok(Some(timestamps)) = wg.get_ts().await {
                tx_has_changed = self.last_tx_ts.map_or(false, |ts| timestamps.tx_ts > ts);
                self.last_tx_ts = Some(timestamps.tx_ts);

                rx_has_changed = self.last_rx_ts.map_or(false, |ts| timestamps.rx_ts > ts);
                self.last_rx_ts = Some(timestamps.rx_ts);
            }
        }

        if tx_has_changed || rx_has_changed {
            telio_log_debug!("Triggering batcher based on network activity");
            self.batched_actions.trigger();
        }

        tokio::select! {
            Ok((pk, action)) = self.nonbatched_actions.select_action() => {
                let pk = *pk;
@@ -324,7 +358,7 @@ mod tests {
            )
            .unwrap(),
        ));
        let sess_keep = SessionKeeper::start(socket_pool, None).unwrap();

        let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
            .parse::<PublicKey>()
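
The activity check in wait_with_update() above reduces to a "strictly
newer timestamp than last seen" comparison; a standalone sketch of the
pattern (the helper name is ours, not the crate's):

// Report activity only when a newer timestamp than the remembered one is
// observed; the very first observation merely seeds the state.
fn has_new_activity(
    last_seen: &mut Option<tokio::time::Instant>,
    observed: tokio::time::Instant,
) -> bool {
    let changed = last_seen.map_or(false, |prev| observed > prev);
    *last_seen = Some(observed);
    changed
}

// wait_with_update() applies this to the tx and rx timestamps from
// NetworkActivityGetter::get_ts() and triggers the batcher if either
// reports a change.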