From 974e5523d74e751cd298337f445e49291f2f15c7 Mon Sep 17 00:00:00 2001 From: David Bernard Date: Sun, 24 Nov 2024 15:58:59 +0100 Subject: [PATCH] fix!: use `TraceProvider::flush_force()` during test --- fake-opentelemetry-collector/src/lib.rs | 24 +++++++++++++++--------- testing-tracing-opentelemetry/src/lib.rs | 17 +++++++++++------ 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/fake-opentelemetry-collector/src/lib.rs b/fake-opentelemetry-collector/src/lib.rs index fd2ed9c..830d11a 100644 --- a/fake-opentelemetry-collector/src/lib.rs +++ b/fake-opentelemetry-collector/src/lib.rs @@ -11,7 +11,6 @@ use std::net::SocketAddr; use std::time::{Duration, Instant}; use futures::StreamExt; -use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig}; use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; @@ -88,14 +87,16 @@ impl FakeCollectorServer { async fn recv_many(rx: &mut Receiver, at_least: usize, timeout: Duration) -> Vec { let deadline = Instant::now(); - let pause = (timeout / 5).min(Duration::from_millis(500)); + let pause = (timeout / 10).min(Duration::from_millis(10)); while rx.len() < at_least && deadline.elapsed() < timeout { tokio::time::sleep(pause).await; } std::iter::from_fn(|| rx.try_recv().ok()).collect::>() } -pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer { +pub async fn setup_tracer_provider( + fake_server: &FakeCollectorServer, +) -> opentelemetry_sdk::trace::TracerProvider { // if the environment variable is set (in test or in caller), `with_endpoint` value is ignored std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"); @@ -109,10 +110,9 @@ pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sd opentelemetry_sdk::runtime::Tokio, ) .build() - .tracer("") } -pub async fn setup_logger( +pub async fn setup_logger_provider( fake_server: &FakeCollectorServer, ) -> opentelemetry_sdk::logs::LoggerProvider { opentelemetry_sdk::logs::LoggerProvider::builder() @@ -131,8 +131,8 @@ pub async fn setup_logger( mod tests { use super::*; - use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::logs::{LogRecord, Logger, LoggerProvider, Severity}; + use opentelemetry::trace::TracerProvider; use opentelemetry::trace::{Span, SpanKind, Tracer}; #[tokio::test(flavor = "multi_thread")] @@ -140,7 +140,8 @@ mod tests { let mut fake_collector = FakeCollectorServer::start() .await .expect("fake collector setup and started"); - let tracer = setup_tracer(&fake_collector).await; + let tracer_provider = setup_tracer_provider(&fake_collector).await; + let tracer = tracer_provider.tracer("test"); debug!("Sending span..."); let mut span = tracer @@ -149,7 +150,12 @@ mod tests { .start(&tracer); span.add_event("my-test-event", vec![]); span.end(); - shutdown_tracer_provider(); + + let _ = tracer_provider.force_flush(); + tracer_provider + .shutdown() + .expect("no error during shutdown"); + drop(tracer_provider); let otel_spans = fake_collector .exported_spans(1, Duration::from_secs(20)) @@ -184,7 +190,7 @@ mod tests { .await .expect("fake collector setup and started"); - let logger_provider = setup_logger(&fake_collector).await; + let logger_provider = setup_logger_provider(&fake_collector).await; let logger = logger_provider.logger("test"); let mut record = logger.create_log_record(); record.set_body("This is information".into()); diff --git a/testing-tracing-opentelemetry/src/lib.rs b/testing-tracing-opentelemetry/src/lib.rs index 917ed52..6824516 100644 --- a/testing-tracing-opentelemetry/src/lib.rs +++ b/testing-tracing-opentelemetry/src/lib.rs @@ -1,8 +1,8 @@ use assert2::{check, let_assert}; +use opentelemetry::trace::TracerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; use serde_json::Value; use std::sync::mpsc::{self, Receiver, SyncSender}; - use tracing_subscriber::{ fmt::{format::FmtSpan, MakeWriter}, util::SubscriberInitExt, @@ -88,6 +88,7 @@ pub struct FakeEnvironment { fake_collector: fake_opentelemetry_collector::FakeCollectorServer, rx: Receiver>, _subsciber_guard: tracing::subscriber::DefaultGuard, + tracer_provider: opentelemetry_sdk::trace::TracerProvider, } impl FakeEnvironment { @@ -100,10 +101,11 @@ impl FakeEnvironment { let fake_collector = fake_opentelemetry_collector::FakeCollectorServer::start() .await .unwrap(); - let tracer = fake_opentelemetry_collector::setup_tracer(&fake_collector).await; + let tracer_provider = + fake_opentelemetry_collector::setup_tracer_provider(&fake_collector).await; //let (tracer, mut req_rx) = fake_opentelemetry_collector::setup_tracer().await; opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("fake")); let (make_writer, rx) = duplex_writer(); let fmt_layer = tracing_subscriber::fmt::layer() @@ -119,21 +121,24 @@ impl FakeEnvironment { fake_collector, rx, _subsciber_guard, + tracer_provider, } } pub async fn collect_traces( &mut self, ) -> (Vec, Vec) { - opentelemetry::global::shutdown_tracer_provider(); + let _ = self.tracer_provider.force_flush(); + let otel_spans = self .fake_collector - .exported_spans(1, std::time::Duration::from_millis(5000)) + .exported_spans(1, std::time::Duration::from_millis(100)) .await; // insta::assert_debug_snapshot!(first_span); let tracing_events = std::iter::from_fn(|| { self.rx - .recv_timeout(std::time::Duration::from_millis(500)) + .recv_timeout(std::time::Duration::from_millis(3)) + //.recv() .ok() }) .map(|bytes| serde_json::from_slice::(&bytes).unwrap())