diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 3340836a57..62368e182e 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -90,6 +90,25 @@ pub trait LogExporter: Send + Sync + Debug { // By default, all logs are enabled true } + + /// This is a hint to ensure that the export of any Logs the exporter + /// has received prior to the call to this function SHOULD be completed + /// as soon as possible, preferably before returning from this method. + /// + /// This function SHOULD provide a way to let the caller know + /// whether it succeeded, failed or timed out. + /// + /// This function SHOULD only be called in cases where it is absolutely necessary, + /// such as when using some FaaS providers that may suspend the process after + /// an invocation, but before the exporter exports the completed logs. + /// + /// This function SHOULD complete or abort within some timeout. This function can be + /// implemented as a blocking API or an asynchronous API which notifies the caller via + /// a callback or an event. OpenTelemetry client authors can decide if they want to + /// make the flush timeout configurable. + fn force_flush(&mut self) -> ExportResult { + Ok(()) + } /// Set the resource for the exporter. fn set_resource(&mut self, _resource: &Resource) {} } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b3fa80fd01..392f54b3f4 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -129,7 +129,11 @@ impl LogProcessor for SimpleLogProcessor { } fn force_flush(&self) -> LogResult<()> { - Ok(()) + if let Ok(mut exporter) = self.exporter.lock() { + exporter.force_flush() + } else { + Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) + } } fn shutdown(&self) -> LogResult<()> { @@ -278,7 +282,8 @@ impl BatchLogProcessor { &timeout_runtime, logs.split_off(0), ) - .await; + .await + .and(exporter.as_mut().force_flush()); if let Some(channel) = res_channel { if let Err(send_error) = channel.send(result) { @@ -803,6 +808,25 @@ mod tests { let _ = provider.shutdown(); } + #[tokio::test(flavor = "multi_thread")] + async fn test_batch_forceflush() { + let exporter = InMemoryLogExporterBuilder::default().build(); + // TODO: Verify exporter.force_flush() is called + + let processor = BatchLogProcessor::new( + Box::new(exporter.clone()), + BatchConfig::default(), + runtime::Tokio, + ); + + let mut record = LogRecord::default(); + let instrumentation = InstrumentationScope::default(); + + processor.emit(&mut record, &instrumentation); + processor.force_flush().unwrap(); + assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); + } + #[tokio::test(flavor = "multi_thread")] async fn test_batch_shutdown() { // assert we will receive an error @@ -820,7 +844,6 @@ mod tests { let instrumentation = InstrumentationScope::default(); processor.emit(&mut record, &instrumentation); - processor.force_flush().unwrap(); processor.shutdown().unwrap(); // todo: expect to see errors here. How should we assert this? processor.emit(&mut record, &instrumentation);