From 6d32714b94939bc457522d1ab8afb84aab443230 Mon Sep 17 00:00:00 2001 From: Robert Elliot Date: Thu, 16 Jan 2025 14:26:05 +0000 Subject: [PATCH] Make access to ArrayDeque synchronized ArrayDeque specifies that: > Array deques ... are not thread-safe; in the absence of external > synchronization, they do not support concurrent access by multiple > threads. `marshalerPool` is concurrently added to by the OkHttp threadpool without synchronization, along with all threads that end spans (with synchronisation in `SimpleSpanProcessor.exportLock`, which is not used to synchronize with the OkHttp threadpool). Just making the ArrayQueue synchronous internally removes all need to worry about upstream locks. Fixes #7019 --- .../traces/SpanReusableDataMarshaler.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java index af69e811c89..5b23222a7ae 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/traces/SpanReusableDataMarshaler.java @@ -11,7 +11,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayDeque; import java.util.Collection; -import java.util.Deque; +import java.util.Queue; import java.util.function.BiFunction; /** @@ -20,7 +20,8 @@ */ public class SpanReusableDataMarshaler { - private final Deque marshalerPool = new ArrayDeque<>(); + private final SynchronizedQueue marshalerPool = + new SynchronizedQueue<>(new ArrayDeque<>()); private final MemoryMode memoryMode; private final BiFunction doExport; @@ -55,4 +56,21 @@ public CompletableResultCode export(Collection spans) { TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); return doExport.apply(request, spans.size()); } + + private static class SynchronizedQueue { + private final Queue queue; + + private SynchronizedQueue(Queue queue) { + this.queue = queue; + } + + @SuppressWarnings("UnusedReturnValue") + public synchronized boolean add(T t) { + return queue.add(t); + } + + public synchronized T poll() { + return queue.poll(); + } + } }