Skip to content

Commit

Permalink
fix: Use guard pattern to allow consumers to ensure final trace is se…
Browse files Browse the repository at this point in the history
…nt (#185)

* Use guard pattern to allow consumers to ensure final trace is sent
* Add must_use to struct
* Switch examples to use guard pattern

In discussion with the Rust OpenTelemetry crate maintainers, the `shutdown_tracer_provider` function intentionally no longer ensures that the final traces are sent to the OpenTelemetry server before the application exits. To perform this functionality, we now need to call `force_flush` on the TracerProvider instance.

Since the public API functions for the TracerProvider only return a trait object which does not have this function exposed, we need to keep an instance of the actual TracerProvider around. In order to make this simple for consumers of this crate, I have used the guard pattern here so that they do not need to worry about those details or pull in the crates that provide those types. An instance of the TracerProvider is kept in a private field of the TracingGuard struct, which is returned by the init functions. That struct impls `Drop` to call `force_flush` when the struct is dropped. As such, crate consumers only need to use the `let _guard =` syntax to ensure the guard is only dropped when the function actually exits. This also allows us to, in the future, include other provider flushes in the same pattern, such as for OpenTelemetry logs and metrics.

In addition, as the `Drop` impl only performs a `force_flush`, if any consumers of this crate update to a version with this and do not make any code changes, their code will continue to behave exactly the same - the guard struct will be dropped immediately, triggering the `force_flush`, but the provider will continue to operate. Their application will behave exactly as it did before with regard to sending traces. It only loses the behavior of ensuring the final traces are sent before application exit, but that wasn't working before this anyways.

FIX #184
  • Loading branch information
masongup-mdsol authored Nov 23, 2024
1 parent 9433355 commit 4a321b0
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 93 deletions.
13 changes: 3 additions & 10 deletions axum-tracing-opentelemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ use axum_tracing_opentelemetry::opentelemetry_tracing_layer;
#[tokio::main]
async fn main() -> Result<(), axum::BoxError> {
// very opinionated init of tracing, look as is source to make your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
// very opinionated init of tracing, look at the source to make your own
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let app = app();
// run it
let addr = &"0.0.0.0:3000".parse::<SocketAddr>()?;
tracing::warn!("listening on {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app.into_make_service())
//FIXME .with_graceful_shutdown(shutdown_signal())
.await?;
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}
Expand All @@ -43,11 +41,6 @@ fn app() -> Router {
.layer(OtelAxumLayer::default())
.route("/health", get(health)) // request processed without span / trace
}
async fn shutdown_signal() {
//...
opentelemetry::global::shutdown_tracer_provider();
}
```

For more info about how to initialize, you can look at crate [`init-tracing-opentelemetry`] or [`tracing-opentelemetry`].
Expand Down
44 changes: 2 additions & 42 deletions examples/axum-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing_opentelemetry_instrumentation_sdk::find_current_trace_id;
#[tokio::main]
async fn main() -> Result<(), BoxError> {
// very opinionated init of tracing, look as is source to make your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;

let app = app();
// run it
Expand All @@ -20,9 +20,7 @@ async fn main() -> Result<(), BoxError> {
tracing::info!("try to call `curl -i http://127.0.0.1:3003/` (with trace)"); //Devskim: ignore DS137138
tracing::info!("try to call `curl -i http://127.0.0.1:3003/health` (with NO trace)"); //Devskim: ignore DS137138
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal())
.await?;
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}

Expand Down Expand Up @@ -61,41 +59,3 @@ async fn proxy_handler(Path((service, path)): Path<(String, String)>) -> impl In
json!({ "my_trace_id": trace_id, "fake_proxy": { "service": service, "path": path } }),
)
}

async fn shutdown_signal() {
use std::sync::mpsc;
use std::{thread, time::Duration};

let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

tracing::warn!("signal received, starting graceful shutdown");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
opentelemetry::global::shutdown_tracer_provider();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2_000));
if shutdown_res.is_err() {
tracing::error!("failed to shutdown OpenTelemetry");
}
}
3 changes: 1 addition & 2 deletions examples/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod generated {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// very opinionated init of tracing, look as is source to make your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()
.expect("init subscribers");

// let channel = Channel::from_static("http://[::1]:50051").connect().await?;
Expand Down Expand Up @@ -53,6 +53,5 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("RESPONSE={:?}", response);
}

