Skip to content

Commit

Permalink
redo packet capture dump
Browse files Browse the repository at this point in the history
  • Loading branch information
drunkirishcoder committed Feb 26, 2022
1 parent d21bcad commit 34a01a3
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 6 deletions.
73 changes: 73 additions & 0 deletions core/src/ffi/dpdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub(crate) fn pktmbuf_pool_create<S: Into<String>>(
}

/// Looks up a mempool by the name.
#[cfg(test)]
pub(crate) fn mempool_lookup<S: Into<String>>(name: S) -> Result<MempoolPtr> {
let name: String = name.into();

Expand Down Expand Up @@ -407,6 +408,57 @@ pub(crate) fn eth_rx_queue_setup(
}
}

/// Removes an RX or TX packet callback from a given port and queue.
#[allow(dead_code)]
pub(crate) enum RxTxCallbackGuard {
Rx(PortId, PortRxQueueId, *const cffi::rte_eth_rxtx_callback),
Tx(PortId, PortTxQueueId, *const cffi::rte_eth_rxtx_callback),
}

impl Drop for RxTxCallbackGuard {
fn drop(&mut self) {
if let Err(error) = match self {
RxTxCallbackGuard::Rx(port_id, queue_id, ptr) => {
debug!(port = ?port_id, rxq = ?queue_id, "remove rx callback.");
unsafe {
cffi::rte_eth_remove_rx_callback(port_id.0, queue_id.0, *ptr)
.into_result(DpdkError::from_errno)
}
}
RxTxCallbackGuard::Tx(port_id, queue_id, ptr) => {
debug!(port = ?port_id, txq = ?queue_id, "remove tx callback.");
unsafe {
cffi::rte_eth_remove_tx_callback(port_id.0, queue_id.0, *ptr)
.into_result(DpdkError::from_errno)
}
}
} {
error!(?error);
}
}
}

/// Adds a callback to be called on packet RX on a given port and queue.
#[allow(dead_code)]
pub(crate) fn eth_add_rx_callback<T>(
port_id: PortId,
queue_id: PortRxQueueId,
callback: cffi::rte_rx_callback_fn,
user_param: &mut T,
) -> Result<RxTxCallbackGuard> {
let ptr = unsafe {
cffi::rte_eth_add_rx_callback(
port_id.0,
queue_id.0,
callback,
user_param as *mut T as *mut raw::c_void,
)
.into_result(|_| DpdkError::new())?
};

Ok(RxTxCallbackGuard::Rx(port_id, queue_id, ptr))
}

/// Retrieves a burst of input packets from a receive queue of a device.
pub(crate) fn eth_rx_burst(port_id: PortId, queue_id: PortRxQueueId, rx_pkts: &mut Vec<MbufPtr>) {
let nb_pkts = rx_pkts.capacity();
Expand Down Expand Up @@ -460,6 +512,27 @@ pub(crate) fn eth_tx_queue_setup(
}
}

/// Adds a callback to be called on packet TX on a given port and queue.
#[allow(dead_code)]
pub(crate) fn eth_add_tx_callback<T>(
port_id: PortId,
queue_id: PortTxQueueId,
callback: cffi::rte_tx_callback_fn,
user_param: &mut T,
) -> Result<RxTxCallbackGuard> {
let ptr = unsafe {
cffi::rte_eth_add_tx_callback(
port_id.0,
queue_id.0,
callback,
user_param as *mut T as *mut raw::c_void,
)
.into_result(|_| DpdkError::new())?
};

Ok(RxTxCallbackGuard::Tx(port_id, queue_id, ptr))
}

