Skip to content

Commit

Permalink
Merge branch 'main' into lambda-log-group
Browse files Browse the repository at this point in the history
  • Loading branch information
garysassano committed Dec 11, 2024
2 parents aa63a14 + 037f914 commit 7e591a3
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"opentelemetry-*",
"examples/*",
"stress",
]
resolver = "2"

Expand Down
7 changes: 5 additions & 2 deletions opentelemetry-aws/src/detector/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl ResourceDetector for LambdaResourceDetector {

let aws_region = env::var(AWS_REGION_ENV_VAR).unwrap_or_default();
let function_version = env::var(AWS_LAMBDA_FUNCTION_VERSION_ENV_VAR).unwrap_or_default();
let function_memory_limit = env::var(AWS_LAMBDA_MEMORY_LIMIT_ENV_VAR).unwrap_or_default();
// Convert memory limit from MB (string) to Bytes (int) as required by semantic conventions.
let function_memory_limit = env::var(AWS_LAMBDA_MEMORY_LIMIT_ENV_VAR)
.map(|s| s.parse::<i64>().unwrap_or_default() * 1024 * 1024)
.unwrap_or_default();
// Instance attributes corresponds to the log stream name for AWS Lambda;
// See the FaaS resource specification for more details.
let instance = env::var(AWS_LAMBDA_LOG_STREAM_NAME_ENV_VAR).unwrap_or_default();
Expand Down Expand Up @@ -81,7 +84,7 @@ mod tests {
),
KeyValue::new(semconv::resource::FAAS_NAME, "my-lambda-function"),
KeyValue::new(semconv::resource::FAAS_VERSION, "$LATEST"),
KeyValue::new(semconv::resource::FAAS_MAX_MEMORY, "128"),
KeyValue::new(semconv::resource::FAAS_MAX_MEMORY, 128 * 1024 * 1024),
KeyValue::new(

Check failure on line 88 in opentelemetry-aws/src/detector/lambda.rs

View workflow job for this annotation

GitHub Actions / lint

type annotations needed
semconv::resource::AWS_LOG_GROUP_NAMES,
vec!["/aws/lambda/my-lambda-function"].into(),
Expand Down
24 changes: 24 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "stress"
version = "0.1.0"
edition = "2021"
publish = false

[[bin]] # Bin to run the metrics stress tests for Logs UserEvent Exporter
name = "user-events-enabled"
path = "src/user_events_enabled.rs"
doc = false

[dependencies]
ctrlc = "3.2.5"
lazy_static = "1.4.0"
num_cpus = "1.15.0"
num-format = "0.4.4"
sysinfo = { version = "0.32", optional = true }
eventheader_dynamic = "0.4.0"

[features]
stats = ["sysinfo"]

[profile.release]
debug = true
168 changes: 168 additions & 0 deletions stress/src/throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use num_format::{Locale, ToFormattedString};
use std::cell::UnsafeCell;
use std::env;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
#[cfg(feature = "stats")]
use sysinfo::{Pid, System};

const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds

static STOP: AtomicBool = AtomicBool::new(false);

#[repr(C)]
#[derive(Default)]
struct WorkerStats {
count: u64,
/// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
/// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
padding: [u64; 15],
}

pub fn test_throughput<F>(func: F)
where
F: Fn() + Sync + Send + 'static,
{
ctrlc::set_handler(move || {
STOP.store(true, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");

let mut num_threads = num_cpus::get();
let mut args_iter = env::args();

if let Some(arg_str) = args_iter.nth(1) {
let arg = arg_str.parse::<usize>();

if arg.is_err() {
eprintln!("Invalid command line argument '{}' as number of threads. Make sure the value is a positive integer.", arg_str);
std::process::exit(1);
}

let arg_num = arg.unwrap();

if arg_num > 0 {
if arg_num > num_cpus::get() {
println!(
"Specified {} threads which is larger than the number of logical cores ({})!",
arg_num, num_threads
);
}
num_threads = arg_num;
} else {
eprintln!("Invalid command line argument {} as number of threads. Make sure the value is above 0 and less than or equal to number of available logical cores ({}).", arg_num, num_threads);
std::process::exit(1);
}
}

println!("Number of threads: {}\n", num_threads);
let func_arc = Arc::new(func);
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();

for _ in 0..num_threads {
worker_stats_vec.push(WorkerStats::default());
}

let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);

thread::scope(|s| {
s.spawn(|| {
let mut last_collect_time = Instant::now();
let mut total_count_old: u64 = 0;

#[cfg(feature = "stats")]
let pid = Pid::from(std::process::id() as usize);
#[cfg(feature = "stats")]
let mut system = System::new_all();

loop {
let current_time = Instant::now();
let elapsed = current_time.duration_since(last_collect_time).as_secs();
if elapsed >= SLIDING_WINDOW_SIZE {
let total_count_u64 = shared_mutable_stats_slice.sum();
last_collect_time = Instant::now();
let current_count = total_count_u64 - total_count_old;
total_count_old = total_count_u64;
let throughput = current_count / elapsed;
println!(
"Throughput: {} iterations/sec",
throughput.to_formatted_string(&Locale::en)
);

#[cfg(feature = "stats")]
{
system.refresh_all();
if let Some(process) = system.process(pid) {
println!(
"Memory usage: {:.2} MB",
process.memory() as f64 / (1024.0 * 1024.0)
);
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
println!(
"Virtual memory usage: {:.2} MB",
process.virtual_memory() as f64 / (1024.0 * 1024.0)
);
} else {
println!("Process not found");
}
}

println!("\n");
}

if STOP.load(Ordering::SeqCst) {
break;
}

thread::sleep(Duration::from_millis(5000));
}
});

for thread_index in 0..num_threads {
let func_arc_clone = Arc::clone(&func_arc);
s.spawn(move || loop {
func_arc_clone();
unsafe {
shared_mutable_stats_slice.increment(thread_index);
}
if STOP.load(Ordering::SeqCst) {
break;
}
});
}
});
}

#[derive(Copy, Clone)]
struct UnsafeSlice<'a> {
slice: &'a [UnsafeCell<WorkerStats>],
}

unsafe impl Send for UnsafeSlice<'_> {}
unsafe impl Sync for UnsafeSlice<'_> {}

impl<'a> UnsafeSlice<'a> {
fn new(slice: &'a mut [WorkerStats]) -> Self {
let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
Self {
slice: unsafe { &*ptr },
}
}

// SAFETY: It's assumed that no two threads will write to the same index at the same time
#[inline(always)]
unsafe fn increment(&self, i: usize) {
let value = self.slice[i].get();
(*value).count += 1;
}

#[inline(always)]
fn sum(&self) -> u64 {
self.slice
.iter()
.map(|cell| unsafe { (*cell.get()).count })
.sum()
}
}
38 changes: 38 additions & 0 deletions stress/src/user_events_enabled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// To run the test, execute the following command in the stress directory as sudo:
// sudo -E ~/.cargo/bin/cargo rnu --bin user-events-enabled --release

// TODO : Add stess result here.

mod throughput;
use eventheader_dynamic::{Provider, ProviderOptions};
use lazy_static::lazy_static;

// Global constants for level and keyword
const LEVEL: u8 = 4; // Example level (Informational)
const KEYWORD: u64 = 0x01; // Example keyword

lazy_static! {
static ref PROVIDER: Provider = {
// Initialize the Provider with dynamic options
let mut options = ProviderOptions::new();
options = *options.group_name("testprovider");
let mut provider = Provider::new("testprovider", &options);

// Register events with specific levels and keywords
provider.register_set(LEVEL.into(), KEYWORD);

provider
};
}

fn main() {
// Execute the throughput test with the test_log function
throughput::test_throughput(test_user_events_enabled);
}

fn test_user_events_enabled() {
// Find and check if the event is enabled
if let Some(event_set) = PROVIDER.find_set(LEVEL.into(), KEYWORD) {
let _ = event_set.enabled(); // Perform the enabled check
}
}

0 comments on commit 7e591a3

Please sign in to comment.