diff --git a/.gitignore b/.gitignore index 8a46c69ab9..3f4414cf04 100644 --- a/.gitignore +++ b/.gitignore @@ -47,6 +47,7 @@ Vagrantfile # pcap out.pcap +*.lnk # logs *.log diff --git a/Cargo.toml b/Cargo.toml index 30f7e6ae03..29bb2a4bb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ members=["framework", "test/srv6-inject", "test/tcp-checksum", "test/icmpv6", - "test/mtu-too-big" + "test/mtu-too-big", + "test/transform-error" # "test/delay-test", # "test/chain-test", # "test/shutdown-test", diff --git a/examples.sh b/examples.sh index 555f2a2365..46f0b73ddf 100644 --- a/examples.sh +++ b/examples.sh @@ -14,6 +14,7 @@ export examples=( test/tcp-checksum test/icmpv6 test/mtu-too-big + test/transform-error # test/delay-test # test/chain-test # test/shutdown-test diff --git a/framework/Cargo.toml b/framework/Cargo.toml index 39e340539a..156aedf549 100644 --- a/framework/Cargo.toml +++ b/framework/Cargo.toml @@ -8,34 +8,33 @@ build = "build.rs" doctest = false [dependencies] -libc = ">= 0.2.4" -time = ">=0.1.0" -getopts = "*" byteorder = "*" clippy = { version = ">=0.0.197", optional = true } +crossbeam = "=0.4" +enum_primitive = "0.1.1" +failure = "0.1.5" fnv = "*" -twox-hash = "*" -regex = "*" +generic-array = ">=0.11.0" +getopts = "*" +hex = "0.3.2" lazy_static = "*" +libc = ">= 0.2.4" +log = { version = "0.4", features = ["std", "serde"]} net2 = "*" # NIX restricts us to just unix for now, we can fix this if someone cares at a later point. nix = "*" -# Figure out if we want this permanently or just for now. -rust-sctp = { git="https://github.com/netsys/rust-sctp", optional = true} -toml = "*" -serde = "1.0" -serde_derive = "1.0" -# Hack for SHM -uuid = { version = "*", features=["v4"] } -error-chain = "0.12.0" num = "0.1" num-derive="0.2" num-traits = "0.2" -generic-array = ">=0.11.0" -crossbeam = "=0.4" rayon = "1.0.3" -hex = "0.3.2" -enum_primitive = "0.1.1" +regex = "*" +rust-sctp = { git="https://github.com/netsys/rust-sctp", optional = true} +serde = "1.0" +serde_derive = "1.0" +time = ">=0.1.0" +toml = "*" +twox-hash = "*" +uuid = { version = "*", features = ["v4"] } [features] default = [] diff --git a/framework/src/common/errors.rs b/framework/src/common/errors.rs index a16ec6a89c..89e2817b87 100644 --- a/framework/src/common/errors.rs +++ b/framework/src/common/errors.rs @@ -1,98 +1,112 @@ -error_chain! 
{ - errors { - FailedAllocation { - description("Failed to allocate memory") - display("Failed to allocate memory") - } - FailedDeallocation { - description("Failed to deallocate memory") - display("Failed to deallocate memory") - } - FailedToInitializePort(port: i32) { - description("Failed to initialize port") - display("Failed to initialize port: {}", port) - } - BadQueue { - description("Invalid queue request") - display("Invalid queue request") - } - CannotSend { - description("Cannot send data out port") - display("Cannot send data out port") - } - BadDev(dev: String) { - description("Cannot find device") - display("Cannot find device: {}", dev) - } - BadVdev(vdev: String) { - description("Bad vdev specification") - display("Bad vdev specification: {}", vdev) - } - BadTxQueue(port: i32, queue: i32) { - description("Bad TX queue") - display("Bad TX queue {} for port {}", queue, port) - } - BadRxQueue(port: i32, queue: i32) { - description("Bad RX queue") - display("Bad RX queue {} for port {}", queue, port) - } - BadOffset(offset: usize) { - description("Attempt to access bad packet offset") - display("Attempt to access bad packet offset {}", offset) - } - - MetadataTooLarge { - description("Metadata is too large") - display("Metadata is too large") - } - - RingAllocationFailure { - description("Could not allocate ring") - display("Could not allocate ring") - } - - InvalidRingSize(size: usize) { - description("Bad ring size, must be power of 2") - display("Bad ring size {}, must be a power of 2", size) - } - - RingDuplicationFailure { - description("Address of second copy of ring does not match expected address") - display("Address of second copy of ring does not match expected address") - } - - ConfigurationError(description: String) { - description("Configuration error") - display("Configuration error: {}", description) - } - - NoRunningSchedulerOnCore(core: i32) { - description("No scheduler running on core") - display("No scheduler running on core {}", core) - } - - FailedToInsertHeader { - description("Failed to insert header into packet") - display("Failed to insert header into packet") - } - - FailedToSwapHeader(new_header: String) { - description("Failed to swap-in new header in packet") - display("Failed to swap-in new header - {} - in packet", new_header) - } - - FailedToRemoveHeader { - description("Failed to remove header from packet") - display("Failed to remove header from packet") - } - - FailedToParseMacAddress(s: String) { - description("Failed to parse MAC address") - display("Failed to parse MAC address: '{}'", s) - } +use failure::{Error, Fail}; + +pub type Result = ::std::result::Result; + +#[derive(Debug, Fail)] +pub enum NetBricksError { + #[fail(display = "Failed to allocate memory")] + FailedAllocation, + + #[fail(display = "Failed to free memory buffer (mbuf) {}", _0)] + FailedToFreeMBuf(i32), + + #[fail(display = "Failed to remove/drop packets from buffer")] + FailedToDropPackets, + + #[fail(display = "Failed to deallocate memory")] + FailedDeallocation, + + #[fail(display = "Failed to initialize port: {}", _0)] + FailedToInitializePort(i32), + + #[fail(display = "Invalid queue request")] + BadQueue, + + #[fail(display = "Cannot send data out port")] + CannotSend, + + #[fail(display = "Cannot find device: {}", _0)] + BadDev(String), + + #[fail(display = "Bad vdev specification: {}", _0)] + BadVdev(String), + + #[fail(display = "Bad TX queue {} for port {}", _0, _1)] + BadTxQueue(i32, i32), + + #[fail(display = "Bad RX queue {} for port {}", _0, _1)] + 
BadRxQueue(i32, i32), + + #[fail(display = "Attempt to access bad packet offset {}", _0)] + BadOffset(usize), + + #[fail(display = "Metadata is too large")] + MetadataTooLarge, + + #[fail(display = "Could not allocate ring")] + RingAllocationFailure, + + #[fail(display = "Address of second copy of ring does not match expected address")] + RingDuplicationFailure, + + #[fail(display = "Bad ring size {}, must be a power of 2", _0)] + InvalidRingSize(usize), + + #[fail(display = "Configuration error: {}", _0)] + ConfigurationError(String), + + #[fail(display = "No scheduler running on core {}", _0)] + NoRunningSchedulerOnCore(i32), + + #[fail(display = "Failed to insert header into packet")] + FailedToInsertHeader, + + #[fail( + display = "Failed to swap-in new header - {} - in packet, new_header", + _0 + )] + FailedToSwapHeader(String), + + #[fail(display = "Failed to remove header from packet")] + FailedToRemoveHeader, + + #[fail(display = "Failed to parse MAC address: '{}'", _0)] + FailedToParseMacAddress(String), + + #[fail(display = "_")] + #[doc(hidden)] + __Nonexhaustive, +} + +#[macro_export] +macro_rules! error_chain { + ($error:expr) => { + error!("{}", $crate::common::errors::string_chain($error)) + }; +} + +#[macro_export] +macro_rules! warn_chain { + ($error:expr) => { + warn!("{}", $crate::common::errors::string_chain($error)) + }; +} + +/// Read a `failure` `Error` and print out the causes and a backtrace as +/// `log::error`s +pub fn string_chain(e: &Error) -> String { + let mut error = e.to_string(); + + for cause in e.iter_causes() { + error.push_str(&format!("\nCaused by: {}", cause)); } - foreign_links { - Io(::std::io::Error); + if let Ok("1") = ::std::env::var("RUST_BACKTRACE") + .as_ref() + .map(|s| s.as_str()) + { + error.push_str(&format!("\nBacktrace:\n{}", e.backtrace())) } + + error } diff --git a/framework/src/common/mod.rs b/framework/src/common/mod.rs index 5ff20d218f..17cf6d2cad 100644 --- a/framework/src/common/mod.rs +++ b/framework/src/common/mod.rs @@ -1,15 +1,7 @@ -mod errors; pub use self::errors::*; +#[macro_use] +pub mod errors; + /// Null metadata associated with packets initially. 
pub struct EmptyMetadata; - -pub fn print_error(e: &Error) { - println!("Error: {}", e); - for e in e.iter().skip(1) { - println!("Cause: {}", e); - } - if let Some(backtrace) = e.backtrace() { - println!("Backtrace: {:?}", backtrace); - } -} diff --git a/framework/src/config/config_reader.rs b/framework/src/config/config_reader.rs index 7b84ab0f55..30e9ba642c 100644 --- a/framework/src/config/config_reader.rs +++ b/framework/src/config/config_reader.rs @@ -1,4 +1,4 @@ -use super::{NetbricksConfiguration, PortConfiguration}; +use super::{NetBricksConfiguration, PortConfiguration}; use common::*; use std::fs::File; use std::io::Read; @@ -19,7 +19,7 @@ fn read_port(value: &Value) -> Result { let name = match port_def.get("name") { Some(&Value::String(ref name)) => name.clone(), _ => { - return Err(ErrorKind::ConfigurationError(String::from( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse name for port", )) .into()); @@ -30,7 +30,7 @@ fn read_port(value: &Value) -> Result { Some(&Value::Integer(rxd)) => rxd as i32, None => NUM_RXD, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse number of rx descriptors {:?}", v )) @@ -42,7 +42,7 @@ fn read_port(value: &Value) -> Result { Some(&Value::Integer(txd)) => txd as i32, None => NUM_TXD, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse number of tx descriptors {:?}", v )) @@ -54,7 +54,7 @@ fn read_port(value: &Value) -> Result { Some(&Value::Boolean(l)) => l, None => false, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse loopback spec {:?}", v )) @@ -66,7 +66,7 @@ fn read_port(value: &Value) -> Result { Some(&Value::Boolean(l)) => l, None => false, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse tso spec {:?}", v )) @@ -78,7 +78,7 @@ fn read_port(value: &Value) -> Result { Some(&Value::Boolean(l)) => l, None => false, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse csum spec {:?}", v )) @@ -90,11 +90,7 @@ fn read_port(value: &Value) -> Result { if symmetric_queue && (port_def.contains_key("rx_cores") || port_def.contains_key("tx_cores")) { - println!( - "cores specified along with rx_cores and/or tx_cores for port {}", - name - ); - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "cores specified along with rx_cores and/or tx_cores \ for port {}", name @@ -110,8 +106,8 @@ fn read_port(value: &Value) -> Result { if let Value::Integer(core) = *q { qs.push(core as i32) } else { - return Err(ErrorKind::ConfigurationError(format!( - "Could not parse queue spec {:?}", + return Err(NetBricksError::ConfigurationError(format!( + "Could not parse queue spec {}", q )) .into()); @@ -153,25 +149,24 @@ fn read_port(value: &Value) -> Result { tso: tso, }) } else { - Err(ErrorKind::ConfigurationError(String::from("Could not understand port spec")).into()) + Err(NetBricksError::ConfigurationError(format!("Could not understand port spec")).into()) } } -/// Read a TOML string and create a `NetbricksConfiguration` structure. +/// Read a TOML string and create a `NetBricksConfiguration` structure. /// `configuration` is a TOML formatted string. 
/// `filename` is used for error reporting purposes, and is otherwise meaningless. pub fn read_configuration_from_str( configuration: &str, filename: &str, -) -> Result { +) -> Result { // Parse string for TOML file. let toml = match toml::de::from_str::(configuration) { Ok(toml) => toml, Err(error) => { - println!("Parse error: {} in file: {}", error, filename); - return Err(ErrorKind::ConfigurationError(format!( - "Experienced {} parse errors in spec.", - error + return Err(NetBricksError::ConfigurationError(format!( + "Parse error: {} in file: {}", + error, filename )) .into()); } @@ -182,8 +177,7 @@ pub fn read_configuration_from_str( Some(&Value::String(ref name)) => name.clone(), None => String::from(DEFAULT_NAME), _ => { - println!("Could not parse name"); - return Err(ErrorKind::ConfigurationError(String::from("Could not parse name")).into()); + return Err(NetBricksError::ConfigurationError(format!("Could not parse name")).into()); } }; @@ -193,7 +187,7 @@ pub fn read_configuration_from_str( Some(&Value::String(ref core)) => match core.parse() { Ok(c) => c, _ => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse {} as core", core )) @@ -202,10 +196,11 @@ pub fn read_configuration_from_str( }, None => DEFAULT_PRIMARY_CORE, v => { - println!("Could not parse core"); - return Err( - ErrorKind::ConfigurationError(format!("Could not parse {:?} as core", v)).into(), - ); + return Err(NetBricksError::ConfigurationError(format!( + "Could not parse {:?} as core", + v + )) + .into()); } }; @@ -214,9 +209,8 @@ pub fn read_configuration_from_str( Some(&Value::Integer(pool)) => pool as u32, None => DEFAULT_POOL_SIZE, _ => { - println!("Could parse pool size"); return Err( - ErrorKind::ConfigurationError(String::from("Could not parse pool size")).into(), + NetBricksError::ConfigurationError(format!("Could not parse pool size",)).into(), ); } }; @@ -226,9 +220,8 @@ pub fn read_configuration_from_str( Some(&Value::Integer(cache)) => cache as u32, None => DEFAULT_CACHE_SIZE, _ => { - println!("Could parse cache size"); return Err( - ErrorKind::ConfigurationError(String::from("Could not parse cache size")).into(), + NetBricksError::ConfigurationError(format!("Could not parse cache size",)).into(), ); } }; @@ -238,8 +231,7 @@ pub fn read_configuration_from_str( Some(&Value::Boolean(secondary)) => secondary, None => DEFAULT_SECONDARY, _ => { - println!("Could not parse whether this is a secondary process"); - return Err(ErrorKind::ConfigurationError(String::from( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse secondary processor spec", )) .into()); @@ -254,7 +246,7 @@ pub fn read_configuration_from_str( if let Value::Integer(core) = *core { cores.push(core as i32) } else { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse core spec {}", core )) @@ -265,8 +257,9 @@ pub fn read_configuration_from_str( } None => Vec::with_capacity(0), _ => { - println!("Cores is not an array"); - return Err(ErrorKind::ConfigurationError(String::from("Cores is not an array")).into()); + return Err( + NetBricksError::ConfigurationError(format!("Cores is not an array")).into(), + ); } }; @@ -274,7 +267,7 @@ pub fn read_configuration_from_str( Some(&Value::Boolean(l)) => l, None => false, v => { - return Err(ErrorKind::ConfigurationError(format!( + return Err(NetBricksError::ConfigurationError(format!( "Could not parse strict spec (should be 
boolean) {:?}", v )) @@ -288,18 +281,18 @@ pub fn read_configuration_from_str( for port in ports { let p = try!(read_port(port)); pouts.push(p); - // match read_port(port) { } pouts } None => Vec::with_capacity(0), _ => { - println!("Ports is not an array"); - return Err(ErrorKind::ConfigurationError(String::from("Ports is not an array")).into()); + return Err( + NetBricksError::ConfigurationError(format!("Ports is not an array")).into(), + ); } }; - Ok(NetbricksConfiguration { + Ok(NetBricksConfiguration { name: name, primary_core: master_lcore, cores: cores, @@ -312,11 +305,10 @@ pub fn read_configuration_from_str( }) } -/// Read a configuration file and create a `NetbricksConfiguration` structure. +/// Read a configuration file and create a `NetBricksConfiguration` structure. /// `filename` should be TOML formatted file. -pub fn read_configuration(filename: &str) -> Result { +pub fn read_configuration(filename: &str) -> Result { let mut toml_str = String::new(); - let _ = try! {File::open(filename).and_then(|mut f| f.read_to_string(&mut toml_str)) - .chain_err(|| ErrorKind::ConfigurationError(String::from("Could not read file")))}; + File::open(filename).and_then(|mut f| f.read_to_string(&mut toml_str))?; read_configuration_from_str(&toml_str[..], filename) } diff --git a/framework/src/config/flag_reader.rs b/framework/src/config/flag_reader.rs index 7ecb5abf5e..7ff3ecc0b4 100644 --- a/framework/src/config/flag_reader.rs +++ b/framework/src/config/flag_reader.rs @@ -1,7 +1,5 @@ -extern crate getopts; -use self::getopts::{Matches, Options}; -use super::{read_configuration, NetbricksConfiguration, PortConfiguration}; -use common::print_error; +use super::{read_configuration, NetBricksConfiguration, PortConfiguration}; +use getopts::{Matches, Options}; use std::collections::HashMap; use std::env; use std::process; @@ -21,14 +19,13 @@ pub fn basic_opts() -> Options { opts.optopt("", "cache_size", "Core Cache Size", "size"); opts.optopt("f", "configuration", "Configuration file", "path"); opts.optmulti("", "dpdk_args", "DPDK arguments", "DPDK arguments"); - opts } /// Read the commonly used configuration flags parsed by `basic_opts()` into -/// a `NetbricksConfiguration`. Some flags may cause side effects -- for example, the +/// a `NetBricksConfiguration`. Some flags may cause side effects -- for example, the /// help flag will print usage information and then exit the process. 
-pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration { +pub fn read_matches(matches: &Matches, opts: &Options) -> NetBricksConfiguration { if matches.opt_present("h") { let program = env::args().next().unwrap(); print!("{}", opts.usage(&format!("Usage: {} [options]", program))); @@ -46,17 +43,17 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration match read_configuration(&config_file[..]) { Ok(cfg) => cfg, Err(ref e) => { - print_error(e); + eprintln!("Error reading configuration file ~ {}", e); process::exit(1); } } } else { let name = matches.opt_str("n").unwrap_or_else(|| String::from("recv")); - NetbricksConfiguration::new_with_name(&name[..]) + NetBricksConfiguration::new_with_name(&name[..]) }; let configuration = if matches.opt_present("m") { - NetbricksConfiguration { + NetBricksConfiguration { primary_core: matches .opt_str("m") .unwrap() @@ -70,7 +67,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration }; let configuration = if matches.opt_present("secondary") { - NetbricksConfiguration { + NetBricksConfiguration { secondary: true, ..configuration } @@ -79,7 +76,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration }; let configuration = if matches.opt_present("primary") { - NetbricksConfiguration { + NetBricksConfiguration { secondary: false, ..configuration } @@ -88,7 +85,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration }; let configuration = if matches.opt_present("pool_size") { - NetbricksConfiguration { + NetBricksConfiguration { pool_size: matches .opt_str("pool_size") .unwrap() @@ -101,7 +98,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration }; let configuration = if matches.opt_present("cache_size") { - NetbricksConfiguration { + NetBricksConfiguration { cache_size: matches .opt_str("cache_size") .unwrap() @@ -136,7 +133,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration ports.push(PortConfiguration::new_with_queues(*port, cores, cores)) } cores.dedup(); - NetbricksConfiguration { + NetBricksConfiguration { cores: cores, ports: ports, ..configuration @@ -145,7 +142,7 @@ pub fn read_matches(matches: &Matches, opts: &Options) -> NetbricksConfiguration configuration }; - println!("Going to start with configuration {}", configuration); + info!("Going to start with configuration {}", configuration); configuration } diff --git a/framework/src/config/mod.rs b/framework/src/config/mod.rs index 4540cd7c88..e408208d36 100644 --- a/framework/src/config/mod.rs +++ b/framework/src/config/mod.rs @@ -7,7 +7,7 @@ mod flag_reader; /// `NetBricks` control configuration. In theory all applications create one of these, either through the use of /// `read_configuration` or manually using args. #[derive(Debug, Deserialize)] -pub struct NetbricksConfiguration { +pub struct NetBricksConfiguration { /// Name, this is passed on to DPDK. If you want to run multiple DPDK apps, this needs to be unique per application. pub name: String, /// Should this process be run as a secondary process or a primary process? @@ -31,10 +31,10 @@ pub struct NetbricksConfiguration { pub dpdk_args: Option, } -/// Create an empty `NetbricksConfiguration`, useful when initializing through arguments. 
-impl Default for NetbricksConfiguration { - fn default() -> NetbricksConfiguration { - NetbricksConfiguration { +/// Create an empty `NetBricksConfiguration`, useful when initializing through arguments. +impl Default for NetBricksConfiguration { + fn default() -> NetBricksConfiguration { + NetBricksConfiguration { name: String::new(), pool_size: DEFAULT_POOL_SIZE, cache_size: DEFAULT_CACHE_SIZE, @@ -48,17 +48,17 @@ impl Default for NetbricksConfiguration { } } -impl NetbricksConfiguration { - /// Create a `NetbricksConfiguration` with a name. - pub fn new_with_name(name: &str) -> NetbricksConfiguration { - NetbricksConfiguration { +impl NetBricksConfiguration { + /// Create a `NetBricksConfiguration` with a name. + pub fn new_with_name(name: &str) -> NetBricksConfiguration { + NetBricksConfiguration { name: String::from(name), ..Default::default() } } } -impl fmt::Display for NetbricksConfiguration { +impl fmt::Display for NetBricksConfiguration { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { try!(write!( f, diff --git a/framework/src/headers/mac.rs b/framework/src/headers/mac.rs index 6d64422d4c..88816bd23e 100644 --- a/framework/src/headers/mac.rs +++ b/framework/src/headers/mac.rs @@ -1,5 +1,6 @@ use super::{EndOffset, HeaderUpdates}; use common::*; +use failure::Error; use headers::{NextHeader, NullHeader}; use hex; use num::FromPrimitive; @@ -64,12 +65,11 @@ impl MacAddress { impl FromStr for MacAddress { type Err = Error; - fn from_str(s: &str) -> ::std::result::Result { + + fn from_str(s: &str) -> Result { match hex::decode(s.replace(":", "").replace("-", "")) { Ok(ref v) if v.len() == 6 => Ok(MacAddress::new_from_slice(v.as_slice())), - _ => Err(Error::from_kind(ErrorKind::FailedToParseMacAddress( - s.to_string(), - ))), + _ => Err(NetBricksError::FailedToParseMacAddress(s.to_string()).into()), } } } @@ -233,9 +233,10 @@ mod tests { let address = "go:od:ca:fe:be:ef"; let parsed = MacAddress::from_str(address); assert!(parsed.is_err()); - match parsed.err() { - Some(Error(ErrorKind::FailedToParseMacAddress(s), _)) => { - assert_eq!(address.to_string(), s); + match parsed { + Err(e) => { + let err = format_err!("Failed to parse MAC address: '{}'", address.to_string()); + assert_eq!(e.to_string(), err.to_string()); } _ => assert!(false), } @@ -246,9 +247,10 @@ mod tests { let address = "ab:ad:ad:d4"; let parsed = MacAddress::from_str(address); assert!(parsed.is_err()); - match parsed.err() { - Some(Error(ErrorKind::FailedToParseMacAddress(s), _)) => { - assert_eq!(address.to_string(), s); + match parsed { + Err(e) => { + let err = format_err!("Failed to parse MAC address: '{}'", address.to_string()); + assert_eq!(e.to_string(), err.to_string()); } _ => assert!(false), } diff --git a/framework/src/interface/dpdk.rs b/framework/src/interface/dpdk.rs index 6f34114301..5ba4b0fb22 100644 --- a/framework/src/interface/dpdk.rs +++ b/framework/src/interface/dpdk.rs @@ -1,5 +1,5 @@ use super::METADATA_SLOTS; -use config::{NetbricksConfiguration, DEFAULT_CACHE_SIZE, DEFAULT_POOL_SIZE}; +use config::{NetBricksConfiguration, DEFAULT_CACHE_SIZE, DEFAULT_POOL_SIZE}; use native::libnuma; use native::zcsi; use std::cell::Cell; @@ -60,10 +60,11 @@ pub fn init_system_secondary(name: &str, core: i32) { } /// Initialize the system based on the supplied scheduler configuration. 
-pub fn init_system(config: &NetbricksConfiguration) { +pub fn init_system(config: &NetBricksConfiguration) { if config.name.is_empty() { panic!("Configuration must provide a name."); } + // We init with all devices blacklisted and rely on the attach logic to white list them as necessary. if config.secondary { // We do not have control over any of the other settings in this case. @@ -85,11 +86,11 @@ thread_local!(static NUMA_DOMAIN: Cell = Cell::new(-1)); pub fn set_numa_domain() { let domain = unsafe { if libnuma::numa_available() == -1 { - println!("No NUMA information found, support disabled"); + info!("No NUMA information found, support disabled"); -1 } else { let domain = libnuma::numa_preferred(); - println!("Running on node {}", domain); + info!("Running on node {}", domain); domain } }; @@ -103,9 +104,9 @@ pub fn init_thread(tid: i32, core: i32) { f.set(numa); }); if numa == -1 { - println!("No NUMA information found, support disabled"); + info!("No NUMA information found, support disabled"); } else { - println!("Running on node {}", numa); + info!("Running on node {}", numa); }; } diff --git a/framework/src/interface/packet.rs b/framework/src/interface/packet.rs index 2c857bbe29..8eb6ed52c0 100644 --- a/framework/src/interface/packet.rs +++ b/framework/src/interface/packet.rs @@ -332,7 +332,7 @@ impl Packet { #[inline] pub fn write_metadata(&mut self, metadata: &M2) -> Result<()> { if size_of::() >= FREEFORM_METADATA_SIZE { - Err(ErrorKind::MetadataTooLarge.into()) + Err(NetBricksError::MetadataTooLarge.into()) } else { unsafe { let ptr = MBuf::mut_metadata_as::(self.mbuf, FREEFORM_METADATA_SLOT); @@ -429,7 +429,7 @@ impl Packet { ptr::copy_nonoverlapping(hdr, fin_dst as *mut T2, 1); Ok(header_size as isize) } else { - Err(ErrorKind::FailedToInsertHeader.into()) + Err(NetBricksError::FailedToInsertHeader.into()) } } } @@ -463,7 +463,7 @@ impl Packet { on_insert(current_header); Ok(()) } else { - Err(ErrorKind::FailedToInsertHeader.into()) + Err(NetBricksError::FailedToInsertHeader.into()) } } } @@ -510,12 +510,12 @@ impl Packet { let removed = (*self.mbuf).remove_data_end(var_header_size); if removed != var_header_size { - Err(ErrorKind::FailedToRemoveHeader.into()) + Err(NetBricksError::FailedToRemoveHeader.into()) } else { Ok(-(var_header_size as isize)) } } else { - Err(ErrorKind::FailedToRemoveHeader.into()) + Err(NetBricksError::FailedToRemoveHeader.into()) } } } @@ -558,7 +558,7 @@ impl Packet { on_remove(current_header); Ok(()) } - _ => Err(ErrorKind::FailedToRemoveHeader.into()), + _ => Err(NetBricksError::FailedToRemoveHeader.into()), } } } @@ -602,7 +602,9 @@ impl Packet { let removed = (*self.mbuf).remove_data_end(diff); if removed <= new_hdr_size && diff != removed { - return Err(ErrorKind::FailedToSwapHeader(format!("{}", new_header)).into()); + return Err( + NetBricksError::FailedToSwapHeader(format!("{}", new_header)).into(), + ); } } Ordering::Greater => { @@ -611,7 +613,9 @@ impl Packet { let added = (*self.mbuf).add_data_end(diff); if added >= new_hdr_size && diff != added { - return Err(ErrorKind::FailedToSwapHeader(format!("{}", new_header)).into()); + return Err( + NetBricksError::FailedToSwapHeader(format!("{}", new_header)).into(), + ); } ptr::copy(payload, move_loc, to_move); } @@ -634,7 +638,7 @@ impl Packet { on_swap(current_header); Ok(diff) } else { - Err(ErrorKind::FailedToSwapHeader(format!("{}", new_header)).into()) + Err(NetBricksError::FailedToSwapHeader(format!("{}", new_header)).into()) } } } @@ -662,7 +666,7 @@ impl Packet { 
ptr::copy_nonoverlapping(src, dst, size); Ok(()) } else { - Err(ErrorKind::FailedAllocation.into()) + Err(NetBricksError::FailedAllocation.into()) } } } @@ -682,7 +686,7 @@ impl Packet { if added >= size { Ok(()) } else { - Err(ErrorKind::FailedAllocation.into()) + Err(NetBricksError::FailedAllocation.into()) } } } @@ -694,7 +698,7 @@ impl Packet { offset: usize, ) -> Result<()> { if offset > self.payload_size() { - Err(ErrorKind::BadOffset(offset).into()) + Err(NetBricksError::BadOffset(offset).into()) } else { unsafe { let dst = self.payload().offset(offset as isize); diff --git a/framework/src/interface/port/phy_port.rs b/framework/src/interface/port/phy_port.rs index 9a8b0429cc..2902bfd555 100644 --- a/framework/src/interface/port/phy_port.rs +++ b/framework/src/interface/port/phy_port.rs @@ -152,9 +152,9 @@ impl PmdPort { txq: i32, ) -> Result> { if rxq > port.rxqs || rxq < 0 { - Err(ErrorKind::BadRxQueue(port.port, rxq).into()) + Err(NetBricksError::BadRxQueue(port.port, rxq).into()) } else if txq > port.txqs || txq < 0 { - Err(ErrorKind::BadTxQueue(port.port, txq).into()) + Err(NetBricksError::BadTxQueue(port.port, txq).into()) } else { Ok(CacheAligned::allocate(PortQueue { port: port.clone(), @@ -230,10 +230,10 @@ impl PmdPort { stats_tx: (0..txqs).map(|_| Arc::new(PortStats::new())).collect(), })) } else { - Err(ErrorKind::FailedToInitializePort(port).into()) + Err(NetBricksError::FailedToInitializePort(port).into()) } } else { - Err(ErrorKind::FailedToInitializePort(port).into()) + Err(NetBricksError::FailedToInitializePort(port).into()) } } @@ -257,7 +257,7 @@ impl PmdPort { stats_tx: vec![Arc::new(PortStats::new())], })) } else { - Err(ErrorKind::FailedToInitializePort(port).into()) + Err(NetBricksError::FailedToInitializePort(port).into()) } } @@ -277,10 +277,10 @@ impl PmdPort { stats_tx: vec![Arc::new(PortStats::new())], })) } else { - Err(ErrorKind::FailedToInitializePort(port).into()) + Err(NetBricksError::FailedToInitializePort(port).into()) } } - _ => Err(ErrorKind::BadVdev(String::from(name)).into()), + _ => Err(NetBricksError::BadVdev(String::from(name)).into()), } } @@ -299,7 +299,7 @@ impl PmdPort { let cannonical_spec = PmdPort::cannonicalize_pci(spec); let port = unsafe { attach_pmd_device((cannonical_spec[..]).as_ptr()) }; if port >= 0 { - println!("Going to try and use port {}", port); + info!("Going to try and use port {}", port); PmdPort::init_dpdk_port( port, rxqs, @@ -312,9 +312,9 @@ impl PmdPort { tso, csumoffload, ) - .chain_err(|| ErrorKind::BadDev(String::from(spec))) + .map_err(|_| NetBricksError::BadDev(String::from(spec)).into()) } else { - Err(ErrorKind::BadDev(String::from(spec)).into()) + Err(NetBricksError::BadDev(String::from(spec)).into()) } } diff --git a/framework/src/lib.rs b/framework/src/lib.rs index a937a41c95..9ffeccc395 100644 --- a/framework/src/lib.rs +++ b/framework/src/lib.rs @@ -1,12 +1,12 @@ #![recursion_limit = "1024"] #![feature(asm)] -#![feature(log_syntax)] #![feature(box_syntax)] #![feature(specialization)] -#![feature(slice_concat_ext)] #![feature(fnbox)] #![feature(alloc)] +#![feature(result_map_or_else)] #![feature(const_fn)] +#![feature(custom_attribute)] #![feature(type_ascription)] // FIXME: Figure out if this is really the right thing here. #![feature(ptr_internals)] @@ -14,8 +14,6 @@ // Used for cache alignment. 
#![feature(allocator_api)] #![allow(unused_features)] -#![allow(renamed_and_removed_lints)] -#![feature(integer_atomics)] #![allow(unused_doc_comments)] #![cfg_attr(feature = "dev", allow(unstable_features))] // Need this since PMD port construction triggers too many arguments. @@ -41,6 +39,8 @@ extern crate sctp; extern crate twox_hash; // TOML for scheduling configuration extern crate toml; + +extern crate serde; #[macro_use] extern crate serde_derive; // UUID for SHM naming @@ -52,9 +52,9 @@ extern crate alloc; // Handle generic, compile-time arrays in structs extern crate generic_array; -// Better error handling. -#[macro_use] -extern crate error_chain; +// Error Handling +#[cfg_attr(test, macro_use)] +extern crate failure; // Bring in crossbeam synchronization primatives extern crate crossbeam; @@ -62,14 +62,20 @@ extern crate crossbeam; // Handle execution on other threads extern crate rayon; +extern crate getopts; + +#[macro_use] +extern crate log; + #[cfg(unix)] extern crate nix; -// need this first so other modules in netbricks can use the macros in tests +// need these first so other modules in netbricks can use the macros +#[macro_use] +pub mod common; pub mod tests; pub mod allocators; -pub mod common; pub mod config; pub mod control; pub mod headers; diff --git a/framework/src/operators/act.rs b/framework/src/operators/act.rs index 82cf482131..8a60edd3ef 100644 --- a/framework/src/operators/act.rs +++ b/framework/src/operators/act.rs @@ -17,7 +17,7 @@ pub trait Act { fn capacity(&self) -> i32; #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option; + fn drop_packets(&mut self, idxes: &[usize]) -> Result; /// Remove all packets from the batch (without actually freeing them). #[inline] diff --git a/framework/src/operators/add_metadata.rs b/framework/src/operators/add_metadata.rs index 08af99c264..e644e0290d 100644 --- a/framework/src/operators/add_metadata.rs +++ b/framework/src/operators/add_metadata.rs @@ -103,7 +103,7 @@ where } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/add_metadata_mut.rs b/framework/src/operators/add_metadata_mut.rs index 8cc1698198..3103412a12 100644 --- a/framework/src/operators/add_metadata_mut.rs +++ b/framework/src/operators/add_metadata_mut.rs @@ -103,7 +103,7 @@ where } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/filter_batch.rs b/framework/src/operators/filter_batch.rs index 06615d77f8..03d124f232 100644 --- a/framework/src/operators/filter_batch.rs +++ b/framework/src/operators/filter_batch.rs @@ -61,7 +61,7 @@ where if !self.remove.is_empty() { self.parent .drop_packets(&self.remove[..]) - .expect("Filtering was performed incorrectly"); + .map_or_else(|ref e| error_chain!(e), |_| ()) } self.remove.clear(); } @@ -82,7 +82,7 @@ where } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/macros.rs b/framework/src/operators/macros.rs index cdb4d134e0..91572ffe1b 100644 --- a/framework/src/operators/macros.rs +++ b/framework/src/operators/macros.rs @@ -50,7 +50,7 @@ macro_rules! 
act { } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/map_batch.rs b/framework/src/operators/map_batch.rs index 25542d19a8..5b6e6513c0 100644 --- a/framework/src/operators/map_batch.rs +++ b/framework/src/operators/map_batch.rs @@ -79,7 +79,7 @@ where } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/map_results.rs b/framework/src/operators/map_results.rs new file mode 100644 index 0000000000..6629714b98 --- /dev/null +++ b/framework/src/operators/map_results.rs @@ -0,0 +1,135 @@ +use super::act::Act; +use super::iterator::*; +use super::packet_batch::PacketBatch; +use super::Batch; +use common::*; +use headers::EndOffset; +use interface::Packet; +use interface::PacketTx; +use std::marker::PhantomData; + +pub type MapFnResult = Box) -> Result<()> + Send>; + +pub struct MapResults +where + T: EndOffset, + V: Batch + BatchIterator
<Header = T> + Act,
+{
+    parent: V,
+    transformer: MapFnResult<T>,
+    applied: bool,
+    remove: Vec<usize>,
+    phantom_t: PhantomData<T>,
+}
+
+impl<T, V> MapResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act,
+{
+    pub fn new(parent: V, transformer: MapFnResult<T>) -> MapResults<T, V> {
+        let capacity = parent.capacity() as usize;
+        MapResults {
+            parent: parent,
+            transformer: transformer,
+            applied: false,
+            remove: Vec::with_capacity(capacity),
+            phantom_t: PhantomData,
+        }
+    }
+}
+
+impl<T, V> Batch for MapResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act,
+{
+}
+
+impl<T, V> Act for MapResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act, +{ + #[inline] + fn act(&mut self) { + if !self.applied { + self.parent.act(); + { + let iter = PayloadEnumerator::::new(&mut self.parent); + while let Some(ParsedDescriptor { packet, index: idx }) = + iter.next(&mut self.parent) + { + if let Err(ref e) = (self.transformer)(&packet) { + error_chain!(e); + self.remove.push(idx) + } + } + } + self.applied = true; + + if !self.remove.is_empty() { + self.parent + .drop_packets(&self.remove[..]) + .map_or_else(|ref e| error_chain!(e), |_| ()) + } + self.remove.clear(); + } + } + + #[inline] + fn done(&mut self) { + self.applied = false; + self.parent.done(); + } + + #[inline] + fn send_q(&mut self, port: &PacketTx) -> Result { + self.parent.send_q(port) + } + + #[inline] + fn capacity(&self) -> i32 { + self.parent.capacity() + } + + #[inline] + fn drop_packets(&mut self, idxes: &[usize]) -> Result { + self.parent.drop_packets(idxes) + } + + #[inline] + fn clear_packets(&mut self) { + self.parent.clear_packets() + } + + #[inline] + fn get_packet_batch(&mut self) -> &mut PacketBatch { + self.parent.get_packet_batch() + } + + #[inline] + fn get_task_dependencies(&self) -> Vec { + self.parent.get_task_dependencies() + } +} + +impl BatchIterator for MapResults +where + T: EndOffset, + V: Batch + BatchIterator
+ Act, +{ + type Header = T; + type Metadata = ::Metadata; + + #[inline] + fn start(&mut self) -> usize { + self.parent.start() + } + + #[inline] + unsafe fn next_payload(&mut self, idx: usize) -> Option> { + // self.parent.next_payload(idx).map(|p| {(self.transformer)(&p.packet); p}) + self.parent.next_payload(idx) + } +} diff --git a/framework/src/operators/merge_batch.rs b/framework/src/operators/merge_batch.rs index 8f8665b5ea..d59ff60126 100644 --- a/framework/src/operators/merge_batch.rs +++ b/framework/src/operators/merge_batch.rs @@ -72,7 +72,7 @@ impl Act for MergeBatch { } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parents[self.which].drop_packets(idxes) } diff --git a/framework/src/operators/mod.rs b/framework/src/operators/mod.rs index b29c07c914..fd2429d12f 100644 --- a/framework/src/operators/mod.rs +++ b/framework/src/operators/mod.rs @@ -11,6 +11,8 @@ pub use self::group_by::*; use self::iterator::BatchIterator; pub use self::map_batch::MapBatch; use self::map_batch::MapFn; +use self::map_results::MapFnResult; +pub use self::map_results::MapResults; pub use self::merge_batch::MergeBatch; pub use self::parsed_batch::ParsedBatch; pub use self::receive_batch::ReceiveBatch; @@ -19,6 +21,8 @@ pub use self::restore_header::*; pub use self::send_batch::SendBatch; pub use self::transform_batch::TransformBatch; use self::transform_batch::TransformFn; +use self::transform_results::TransformFnResult; +pub use self::transform_results::TransformResults; use headers::*; use interface::*; use scheduler::Scheduler; @@ -35,6 +39,7 @@ mod filter_batch; mod group_by; mod iterator; mod map_batch; +mod map_results; mod merge_batch; mod packet_batch; mod parsed_batch; @@ -43,6 +48,7 @@ mod reset_parse; mod restore_header; mod send_batch; mod transform_batch; +mod transform_results; /// Merge a vector of batches into one batch. Currently this just round-robins between merged batches, but in the future /// the precise batch being processed will be determined by the scheduling policy used. @@ -104,7 +110,7 @@ pub trait Batch: BatchIterator + Act + Send { CompositionBatch::new(self) } - /// Transform a header field. + /// Transform header fields. fn transform( self, transformer: TransformFn, @@ -115,6 +121,17 @@ pub trait Batch: BatchIterator + Act + Send { TransformBatch::::new(self, transformer) } + /// Transform header fields while returning results. + fn transform_ok( + self, + transformer: TransformFnResult, + ) -> TransformResults + where + Self: Sized, + { + TransformResults::::new(self, transformer) + } + /// Map over a set of header fields. Map and transform primarily differ in map being immutable. Immutability /// provides some optimization opportunities not otherwise available. fn map(self, transformer: MapFn) -> MapBatch @@ -124,6 +141,17 @@ pub trait Batch: BatchIterator + Act + Send { MapBatch::::new(self, transformer) } + /// Map over a set of header fields while returning results. + fn map_ok( + self, + transformer: MapFnResult, + ) -> MapResults + where + Self: Sized, + { + MapResults::::new(self, transformer) + } + /// Filter out packets, any packets for which `filter_f` returns false are dropped from the batch. 
fn filter( self, diff --git a/framework/src/operators/packet_batch.rs b/framework/src/operators/packet_batch.rs index f23b0c4fac..247b9cd6cd 100644 --- a/framework/src/operators/packet_batch.rs +++ b/framework/src/operators/packet_batch.rs @@ -5,10 +5,10 @@ use common::*; use headers::NullHeader; use interface::*; use native::zcsi::*; -use std::result; -/// Base packet batch structure, this represents an array of mbufs and is the primary interface for sending and -/// receiving packets from DPDK, allocations, etc. As a result many of the actions implemented in other types of batches +/// Base packet batch structure, this represents an array of mbufs and is the +/// primary interface for sending and receiving packets from DPDK, allocations, +/// etc... As a result many of the actions implemented in other types of batches /// ultimately call into this structure. pub struct PacketBatch { array: Vec<*mut MBuf>, @@ -54,7 +54,7 @@ impl PacketBatch { pub fn allocate_partial_batch_with_size(&mut self, len: u16, cnt: i32) -> Result<&mut Self> { match self.alloc_packet_batch(len, cnt) { Ok(_) => Ok(self), - Err(_) => Err(ErrorKind::FailedAllocation.into()), + Err(_) => Err(NetBricksError::FailedAllocation.into()), } } @@ -63,7 +63,7 @@ impl PacketBatch { pub fn deallocate_batch(&mut self) -> Result<&mut Self> { match self.free_packet_batch() { Ok(_) => Ok(self), - Err(_) => Err(ErrorKind::FailedDeallocation.into()), + Err(_) => Err(NetBricksError::FailedDeallocation.into()), } } @@ -98,13 +98,14 @@ impl PacketBatch { } } - /// This drops packet buffers and keeps things ordered. We expect that idxes is an ordered vector of indices, no - /// guarantees are made when this is not the case. + /// This drops packet buffers and keeps things ordered. We expect that idxes + /// is an ordered vector of indices, no guarantees are made when this is not + /// the case. #[inline] - fn drop_packets_stable(&mut self, idxes: &[usize]) -> Option { + fn drop_packets_stable(&mut self, idxes: &[usize]) -> Result { // Short circuit when we don't have to do this work. if idxes.is_empty() { - return Some(0); + return Ok(0); } unsafe { let mut idx_orig = 0; @@ -115,7 +116,6 @@ impl PacketBatch { // First go through the list of indexes to be filtered and get rid of them. while idx_orig < end && (remove_idx < idxes.len()) { let test_idx: usize = idxes[remove_idx]; - assert!(idx_orig <= test_idx); if idx_orig == test_idx { self.scratch.push(self.array[idx_orig]); remove_idx += 1; @@ -132,13 +132,13 @@ impl PacketBatch { idx_new += 1; } - // We did not find an index that was passed in, warn/error out. 
+ // We did not find an index that was passed in: if remove_idx < idxes.len() { - None + Err(NetBricksError::FailedToDropPackets.into()) } else { self.array.set_len(idx_new); if self.scratch.is_empty() { - Some(0) + Ok(0) } else { // Now free the dropped packets let len = self.scratch.len(); @@ -147,9 +147,9 @@ impl PacketBatch { let ret = mbuf_free_bulk(array_ptr, len as i32); self.scratch.clear(); if ret == 0 { - Some(len) + Ok(len) } else { - None + Err(NetBricksError::FailedToDropPackets.into()) } } } @@ -186,21 +186,21 @@ impl PacketBatch { fn alloc_packet_batch(&mut self, len: u16, cnt: i32) -> Result<()> { unsafe { if self.array.capacity() < (cnt as usize) { - Err(ErrorKind::FailedAllocation.into()) + Err(NetBricksError::FailedAllocation.into()) } else { let ret = mbuf_alloc_bulk(self.array.as_mut_ptr(), len, cnt); if ret == 0 { self.array.set_len(cnt as usize); Ok(()) } else { - Err(ErrorKind::FailedAllocation.into()) + Err(NetBricksError::FailedAllocation.into()) } } } } #[inline] - fn free_packet_batch(&mut self) -> result::Result<(), ()> { + fn free_packet_batch(&mut self) -> Result<()> { unsafe { if self.array.is_empty() { Ok(()) @@ -215,7 +215,7 @@ impl PacketBatch { if ret == 0 { Ok(()) } else { - Err(()) + Err(NetBricksError::FailedToFreeMBuf(ret).into()) } } } @@ -277,7 +277,7 @@ impl Act for PacketBatch { } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.drop_packets_stable(idxes) } diff --git a/framework/src/operators/receive_batch.rs b/framework/src/operators/receive_batch.rs index 60c4ab3078..738c0c2f42 100644 --- a/framework/src/operators/receive_batch.rs +++ b/framework/src/operators/receive_batch.rs @@ -80,7 +80,7 @@ impl Act for ReceiveBatch { } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/send_batch.rs b/framework/src/operators/send_batch.rs index 7ee6029d4e..ac12535ea3 100644 --- a/framework/src/operators/send_batch.rs +++ b/framework/src/operators/send_batch.rs @@ -91,7 +91,7 @@ where } #[inline] - fn drop_packets(&mut self, _: &[usize]) -> Option { + fn drop_packets(&mut self, _: &[usize]) -> Result { panic!("Cannot drop packets from a sent batch") } diff --git a/framework/src/operators/transform_batch.rs b/framework/src/operators/transform_batch.rs index ecd6e01936..2b7be8fe00 100644 --- a/framework/src/operators/transform_batch.rs +++ b/framework/src/operators/transform_batch.rs @@ -97,7 +97,7 @@ where } #[inline] - fn drop_packets(&mut self, idxes: &[usize]) -> Option { + fn drop_packets(&mut self, idxes: &[usize]) -> Result { self.parent.drop_packets(idxes) } diff --git a/framework/src/operators/transform_results.rs b/framework/src/operators/transform_results.rs new file mode 100644 index 0000000000..8fbf460f93 --- /dev/null +++ b/framework/src/operators/transform_results.rs @@ -0,0 +1,139 @@ +use super::act::Act; +use super::iterator::*; +use super::packet_batch::PacketBatch; +use super::Batch; +use common::*; +use headers::EndOffset; +use interface::Packet; +use interface::PacketTx; +use std::marker::PhantomData; + +pub type TransformFnResult = Box) -> Result<()> + Send>; + +pub struct TransformResults +where + T: EndOffset, + V: Batch + BatchIterator
<Header = T> + Act,
+{
+    parent: V,
+    transformer: TransformFnResult<T>,
+    applied: bool,
+    remove: Vec<usize>,
+    phantom_t: PhantomData<T>,
+}
+
+impl<T, V> TransformResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act,
+{
+    pub fn new(
+        parent: V,
+        transformer: TransformFnResult<T>,
+    ) -> TransformResults<T, V> {
+        let capacity = parent.capacity() as usize;
+        TransformResults {
+            parent: parent,
+            transformer: transformer,
+            applied: false,
+            remove: Vec::with_capacity(capacity),
+            phantom_t: PhantomData,
+        }
+    }
+}
+
+impl<T, V> Batch for TransformResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act,
+{
+}
+
+impl<T, V> BatchIterator for TransformResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act,
+{
+    type Header = T;
+    type Metadata = <V as BatchIterator>::Metadata;
+    #[inline]
+    fn start(&mut self) -> usize {
+        self.parent.start()
+    }
+
+    #[inline]
+    unsafe fn next_payload(&mut self, idx: usize) -> Option<PacketDescriptor<T, Self::Metadata>> {
+        self.parent.next_payload(idx)
+    }
+}
+
+impl<T, V> Act for TransformResults<T, V>
+where
+    T: EndOffset,
+    V: Batch + BatchIterator<Header = T>
+ Act, +{ + #[inline] + fn act(&mut self) { + if !self.applied { + self.parent.act(); + { + let iter = PayloadEnumerator::::new(&mut self.parent); + while let Some(ParsedDescriptor { + mut packet, + index: idx, + }) = iter.next(&mut self.parent) + { + if let Err(ref e) = (self.transformer)(&mut packet) { + error_chain!(e); + self.remove.push(idx) + } + } + } + + self.applied = true; + + if !self.remove.is_empty() { + self.parent + .drop_packets(&self.remove[..]) + .map_or_else(|ref e| error_chain!(e), |_| ()) + } + self.remove.clear(); + } + } + + #[inline] + fn done(&mut self) { + self.applied = false; + self.parent.done(); + } + + #[inline] + fn send_q(&mut self, port: &PacketTx) -> Result { + self.parent.send_q(port) + } + + #[inline] + fn capacity(&self) -> i32 { + self.parent.capacity() + } + + #[inline] + fn drop_packets(&mut self, idxes: &[usize]) -> Result { + self.parent.drop_packets(idxes) + } + + #[inline] + fn clear_packets(&mut self) { + self.parent.clear_packets() + } + + #[inline] + fn get_packet_batch(&mut self) -> &mut PacketBatch { + self.parent.get_packet_batch() + } + + #[inline] + fn get_task_dependencies(&self) -> Vec { + self.parent.get_task_dependencies() + } +} diff --git a/framework/src/scheduler/context.rs b/framework/src/scheduler/context.rs index ee6faa2ad5..4c84f6fffe 100644 --- a/framework/src/scheduler/context.rs +++ b/framework/src/scheduler/context.rs @@ -1,5 +1,6 @@ use allocators::CacheAligned; -use config::NetbricksConfiguration; +use common::*; +use config::NetBricksConfiguration; use interface::dpdk::{init_system, init_thread}; use interface::{PmdPort, PortQueue, VirtualPort, VirtualQueue}; use scheduler::*; @@ -124,7 +125,7 @@ impl NetBricksContext { .unwrap(); Ok(()) } else { - Err(ErrorKind::NoRunningSchedulerOnCore(core).into()) + Err(NetBricksError::NoRunningSchedulerOnCore(core).into()) } } @@ -146,7 +147,7 @@ impl NetBricksContext { .unwrap(); Ok(()) } else { - Err(ErrorKind::NoRunningSchedulerOnCore(core).into()) + Err(NetBricksError::NoRunningSchedulerOnCore(core).into()) } } @@ -154,7 +155,7 @@ impl NetBricksContext { pub fn execute(&mut self) { for (core, channel) in &self.scheduler_channels { channel.send(SchedulerCommand::Execute).unwrap(); - println!("Starting scheduler on {}", core); + info!("Starting scheduler on {}", core); } } @@ -188,21 +189,21 @@ impl NetBricksContext { pub fn stop(&mut self) { for (core, channel) in &self.scheduler_channels { channel.send(SchedulerCommand::Shutdown).unwrap(); - println!("Issued shutdown for core {}", core); + info!("Issued shutdown for core {}", core); } for (core, join_handle) in self.scheduler_handles.drain() { join_handle.join().unwrap(); - println!("Core {} has shutdown", core); + info!("Core {} has shutdown", core); } - println!("System shutdown"); + info!("System shutdown"); } pub fn wait(&mut self) { for (core, join_handle) in self.scheduler_handles.drain() { join_handle.join().unwrap(); - println!("Core {} has shutdown", core); + info!("Core {} has shutdown", core); } - println!("System shutdown"); + info!("System shutdown"); } /// Shutdown all schedulers. @@ -211,15 +212,17 @@ impl NetBricksContext { } } -/// Initialize the system from a configuration. -pub fn initialize_system(configuration: &NetbricksConfiguration) -> Result { +/// Initialize NetBricks, incl. handling of dpdk configuration, logging, general +/// setup. +/// +/// Return a Context to Execute. 
+pub fn initialize_system(configuration: &NetBricksConfiguration) -> Result {
     init_system(configuration);
     let mut ctx: NetBricksContext = Default::default();
     let mut cores: HashSet<_> = configuration.cores.iter().cloned().collect();
     for port in &configuration.ports {
         if ctx.ports.contains_key(&port.name) {
-            println!("Port {} appears twice in specification", port.name);
-            return Err(ErrorKind::ConfigurationError(format!(
+            return Err(NetBricksError::ConfigurationError(format!(
                 "Port {} appears twice in specification",
                 port.name
             ))
@@ -230,7 +233,7 @@ pub fn initialize_system(configuration: &NetbricksConfiguration) -> Result {
-                return Err(ErrorKind::ConfigurationError(format!(
+                return Err(NetBricksError::ConfigurationError(format!(
                     "Port {} could not be initialized {:?}",
                     port.name, e
                 ))
@@ -247,7 +250,7 @@ pub fn initialize_system(configuration: &NetbricksConfiguration) -> Result {
-                    return Err(ErrorKind::ConfigurationError(format!(
+                    return Err(NetBricksError::ConfigurationError(format!(
                         "Queue {} on port {} could not be \
                          initialized {:?}",
                         rx_q, port.name, e
@@ -266,7 +269,7 @@ pub fn initialize_system(configuration: &NetbricksConfiguration) -> Result Result {
     if bytes & (bytes - 1) != 0 {
         // We need pages to be a power of 2.
-        return Err(ErrorKind::InvalidRingSize(bytes).into());
+        return Err(NetBricksError::InvalidRingSize(bytes).into());
     }
     Ok(RingBuffer {
@@ -85,7 +85,7 @@ impl RingBuffer {
         let available = self.mask.wrapping_add(self.head).wrapping_sub(self.tail);
         let write = min(data.len(), available);
         if write != data.len() {
-            println!("Not writing all, available {}", available);
+            info!("Not writing all, available {}", available);
         }
         let offset = self.tail & self.mask;
         self.seek_tail(write);
diff --git a/test/chain-test/src/main.rs b/test/chain-test/src/main.rs
index f45cda5896..e90c8aaa46 100644
--- a/test/chain-test/src/main.rs
+++ b/test/chain-test/src/main.rs
@@ -116,10 +116,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/delay-test/src/main.rs b/test/delay-test/src/main.rs
index 21768124fc..46b3259823 100644
--- a/test/delay-test/src/main.rs
+++ b/test/delay-test/src/main.rs
@@ -121,10 +121,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/icmpv6/src/main.rs b/test/icmpv6/src/main.rs
index d55e065e3f..7c32cdf2cd 100644
--- a/test/icmpv6/src/main.rs
+++ b/test/icmpv6/src/main.rs
@@ -71,10 +71,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/ipv4or6/src/main.rs b/test/ipv4or6/src/main.rs
index 7d35394b03..d9935c3cf4 100644
--- a/test/ipv4or6/src/main.rs
+++ b/test/ipv4or6/src/main.rs
@@ -71,10 +71,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/lpm-embedded/src/main.rs b/test/lpm-embedded/src/main.rs
index cd0e9e12ef..d85757fd51 100644
--- a/test/lpm-embedded/src/main.rs
+++ b/test/lpm-embedded/src/main.rs
@@ -14,8 +14,8 @@ mod nf;
 fn main() {
     let name = String::from("recv");
-    let configuration = NetbricksConfiguration::new_with_name(&name[..]);
-    let configuration = NetbricksConfiguration {
+    let configuration = NetBricksConfiguration::new_with_name(&name[..]);
+    let configuration = NetBricksConfiguration {
         primary_core: 0,
         ..configuration
     };
@@ -36,10 +36,7 @@ fn main() {
             sched.display_dependencies(task);
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/lpm/src/main.rs b/test/lpm/src/main.rs
index d4befb742d..d4abdbe19f 100644
--- a/test/lpm/src/main.rs
+++ b/test/lpm/src/main.rs
@@ -111,10 +111,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/macswap/src/main.rs b/test/macswap/src/main.rs
index 33aca3461f..94e6761ae5 100644
--- a/test/macswap/src/main.rs
+++ b/test/macswap/src/main.rs
@@ -77,10 +77,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/maglev/src/main.rs b/test/maglev/src/main.rs
index eaa40bb941..481cdc8261 100644
--- a/test/maglev/src/main.rs
+++ b/test/maglev/src/main.rs
@@ -34,7 +34,8 @@ where
                 ReceiveBatch::new(port.clone()),
                 sched,
                 &vec!["Larry", "Curly", "Moe"],
-            ).send(port.clone())
+            )
+            .send(port.clone())
         })
         .collect();
     println!("Running {} pipelines", pipelines.len());
@@ -94,10 +95,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/mtu-too-big/src/main.rs b/test/mtu-too-big/src/main.rs
index fbd2d9589e..574e01b2c8 100644
--- a/test/mtu-too-big/src/main.rs
+++ b/test/mtu-too-big/src/main.rs
@@ -70,10 +70,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/nat/src/main.rs b/test/nat/src/main.rs
index f56f5e2acf..0c34625d77 100644
--- a/test/nat/src/main.rs
+++ b/test/nat/src/main.rs
@@ -34,7 +34,8 @@ where
                 ReceiveBatch::new(port.clone()),
                 sched,
                 &Ipv4Addr::new(10, 0, 0, 1),
-            ).send(port.clone())
+            )
+            .send(port.clone())
         })
         .collect();
     println!("Running {} pipelines", pipelines.len());
@@ -90,10 +91,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/packet-generation/src/main.rs b/test/packet-generation/src/main.rs
index 901f705e3f..8232e4be74 100644
--- a/test/packet-generation/src/main.rs
+++ b/test/packet-generation/src/main.rs
@@ -93,10 +93,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/packet-test/src/main.rs b/test/packet-test/src/main.rs
index 0cc4099489..151b800ac7 100644
--- a/test/packet-test/src/main.rs
+++ b/test/packet-test/src/main.rs
@@ -91,10 +91,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/reset-parse/src/main.rs b/test/reset-parse/src/main.rs
index 4ea5ddc2c4..96a727d24c 100644
--- a/test/reset-parse/src/main.rs
+++ b/test/reset-parse/src/main.rs
@@ -100,10 +100,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/sctp-test/src/main.rs b/test/sctp-test/src/main.rs
index 63fd9e0744..4c85a2eaaf 100644
--- a/test/sctp-test/src/main.rs
+++ b/test/sctp-test/src/main.rs
@@ -109,10 +109,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/shutdown-test/src/main.rs b/test/shutdown-test/src/main.rs
index 6a6656bca7..197af1f5d5 100644
--- a/test/shutdown-test/src/main.rs
+++ b/test/shutdown-test/src/main.rs
@@ -111,10 +111,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/srv6-compose/src/main.rs b/test/srv6-compose/src/main.rs
index ea559499fd..dd389b786e 100644
--- a/test/srv6-compose/src/main.rs
+++ b/test/srv6-compose/src/main.rs
@@ -72,10 +72,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/srv6-inject/src/main.rs b/test/srv6-inject/src/main.rs
index e0aad5866e..6235f5b819 100644
--- a/test/srv6-inject/src/main.rs
+++ b/test/srv6-inject/src/main.rs
@@ -90,10 +90,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/srv6-inject/src/nf.rs b/test/srv6-inject/src/nf.rs
index 76a98f47b1..458fa02142 100644
--- a/test/srv6-inject/src/nf.rs
+++ b/test/srv6-inject/src/nf.rs
@@ -56,7 +56,7 @@ fn srh_into_packet(pkt: &mut Packet) -> Result<()> {
     if let Some(Ok(())) = insert {
         Ok(())
     } else {
-        Err(ErrorKind::FailedToInsertHeader.into())
+        Err(NetBricksError::FailedToInsertHeader.into())
     }
 }
diff --git a/test/srv6-sighup-flow/data/expect_srv6.out b/test/srv6-sighup-flow/data/expect_srv6.out
index 1c48bbfb9b..c65e21e83d 100644
--- a/test/srv6-sighup-flow/data/expect_srv6.out
+++ b/test/srv6-sighup-flow/data/expect_srv6.out
@@ -1,3 +1,3 @@
-[INFO] Settings/Configuration Static State Val: Bar { baz: false }
-[INFO] SR-hdr next_header: Tcp, hdr_ext_len: 8, routing_type: 4, segments_left: 1, last_entry: 3, protected: false, oam: false, alert: false, hmac: false, tag: 0, segments_length: 4, segments: [fe80::3, fe80::6, fe80::9, fe80::c]
-[INFO] So long, and thanks for all the fish: Bar { baz: true }
+[WARN] Settings/Configuration Static State Val: Bar { baz: false }
+[WARN] SR-hdr next_header: Tcp, hdr_ext_len: 8, routing_type: 4, segments_left: 1, last_entry: 3, protected: false, oam: false, alert: false, hmac: false, tag: 0, segments_length: 4, segments: [fe80::3, fe80::6, fe80::9, fe80::c]
+[WARN] So long, and thanks for all the fish: Bar { baz: true }
diff --git a/test/srv6-sighup-flow/src/main.rs b/test/srv6-sighup-flow/src/main.rs
index 1d2459b269..f54ea27617 100644
--- a/test/srv6-sighup-flow/src/main.rs
+++ b/test/srv6-sighup-flow/src/main.rs
@@ -77,7 +77,7 @@ fn handle_signals(configuration: &'static Atom) {
             let mut nf_stuff = new_config.bar;
             nf_stuff.baz = true;
             configuration.set(nf_stuff);
-            info!(
+            warn!(
                 "So long, and thanks for all the fish: {:?}",
                 configuration.get()
             );
@@ -93,7 +93,7 @@ fn handle_signals(configuration: &'static Atom) {
 fn start_logger() {
     WriteLogger::init(
-        LevelFilter::Info,
+        LevelFilter::Warn,
         SimpleConfig {
             time: None,
             level: Some(Level::Error),
@@ -166,10 +166,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/srv6-sighup-flow/src/nf.rs b/test/srv6-sighup-flow/src/nf.rs
index 2236cd3654..cfd4e9027c 100644
--- a/test/srv6-sighup-flow/src/nf.rs
+++ b/test/srv6-sighup-flow/src/nf.rs
@@ -13,7 +13,7 @@ fn tcp_sr_nf>(parent: T) -> CompositionB
     parent
         .parse::>()
         .map(box |pkt| {
-            info!("SR-hdr {}", pkt.get_header());
+            warn!("SR-hdr {}", pkt.get_header());
         })
         .compose()
 }
@@ -35,7 +35,7 @@ pub fn nf, S: Scheduler + Sized>(
             _ => false,
         })
         .map(box move |_pkt| {
-            info!(
+            warn!(
                 "Settings/Configuration Static State Val: {:?}",
                 ATOM_CONF.get()
             );
diff --git a/test/tcp-check/src/main.rs b/test/tcp-check/src/main.rs
index ac239c493d..4192ba488f 100644
--- a/test/tcp-check/src/main.rs
+++ b/test/tcp-check/src/main.rs
@@ -91,10 +91,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/tcp-checksum/src/main.rs b/test/tcp-checksum/src/main.rs
index d274e4fef6..ecddb68769 100644
--- a/test/tcp-checksum/src/main.rs
+++ b/test/tcp-checksum/src/main.rs
@@ -71,10 +71,7 @@ fn main() {
             }
         }
         Err(ref e) => {
-            println!("Error: {}", e);
-            if let Some(backtrace) = e.backtrace() {
-                println!("Backtrace: {:?}", backtrace);
-            }
+            println!("Error: {:?}", e);
             process::exit(1);
         }
     }
diff --git a/test/transform-error/Cargo.toml b/test/transform-error/Cargo.toml
new file mode 100644
index 0000000000..ac8bd14906
--- /dev/null
+++ b/test/transform-error/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "transform-error"
+version = "0.1.0"
+authors = ["William of Ockham "]
+
+[dependencies]
+netbricks = { path = "../../framework", features = ["performance"] }
+time = ">=0.1.0"
+failure = "0.1.5"
+log = "*"
+simplelog = "*"
+
+[features]
+default = []
+print = []
diff --git a/test/transform-error/check.sh b/test/transform-error/check.sh
new file mode 100755
index 0000000000..b9d7347dfa
--- /dev/null
+++ b/test/transform-error/check.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+TEST_NAME=transform-error
+PORT_OPTIONS1="dpdk:eth_pcap0,rx_pcap=data/ipv6_tcp.pcap,tx_pcap=/tmp/out.pcap"
+
+C='\033[1;34m'
+NC='\033[0m'
+
+echo -e "${C}RUNNING: $TEST_NAME${NC}"
+
+../../build.sh run $TEST_NAME -p $PORT_OPTIONS1 -c 1 --dur 1
+tcpdump -tner /tmp/out.pcap | tee /dev/tty | diff - data/expect_v6.out
+TEST_OUTPUT=$?
+
+cat test.log | tee /dev/tty | diff - data/transform_error.out
+TEST_LOG=$?
+
+result=$?
+echo ----
+if [[ $TEST_OUTPUT != 0 ]] || [[ $TEST_LOG != 0 ]]; then
+    echo "FAIL: Test Output - $TEST_OUTPUT | Test Log - $TEST_LOG"
+    exit 1
+else
+    echo "PASS"
+fi
diff --git a/test/transform-error/data/expect_v6.out b/test/transform-error/data/expect_v6.out
new file mode 100644
index 0000000000..ee086ff849
--- /dev/null
+++ b/test/transform-error/data/expect_v6.out
@@ -0,0 +1,7 @@
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::1.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::3.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::4.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::5.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::6.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::7.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
+00:00:00:00:00:00 > 00:00:00:00:00:01, ethertype IPv6 (0x86dd), length 74: fe80::8.1025 > fe80::2.1024: Flags [S], seq 1, win 10, length 0
diff --git a/test/transform-error/data/ipv6_tcp.pcap b/test/transform-error/data/ipv6_tcp.pcap
new file mode 100644
index 0000000000..796e1ca36b
Binary files /dev/null and b/test/transform-error/data/ipv6_tcp.pcap differ
diff --git a/test/transform-error/data/transform_error.out b/test/transform-error/data/transform_error.out
new file mode 100644
index 0000000000..02d6a660e1
--- /dev/null
+++ b/test/transform-error/data/transform_error.out
@@ -0,0 +1,10 @@
+[ERROR] directed by danny devito
+[ERROR] directed by danny devito
+[ERROR] directed by danny devito
+[WARN] v6: fe80::1 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::3 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::4 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::5 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::6 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::7 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
+[WARN] v6: fe80::8 > fe80::2 version: 6 traffic_class: 0 flow_label: 0 len: 20 next_header: Tcp hop_limit: 64
diff --git a/test/transform-error/src/main.rs b/test/transform-error/src/main.rs
new file mode 100644
index 0000000000..5d18d3b62c
--- /dev/null
+++ b/test/transform-error/src/main.rs
@@ -0,0 +1,103 @@
+#![feature(box_syntax)]
+#![feature(asm)]
+#[macro_use]
+extern crate log;
+extern crate netbricks;
+extern crate simplelog;
+#[macro_use]
+extern crate failure;
+
+use self::nf::*;
+use log::Level;
+use netbricks::config::{basic_opts, read_matches};
+use netbricks::interface::*;
+use netbricks::operators::*;
+use netbricks::scheduler::*;
+use simplelog::{Config as SimpleConfig, LevelFilter, WriteLogger};
+use std::env;
+use std::fmt::Display;
+use std::fs::File as StdFile;
+use std::process;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+mod nf;
+
+fn start_logger() {
+    WriteLogger::init(
+        LevelFilter::Warn,
+        SimpleConfig {
+            time: None,
+            level: Some(Level::Error),
+            target: Some(Level::Debug),
+            location: Some(Level::Trace),
+            time_format: None,
+        },
+        StdFile::create("test.log").unwrap(),
+    )
+    .unwrap();
+}
+
+fn test(ports: Vec, sched: &mut S)
+where
+    T: PacketRx + PacketTx + Display + Clone + 'static,
+    S: Scheduler + Sized,
+{
+    println!("Receiving started");
+
+    let pipelines: Vec<_> = ports
+        .iter()
+        .map(|port| nf(ReceiveBatch::new(port.clone())).send(port.clone()))
+        .collect();
+    println!("Running {} pipelines", pipelines.len());
+    for pipeline in pipelines {
+        sched.add_task(pipeline).unwrap();
+    }
+}
+
+fn main() {
+    start_logger();
+
+    let mut opts = basic_opts();
+    opts.optopt(
+        "",
+        "dur",
+        "Test duration",
+        "If this option is set to a nonzero value, then the \
+         test will just loop after 2 seconds",
+    );
+
+    let args: Vec = env::args().collect();
+    let matches = match opts.parse(&args[1..]) {
+        Ok(m) => m,
+        Err(f) => panic!(f.to_string()),
+    };
+
+    let configuration = read_matches(&matches, &opts);
+
+    let test_duration: u64 = matches
+        .opt_str("dur")
+        .unwrap_or_else(|| String::from("0"))
+        .parse()
+        .expect("Could not parse test duration");
+
+    match initialize_system(&configuration) {
+        Ok(mut context) => {
+            context.start_schedulers();
+            context.add_pipeline_to_run(Arc::new(move |p, s: &mut StandaloneScheduler| test(p, s)));
+            context.execute();
+
+            if test_duration != 0 {
+                thread::sleep(Duration::from_secs(test_duration));
+            } else {
+                loop {
+                    thread::sleep(Duration::from_secs(2));
+                }
+            }
+        }
+        Err(ref e) => {
+            println!("Error: {:?}", e);
+            process::exit(1);
+        }
+    }
+}
diff --git a/test/transform-error/src/nf.rs b/test/transform-error/src/nf.rs
new file mode 100644
index 0000000000..e91a06c8c1
--- /dev/null
+++ b/test/transform-error/src/nf.rs
@@ -0,0 +1,38 @@
+use failure::Error;
+use netbricks::headers::*;
+use netbricks::operators::*;
+use std::net::Ipv6Addr;
+use std::str::FromStr;
+
+fn throw_mama_from_the_train(hdr: &Ipv6Header) -> Result<(), Error> {
+    let hextets = hdr.src().segments();
+    let prefix = Ipv6Addr::new(hextets[0], hextets[1], hextets[2], hextets[3], 0, 0, 0, 0);
+    if prefix == Ipv6Addr::from_str("da75::").unwrap() {
+        bail!("directed by danny devito")
+    } else {
+        Ok(())
+    }
+}
+
+pub fn nf>(
+    parent: T,
+) -> MapBatch<
+    Ipv6Header,
+    TransformResults<
+        Ipv6Header,
+        ParsedBatch>>,
+    >,
+> {
+    parent
+        .parse::()
+        .filter(box |pkt| match pkt.get_header().etype() {
+            Some(EtherType::IPv6) => true,
+            _ => false,
+        })
+        .parse::()
+        .transform_ok(box |pkt| {
+            let v6 = pkt.get_header();
+            throw_mama_from_the_train(v6)
+        })
+        .map(box |pkt| warn!("v6: {}", pkt.get_header()))
+}
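For reference, here is a minimal, standalone sketch (not part of the diff above) of the failure-based error style this change adopts: a derived Fail enum in the spirit of NetBricksError, bail! for ad-hoc errors as in throw_mama_from_the_train, .into() for typed variants, and {:?} printing as the updated test binaries now do. The names DropReason and check_prefix are illustrative only, not NetBricks API.

#[macro_use]
extern crate failure;

use failure::Error;

// Illustrative error type in the style of NetBricksError (hypothetical name).
#[derive(Debug, Fail)]
enum DropReason {
    #[fail(display = "source prefix {} is not allowed", _0)]
    BannedPrefix(String),
}

// An NF-style check: bail! produces an ad-hoc failure::Error, while a typed
// variant converts with .into(), mirroring the two patterns used in the diff.
fn check_prefix(prefix: &str) -> Result<(), Error> {
    if prefix == "da75::" {
        bail!("directed by danny devito");
    }
    if prefix.starts_with("ff") {
        return Err(DropReason::BannedPrefix(prefix.to_string()).into());
    }
    Ok(())
}

fn main() {
    // {:?} on failure::Error prints the error (and a backtrace when
    // RUST_BACKTRACE=1), which is why the tests dropped the explicit
    // e.backtrace() handling.
    if let Err(ref e) = check_prefix("da75::") {
        println!("Error: {:?}", e);
    }
}

Under this scheme the error_chain-style ErrorKind wrappers seen in the hunks above disappear; each call site constructs a NetBricksError variant or an ad-hoc message and lets both flow through the same failure::Error.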