Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
ZachNo committed Jan 19, 2024
2 parents 1d82bfb + 4f22538 commit 3520d16
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 36 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ jobs:
toolchain: [stable, beta]
profile: ['', --release]
features: ['', '--all-features']
exclude:
# capture-stream is not supported on Windows.
- os: windows-latest
features: '--all-features'

uses: ./.github/workflows/ci-steps.yml
with:
Expand Down
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ errno = "0.2"
tokio = { version = "1.0", features = ["net", "rt", "macros", "rt-multi-thread"], optional = true }
futures = { version = "0.3", optional = true }
gat-std = { version = "0.1.1", optional = true }
# these really belong in [dev-dependencies], but can't have optional dev-deps
# and these libraries would truncate the min support Rust version (MSRV)
tun-tap = { version = "0.1.3", optional = true }
# This really belongs in [dev-dependencies], but we can't have optional dev-deps
# and this libraries would truncate the min support Rust version (MSRV)
etherparse = { version = "0.13.0", optional = true }
libloading = "0.8"

[target.'cfg(target_os = "windows")'.dependencies]
windows-sys = { version = "0.36.1", features = ["Win32_Foundation", "Win32_Networking_WinSock"] }
once_cell = "1.19"

[target.'cfg(not(target_os = "windows"))'.dependencies]
# This really belongs in [dev-dependencies], but we can't have optional dev-deps
# and this libraries would truncate the min support Rust version (MSRV)
tun-tap = { version = "0.1.3", optional = true }

[dev-dependencies]
tempdir = "0.3"

Expand All @@ -43,7 +47,7 @@ pkg-config = "0.3"
[features]
# This feature enables access to the function Capture::stream.
# This is disabled by default, because it depends on a tokio
capture-stream = ["tokio", "futures"]
capture-stream = ["tokio", "futures", "windows-sys/Win32_System_Threading"]
tap-tests = ["tun-tap", "etherparse"]
lending-iter = ["gat-std"]
dynamic-load = []
Expand Down
28 changes: 2 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod raw_common;
use dynamic_win_raw as raw;
pub mod sendqueue;
#[cfg(feature = "capture-stream")]
#[cfg_attr(windows, path = "stream_windows.rs")]
mod stream;
#[cfg(feature = "capture-stream")]
pub use stream::PacketStream;
Expand Down Expand Up @@ -1286,7 +1287,7 @@ impl<T: Activated + ?Sized> Capture<T> {
if !self.nonblock {
return Err(NonNonBlock);
}
PacketStream::new(SelectableCapture::new(self)?, codec)
PacketStream::new(self, codec)
}

/// Sets the filter on the capture using the given BPF program string. Internally this is
Expand Down Expand Up @@ -1403,31 +1404,6 @@ impl AsRawFd for Capture<Active> {
}
}

/// Newtype [`Capture`] wrapper that exposes `pcap_get_selectable_fd()`.
#[cfg(feature = "capture-stream")]
struct SelectableCapture<T: State + ?Sized> {
inner: Capture<T>,
fd: RawFd,
}

#[cfg(feature = "capture-stream")]
impl<T: Activated + ?Sized> SelectableCapture<T> {
fn new(capture: Capture<T>) -> Result<Self, Error> {
let fd = unsafe { raw::pcap_get_selectable_fd(capture.handle.as_ptr()) };
if fd == -1 {
return Err(InvalidRawFd);
}
Ok(Self { inner: capture, fd })
}
}

#[cfg(all(unix, feature = "capture-stream"))]
impl<T: Activated + ?Sized> AsRawFd for SelectableCapture<T> {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}

