diff --git a/axum-tracing-opentelemetry/src/middleware/trace_extractor.rs b/axum-tracing-opentelemetry/src/middleware/trace_extractor.rs index bdedcb7..a294f0e 100644 --- a/axum-tracing-opentelemetry/src/middleware/trace_extractor.rs +++ b/axum-tracing-opentelemetry/src/middleware/trace_extractor.rs @@ -209,7 +209,7 @@ mod tests { #[case] headers: &[(&str, &str)], #[case] is_trace_id_constant: bool, ) { - let fake_env = FakeEnvironment::setup().await; + let mut fake_env = FakeEnvironment::setup().await; { let mut svc = Router::new() .route("/users/:id", get(|| async { StatusCode::OK })) diff --git a/fake-opentelemetry-collector/Cargo.toml b/fake-opentelemetry-collector/Cargo.toml index 9ac3150..0427d14 100644 --- a/fake-opentelemetry-collector/Cargo.toml +++ b/fake-opentelemetry-collector/Cargo.toml @@ -5,7 +5,7 @@ readme = "README.md" keywords = ["tracing", "opentelemetry", "faker", "mock"] categories = ["development-tools::testing"] edition.workspace = true -version = "0.19.0" +version = "0.20.0" authors.workspace = true repository.workspace = true license.workspace = true @@ -31,11 +31,11 @@ opentelemetry_sdk = { workspace = true, features = [ "testing", ] } serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.39", features = ["full"] } +tokio = { version = "1.40", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = { workspace = true } tracing = { workspace = true } [dev-dependencies] assert2 = "0.3" -insta = { version = "1.39.0", features = ["yaml", "redactions"] } +insta = { version = "1.39", features = ["yaml", "redactions"] } diff --git a/fake-opentelemetry-collector/src/lib.rs b/fake-opentelemetry-collector/src/lib.rs index 56a75f4..7611dac 100644 --- a/fake-opentelemetry-collector/src/lib.rs +++ b/fake-opentelemetry-collector/src/lib.rs @@ -8,13 +8,15 @@ use logs::*; use trace::*; use std::net::SocketAddr; +use std::time::{Duration, Instant}; use futures::StreamExt; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::WithExportConfig; use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer; use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; -use std::sync::mpsc; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; use tokio_stream::wrappers::TcpListenerStream; use tracing::debug; @@ -37,8 +39,8 @@ impl FakeCollectorServer { s }); - let (req_tx, req_rx) = mpsc::sync_channel::(1024); - let (log_tx, log_rx) = mpsc::sync_channel::(1024); + let (req_tx, req_rx) = mpsc::channel::(64); + let (log_tx, log_rx) = mpsc::channel::(64); let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx)); let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx)); let handle = tokio::task::spawn(async move { @@ -67,12 +69,16 @@ impl FakeCollectorServer { format!("http://{}", self.address()) //Devskim: ignore DS137138) } - pub fn exported_spans(&self) -> Vec { - std::iter::from_fn(|| self.req_rx.try_recv().ok()).collect::>() + pub async fn exported_spans( + &mut self, + at_least: usize, + timeout: Duration, + ) -> Vec { + recv_many(&mut self.req_rx, at_least, timeout).await } - pub fn exported_logs(&self) -> Vec { - std::iter::from_fn(|| self.log_rx.try_recv().ok()).collect::>() + pub async fn exported_logs(&mut self, at_least: usize, timeout: Duration) -> Vec { + recv_many(&mut self.log_rx, at_least, timeout).await } pub fn abort(self) { @@ -80,6 +86,14 @@ impl FakeCollectorServer { } } +async fn recv_many(rx: &mut Receiver, at_least: usize, timeout: Duration) -> Vec { + let deadline = Instant::now(); + while rx.len() < at_least && deadline.elapsed() < timeout { + tokio::time::sleep(timeout / 5).await; + } + std::iter::from_fn(|| rx.try_recv().ok()).collect::>() +} + pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer { // if the environment variable is set (in test or in caller), `with_endpoint` value is ignored std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"); @@ -119,7 +133,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_fake_tracer_and_collector() { - let fake_collector = FakeCollectorServer::start() + let mut fake_collector = FakeCollectorServer::start() .await .expect("fake collector setup and started"); let tracer = setup_tracer(&fake_collector).await; @@ -133,7 +147,9 @@ mod tests { span.end(); shutdown_tracer_provider(); - let otel_spans = fake_collector.exported_spans(); + let otel_spans = fake_collector + .exported_spans(1, Duration::from_millis(2000)) + .await; //insta::assert_debug_snapshot!(otel_spans); insta::assert_yaml_snapshot!(otel_spans, { "[].start_time_unix_nano" => "[timestamp]", @@ -160,7 +176,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_fake_logger_and_collector() { - let fake_collector = FakeCollectorServer::start() + let mut fake_collector = FakeCollectorServer::start() .await .expect("fake collector setup and started"); @@ -172,7 +188,10 @@ mod tests { record.set_severity_text("info".into()); logger.emit(record); - let otel_logs = fake_collector.exported_logs(); + let otel_logs = fake_collector + .exported_logs(1, Duration::from_millis(500)) + .await; + insta::assert_yaml_snapshot!(otel_logs, { "[].trace_id" => insta::dynamic_redaction(|value, _path| { assert2::let_assert!(Some(trace_id) = value.as_str()); diff --git a/fake-opentelemetry-collector/src/logs.rs b/fake-opentelemetry-collector/src/logs.rs index ab3c866..10bf1a2 100644 --- a/fake-opentelemetry-collector/src/logs.rs +++ b/fake-opentelemetry-collector/src/logs.rs @@ -4,7 +4,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::{ }; use serde::Serialize; use std::collections::BTreeMap; -use std::sync::{mpsc, Mutex}; +use tokio::sync::mpsc; /// This is created to flatten the log record to make it more compatible with insta for testing #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -37,12 +37,12 @@ impl From for ExportedLog { } pub(crate) struct FakeLogsService { - tx: Mutex>, + tx: mpsc::Sender, } impl FakeLogsService { - pub fn new(tx: mpsc::SyncSender) -> Self { - Self { tx: Mutex::new(tx) } + pub fn new(tx: mpsc::Sender) -> Self { + Self { tx } } } @@ -52,14 +52,22 @@ impl LogsService for FakeLogsService { &self, request: tonic::Request, ) -> Result, tonic::Status> { - request + let sender = self.tx.clone(); + for el in request .into_inner() .resource_logs .into_iter() .flat_map(|rl| rl.scope_logs) .flat_map(|sl| sl.log_records) .map(ExportedLog::from) - .for_each(|el| self.tx.lock().unwrap().send(el).unwrap()); + { + sender + .send(el) + .await + .inspect_err(|e| eprintln!("failed to send to channel: {e}")) + .map_err(|err| tonic::Status::from_error(Box::new(err)))?; + } + Ok(tonic::Response::new(ExportLogsServiceResponse { partial_success: None, })) diff --git a/fake-opentelemetry-collector/src/trace.rs b/fake-opentelemetry-collector/src/trace.rs index ea4435b..9cbc91a 100644 --- a/fake-opentelemetry-collector/src/trace.rs +++ b/fake-opentelemetry-collector/src/trace.rs @@ -5,8 +5,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ }; use serde::Serialize; use std::collections::BTreeMap; -use std::sync::mpsc; -use std::sync::Mutex; +use tokio::sync::mpsc; use tracing::debug; @@ -109,12 +108,12 @@ impl From<&opentelemetry_proto::tonic::trace::v1::span::Event> for Event { } pub(crate) struct FakeTraceService { - tx: Mutex>, + tx: mpsc::Sender, } impl FakeTraceService { - pub fn new(tx: mpsc::SyncSender) -> Self { - Self { tx: Mutex::new(tx) } + pub fn new(tx: mpsc::Sender) -> Self { + Self { tx } } } @@ -125,16 +124,21 @@ impl TraceService for FakeTraceService { request: tonic::Request, ) -> Result, tonic::Status> { debug!("Sending request into channel..."); - request + let sender = self.tx.clone(); + for es in request .into_inner() .resource_spans .into_iter() .flat_map(|rs| rs.scope_spans) .flat_map(|ss| ss.spans) .map(ExportedSpan::from) - .for_each(|es| { - self.tx.lock().unwrap().send(es).expect("Channel full"); - }); + { + sender + .send(es) + .await + .inspect_err(|e| eprintln!("failed to send to channel: {e}")) + .map_err(|err| tonic::Status::from_error(Box::new(err)))?; + } Ok(tonic::Response::new(ExportTraceServiceResponse { partial_success: None, })) diff --git a/testing-tracing-opentelemetry/Cargo.toml b/testing-tracing-opentelemetry/Cargo.toml index 0ea5376..bbbca30 100644 --- a/testing-tracing-opentelemetry/Cargo.toml +++ b/testing-tracing-opentelemetry/Cargo.toml @@ -14,7 +14,7 @@ license.workspace = true [dependencies] assert2 = "0.3" -fake-opentelemetry-collector = { path = "../fake-opentelemetry-collector", version = "0.19" } +fake-opentelemetry-collector = { path = "../fake-opentelemetry-collector", version = "0.20" } insta = { version = "1.29.0", features = ["yaml", "redactions"] } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } diff --git a/testing-tracing-opentelemetry/src/lib.rs b/testing-tracing-opentelemetry/src/lib.rs index 8afc79a..917ed52 100644 --- a/testing-tracing-opentelemetry/src/lib.rs +++ b/testing-tracing-opentelemetry/src/lib.rs @@ -123,16 +123,22 @@ impl FakeEnvironment { } pub async fn collect_traces( - self, + &mut self, ) -> (Vec, Vec) { opentelemetry::global::shutdown_tracer_provider(); - - let otel_span = self.fake_collector.exported_spans(); + let otel_spans = self + .fake_collector + .exported_spans(1, std::time::Duration::from_millis(5000)) + .await; // insta::assert_debug_snapshot!(first_span); - let tracing_events = std::iter::from_fn(|| self.rx.try_recv().ok()) - .map(|bytes| serde_json::from_slice::(&bytes).unwrap()) - .collect::>(); - (tracing_events, otel_span) + let tracing_events = std::iter::from_fn(|| { + self.rx + .recv_timeout(std::time::Duration::from_millis(500)) + .ok() + }) + .map(|bytes| serde_json::from_slice::(&bytes).unwrap()) + .collect::>(); + (tracing_events, otel_spans) } }