Skip to content

Commit

Permalink
Global Log handler cleanup - Logs SDK (#2184)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Oct 14, 2024
1 parent 3f5c230 commit 16c0e10
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 73 deletions.
41 changes: 26 additions & 15 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
use opentelemetry::otel_warn;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_debug,
trace::TraceContextExt,
Context, InstrumentationLibrary,
};
Expand Down Expand Up @@ -126,17 +125,10 @@ impl LoggerProvider {
if errs.is_empty() {
Ok(())
} else {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", errs)
);
Err(LogError::Other(format!("{:?}", errs).into()))
Err(LogError::Other(format!("{errs:?}").into()))
}
} else {
otel_warn!(
name: "logger_provider_already_shutdown"
);
Err(LogError::Other("logger provider already shut down".into()))
Err(LogError::AlreadyShutdown("LoggerProvider".to_string()))
}
}
}
Expand All @@ -154,6 +146,24 @@ impl LoggerProviderInner {
let mut errs = vec![];
for processor in &self.processors {
if let Err(err) = processor.shutdown() {
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
// - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown,
// which is non-actionable by the user
match err {
// specific handling for mutex poisoning
LogError::MutexPoisoned(_) => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
);
}
_ => {
otel_debug!(
name: "LoggerProvider.Drop.ShutdownError",
error = format!("{err}")
);
}
}
errs.push(err);
}
}
Expand All @@ -164,10 +174,11 @@ impl LoggerProviderInner {
impl Drop for LoggerProviderInner {
fn drop(&mut self) {
if !self.is_shutdown.load(Ordering::Relaxed) {
let errs = self.shutdown();
if !errs.is_empty() {
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
}
let _ = self.shutdown(); // errors are handled within shutdown
} else {
otel_debug!(
name: "LoggerProvider.Drop.AlreadyShutdown"
);
}
}
}
Expand Down
90 changes: 34 additions & 56 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use futures_util::{
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
otel_error, otel_warn, InstrumentationLibrary,
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
};

use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -99,26 +98,36 @@ impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shut down
otel_warn!(
name: "simple_log_processor_emit_after_shutdown"
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
if let Err(err) = result {
otel_error!(
name: "simple_log_processor_emit_error",
error = format!("{:?}", err)
);
global::handle_error(err);
// Handle errors with specific static names
match result {
Err(LogError::MutexPoisoned(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);
}
Err(err) => {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)
);
}
_ => {}
}
}

Expand All @@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor {
exporter.shutdown();
Ok(())
} else {
otel_error!(
name: "simple_log_processor_shutdown_error"
);
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
}
}

Expand Down Expand Up @@ -170,12 +174,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
instrumentation.clone(),
)));

// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
if let Err(err) = result {
otel_error!(
name: "batch_log_processor_emit_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
global::handle_error(LogError::Other(err.into()));
}
}

Expand Down Expand Up @@ -243,10 +247,9 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {

if let Err(err) = result {
otel_error!(
name: "batch_log_processor_export_error",
error = format!("{:?}", err)
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
global::handle_error(err);
}
}
}
Expand All @@ -261,24 +264,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
.await;

if let Some(channel) = res_channel {
if let Err(result) = channel.send(result) {
global::handle_error(LogError::from(format!(
"failed to send flush result: {:?}",
result
)));
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", result),
message = "Failed to send flush result"
if let Err(send_error) = channel.send(result) {
otel_debug!(
name: "BatchLogProcessor.Flush.SendResultError",
error = format!("{:?}", send_error),
);
}
} else if let Err(err) = result {
otel_error!(
name: "batch_log_processor_flush_error",
error = format!("{:?}", err),
message = "Flush failed"
);
global::handle_error(err);
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
Expand All @@ -293,21 +284,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {

exporter.shutdown();

if let Err(result) = ch.send(result) {
otel_error!(
name: "batch_log_processor_shutdown_error",
error = format!("{:?}", result),
message = "Failed to send shutdown result"
if let Err(send_error) = ch.send(result) {
otel_debug!(
name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),
);
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
)));
}

break;
}

// propagate the resource
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
Expand Down Expand Up @@ -357,13 +341,7 @@ where
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => {
otel_error!(
name: "export_with_timeout_timeout",
timeout_duration = time_out.as_millis()
);
ExportResult::Err(LogError::ExportTimedOut(time_out))
}
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
}
}

Expand Down
16 changes: 14 additions & 2 deletions opentelemetry/src/global/internal_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ macro_rules! otel_warn {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::warn!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down Expand Up @@ -136,7 +142,13 @@ macro_rules! otel_error {
(name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => {
#[cfg(feature = "internal-logs")]
{
tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, "");
tracing::error!(name: $name,
target: env!("CARGO_PKG_NAME"),
$($key = {
$value
}),+,
""
)
}
#[cfg(not(feature = "internal-logs"))]
{
Expand Down
8 changes: 8 additions & 0 deletions opentelemetry/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub enum LogError {
#[error("Exporter timed out after {} seconds", .0.as_secs())]
ExportTimedOut(Duration),

/// The named component (e.g. a provider or processor) has already been shut down
#[error("{0} already shutdown")]
AlreadyShutdown(String),

/// Mutex lock poisoning
#[error("mutex lock poisioning for {0}")]
MutexPoisoned(String),

/// Other errors propagated from log SDK that weren't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
Expand Down

0 comments on commit 16c0e10

Please sign in to comment.