- * This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}. + * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * reporter}. * *
Here's a simple configuration, configured for json:
*
diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java
index 3b4a3e31..3c758391 100644
--- a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java
+++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -22,13 +22,13 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import zipkin2.Call;
-import zipkin2.CheckResult;
import zipkin2.Span;
-import zipkin2.codec.Encoding;
import zipkin2.codec.SpanBytesDecoder;
-import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.reporter.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
+import zipkin2.reporter.Call;
+import zipkin2.reporter.CheckResult;
+import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;
import static java.util.stream.Collectors.toList;
diff --git a/amqp-client/pom.xml b/amqp-client/pom.xml
index 866c4678..f76e154c 100644
--- a/amqp-client/pom.xml
+++ b/amqp-client/pom.xml
@@ -20,7 +20,7 @@
- * This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}. + * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * reporter}. * *
Here's a simple configuration, configured for json:
*
@@ -189,7 +189,7 @@ public final RabbitMQSender build() {
connectionFactory = builder.connectionFactory.clone();
}
- public final Builder toBuilder() {
+ public Builder toBuilder() {
return new Builder(this);
}
@@ -231,7 +231,7 @@ public final Builder toBuilder() {
}
}
- @Override public final String toString() {
+ @Override public String toString() {
return "RabbitMQSender{addresses=" + addresses + ", queue=" + queue + "}";
}
diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java
index ac68e74b..65462940 100644
--- a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java
+++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -25,11 +25,11 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import zipkin2.Call;
import zipkin2.Span;
-import zipkin2.codec.Encoding;
import zipkin2.codec.SpanBytesDecoder;
-import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.reporter.SpanBytesEncoder;
+import zipkin2.reporter.Call;
+import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;
import static java.util.stream.Collectors.toList;
diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQExtension.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQExtension.java
index 31adce89..4f561248 100644
--- a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQExtension.java
+++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQExtension.java
@@ -25,7 +25,7 @@
import org.testcontainers.containers.wait.strategy.Wait;
import static org.testcontainers.utility.DockerImageName.parse;
-import static zipkin2.Call.propagateIfFatal;
+import static zipkin2.reporter.Call.propagateIfFatal;
class RabbitMQExtension implements BeforeAllCallback, AfterAllCallback {
static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQExtension.class);
diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java
index 5ac41009..ec46540c 100644
--- a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java
+++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -14,8 +14,8 @@
package zipkin2.reporter.amqp;
import org.junit.jupiter.api.Test;
-import zipkin2.CheckResult;
import zipkin2.reporter.AsyncReporter;
+import zipkin2.reporter.CheckResult;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
@@ -27,20 +27,20 @@
class RabbitMQSenderTest {
// We can be pretty certain RabbitMQ isn't running on localhost port 80
RabbitMQSender sender = RabbitMQSender.newBuilder()
- .connectionTimeout(100).addresses("localhost:80").build();
+ .connectionTimeout(100).addresses("localhost:80").build();
@Test void checkFalseWhenRabbitMQIsDown() {
CheckResult check = sender.check();
assertThat(check.ok()).isFalse();
assertThat(check.error())
- .isInstanceOf(RuntimeException.class);
+ .isInstanceOf(RuntimeException.class);
}
@Test void illegalToSendWhenClosed() throws Exception {
sender.close();
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(ClosedSenderException.class);
+ .isInstanceOf(ClosedSenderException.class);
}
/**
@@ -51,7 +51,7 @@ class RabbitMQSenderTest {
*/
@Test void toStringContainsOnlySummaryInformation() {
assertThat(sender).hasToString(
- "RabbitMQSender{addresses=[localhost:80], queue=zipkin}"
+ "RabbitMQSender{addresses=[localhost:80], queue=zipkin}"
);
}
}
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 13903909..6769cfdf 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -20,7 +20,7 @@
Note: this timeout starts when the first unsent span is reported.
*/
public Builder messageTimeout(long timeout, TimeUnit unit) {
- if (timeout < 0) throw new IllegalArgumentException("messageTimeout < 0: " + timeout);
- if (unit == null) throw new NullPointerException("unit == null");
- this.messageTimeoutNanos = unit.toNanos(timeout);
+ this.delegate.messageTimeout(timeout, unit);
return this;
}
/** How long to block for in-flight spans to send out-of-process on close. Default 1 second */
public Builder closeTimeout(long timeout, TimeUnit unit) {
- if (timeout < 0) throw new IllegalArgumentException("closeTimeout < 0: " + timeout);
- if (unit == null) throw new NullPointerException("unit == null");
- this.closeTimeoutNanos = unit.toNanos(timeout);
+ this.delegate.closeTimeout(timeout, unit);
return this;
}
/** Maximum backlog of spans reported vs sent. Default 10000 */
public Builder queuedMaxSpans(int queuedMaxSpans) {
- this.queuedMaxSpans = queuedMaxSpans;
+ this.delegate.queuedMaxSpans(queuedMaxSpans);
return this;
}
/** Maximum backlog of span bytes reported vs sent. Default 1% of heap */
public Builder queuedMaxBytes(int queuedMaxBytes) {
- this.queuedMaxBytes = queuedMaxBytes;
+ this.delegate.queuedMaxBytes(queuedMaxBytes);
return this;
}
/** Builds an async reporter that encodes zipkin spans as they are reported. */
- public AsyncReporter build() {
- switch (sender.encoding()) {
+ public AsyncReporter Implementations should prepare a call such that there's little or no likelihood of late
+ * runtime exceptions. For example, if the call is to get a trace, the call to {@code listSpans}
+ * should propagate input errors vs delay them until a call to {@linkplain #execute()} or
+ * {@linkplain #enqueue(Callback)}.
+ *
+ * Ex.
+ * An instance of call cannot be invoked more than once, but you can {@linkplain #clone()} an
+ * instance if you need to replay the call. There is no relationship between a call and a number of
+ * remote requests. For example, an implementation that stores spans may make hundreds of remote
+ * requests, possibly retrying on your behalf.
+ *
+ * This type owes its design to {@code retrofit2.Call}, which is nearly the same, except limited
+ * to HTTP transports.
+ *
+ * @param This method intends to be used for chaining. That means "this" instance should be discarded
+ * in favor of the result of this method.
+ */
+ public final
+ * Cancelation propagates to the mapped call.
+ *
+ * This method intends to be used for chaining. That means "this" instance should be discarded
+ * in favor of the result of this method.
+ */
+ public final Here's an example of coercing 404 to empty:
+ * Eventhough this is a blocking call, implementations may honor calls to {@linkplain
+ * #cancel()} from a different thread.
+ *
+ * @return a success value. Null is unexpected, except when {@code V} is {@linkplain Void}.
+ */
+ public abstract V execute() throws IOException;
+
+ /**
+ * Invokes a request asynchronously, signaling the {@code callback} when complete. Invoking this
+ * more than once will result in an error. To repeat a call, make a copy with {@linkplain
+ * #clone()}.
+ */
+ public abstract void enqueue(Callback Calls can fail before being canceled, so true does always mean cancelation caused a call to
+ * fail. That said, successful cancellation does result in a failure.
+ */
+ // isCanceled exists while isExecuted does not because you do not need the latter to implement
+ // asynchronous bindings, such as rxjava2
+ public abstract boolean isCanceled();
+
+ /** Returns a copy of this object, so you can make an identical follow-up request. */
+ @Override public abstract Call This is a bridge to async libraries such as CompletableFuture complete, completeExceptionally.
+ *
+ * Implementations will call either {@link #onSuccess} or {@link #onError}, but not both.
+ *
+ * @since 3.0
+ */
+public interface Callback When this is called, {@link #onError} won't be.
+ */
+ void onSuccess(@Nullable V value);
+
+ /**
+ * Invoked when computation produces a possibly null value successfully.
+ *
+ * When this is called, {@link #onSuccess} won't be.
+ */
+ void onError(Throwable t);
+}
diff --git a/core/src/main/java/zipkin2/reporter/CheckResult.java b/core/src/main/java/zipkin2/reporter/CheckResult.java
new file mode 100644
index 00000000..7ef4f17e
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/CheckResult.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import zipkin2.reporter.internal.Nullable;
+
+/**
+ * Answers the question: Are operations on this component likely to succeed?
+ *
+ * Implementations should initialize the component if necessary. It should test a remote
+ * connection, or consult a trusted source to derive the result. They should use least resources
+ * possible to establish a meaningful result, and be safe to call many times, even concurrently.
+ *
+ * @see CheckResult#OK
+ * @since 3.0
+ */
+// @Immutable
+public final class CheckResult {
+ public static final CheckResult OK = new CheckResult(true, null);
+
+ public static CheckResult failed(Throwable error) {
+ return new CheckResult(false, error);
+ }
+
+ public boolean ok() {
+ return ok;
+ }
+
+ /** Present when not ok */
+ @Nullable
+ public Throwable error() {
+ return error;
+ }
+
+ final boolean ok;
+ final Throwable error;
+
+ CheckResult(boolean ok, @Nullable Throwable error) {
+ this.ok = ok;
+ this.error = error;
+ }
+
+ @Override
+ public String toString() {
+ return "CheckResult{ok=" + ok + ", " + "error=" + error + "}";
+ }
+}
diff --git a/core/src/main/java/zipkin2/reporter/Component.java b/core/src/main/java/zipkin2/reporter/Component.java
new file mode 100644
index 00000000..1eac9481
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/Component.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Components are object graphs used to compose a zipkin service or client. For example, a storage
+ * component might return a query api.
+ *
+ * Components are lazy in regard to I/O. They can be injected directly to other components, to
+ * avoid crashing the application graph if a network service is unavailable.
+ *
+ * @since 3.0
+ */
+public abstract class Component implements Closeable {
+
+ /**
+ * Answers the question: Are operations on this component likely to succeed?
+ *
+ * Implementations should initialize the component if necessary. It should test a remote
+ * connection, or consult a trusted source to derive the result. They should use least resources
+ * possible to establish a meaningful result, and be safe to call many times, even concurrently.
+ *
+ * @see CheckResult#OK
+ */
+ public CheckResult check() {
+ return CheckResult.OK;
+ }
+
+ /**
+ * Closes any network resources created implicitly by the component.
+ *
+ * For example, if this created a connection, it would close it. If it was provided one, this
+ * would close any sessions, but leave the connection open.
+ */
+ @Override public void close() throws IOException {
+ }
+}
diff --git a/core/src/main/java/zipkin2/reporter/Encoding.java b/core/src/main/java/zipkin2/reporter/Encoding.java
new file mode 100644
index 00000000..eba1fe2a
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/Encoding.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.util.List;
+
+/**
+ * This includes the formats Zipkin server accepts.
+ *
+ * @since 3.0
+ */
+public enum Encoding {
+ JSON {
+ /** Encoding overhead of a single element is brackets */
+ @Override public int listSizeInBytes(int encodedSizeInBytes) {
+ return 2 + encodedSizeInBytes;
+ }
+
+ /** Encoding overhead is brackets and a comma for each span over 1 */
+ @Override public int listSizeInBytes(List The message's binary data includes a list header followed by N spans serialized in
+ * TBinaryProtocol
+ *
+ * @deprecated this format is deprecated in favor of json or proto3
+ */
+ @Deprecated
+ THRIFT {
+ /** Encoding overhead is thrift type plus 32-bit length prefix */
+ @Override public int listSizeInBytes(int encodedSizeInBytes) {
+ return 5 + encodedSizeInBytes;
+ }
+
+ /** Encoding overhead is thrift type plus 32-bit length prefix */
+ @Override public int listSizeInBytes(List See https://developers.google.com/protocol-buffers/docs/encoding#optional
+ */
+ PROTO3 {
+ /** Returns the input as it is assumed to be length-prefixed field from a protobuf message */
+ @Override public int listSizeInBytes(int encodedSizeInBytes) {
+ return encodedSizeInBytes;
+ }
+
+ /** Returns a concatenation of sizes */
+ @Override public int listSizeInBytes(List Unless mentioned otherwise, senders are not thread-safe. They were designed to be used by
- * {@link AsyncReporter}, which has a single reporting thread.
+ * Unless mentioned otherwise, senders are not thread-safe. They were designed to be used by a
+ * single reporting thread.
*
* Those looking to initialize eagerly should call {@link #check()}. This can be used to reduce
* latency on the first send operation, or to fail fast.
@@ -40,9 +35,11 @@
* scribe will likely write each span as a separate log line.
*
* This accepts a list of {@link BytesEncoder#encode(Object) encoded spans}, as opposed a list of
- * spans like {@link zipkin2.Span}. This allows senders to be re-usable as model shapes change. This
+ * spans like {@code zipkin2.Span}. This allows senders to be re-usable as model shapes change. This
* also allows them to use their most natural message type. For example, kafka would more naturally
* send messages as byte arrays.
+ *
+ * @since 3.0
*/
public abstract class Sender extends Component {
@@ -52,7 +49,7 @@ public abstract class Sender extends Component {
/**
* Maximum bytes sendable per message including overhead. This can be calculated using {@link
* #messageSizeInBytes(List)}
- *
+ *
* Defaults to 500KB as a conservative default. You may get better or reduced performance
* by changing this value based on, e.g., machine size or network bandwidth in your
* infrastructure. Finding a perfect value will require trying out different values in production,
@@ -77,7 +74,6 @@ public abstract class Sender extends Component {
* Always override this, which is only abstract as added after version 2.0
*
* @param encodedSizeInBytes the {@link BytesEncoder#sizeInBytes(Object) encoded size} of a span
- * @since 2.2
*/
public int messageSizeInBytes(int encodedSizeInBytes) {
return messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
@@ -90,12 +86,4 @@ public int messageSizeInBytes(int encodedSizeInBytes) {
* @throws IllegalStateException if {@link #close() close} was called.
*/
public abstract Call Spans are bundled into messages based on size in bytes or a timeout, whichever happens first.
+ *
+ * The thread that sends flushes spans to the {@linkplain Sender} does so in a synchronous loop.
+ * This means that even asynchronous transports will wait for an ack before sending a next message.
+ * We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or
+ * thousands of in-flight messages. The downside of this is that reporting is limited in speed to
+ * what a single thread can clear. When a thread cannot clear the backlog, new spans are dropped.
+ *
+ * @param Note: If you set {@link Builder#messageTimeout(long, TimeUnit) message timeout} to zero, you
+ * must call this externally as otherwise spans will never be sent.
+ *
+ * @throws IllegalStateException if closed
+ */
+ @Override public abstract void flush();
+
+ /** Shuts down the sender thread, and increments drop metrics if there were any unsent spans. */
+ @Override public abstract void close();
+
+ public abstract Builder toBuilder();
+
+ public static final class Builder {
+ final Sender sender;
+ ThreadFactory threadFactory = Executors.defaultThreadFactory();
+ ReporterMetrics metrics = ReporterMetrics.NOOP_METRICS;
+ int messageMaxBytes;
+ long messageTimeoutNanos = TimeUnit.SECONDS.toNanos(1);
+ long closeTimeoutNanos = TimeUnit.SECONDS.toNanos(1);
+ int queuedMaxSpans = 10000;
+ int queuedMaxBytes = onePercentOfMemory();
+
+ Builder(BoundedAsyncReporter> asyncReporter) {
+ this.sender = asyncReporter.sender;
+ this.threadFactory = asyncReporter.threadFactory;
+ this.metrics = asyncReporter.metrics;
+ this.messageMaxBytes = asyncReporter.messageMaxBytes;
+ this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos;
+ this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos;
+ this.queuedMaxSpans = asyncReporter.pending.maxSize;
+ this.queuedMaxBytes = asyncReporter.pending.maxBytes;
+ }
+
+ static int onePercentOfMemory() {
+ long result = (long) (Runtime.getRuntime().totalMemory() * 0.01);
+ // don't overflow in the rare case 1% of memory is larger than 2 GiB!
+ return (int) Math.max(Math.min(Integer.MAX_VALUE, result), Integer.MIN_VALUE);
+ }
+
+ Builder(Sender sender) {
+ if (sender == null) throw new NullPointerException("sender == null");
+ this.sender = sender;
+ this.messageMaxBytes = sender.messageMaxBytes();
+ }
+
+ /**
+ * Launches the flush thread when {@link #messageTimeoutNanos} is greater than zero.
+ */
+ public Builder threadFactory(ThreadFactory threadFactory) {
+ if (threadFactory == null) throw new NullPointerException("threadFactory == null");
+ this.threadFactory = threadFactory;
+ return this;
+ }
+
+ /**
+ * Aggregates and reports reporter metrics to a monitoring system. Defaults to no-op.
+ */
+ public Builder metrics(ReporterMetrics metrics) {
+ if (metrics == null) throw new NullPointerException("metrics == null");
+ this.metrics = metrics;
+ return this;
+ }
+
+ /**
+ * Maximum bytes sendable per message including overhead. Defaults to, and is limited by {@link
+ * Sender#messageMaxBytes()}.
+ */
+ public Builder messageMaxBytes(int messageMaxBytes) {
+ if (messageMaxBytes < 0) {
+ throw new IllegalArgumentException("messageMaxBytes < 0: " + messageMaxBytes);
+ }
+ this.messageMaxBytes = Math.min(messageMaxBytes, sender.messageMaxBytes());
+ return this;
+ }
+
+ /**
+ * Default 1 second. 0 implies spans are {@link #flush() flushed} externally.
+ *
+ * Instead of sending one message at a time, spans are bundled into messages, up to {@link
+ * Sender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an incomplete
+ * message.
+ *
+ * Note: this timeout starts when the first unsent span is reported.
+ */
+ public Builder messageTimeout(long timeout, TimeUnit unit) {
+ if (timeout < 0) throw new IllegalArgumentException("messageTimeout < 0: " + timeout);
+ if (unit == null) throw new NullPointerException("unit == null");
+ this.messageTimeoutNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ /** How long to block for in-flight spans to send out-of-process on close. Default 1 second */
+ public Builder closeTimeout(long timeout, TimeUnit unit) {
+ if (timeout < 0) throw new IllegalArgumentException("closeTimeout < 0: " + timeout);
+ if (unit == null) throw new NullPointerException("unit == null");
+ this.closeTimeoutNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ /** Maximum backlog of spans reported vs sent. Default 10000 */
+ public Builder queuedMaxSpans(int queuedMaxSpans) {
+ this.queuedMaxSpans = queuedMaxSpans;
+ return this;
+ }
+
+ /** Maximum backlog of span bytes reported vs sent. Default 1% of heap */
+ public Builder queuedMaxBytes(int queuedMaxBytes) {
+ this.queuedMaxBytes = queuedMaxBytes;
+ return this;
+ }
+
+ /** Builds an async reporter that encodes arbitrary spans as they are reported. */
+ public Inspired by {@code okhttp3.internal.Internal}.
- */
-public abstract class InternalReporter {
- public static InternalReporter instance;
-
- /**
- * Internal utility that allows a reporter to be reconfigured.
- *
- * Note:Call {@link AsyncReporter#close()} if you no longer need this instance, as
- * otherwise it can leak its reporting thread.
- *
- * @since 2.15
- */
- public abstract AsyncReporter.Builder toBuilder(AsyncReporter> asyncReporter);
-}
diff --git a/core/src/main/java/zipkin2/reporter/internal/Nullable.java b/core/src/main/java/zipkin2/reporter/internal/Nullable.java
new file mode 100644
index 00000000..f2d8ca63
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/internal/Nullable.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter.internal;
+
+/**
+ * Libraries such as Guice and AutoValue will process any annotation named {@code Nullable}. This
+ * avoids a dependency on one of the many jsr305 jars, causes problems in OSGi and Java 9 projects
+ * (where a project is also using jax-ws).
+ *
+ * @since 3.0
+ */
+@java.lang.annotation.Documented
+@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+public @interface Nullable {
+}
diff --git a/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java b/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java
index b9ee3e3a..347ac7b0 100644
--- a/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java
+++ b/core/src/test/java/zipkin2/reporter/BytesMessageEncoderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -23,49 +23,49 @@ class BytesMessageEncoderTest {
@Test void emptyList_json() {
List AsyncReporter build(BytesEncoder encoder) {
if (encoder == null) throw new NullPointerException("encoder == null");
-
- if (encoder.encoding() != sender.encoding()) {
- throw new IllegalArgumentException(String.format(
- "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
- }
-
- return new BoundedAsyncReporter(this, encoder);
+ return new AsyncReporter(delegate.build(new BytesEncoderAdapter(encoder)));
}
}
- static final class BoundedAsyncReporter extends AsyncReporter {
- static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
- final AtomicBoolean started, closed;
- final BytesEncoder encoder;
- final ByteBoundedQueue pending;
- final Sender sender;
- final int messageMaxBytes;
- final long messageTimeoutNanos, closeTimeoutNanos;
- final CountDownLatch close;
- final ReporterMetrics metrics;
- final ThreadFactory threadFactory;
-
- /** Tracks if we should log the first instance of an exception in flush(). */
- private boolean shouldWarnException = true;
-
- BoundedAsyncReporter(Builder builder, BytesEncoder encoder) {
- this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes);
- this.sender = builder.sender;
- this.messageMaxBytes = builder.messageMaxBytes;
- this.messageTimeoutNanos = builder.messageTimeoutNanos;
- this.closeTimeoutNanos = builder.closeTimeoutNanos;
- this.closed = new AtomicBoolean(false);
- // pretend we already started when config implies no thread that flushes the queue in a loop.
- this.started = new AtomicBoolean(builder.messageTimeoutNanos == 0);
- this.close = new CountDownLatch(builder.messageTimeoutNanos > 0 ? 1 : 0);
- this.metrics = builder.metrics;
- this.threadFactory = builder.threadFactory;
- this.encoder = encoder;
- }
-
- void startFlusherThread() {
- BufferNextMessage consumer =
- BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
- Thread flushThread = threadFactory.newThread(new Flusher(this, consumer));
- flushThread.setName("AsyncReporter{" + sender + "}");
- flushThread.setDaemon(true);
- flushThread.start();
- }
-
- @Override public void report(S next) {
- if (next == null) throw new NullPointerException("span == null");
- // Lazy start so that reporters never used don't spawn threads
- if (started.compareAndSet(false, true)) startFlusherThread();
- metrics.incrementSpans(1);
- int nextSizeInBytes = encoder.sizeInBytes(next);
- int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
- metrics.incrementSpanBytes(nextSizeInBytes);
- if (closed.get() ||
- // don't enqueue something larger than we can drain
- messageSizeOfNextSpan > messageMaxBytes ||
- !pending.offer(next, nextSizeInBytes)) {
- metrics.incrementSpansDropped(1);
- }
- }
-
- @Override public final void flush() {
- if (closed.get()) throw new ClosedSenderException();
- flush(BufferNextMessage.create(encoder.encoding(), messageMaxBytes, 0));
- }
-
- void flush(BufferNextMessage bundler) {
- pending.drainTo(bundler, bundler.remainingNanos());
-
- // record after flushing reduces the amount of gauge events vs on doing this on report
- metrics.updateQueuedSpans(pending.count);
- metrics.updateQueuedBytes(pending.sizeInBytes);
-
- // loop around if we are running, and the bundle isn't full
- // if we are closed, try to send what's pending
- if (!bundler.isReady() && !closed.get()) return;
-
- // Signal that we are about to send a message of a known size in bytes
- metrics.incrementMessages();
- metrics.incrementMessageBytes(bundler.sizeInBytes());
-
- // Create the next message. Since we are outside the lock shared with writers, we can encode
- final ArrayList() {
- @Override public boolean offer(S next, int nextSizeInBytes) {
- nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
- if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
- // if we overran the message size, remove the encoded message.
- nextMessage.remove(nextMessage.size() - 1);
- return false;
- }
- return true;
- }
- });
-
- try {
- sender.sendSpans(nextMessage).execute();
- } catch (Throwable t) {
- // In failure case, we increment messages and spans dropped.
- int count = nextMessage.size();
- Call.propagateIfFatal(t);
- metrics.incrementMessagesDropped(t);
- metrics.incrementSpansDropped(count);
-
- Level logLevel = FINE;
-
- if (shouldWarnException) {
- logger.log(WARNING, "Spans were dropped due to exceptions. "
- + "All subsequent errors will be logged at FINE level.");
- logLevel = WARNING;
- shouldWarnException = false;
- }
-
- if (logger.isLoggable(logLevel)) {
- logger.log(logLevel,
- format("Dropped %s spans due to %s(%s)", count, t.getClass().getSimpleName(),
- t.getMessage() == null ? "" : t.getMessage()), t);
- }
-
- // Raise in case the sender was closed out-of-band.
- if (t instanceof ClosedSenderException) throw (ClosedSenderException) t;
-
- // Old senders in other artifacts may be using this less precise way of indicating they've been closed
- // out-of-band.
- if (t instanceof IllegalStateException && t.getMessage().equals("closed"))
- throw (IllegalStateException) t;
- }
- }
-
- @Override public CheckResult check() {
- return sender.check();
- }
-
- @Override public void close() {
- if (!closed.compareAndSet(false, true)) return; // already closed
- started.set(true); // prevent anything from starting the thread after close!
- try {
- // wait for in-flight spans to send
- if (!close.await(closeTimeoutNanos, TimeUnit.NANOSECONDS)) {
- logger.warning("Timed out waiting for in-flight spans to send");
- }
- } catch (InterruptedException e) {
- logger.warning("Interrupted waiting for in-flight spans to send");
- Thread.currentThread().interrupt();
- }
- int count = pending.clear();
- if (count > 0) {
- metrics.incrementSpansDropped(count);
- logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
- }
- }
-
- Builder toBuilder() {
- return new Builder(this);
+ static final class BytesEncoderAdapterimplements BytesEncoder {
+ final BytesEncoder delegate;
+ BytesEncoderAdapter(BytesEncoder delegate) {
+ this.delegate = delegate;
}
- @Override public String toString() {
- return "AsyncReporter{" + sender + "}";
+ @Override public Encoding encoding() {
+ return delegate.encoding();
}
- }
-
- static final class Flusher implements Runnable {
- static final Logger logger = Logger.getLogger(Flusher.class.getName());
- final BoundedAsyncReporter result;
- final BufferNextMessage consumer;
-
- Flusher(BoundedAsyncReporter result, BufferNextMessage consumer) {
- this.result = result;
- this.consumer = consumer;
+ @Override public int sizeInBytes(S input) {
+ return delegate.sizeInBytes(input);
}
- @Override public void run() {
- try {
- while (!result.closed.get()) {
- result.flush(consumer);
- }
- } catch (RuntimeException e) {
- logger.log(Level.WARNING, "Unexpected error flushing spans", e);
- throw e;
- } catch (Error e) {
- logger.log(Level.WARNING, "Unexpected error flushing spans", e);
- throw e;
- } finally {
- int count = consumer.count();
- if (count > 0) {
- result.metrics.incrementSpansDropped(count);
- logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
- }
- result.close.countDown();
- }
+ @Override public byte[] encode(S input) {
+ return delegate.encode(input);
}
@Override public String toString() {
- return "AsyncReporter{" + result.sender + "}";
+ return delegate.toString();
}
}
-
}
diff --git a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java
index f5ac5c56..34bfae83 100644
--- a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java
+++ b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -14,7 +14,6 @@
package zipkin2.reporter;
import java.util.concurrent.CountDownLatch;
-import zipkin2.Callback;
/**
* Blocks until {@link Callback#onSuccess(Object)} or {@link Callback#onError(Throwable)}.
diff --git a/core/src/main/java/zipkin2/reporter/BytesEncoder.java b/core/src/main/java/zipkin2/reporter/BytesEncoder.java
new file mode 100644
index 00000000..7591f764
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/BytesEncoder.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+/**
+ * Utility for encoding one or more elements of a type into a byte array.
+ *
+ * @param type of the object to encode
+ * @since 3.0
+ */
+public interface BytesEncoder {
+ Encoding encoding();
+
+ int sizeInBytes(S input);
+
+ /** Serializes an object into its binary form. */
+ byte[] encode(S input);
+}
diff --git a/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java b/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java
index f39ba491..f7a2d325 100644
--- a/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java
+++ b/core/src/main/java/zipkin2/reporter/BytesMessageEncoder.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -14,8 +14,6 @@
package zipkin2.reporter;
import java.util.List;
-import zipkin2.codec.BytesEncoder;
-import zipkin2.codec.Encoding;
/**
* Senders like Kafka use byte[] message encoding. This provides helpers to concatenate spans into a
diff --git a/core/src/main/java/zipkin2/reporter/Call.java b/core/src/main/java/zipkin2/reporter/Call.java
new file mode 100644
index 00000000..a15a777c
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/Call.java
@@ -0,0 +1,429 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This captures a (usually remote) request and can be used once, either {@link #execute()
+ * synchronously} or {@link #enqueue(Callback) asynchronously}. At any time, from any thread, you
+ * can call {@linkplain #cancel()}, which might stop an in-flight request or prevent one from
+ * occurring.
+ *
+ * {@code
+ * // Any translation of an input request to remote parameters should happen here, and any related
+ * // errors should propagate here.
+ * Call
+ *
+ * >> listTraces = spanStore.listTraces(request);
+ * // When this executes, it should simply run the remote request.
+ * List trace = getTraceCall.execute();
+ * }
Call> emptyList() {
+ return Call.create(Collections.
emptyList());
+ }
+
+ public interface Mapper{@code
+ * getTracesV1Call = getTracesV2Call.map(traces -> v2TracesConverter);
+ * }
+ *
+ * {@code
+ * getTracesCall = getIdsCall.flatMap(ids -> getTraces(ids));
+ *
+ * // this would now invoke the chain
+ * traces = getTracesCall.enqueue(tracesCallback);
+ * }
+ * {@code
+ * call.handleError((error, callback) -> {
+ * if (error instanceof HttpException && ((HttpException) error).code == 404) {
+ * callback.onSuccess(Collections.emptyList());
+ * } else {
+ * callback.onError(error);
+ * }
+ * });
+ * }
+ */
+ public final CallType of span to report, usually {@link zipkin2.Span}, but extracted for reporting other java
+ * Type of span to report, usually {@link Span}, but extracted for reporting other java
* types like HTrace spans to zipkin, and to allow future Zipkin model types to be reported (ex.
* zipkin2.Span).
*/
diff --git a/core/src/main/java/zipkin2/reporter/Sender.java b/core/src/main/java/zipkin2/reporter/Sender.java
index ddf615e6..071bc65c 100644
--- a/core/src/main/java/zipkin2/reporter/Sender.java
+++ b/core/src/main/java/zipkin2/reporter/Sender.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -15,19 +15,14 @@
import java.util.Collections;
import java.util.List;
-import zipkin2.Call;
-import zipkin2.Component;
-import zipkin2.codec.BytesEncoder;
-import zipkin2.codec.Encoding;
-import zipkin2.reporter.internal.InternalReporter;
/**
* Sends a list of encoded spans to a transport such as http or Kafka. Usually, this involves
* encoding them into a message and enqueueing them for transport over http or Kafka. The typical
* end recipient is a zipkin collector.
*
- * type of the span, usually {@code zipkin2.Span}
+ * @since 3.0
+ */
+public abstract class AsyncReporter extends Component implements Reporter, Flushable {
+ public static Builder newBuilder(Sender sender) {
+ return new Builder(sender);
+ }
+
+ /**
+ * Calling this will flush any pending spans to the transport on the current thread.
+ *
+ * AsyncReporter build(BytesEncoder encoder) {
+ if (encoder == null) throw new NullPointerException("encoder == null");
+
+ if (encoder.encoding() != sender.encoding()) {
+ throw new IllegalArgumentException(String.format(
+ "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
+ }
+
+ return new BoundedAsyncReporter(this, encoder);
+ }
+ }
+
+ static final class BoundedAsyncReporter extends AsyncReporter {
+ static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
+ final AtomicBoolean started, closed;
+ final BytesEncoder encoder;
+ final ByteBoundedQueue pending;
+ final Sender sender;
+ final int messageMaxBytes;
+ final long messageTimeoutNanos, closeTimeoutNanos;
+ final CountDownLatch close;
+ final ReporterMetrics metrics;
+ final ThreadFactory threadFactory;
+
+ /** Tracks if we should log the first instance of an exception in flush(). */
+ private boolean shouldWarnException = true;
+
+ BoundedAsyncReporter(Builder builder, BytesEncoder encoder) {
+ this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes);
+ this.sender = builder.sender;
+ this.messageMaxBytes = builder.messageMaxBytes;
+ this.messageTimeoutNanos = builder.messageTimeoutNanos;
+ this.closeTimeoutNanos = builder.closeTimeoutNanos;
+ this.closed = new AtomicBoolean(false);
+ // pretend we already started when config implies no thread that flushes the queue in a loop.
+ this.started = new AtomicBoolean(builder.messageTimeoutNanos == 0);
+ this.close = new CountDownLatch(builder.messageTimeoutNanos > 0 ? 1 : 0);
+ this.metrics = builder.metrics;
+ this.threadFactory = builder.threadFactory;
+ this.encoder = encoder;
+ }
+
+ void startFlusherThread() {
+ BufferNextMessage consumer =
+ BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
+ Thread flushThread = threadFactory.newThread(new Flusher(this, consumer));
+ flushThread.setName("AsyncReporter{" + sender + "}");
+ flushThread.setDaemon(true);
+ flushThread.start();
+ }
+
+ @Override public void report(S next) {
+ if (next == null) throw new NullPointerException("span == null");
+ // Lazy start so that reporters never used don't spawn threads
+ if (started.compareAndSet(false, true)) startFlusherThread();
+ metrics.incrementSpans(1);
+ int nextSizeInBytes = encoder.sizeInBytes(next);
+ int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
+ metrics.incrementSpanBytes(nextSizeInBytes);
+ if (closed.get() ||
+ // don't enqueue something larger than we can drain
+ messageSizeOfNextSpan > messageMaxBytes ||
+ !pending.offer(next, nextSizeInBytes)) {
+ metrics.incrementSpansDropped(1);
+ }
+ }
+
+ @Override public void flush() {
+ if (closed.get()) throw new ClosedSenderException();
+ flush(BufferNextMessage.create(encoder.encoding(), messageMaxBytes, 0));
+ }
+
+ void flush(BufferNextMessage bundler) {
+ pending.drainTo(bundler, bundler.remainingNanos());
+
+ // record after flushing reduces the amount of gauge events vs on doing this on report
+ metrics.updateQueuedSpans(pending.count);
+ metrics.updateQueuedBytes(pending.sizeInBytes);
+
+ // loop around if we are running, and the bundle isn't full
+ // if we are closed, try to send what's pending
+ if (!bundler.isReady() && !closed.get()) return;
+
+ // Signal that we are about to send a message of a known size in bytes
+ metrics.incrementMessages();
+ metrics.incrementMessageBytes(bundler.sizeInBytes());
+
+ // Create the next message. Since we are outside the lock shared with writers, we can encode
+ final ArrayList() {
+ @Override public boolean offer(S next, int nextSizeInBytes) {
+ nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
+ if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
+ // if we overran the message size, remove the encoded message.
+ nextMessage.remove(nextMessage.size() - 1);
+ return false;
+ }
+ return true;
+ }
+ });
+
+ try {
+ sender.sendSpans(nextMessage).execute();
+ } catch (Throwable t) {
+ // In failure case, we increment messages and spans dropped.
+ int count = nextMessage.size();
+ Call.propagateIfFatal(t);
+ metrics.incrementMessagesDropped(t);
+ metrics.incrementSpansDropped(count);
+
+ Level logLevel = FINE;
+
+ if (shouldWarnException) {
+ logger.log(WARNING, "Spans were dropped due to exceptions. "
+ + "All subsequent errors will be logged at FINE level.");
+ logLevel = WARNING;
+ shouldWarnException = false;
+ }
+
+ if (logger.isLoggable(logLevel)) {
+ logger.log(logLevel,
+ format("Dropped %s spans due to %s(%s)", count, t.getClass().getSimpleName(),
+ t.getMessage() == null ? "" : t.getMessage()), t);
+ }
+
+ // Raise in case the sender was closed out-of-band.
+ if (t instanceof ClosedSenderException) throw (ClosedSenderException) t;
+
+ // Old senders in other artifacts may be using this less precise way of indicating they've been closed
+ // out-of-band.
+ if (t instanceof IllegalStateException && t.getMessage().equals("closed")) {
+ throw (IllegalStateException) t;
+ }
+ }
+ }
+
+ @Override public CheckResult check() {
+ return sender.check();
+ }
+
+ @Override public void close() {
+ if (!closed.compareAndSet(false, true)) return; // already closed
+ started.set(true); // prevent anything from starting the thread after close!
+ try {
+ // wait for in-flight spans to send
+ if (!close.await(closeTimeoutNanos, TimeUnit.NANOSECONDS)) {
+ logger.warning("Timed out waiting for in-flight spans to send");
+ }
+ } catch (InterruptedException e) {
+ logger.warning("Interrupted waiting for in-flight spans to send");
+ Thread.currentThread().interrupt();
+ }
+ int count = pending.clear();
+ if (count > 0) {
+ metrics.incrementSpansDropped(count);
+ logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
+ }
+ }
+
+ @Override public Builder toBuilder() {
+ return new Builder(this);
+ }
+
+ @Override public String toString() {
+ return "AsyncReporter{" + sender + "}";
+ }
+ }
+
+ static final class Flusher implements Runnable {
+ static final Logger logger = Logger.getLogger(Flusher.class.getName());
+
+ final BoundedAsyncReporter result;
+ final BufferNextMessage consumer;
+
+ Flusher(BoundedAsyncReporter result, BufferNextMessage consumer) {
+ this.result = result;
+ this.consumer = consumer;
+ }
+
+ @Override public void run() {
+ try {
+ while (!result.closed.get()) {
+ result.flush(consumer);
+ }
+ } catch (RuntimeException e) {
+ logger.log(Level.WARNING, "Unexpected error flushing spans", e);
+ throw e;
+ } catch (Error e) {
+ logger.log(Level.WARNING, "Unexpected error flushing spans", e);
+ throw e;
+ } finally {
+ int count = consumer.count();
+ if (count > 0) {
+ result.metrics.incrementSpansDropped(count);
+ logger.warning("Dropped " + count + " spans due to AsyncReporter.close()");
+ }
+ result.close.countDown();
+ }
+ }
+
+ @Override public String toString() {
+ return "AsyncReporter{" + result.sender + "}";
+ }
+ }
+}
diff --git a/core/src/main/java/zipkin2/reporter/BufferNextMessage.java b/core/src/main/java/zipkin2/reporter/internal/BufferNextMessage.java
similarity index 98%
rename from core/src/main/java/zipkin2/reporter/BufferNextMessage.java
rename to core/src/main/java/zipkin2/reporter/internal/BufferNextMessage.java
index 4811f2b6..663b25fe 100644
--- a/core/src/main/java/zipkin2/reporter/BufferNextMessage.java
+++ b/core/src/main/java/zipkin2/reporter/internal/BufferNextMessage.java
@@ -11,11 +11,11 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package zipkin2.reporter;
+package zipkin2.reporter.internal;
import java.util.ArrayList;
import java.util.Iterator;
-import zipkin2.codec.Encoding;
+import zipkin2.reporter.Encoding;
/** Use of this type happens off the application's main thread. This type is not thread-safe */
abstract class BufferNextMessage implements SpanWithSizeConsumer {
diff --git a/core/src/main/java/zipkin2/reporter/ByteBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java
similarity index 97%
rename from core/src/main/java/zipkin2/reporter/ByteBoundedQueue.java
rename to core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java
index d92f69d7..660fc8aa 100644
--- a/core/src/main/java/zipkin2/reporter/ByteBoundedQueue.java
+++ b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package zipkin2.reporter;
+package zipkin2.reporter.internal;
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
@@ -129,4 +129,4 @@ int doDrain(SpanWithSizeConsumer consumer) {
interface SpanWithSizeConsumer {
/** Returns true if the element could be added or false if it could not due to its size. */
boolean offer(S next, int nextSizeInBytes);
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/zipkin2/reporter/internal/InternalReporter.java b/core/src/main/java/zipkin2/reporter/internal/InternalReporter.java
deleted file mode 100644
index 33bd8f7a..00000000
--- a/core/src/main/java/zipkin2/reporter/internal/InternalReporter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2016-2020 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package zipkin2.reporter.internal;
-
-import zipkin2.reporter.AsyncReporter;
-import zipkin2.reporter.Sender;
-
-/**
- * Escalate internal APIs in {@code zipkin2.reporter} so they can be used from outside packages. The
- * only implementation is in {@link Sender}.
- *
- * > onSpans;
- FakeSender(
- Encoding encoding,
- int messageMaxBytes,
- BytesMessageEncoder messageEncoder,
- BytesEncoder encoder,
- BytesDecoder decoder,
- Consumer
> onSpans
- ) {
+ FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
+ BytesEncoder encoder, BytesDecoder decoder, Consumer
> onSpans) {
this.encoding = encoding;
this.messageMaxBytes = messageMaxBytes;
this.messageEncoder = messageEncoder;
@@ -62,36 +54,18 @@ public static FakeSender create() {
}
FakeSender encoding(Encoding encoding) {
- return new FakeSender(
- encoding,
- messageMaxBytes,
- messageEncoder, // invalid but not needed, yet
- encoder, // invalid but not needed, yet
- decoder, // invalid but not needed, yet
- onSpans
- );
+ return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet
+ encoder, // invalid but not needed, yet
+ decoder, // invalid but not needed, yet
+ onSpans);
}
FakeSender onSpans(Consumer
> onSpans) {
- return new FakeSender(
- encoding,
- messageMaxBytes,
- messageEncoder,
- encoder,
- decoder,
- onSpans
- );
+ return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
}
FakeSender messageMaxBytes(int messageMaxBytes) {
- return new FakeSender(
- encoding,
- messageMaxBytes,
- messageEncoder,
- encoder,
- decoder,
- onSpans
- );
+ return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
}
@Override public Encoding encoding() {
@@ -115,17 +89,11 @@ FakeSender messageMaxBytes(int messageMaxBytes) {
@Override public Call
+ * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * reporter}. * *
Here's a simple configuration, configured for json:
*
@@ -121,7 +121,7 @@ public Builder topic(String topic) {
*
* @see ProducerConfig#BOOTSTRAP_SERVERS_CONFIG
*/
- public final Builder bootstrapServers(String bootstrapServers) {
+ public Builder bootstrapServers(String bootstrapServers) {
if (bootstrapServers == null) throw new NullPointerException("bootstrapServers == null");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return this;
@@ -177,7 +177,7 @@ public Builder overrides(Map NOTE: this blocks until the metadata server is available.
*/
- @Override public zipkin2.Call
+ * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * reporter}. * *
Here's a simple configuration, configured for json:
*
@@ -158,19 +157,19 @@ public Builder encoding(Encoding encoding) {
}
/** Sets the default connect timeout (in milliseconds) for new connections. Default 10000 */
- public final Builder connectTimeout(int connectTimeoutMillis) {
+ public Builder connectTimeout(int connectTimeoutMillis) {
clientBuilder.connectTimeout(connectTimeoutMillis, MILLISECONDS);
return this;
}
/** Sets the default read timeout (in milliseconds) for new connections. Default 10000 */
- public final Builder readTimeout(int readTimeoutMillis) {
+ public Builder readTimeout(int readTimeoutMillis) {
clientBuilder.readTimeout(readTimeoutMillis, MILLISECONDS);
return this;
}
/** Sets the default write timeout (in milliseconds) for new connections. Default 10000 */
- public final Builder writeTimeout(int writeTimeoutMillis) {
+ public Builder writeTimeout(int writeTimeoutMillis) {
clientBuilder.writeTimeout(writeTimeoutMillis, MILLISECONDS);
return this;
}
@@ -246,7 +245,7 @@ enum OkHttpSenderThreadFactory implements ThreadFactory {
* Creates a builder out of this object. Note: if the {@link Builder#clientBuilder()} was
* customized, you'll need to re-apply those customizations.
*/
- public final Builder toBuilder() {
+ public Builder toBuilder() {
return new Builder(this);
}
@@ -270,7 +269,7 @@ public final Builder toBuilder() {
volatile boolean closeCalled;
/** The returned call sends spans as a POST to {@link Builder#endpoint(String)}. */
- @Override public zipkin2.Call