diff --git a/Cargo.toml b/Cargo.toml index 9d0b6ff1..7859300b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "opentelemetry-*", "examples/*", + "stress", ] resolver = "2" diff --git a/stress/Cargo.toml b/stress/Cargo.toml new file mode 100644 index 00000000..652d9ece --- /dev/null +++ b/stress/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "stress" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] # Bin to run the metrics stress tests for Logs UserEvent Exporter +name = "user-events-enabled" +path = "src/user_events_enabled.rs" +doc = false + +[dependencies] +ctrlc = "3.2.5" +lazy_static = "1.4.0" +num_cpus = "1.15.0" +num-format = "0.4.4" +sysinfo = { version = "0.32", optional = true } +eventheader_dynamic = "0.4.0" + +[features] +stats = ["sysinfo"] + +[profile.release] +debug = true diff --git a/stress/src/throughput.rs b/stress/src/throughput.rs new file mode 100644 index 00000000..131762fa --- /dev/null +++ b/stress/src/throughput.rs @@ -0,0 +1,168 @@ +use num_format::{Locale, ToFormattedString}; +use std::cell::UnsafeCell; +use std::env; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; +#[cfg(feature = "stats")] +use sysinfo::{Pid, System}; + +const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds + +static STOP: AtomicBool = AtomicBool::new(false); + +#[repr(C)] +#[derive(Default)] +struct WorkerStats { + count: u64, + /// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat + /// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines + padding: [u64; 15], +} + +pub fn test_throughput(func: F) +where + F: Fn() + Sync + Send + 'static, +{ + ctrlc::set_handler(move || { + STOP.store(true, Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + let mut num_threads = num_cpus::get(); + let mut args_iter = env::args(); + + if let Some(arg_str) = args_iter.nth(1) { + let arg = arg_str.parse::(); + + if arg.is_err() { + eprintln!("Invalid command line argument '{}' as number of threads. Make sure the value is a positive integer.", arg_str); + std::process::exit(1); + } + + let arg_num = arg.unwrap(); + + if arg_num > 0 { + if arg_num > num_cpus::get() { + println!( + "Specified {} threads which is larger than the number of logical cores ({})!", + arg_num, num_threads + ); + } + num_threads = arg_num; + } else { + eprintln!("Invalid command line argument {} as number of threads. Make sure the value is above 0 and less than or equal to number of available logical cores ({}).", arg_num, num_threads); + std::process::exit(1); + } + } + + println!("Number of threads: {}\n", num_threads); + let func_arc = Arc::new(func); + let mut worker_stats_vec: Vec = Vec::new(); + + for _ in 0..num_threads { + worker_stats_vec.push(WorkerStats::default()); + } + + let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec); + + thread::scope(|s| { + s.spawn(|| { + let mut last_collect_time = Instant::now(); + let mut total_count_old: u64 = 0; + + #[cfg(feature = "stats")] + let pid = Pid::from(std::process::id() as usize); + #[cfg(feature = "stats")] + let mut system = System::new_all(); + + loop { + let current_time = Instant::now(); + let elapsed = current_time.duration_since(last_collect_time).as_secs(); + if elapsed >= SLIDING_WINDOW_SIZE { + let total_count_u64 = shared_mutable_stats_slice.sum(); + last_collect_time = Instant::now(); + let current_count = total_count_u64 - total_count_old; + total_count_old = total_count_u64; + let throughput = current_count / elapsed; + println!( + "Throughput: {} iterations/sec", + throughput.to_formatted_string(&Locale::en) + ); + + #[cfg(feature = "stats")] + { + system.refresh_all(); + if let Some(process) = system.process(pid) { + println!( + "Memory usage: {:.2} MB", + process.memory() as f64 / (1024.0 * 1024.0) + ); + println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32); + println!( + "Virtual memory usage: {:.2} MB", + process.virtual_memory() as f64 / (1024.0 * 1024.0) + ); + } else { + println!("Process not found"); + } + } + + println!("\n"); + } + + if STOP.load(Ordering::SeqCst) { + break; + } + + thread::sleep(Duration::from_millis(5000)); + } + }); + + for thread_index in 0..num_threads { + let func_arc_clone = Arc::clone(&func_arc); + s.spawn(move || loop { + func_arc_clone(); + unsafe { + shared_mutable_stats_slice.increment(thread_index); + } + if STOP.load(Ordering::SeqCst) { + break; + } + }); + } + }); +} + +#[derive(Copy, Clone)] +struct UnsafeSlice<'a> { + slice: &'a [UnsafeCell], +} + +unsafe impl Send for UnsafeSlice<'_> {} +unsafe impl Sync for UnsafeSlice<'_> {} + +impl<'a> UnsafeSlice<'a> { + fn new(slice: &'a mut [WorkerStats]) -> Self { + let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell]; + Self { + slice: unsafe { &*ptr }, + } + } + + // SAFETY: It's assumed that no two threads will write to the same index at the same time + #[inline(always)] + unsafe fn increment(&self, i: usize) { + let value = self.slice[i].get(); + (*value).count += 1; + } + + #[inline(always)] + fn sum(&self) -> u64 { + self.slice + .iter() + .map(|cell| unsafe { (*cell.get()).count }) + .sum() + } +} diff --git a/stress/src/user_events_enabled.rs b/stress/src/user_events_enabled.rs new file mode 100644 index 00000000..ef418f0c --- /dev/null +++ b/stress/src/user_events_enabled.rs @@ -0,0 +1,38 @@ +// To run the test, execute the following command in the stress directory as sudo: +// sudo -E ~/.cargo/bin/cargo rnu --bin user-events-enabled --release + +// TODO : Add stess result here. + +mod throughput; +use eventheader_dynamic::{Provider, ProviderOptions}; +use lazy_static::lazy_static; + +// Global constants for level and keyword +const LEVEL: u8 = 4; // Example level (Informational) +const KEYWORD: u64 = 0x01; // Example keyword + +lazy_static! { + static ref PROVIDER: Provider = { + // Initialize the Provider with dynamic options + let mut options = ProviderOptions::new(); + options = *options.group_name("testprovider"); + let mut provider = Provider::new("testprovider", &options); + + // Register events with specific levels and keywords + provider.register_set(LEVEL.into(), KEYWORD); + + provider + }; +} + +fn main() { + // Execute the throughput test with the test_log function + throughput::test_throughput(test_user_events_enabled); +} + +fn test_user_events_enabled() { + // Find and check if the event is enabled + if let Some(event_set) = PROVIDER.find_set(LEVEL.into(), KEYWORD) { + let _ = event_set.enabled(); // Perform the enabled check + } +}