/// Sends a burst of output packets on a transmit queue of a device.
pub(crate) fn eth_tx_burst(
port_id: PortId,
Expand Down
2 changes: 2 additions & 0 deletions core/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/

pub(crate) mod dpdk;
#[cfg(feature = "pcap-dump")]
pub(crate) mod pcap;

pub(crate) use capsule_ffi::*;

Expand Down
156 changes: 156 additions & 0 deletions core/src/ffi/pcap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2019 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

use super::{AsStr, EasyPtr, ToCString, ToResult};
use crate::ffi::dpdk::MbufPtr;
use anyhow::Result;
use capsule_ffi as cffi;
use libc;
use std::ops::DerefMut;
use std::os::raw;
use std::ptr;
use thiserror::Error;

// Ethernet (10Mb, 100Mb, 1000Mb, and up); the 10MB in the DLT_ name is historical.
const DLT_EN10MB: raw::c_int = 1;

// https://github.com/the-tcpdump-group/libpcap/blob/master/pcap/pcap.h#L152
#[allow(dead_code)]
const PCAP_ERRBUF_SIZE: usize = 256;

/// A `pcap_t` pointer.
pub(crate) type PcapPtr = EasyPtr<cffi::pcap_t>;

/// Creates a `libpcap` handle needed to call other functions.
pub(crate) fn open_dead() -> Result<PcapPtr> {
let ptr = unsafe {
cffi::pcap_open_dead(DLT_EN10MB, cffi::RTE_MBUF_DEFAULT_BUF_SIZE as raw::c_int)
.into_result(|_| PcapError::new("Cannot create libpcap handle."))?
};

Ok(EasyPtr(ptr))
}

/// A `pcap_dumper_t` pointer.
pub(crate) type DumperPtr = EasyPtr<cffi::pcap_dumper_t>;

/// Opens a file to which to write packets.
pub(crate) fn dump_open<S: Into<String>>(handle: &mut PcapPtr, filename: S) -> Result<DumperPtr> {
let filename: String = filename.into();
let ptr = unsafe {
cffi::pcap_dump_open(handle.deref_mut(), filename.into_cstring().as_ptr())
.into_result(|_| PcapError::get_error(handle))?
};

Ok(EasyPtr(ptr))
}

/// Writes a packet to a capture file.
pub(crate) fn dump(dumper: &mut DumperPtr, mbuf: &MbufPtr) {
let mut pkthdr = cffi::pcap_pkthdr::default();
pkthdr.len = mbuf.data_len as u32;
pkthdr.caplen = pkthdr.len;

unsafe {
// If this errors, we'll still want to write packet(s) to the pcap,
let _ = libc::gettimeofday(
&mut pkthdr.ts as *mut cffi::timeval as *mut libc::timeval,
ptr::null_mut(),
);

cffi::pcap_dump(
dumper.deref_mut() as *mut cffi::pcap_dumper_t as *mut raw::c_uchar,
&pkthdr,
(mbuf.buf_addr as *mut u8).offset(mbuf.data_off as isize),
);
}
}

/// Flushes to a savefile packets dumped.
pub(crate) fn dump_flush(dumper: &mut DumperPtr) -> Result<()> {
unsafe {
cffi::pcap_dump_flush(dumper.deref_mut())
.into_result(|_| PcapError::new("Cannot flush packets to capture file."))
.map(|_| ())
}
}

/// Closes a savefile being written to.
pub(crate) fn dump_close(dumper: &mut DumperPtr) {
unsafe {
cffi::pcap_dump_close(dumper.deref_mut());
}
}

/// Closes a capture device or savefile
pub(crate) fn close(handle: &mut PcapPtr) {
unsafe {
cffi::pcap_close(handle.deref_mut());
}
}

/// Opens a saved capture file for reading.
#[cfg(test)]
pub(crate) fn open_offline<S: Into<String>>(filename: S) -> Result<PcapPtr> {
let filename: String = filename.into();
let mut errbuf: [raw::c_char; PCAP_ERRBUF_SIZE] = [0; PCAP_ERRBUF_SIZE];

let ptr = unsafe {
cffi::pcap_open_offline(filename.into_cstring().as_ptr(), errbuf.as_mut_ptr())
.into_result(|_| PcapError::new(errbuf.as_str()))?
};

Ok(EasyPtr(ptr))
}

/// Reads the next packet from a `pcap_t` handle.
#[cfg(test)]
pub(crate) fn next(handle: &mut PcapPtr) -> Result<&[u8]> {
let mut pkthdr: *mut cffi::pcap_pkthdr = ptr::null_mut();
let mut pktdata: *const raw::c_uchar = ptr::null();

unsafe {
match cffi::pcap_next_ex(handle.deref_mut(), &mut pkthdr, &mut pktdata) {
1 => Ok(std::slice::from_raw_parts(
pktdata,
(*pkthdr).caplen as usize,
)),
_ => Err(PcapError::get_error(handle).into()),
}
}
}

/// An error generated in `libpcap`.
#[derive(Debug, Error)]
#[error("{0}")]
pub(crate) struct PcapError(String);

impl PcapError {
/// Returns the `PcapError` with the given error message.
#[inline]
fn new(msg: &str) -> Self {
PcapError(msg.into())
}

/// Returns the `PcapError` pertaining to the last `libpcap` error.
#[inline]
fn get_error(handle: &mut PcapPtr) -> Self {
let msg = unsafe { cffi::pcap_geterr(handle.deref_mut()) };
PcapError::new((msg as *const raw::c_char).as_str())
}
}
35 changes: 30 additions & 5 deletions core/src/rt2/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ pub struct RuntimeConfig {
/// execution.
pub worker_cores: Vec<usize>,

/// The root data directory the application writes to.
///
/// If unset, the default is `/var/capsule/{app_name}`.
#[serde(default)]
pub data_dir: Option<String>,

/// Per mempool settings. On a system with multiple sockets, aka NUMA
/// nodes, one mempool will be allocated for each socket the apllication
/// uses.
Expand Down Expand Up @@ -193,6 +199,18 @@ impl RuntimeConfig {

eal_args
}

/// Returns the data directory.
#[allow(dead_code)]
pub(crate) fn data_dir(&self) -> String {
self.data_dir.clone().unwrap_or_else(|| {
let base_dir = "/var/capsule";
match &self.app_group {
Some(group) => format!("{}/{}/{}", base_dir, group, self.app_name),
None => format!("{}/{}", base_dir, self.app_name),
}
})
}
}

impl fmt::Debug for RuntimeConfig {
Expand Down Expand Up @@ -367,7 +385,7 @@ mod tests {
use super::*;

#[test]
fn config_defaults() {
fn config_defaults() -> Result<()> {
const CONFIG: &str = r#"
app_name = "myapp"
main_core = 0
Expand All @@ -377,10 +395,11 @@ mod tests {
device = "0000:00:01.0"
"#;

let config: RuntimeConfig = toml::from_str(CONFIG).unwrap();
let config: RuntimeConfig = toml::from_str(CONFIG)?;

assert_eq!(false, config.secondary);
assert_eq!(None, config.app_group);
assert_eq!(None, config.data_dir);
assert_eq!(None, config.dpdk_args);
assert_eq!(default_capacity(), config.mempool.capacity);
assert_eq!(default_cache_size(), config.mempool.cache_size);
Expand All @@ -391,10 +410,14 @@ mod tests {
assert_eq!(default_port_txqs(), config.ports[0].txqs);
assert_eq!(default_promiscuous_mode(), config.ports[0].promiscuous);
assert_eq!(default_multicast_mode(), config.ports[0].multicast);

assert_eq!("/var/capsule/myapp", &config.data_dir());

Ok(())
}

#[test]
fn config_to_eal_args() {
fn config_to_eal_args() -> Result<()> {
const CONFIG: &str = r#"
app_name = "myapp"
secondary = false
Expand All @@ -421,7 +444,7 @@ mod tests {
txqs = 32
"#;

let config: RuntimeConfig = toml::from_str(CONFIG).unwrap();
let config: RuntimeConfig = toml::from_str(CONFIG)?;

assert_eq!(
&[
Expand All @@ -443,6 +466,8 @@ mod tests {
"eal:8"
],
config.to_eal_args().as_slice(),
)
);

Ok(())
}
}
15 changes: 14 additions & 1 deletion core/src/rt2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
mod config;
mod lcore;
mod mempool;
#[cfg(feature = "pcap-dump")]
#[cfg_attr(docsrs, doc(cfg(feature = "pcap-dump")))]
mod pcap_dump;
mod port;

pub use self::config::*;
Expand Down Expand Up @@ -89,6 +92,8 @@ pub struct Runtime {
mempool: ManuallyDrop<Mempool>,
lcores: ManuallyDrop<LcoreMap>,
ports: ManuallyDrop<PortMap>,
#[cfg(feature = "pcap-dump")]
pcap_dump: ManuallyDrop<self::pcap_dump::PcapDump>,
}

impl Runtime {
Expand Down Expand Up @@ -155,13 +160,19 @@ impl Runtime {
port.start()?;
ports.push(port);
}
let ports: PortMap = ports.into();

#[cfg(feature = "pcap-dump")]
let pcap_dump = self::pcap_dump::enable_pcap_dump(&config.data_dir(), &ports, &lcores)?;

info!("runtime ready.");

Ok(Runtime {
mempool: ManuallyDrop::new(mempool),
lcores: ManuallyDrop::new(lcores),
ports: ManuallyDrop::new(ports.into()),
ports: ManuallyDrop::new(ports),
#[cfg(feature = "pcap-dump")]
pcap_dump: ManuallyDrop::new(pcap_dump),
})
}

Expand Down Expand Up @@ -203,6 +214,8 @@ impl Drop for RuntimeGuard {
}

unsafe {
#[cfg(feature = "pcap-dump")]
ManuallyDrop::drop(&mut self.runtime.pcap_dump);
ManuallyDrop::drop(&mut self.runtime.ports);
ManuallyDrop::drop(&mut self.runtime.lcores);
ManuallyDrop::drop(&mut self.runtime.mempool);
Expand Down
Loading

0 comments on commit 34a01a3

Please sign in to comment.