Skip to content

Commit

Permalink
🐛 workaround for a delay, batch,... behavior in otlp exporter and tes…
Browse files Browse the repository at this point in the history
…t with fake-opentelemetry-collector (closed too early)
  • Loading branch information
davidB committed Aug 31, 2024
1 parent a7f243c commit a6dfe1a
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {
#[case] headers: &[(&str, &str)],
#[case] is_trace_id_constant: bool,
) {
let fake_env = FakeEnvironment::setup().await;
let mut fake_env = FakeEnvironment::setup().await;
{
let mut svc = Router::new()
.route("/users/:id", get(|| async { StatusCode::OK }))
Expand Down
6 changes: 3 additions & 3 deletions fake-opentelemetry-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ readme = "README.md"
keywords = ["tracing", "opentelemetry", "faker", "mock"]
categories = ["development-tools::testing"]
edition.workspace = true
version = "0.19.0"
version = "0.20.0"
authors.workspace = true
repository.workspace = true
license.workspace = true
Expand All @@ -31,11 +31,11 @@ opentelemetry_sdk = { workspace = true, features = [
"testing",
] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.39", features = ["full"] }
tokio = { version = "1.40", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
assert2 = "0.3"
insta = { version = "1.39.0", features = ["yaml", "redactions"] }
insta = { version = "1.39", features = ["yaml", "redactions"] }
41 changes: 30 additions & 11 deletions fake-opentelemetry-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use logs::*;
use trace::*;

use std::net::SocketAddr;
use std::time::{Duration, Instant};

use futures::StreamExt;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use std::sync::mpsc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::debug;

Expand All @@ -37,8 +39,8 @@ impl FakeCollectorServer {
s
});

let (req_tx, req_rx) = mpsc::sync_channel::<ExportedSpan>(1024);
let (log_tx, log_rx) = mpsc::sync_channel::<ExportedLog>(1024);
let (req_tx, req_rx) = mpsc::channel::<ExportedSpan>(64);
let (log_tx, log_rx) = mpsc::channel::<ExportedLog>(64);
let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx));
let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx));
let handle = tokio::task::spawn(async move {
Expand Down Expand Up @@ -67,19 +69,31 @@ impl FakeCollectorServer {
format!("http://{}", self.address()) //Devskim: ignore DS137138)
}

pub fn exported_spans(&self) -> Vec<ExportedSpan> {
std::iter::from_fn(|| self.req_rx.try_recv().ok()).collect::<Vec<_>>()
pub async fn exported_spans(
&mut self,
at_least: usize,
timeout: Duration,
) -> Vec<ExportedSpan> {
recv_many(&mut self.req_rx, at_least, timeout).await
}

pub fn exported_logs(&self) -> Vec<ExportedLog> {
std::iter::from_fn(|| self.log_rx.try_recv().ok()).collect::<Vec<_>>()
pub async fn exported_logs(&mut self, at_least: usize, timeout: Duration) -> Vec<ExportedLog> {
recv_many(&mut self.log_rx, at_least, timeout).await
}

pub fn abort(self) {
self.handle.abort()
}
}

async fn recv_many<T>(rx: &mut Receiver<T>, at_least: usize, timeout: Duration) -> Vec<T> {
let deadline = Instant::now();
while rx.len() < at_least && deadline.elapsed() < timeout {
tokio::time::sleep(timeout / 5).await;
}
std::iter::from_fn(|| rx.try_recv().ok()).collect::<Vec<_>>()
}

pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer {
// if the environment variable is set (in test or in caller), `with_endpoint` value is ignored
std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");
Expand Down Expand Up @@ -119,7 +133,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_fake_tracer_and_collector() {
let fake_collector = FakeCollectorServer::start()
let mut fake_collector = FakeCollectorServer::start()
.await
.expect("fake collector setup and started");
let tracer = setup_tracer(&fake_collector).await;
Expand All @@ -133,7 +147,9 @@ mod tests {
span.end();
shutdown_tracer_provider();

let otel_spans = fake_collector.exported_spans();
let otel_spans = fake_collector
.exported_spans(1, Duration::from_millis(2000))
.await;
//insta::assert_debug_snapshot!(otel_spans);
insta::assert_yaml_snapshot!(otel_spans, {
"[].start_time_unix_nano" => "[timestamp]",
Expand All @@ -160,7 +176,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread")]
async fn test_fake_logger_and_collector() {
let fake_collector = FakeCollectorServer::start()
let mut fake_collector = FakeCollectorServer::start()
.await
.expect("fake collector setup and started");

Expand All @@ -172,7 +188,10 @@ mod tests {
record.set_severity_text("info".into());
logger.emit(record);

let otel_logs = fake_collector.exported_logs();
let otel_logs = fake_collector
.exported_logs(1, Duration::from_millis(500))
.await;

insta::assert_yaml_snapshot!(otel_logs, {
"[].trace_id" => insta::dynamic_redaction(|value, _path| {
assert2::let_assert!(Some(trace_id) = value.as_str());
Expand Down
20 changes: 14 additions & 6 deletions fake-opentelemetry-collector/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
};
use serde::Serialize;
use std::collections::BTreeMap;
use std::sync::{mpsc, Mutex};
use tokio::sync::mpsc;

/// This is created to flatten the log record to make it more compatible with insta for testing
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
Expand Down Expand Up @@ -37,12 +37,12 @@ impl From<opentelemetry_proto::tonic::logs::v1::LogRecord> for ExportedLog {
}

pub(crate) struct FakeLogsService {
tx: Mutex<mpsc::SyncSender<ExportedLog>>,
tx: mpsc::Sender<ExportedLog>,
}

impl FakeLogsService {
pub fn new(tx: mpsc::SyncSender<ExportedLog>) -> Self {
Self { tx: Mutex::new(tx) }
pub fn new(tx: mpsc::Sender<ExportedLog>) -> Self {
Self { tx }
}
}

Expand All @@ -52,14 +52,22 @@ impl LogsService for FakeLogsService {
&self,
request: tonic::Request<ExportLogsServiceRequest>,
) -> Result<tonic::Response<ExportLogsServiceResponse>, tonic::Status> {
request
let sender = self.tx.clone();
for el in request
.into_inner()
.resource_logs
.into_iter()
.flat_map(|rl| rl.scope_logs)
.flat_map(|sl| sl.log_records)
.map(ExportedLog::from)
.for_each(|el| self.tx.lock().unwrap().send(el).unwrap());
{
sender
.send(el)
.await
.inspect_err(|e| eprintln!("failed to send to channel: {e}"))
.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
}

Ok(tonic::Response::new(ExportLogsServiceResponse {
partial_success: None,
}))
Expand Down
22 changes: 13 additions & 9 deletions fake-opentelemetry-collector/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
};
use serde::Serialize;
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::sync::Mutex;
use tokio::sync::mpsc;

use tracing::debug;

Expand Down Expand Up @@ -109,12 +108,12 @@ impl From<&opentelemetry_proto::tonic::trace::v1::span::Event> for Event {
}

pub(crate) struct FakeTraceService {
tx: Mutex<mpsc::SyncSender<ExportedSpan>>,
tx: mpsc::Sender<ExportedSpan>,
}

impl FakeTraceService {
pub fn new(tx: mpsc::SyncSender<ExportedSpan>) -> Self {
Self { tx: Mutex::new(tx) }
pub fn new(tx: mpsc::Sender<ExportedSpan>) -> Self {
Self { tx }
}
}

Expand All @@ -125,16 +124,21 @@ impl TraceService for FakeTraceService {
request: tonic::Request<ExportTraceServiceRequest>,
) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
debug!("Sending request into channel...");
request
let sender = self.tx.clone();
for es in request
.into_inner()
.resource_spans
.into_iter()
.flat_map(|rs| rs.scope_spans)
.flat_map(|ss| ss.spans)
.map(ExportedSpan::from)
.for_each(|es| {
self.tx.lock().unwrap().send(es).expect("Channel full");
});
{
sender
.send(es)
.await
.inspect_err(|e| eprintln!("failed to send to channel: {e}"))
.map_err(|err| tonic::Status::from_error(Box::new(err)))?;
}
Ok(tonic::Response::new(ExportTraceServiceResponse {
partial_success: None,
}))
Expand Down
2 changes: 1 addition & 1 deletion testing-tracing-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ license.workspace = true

[dependencies]
assert2 = "0.3"
fake-opentelemetry-collector = { path = "../fake-opentelemetry-collector", version = "0.19" }
fake-opentelemetry-collector = { path = "../fake-opentelemetry-collector", version = "0.20" }
insta = { version = "1.29.0", features = ["yaml", "redactions"] }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
Expand Down
20 changes: 13 additions & 7 deletions testing-tracing-opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,22 @@ impl FakeEnvironment {
}

pub async fn collect_traces(
self,
&mut self,
) -> (Vec<Value>, Vec<fake_opentelemetry_collector::ExportedSpan>) {
opentelemetry::global::shutdown_tracer_provider();

let otel_span = self.fake_collector.exported_spans();
let otel_spans = self
.fake_collector
.exported_spans(1, std::time::Duration::from_millis(5000))
.await;
// insta::assert_debug_snapshot!(first_span);
let tracing_events = std::iter::from_fn(|| self.rx.try_recv().ok())
.map(|bytes| serde_json::from_slice::<Value>(&bytes).unwrap())
.collect::<Vec<_>>();
(tracing_events, otel_span)
let tracing_events = std::iter::from_fn(|| {
self.rx
.recv_timeout(std::time::Duration::from_millis(500))
.ok()
})
.map(|bytes| serde_json::from_slice::<Value>(&bytes).unwrap())
.collect::<Vec<_>>();
(tracing_events, otel_spans)
}
}

Expand Down

0 comments on commit a6dfe1a

Please sign in to comment.