Skip to content

Commit

Permalink
Merge branch api: Update API to be more resilient
Browse files Browse the repository at this point in the history
 - Take Duration instead of u64 for max_rtt
 - Take IpAddr instead of string in add/remove
 - Add an Error type, and don't return strings
 - Derive Debug for our types
 - Add documentation
  • Loading branch information
rcloran committed Sep 23, 2023
2 parents 8e68bae + 519d3a6 commit d0add5e
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 69 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ fn main() {
Err(e) => panic!("Error creating pinger: {}", e),
};

pinger.add_ipaddr("8.8.8.8");
pinger.add_ipaddr("1.1.1.1");
pinger.add_ipaddr("7.7.7.7");
pinger.add_ipaddr("2001:4860:4860::8888");
pinger.add_ipaddr("8.8.8.8".parse().unwrap());
pinger.add_ipaddr("1.1.1.1".parse().unwrap());
pinger.add_ipaddr("7.7.7.7".parse().unwrap());
pinger.add_ipaddr("2001:4860:4860::8888".parse().unwrap());
pinger.run_pinger();

loop {
Expand All @@ -64,7 +64,7 @@ fn main() {
Note a Pinger is initialized with two arguments: the maximum round trip time before an address is considered "idle" (2 seconds by default) and the size of the ping data packet (16 bytes by default).
To explicitly set these values Pinger would be initialized like so:
```rust
Pinger::new(Some(3000 as u64), Some(24 as usize))
Pinger::new(Some(Duration::from_millis(3000)), Some(24 as usize))
```

The public functions `stop_pinger()` to stop the continuous pinger and `ping_once()` to only run one round of pinging are also available.
Expand Down
12 changes: 7 additions & 5 deletions examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@ extern crate pretty_env_logger;
#[macro_use]
extern crate log;

use std::error::Error;

use fastping_rs::PingResult::{Idle, Receive};
use fastping_rs::Pinger;

fn main() {
fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
let (pinger, results) = match Pinger::new(None, Some(64)) {
Ok((pinger, results)) => (pinger, results),
Err(e) => panic!("Error creating pinger: {}", e),
};

pinger.add_ipaddr("8.8.8.8");
pinger.add_ipaddr("1.1.1.1");
pinger.add_ipaddr("7.7.7.7");
pinger.add_ipaddr("2001:4860:4860::8888");
pinger.add_ipaddr("8.8.8.8".parse()?);
pinger.add_ipaddr("1.1.1.1".parse()?);
pinger.add_ipaddr("7.7.7.7".parse()?);
pinger.add_ipaddr("2001:4860:4860::8888".parse()?);
pinger.run_pinger();

loop {
Expand Down
28 changes: 28 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/// Errors that can be returned by the API
#[derive(Debug)]
pub enum Error {
/// Networking problems, source will be a std::io::Error
NetworkError { source: std::io::Error },
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NetworkError { source } => write!(f, "Network error: {}", source),
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::NetworkError { source } => Some(source),
}
}
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Self::NetworkError { source: e }
}
}
107 changes: 49 additions & 58 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
//! ICMP ping library in Rust inspired by [go-fastping][1] and [AnyEvent::FastPing][2]
//!
//! fastping-rs is a Rust ICMP ping library for quickly sending and measuring batches of ICMP echo
//! request packets. The design prioritizes pinging a large number of hosts over a long time,
//! rather than pinging individual hosts once-off.
//!
//! [`Pinger`] provides the functionality for this module.
//!
//! [1]: https://pkg.go.dev/github.com/kanocz/go-fastping
//! [2]: https://metacpan.org/pod/AnyEvent::FastPing
#![warn(rust_2018_idioms)]

#[macro_use]
extern crate log;

pub mod error;
mod ping;

use crate::error::*;
use ping::{send_pings, Ping, ReceivedPing};
use pnet::packet::icmp::echo_reply::EchoReplyPacket as IcmpEchoReplyPacket;
use pnet::packet::icmpv6::echo_reply::EchoReplyPacket as Icmpv6EchoReplyPacket;
Expand All @@ -23,16 +35,20 @@ use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};

