From 4f225385d3fb1fb0eea738e8127b7d8ae251179a Mon Sep 17 00:00:00 2001 From: Tom Dohrmann Date: Fri, 19 Jan 2024 17:51:07 +0100 Subject: [PATCH] add Windows support for capture-stream (#320) * move `SelectableCapture` into `stream` In the future the `stream.rs` file will only be compiled on non-windows platforms. `SelectableCapture` is an implementation detail of how stream is implemented on those platforms, so it makes sense to move `SelectableCapture` into that module. * add capture-stream implementation for Windows * don't depend on tun-tap for Windows This library fails to compile on Windows. * test capture-stream on Windows --- .github/workflows/ci.yml | 4 -- Cargo.toml | 12 ++-- src/lib.rs | 28 +-------- src/stream.rs | 30 +++++++++- src/stream_windows.rs | 124 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 162 insertions(+), 36 deletions(-) create mode 100644 src/stream_windows.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b6f52c429..31bfe854a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,10 +21,6 @@ jobs: toolchain: [stable, beta] profile: ['', --release] features: ['', '--all-features'] - exclude: - # capture-stream is not supported on Windows. - - os: windows-latest - features: '--all-features' uses: ./.github/workflows/ci-steps.yml with: diff --git a/Cargo.toml b/Cargo.toml index fd9d0368d..a0abaff31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,14 +19,18 @@ errno = "0.2" tokio = { version = "1.0", features = ["net", "rt", "macros", "rt-multi-thread"], optional = true } futures = { version = "0.3", optional = true } gat-std = { version = "0.1.1", optional = true } -# these really belong in [dev-dependencies], but can't have optional dev-deps -# and these libraries would truncate the min support Rust version (MSRV) -tun-tap = { version = "0.1.3", optional = true } +# This really belongs in [dev-dependencies], but we can't have optional dev-deps +# and this libraries would truncate the min support Rust version (MSRV) etherparse = { version = "0.13.0", optional = true } [target.'cfg(target_os = "windows")'.dependencies] windows-sys = { version = "0.36.1", features = ["Win32_Foundation", "Win32_Networking_WinSock"] } +[target.'cfg(not(target_os = "windows"))'.dependencies] +# This really belongs in [dev-dependencies], but we can't have optional dev-deps +# and this libraries would truncate the min support Rust version (MSRV) +tun-tap = { version = "0.1.3", optional = true } + [dev-dependencies] tempdir = "0.3" @@ -41,7 +45,7 @@ pkg-config = "0.3" [features] # This feature enables access to the function Capture::stream. # This is disabled by default, because it depends on a tokio -capture-stream = ["tokio", "futures"] +capture-stream = ["tokio", "futures", "windows-sys/Win32_System_Threading"] tap-tests = ["tun-tap", "etherparse"] lending-iter = ["gat-std"] # an empty feature to detect if '--all-features' was set diff --git a/src/lib.rs b/src/lib.rs index 201418f46..717d69de5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,7 @@ mod raw; #[cfg(windows)] pub mod sendqueue; #[cfg(feature = "capture-stream")] +#[cfg_attr(windows, path = "stream_windows.rs")] mod stream; #[cfg(feature = "capture-stream")] pub use stream::PacketStream; @@ -1281,7 +1282,7 @@ impl Capture { if !self.nonblock { return Err(NonNonBlock); } - PacketStream::new(SelectableCapture::new(self)?, codec) + PacketStream::new(self, codec) } /// Sets the filter on the capture using the given BPF program string. Internally this is @@ -1398,31 +1399,6 @@ impl AsRawFd for Capture { } } -/// Newtype [`Capture`] wrapper that exposes `pcap_get_selectable_fd()`. -#[cfg(feature = "capture-stream")] -struct SelectableCapture { - inner: Capture, - fd: RawFd, -} - -#[cfg(feature = "capture-stream")] -impl SelectableCapture { - fn new(capture: Capture) -> Result { - let fd = unsafe { raw::pcap_get_selectable_fd(capture.handle.as_ptr()) }; - if fd == -1 { - return Err(InvalidRawFd); - } - Ok(Self { inner: capture, fd }) - } -} - -#[cfg(all(unix, feature = "capture-stream"))] -impl AsRawFd for SelectableCapture { - fn as_raw_fd(&self) -> RawFd { - self.fd - } -} - impl Drop for Capture { fn drop(&mut self) { unsafe { raw::pcap_close(self.handle.as_ptr()) } diff --git a/src/stream.rs b/src/stream.rs index ccc13d458..5ad50c3c1 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -4,11 +4,14 @@ use super::Activated; use super::Capture; use super::Error; -use super::SelectableCapture; +use crate::raw; use crate::PacketCodec; +use crate::State; use futures::ready; use std::io; use std::marker::Unpin; +use std::os::fd::AsRawFd; +use std::os::fd::RawFd; use std::pin::Pin; use std::task::{self, Poll}; use tokio::io::unix::AsyncFd; @@ -20,7 +23,8 @@ pub struct PacketStream { } impl PacketStream { - pub(crate) fn new(capture: SelectableCapture, codec: C) -> Result { + pub(crate) fn new(capture: Capture, codec: C) -> Result { + let capture = SelectableCapture::new(capture)?; Ok(PacketStream { inner: AsyncFd::with_interest(capture, tokio::io::Interest::READABLE)?, codec, @@ -60,3 +64,25 @@ impl futures::Stream for PacketStream { + inner: Capture, + fd: RawFd, +} + +impl SelectableCapture { + fn new(capture: Capture) -> Result { + let fd = unsafe { raw::pcap_get_selectable_fd(capture.handle.as_ptr()) }; + if fd == -1 { + return Err(Error::InvalidRawFd); + } + Ok(Self { inner: capture, fd }) + } +} + +impl AsRawFd for SelectableCapture { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} diff --git a/src/stream_windows.rs b/src/stream_windows.rs new file mode 100644 index 000000000..1b95b53d1 --- /dev/null +++ b/src/stream_windows.rs @@ -0,0 +1,124 @@ +//! Support for asynchronous packet iteration. +//! +//! See [`Capture::stream`](super::Capture::stream). +use super::Activated; +use super::Capture; +use super::Error; +use crate::PacketCodec; +use futures::ready; +use futures::FutureExt; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{self, Poll}; +use tokio::task::JoinHandle; +use windows_sys::Win32::Foundation::HANDLE; +use windows_sys::Win32::System::Threading::WaitForSingleObject; + +/// Implement Stream for async use of pcap +pub struct PacketStream { + event_handle: EventHandle, + capture: Capture, + codec: C, +} + +impl PacketStream { + pub(crate) fn new(capture: Capture, codec: C) -> Result { + Ok(Self { + event_handle: EventHandle::new(&capture), + capture, + codec, + }) + } + + /// Returns a mutable reference to the inner [`Capture`]. + /// + /// The caller must ensure the capture will not be set to be + /// blocking. + pub fn capture_mut(&mut self) -> &mut Capture { + &mut self.capture + } +} + +impl Unpin for PacketStream {} + +impl futures::Stream for PacketStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { + let stream = Pin::into_inner(self); + let codec = &mut stream.codec; + + loop { + ready!(stream.event_handle.poll_ready(cx)); + + let res = match stream.capture.next_packet() { + Ok(p) => Ok(codec.decode(p)), + Err(Error::TimeoutExpired) => { + stream.event_handle.clear_ready(); + continue; + } + Err(e) => Err(e), + }; + return Poll::Ready(Some(res)); + } + } +} + +/// A wrapper around a HANDLE that can be used to call `WaitForSingleObject` +/// from an asynchronous context. Once the call to `WaitForSingleObject` +/// completes, the handle is considered ready and will keep returning `Ready` +/// until it's reset. +struct EventHandle { + handle: HANDLE, + state: EventHandleState, +} + +enum EventHandleState { + /// We haven't started waiting for an event yet. + Init, + /// We're currently waiting for an event. + Polling(JoinHandle<()>), + /// We waited for an event. + Ready, +} + +impl EventHandle { + pub fn new(capture: &Capture) -> Self { + Self { + handle: unsafe { + // SAFETY: PacketStream stores the handle before the capture, + // so the handle will be dropped before the capture. + capture.get_event() + }, + state: EventHandleState::Init, + } + } + + pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<()> { + loop { + match self.state { + EventHandleState::Init => { + let handle = self.handle; + self.state = + EventHandleState::Polling(tokio::task::spawn_blocking(move || { + const INFINITE: u32 = !0; + unsafe { + WaitForSingleObject(handle, INFINITE); + } + })); + } + EventHandleState::Polling(ref mut join_handle) => { + let _ = ready!(join_handle.poll_unpin(cx)); + self.state = EventHandleState::Ready; + } + EventHandleState::Ready => return Poll::Ready(()), + } + } + } + + /// Reset the internal state. This will trigger a call to + /// `WaitForSingleObject` the next time `poll_ready` is called. + pub fn clear_ready(&mut self) { + self.state = EventHandleState::Init; + } +}