Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid redundant shutdown in TracerProvider::drop when already shut down #2197

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
209 changes: 173 additions & 36 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,73 @@
//! # Trace Provider SDK
//!
//! ## Tracer Creation
//!
//! New [`Tracer`] instances are always created through a [`TracerProvider`].
//!
//! All configuration objects and extension points (span processors,
//! propagators) are provided by the [`TracerProvider`]. [`Tracer`] instances do
//! not duplicate this data to avoid that different [`Tracer`] instances
//! of the [`TracerProvider`] have different versions of these data.
/// # Trace Provider SDK
///
/// The `TracerProvider` handles the creation and management of [`Tracer`] instances and coordinates
/// span processing. It serves as the central configuration point for tracing, ensuring consistency
/// across all [`Tracer`] instances it creates.
///
/// ## Tracer Creation
///
/// New [`Tracer`] instances are always created through a `TracerProvider`. These `Tracer`s share
/// a common configuration, which includes the [`Resource`], span processors, sampling strategies,
/// and span limits. This avoids the need for each `Tracer` to maintain its own version of these
/// configurations, ensuring uniform behavior across all instances.
///
/// ## Cloning and Shutdown
///
/// The `TracerProvider` is designed to be lightweight and clonable. Cloning a `TracerProvider`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think TracerProvider is lightweight. It is pretty heavy, and we expect user to create it only once. It is correct to mention cloning is cheap as it is just creating a new ref.

/// creates a new reference to the same provider, not a new instance. Dropping the last reference
/// to the `TracerProvider` will automatically trigger its shutdown. During shutdown, the provider
/// will flush all remaining spans for **batch processors**, ensuring they are exported to the configured
/// exporters. However, **simple processors** do not require a flush, as they export spans immediately
/// when they end. Users can also manually trigger shutdown using the [`shutdown`](TracerProvider::shutdown)
/// method, which will ensure the same behavior (flushing for batch processors, but no additional action
/// for simple processors).
///
/// Once shut down, the `TracerProvider` transitions into a disabled state. In this state, further
/// operations on its associated `Tracer` instances will result in no-ops, ensuring that no spans
/// are processed or exported after shutdown.
///
/// ## Span Processing and Force Flush
///
/// The `TracerProvider` manages the lifecycle of span processors, which are responsible for
/// collecting, processing, and exporting spans. To ensure all spans are processed before shutdown,
/// users can call the [`force_flush`](TracerProvider::force_flush) method at any time to trigger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets remove force_flush mention here. I have seen many users doing force_flush in their code (and block their threads).. Not sure why, but lets make sure official docs don't recommend it.

/// an immediate flush of all pending spans for **batch processors** to the exporters. Note that
/// calling [`force_flush`](TracerProvider::force_flush) is optional before shutdown, as `shutdown`
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
/// will automatically trigger a flush for batch processors, but not for simple processors.
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use opentelemetry::global;
/// use opentelemetry_sdk::trace::TracerProvider;
///
/// fn init_tracing() -> TracerProvider {
/// let provider = TracerProvider::default();
///
/// // Set the provider to be used globally
/// let _ = global::set_tracer_provider(provider.clone());
///
/// provider
/// }
///
/// fn main() {
/// let provider = init_tracing();
///
/// // create spans...
///
/// // Flush all spans before shutdown (optional for batch processors)
/// for result in provider.force_flush() {
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
/// if let Err(err) = result {
/// // Handle flush error...
/// }
/// }
///
/// // Dropping the provider ensures remaining spans are flushed for batch processors
/// // and shuts down the global tracer provider.
/// drop(provider);
/// global::shutdown_tracer_provider();
/// }
/// ```
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
Expand All @@ -16,7 +76,7 @@
use crate::{InstrumentationLibrary, Resource};
use once_cell::sync::{Lazy, OnceCell};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, trace::TraceResult};
use opentelemetry::{otel_debug, trace::TraceResult};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -36,36 +96,60 @@
span_limits: SpanLimits::default(),
resource: Cow::Owned(Resource::empty()),
},
is_shutdown: AtomicBool::new(true),
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

/// TracerProvider inner type
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
processors: Vec<Box<dyn SpanProcessor>>,
config: crate::trace::Config,
is_shutdown: AtomicBool,
}

impl Drop for TracerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
impl TracerProviderInner {
/// Crate-private shutdown method to be called both from explicit shutdown
/// and from Drop when the last reference is released.
pub(crate) fn shutdown(&self) -> Vec<TraceError> {
let mut errs = vec![];
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
// - Or the error occurs during `TracerProviderInner::Drop` as part of telemetry shutdown,
// which is non-actionable by the user
otel_debug!(name: "TracerProvider.Drop.ShutdownError",
error = format!("{err}"));

Check warning on line 123 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L123

Added line #L123 was not covered by tests
errs.push(err);
}
}
errs
}
}

impl Drop for TracerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "TracerProviderProvider.Drop.AlreadyShutdown"
lalitb marked this conversation as resolved.
Show resolved Hide resolved
);
}
}
}

