diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c6e0e79830..71b02dc158 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,9 +1,8 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; -use opentelemetry::otel_warn; use opentelemetry::{ - global, logs::{LogError, LogResult}, + otel_debug, trace::TraceContextExt, Context, InstrumentationLibrary, }; @@ -126,17 +125,10 @@ impl LoggerProvider { if errs.is_empty() { Ok(()) } else { - otel_warn!( - name: "logger_provider_shutdown_error", - error = format!("{:?}", errs) - ); - Err(LogError::Other(format!("{:?}", errs).into())) + Err(LogError::Other(format!("{errs:?}").into())) } } else { - otel_warn!( - name: "logger_provider_already_shutdown" - ); - Err(LogError::Other("logger provider already shut down".into())) + Err(LogError::AlreadyShutdown("LoggerProvider".to_string())) } } } @@ -154,6 +146,24 @@ impl LoggerProviderInner { let mut errs = vec![]; for processor in &self.processors { if let Err(err) = processor.shutdown() { + // Log at debug level because: + // - The error is also returned to the user for handling (if applicable) + // - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown, + // which is non-actionable by the user + match err { + // specific handling for mutex poisioning + LogError::MutexPoisoned(_) => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownMutexPoisoned", + ); + } + _ => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownError", + error = format!("{err}") + ); + } + } errs.push(err); } } @@ -164,10 +174,11 @@ impl LoggerProviderInner { impl Drop for LoggerProviderInner { fn drop(&mut self) { if !self.is_shutdown.load(Ordering::Relaxed) { - let errs = self.shutdown(); - if !errs.is_empty() { - global::handle_error(LogError::Other(format!("{:?}", errs).into())); - } + let _ = self.shutdown(); // errors are handled within shutdown + } else { + otel_debug!( + name: "LoggerProvider.Drop.AlreadyShutdown" + ); } } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e6578c7f7e..d74047fe13 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -12,9 +12,8 @@ use futures_util::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; use opentelemetry::{ - global, logs::{LogError, LogResult}, - otel_error, otel_warn, InstrumentationLibrary, + otel_debug, otel_error, otel_warn, InstrumentationLibrary, }; use std::sync::atomic::AtomicBool; @@ -99,8 +98,9 @@ impl LogProcessor for SimpleLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + // this is a warning, as the user is trying to log after the processor has been shutdown otel_warn!( - name: "simple_log_processor_emit_after_shutdown" + name: "SimpleLogProcessor.Emit.ProcessorShutdown", ); return; } @@ -108,17 +108,26 @@ impl LogProcessor for SimpleLogProcessor { let result = self .exporter .lock() - .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) + .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) .and_then(|mut exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); - if let Err(err) = result { - otel_error!( - name: "simple_log_processor_emit_error", - error = format!("{:?}", err) - ); - global::handle_error(err); + // Handle errors with specific static names + match result { + Err(LogError::MutexPoisoned(_)) => { + // logging as debug as this is not a user error + otel_debug!( + name: "SimpleLogProcessor.Emit.MutexPoisoning", + ); + } + Err(err) => { + otel_error!( + name: "SimpleLogProcessor.Emit.ExportError", + error = format!("{}",err) + ); + } + _ => {} } } @@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor { exporter.shutdown(); Ok(()) } else { - otel_error!( - name: "simple_log_processor_shutdown_error" - ); - Err(LogError::Other( - "simple logprocessor mutex poison during shutdown".into(), - )) + Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) } } @@ -170,12 +174,12 @@ impl LogProcessor for BatchLogProcessor { instrumentation.clone(), ))); + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if let Err(err) = result { otel_error!( - name: "batch_log_processor_emit_error", - error = format!("{:?}", err) + name: "BatchLogProcessor.Export.Error", + error = format!("{}", err) ); - global::handle_error(LogError::Other(err.into())); } } @@ -243,10 +247,9 @@ impl BatchLogProcessor { if let Err(err) = result { otel_error!( - name: "batch_log_processor_export_error", - error = format!("{:?}", err) + name: "BatchLogProcessor.Export.Error", + error = format!("{}", err) ); - global::handle_error(err); } } } @@ -261,24 +264,12 @@ impl BatchLogProcessor { .await; if let Some(channel) = res_channel { - if let Err(result) = channel.send(result) { - global::handle_error(LogError::from(format!( - "failed to send flush result: {:?}", - result - ))); - otel_error!( - name: "batch_log_processor_flush_error", - error = format!("{:?}", result), - message = "Failed to send flush result" + if let Err(send_error) = channel.send(result) { + otel_debug!( + name: "BatchLogProcessor.Flush.SendResultError", + error = format!("{:?}", send_error), ); } - } else if let Err(err) = result { - otel_error!( - name: "batch_log_processor_flush_error", - error = format!("{:?}", err), - message = "Flush failed" - ); - global::handle_error(err); } } // Stream has terminated or processor is shutdown, return to finish execution. @@ -293,21 +284,14 @@ impl BatchLogProcessor { exporter.shutdown(); - if let Err(result) = ch.send(result) { - otel_error!( - name: "batch_log_processor_shutdown_error", - error = format!("{:?}", result), - message = "Failed to send shutdown result" + if let Err(send_error) = ch.send(result) { + otel_debug!( + name: "BatchLogProcessor.Shutdown.SendResultError", + error = format!("{:?}", send_error), ); - global::handle_error(LogError::from(format!( - "failed to send batch processor shutdown result: {:?}", - result - ))); } - break; } - // propagate the resource BatchMessage::SetResource(resource) => { exporter.set_resource(&resource); @@ -357,13 +341,7 @@ where pin_mut!(timeout); match future::select(export, timeout).await { Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => { - otel_error!( - name: "export_with_timeout_timeout", - timeout_duration = time_out.as_millis() - ); - ExportResult::Err(LogError::ExportTimedOut(time_out)) - } + Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), } } diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 4c09f38b0c..4391485619 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -66,7 +66,13 @@ macro_rules! otel_warn { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::warn!(name: $name, + target: env!("CARGO_PKG_NAME"), + $($key = { + $value + }),+, + "" + ) } #[cfg(not(feature = "internal-logs"))] { @@ -136,7 +142,13 @@ macro_rules! otel_error { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::error!(name: $name, + target: env!("CARGO_PKG_NAME"), + $($key = { + $value + }),+, + "" + ) } #[cfg(not(feature = "internal-logs"))] { diff --git a/opentelemetry/src/logs/mod.rs b/opentelemetry/src/logs/mod.rs index f0bbe0d660..1a27edb2e0 100644 --- a/opentelemetry/src/logs/mod.rs +++ b/opentelemetry/src/logs/mod.rs @@ -30,6 +30,14 @@ pub enum LogError { #[error("Exporter timed out after {} seconds", .0.as_secs())] ExportTimedOut(Duration), + /// Processor is already shutdown + #[error("{0} already shutdown")] + AlreadyShutdown(String), + + /// Mutex lock poisoning + #[error("mutex lock poisioning for {0}")] + MutexPoisoned(String), + /// Other errors propagated from log SDK that weren't covered above. #[error(transparent)] Other(#[from] Box),