// result type returned by fastping_rs::Pinger::new()
pub type NewPingerResult = Result<(Pinger, Receiver<PingResult>), String>;

// ping result type. Idle represents pings that have not received a repsonse within the max_rtt.
// Receive represents pings which have received a repsonse
/// The result of a single ping
#[derive(Debug)]
pub enum PingResult {
/// Pings that have not received a response within max_rtt
Idle { addr: IpAddr },
/// Pings which have received a response
Receive { addr: IpAddr, rtt: Duration },
}

/// A long-lived pinger
///
/// [`Pinger`]s create raw sockets for sending and receiving ICMP echo requests, which requires
/// special privileges on most operating systems. A thread is created to read from each (IPv4 and
/// IPv6) socket, and results are provided to the client on the channel in the `results` field.
pub struct Pinger {
// Number of milliseconds of an idle timeout. Once it passed,
// the library calls an idle callback function. Default is 2000
Expand Down Expand Up @@ -73,29 +89,26 @@ pub struct Pinger {
}

impl Pinger {
// initialize the pinger and start the icmp and icmpv6 listeners
pub fn new(_max_rtt: Option<u64>, _size: Option<usize>) -> NewPingerResult {
/// Create a [`Pinger`], create sockets, and start network listener threads
pub fn new(
max_rtt: Option<Duration>,
size: Option<usize>,
) -> Result<(Self, Receiver<PingResult>), Error> {
let targets = BTreeMap::new();
let (sender, receiver) = channel();

let protocol = Layer4(Ipv4(IpNextHeaderProtocols::Icmp));
let (tx, rx) = match transport_channel(4096, protocol) {
Ok((tx, rx)) => (tx, rx),
Err(e) => return Err(e.to_string()),
};
let (tx, rx) = transport_channel(4096, protocol)?;

let protocolv6 = Layer4(Ipv6(IpNextHeaderProtocols::Icmpv6));
let (txv6, rxv6) = match transport_channel(4096, protocolv6) {
Ok((txv6, rxv6)) => (txv6, rxv6),
Err(e) => return Err(e.to_string()),
};
let (txv6, rxv6) = transport_channel(4096, protocolv6)?;

let (thread_tx, thread_rx) = channel();

let mut pinger = Pinger {
max_rtt: Arc::new(Duration::from_millis(2000)),
let pinger = Pinger {
max_rtt: Arc::new(max_rtt.unwrap_or(Duration::from_millis(2000))),
targets: Arc::new(Mutex::new(targets)),
size: _size.unwrap_or(16),
size: size.unwrap_or(16),
results_sender: sender,
tx: Arc::new(Mutex::new(tx)),
rx: Arc::new(Mutex::new(rx)),
Expand All @@ -106,58 +119,36 @@ impl Pinger {
timer: Arc::new(RwLock::new(Instant::now())),
stop: Arc::new(Mutex::new(false)),
};
if let Some(rtt_value) = _max_rtt {
pinger.max_rtt = Arc::new(Duration::from_millis(rtt_value));
}
if let Some(size_value) = _size {
pinger.size = size_value;
}

pinger.start_listener();
Ok((pinger, receiver))
}

// add either an ipv4 or ipv6 target address for pinging
pub fn add_ipaddr(&self, ipaddr: &str) {
let addr = ipaddr.parse::<IpAddr>();
match addr {
Ok(valid_addr) => {
debug!("Address added {}", valid_addr);
let new_ping = Ping::new(valid_addr);
self.targets.lock().unwrap().insert(valid_addr, new_ping);
}
Err(e) => {
error!("Error adding ip address {}. Error: {}", ipaddr, e);
}
};
/// Add a new target for pinging
pub fn add_ipaddr(&self, addr: IpAddr) {
debug!("Address added {}", addr);
let new_ping = Ping::new(addr);
self.targets.lock().unwrap().insert(addr, new_ping);
}

// remove a previously added ipv4 or ipv6 target address
pub fn remove_ipaddr(&self, ipaddr: &str) {
let addr = ipaddr.parse::<IpAddr>();
match addr {
Ok(valid_addr) => {
debug!("Address removed {}", valid_addr);
self.targets.lock().unwrap().remove(&valid_addr);
}
Err(e) => {
error!("Error removing ip address {}. Error: {}", ipaddr, e);
}
};
/// Remove a previously added target address
pub fn remove_ipaddr(&self, addr: IpAddr) {
debug!("Address removed {}", addr);
self.targets.lock().unwrap().remove(&addr);
}

// stop running the continous pinger
/// Stop running the continous pinger
pub fn stop_pinger(&self) {
let mut stop = self.stop.lock().unwrap();
*stop = true;
}

// run one round of pinging and stop
/// Ping each target address once and stop
pub fn ping_once(&self) {
self.run_pings(true)
}

// run the continuous pinger
/// Run the pinger continuously
pub fn run_pinger(&self) {
self.run_pings(false)
}
Expand Down Expand Up @@ -194,7 +185,7 @@ impl Pinger {
tx,
txv6,
targets,
max_rtt,
&max_rtt,
);
} else {
thread::spawn(move || {
Expand All @@ -207,7 +198,7 @@ impl Pinger {
tx,
txv6,
targets,
max_rtt,
&max_rtt,
);
});
}
Expand Down Expand Up @@ -309,7 +300,7 @@ mod tests {
// test we can create a new pinger with optional arguments,
// test it returns the new pinger and a client channel
// test we can use the client channel
let (pinger, channel) = Pinger::new(Some(3000), Some(24))?;
let (pinger, channel) = Pinger::new(Some(Duration::from_millis(3000)), Some(24))?;

assert_eq!(pinger.max_rtt, Arc::new(Duration::new(3, 0)));
assert_eq!(pinger.size, 24);
Expand All @@ -330,15 +321,15 @@ mod tests {
#[test]
fn test_add_remove_addrs() -> Result<(), Box<dyn std::error::Error>> {
let (pinger, _) = Pinger::new(None, None)?;
pinger.add_ipaddr("127.0.0.1");
pinger.add_ipaddr([127, 0, 0, 1].into());
assert_eq!(pinger.targets.lock().unwrap().len(), 1);
assert!(pinger
.targets
.lock()
.unwrap()
.contains_key(&"127.0.0.1".parse::<IpAddr>().unwrap()));

pinger.remove_ipaddr("127.0.0.1");
pinger.remove_ipaddr([127, 0, 0, 1].into());
assert_eq!(pinger.targets.lock().unwrap().len(), 0);
assert!(!pinger
.targets
Expand All @@ -365,7 +356,7 @@ mod tests {
let test_addrs = ["127.0.0.1", "7.7.7.7", "::1"];

for target in test_addrs {
pinger.add_ipaddr(target);
pinger.add_ipaddr(target.parse()?);
}
pinger.ping_once();

Expand Down
4 changes: 3 additions & 1 deletion src/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub struct Ping {
addr: IpAddr,
identifier: u16,
sequence_number: u16,
pub seen: bool,
}

#[derive(Debug)]
pub struct ReceivedPing {
pub addr: IpAddr,
pub identifier: u16,
Expand Down Expand Up @@ -100,7 +102,7 @@ pub fn send_pings(
tx: Arc<Mutex<TransportSender>>,
txv6: Arc<Mutex<TransportSender>>,
targets: Arc<Mutex<BTreeMap<IpAddr, Ping>>>,
max_rtt: Arc<Duration>,
max_rtt: &Duration,
) {
loop {
for (addr, ping) in targets.lock().unwrap().iter_mut() {
Expand Down

0 comments on commit d0add5e

Please sign in to comment.