Skip to content

Commit

Permalink
Merge branch 'main' into trace-set-resource
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored May 28, 2024
2 parents 5f7a564 + 0ce6a6d commit 080d6ce
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 11 deletions.
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

- **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans.

## v0.23.0

- Fix SimpleSpanProcessor to be consistent with log counterpart. Also removed
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod tests {
}
}

fn shutdown(&mut self) -> TraceResult<()> {
fn shutdown(&self) -> TraceResult<()> {
self.force_flush()
}

Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
fn force_flush(&self) -> TraceResult<()>;
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processors to do any cleanup required.
fn shutdown(&mut self) -> TraceResult<()>;
fn shutdown(&self) -> TraceResult<()>;
/// Set the resource for the log processor.
fn set_resource(&mut self, _resource: &Resource);
}
Expand Down Expand Up @@ -140,7 +140,7 @@ impl SpanProcessor for SimpleSpanProcessor {
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
fn shutdown(&self) -> TraceResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
Expand Down Expand Up @@ -258,7 +258,7 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
.and_then(|identity| identity)
}

fn shutdown(&mut self) -> TraceResult<()> {
fn shutdown(&self) -> TraceResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
Expand Down Expand Up @@ -709,7 +709,7 @@ mod tests {
#[test]
fn simple_span_processor_on_end_calls_export() {
let exporter = InMemorySpanExporterBuilder::new().build();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
Expand Down Expand Up @@ -741,7 +741,7 @@ mod tests {
#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let exporter = InMemorySpanExporterBuilder::new().build();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
assert!(!exporter.get_finished_spans().unwrap().is_empty());
Expand Down Expand Up @@ -897,7 +897,7 @@ mod tests {
scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
..Default::default()
};
let mut processor =
let processor =
BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
let handle = tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -999,7 +999,7 @@ mod tests {
delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
delay_fn: async_std::task::sleep,
};
let mut processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
processor.on_end(new_test_export_span_data());
let flush_res = processor.force_flush();
if time_out {
Expand All @@ -1023,7 +1023,7 @@ mod tests {
delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
delay_fn: tokio::time::sleep,
};
let mut processor =
let processor =
BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
processor.on_end(new_test_export_span_data());
Expand Down
4 changes: 4 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.30.12", optional = true }

[features]
stats = ["sysinfo"]
8 changes: 8 additions & 0 deletions stress/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ Throughput: 3,905,200 iterations/sec
Throughput: 4,106,600 iterations/sec
Throughput: 5,075,400 iterations/sec
```

## Feature flags

"stats" - Prints memory and CPU usage. Has slight impact on throughput.

```sh
cargo run --release --bin metrics --feature=stats
```
30 changes: 29 additions & 1 deletion stress/src/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
#[cfg(feature = "stats")]
use sysinfo::{Pid, System};

const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds
const BATCH_SIZE: u64 = 1000;
Expand All @@ -27,7 +29,7 @@ where
})
.expect("Error setting Ctrl-C handler");
let num_threads = num_cpus::get();
println!("Number of threads: {}", num_threads);
println!("Number of threads: {}\n", num_threads);
let mut handles = Vec::with_capacity(num_threads);
let func_arc = Arc::new(func);
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();
Expand All @@ -42,6 +44,12 @@ where
let mut start_time = Instant::now();
let mut end_time = start_time;
let mut total_count_old: u64 = 0;

#[cfg(feature = "stats")]
let pid = Pid::from(std::process::id() as usize);
#[cfg(feature = "stats")]
let mut system = System::new_all();

loop {
let elapsed = end_time.duration_since(start_time).as_secs();
if elapsed >= SLIDING_WINDOW_SIZE {
Expand All @@ -56,6 +64,26 @@ where
"Throughput: {} iterations/sec",
throughput.to_formatted_string(&Locale::en)
);

#[cfg(feature = "stats")]
{
system.refresh_all();
if let Some(process) = system.process(pid) {
println!(
"Memory usage: {:.2} MB",
process.memory() as f64 / (1024.0 * 1024.0)
);
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
println!(
"Virtual memory usage: {:.2} MB",
process.virtual_memory() as f64 / (1024.0 * 1024.0)
);
} else {
println!("Process not found");
}
}

println!("\n");
start_time = Instant::now();
}

Expand Down
2 changes: 1 addition & 1 deletion stress/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl SpanProcessor for NoOpSpanProcessor {
Ok(())
}

fn shutdown(&mut self) -> TraceResult<()> {
fn shutdown(&self) -> TraceResult<()> {
Ok(())
}

Expand Down

0 comments on commit 080d6ce

Please sign in to comment.