Add trigger ability for batcher
Batcher triggering is tied to tx and rx change timestamps.
It is not per-peer since we care about general network
activity.

Triggering short-circuits the sleep until the nearest
deadline by attempting to batch at the moment of the trigger()
call. Calling trigger() in a tight loop will cause actions
to be returned at shortened intervals of T-threshold.

Signed-off-by: Lukas Pukenis <[email protected]>
LukasPukenis committed Oct 10, 2024
1 parent 15957de commit 70350bd
Showing 11 changed files with 356 additions and 116 deletions.
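
Before the per-file diff, a minimal usage sketch of the new trigger path; this is illustrative only, mirrors the test added below, and assumes the Batcher from this diff is in scope (the Ctx context type and demo function are hypothetical):

use std::{sync::Arc, time::Duration};

struct Ctx;

async fn demo() {
    let mut batcher = Batcher::<String, Ctx>::new();

    // One action under one key: fire every 100s, allowed up to 50s early.
    batcher.add(
        "peer0".to_owned(),
        Duration::from_secs(100),
        Duration::from_secs(50),
        Arc::new(|_ctx: _| Box::pin(async move { Ok(()) })),
    );

    // On observed network activity, short-circuit the sleep until the
    // nearest deadline: actions within threshold of their deadline batch now.
    batcher.trigger();

    let mut ctx = Ctx;
    for (_key, action) in batcher.get_actions().await {
        // Execute each batched action against the shared context.
        let _ = action(&mut ctx).await;
    }
}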
178 changes: 155 additions & 23 deletions crates/telio-batcher/src/batcher.rs
@@ -3,15 +3,28 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::{collections::HashMap, sync::Arc};

use telio_utils::telio_log_debug;
use telio_utils::{telio_log_debug, telio_log_warn};
use tokio;
use tokio::time::sleep_until;

type Action<V, R = std::result::Result<(), ()>> =
Arc<dyn for<'a> Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>;

/// When the batcher is evaluating actions, how far in the past a trigger signal should still be taken into account
const TRIGGER_EFFECTIVE_DURATION: tokio::time::Duration = tokio::time::Duration::from_secs(5);

/// Batcher works by holding actions and intervals. When asynchronously queried, batcher will
/// return a list of actions that should be executed now.
pub struct Batcher<K, V> {
actions: HashMap<K, (BatchEntry, Action<V>)>,

/// A newly added action must be executed immediately (returned from `get_actions`), thus we need to notify the tokio::select!
/// about such an occurrence.
notify_add: tokio::sync::Notify,

/// Triggering batcher must be handled inside of tokio::select!
notify_trigger_timestamp: Option<tokio::time::Instant>,
notify_trigger: tokio::sync::Notify,
}

struct BatchEntry {
@@ -37,56 +50,77 @@
where
Self {
actions: HashMap::new(),
notify_add: tokio::sync::Notify::new(),
notify_trigger: tokio::sync::Notify::new(),
notify_trigger_timestamp: None,
}
}

/// Batching works by sleeping until the nearest future and then trying to batch more actions
/// based on the threshold value. Higher delay before calling the function will increase the chances of batching
/// because the deadlines will _probably_ be in the past already.
Adding a new action wakes up the batcher and triggers that action immediately.
Batching works by sleeping until the nearest future deadline (or being triggered explicitly) and then trying to batch more actions
at that time-instant based on the threshold value of each action.
Calling this function after an action's deadline has expired will return that action for execution.
A newly added action has a deadline of now.
pub async fn get_actions(&mut self) -> Vec<(K, Action<V>)> {
let mut batched_actions: Vec<(K, Action<V>)> = vec![];

loop {
if !self.actions.is_empty() {
let actions = &mut self.actions;

// TODO: This can be optimized by early breaking and precollecting items beforehand
if let Some(closest_entry) = actions.values().min_by_key(|entry| entry.0.deadline) {
tokio::select! {
_ = self.notify_add.notified() => {
// Item was added, we need to immediately emit it
}
_ = sleep_until(closest_entry.0.deadline) => {
// Closest action should now be emitted
let active_trigger = self.notify_trigger_timestamp.take().map_or(false, |ts| {
tokio::time::Instant::now() - ts < TRIGGER_EFFECTIVE_DURATION
});

if !active_trigger {
if let Some(closest_entry) =
actions.values().min_by_key(|entry| entry.0.deadline)
{
tokio::select! {
_ = self.notify_add.notified() => {
// Item was added, we need to immediately emit it
}
_ = sleep_until(closest_entry.0.deadline) => {
// Closest action should now be emitted
}
// we received a premature batch trigger
_ = self.notify_trigger.notified() => {
// trigger notification
}
}
}
}

let now = tokio::time::Instant::now();
// at this point we know we're at the earliest batching instant, thus we can check whether more actions can be batched
for (key, action) in actions.iter_mut() {
let adjusted_action_deadline = now + action.0.threshold;
let now = tokio::time::Instant::now();
for (key, action) in actions.iter_mut() {
let adjusted_deadline = now + action.0.threshold;

if action.0.deadline <= adjusted_action_deadline {
action.0.deadline = now + action.0.interval;
batched_actions.push((key.clone(), action.1.clone()));
}
if action.0.deadline <= adjusted_deadline {
action.0.deadline = now + action.0.interval;
batched_actions.push((key.clone(), action.1.clone()));
}
}

return batched_actions;
} else {
let _ = self.notify_add.notified().await;
_ = self.notify_add.notified().await;
}
}
}

/// Remove batcher action. Action is no longer eligible for batching
pub fn remove(&mut self, key: &K) {
telio_log_debug!("removing item from batcher with key({:?})", key);
self.actions.remove(key);
}

/// Due to the async nature of the batcher, `get_actions` will await until an action becomes available.
/// This function allows for premature evaluation of actions.
/// Calling this function in a tight loop will result in actions
/// being returned at T-threshold time.
pub fn trigger(&mut self) {
telio_log_debug!("triggering batcher");
self.notify_trigger_timestamp = Some(tokio::time::Instant::now());
self.notify_trigger.notify_waiters();
}

/// Add batcher action. Batcher itself doesn't run the tasks and depends
/// on actions being manually invoked. Adding an action immediately triggers it
thus if the call site awaits the returned future, it will resolve immediately after this
@@ -104,6 +138,20 @@
interval,
threshold,
);

let threshold = {
if threshold >= interval {
let capped_threshold = interval / 2;
telio_log_warn!(
"Threshold should not be bigger than the interval. Overriding to ({:?})",
capped_threshold
);
capped_threshold
} else {
threshold
}
};
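
// Illustrative example, not part of this diff: calling add() with
// interval = 100s and threshold = 120s logs the warning above and
// proceeds with a capped threshold of interval / 2 = 50s.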

let entry = BatchEntry {
deadline: tokio::time::Instant::now(),
interval,
@@ -133,6 +181,90 @@ mod tests {

use crate::batcher::Batcher;

#[tokio::test(start_paused = true)]
async fn batch_and_trigger() {
let start_time = tokio::time::Instant::now();
let mut batcher = Batcher::<String, TestChecker>::new();

batcher.add(
"key0".to_owned(),
Duration::from_secs(100),
Duration::from_secs(50),
Arc::new(|s: _| {
Box::pin(async move {
s.values
.push(("key0".to_owned(), tokio::time::Instant::now()));
Ok(())
})
}),
);

let mut test_checker = TestChecker { values: Vec::new() };

// pick up the immediate fire
for ac in batcher.get_actions().await {
ac.1(&mut test_checker).await.unwrap();
}
assert!(test_checker.values.len() == 1);

let create_time_checkpoint =
|add: u64| tokio::time::Instant::now() + tokio::time::Duration::from_secs(add);

let mut trigger_timepoints = vec![
create_time_checkpoint(10),
create_time_checkpoint(20),
create_time_checkpoint(60),
create_time_checkpoint(90),
create_time_checkpoint(200),
create_time_checkpoint(270),
create_time_checkpoint(280),
create_time_checkpoint(730),
create_time_checkpoint(1000),
];

use tokio::time::sleep_until;
loop {
tokio::select! {
_ = tokio::time::advance(tokio::time::Duration::from_secs(1)) => {
}

_ = sleep_until(trigger_timepoints[0]) => {
batcher.trigger();
trigger_timepoints.remove(0);
if trigger_timepoints.is_empty() { break }
}

actions = batcher.get_actions() => {
for ac in &actions {
ac.1(&mut test_checker).await.unwrap();
}
}
}
}

let round_duration_to_nearest_10_secs = |duration: Duration| -> Duration {
let seconds =
duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let rounded = (seconds / 10.0).round() * 10.0;
Duration::from_secs(rounded as u64)
};

let key0_entries: Vec<tokio::time::Duration> = test_checker
.values
.iter()
.filter(|e| e.0 == "key0")
.map(|e| round_duration_to_nearest_10_secs(e.1.duration_since(start_time)))
.collect();

let expected_diff_values: Vec<Duration> =
vec![0, 60, 160, 260, 360, 460, 560, 660, 730, 830, 930]
.iter()
.map(|v| tokio::time::Duration::from_secs(*v))
.collect();
assert!(
key0_entries == expected_diff_values,
"expected: {:?}, got: {:?}",
expected_diff_values,
key0_entries
);
}
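
// Illustrative walk-through of the expected values above, not part of the
// test. With interval = 100s and threshold = 50s, an action whose deadline
// is D is batched at instant T whenever D <= T + 50, and its deadline then
// becomes T + 100:
//   t=0           fires immediately on add        -> next deadline 100
//   t=10, t=20    triggers: 100 > t + 50, no fire
//   t=60          trigger: 100 <= 110, fires      -> next deadline 160
//   t=90          trigger: 160 > 140, no fire
//   t=160..660    plain deadline fires every 100s (the 200/270/280 triggers
//                 are all further than 50s from the next deadline)
//   t=730         trigger: 760 <= 780, fires      -> next deadline 830
//   t=830, t=930  plain deadline fires; the test breaks on the last trigger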

#[tokio::test(start_paused = true)]
async fn batch_one() {
let start_time = tokio::time::Instant::now();
10 changes: 7 additions & 3 deletions crates/telio-relay/src/derp/proto.rs
@@ -198,7 +198,7 @@ pub async fn start_read<R: AsyncRead + Unpin>(

if enabled!(Level::TRACE) {
telio_log_trace!(
"DERP Rx: {} -> {}, frame type: {:?}, data len: {}, pubkey: {:?}",
"Remote->DERP->Local Recv: {} -> {}, frame type: {:?}, data len: {}, pubkey: {:?}",
addr.remote,
addr.local,
frame_type,
@@ -211,10 +211,9 @@
// Derp -> LocalNode
FrameType::ControlMessage => {
telio_log_trace!(
"DERP Rx: {} -> {}, frame type: {:?}, data len: {}",
"DERP->Local ctrl: {} -> {}, data len: {}",
addr.remote,
addr.local,
frame_type,
data.len(),
);
sender_direct.send(data).await?
@@ -261,6 +260,11 @@ pub async fn start_write<W: AsyncWrite + Unpin>(
if let Some(data) = received {
let mut buf = Vec::<u8>::new();
buf.write_all(&data).await?;

if enabled!(Level::TRACE) {
telio_log_trace!("Local->DERP Tx: {} -> {}, data len: {}", addr.local, addr.remote, data.len());
}

write_frame(&mut writer, FrameType::ControlMessage, buf).await?;
} else {
break;
1 change: 1 addition & 0 deletions crates/telio-traversal/src/endpoint_providers/local.rs
@@ -242,6 +242,7 @@ impl<T: WireGuard, G: GetIfAddrs> Runtime for State<T, G> {
tokio::select! {
Ok((len, addr)) = self.udp_socket.recv_from(&mut rx_buff) => {
let buf = rx_buff.get(..len).ok_or(())?;
telio_log_debug!("local provider recv data: {:?}", buf);
self.handle_rx_packet(buf, &addr).await.unwrap_or_else(
|e| {
telio_log_warn!("Failed to handle packet received no local interface endpoint provider {:?}", e);
44 changes: 39 additions & 5 deletions crates/telio-traversal/src/session_keeper.rs
@@ -15,7 +15,7 @@ use telio_task::{task_exec, BoxAction, Runtime, Task};
use telio_utils::{
dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions,
};

use telio_wg::NetworkActivityGetter;
const PING_PAYLOAD_SIZE: usize = 56;

/// Possible [SessionKeeper] errors.
@@ -62,7 +62,10 @@ pub struct SessionKeeper {
}

impl SessionKeeper {
pub fn start(sock_pool: Arc<SocketPool>) -> Result<Self> {
pub fn start(
sock_pool: Arc<SocketPool>,
network_activity_getter: Option<Arc<dyn NetworkActivityGetter>>,
) -> Result<Self> {
let (client_v4, client_v6) = (
PingerClient::new(&Self::make_builder(ICMP::V4).build())
.map_err(|e| Error::PingerCreationError(ICMP::V4, e))?,
@@ -81,6 +84,9 @@ impl SessionKeeper {
},
batched_actions: Batcher::new(),
nonbatched_actions: RepeatedActions::default(),
network_activity_getter,
last_tx_ts: None,
last_rx_ts: None,
}),
})
}
@@ -128,14 +134,19 @@ async fn ping(pingers: &Pingers, targets: (&PublicKey, &DualTarget)) -> Result<(
let (primary, secondary) = targets.1.get_targets()?;
let public_key = targets.0;

telio_log_debug!("Pinging primary target {:?} on {:?}", public_key, primary);

let primary_client = match primary {
IpAddr::V4(_) => &pingers.pinger_client_v4,
IpAddr::V6(_) => &pingers.pinger_client_v6,
};

let ping_id = PingIdentifier(rand::random());

telio_log_debug!(
"Pinging primary target {:?} on {:?} with ping_id: {:?}",
public_key,
primary,
ping_id
);
if let Err(e) = primary_client
.pinger(primary, ping_id)
.await
@@ -264,7 +275,11 @@ struct State {
pingers: Pingers,
batched_actions: Batcher<PublicKey, Self>,
nonbatched_actions: RepeatedActions<PublicKey, Self, Result<()>>,
network_activity_getter: Option<Arc<dyn NetworkActivityGetter>>,
last_tx_ts: Option<tokio::time::Instant>,
last_rx_ts: Option<tokio::time::Instant>,
}

#[async_trait]
impl Runtime for State {
const NAME: &'static str = "SessionKeeper";
@@ -274,6 +289,25 @@ impl Runtime for State {
where
F: Future<Output = BoxAction<Self, std::result::Result<(), Self::Err>>> + Send,
{
let mut tx_has_changed = false;
let mut rx_has_changed = false;

// We just care about any network activity, thus no per-peer filtering.
if let Some(wg) = self.network_activity_getter.as_ref() {
if let Ok(Some(timestamps)) = wg.get_latest_timestamps().await {
tx_has_changed = self.last_tx_ts.map_or(false, |ts| timestamps.tx_ts > ts);
self.last_tx_ts = Some(timestamps.tx_ts);

rx_has_changed = self.last_rx_ts.map_or(false, |ts| timestamps.rx_ts > ts);
self.last_rx_ts = Some(timestamps.rx_ts);
}
}

if tx_has_changed || rx_has_changed {
telio_log_debug!("Triggering batcher based on network activity");
self.batched_actions.trigger();
}

tokio::select! {
Ok((pk, action)) = self.nonbatched_actions.select_action() => {
let pk = *pk;
@@ -324,7 +358,7 @@ mod tests {
)
.unwrap(),
));
let sess_keep = SessionKeeper::start(socket_pool).unwrap();
let sess_keep = SessionKeeper::start(socket_pool, None).unwrap();

let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
.parse::<PublicKey>()
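
To exercise the new wiring in a test, one could hand SessionKeeper::start a stub activity getter. A hedged sketch, assuming NetworkActivityGetter is an async trait exposing the get_latest_timestamps() used in the select loop above; the TxRxTimestamps struct name and the Result alias are assumptions inferred from this diff, not confirmed against telio-wg:

use std::sync::Arc;
use async_trait::async_trait;

// Stub that always reports fresh activity: from the second
// wait_with_update() pass onward the previously stored timestamps compare
// older than these, so SessionKeeper triggers the batcher on every pass.
struct AlwaysActive;

#[async_trait]
impl NetworkActivityGetter for AlwaysActive {
    // TxRxTimestamps is a hypothetical name for the tx_ts/rx_ts pair read
    // in the select loop above; the error type is whatever telio-wg defines.
    async fn get_latest_timestamps(&self) -> Result<Option<TxRxTimestamps>> {
        let now = tokio::time::Instant::now();
        Ok(Some(TxRxTimestamps { tx_ts: now, rx_ts: now }))
    }
}

// Hypothetical usage in the smoke test above:
// let sess_keep = SessionKeeper::start(socket_pool, Some(Arc::new(AlwaysActive))).unwrap();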