Skip to content

Commit

Permalink
fix!: use TraceProvider::flush_force() during test
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Nov 24, 2024
1 parent 4a321b0 commit 974e552
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
24 changes: 15 additions & 9 deletions fake-opentelemetry-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::net::SocketAddr;
use std::time::{Duration, Instant};

use futures::StreamExt;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig};
use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
Expand Down Expand Up @@ -88,14 +87,16 @@ impl FakeCollectorServer {

async fn recv_many<T>(rx: &mut Receiver<T>, at_least: usize, timeout: Duration) -> Vec<T> {
let deadline = Instant::now();
let pause = (timeout / 5).min(Duration::from_millis(500));
let pause = (timeout / 10).min(Duration::from_millis(10));
while rx.len() < at_least && deadline.elapsed() < timeout {
tokio::time::sleep(pause).await;
}
std::iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>()
}

pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer {
pub async fn setup_tracer_provider(
fake_server: &FakeCollectorServer,
) -> opentelemetry_sdk::trace::TracerProvider {
// if the environment variable is set (in test or in caller), `with_endpoint` value is ignored
std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");

Expand All @@ -109,10 +110,9 @@ pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sd
opentelemetry_sdk::runtime::Tokio,
)
.build()
.tracer("")
}

pub async fn setup_logger(
pub async fn setup_logger_provider(
fake_server: &FakeCollectorServer,
) -> opentelemetry_sdk::logs::LoggerProvider {
opentelemetry_sdk::logs::LoggerProvider::builder()
Expand All @@ -131,16 +131,17 @@ pub async fn setup_logger(
mod tests {
use super::*;

use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::logs::{LogRecord, Logger, LoggerProvider, Severity};
use opentelemetry::trace::TracerProvider;
use opentelemetry::trace::{Span, SpanKind, Tracer};

#[tokio::test(flavor = "multi_thread")]
async fn test_fake_tracer_and_collector() {
let mut fake_collector = FakeCollectorServer::start()
.await
.expect("fake collector setup and started");
let tracer = setup_tracer(&fake_collector).await;
let tracer_provider = setup_tracer_provider(&fake_collector).await;
let tracer = tracer_provider.tracer("test");

debug!("Sending span...");
let mut span = tracer
Expand All @@ -149,7 +150,12 @@ mod tests {
.start(&tracer);
span.add_event("my-test-event", vec![]);
span.end();
shutdown_tracer_provider();

let _ = tracer_provider.force_flush();
tracer_provider
.shutdown()
.expect("no error during shutdown");
drop(tracer_provider);

let otel_spans = fake_collector
.exported_spans(1, Duration::from_secs(20))
Expand Down Expand Up @@ -184,7 +190,7 @@ mod tests {
.await
.expect("fake collector setup and started");

let logger_provider = setup_logger(&fake_collector).await;
let logger_provider = setup_logger_provider(&fake_collector).await;
let logger = logger_provider.logger("test");
let mut record = logger.create_log_record();
record.set_body("This is information".into());
Expand Down
17 changes: 11 additions & 6 deletions testing-tracing-opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use assert2::{check, let_assert};
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use serde_json::Value;
use std::sync::mpsc::{self, Receiver, SyncSender};

use tracing_subscriber::{
fmt::{format::FmtSpan, MakeWriter},
util::SubscriberInitExt,
Expand Down Expand Up @@ -88,6 +88,7 @@ pub struct FakeEnvironment {
fake_collector: fake_opentelemetry_collector::FakeCollectorServer,
rx: Receiver<Vec<u8>>,
_subsciber_guard: tracing::subscriber::DefaultGuard,
tracer_provider: opentelemetry_sdk::trace::TracerProvider,
}

impl FakeEnvironment {
Expand All @@ -100,10 +101,11 @@ impl FakeEnvironment {
let fake_collector = fake_opentelemetry_collector::FakeCollectorServer::start()
.await
.unwrap();
let tracer = fake_opentelemetry_collector::setup_tracer(&fake_collector).await;
let tracer_provider =
fake_opentelemetry_collector::setup_tracer_provider(&fake_collector).await;
//let (tracer, mut req_rx) = fake_opentelemetry_collector::setup_tracer().await;
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("fake"));

let (make_writer, rx) = duplex_writer();
let fmt_layer = tracing_subscriber::fmt::layer()
Expand All @@ -119,21 +121,24 @@ impl FakeEnvironment {
fake_collector,
rx,
_subsciber_guard,
tracer_provider,
}
}

pub async fn collect_traces(
&mut self,
) -> (Vec<Value>, Vec<fake_opentelemetry_collector::ExportedSpan>) {
opentelemetry::global::shutdown_tracer_provider();
let _ = self.tracer_provider.force_flush();

let otel_spans = self
.fake_collector
.exported_spans(1, std::time::Duration::from_millis(5000))
.exported_spans(1, std::time::Duration::from_millis(100))
.await;
// insta::assert_debug_snapshot!(first_span);
let tracing_events = std::iter::from_fn(|| {
self.rx
.recv_timeout(std::time::Duration::from_millis(500))
.recv_timeout(std::time::Duration::from_millis(3))
//.recv()
.ok()
})
.map(|bytes| serde_json::from_slice::<Value>(&bytes).unwrap())
Expand Down

0 comments on commit 974e552

Please sign in to comment.