From 52018e34bb7e81191f18c898d24844f5a0368cb9 Mon Sep 17 00:00:00 2001 From: Nicklas Ansman Date: Fri, 22 Nov 2024 11:33:17 -0500 Subject: [PATCH 1/2] Prevent ANR during SDK initialization Description When initializing the OpenTelemetry Android SDK with disk buffering enabled, we discovered that synchronous disk space checks were causing ANRs in production. These checks occur during the creation of disk buffering exporters, specifically in `DiskManager.getMaxFolderSize()`, which makes blocking IPC calls through `StorageManager.getAllocatableBytes()` on the main thread. The issue manifests in the following ANR stacktrace: ``` android.os.BinderProxy.transact (BinderProxy.java:662) android.os.storage.IStorageManager$Stub$Proxy.getAllocatableBytes (IStorageManager.java:2837) android.os.storage.StorageManager.getAllocatableBytes (StorageManager.java:2414) android.os.storage.StorageManager.getAllocatableBytes (StorageManager.java:2404) io.opentelemetry.android.internal.services.CacheStorage.getAvailableSpace (CacheStorage.java:66) io.opentelemetry.android.internal.services.CacheStorage.ensureCacheSpaceAvailable (CacheStorage.java:50) io.opentelemetry.android.internal.features.persistence.DiskManager.getMaxFolderSize (DiskManager.kt:58) io.opentelemetry.android.OpenTelemetryRumBuilder.createStorageConfiguration (OpenTelemetryRumBuilder.java:338) io.opentelemetry.android.OpenTelemetryRumBuilder.build (OpenTelemetryRumBuilder.java:286) ``` Our Solution To fix this we moved initialization to run on a background executor and buffer the data in memory until it completes. The process works like this: 1. Initialize the SDK with `BufferDelegatingExporter` instances that can immediately accept telemetry data. 2. Move exporter initialization off the main thread. 3. Once async initialization completes, flush buffered signals to initialized exporters and delegate all future signals. The primary goal of this solution is to be unobtrusive and prevent ANRs caused by initialization of disk exporters, while preventing signals from being dropped. Testing We have added unit tests to cover the buffering, delevation, and RUM building. We've also verified this with both disk enabled and disk disabled. --- .../android/OpenTelemetryRumBuilder.java | 135 ++++++++++++------ .../export/BufferDelegatingLogExporter.kt | 35 +++++ .../export/BufferDelegatingSpanExporter.kt | 35 +++++ .../export/BufferedDelegatingExporter.kt | 100 +++++++++++++ .../android/OpenTelemetryRumBuilderTest.java | 83 +++++++---- .../export/BufferDelegatingLogExporterTest.kt | 92 ++++++++++++ .../BufferDelegatingSpanExporterTest.kt | 118 +++++++++++++++ 7 files changed, 527 insertions(+), 71 deletions(-) create mode 100644 core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt create mode 100644 core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt create mode 100644 core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt create mode 100644 core/src/test/java/io/opentelemetry/android/export/BufferDelegatingLogExporterTest.kt create mode 100644 core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt diff --git a/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java b/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java index a01e497ec..58bb35aaf 100644 --- a/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java +++ b/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java @@ -10,8 +10,11 @@ import android.app.Application; import android.util.Log; import androidx.annotation.NonNull; +import androidx.annotation.Nullable; import io.opentelemetry.android.common.RumConstants; import io.opentelemetry.android.config.OtelRumConfig; +import io.opentelemetry.android.export.BufferDelegatingLogExporter; +import io.opentelemetry.android.export.BufferDelegatingSpanExporter; import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration; import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter; import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduleHandler; @@ -63,7 +66,6 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import javax.annotation.Nullable; import kotlin.jvm.functions.Function0; /** @@ -94,8 +96,13 @@ public final class OpenTelemetryRumBuilder { private Resource resource; - @Nullable private ServiceManager serviceManager; - @Nullable private ExportScheduleHandler exportScheduleHandler; + private final Object lock = new Object(); + + // Writes guarded by "lock" + @Nullable private volatile ServiceManager serviceManager; + + // Writes guarded by "lock" + @Nullable private volatile ExportScheduleHandler exportScheduleHandler; private static TextMapPropagator buildDefaultPropagator() { return TextMapPropagator.composite( @@ -279,6 +286,56 @@ public OpenTelemetryRum build() { InitializationEvents initializationEvents = InitializationEvents.get(); applyConfiguration(initializationEvents); + BufferDelegatingLogExporter bufferDelegatingLogExporter = new BufferDelegatingLogExporter(); + + BufferDelegatingSpanExporter bufferDelegatingSpanExporter = + new BufferDelegatingSpanExporter(); + + SessionManager sessionManager = + SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos()); + + OpenTelemetrySdk sdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + buildTracerProvider( + sessionManager, application, bufferDelegatingSpanExporter)) + .setLoggerProvider( + buildLoggerProvider( + sessionManager, application, bufferDelegatingLogExporter)) + .setMeterProvider(buildMeterProvider(application)) + .setPropagators(buildFinalPropagators()) + .build(); + + otelSdkReadyListeners.forEach(listener -> listener.accept(sdk)); + + SdkPreconfiguredRumBuilder delegate = + new SdkPreconfiguredRumBuilder( + application, + sdk, + timeoutHandler, + sessionManager, + config.shouldDiscoverInstrumentations(), + getServiceManager()); + + // AsyncTask is deprecated but the thread pool is still used all over the Android SDK + // and it provides a way to get a background thread without having to create a new one. + android.os.AsyncTask.THREAD_POOL_EXECUTOR.execute( + () -> + initializeExporters( + initializationEvents, + bufferDelegatingSpanExporter, + bufferDelegatingLogExporter)); + + instrumentations.forEach(delegate::addInstrumentation); + + return delegate.build(); + } + + private void initializeExporters( + InitializationEvents initializationEvents, + BufferDelegatingSpanExporter bufferDelegatingSpanExporter, + BufferDelegatingLogExporter bufferedDelegatingLogExporter) { + DiskBufferingConfiguration diskBufferingConfiguration = config.getDiskBufferingConfiguration(); SpanExporter spanExporter = buildSpanExporter(); @@ -306,45 +363,31 @@ public OpenTelemetryRum build() { } initializationEvents.spanExporterInitialized(spanExporter); - SessionManager sessionManager = - SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos()); + bufferedDelegatingLogExporter.setDelegate(logsExporter); - OpenTelemetrySdk sdk = - OpenTelemetrySdk.builder() - .setTracerProvider( - buildTracerProvider(sessionManager, application, spanExporter)) - .setLoggerProvider( - buildLoggerProvider(sessionManager, application, logsExporter)) - .setMeterProvider(buildMeterProvider(application)) - .setPropagators(buildFinalPropagators()) - .build(); - - otelSdkReadyListeners.forEach(listener -> listener.accept(sdk)); + bufferDelegatingSpanExporter.setDelegate(spanExporter); scheduleDiskTelemetryReader(signalFromDiskExporter); - - SdkPreconfiguredRumBuilder delegate = - new SdkPreconfiguredRumBuilder( - application, - sdk, - timeoutHandler, - sessionManager, - config.shouldDiscoverInstrumentations(), - getServiceManager()); - instrumentations.forEach(delegate::addInstrumentation); - return delegate.build(); } @NonNull private ServiceManager getServiceManager() { if (serviceManager == null) { - serviceManager = ServiceManagerImpl.Companion.create(application); + synchronized (lock) { + if (serviceManager == null) { + serviceManager = ServiceManagerImpl.Companion.create(application); + } + } } - return serviceManager; + // This can never be null since we never write `null` to it + return requireNonNull(serviceManager); } - public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager) { - this.serviceManager = serviceManager; + public OpenTelemetryRumBuilder setServiceManager(@NonNull ServiceManager serviceManager) { + requireNonNull(serviceManager, "serviceManager cannot be null"); + synchronized (lock) { + this.serviceManager = serviceManager; + } return this; } @@ -353,8 +396,11 @@ public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager) * If not specified, the default schedule exporter will be used. */ public OpenTelemetryRumBuilder setExportScheduleHandler( - ExportScheduleHandler exportScheduleHandler) { - this.exportScheduleHandler = exportScheduleHandler; + @NonNull ExportScheduleHandler exportScheduleHandler) { + requireNonNull(exportScheduleHandler, "exportScheduleHandler cannot be null"); + synchronized (lock) { + this.exportScheduleHandler = exportScheduleHandler; + } return this; } @@ -376,17 +422,24 @@ private StorageConfiguration createStorageConfiguration() throws IOException { } private void scheduleDiskTelemetryReader(@Nullable SignalFromDiskExporter signalExporter) { - if (exportScheduleHandler == null) { - ServiceManager serviceManager = getServiceManager(); - // TODO: Is it safe to get the work service yet here? If so, we can - // avoid all this lazy supplier stuff.... - Function0 getWorkService = serviceManager::getPeriodicWorkService; - exportScheduleHandler = - new DefaultExportScheduleHandler( - new DefaultExportScheduler(getWorkService), getWorkService); + synchronized (lock) { + if (exportScheduleHandler == null) { + ServiceManager serviceManager = getServiceManager(); + // TODO: Is it safe to get the work service yet here? If so, we can + // avoid all this lazy supplier stuff.... + Function0 getWorkService = + serviceManager::getPeriodicWorkService; + exportScheduleHandler = + new DefaultExportScheduleHandler( + new DefaultExportScheduler(getWorkService), getWorkService); + } + } } + final ExportScheduleHandler exportScheduleHandler = + requireNonNull(this.exportScheduleHandler); + if (signalExporter == null) { // Disabling here allows to cancel previously scheduled exports using tools that // can run even after the app has been terminated (such as WorkManager). diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt new file mode 100644 index 000000000..d75a22b24 --- /dev/null +++ b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.logs.data.LogRecordData +import io.opentelemetry.sdk.logs.export.LogRecordExporter + +/** + * An in-memory buffer delegating log exporter that buffers log records in memory until a delegate is set. + * Once a delegate is set, the buffered log records are exported to the delegate. + * + * The buffer size is set to 5,000 log entries by default. If the buffer is full, the exporter will drop new log records. + */ +internal class BufferDelegatingLogExporter( + maxBufferedLogs: Int = 5_000, +) : BufferedDelegatingExporter(bufferedSignals = maxBufferedLogs), + LogRecordExporter { + override fun exportToDelegate( + delegate: LogRecordExporter, + data: Collection, + ): CompletableResultCode = delegate.export(data) + + override fun shutdownDelegate(delegate: LogRecordExporter): CompletableResultCode = delegate.shutdown() + + override fun export(logs: Collection): CompletableResultCode = bufferOrDelegate(logs) + + override fun flush(): CompletableResultCode = + withDelegateOrNull { delegate -> + delegate?.flush() ?: CompletableResultCode.ofSuccess() + } +} diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt new file mode 100644 index 000000000..f6683a375 --- /dev/null +++ b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.trace.data.SpanData +import io.opentelemetry.sdk.trace.export.SpanExporter + +/** + * An in-memory buffer delegating span exporter that buffers span data in memory until a delegate is set. + * Once a delegate is set, the buffered span data is exported to the delegate. + * + * The buffer size is set to 5,000 spans by default. If the buffer is full, the exporter will drop new span data. + */ +internal class BufferDelegatingSpanExporter( + maxBufferedSpans: Int = 5_000, +) : BufferedDelegatingExporter(bufferedSignals = maxBufferedSpans), + SpanExporter { + override fun exportToDelegate( + delegate: SpanExporter, + data: Collection, + ): CompletableResultCode = delegate.export(data) + + override fun shutdownDelegate(delegate: SpanExporter): CompletableResultCode = delegate.shutdown() + + override fun export(spans: Collection): CompletableResultCode = bufferOrDelegate(spans) + + override fun flush(): CompletableResultCode = + withDelegateOrNull { delegate -> + delegate?.flush() ?: CompletableResultCode.ofSuccess() + } +} diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt new file mode 100644 index 000000000..da8dc4337 --- /dev/null +++ b/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt @@ -0,0 +1,100 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.opentelemetry.sdk.common.CompletableResultCode +import java.util.concurrent.atomic.AtomicBoolean + +/** + * An in-memory buffer delegating signal exporter that buffers signal in memory until a delegate is set. + * Once a delegate is set, the buffered signals are exported to the delegate. + * + * The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new signals. + */ +internal abstract class BufferedDelegatingExporter(private val bufferedSignals: Int = 5_000) { + @Volatile + private var delegate: D? = null + private val buffer = arrayListOf() + private val lock = Any() + private var isShutDown = AtomicBoolean(false) + + /** + * Sets the delegate for this exporter and flushes the buffer to the delegate. + * + * If the delegate has already been set, an [IllegalStateException] will be thrown. + * If this exporter has been shut down, the delegate will be shut down immediately. + * + * @param delegate the delegate to set + * + * @throws IllegalStateException if a delegate has already been set + */ + fun setDelegate(delegate: D) { + synchronized(lock) { + check(this.delegate == null) { "Exporter delegate has already been set." } + + flushToDelegate(delegate) + + this.delegate = delegate + + if (isShutDown.get()) { + shutdownDelegate(delegate) + } + } + } + + /** + * Buffers the given data if the delegate has not been set, otherwise exports the data to the delegate. + * + * @param data the data to buffer or export + */ + protected fun bufferOrDelegate(data: Collection): CompletableResultCode = + withDelegateOrNull { + if (it != null) { + exportToDelegate(it, data) + } else { + val amountToTake = bufferedSignals - buffer.size + buffer.addAll(data.take(amountToTake)) + CompletableResultCode.ofSuccess() + } + } + + /** + * Executes the given block with the delegate if it has been set, otherwise executes the block with a null delegate. + * + * @param block the block to execute + */ + protected fun withDelegateOrNull(block: (D?) -> R): R { + delegate?.let { return block(it) } + return synchronized(lock) { block(delegate) } + } + + open fun shutdown(): CompletableResultCode = bufferedShutDown() + + protected abstract fun exportToDelegate( + delegate: D, + data: Collection, + ): CompletableResultCode + + protected abstract fun shutdownDelegate(delegate: D): CompletableResultCode + + private fun flushToDelegate(delegate: D) { + exportToDelegate(delegate, buffer) + buffer.clear() + buffer.trimToSize() + } + + private fun bufferedShutDown(): CompletableResultCode { + isShutDown.set(true) + + return withDelegateOrNull { + if (it != null) { + shutdownDelegate(it) + } else { + CompletableResultCode.ofSuccess() + } + } + } +} diff --git a/core/src/test/java/io/opentelemetry/android/OpenTelemetryRumBuilderTest.java b/core/src/test/java/io/opentelemetry/android/OpenTelemetryRumBuilderTest.java index 28654643c..1fd9663d7 100644 --- a/core/src/test/java/io/opentelemetry/android/OpenTelemetryRumBuilderTest.java +++ b/core/src/test/java/io/opentelemetry/android/OpenTelemetryRumBuilderTest.java @@ -135,21 +135,26 @@ public void shouldBuildTracerProvider() { SimpleSpanProcessor.create(spanExporter))) .build(); - String sessionId = openTelemetryRum.getRumSessionId(); - openTelemetryRum - .getOpenTelemetry() - .getTracer("test") - .spanBuilder("test span") - .startSpan() - .end(); - - List spans = spanExporter.getFinishedSpanItems(); - assertThat(spans).hasSize(1); - assertThat(spans.get(0)) - .hasName("test span") - .hasResource(resource) - .hasAttributesSatisfyingExactly( - equalTo(SESSION_ID, sessionId), equalTo(SCREEN_NAME_KEY, "unknown")); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + String sessionId = openTelemetryRum.getRumSessionId(); + openTelemetryRum + .getOpenTelemetry() + .getTracer("test") + .spanBuilder("test span") + .startSpan() + .end(); + + List spans = spanExporter.getFinishedSpanItems(); + assertThat(spans).hasSize(1); + assertThat(spans.get(0)) + .hasName("test span") + .hasResource(resource) + .hasAttributesSatisfyingExactly( + equalTo(SESSION_ID, sessionId), + equalTo(SCREEN_NAME_KEY, "unknown")); + }); } @Test @@ -344,11 +349,17 @@ public void diskBufferingEnabled() { .setServiceManager(serviceManager) .build(); - assertThat(SignalFromDiskExporter.get()).isNotNull(); - verify(scheduleHandler).enable(); - verify(scheduleHandler, never()).disable(); - verify(initializationEvents).spanExporterInitialized(exporterCaptor.capture()); - assertThat(exporterCaptor.getValue()).isInstanceOf(SpanToDiskExporter.class); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + assertThat(SignalFromDiskExporter.get()).isNotNull(); + verify(scheduleHandler).enable(); + verify(scheduleHandler, never()).disable(); + verify(initializationEvents) + .spanExporterInitialized(exporterCaptor.capture()); + assertThat(exporterCaptor.getValue()) + .isInstanceOf(SpanToDiskExporter.class); + }); } @Test @@ -373,11 +384,17 @@ public void diskBufferingEnabled_when_exception_thrown() { .setExportScheduleHandler(scheduleHandler) .build(); - verify(initializationEvents).spanExporterInitialized(exporterCaptor.capture()); - verify(scheduleHandler, never()).enable(); - verify(scheduleHandler).disable(); - assertThat(exporterCaptor.getValue()).isNotInstanceOf(SpanToDiskExporter.class); - assertThat(SignalFromDiskExporter.get()).isNull(); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + verify(initializationEvents) + .spanExporterInitialized(exporterCaptor.capture()); + verify(scheduleHandler, never()).enable(); + verify(scheduleHandler).disable(); + assertThat(exporterCaptor.getValue()) + .isNotInstanceOf(SpanToDiskExporter.class); + assertThat(SignalFromDiskExporter.get()).isNull(); + }); } @Test @@ -404,11 +421,17 @@ public void diskBufferingDisabled() { .setExportScheduleHandler(scheduleHandler) .build(); - verify(initializationEvents).spanExporterInitialized(exporterCaptor.capture()); - verify(scheduleHandler, never()).enable(); - verify(scheduleHandler).disable(); - assertThat(exporterCaptor.getValue()).isNotInstanceOf(SpanToDiskExporter.class); - assertThat(SignalFromDiskExporter.get()).isNull(); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> { + verify(initializationEvents) + .spanExporterInitialized(exporterCaptor.capture()); + verify(scheduleHandler, never()).enable(); + verify(scheduleHandler).disable(); + assertThat(exporterCaptor.getValue()) + .isNotInstanceOf(SpanToDiskExporter.class); + assertThat(SignalFromDiskExporter.get()).isNull(); + }); } @Test diff --git a/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingLogExporterTest.kt b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingLogExporterTest.kt new file mode 100644 index 000000000..44017314e --- /dev/null +++ b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingLogExporterTest.kt @@ -0,0 +1,92 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import io.opentelemetry.sdk.logs.data.LogRecordData +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter +import org.junit.Test + +class BufferDelegatingLogExporterTest { + @Test + fun `test setDelegate`() { + val inMemoryBufferDelegatingLogExporter = BufferDelegatingLogExporter() + val logRecordExporter = InMemoryLogRecordExporter.create() + + val logRecordData = mockk() + inMemoryBufferDelegatingLogExporter.export(listOf(logRecordData)) + inMemoryBufferDelegatingLogExporter.setDelegate(logRecordExporter) + + assertThat(logRecordExporter.finishedLogRecordItems) + .containsExactly(logRecordData) + } + + @Test + fun `test buffer limit handling`() { + val inMemoryBufferDelegatingLogExporter = BufferDelegatingLogExporter(10) + val logRecordExporter = InMemoryLogRecordExporter.create() + + repeat(11) { + val logRecordData = mockk() + inMemoryBufferDelegatingLogExporter.export(listOf(logRecordData)) + } + + inMemoryBufferDelegatingLogExporter.setDelegate(logRecordExporter) + + assertThat(logRecordExporter.finishedLogRecordItems) + .hasSize(10) + } + + @Test + fun `test flush with delegate`() { + val inMemoryBufferDelegatingLogExporter = BufferDelegatingLogExporter() + val delegate = spyk() + + val logRecordData = mockk() + inMemoryBufferDelegatingLogExporter.export(listOf(logRecordData)) + + inMemoryBufferDelegatingLogExporter.setDelegate(delegate) + + inMemoryBufferDelegatingLogExporter.flush() + + verify { delegate.flush() } + } + + @Test + fun `test export with delegate`() { + val inMemoryBufferDelegatingLogExporter = BufferDelegatingLogExporter() + val delegate = spyk() + + val logRecordData = mockk() + inMemoryBufferDelegatingLogExporter.export(listOf(logRecordData)) + + verify(exactly = 0) { delegate.export(any()) } + + inMemoryBufferDelegatingLogExporter.setDelegate(delegate) + + verify(exactly = 1) { delegate.export(any()) } + + val logRecordData2 = mockk() + inMemoryBufferDelegatingLogExporter.export(listOf(logRecordData2)) + + verify(exactly = 2) { delegate.export(any()) } + } + + @Test + fun `test shutdown with delegate`() { + val inMemoryBufferDelegatingLogExporter = BufferDelegatingLogExporter() + val delegate = spyk() + + inMemoryBufferDelegatingLogExporter.setDelegate(delegate) + + inMemoryBufferDelegatingLogExporter.shutdown() + + verify { delegate.shutdown() } + } +} diff --git a/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt new file mode 100644 index 000000000..d6f0b5460 --- /dev/null +++ b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt @@ -0,0 +1,118 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter +import io.opentelemetry.sdk.trace.data.SpanData +import org.junit.Test + +class BufferDelegatingSpanExporterTest { + @Test + fun `test setDelegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + val spanExporter = InMemorySpanExporter.create() + + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + bufferDelegatingSpanExporter.setDelegate(spanExporter) + + assertThat(spanExporter.finishedSpanItems) + .containsExactly(spanData) + } + + @Test + fun `test buffer limit handling`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter(10) + val spanExporter = InMemorySpanExporter.create() + + repeat(11) { + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + } + + bufferDelegatingSpanExporter.setDelegate(spanExporter) + + assertThat(spanExporter.finishedSpanItems) + .hasSize(10) + } + + @Test + fun `test flush with delegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + val delegate = spyk() + + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + + bufferDelegatingSpanExporter.setDelegate(delegate) + + verify(exactly = 0) { delegate.flush() } + + bufferDelegatingSpanExporter.flush() + + verify { delegate.flush() } + } + + @Test + fun `test export with delegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + val delegate = spyk() + + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + + verify(exactly = 0) { delegate.export(any()) } + + bufferDelegatingSpanExporter.setDelegate(delegate) + + verify(exactly = 1) { delegate.export(any()) } + + val spanData2 = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData2)) + + verify(exactly = 2) { delegate.export(any()) } + } + + @Test + fun `test shutdown with delegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + val delegate = spyk() + + bufferDelegatingSpanExporter.setDelegate(delegate) + + bufferDelegatingSpanExporter.shutdown() + + verify { delegate.shutdown() } + } + + @Test + fun `test flush without delegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + + val flushResult = bufferDelegatingSpanExporter.flush() + + assertThat(flushResult.isSuccess).isTrue() + } + + @Test + fun `test shutdown without delegate`() { + val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + + val spanData = mockk() + bufferDelegatingSpanExporter.export(listOf(spanData)) + + val shutdownResult = bufferDelegatingSpanExporter.shutdown() + + assertThat(shutdownResult.isSuccess).isTrue() + } +} From afaf8e9199f309b882c44a90a51245a68b5ce6d8 Mon Sep 17 00:00:00 2001 From: Nicklas Ansman Date: Wed, 11 Dec 2024 16:54:51 -0500 Subject: [PATCH 2/2] Address the comments from the reviews --- core/opentelemetry/signals/logs/1733967890157 | Bin 0 -> 1 bytes core/opentelemetry/signals/logs/1733968675537 | Bin 0 -> 1 bytes .../opentelemetry/signals/spans/1733967890160 | Bin 0 -> 1 bytes .../opentelemetry/signals/spans/1733968675539 | Bin 0 -> 1 bytes .../android/OpenTelemetryRumBuilder.java | 62 ++++--- .../export/BufferDelegatingLogExporter.kt | 27 +-- .../export/BufferDelegatingSpanExporter.kt | 27 +-- .../export/BufferedDelegatingExporter.kt | 100 ----------- .../android/export/DelegatingExporter.kt | 161 ++++++++++++++++++ .../BufferDelegatingSpanExporterTest.kt | 109 ++++++------ 10 files changed, 279 insertions(+), 207 deletions(-) create mode 100644 core/opentelemetry/signals/logs/1733967890157 create mode 100644 core/opentelemetry/signals/logs/1733968675537 create mode 100644 core/opentelemetry/signals/spans/1733967890160 create mode 100644 core/opentelemetry/signals/spans/1733968675539 delete mode 100644 core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt create mode 100644 core/src/main/java/io/opentelemetry/android/export/DelegatingExporter.kt diff --git a/core/opentelemetry/signals/logs/1733967890157 b/core/opentelemetry/signals/logs/1733967890157 new file mode 100644 index 0000000000000000000000000000000000000000..f76dd238ade08917e6712764a16a22005a50573d GIT binary patch literal 1 IcmZPo000310RR91 literal 0 HcmV?d00001 diff --git a/core/opentelemetry/signals/logs/1733968675537 b/core/opentelemetry/signals/logs/1733968675537 new file mode 100644 index 0000000000000000000000000000000000000000..f76dd238ade08917e6712764a16a22005a50573d GIT binary patch literal 1 IcmZPo000310RR91 literal 0 HcmV?d00001 diff --git a/core/opentelemetry/signals/spans/1733967890160 b/core/opentelemetry/signals/spans/1733967890160 new file mode 100644 index 0000000000000000000000000000000000000000..f76dd238ade08917e6712764a16a22005a50573d GIT binary patch literal 1 IcmZPo000310RR91 literal 0 HcmV?d00001 diff --git a/core/opentelemetry/signals/spans/1733968675539 b/core/opentelemetry/signals/spans/1733968675539 new file mode 100644 index 0000000000000000000000000000000000000000..f76dd238ade08917e6712764a16a22005a50573d GIT binary patch literal 1 IcmZPo000310RR91 literal 0 HcmV?d00001 diff --git a/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java b/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java index 58bb35aaf..66b3c89be 100644 --- a/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java +++ b/core/src/main/java/io/opentelemetry/android/OpenTelemetryRumBuilder.java @@ -96,13 +96,11 @@ public final class OpenTelemetryRumBuilder { private Resource resource; - private final Object lock = new Object(); + private boolean isBuilt = false; - // Writes guarded by "lock" - @Nullable private volatile ServiceManager serviceManager; + @Nullable private ServiceManager serviceManager; - // Writes guarded by "lock" - @Nullable private volatile ExportScheduleHandler exportScheduleHandler; + @Nullable private ExportScheduleHandler exportScheduleHandler; private static TextMapPropagator buildDefaultPropagator() { return TextMapPropagator.composite( @@ -129,6 +127,7 @@ public static OpenTelemetryRumBuilder create(Application application, OtelRumCon * @return {@code this} */ public OpenTelemetryRumBuilder setResource(Resource resource) { + checkNotBuilt(); this.resource = resource; return this; } @@ -141,6 +140,7 @@ public OpenTelemetryRumBuilder setResource(Resource resource) { * @return {@code this} */ public OpenTelemetryRumBuilder mergeResource(Resource resource) { + checkNotBuilt(); this.resource = this.resource.merge(resource); return this; } @@ -180,6 +180,7 @@ public OpenTelemetryRumBuilder addTracerProviderCustomizer( */ public OpenTelemetryRumBuilder addMeterProviderCustomizer( BiFunction customizer) { + checkNotBuilt(); meterProviderCustomizers.add(customizer); return this; } @@ -200,6 +201,7 @@ public OpenTelemetryRumBuilder addMeterProviderCustomizer( public OpenTelemetryRumBuilder addLoggerProviderCustomizer( BiFunction customizer) { + checkNotBuilt(); loggerProviderCustomizers.add(customizer); return this; } @@ -211,6 +213,7 @@ public OpenTelemetryRumBuilder addLoggerProviderCustomizer( */ public OpenTelemetryRumBuilder addInstrumentation(AndroidInstrumentation instrumentation) { instrumentations.add(instrumentation); + checkNotBuilt(); return this; } @@ -225,6 +228,7 @@ public OpenTelemetryRumBuilder addInstrumentation(AndroidInstrumentation instrum public OpenTelemetryRumBuilder addPropagatorCustomizer( Function propagatorCustomizer) { requireNonNull(propagatorCustomizer, "propagatorCustomizer"); + checkNotBuilt(); Function existing = this.propagatorCustomizer; this.propagatorCustomizer = @@ -244,6 +248,7 @@ public OpenTelemetryRumBuilder addPropagatorCustomizer( public OpenTelemetryRumBuilder addSpanExporterCustomizer( Function spanExporterCustomizer) { requireNonNull(spanExporterCustomizer, "spanExporterCustomizer"); + checkNotBuilt(); Function existing = this.spanExporterCustomizer; this.spanExporterCustomizer = @@ -263,6 +268,7 @@ public OpenTelemetryRumBuilder addSpanExporterCustomizer( public OpenTelemetryRumBuilder addLogRecordExporterCustomizer( Function logRecordExporterCustomizer) { + checkNotBuilt(); Function existing = this.logRecordExporterCustomizer; this.logRecordExporterCustomizer = @@ -283,6 +289,10 @@ public OpenTelemetryRumBuilder addLogRecordExporterCustomizer( * @return A new {@link OpenTelemetryRum} instance. */ public OpenTelemetryRum build() { + if (isBuilt) { + throw new IllegalStateException("You cannot call build multiple times"); + } + isBuilt = true; InitializationEvents initializationEvents = InitializationEvents.get(); applyConfiguration(initializationEvents); @@ -373,11 +383,7 @@ private void initializeExporters( @NonNull private ServiceManager getServiceManager() { if (serviceManager == null) { - synchronized (lock) { - if (serviceManager == null) { - serviceManager = ServiceManagerImpl.Companion.create(application); - } - } + serviceManager = ServiceManagerImpl.Companion.create(application); } // This can never be null since we never write `null` to it return requireNonNull(serviceManager); @@ -385,9 +391,8 @@ private ServiceManager getServiceManager() { public OpenTelemetryRumBuilder setServiceManager(@NonNull ServiceManager serviceManager) { requireNonNull(serviceManager, "serviceManager cannot be null"); - synchronized (lock) { - this.serviceManager = serviceManager; - } + checkNotBuilt(); + this.serviceManager = serviceManager; return this; } @@ -398,9 +403,8 @@ public OpenTelemetryRumBuilder setServiceManager(@NonNull ServiceManager service public OpenTelemetryRumBuilder setExportScheduleHandler( @NonNull ExportScheduleHandler exportScheduleHandler) { requireNonNull(exportScheduleHandler, "exportScheduleHandler cannot be null"); - synchronized (lock) { - this.exportScheduleHandler = exportScheduleHandler; - } + checkNotBuilt(); + this.exportScheduleHandler = exportScheduleHandler; return this; } @@ -423,18 +427,13 @@ private StorageConfiguration createStorageConfiguration() throws IOException { private void scheduleDiskTelemetryReader(@Nullable SignalFromDiskExporter signalExporter) { if (exportScheduleHandler == null) { - synchronized (lock) { - if (exportScheduleHandler == null) { - ServiceManager serviceManager = getServiceManager(); - // TODO: Is it safe to get the work service yet here? If so, we can - // avoid all this lazy supplier stuff.... - Function0 getWorkService = - serviceManager::getPeriodicWorkService; - exportScheduleHandler = - new DefaultExportScheduleHandler( - new DefaultExportScheduler(getWorkService), getWorkService); - } - } + ServiceManager serviceManager = getServiceManager(); + // TODO: Is it safe to get the work service yet here? If so, we can + // avoid all this lazy supplier stuff.... + Function0 getWorkService = serviceManager::getPeriodicWorkService; + exportScheduleHandler = + new DefaultExportScheduleHandler( + new DefaultExportScheduler(getWorkService), getWorkService); } final ExportScheduleHandler exportScheduleHandler = @@ -461,6 +460,7 @@ private void scheduleDiskTelemetryReader(@Nullable SignalFromDiskExporter signal * @return this */ public OpenTelemetryRumBuilder addOtelSdkReadyListener(Consumer callback) { + checkNotBuilt(); otelSdkReadyListeners.add(callback); return this; } @@ -574,4 +574,10 @@ private ContextPropagators buildFinalPropagators() { TextMapPropagator defaultPropagator = buildDefaultPropagator(); return ContextPropagators.create(propagatorCustomizer.apply(defaultPropagator)); } + + private void checkNotBuilt() { + if (isBuilt) { + throw new IllegalStateException("This method cannot be called after calling build"); + } + } } diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt index d75a22b24..563c0fc11 100644 --- a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt +++ b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingLogExporter.kt @@ -17,19 +17,22 @@ import io.opentelemetry.sdk.logs.export.LogRecordExporter */ internal class BufferDelegatingLogExporter( maxBufferedLogs: Int = 5_000, -) : BufferedDelegatingExporter(bufferedSignals = maxBufferedLogs), - LogRecordExporter { - override fun exportToDelegate( - delegate: LogRecordExporter, - data: Collection, - ): CompletableResultCode = delegate.export(data) +) : LogRecordExporter { + private val delegatingExporter = + DelegatingExporter( + doExport = LogRecordExporter::export, + doFlush = LogRecordExporter::flush, + doShutdown = LogRecordExporter::shutdown, + maxBufferedData = maxBufferedLogs, + ) - override fun shutdownDelegate(delegate: LogRecordExporter): CompletableResultCode = delegate.shutdown() + fun setDelegate(delegate: LogRecordExporter) { + delegatingExporter.setDelegate(delegate) + } - override fun export(logs: Collection): CompletableResultCode = bufferOrDelegate(logs) + override fun export(logs: Collection): CompletableResultCode = delegatingExporter.export(logs) - override fun flush(): CompletableResultCode = - withDelegateOrNull { delegate -> - delegate?.flush() ?: CompletableResultCode.ofSuccess() - } + override fun flush(): CompletableResultCode = delegatingExporter.flush() + + override fun shutdown(): CompletableResultCode = delegatingExporter.shutdown() } diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt index f6683a375..88724a483 100644 --- a/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt +++ b/core/src/main/java/io/opentelemetry/android/export/BufferDelegatingSpanExporter.kt @@ -17,19 +17,22 @@ import io.opentelemetry.sdk.trace.export.SpanExporter */ internal class BufferDelegatingSpanExporter( maxBufferedSpans: Int = 5_000, -) : BufferedDelegatingExporter(bufferedSignals = maxBufferedSpans), - SpanExporter { - override fun exportToDelegate( - delegate: SpanExporter, - data: Collection, - ): CompletableResultCode = delegate.export(data) +) : SpanExporter { + private val delegatingExporter = + DelegatingExporter( + doExport = SpanExporter::export, + doFlush = SpanExporter::flush, + doShutdown = SpanExporter::shutdown, + maxBufferedData = maxBufferedSpans, + ) - override fun shutdownDelegate(delegate: SpanExporter): CompletableResultCode = delegate.shutdown() + fun setDelegate(delegate: SpanExporter) { + delegatingExporter.setDelegate(delegate) + } - override fun export(spans: Collection): CompletableResultCode = bufferOrDelegate(spans) + override fun export(spans: Collection): CompletableResultCode = delegatingExporter.export(spans) - override fun flush(): CompletableResultCode = - withDelegateOrNull { delegate -> - delegate?.flush() ?: CompletableResultCode.ofSuccess() - } + override fun flush(): CompletableResultCode = delegatingExporter.flush() + + override fun shutdown(): CompletableResultCode = delegatingExporter.shutdown() } diff --git a/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt b/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt deleted file mode 100644 index da8dc4337..000000000 --- a/core/src/main/java/io/opentelemetry/android/export/BufferedDelegatingExporter.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.android.export - -import io.opentelemetry.sdk.common.CompletableResultCode -import java.util.concurrent.atomic.AtomicBoolean - -/** - * An in-memory buffer delegating signal exporter that buffers signal in memory until a delegate is set. - * Once a delegate is set, the buffered signals are exported to the delegate. - * - * The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new signals. - */ -internal abstract class BufferedDelegatingExporter(private val bufferedSignals: Int = 5_000) { - @Volatile - private var delegate: D? = null - private val buffer = arrayListOf() - private val lock = Any() - private var isShutDown = AtomicBoolean(false) - - /** - * Sets the delegate for this exporter and flushes the buffer to the delegate. - * - * If the delegate has already been set, an [IllegalStateException] will be thrown. - * If this exporter has been shut down, the delegate will be shut down immediately. - * - * @param delegate the delegate to set - * - * @throws IllegalStateException if a delegate has already been set - */ - fun setDelegate(delegate: D) { - synchronized(lock) { - check(this.delegate == null) { "Exporter delegate has already been set." } - - flushToDelegate(delegate) - - this.delegate = delegate - - if (isShutDown.get()) { - shutdownDelegate(delegate) - } - } - } - - /** - * Buffers the given data if the delegate has not been set, otherwise exports the data to the delegate. - * - * @param data the data to buffer or export - */ - protected fun bufferOrDelegate(data: Collection): CompletableResultCode = - withDelegateOrNull { - if (it != null) { - exportToDelegate(it, data) - } else { - val amountToTake = bufferedSignals - buffer.size - buffer.addAll(data.take(amountToTake)) - CompletableResultCode.ofSuccess() - } - } - - /** - * Executes the given block with the delegate if it has been set, otherwise executes the block with a null delegate. - * - * @param block the block to execute - */ - protected fun withDelegateOrNull(block: (D?) -> R): R { - delegate?.let { return block(it) } - return synchronized(lock) { block(delegate) } - } - - open fun shutdown(): CompletableResultCode = bufferedShutDown() - - protected abstract fun exportToDelegate( - delegate: D, - data: Collection, - ): CompletableResultCode - - protected abstract fun shutdownDelegate(delegate: D): CompletableResultCode - - private fun flushToDelegate(delegate: D) { - exportToDelegate(delegate, buffer) - buffer.clear() - buffer.trimToSize() - } - - private fun bufferedShutDown(): CompletableResultCode { - isShutDown.set(true) - - return withDelegateOrNull { - if (it != null) { - shutdownDelegate(it) - } else { - CompletableResultCode.ofSuccess() - } - } - } -} diff --git a/core/src/main/java/io/opentelemetry/android/export/DelegatingExporter.kt b/core/src/main/java/io/opentelemetry/android/export/DelegatingExporter.kt new file mode 100644 index 000000000..cf7bece81 --- /dev/null +++ b/core/src/main/java/io/opentelemetry/android/export/DelegatingExporter.kt @@ -0,0 +1,161 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.android.export + +import io.opentelemetry.api.internal.GuardedBy +import io.opentelemetry.sdk.common.CompletableResultCode +import java.nio.BufferOverflowException + +/** + * An exporter that delegates calls to a delegate exporter. Any data exported before the delegate + * is set will be buffered in memory, up to the [maxBufferedData] number of entries. + * + * If the buffer is full, the exporter will drop any new signals. + * + * @param D the type of the delegate. + * @param T the type of the data. + * @param doExport a lambda that handles exporting to the delegate. + * @param doFlush a lambda that handles flushing the delegate. + * @param doShutdown a lambda that handles shutting down the delegate. + * @param maxBufferedData the maximum number of data to buffer in memory before dropping new data. + */ +internal class DelegatingExporter( + private val doExport: D.(data: Collection) -> CompletableResultCode, + private val doFlush: D.() -> CompletableResultCode, + private val doShutdown: D.() -> CompletableResultCode, + private val maxBufferedData: Int, +) { + private val lock = Any() + + @GuardedBy("lock") + private var delegate: D? = null + + @GuardedBy("lock") + private val buffer = arrayListOf() + + @GuardedBy("lock") + private var pendingExport: CompletableResultCode? = null + + @GuardedBy("lock") + private var pendingFlush: CompletableResultCode? = null + + @GuardedBy("lock") + private var pendingShutdown: CompletableResultCode? = null + + /** + * Sets the delegate for this exporter. + * + * Any buffered data will be written to the delegate followed by a flush and shut down if + * [flush] and/or [shutdown] has been called prior to this call. + * + * @param delegate the delegate to set + * @throws IllegalStateException if a delegate has already been set + */ + fun setDelegate(delegate: D) { + synchronized(lock) { + check(this.delegate == null) { "A delegate has already been set." } + this.delegate = delegate + } + // Exporting outside of the synchronized block could lead to an out of order export + // but export order shouldn't matter so this is fine. It's better to avoid calling external + // code from within the synchronized block. + pendingExport?.setTo(delegate.doExport(buffer)) + pendingFlush?.setTo(delegate.doFlush()) + pendingShutdown?.setTo(delegate.doShutdown()) + synchronized(lock) { + pendingExport = null + pendingFlush = null + pendingShutdown = null + } + clearBuffer() + } + + /** + * Exports the given data using the [doExport] lambda. If the delegate is not yet set an export + * will be scheduled and executed when the delegate is set. + * + * @param data the data to export. + * @return the result. If the delegate is set then the result from it will be returned, + * otherwise a result is returned which will complete when the delegate is set and the data + * has been exported. If all of the data was dropped then a failure is returned. + */ + fun export(data: Collection): CompletableResultCode = + withDelegate( + ifSet = { doExport(this, data) }, + ifNotSet = { + val amountToTake = maxBufferedData - buffer.size + buffer.addAll(data.take(amountToTake)) + // If all the data was dropped we return an exception + if (amountToTake == 0 && data.isNotEmpty()) { + CompletableResultCode.ofExceptionalFailure(BufferOverflowException()) + } else { + pendingExport + ?: CompletableResultCode().also { pendingExport = it } + } + }, + ) + + /** + * Flushes the exporter using the [doFlush] lambda. If the delegate is not yet set a flush will + * be scheduled and executed when the delegate is set. + * + * @return the result. If the delegate is set then the result from it will be returned, + * otherwise a result is returned which will complete when the delegate is set and has been + * flushed. + */ + fun flush(): CompletableResultCode = + withDelegate( + ifSet = doFlush, + ifNotSet = { pendingFlush ?: CompletableResultCode().also { pendingFlush = it } }, + ) + + /** + * Shuts down the exporter using the [doShutdown]. If the delegate is not yet set a shut down + * will be scheduled and executed when the delegate is set. + * + * @return the result. If the delegate is set then the result from it will be returned, + * otherwise a result is returned which will complete when the delegate is set and has been + * shut down. + */ + fun shutdown(): CompletableResultCode = + withDelegate( + ifSet = doShutdown, + ifNotSet = { pendingShutdown ?: CompletableResultCode().also { pendingShutdown = it } }, + ) + + private fun clearBuffer() { + buffer.clear() + buffer.trimToSize() + } + + private inline fun withDelegate( + ifSet: D.() -> R, + ifNotSet: () -> R, + ): R { + val delegate = + synchronized(lock) { + delegate ?: return ifNotSet() + } + // We interact with the delegate outside of the synchronized block to avoid any potential + // deadlocks due to reentrant calls + return delegate.ifSet() + } + + private fun CompletableResultCode.setTo(other: CompletableResultCode) { + other.whenComplete { + if (other.isSuccess) { + succeed() + } else { + val throwable = other.failureThrowable + if (throwable == null) { + fail() + } else { + failExceptionally(throwable) + } + } + } + } +} diff --git a/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt index d6f0b5460..f415f61a6 100644 --- a/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt +++ b/core/src/test/java/io/opentelemetry/android/export/BufferDelegatingSpanExporterTest.kt @@ -12,30 +12,52 @@ import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter import io.opentelemetry.sdk.trace.data.SpanData import org.junit.Test +import java.nio.BufferOverflowException class BufferDelegatingSpanExporterTest { + private val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() + private val delegate = spyk() + private val spanData = mockk() + @Test - fun `test setDelegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - val spanExporter = InMemorySpanExporter.create() + fun `test no data`() { + bufferDelegatingSpanExporter.setDelegate(delegate) + + verify(exactly = 0) { delegate.export(any()) } + verify(exactly = 0) { delegate.flush() } + verify(exactly = 0) { delegate.shutdown() } + } - val spanData = mockk() + @Test + fun `test setDelegate`() { bufferDelegatingSpanExporter.export(listOf(spanData)) - bufferDelegatingSpanExporter.setDelegate(spanExporter) + bufferDelegatingSpanExporter.setDelegate(delegate) - assertThat(spanExporter.finishedSpanItems) + assertThat(delegate.finishedSpanItems) .containsExactly(spanData) + verify(exactly = 0) { delegate.flush() } + verify(exactly = 0) { delegate.shutdown() } + } + + @Test + fun `the export result should complete when the delegate is set`() { + val result = bufferDelegatingSpanExporter.export(listOf(spanData)) + assertThat(result.isDone).isFalse() + bufferDelegatingSpanExporter.setDelegate(delegate) + assertThat(result.isSuccess).isTrue() } @Test fun `test buffer limit handling`() { val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter(10) val spanExporter = InMemorySpanExporter.create() + val initialResult = bufferDelegatingSpanExporter.export(List(10) { mockk() }) + assertThat(initialResult.isDone).isFalse() - repeat(11) { - val spanData = mockk() - bufferDelegatingSpanExporter.export(listOf(spanData)) - } + val overflowResult = bufferDelegatingSpanExporter.export(listOf(mockk())) + assertThat(overflowResult.isDone).isTrue() + assertThat(overflowResult.isSuccess).isFalse() + assertThat(overflowResult.failureThrowable).isInstanceOf(BufferOverflowException::class.java) bufferDelegatingSpanExporter.setDelegate(spanExporter) @@ -45,74 +67,51 @@ class BufferDelegatingSpanExporterTest { @Test fun `test flush with delegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - val delegate = spyk() - - val spanData = mockk() - bufferDelegatingSpanExporter.export(listOf(spanData)) - bufferDelegatingSpanExporter.setDelegate(delegate) - verify(exactly = 0) { delegate.flush() } + val result = bufferDelegatingSpanExporter.flush() + verify(exactly = 1) { delegate.flush() } + assertThat(result.isSuccess).isTrue() + } - bufferDelegatingSpanExporter.flush() + @Test + fun `test flush without delegate`() { + val result = bufferDelegatingSpanExporter.flush() + assertThat(result.isDone).isFalse() - verify { delegate.flush() } + bufferDelegatingSpanExporter.setDelegate(delegate) + verify(exactly = 1) { delegate.flush() } + assertThat(result.isSuccess).isTrue() } @Test fun `test export with delegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - val delegate = spyk() - - val spanData = mockk() bufferDelegatingSpanExporter.export(listOf(spanData)) - - verify(exactly = 0) { delegate.export(any()) } - bufferDelegatingSpanExporter.setDelegate(delegate) - verify(exactly = 1) { delegate.export(any()) } + assertThat(delegate.finishedSpanItems).containsExactly(spanData) val spanData2 = mockk() - bufferDelegatingSpanExporter.export(listOf(spanData2)) + val result = bufferDelegatingSpanExporter.export(listOf(spanData2)) - verify(exactly = 2) { delegate.export(any()) } + assertThat(delegate.finishedSpanItems).containsExactly(spanData, spanData2) + assertThat(result.isSuccess).isTrue() } @Test fun `test shutdown with delegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - val delegate = spyk() - bufferDelegatingSpanExporter.setDelegate(delegate) - - bufferDelegatingSpanExporter.shutdown() - - verify { delegate.shutdown() } - } - - @Test - fun `test flush without delegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - - val spanData = mockk() - bufferDelegatingSpanExporter.export(listOf(spanData)) - - val flushResult = bufferDelegatingSpanExporter.flush() - - assertThat(flushResult.isSuccess).isTrue() + val result = bufferDelegatingSpanExporter.shutdown() + verify(exactly = 1) { delegate.shutdown() } + assertThat(result.isSuccess).isTrue() } @Test fun `test shutdown without delegate`() { - val bufferDelegatingSpanExporter = BufferDelegatingSpanExporter() - - val spanData = mockk() - bufferDelegatingSpanExporter.export(listOf(spanData)) + val result = bufferDelegatingSpanExporter.shutdown() + assertThat(result.isDone).isFalse() - val shutdownResult = bufferDelegatingSpanExporter.shutdown() - - assertThat(shutdownResult.isSuccess).isTrue() + bufferDelegatingSpanExporter.setDelegate(delegate) + assertThat(result.isSuccess).isTrue() } }