impl<T: State + ?Sized> Drop for Capture<T> {
fn drop(&mut self) {
unsafe { raw::pcap_close(self.handle.as_ptr()) }
Expand Down
30 changes: 28 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
use super::Activated;
use super::Capture;
use super::Error;
use super::SelectableCapture;
use crate::raw;
use crate::PacketCodec;
use crate::State;
use futures::ready;
use std::io;
use std::marker::Unpin;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::task::{self, Poll};
use tokio::io::unix::AsyncFd;
Expand All @@ -20,7 +23,8 @@ pub struct PacketStream<T: Activated + ?Sized, C> {
}

impl<T: Activated + ?Sized, C> PacketStream<T, C> {
pub(crate) fn new(capture: SelectableCapture<T>, codec: C) -> Result<Self, Error> {
pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
let capture = SelectableCapture::new(capture)?;
Ok(PacketStream {
inner: AsyncFd::with_interest(capture, tokio::io::Interest::READABLE)?,
codec,
Expand Down Expand Up @@ -60,3 +64,25 @@ impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T,
}
}
}

/// Newtype [`Capture`] wrapper that exposes `pcap_get_selectable_fd()`.
struct SelectableCapture<T: State + ?Sized> {
inner: Capture<T>,
fd: RawFd,
}

impl<T: Activated + ?Sized> SelectableCapture<T> {
fn new(capture: Capture<T>) -> Result<Self, Error> {
let fd = unsafe { raw::pcap_get_selectable_fd(capture.handle.as_ptr()) };
if fd == -1 {
return Err(Error::InvalidRawFd);
}
Ok(Self { inner: capture, fd })
}
}

impl<T: Activated + ?Sized> AsRawFd for SelectableCapture<T> {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
124 changes: 124 additions & 0 deletions src/stream_windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//! Support for asynchronous packet iteration.
//!
//! See [`Capture::stream`](super::Capture::stream).
use super::Activated;
use super::Capture;
use super::Error;
use crate::PacketCodec;
use futures::ready;
use futures::FutureExt;
use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
use tokio::task::JoinHandle;
use windows_sys::Win32::Foundation::HANDLE;
use windows_sys::Win32::System::Threading::WaitForSingleObject;

/// Implement Stream for async use of pcap
pub struct PacketStream<T: Activated + ?Sized, C> {
event_handle: EventHandle,
capture: Capture<T>,
codec: C,
}

impl<T: Activated + ?Sized, C> PacketStream<T, C> {
pub(crate) fn new(capture: Capture<T>, codec: C) -> Result<Self, Error> {
Ok(Self {
event_handle: EventHandle::new(&capture),
capture,
codec,
})
}

/// Returns a mutable reference to the inner [`Capture`].
///
/// The caller must ensure the capture will not be set to be
/// blocking.
pub fn capture_mut(&mut self) -> &mut Capture<T> {
&mut self.capture
}
}

impl<T: Activated + ?Sized, C> Unpin for PacketStream<T, C> {}

impl<T: Activated + ?Sized, C: PacketCodec> futures::Stream for PacketStream<T, C> {
type Item = Result<C::Item, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
let stream = Pin::into_inner(self);
let codec = &mut stream.codec;

loop {
ready!(stream.event_handle.poll_ready(cx));

let res = match stream.capture.next_packet() {
Ok(p) => Ok(codec.decode(p)),
Err(Error::TimeoutExpired) => {
stream.event_handle.clear_ready();
continue;
}
Err(e) => Err(e),
};
return Poll::Ready(Some(res));
}
}
}

/// A wrapper around a HANDLE that can be used to call `WaitForSingleObject`
/// from an asynchronous context. Once the call to `WaitForSingleObject`
/// completes, the handle is considered ready and will keep returning `Ready`
/// until it's reset.
struct EventHandle {
handle: HANDLE,
state: EventHandleState,
}

enum EventHandleState {
/// We haven't started waiting for an event yet.
Init,
/// We're currently waiting for an event.
Polling(JoinHandle<()>),
/// We waited for an event.
Ready,
}

impl EventHandle {
pub fn new<T: Activated + ?Sized>(capture: &Capture<T>) -> Self {
Self {
handle: unsafe {
// SAFETY: PacketStream stores the handle before the capture,
// so the handle will be dropped before the capture.
capture.get_event()
},
state: EventHandleState::Init,
}
}

pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<()> {
loop {
match self.state {
EventHandleState::Init => {
let handle = self.handle;
self.state =
EventHandleState::Polling(tokio::task::spawn_blocking(move || {
const INFINITE: u32 = !0;
unsafe {
WaitForSingleObject(handle, INFINITE);
}
}));
}
EventHandleState::Polling(ref mut join_handle) => {
let _ = ready!(join_handle.poll_unpin(cx));
self.state = EventHandleState::Ready;
}
EventHandleState::Ready => return Poll::Ready(()),
}
}
}

/// Reset the internal state. This will trigger a call to
/// `WaitForSingleObject` the next time `poll_ready` is called.
pub fn clear_ready(&mut self) {
self.state = EventHandleState::Init;
}
}

0 comments on commit 3520d16

Please sign in to comment.