opentelemetry::global::shutdown_tracer_provider();
Ok(())
}
31 changes: 2 additions & 29 deletions examples/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Greeter for MyGreeter {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// very opinionated init of tracing, look as is source to make your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()
.expect("init subscribers");

let addr = "0.0.0.0:50051".parse().unwrap();
Expand All @@ -68,35 +68,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.add_service(reflection_service)
//.add_service(GreeterServer::new(greeter))
.add_service(GreeterServer::new(greeter))
.serve_with_shutdown(addr, shutdown_signal())
.serve(addr)
.await?;

Ok(())
}

async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

//tracing::warn!("signal received, starting graceful shutdown");
opentelemetry::global::shutdown_tracer_provider();
}
2 changes: 1 addition & 1 deletion examples/load/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing_opentelemetry_instrumentation_sdk::otel_trace_span;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// very opinionated init of tracing, look as is source to make your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let mut stats = memory_stats();
if stats.is_none() {
eprintln!("Couldn't get the current memory usage :(");
Expand Down
4 changes: 2 additions & 2 deletions init-tracing-opentelemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use axum_tracing_opentelemetry::opentelemetry_tracing_layer;
#[tokio::main]
async fn main() -> Result<(), axum::BoxError> {
// very opinionated init of tracing, look as is source to compose your own
init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
let _guard = init_tracing_opentelemetry::tracing_subscriber_ext::init_subscribers()?;
...;
Ok(())
}
```

AND Call `opentelemetry::global::shutdown_tracer_provider();` on shutdown of the app to be sure to send the pending trace,...
The `init_subscribers` function returns a `TracingGuard` instance. Following the guard pattern, this struct provides no functions but, when dropped, ensures that any pending traces are sent before it exits. The syntax `let _guard` is suggested to ensure that Rust does not drop the struct until the application exits.

To configure opentelemetry tracer & tracing, you can use the functions from `init_tracing_opentelemetry::tracing_subscriber_ext`, but they are very opinionated (and WIP to make them more customizable and friendly), so we recommend making your composition, but look at the code (to avoid some issue) and share your feedback.

Expand Down
27 changes: 20 additions & 7 deletions init-tracing-opentelemetry/src/tracing_subscriber_ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use opentelemetry::trace::{TraceError, TracerProvider};
use opentelemetry_sdk::trace::Tracer;
use opentelemetry_sdk::trace::{self, Tracer};
use tracing::{info, Subscriber};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{filter::EnvFilter, layer::SubscriberExt, registry::LookupSpan, Layer};
Expand Down Expand Up @@ -60,7 +60,7 @@ pub fn build_loglevel_filter_layer() -> tracing_subscriber::filter::EnvFilter {
EnvFilter::from_default_env()
}

pub fn build_otel_layer<S>() -> Result<OpenTelemetryLayer<S, Tracer>, TraceError>
pub fn build_otel_layer<S>() -> Result<(OpenTelemetryLayer<S, Tracer>, TracingGuard), TraceError>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
Expand Down Expand Up @@ -88,22 +88,35 @@ where
let layer = tracing_opentelemetry::layer()
.with_error_records_to_exceptions(true)
.with_tracer(tracerprovider.tracer(""));
global::set_tracer_provider(tracerprovider);
Ok(layer)
global::set_tracer_provider(tracerprovider.clone());
Ok((layer, TracingGuard { tracerprovider }))
}

pub fn init_subscribers() -> Result<(), Error> {
#[must_use = "Recommend holding with 'let _guard = ' pattern to ensure final traces are sent to the server"]
pub struct TracingGuard {
tracerprovider: trace::TracerProvider,
}

impl Drop for TracingGuard {
fn drop(&mut self) {
self.tracerprovider.force_flush();
}
}

pub fn init_subscribers() -> Result<TracingGuard, Error> {
//setup a temporary subscriber to log output during setup
let subscriber = tracing_subscriber::registry()
.with(build_loglevel_filter_layer())
.with(build_logger_text());
let _guard = tracing::subscriber::set_default(subscriber);
info!("init logging & tracing");

let (layer, guard) = build_otel_layer()?;

let subscriber = tracing_subscriber::registry()
.with(build_otel_layer()?)
.with(layer)
.with(build_loglevel_filter_layer())
.with(build_logger_text());
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
Ok(guard)
}

0 comments on commit 4a321b0

Please sign in to comment.