Skip to content

Commit

Permalink
Return from ping_once when all replies received
Browse files Browse the repository at this point in the history
Fixes bparli#33
  • Loading branch information
rcloran committed Oct 6, 2023
1 parent b03aafd commit 9d6cb31
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use fastping_rs::PingResult::{Idle, Receive};
use fastping_rs::Pinger;

fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
pretty_env_logger::try_init_timed()?;
let (pinger, results) = match <Pinger>::new(None) {
Ok((pinger, results)) => (pinger, results),
Err(e) => panic!("Error creating pinger: {}", e),
Expand Down
39 changes: 28 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,27 @@ impl<T: PingTransport + 'static> Pinger<T> {
remove_ipaddr(&self.targets, addr)
}

/// Ping each target address once and stop
/// Ping each target address once
///
/// Returns as soon as all replies are received, or after max_rtt times out.
///
/// When this returns, all [`PingResult`] (`Received` or `Idle`) for the current addresses will
/// have been sent on the results channel.
pub fn ping_once(&mut self) {
let timer = Instant::now();
self.transport.send_pings(
self.targets
.lock()
.unwrap()
.values_mut()
.map(|(ping, seen)| {
let sent;
{
let mut targets = self.targets.lock().unwrap();
sent = targets.len();
self.transport.send_pings(
targets.values_mut().map(|(ping, seen)| {
*seen = false;
ping
}),
&self.payload,
);
self.await_replies(timer);
&self.payload,
);
}
self.await_replies(timer, sent);
}

/// Run the pinger continuously
Expand All @@ -126,11 +132,17 @@ impl<T: PingTransport + 'static> Pinger<T> {
let stop_inner = stop.clone();
let targets = self.targets.clone();
let join_handle = thread::spawn(move || loop {
let start = Instant::now();
// Work
self.ping_once();

// check if we've received the stop signal
if stop_inner.load(Ordering::Relaxed) {
return self;
}

// If we received all replies faster than the interval, wait
thread::sleep(self.max_rtt.saturating_sub(start.elapsed()));
});

RunningPinger {
Expand All @@ -140,7 +152,8 @@ impl<T: PingTransport + 'static> Pinger<T> {
}
}

fn await_replies(&self, timer: Instant) {
fn await_replies(&self, timer: Instant, sent: usize) {
let mut received = 0;
loop {
// use recv_timeout so we don't cause a CPU to needlessly spin
match self
Expand All @@ -160,6 +173,7 @@ impl<T: PingTransport + 'static> Pinger<T> {
if ping.get_identifier() == identifier
&& ping.get_sequence_number() == sequence_number
{
received += 1;
*seen = true;
// Send the ping result over the client channel
if let Err(e) =
Expand All @@ -180,6 +194,9 @@ impl<T: PingTransport + 'static> Pinger<T> {
);
}
}
if received >= sent {
return;
}
}
Err(_) => {
// Check we haven't exceeded the max rtt
Expand Down

0 comments on commit 9d6cb31

Please sign in to comment.