/// Creator and registry of named [`Tracer`] instances.
///
/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components.
/// Cloning and dropping them will not stop the span processing. To stop span processing, users
/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`.
/// `TracerProvider` is a lightweight container holding pointers to `SpanProcessor` and other components.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not introduced in this PR, but advertising TracerProvider as lightweight is incorrect, and can lead to users repeatedly creating them, instead of doing it once.

/// Cloning a `TracerProvider` instance will not stop span processing. To stop span processing, users
lalitb marked this conversation as resolved.
Show resolved Hide resolved
/// must either call the `shutdown` method explicitly or allow the last reference to the `TracerProvider`
/// to be dropped. When the last reference is dropped, the shutdown process will be automatically triggered
/// to ensure proper cleanup.
#[derive(Clone, Debug)]
pub struct TracerProvider {
inner: Arc<TracerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

impl Default for TracerProvider {
Expand All @@ -79,7 +163,6 @@
pub(crate) fn new(inner: TracerProviderInner) -> Self {
TracerProvider {
inner: Arc::new(inner),
is_shutdown: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -101,7 +184,7 @@
/// true if the provider has been shutdown
/// Don't start span or export spans when provider is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::Relaxed)
self.inner.is_shutdown.load(Ordering::Relaxed)
}

/// Force flush all remaining spans in span processors and return results.
Expand Down Expand Up @@ -153,28 +236,20 @@
/// Note that shut down doesn't means the TracerProvider has dropped
pub fn shutdown(&self) -> TraceResult<()> {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
// it's up to the processor to properly block new spans after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
errs.push(err);
}
}

let errs = self.inner.shutdown();
if errs.is_empty() {
Ok(())
} else {
Err(TraceError::Other(format!("{errs:?}").into()))
}
} else {
Err(TraceError::Other(
"tracer provider already shut down".into(),
))
Err(TraceError::AlreadyShutdown("TracerProvider".to_string()))

Check warning on line 252 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L252

Added line #L252 was not covered by tests
}
}
}
Expand Down Expand Up @@ -215,7 +290,7 @@
}

fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
}
Tracer::new(library, self.clone())
Expand Down Expand Up @@ -292,7 +367,12 @@
p.set_resource(config.resource.as_ref());
}

TracerProvider::new(TracerProviderInner { processors, config })
let is_shutdown = AtomicBool::new(false);
TracerProvider::new(TracerProviderInner {
processors,
config,
is_shutdown,
})
}
}

Expand Down Expand Up @@ -391,6 +471,7 @@
Box::from(TestSpanProcessor::new(false)),
],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

let results = tracer_provider.force_flush();
Expand Down Expand Up @@ -534,6 +615,7 @@
let tracer_provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

let test_tracer_1 = tracer_provider.tracer("test1");
Expand All @@ -554,14 +636,69 @@

// after shutdown we should get noop tracer
let noop_tracer = tracer_provider.tracer("noop");

// noop tracer cannot start anything
let _ = noop_tracer.start("test");
assert!(assert_handle.started_span_count(2));
// noop tracer's tracer provider should be shutdown
assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));
assert!(noop_tracer.provider().is_shutdown());

// existing tracer becomes noops after shutdown
let _ = test_tracer_1.start("test");
assert!(assert_handle.started_span_count(2));
}

#[test]
fn test_tracer_provider_inner_drop_shutdown() {
// Test 1: Already shutdown case
{
let processor = TestSpanProcessor::new(true);
let assert_handle = processor.assert_info();
let provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

// Create multiple providers sharing same inner
let provider2 = provider.clone();
let provider3 = provider.clone();

// Shutdown explicitly first
assert!(provider.shutdown().is_ok());

// Drop all providers - should not trigger another shutdown in TracerProviderInner::drop
drop(provider);
drop(provider2);
drop(provider3);

// Verify shutdown was called exactly once
assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this verify that shutdown was called only once? It looks like it's only verifying that shutdown was called (could have been called once or multiple times)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think I should be using CountingShutdownProcessor which was added in #2195.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made this use CountingShutdownProcessor now.

}

// Test 2: Not shutdown case
{
let processor = TestSpanProcessor::new(true);
let assert_handle = processor.assert_info();
let provider = super::TracerProvider::new(TracerProviderInner {
processors: vec![Box::from(processor)],
config: Default::default(),
is_shutdown: AtomicBool::new(false),
});

// Create multiple providers sharing same inner
let provider2 = provider.clone();
let provider3 = provider.clone();

// Drop providers without explicit shutdown
drop(provider);
drop(provider2);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should verify that shutdown was not called by asserting that assert_handle.0.is_shutdown is still false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please have a look again, the tests are more focused now.

// Last drop should trigger shutdown in TracerProviderInner::drop
drop(provider3);

// Verify shutdown was called exactly once
assert!(assert_handle.0.is_shutdown.load(Ordering::SeqCst));
}
}
}
4 changes: 4 additions & 0 deletions opentelemetry/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ pub enum TraceError {
#[error("Exporting timed out after {} seconds", .0.as_secs())]
ExportTimedOut(time::Duration),

/// already shutdown error
#[error("{0} already shutdown")]
AlreadyShutdown(String),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect to use this variant for anything other than TracerProvider?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thought to use them for the processors and exporters too. But I believe we can customize it later if required. For now, made it static for TracerProvider.


/// Other errors propagated from trace SDK that weren't covered above
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
Expand Down
Loading