Skip to content

Commit

Permalink
Use thread::Builder API instead of ::spawn
Browse files Browse the repository at this point in the history
`thread::spawn` panics on failure. The `Builder` allows us to pass any
errors back up to the caller.

We can also give the threads a name, which is often useful in debugging.
Example from a Linux machine:

```
$ ps -eLl
F S   UID     PID    SPID    PPID  C PRI  NI ADDR SZ WCHAN  TTY          TIME CMD
...
4 S     0 4073149 4073149 4068718  0  80   0 - 51875 -      pts/0    00:00:00 ping
1 S     0 4073149 4073150 4068718  0  80   0 - 51875 -      pts/0    00:00:00 pnet-v4-listene
1 S     0 4073149 4073151 4068718  0  80   0 - 51875 -      pts/0    00:00:00 pnet-v6-listene
1 S     0 4073149 4073152 4068718  0  80   0 - 51875 -      pts/0    00:00:00 pinger
...
```
  • Loading branch information
rcloran committed Sep 28, 2023
1 parent dbc60ca commit 9f26a94
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() -> Result<(), Box<dyn Error>> {
pinger.add_ipaddr("1.1.1.1".parse()?);
pinger.add_ipaddr("7.7.7.7".parse()?);
pinger.add_ipaddr("2001:4860:4860::8888".parse()?);
pinger.run();
pinger.run()?;

loop {
match results.recv() {
Expand Down
17 changes: 10 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern crate log;
pub mod error;
pub mod transport;

use crate::error::*;
use crate::error::Error;
use crate::transport::{Ping, PingTransport, ReceivedPing};
use std::collections::BTreeMap;
use std::net::IpAddr;
Expand Down Expand Up @@ -148,11 +148,14 @@ impl<T: PingTransport + 'static> Pinger<T> {
///
/// This consumes the [`Pinger`] object and returns a new [`RunningPinger`]. The original
/// [`Pinger`] may be obtained from [`RunningPinger::stop`].
pub fn run(mut self) -> RunningPinger<T> {
///
/// An [`Err`] variant is returned if a thread can not be created for the [`Pinger`].
pub fn run(mut self) -> Result<RunningPinger<T>, Error> {
let stop = Arc::new(AtomicBool::new(false));
let stop_inner = stop.clone();
let targets = self.targets.clone();
let join_handle = thread::spawn(move || loop {
let builder = thread::Builder::new().name("pinger".into());
let join_handle = builder.spawn(move || loop {
let start = Instant::now();
// Work
self.ping_once();
Expand All @@ -164,13 +167,13 @@ impl<T: PingTransport + 'static> Pinger<T> {

// If we received all replies faster than the interval, wait
thread::sleep(self.max_rtt.saturating_sub(start.elapsed()));
});
})?;

RunningPinger {
Ok(RunningPinger {
stop,
join_handle,
targets,
}
})
}

fn await_replies(&self, timer: Instant, sent: usize) {
Expand Down Expand Up @@ -377,7 +380,7 @@ mod tests {
let targets_len = pinger.targets.lock().unwrap().len();
// Copy attributes of the original Pinger
let max_rtt = pinger.max_rtt;
let stop_handle = pinger.run();
let stop_handle = pinger.run()?;
let pinger = stop_handle.stop().unwrap();
// Try to verify we have the same Pinger back. Other attributes can't be cloned.
assert_eq!(max_rtt, pinger.max_rtt);
Expand Down
28 changes: 18 additions & 10 deletions src/transport/pnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl crate::transport::PingTransport for PingTransport {
timer: Arc::new(RwLock::new(Instant::now())),
};

start_listener(rx, rxv6, resp_sender, transport.timer.clone());
start_listener(rx, rxv6, resp_sender, transport.timer.clone())?;
Ok(transport)
}

Expand Down Expand Up @@ -163,18 +163,21 @@ fn start_listener(
rxv6: TransportReceiver,
resp_sender: Sender<ReceivedPing>,
timer: Arc<RwLock<Instant>>,
) {
) -> Result<(), Error> {
// start icmp listeners in the background and use internal channels for results
start_listener_v4(rxv4, resp_sender.clone(), timer.clone());
start_listener_v6(rxv6, resp_sender.clone(), timer.clone());
start_listener_v4(rxv4, resp_sender.clone(), timer.clone())?;
start_listener_v6(rxv6, resp_sender.clone(), timer.clone())?;

Ok(())
}

fn start_listener_v4(
mut receiver: TransportReceiver,
resp_sender: Sender<ReceivedPing>,
timer: Arc<RwLock<Instant>>,
) {
thread::spawn(move || {
) -> Result<(), Error> {
let builder = thread::Builder::new().name("pnet-v4-listener".into());
builder.spawn(move || {
let mut iter = icmp_packet_iter(&mut receiver);
loop {
match iter.next() {
Expand Down Expand Up @@ -206,15 +209,18 @@ fn start_listener_v4(
}
}
}
});
})?;

Ok(())
}

fn start_listener_v6(
mut receiver: TransportReceiver,
resp_sender: Sender<ReceivedPing>,
timer: Arc<RwLock<Instant>>,
) {
thread::spawn(move || {
) -> Result<(), Error> {
let builder = thread::Builder::new().name("pnet-v6-listener".into());
builder.spawn(move || {
let mut iter = icmpv6_packet_iter(&mut receiver);
loop {
match iter.next() {
Expand Down Expand Up @@ -246,5 +252,7 @@ fn start_listener_v6(
}
}
}
});
})?;

Ok(())
}

0 comments on commit 9f26a94

Please sign in to comment.