From ec4e01c3f27a9746ec80d8d401249e90bfcd3c15 Mon Sep 17 00:00:00 2001 From: Crypt Keeper <64215+codefromthecrypt@users.noreply.github.com> Date: Wed, 14 Feb 2024 07:46:14 +0800 Subject: [PATCH] Adds HttpSender to aid proper handling of HttpEndpointSupplier (#253) Signed-off-by: Adrian Cole --- core/pom.xml | 7 + .../reporter/HttpEndpointSupplier.java | 8 +- .../java/zipkin2/reporter/HttpSender.java | 152 ++++++++++ .../zipkin2/reporter/internal}/Platform.java | 8 +- .../reporter/internal/SenderAdapter.java | 104 +++++++ .../java/zipkin2/reporter/FakeHttpSender.java | 67 ++++ .../java/zipkin2/reporter/HttpSenderTest.java | 187 ++++++++++++ .../reporter/internal}/PlatformTest.java | 3 +- .../zipkin2/reporter/okhttp3/HttpCall.java | 99 ------ .../okhttp3/InternalOkHttpSender.java | 189 ++++++++++++ .../reporter/okhttp3/OkHttpSender.java | 286 +----------------- .../okhttp3/RequestBodyMessageEncoder.java | 13 + .../reporter/okhttp3/ITOkHttpSender.java | 78 ----- ...est.java => InternalOkHttpSenderTest.java} | 6 +- .../reporter/okhttp3/OkHttpSenderTest.java | 128 +------- .../beans/OkHttpSenderFactoryBeanTest.java | 22 +- .../URLConnectionSenderFactoryBeanTest.java | 13 +- .../InternalURLConnectionSender.java | 117 +++++++ .../urlconnection/URLConnectionSender.java | 257 ++-------------- .../URLConnectionSenderTest.java | 124 -------- 20 files changed, 901 insertions(+), 967 deletions(-) create mode 100644 core/src/main/java/zipkin2/reporter/HttpSender.java rename {okhttp3/src/main/java/zipkin2/reporter/okhttp3 => core/src/main/java/zipkin2/reporter/internal}/Platform.java (92%) create mode 100644 core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java create mode 100644 core/src/test/java/zipkin2/reporter/FakeHttpSender.java create mode 100644 core/src/test/java/zipkin2/reporter/HttpSenderTest.java rename {okhttp3/src/test/java/zipkin2/reporter/okhttp3 => core/src/test/java/zipkin2/reporter/internal}/PlatformTest.java (93%) delete mode 100644 okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java create mode 100644 okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java rename okhttp3/src/test/java/zipkin2/reporter/okhttp3/{HttpCallTest.java => InternalOkHttpSenderTest.java} (92%) create mode 100644 urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java diff --git a/core/pom.xml b/core/pom.xml index ebbcc0db..02ae5153 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -41,6 +41,13 @@ zipkin ${zipkin.version} + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + diff --git a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java index 54d9fb3c..f18d46dd 100644 --- a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java +++ b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java @@ -20,9 +20,9 @@ * HTTP-based {@link BytesMessageSender senders} use this to resolve a potentially-pseudo endpoint * passed by configuration to a real endpoint. * - *

Usage Notes

+ *

Implementation Notes

* - *

{@link BytesMessageSender senders} should implement the following logic: + * {@linkplain HttpSender} is a convenience type that implements the following logic: *

* - *

Implementation Notes

- * *

Implement friendly {@code toString()} functions, that include the real endpoint or the one * passed to the {@linkplain Factory}. * @@ -46,6 +44,8 @@ * dependency injection, without limiting an HTTP framework that can handle groups, to a * single-endpoint supplier. * + * @see ConstantHttpEndpointSupplier + * @see HttpSender * @since 3.3 */ public interface HttpEndpointSupplier extends Closeable { diff --git a/core/src/main/java/zipkin2/reporter/HttpSender.java b/core/src/main/java/zipkin2/reporter/HttpSender.java new file mode 100644 index 00000000..329acf2d --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/HttpSender.java @@ -0,0 +1,152 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import zipkin2.reporter.HttpEndpointSupplier.Factory; + +import static zipkin2.reporter.Call.propagateIfFatal; + +/** + * Reports spans to Zipkin, using its POST endpoint. + * + *

Calls to {@linkplain #postSpans(Object, Object)} happen on the same async reporting thread, + * but {@linkplain #close()} might be called from any thread. + * + * @since 3.3 + */ +public abstract class HttpSender extends BytesMessageSender.Base { + final Logger logger; + final HttpEndpointSupplier endpointSupplier; + final U endpoint; + + /** close is typically called from a different thread */ + final AtomicBoolean closeCalled = new AtomicBoolean(); + + /** + * Called each invocation of {@linkplain #postSpans(Object, Object)}, unless the + * {@linkplain HttpEndpointSupplier} is a {@linkplain ConstantHttpEndpointSupplier}, + * Implementations should perform any validation needed here. + * + * @since 3.3 + */ + protected abstract U newEndpoint(String endpoint); + + /** + * Creates a new POST body from the encoded spans. + * + *

Below is the simplest implementation, when {@linkplain HttpSender#} is a byte array. + *

{@code
+   * @Override protected byte[] newBody(List encodedSpans) {
+   *   return encoding.encode(encodedSpans);
+   * }
+   * }
+ * + *

If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}. + * + * @since 3.3 + */ + protected abstract B newBody(List encodedSpans); + + /** + * Implement to POST spans to the given endpoint. + * + *

If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}. + * + * @since 3.3 + */ + protected abstract void postSpans(U endpoint, B body) throws IOException; + + /** + * Override to close any resources. + * + * @since 3.3 + */ + protected void doClose() { + } + + protected HttpSender(Encoding encoding, Factory endpointSupplierFactory, String endpoint) { + this(Logger.getLogger(HttpSender.class.getName()), encoding, endpointSupplierFactory, endpoint); + } + + HttpSender(Logger logger, Encoding encoding, Factory endpointSupplierFactory, String endpoint) { + super(encoding); + this.logger = logger; + if (endpointSupplierFactory == null) { + throw new NullPointerException("endpointSupplierFactory == null"); + } + if (endpoint == null) throw new NullPointerException("endpoint == null"); + + HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint); + if (endpointSupplier == null) { + throw new NullPointerException("endpointSupplierFactory.create() returned null"); + } + if (endpointSupplier instanceof ConstantHttpEndpointSupplier) { + this.endpoint = nextEndpoint(endpointSupplier); + closeQuietly(endpointSupplier); + this.endpointSupplier = null; + } else { + this.endpoint = null; + this.endpointSupplier = endpointSupplier; + } + } + + final U nextEndpoint(HttpEndpointSupplier endpointSupplier) { + String endpoint = endpointSupplier.get(); // eagerly resolve the endpoint + if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null"); + return newEndpoint(endpoint); + } + + /** Defaults to the most common max message size: 512KB. */ + @Override public int messageMaxBytes() { + return 512 * 1024; + } + + /** Sends spans as an HTTP POST request. */ + @Override public final void send(List encodedSpans) throws IOException { + if (closeCalled.get()) throw new ClosedSenderException(); + U endpoint = this.endpoint; + if (endpoint == null) endpoint = nextEndpoint(endpointSupplier); + B body = newBody(encodedSpans); + if (body == null) throw new NullPointerException("newBody(encodedSpans) returned null"); + postSpans(endpoint, newBody(encodedSpans)); + } + + @Override public final void close() { + if (!closeCalled.compareAndSet(false, true)) return; // already closed + closeQuietly(endpointSupplier); + doClose(); + } + + final void closeQuietly(HttpEndpointSupplier endpointSupplier) { + if (endpointSupplier == null) return; + try { + endpointSupplier.close(); + } catch (Throwable t) { + propagateIfFatal(t); + logger.fine("ignoring error closing endpoint supplier: " + t.getMessage()); + } + } + + @Override public String toString() { + String name = getClass().getSimpleName(); + if (endpoint != null) { + return name + "{" + endpoint + "}"; + } + return name + "{" + endpointSupplier + "}"; + } +} diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java b/core/src/main/java/zipkin2/reporter/internal/Platform.java similarity index 92% rename from okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java rename to core/src/main/java/zipkin2/reporter/internal/Platform.java index 1b2b5151..ead051ab 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java +++ b/core/src/main/java/zipkin2/reporter/internal/Platform.java @@ -11,23 +11,23 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.okhttp3; +package zipkin2.reporter.internal; import java.io.IOException; import java.lang.reflect.Constructor; /** Taken from {@code zipkin2.reporter.internal.Platform} to avoid needing to shade over a single method. */ -abstract class Platform { +public abstract class Platform { private static final Platform PLATFORM = findPlatform(); Platform() { } - RuntimeException uncheckedIOException(IOException e) { + public RuntimeException uncheckedIOException(IOException e) { return new RuntimeException(e); } - static Platform get() { + public static Platform get() { return PLATFORM; } diff --git a/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java b/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java new file mode 100644 index 00000000..bdab99e0 --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java @@ -0,0 +1,104 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.internal; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.Call; +import zipkin2.reporter.Callback; +import zipkin2.reporter.CheckResult; +import zipkin2.reporter.Encoding; +import zipkin2.reporter.Sender; + +/** + * Reduces burden on types that need to extend {@linkplain Sender}. + */ +public abstract class SenderAdapter extends Sender { + protected abstract BytesMessageSender delegate(); + + @Override public final int messageSizeInBytes(List encodedSpans) { + return delegate().messageSizeInBytes(encodedSpans); + } + + @Override public final int messageSizeInBytes(int encodedSizeInBytes) { + return delegate().messageSizeInBytes(encodedSizeInBytes); + } + + @Override public final Encoding encoding() { + return delegate().encoding(); + } + + @Override public final int messageMaxBytes() { + return delegate().messageMaxBytes(); + } + + @Override @Deprecated public final Call sendSpans(List encodedSpans) { + return new SendSpans(encodedSpans); + } + + @Override public final void send(List encodedSpans) throws IOException { + delegate().send(encodedSpans); + } + + @Override @Deprecated public final CheckResult check() { + try { + delegate().send(Collections.emptyList()); + return CheckResult.OK; + } catch (Throwable e) { + Call.propagateIfFatal(e); + return CheckResult.failed(e); + } + } + + @Override public final void close() { + try { + delegate().close(); + } catch (IOException e) { + throw Platform.get().uncheckedIOException(e); + } + } + + @Override public final String toString() { + return delegate().toString(); + } + + final class SendSpans extends Call.Base { + private final List encodedSpans; + + SendSpans(List encodedSpans) { + this.encodedSpans = encodedSpans; + } + + @Override protected Void doExecute() throws IOException { + send(encodedSpans); + return null; + } + + @Override protected void doEnqueue(Callback callback) { + try { + send(encodedSpans); + callback.onSuccess(null); + } catch (Throwable t) { + Call.propagateIfFatal(t); + callback.onError(t); + } + } + + @Override public Call clone() { + return new SendSpans(encodedSpans); + } + } +} diff --git a/core/src/test/java/zipkin2/reporter/FakeHttpSender.java b/core/src/test/java/zipkin2/reporter/FakeHttpSender.java new file mode 100644 index 00000000..0f50275f --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/FakeHttpSender.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.net.MalformedURLException; +import java.net.URI; +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Logger; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.reporter.HttpEndpointSupplier.Factory; + +class FakeHttpSender extends HttpSender { + final String originalEndpoint; + final Consumer> onSpans; + + FakeHttpSender(Logger logger, String endpoint, Consumer> onSpans) { + this(logger, endpoint, ConstantHttpEndpointSupplier.FACTORY, onSpans); + } + + FakeHttpSender(Logger logger, String endpoint, Factory endpointSupplierFactory, + Consumer> onSpans) { + super(logger, Encoding.JSON, endpointSupplierFactory, endpoint); + this.originalEndpoint = endpoint; + this.onSpans = onSpans; + } + + FakeHttpSender withHttpEndpointSupplierFactory(Factory endpointSupplierFactory) { + return new FakeHttpSender(logger, originalEndpoint, endpointSupplierFactory, onSpans); + } + + /** close is typically called from a different thread */ + volatile boolean closeCalled; + + @Override protected String newEndpoint(String endpoint) { + try { + return URI.create(endpoint).toURL().toString(); // validate + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override protected byte[] newBody(List encodedSpans) { + return encoding.encode(encodedSpans); + } + + @Override protected void postSpans(String endpoint, byte[] body) { + List decoded = SpanBytesDecoder.JSON_V2.decodeList(body); + onSpans.accept(decoded); + } + + @Override protected void doClose() { + closeCalled = true; + } +} diff --git a/core/src/test/java/zipkin2/reporter/HttpSenderTest.java b/core/src/test/java/zipkin2/reporter/HttpSenderTest.java new file mode 100644 index 00000000..8d0df09a --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/HttpSenderTest.java @@ -0,0 +1,187 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.logging.Logger; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; + +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static zipkin2.TestObjects.CLIENT_SPAN; + +@ExtendWith(MockitoExtension.class) +class HttpSenderTest { + + @Mock Logger logger; + @Mock Consumer> onSpans; + FakeHttpSender sender; + + @BeforeEach + void newSender() { + sender = new FakeHttpSender(logger, "http://localhost:19092", onSpans); + } + + @Test void illegalToSendWhenClosed() { + sender.close(); + + assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(ClosedSenderException.class); + } + + @Test void endpointSupplierFactory_defaultsToConstant() { + // The default connection supplier returns a constant URL + assertThat(sender.endpoint) + .isEqualTo("http://localhost:19092"); + } + + @Test void endpointSupplierFactory_constant() { + sender.close(); + sender = sender.withHttpEndpointSupplierFactory( + e -> ConstantHttpEndpointSupplier.create("http://localhost:29092") + ); + + // The connection supplier has a constant URL + assertThat(sender.endpoint) + .isEqualTo("http://localhost:29092"); + } + + @Test void endpointSupplierFactory_constantBad() { + HttpEndpointSupplier.Factory badFactory = + e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans"); + + assertThatThrownBy(() -> sender.withHttpEndpointSupplierFactory(badFactory)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("unknown protocol: htp"); + } + + @Test void endpointSupplierFactory_dynamic() { + AtomicInteger closeCalled = new AtomicInteger(); + HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { + @Override public String get() { + throw new UnsupportedOperationException(); + } + + @Override public void close() { + closeCalled.incrementAndGet(); + } + }; + + sender.close(); + sender = sender.withHttpEndpointSupplierFactory(e -> dynamicEndpointSupplier); + + // The connection supplier is deferred until send + assertThat(sender.endpointSupplier) + .isEqualTo(dynamicEndpointSupplier); + + assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(UnsupportedOperationException.class); + + // Ensure that closing the sender closes the endpoint supplier + sender.close(); + sender.close(); // check only closed once + assertThat(closeCalled).hasValue(1); + } + + @Test void endpointSupplierFactory_ignoresCloseFailure() { + AtomicInteger closeCalled = new AtomicInteger(); + HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { + @Override public String get() { + throw new UnsupportedOperationException(); + } + + @Override public void close() throws IOException { + closeCalled.incrementAndGet(); + throw new IOException("unexpected"); + } + }; + + sender.close(); + sender = sender.withHttpEndpointSupplierFactory(e -> dynamicEndpointSupplier); + + // Ensure that an exception closing the endpoint supplier doesn't propagate. + sender.close(); + assertThat(closeCalled).hasValue(1); + } + + @Test void endpointSupplierFactory_dynamicNull() { + sender.close(); + sender = sender.withHttpEndpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { + @Override public String get() { + return null; + } + }); + + assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(NullPointerException.class) + .hasMessage("endpointSupplier.get() returned null"); + } + + @Test void endpointSupplierFactory_dynamicBad() { + sender.close(); + sender = sender.withHttpEndpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { + @Override public String get() { + return "htp://localhost:9411/api/v1/spans"; + } + }); + + assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("unknown protocol: htp"); + } + + /** + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. + */ + @Test void toStringContainsOnlySummaryInformation() { + assertThat(sender).hasToString("FakeHttpSender{http://localhost:19092}"); + } + + static void sendSpans(BytesMessageSender sender, Span... spans) throws IOException { + SpanBytesEncoder bytesEncoder; + switch (sender.encoding()) { + case JSON: + bytesEncoder = SpanBytesEncoder.JSON_V2; + break; + case THRIFT: + bytesEncoder = SpanBytesEncoder.THRIFT; + break; + case PROTO3: + bytesEncoder = SpanBytesEncoder.PROTO3; + break; + default: + throw new UnsupportedOperationException("encoding: " + sender.encoding()); + } + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + } + + static abstract class BaseHttpEndpointSupplier implements HttpEndpointSupplier { + @Override public void close() { + } + } +} diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java b/core/src/test/java/zipkin2/reporter/internal/PlatformTest.java similarity index 93% rename from okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java rename to core/src/test/java/zipkin2/reporter/internal/PlatformTest.java index 978b7d86..d8a59fb4 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/PlatformTest.java @@ -11,11 +11,12 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.okhttp3; +package zipkin2.reporter.internal; import java.io.IOException; import java.io.UncheckedIOException; import org.junit.jupiter.api.Test; +import zipkin2.reporter.internal.Platform; import static org.assertj.core.api.Assertions.assertThat; diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java deleted file mode 100644 index 818574ef..00000000 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.okhttp3; - -import java.io.IOException; -import okhttp3.Response; -import okhttp3.ResponseBody; -import okio.BufferedSource; -import okio.GzipSource; -import okio.Okio; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; - -final class HttpCall extends Call { - - final okhttp3.Call call; - - HttpCall(okhttp3.Call call) { - this.call = call; - } - - @Override public Void execute() throws IOException { - parseResponse(call.execute()); - return null; - } - - @Override public void enqueue(Callback delegate) { - call.enqueue(new V2CallbackAdapter<>(delegate)); - } - - @Override public void cancel() { - call.cancel(); - } - - @Override public boolean isCanceled() { - return call.isCanceled(); - } - - @Override public HttpCall clone() { - return new HttpCall(call.clone()); - } - - static class V2CallbackAdapter implements okhttp3.Callback { - final Callback delegate; - - V2CallbackAdapter(Callback delegate) { - this.delegate = delegate; - } - - @Override public void onFailure(okhttp3.Call call, IOException e) { - delegate.onError(e); - } - - /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ - @Override public void onResponse(okhttp3.Call call, Response response) { - try { - parseResponse(response); - delegate.onSuccess(null); - } catch (Throwable e) { - propagateIfFatal(e); - delegate.onError(e); - } - } - } - - static void parseResponse(Response response) throws IOException { - ResponseBody responseBody = response.body(); - if (responseBody == null) { - if (response.isSuccessful()) { - return; - } else { - throw new RuntimeException("response failed: " + response); - } - } - try { - BufferedSource content = responseBody.source(); - if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) { - content = Okio.buffer(new GzipSource(responseBody.source())); - } - if (!response.isSuccessful()) { - throw new RuntimeException( - "response for " + response.request().tag() + " failed: " + content.readUtf8()); - } - } finally { - responseBody.close(); - } - } -} diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java new file mode 100644 index 00000000..0ac74daa --- /dev/null +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java @@ -0,0 +1,189 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.okhttp3; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import okhttp3.Call; +import okhttp3.Dispatcher; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.Buffer; +import okio.BufferedSink; +import okio.BufferedSource; +import okio.GzipSink; +import okio.GzipSource; +import okio.Okio; +import zipkin2.reporter.Component; +import zipkin2.reporter.Encoding; +import zipkin2.reporter.HttpSender; + +/** + * We have to nest this class until v4 when {@linkplain OkHttpSender} no longer needs to extend + * {@linkplain Component}. + */ +final class InternalOkHttpSender extends HttpSender { + final OkHttpClient client; + final RequestBodyMessageEncoder encoder; + final Encoding encoding; + final int messageMaxBytes, maxRequests; + final boolean compressionEnabled; + + InternalOkHttpSender(OkHttpSender.Builder builder) { + super(builder.encoding, builder.endpointSupplierFactory, builder.endpoint); + encoding = builder.encoding; + encoder = RequestBodyMessageEncoder.forEncoding(encoding); + maxRequests = builder.maxRequests; + messageMaxBytes = builder.messageMaxBytes; + compressionEnabled = builder.compressionEnabled; + Dispatcher dispatcher = newDispatcher(maxRequests); + + // doing the extra "build" here prevents us from leaking our dispatcher to the builder + client = builder.clientBuilder().build().newBuilder().dispatcher(dispatcher).build(); + } + + @Override public int messageMaxBytes() { + return messageMaxBytes; + } + + @Override protected HttpUrl newEndpoint(String endpoint) { + HttpUrl parsed = HttpUrl.parse(endpoint); + if (parsed == null) throw new IllegalArgumentException("invalid POST url: " + endpoint); + return parsed; + } + + @Override protected RequestBody newBody(List encodedSpans) { + return encoder.encode(encodedSpans); + } + + @Override protected void postSpans(HttpUrl endpoint, RequestBody body) throws IOException { + Request request = newRequest(endpoint, body); + Call call = client.newCall(request); + parseResponse(call.execute()); + } + + Request newRequest(HttpUrl endpoint, RequestBody body) + throws IOException { + Request.Builder request = new Request.Builder().url(endpoint); + // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented. + // This prevents that in proxies, such as Envoy, that understand B3 single format, + request.addHeader("b3", "0"); + if (compressionEnabled) { + request.addHeader("Content-Encoding", "gzip"); + Buffer gzipped = new Buffer(); + BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipped)); + body.writeTo(gzipSink); + gzipSink.close(); + body = new BufferRequestBody(body.contentType(), gzipped); + } + request.post(body); + return request.build(); + } + + static final class BufferRequestBody extends RequestBody { + final MediaType contentType; + final Buffer body; + + BufferRequestBody(MediaType contentType, Buffer body) { + this.contentType = contentType; + this.body = body; + } + + @Override public long contentLength() { + return body.size(); + } + + @Override public MediaType contentType() { + return contentType; + } + + @Override public void writeTo(BufferedSink sink) throws IOException { + sink.write(body, body.size()); + } + } + + static void parseResponse(Response response) throws IOException { + ResponseBody responseBody = response.body(); + if (responseBody == null) { + if (response.isSuccessful()) { + return; + } else { + throw new RuntimeException("response failed: " + response); + } + } + try { + BufferedSource content = responseBody.source(); + if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) { + content = Okio.buffer(new GzipSource(responseBody.source())); + } + if (!response.isSuccessful()) { + throw new RuntimeException( + "response for " + response.request().tag() + " failed: " + content.readUtf8()); + } + } finally { + responseBody.close(); + } + } + + /** Waits up to a second for in-flight requests to finish before cancelling them */ + @Override protected void doClose() { + Dispatcher dispatcher = client.dispatcher(); + dispatcher.executorService().shutdown(); + try { + if (!dispatcher.executorService().awaitTermination(1, TimeUnit.SECONDS)) { + dispatcher.cancelAll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + static Dispatcher newDispatcher(int maxRequests) { + // bound the executor so that we get consistent performance + ThreadPoolExecutor dispatchExecutor = + new ThreadPoolExecutor(0, maxRequests, 60, TimeUnit.SECONDS, + // Using a synchronous queue means messages will send immediately until we hit max + // in-flight requests. Once max requests are hit, send will block the caller, which is + // the AsyncReporter flush thread. This is ok, as the AsyncReporter has a buffer of + // unsent spans for this purpose. + new SynchronousQueue(), + OkHttpSenderThreadFactory.INSTANCE); + + Dispatcher dispatcher = new Dispatcher(dispatchExecutor); + dispatcher.setMaxRequests(maxRequests); + dispatcher.setMaxRequestsPerHost(maxRequests); + return dispatcher; + } + + enum OkHttpSenderThreadFactory implements ThreadFactory { + INSTANCE; + + @Override public Thread newThread(Runnable r) { + return new Thread(r, "OkHttpSender Dispatcher"); + } + } + + @Override public String toString() { + return super.toString().replace("Internal", ""); + } +} diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java index a888df1f..09c8757e 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java @@ -13,39 +13,16 @@ */ package zipkin2.reporter.okhttp3; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; -import okhttp3.Call; -import okhttp3.Dispatcher; import okhttp3.HttpUrl; -import okhttp3.MediaType; import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okio.Buffer; -import okio.BufferedSink; -import okio.GzipSink; -import okio.Okio; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageSender; -import zipkin2.reporter.CheckResult; -import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.ConstantHttpEndpointSupplier; import zipkin2.reporter.Encoding; import zipkin2.reporter.HttpEndpointSupplier; -import zipkin2.reporter.Sender; +import zipkin2.reporter.internal.SenderAdapter; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static zipkin2.reporter.Call.propagateIfFatal; -import static zipkin2.reporter.okhttp3.HttpCall.parseResponse; /** * Reports spans to Zipkin, using its POST endpoint. @@ -86,9 +63,7 @@ * *

This sender is thread-safe. */ -public final class OkHttpSender extends Sender { - static final Logger logger = Logger.getLogger(OkHttpSender.class.getName()); - +public final class OkHttpSender extends SenderAdapter { /** Creates a sender that posts {@link Encoding#JSON} messages. */ public static OkHttpSender create(String endpoint) { return newBuilder().endpoint(endpoint).build(); @@ -112,13 +87,13 @@ public static final class Builder { } Builder(OkHttpSender sender) { - clientBuilder = sender.client.newBuilder(); + clientBuilder = sender.delegate.client.newBuilder(); endpointSupplierFactory = sender.endpointSupplierFactory; endpoint = sender.endpoint; - maxRequests = sender.client.dispatcher().getMaxRequests(); - compressionEnabled = sender.compressionEnabled; - encoding = sender.encoding; - messageMaxBytes = sender.messageMaxBytes; + maxRequests = sender.delegate.client.dispatcher().getMaxRequests(); + compressionEnabled = sender.delegate.compressionEnabled; + encoding = sender.delegate.encoding; + messageMaxBytes = sender.delegate.messageMaxBytes; } /** @@ -202,139 +177,19 @@ public OkHttpClient.Builder clientBuilder() { } public OkHttpSender build() { - String endpoint = this.endpoint; if (endpoint == null) throw new NullPointerException("endpoint == null"); - - HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint); - if (endpointSupplier == null) { - throw new NullPointerException("endpointSupplierFactory.create() returned null"); - } - if (endpointSupplier instanceof ConstantHttpEndpointSupplier) { - endpoint = endpointSupplier.get(); // eagerly resolve the endpoint - return new OkHttpSender(this, new ConstantHttpUrlSupplier(endpoint)); - } - return new OkHttpSender(this, new DynamicHttpUrlSupplier(endpointSupplier)); - } - } - - static HttpUrl toHttpUrl(String endpoint) { - HttpUrl parsed = HttpUrl.parse(endpoint); - if (parsed == null) throw new IllegalArgumentException("invalid POST url: " + endpoint); - return parsed; - } - - static abstract class HttpUrlSupplier { - abstract HttpUrl get(); - - void close() { - } - } - - static final class ConstantHttpUrlSupplier extends HttpUrlSupplier { - final HttpUrl url; - - ConstantHttpUrlSupplier(String endpoint) { - this.url = toHttpUrl(endpoint); - } - - @Override HttpUrl get() { - return url; - } - - @Override public String toString() { - return url.toString(); - } - } - - static final class DynamicHttpUrlSupplier extends HttpUrlSupplier { - - final HttpEndpointSupplier endpointSupplier; - - DynamicHttpUrlSupplier(HttpEndpointSupplier endpointSupplier) { - this.endpointSupplier = endpointSupplier; - } - - @Override HttpUrl get() { - String endpoint = endpointSupplier.get(); - if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null"); - return toHttpUrl(endpoint); - } - - @Override void close() { - try { - endpointSupplier.close(); - } catch (Throwable t) { - propagateIfFatal(t); - logger.fine("ignoring error closing endpoint supplier: " + t.getMessage()); - } - } - - @Override public String toString() { - return endpointSupplier.toString(); + return new OkHttpSender(this); } } + final InternalOkHttpSender delegate; final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder() final String endpoint; // for toBuilder() - final HttpUrlSupplier urlSupplier; - final OkHttpClient client; - final RequestBodyMessageEncoder encoder; - final Encoding encoding; - final int messageMaxBytes, maxRequests; - final boolean compressionEnabled; - - OkHttpSender(Builder builder, HttpUrlSupplier urlSupplier) { - endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder() - endpoint = builder.endpoint; // for toBuilder() - - this.urlSupplier = urlSupplier; - encoding = builder.encoding; - switch (encoding) { - case JSON: - encoder = RequestBodyMessageEncoder.JSON; - break; - case THRIFT: - encoder = RequestBodyMessageEncoder.THRIFT; - break; - case PROTO3: - encoder = RequestBodyMessageEncoder.PROTO3; - break; - default: - throw new UnsupportedOperationException("Unsupported encoding: " + encoding.name()); - } - maxRequests = builder.maxRequests; - messageMaxBytes = builder.messageMaxBytes; - compressionEnabled = builder.compressionEnabled; - Dispatcher dispatcher = newDispatcher(maxRequests); - - // doing the extra "build" here prevents us from leaking our dispatcher to the builder - client = builder.clientBuilder().build().newBuilder().dispatcher(dispatcher).build(); - } - - static Dispatcher newDispatcher(int maxRequests) { - // bound the executor so that we get consistent performance - ThreadPoolExecutor dispatchExecutor = - new ThreadPoolExecutor(0, maxRequests, 60, TimeUnit.SECONDS, - // Using a synchronous queue means messages will send immediately until we hit max - // in-flight requests. Once max requests are hit, send will block the caller, which is - // the AsyncReporter flush thread. This is ok, as the AsyncReporter has a buffer of - // unsent spans for this purpose. - new SynchronousQueue(), - OkHttpSenderThreadFactory.INSTANCE); - - Dispatcher dispatcher = new Dispatcher(dispatchExecutor); - dispatcher.setMaxRequests(maxRequests); - dispatcher.setMaxRequestsPerHost(maxRequests); - return dispatcher; - } - - enum OkHttpSenderThreadFactory implements ThreadFactory { - INSTANCE; - - @Override public Thread newThread(Runnable r) { - return new Thread(r, "OkHttpSender Dispatcher"); - } + OkHttpSender(Builder builder) { + this.delegate = new InternalOkHttpSender(builder); + this.endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder() + this.endpoint = builder.endpoint; // for toBuilder() } /** @@ -345,118 +200,7 @@ public Builder toBuilder() { return new Builder(this); } - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding.listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding.listSizeInBytes(encodedSizeInBytes); - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageMaxBytes() { - return messageMaxBytes; - } - - /** close is typically called from a different thread */ - final AtomicBoolean closeCalled = new AtomicBoolean(); - - /** {@inheritDoc} */ - @Override @Deprecated public zipkin2.reporter.Call sendSpans(List encodedSpans) { - if (closeCalled.get()) throw new ClosedSenderException(); - Request request; - try { - request = newRequest(encoder.encode(encodedSpans)); - } catch (IOException e) { - throw Platform.get().uncheckedIOException(e); - } - return new HttpCall(client.newCall(request)); - } - - /** Sends spans as a POST to {@link Builder#endpoint(String)}. */ - @Override public void send(List encodedSpans) throws IOException { - if (closeCalled.get()) throw new ClosedSenderException(); - Request request = newRequest(encoder.encode(encodedSpans)); - Call call = client.newCall(request); - parseResponse(call.execute()); - } - - /** {@inheritDoc} */ - @Override @Deprecated public CheckResult check() { - try { - Request request = new Request.Builder().url(urlSupplier.get()) - .post(encoder.encode(Collections.emptyList())).build(); - try (Response response = client.newCall(request).execute()) { - if (!response.isSuccessful()) { - return CheckResult.failed(new RuntimeException("check response failed: " + response)); - } - } - return CheckResult.OK; - } catch (Exception e) { - return CheckResult.failed(e); - } - } - - /** Waits up to a second for in-flight requests to finish before cancelling them */ - @Override public void close() { - if (!closeCalled.compareAndSet(false, true)) return; // already closed - - urlSupplier.close(); - - Dispatcher dispatcher = client.dispatcher(); - dispatcher.executorService().shutdown(); - try { - if (!dispatcher.executorService().awaitTermination(1, TimeUnit.SECONDS)) { - dispatcher.cancelAll(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - Request newRequest(RequestBody body) throws IOException { - Request.Builder request = new Request.Builder().url(urlSupplier.get()); - // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented. - // This prevents that in proxies, such as Envoy, that understand B3 single format, - request.addHeader("b3", "0"); - if (compressionEnabled) { - request.addHeader("Content-Encoding", "gzip"); - Buffer gzipped = new Buffer(); - BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipped)); - body.writeTo(gzipSink); - gzipSink.close(); - body = new BufferRequestBody(body.contentType(), gzipped); - } - request.post(body); - return request.build(); - } - - @Override public String toString() { - return "OkHttpSender{" + urlSupplier + "}"; - } - - static final class BufferRequestBody extends RequestBody { - final MediaType contentType; - final Buffer body; - - BufferRequestBody(MediaType contentType, Buffer body) { - this.contentType = contentType; - this.body = body; - } - - @Override public long contentLength() { - return body.size(); - } - - @Override public MediaType contentType() { - return contentType; - } - - @Override public void writeTo(BufferedSink sink) throws IOException { - sink.write(body, body.size()); - } + @Override protected BytesMessageSender delegate() { + return delegate; } } diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java index 133f37d4..928a6a8d 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java @@ -38,6 +38,19 @@ enum RequestBodyMessageEncoder { } }; + static RequestBodyMessageEncoder forEncoding(Encoding encoding) { + switch (encoding) { + case JSON: + return RequestBodyMessageEncoder.JSON; + case THRIFT: + return RequestBodyMessageEncoder.THRIFT; + case PROTO3: + return RequestBodyMessageEncoder.PROTO3; + default: + throw new UnsupportedOperationException("Unsupported encoding: " + encoding.name()); + } + } + static abstract class StreamingRequestBody extends RequestBody { final MediaType contentType; final List values; diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java index 48832a7c..ed8c0f51 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java +++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java @@ -193,84 +193,6 @@ public class ITOkHttpSender { // public for use in src/it .isEqualTo("application/json"); } - @Deprecated - @Test void closeWhileRequestInFlight_cancelsRequest() throws Exception { - server.shutdown(); // shutdown the normal zipkin rule - sender.close(); - - MockWebServer server = new MockWebServer(); - server.setDispatcher(new Dispatcher() { - @Override public MockResponse dispatch(RecordedRequest request) throws InterruptedException { - Thread.sleep(2000); // lingers after the one-second grace - return new MockResponse(); - } - }); - try { - sender = OkHttpSender.create(server.url("/api/v1/spans").toString()); - - AwaitableCallback callback = new AwaitableCallback(); - - Call call = sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN))); - new Thread(() -> - call.enqueue(callback) - ).start(); - Thread.sleep(100); // make sure the thread starts - - sender.close(); // close while request is in flight - - try { - callback.await(); - failBecauseExceptionWasNotThrown(RuntimeException.class); - } catch (RuntimeException e) { - // throws because the request still in flight after a second was canceled - assertThat(e.getCause()).isInstanceOf(IOException.class); - } - } finally { - server.shutdown(); - } - } - - /** - * Each message by default is up to 5MiB, make sure these go out of process as soon as they can. - */ - @Deprecated - @Test void messagesSendImmediately() throws Exception { - server.shutdown(); // shutdown the normal zipkin rule - sender.close(); - - CountDownLatch latch = new CountDownLatch(1); - MockWebServer server = new MockWebServer(); - server.setDispatcher(new Dispatcher() { - AtomicInteger count = new AtomicInteger(); - - @Override public MockResponse dispatch(RecordedRequest request) throws InterruptedException { - if (count.incrementAndGet() == 1) { - latch.await(); - } else { - latch.countDown(); - } - return new MockResponse(); - } - }); - try (OkHttpSender sender = OkHttpSender.create(server.url("/api/v1/spans").toString())) { - - AwaitableCallback callback1 = new AwaitableCallback(); - AwaitableCallback callback2 = new AwaitableCallback(); - - Thread t = new Thread(() -> { - sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN))).enqueue(callback1); - sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN))).enqueue(callback2); - }); - t.start(); - t.join(); - - callback1.await(); - callback2.await(); - } finally { - server.shutdown(); - } - } - @Deprecated @Test void closeWhileRequestInFlight_graceful() throws Exception { server.shutdown(); // shutdown the normal zipkin rule diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java similarity index 92% rename from okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java rename to okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java index ab62dd58..4f3a0365 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java +++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,7 +25,7 @@ import static org.assertj.core.api.Assertions.assertThat; -class HttpCallTest { +class InternalOkHttpSenderTest { @Test void parseResponse_closesBody() throws Exception { // It is difficult to prove close was called, this approach looks at an underlying stream @@ -45,7 +45,7 @@ class HttpCallTest { .body(ResponseBody.create(null, 1, Okio.buffer(Okio.source(in)))) .build(); - HttpCall.parseResponse(response); + InternalOkHttpSender.parseResponse(response); assertThat(closed.get()).isTrue(); } diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java index 46aa4cb5..e44b3953 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java +++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java @@ -69,125 +69,6 @@ class OkHttpSenderTest { .isInstanceOf(IOException.class); } - @Test void illegalToSendWhenClosed() { - sender.close(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(ClosedSenderException.class); - } - - @Test void endpointSupplierFactory_defaultsToConstant() { - // The default connection supplier returns a constant URL - assertThat(sender) - .extracting("urlSupplier.url") - .isEqualTo(HttpUrl.parse("http://localhost:19092")); - } - - @Test void endpointSupplierFactory_constant() { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("http://localhost:29092")) - .build(); - - // The connection supplier has a constant URL - assertThat(sender) - .extracting("urlSupplier.url") - .isEqualTo(HttpUrl.parse("http://localhost:29092")); - } - - @Test void endpointSupplierFactory_constantBad() { - OkHttpSender.Builder builder = sender.toBuilder() - .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans")); - - assertThatThrownBy(builder::build) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("invalid POST url"); - } - - @Test void endpointSupplierFactory_dynamic() { - AtomicInteger closeCalled = new AtomicInteger(); - HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { - @Override public String get() { - throw new UnsupportedOperationException(); - } - - @Override public void close() { - closeCalled.incrementAndGet(); - } - }; - - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> dynamicEndpointSupplier) - .build(); - - // The connection supplier is deferred until send - assertThat(sender) - .extracting("urlSupplier.endpointSupplier") - .isEqualTo(dynamicEndpointSupplier); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(UnsupportedOperationException.class); - - // Ensure that closing the sender closes the endpoint supplier - sender.close(); - sender.close(); // check only closed once - assertThat(closeCalled).hasValue(1); - } - - @Test void endpointSupplierFactory_ignoresCloseFailure() { - AtomicInteger closeCalled = new AtomicInteger(); - HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { - @Override public String get() { - throw new UnsupportedOperationException(); - } - - @Override public void close() throws IOException { - closeCalled.incrementAndGet(); - throw new IOException("unexpected"); - } - }; - - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> dynamicEndpointSupplier) - .build(); - - // Ensure that an exception closing the endpoint supplier doesn't propagate. - sender.close(); - assertThat(closeCalled).hasValue(1); - } - - @Test void endpointSupplierFactory_dynamicNull() { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { - @Override public String get() { - return null; - } - }) - .build(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(NullPointerException.class) - .hasMessage("endpointSupplier.get() returned null"); - } - - @Test void endpointSupplierFactory_dynamicBad() { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { - @Override public String get() { - return "htp://localhost:9411/api/v1/spans"; - } - }) - .build(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("invalid POST url"); - } - /** * The output of toString() on {@link BytesMessageSender} implementations appears in thread names * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other @@ -198,15 +79,8 @@ class OkHttpSenderTest { assertThat(sender).hasToString("OkHttpSender{http://localhost:19092/}"); } - @Test void outOfBandCancel() { - HttpCall call = (HttpCall) sender.sendSpans(Collections.emptyList()); - call.cancel(); - - assertThat(call.isCanceled()).isTrue(); - } - @Test void bugGuardCache() { - assertThat(sender.client.cache()) + assertThat(sender.delegate.client.cache()) .withFailMessage("senders should not open a disk cache") .isNull(); } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java index fa7ad12b..95cfa734 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java @@ -13,7 +13,7 @@ */ package zipkin2.reporter.beans; -import java.util.Arrays; +import java.util.List; import okhttp3.HttpUrl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -41,7 +41,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("urlSupplier.endpointSupplier") + .extracting("delegate.endpointSupplier") .isEqualTo(FakeEndpointSupplier.INSTANCE); } @@ -53,7 +53,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("urlSupplier.url") + .extracting("delegate.endpoint") .isEqualTo(HttpUrl.parse("http://localhost:9411/api/v2/spans")); } @@ -66,7 +66,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("client.connectTimeoutMillis") + .extracting("delegate.client.connectTimeoutMillis") .isEqualTo(1000); } @@ -79,7 +79,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("client.writeTimeoutMillis") + .extracting("delegate.client.writeTimeoutMillis") .isEqualTo(1000); } @@ -92,7 +92,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("client.readTimeoutMillis") + .extracting("delegate.client.readTimeoutMillis") .isEqualTo(1000); } @@ -105,7 +105,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("client.dispatcher.maxRequests") + .extracting("delegate.client.dispatcher.maxRequests") .isEqualTo(4); } @@ -118,7 +118,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("compressionEnabled") + .extracting("delegate.compressionEnabled") .isEqualTo(false); } @@ -131,7 +131,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("messageMaxBytes") + .extracting("delegate.messageMaxBytes") .isEqualTo(1024); } @@ -144,7 +144,7 @@ class OkHttpSenderFactoryBeanTest { ); assertThat(context.getBean("sender", OkHttpSender.class)) - .extracting("encoding") + .extracting("delegate.encoding") .isEqualTo(Encoding.PROTO3); } @@ -159,7 +159,7 @@ class OkHttpSenderFactoryBeanTest { OkHttpSender sender = context.getBean("sender", OkHttpSender.class); context.close(); - sender.send(Arrays.asList(new byte[]{'{', '}'})); + sender.send(List.of(new byte[]{'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java index 52d0b198..826d6d43 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java @@ -16,6 +16,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.util.Arrays; +import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import zipkin2.reporter.Encoding; @@ -42,7 +43,7 @@ class URLConnectionSenderFactoryBeanTest { ); assertThat(context.getBean("sender", URLConnectionSender.class)) - .extracting("connectionSupplier.endpointSupplier") + .extracting("delegate.endpointSupplier") .isEqualTo(FakeEndpointSupplier.INSTANCE); } @@ -54,7 +55,7 @@ class URLConnectionSenderFactoryBeanTest { ); assertThat(context.getBean("sender", URLConnectionSender.class)) - .extracting("connectionSupplier.url") + .extracting("delegate.endpoint") .isEqualTo(URI.create("http://localhost:9411/api/v2/spans").toURL()); } @@ -98,7 +99,7 @@ class URLConnectionSenderFactoryBeanTest { ); assertThat(context.getBean("sender", URLConnectionSender.class)) - .extracting("compressionEnabled") + .extracting("delegate.compressionEnabled") .isEqualTo(false); } @@ -111,7 +112,7 @@ class URLConnectionSenderFactoryBeanTest { ); assertThat(context.getBean("sender", URLConnectionSender.class)) - .extracting("messageMaxBytes") + .extracting("delegate.messageMaxBytes") .isEqualTo(1024); } @@ -124,7 +125,7 @@ class URLConnectionSenderFactoryBeanTest { ); assertThat(context.getBean("sender", URLConnectionSender.class)) - .extracting("encoding") + .extracting("delegate.encoding") .isEqualTo(Encoding.PROTO3); } @@ -139,7 +140,7 @@ class URLConnectionSenderFactoryBeanTest { URLConnectionSender sender = context.getBean("sender", URLConnectionSender.class); context.close(); - sender.send(Arrays.asList(new byte[]{'{', '}'})); + sender.send(List.of(new byte[]{'{', '}'})); }); } } diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java new file mode 100644 index 00000000..8709486e --- /dev/null +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java @@ -0,0 +1,117 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.urlconnection; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import zipkin2.reporter.Component; +import zipkin2.reporter.HttpSender; + +/** + * We have to nest this class until v4 when {@linkplain URLConnectionSender} no longer needs to + * extend {@linkplain Component}. + */ +final class InternalURLConnectionSender extends HttpSender { + + final int messageMaxBytes; + final int connectTimeout; + final int readTimeout; + final boolean compressionEnabled; + + InternalURLConnectionSender(URLConnectionSender.Builder builder) { + super(builder.encoding, builder.endpointSupplierFactory, builder.endpoint); + this.messageMaxBytes = builder.messageMaxBytes; + this.connectTimeout = builder.connectTimeout; + this.readTimeout = builder.readTimeout; + this.compressionEnabled = builder.compressionEnabled; + } + + @Override public int messageMaxBytes() { + return messageMaxBytes; + } + + @Override protected URL newEndpoint(String endpoint) { + try { + return new URL(endpoint); + } catch (MalformedURLException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override protected byte[] newBody(List encodedSpans) { + return encoding.encode(encodedSpans); + } + + @Override protected void postSpans(URL endpoint, byte[] body) throws IOException { + // intentionally not closing the connection, to use keep-alives + HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection(); + connection.setConnectTimeout(connectTimeout); + connection.setReadTimeout(readTimeout); + connection.setRequestMethod("POST"); + // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented. + // This prevents that in proxies, such as Envoy, that understand B3 single format, + connection.addRequestProperty("b3", "0"); + connection.addRequestProperty("Content-Type", encoding.mediaType()); + if (compressionEnabled) { + connection.addRequestProperty("Content-Encoding", "gzip"); + ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); + GZIPOutputStream compressor = new GZIPOutputStream(gzipped); + try { + compressor.write(body); + } finally { + compressor.close(); + } + body = gzipped.toByteArray(); + } + connection.setDoOutput(true); + connection.setFixedLengthStreamingMode(body.length); + connection.getOutputStream().write(body); + + skipAllContent(connection); + } + + /** This utility is verbose as we have a minimum java version of 6 */ + static void skipAllContent(HttpURLConnection connection) throws IOException { + InputStream in = connection.getInputStream(); + IOException thrown = skipAndSuppress(in); + if (thrown == null) return; + InputStream err = connection.getErrorStream(); + if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped + throw thrown; + } + + static IOException skipAndSuppress(InputStream in) { + try { + while (in.read() != -1) ; // skip + return null; + } catch (IOException e) { + return e; + } finally { + try { + in.close(); + } catch (IOException suppressed) { + } + } + } + + @Override public String toString() { + return super.toString().replace("Internal", ""); + } +} diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java index 07fb5378..8de1467a 100644 --- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java @@ -13,35 +13,20 @@ */ package zipkin2.reporter.urlconnection; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; import java.net.URL; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; -import java.util.zip.GZIPOutputStream; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; -import zipkin2.reporter.ClosedSenderException; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ConstantHttpEndpointSupplier; import zipkin2.reporter.Encoding; import zipkin2.reporter.HttpEndpointSupplier; -import zipkin2.reporter.Sender; - -import static zipkin2.reporter.Call.propagateIfFatal; +import zipkin2.reporter.HttpEndpointSupplier.Factory; +import zipkin2.reporter.internal.SenderAdapter; /** * Reports spans to Zipkin, using its POST endpoint. * *

This sender is thread-safe. */ -public final class URLConnectionSender extends Sender { - static final Logger logger = Logger.getLogger(URLConnectionSender.class.getName()); +public final class URLConnectionSender extends SenderAdapter { /** Creates a sender that posts {@link Encoding#JSON} messages. */ public static URLConnectionSender create(String endpoint) { @@ -53,7 +38,7 @@ public static Builder newBuilder() { } public static final class Builder { - HttpEndpointSupplier.Factory endpointSupplierFactory = ConstantHttpEndpointSupplier.FACTORY; + Factory endpointSupplierFactory = ConstantHttpEndpointSupplier.FACTORY; String endpoint; Encoding encoding = Encoding.JSON; int messageMaxBytes = 500000; @@ -63,17 +48,17 @@ public static final class Builder { Builder(URLConnectionSender sender) { this.endpointSupplierFactory = sender.endpointSupplierFactory; this.endpoint = sender.endpoint; - this.encoding = sender.encoding; - this.messageMaxBytes = sender.messageMaxBytes; - this.connectTimeout = sender.connectTimeout; - this.readTimeout = sender.readTimeout; - this.compressionEnabled = sender.compressionEnabled; + this.encoding = sender.delegate.encoding(); + this.messageMaxBytes = sender.delegate.messageMaxBytes; + this.connectTimeout = sender.delegate.connectTimeout; + this.readTimeout = sender.delegate.readTimeout; + this.compressionEnabled = sender.delegate.compressionEnabled; } /** * No default. See JavaDoc on {@link HttpEndpointSupplier} for implementation notes. */ - public Builder endpointSupplierFactory(HttpEndpointSupplier.Factory endpointSupplierFactory) { + public Builder endpointSupplierFactory(Factory endpointSupplierFactory) { if (endpointSupplierFactory == null) { throw new NullPointerException("endpointSupplierFactory == null"); } @@ -135,235 +120,29 @@ public Builder encoding(Encoding encoding) { } public URLConnectionSender build() { - String endpoint = this.endpoint; if (endpoint == null) throw new NullPointerException("endpoint == null"); - - HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint); - if (endpointSupplier == null) { - throw new NullPointerException("endpointSupplierFactory.create() returned null"); - } - if (endpointSupplier instanceof ConstantHttpEndpointSupplier) { - endpoint = endpointSupplier.get(); // eagerly resolve the endpoint - return new URLConnectionSender(this, new ConstantHttpURLConnectionSupplier(endpoint)); - } - return new URLConnectionSender(this, new DynamicHttpURLConnectionSupplier(endpointSupplier)); + return new URLConnectionSender(this); } Builder() { } } - static URL toURL(String endpoint) { - try { - return new URL(endpoint); - } catch (MalformedURLException e) { - throw new IllegalArgumentException(e.getMessage()); - } - } - - static abstract class HttpURLConnectionSupplier { - abstract HttpURLConnection openConnection() throws IOException; - - void close() { - } - } - - static final class ConstantHttpURLConnectionSupplier extends HttpURLConnectionSupplier { - final URL url; - - ConstantHttpURLConnectionSupplier(String endpoint) { - this.url = toURL(endpoint); - } - - @Override HttpURLConnection openConnection() throws IOException { - return (HttpURLConnection) url.openConnection(); - } - - @Override public String toString() { - return url.toString(); - } - } - - static final class DynamicHttpURLConnectionSupplier extends HttpURLConnectionSupplier { - final HttpEndpointSupplier endpointSupplier; - - DynamicHttpURLConnectionSupplier(HttpEndpointSupplier endpointSupplier) { - this.endpointSupplier = endpointSupplier; - } - - @Override HttpURLConnection openConnection() throws IOException { - String endpoint = endpointSupplier.get(); - if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null"); - URL url = toURL(endpoint); - return (HttpURLConnection) url.openConnection(); - } - - @Override void close() { - try { - endpointSupplier.close(); - } catch (Throwable t) { - propagateIfFatal(t); - logger.fine("ignoring error closing endpoint supplier: " + t.getMessage()); - } - } - - @Override public String toString() { - return endpointSupplier.toString(); - } - } - - final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder() + final InternalURLConnectionSender delegate; + final Factory endpointSupplierFactory; // for toBuilder() final String endpoint; // for toBuilder() - final HttpURLConnectionSupplier connectionSupplier; - final Encoding encoding; - final int messageMaxBytes; - final int connectTimeout, readTimeout; - final boolean compressionEnabled; - - URLConnectionSender(Builder builder, HttpURLConnectionSupplier connectionSupplier) { + URLConnectionSender(Builder builder) { + this.delegate = new InternalURLConnectionSender(builder); this.endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder() this.endpoint = builder.endpoint; // for toBuilder() - - this.connectionSupplier = connectionSupplier; - this.encoding = builder.encoding; - this.messageMaxBytes = builder.messageMaxBytes; - this.connectTimeout = builder.connectTimeout; - this.readTimeout = builder.readTimeout; - this.compressionEnabled = builder.compressionEnabled; } public Builder toBuilder() { return new Builder(this); } - /** close is typically called from a different thread */ - final AtomicBoolean closeCalled = new AtomicBoolean(); - - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding().listSizeInBytes(encodedSizeInBytes); - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageMaxBytes() { - return messageMaxBytes; - } - - /** {@inheritDoc} */ - @Override @Deprecated public Call sendSpans(List encodedSpans) { - if (closeCalled.get()) throw new ClosedSenderException(); - return new HttpPostCall(encoding.encode(encodedSpans)); - } - - /** Sends spans as a POST to {@link Builder#endpoint}. */ - @Override public void send(List encodedSpans) throws IOException { - if (closeCalled.get()) throw new ClosedSenderException(); - send(encoding.encode(encodedSpans)); - } - - /** {@inheritDoc} */ - @Override @Deprecated public CheckResult check() { - try { - send(encoding.encode(Collections.emptyList())); - return CheckResult.OK; - } catch (Throwable e) { - Call.propagateIfFatal(e); - return CheckResult.failed(e); - } - } - - void send(byte[] body) throws IOException { - // intentionally not closing the connection, to use keep-alives - HttpURLConnection connection = connectionSupplier.openConnection(); - connection.setConnectTimeout(connectTimeout); - connection.setReadTimeout(readTimeout); - connection.setRequestMethod("POST"); - // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented. - // This prevents that in proxies, such as Envoy, that understand B3 single format, - connection.addRequestProperty("b3", "0"); - connection.addRequestProperty("Content-Type", encoding.mediaType()); - if (compressionEnabled) { - connection.addRequestProperty("Content-Encoding", "gzip"); - ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); - GZIPOutputStream compressor = new GZIPOutputStream(gzipped); - try { - compressor.write(body); - } finally { - compressor.close(); - } - body = gzipped.toByteArray(); - } - connection.setDoOutput(true); - connection.setFixedLengthStreamingMode(body.length); - connection.getOutputStream().write(body); - - skipAllContent(connection); - } - - /** This utility is verbose as we have a minimum java version of 6 */ - static void skipAllContent(HttpURLConnection connection) throws IOException { - InputStream in = connection.getInputStream(); - IOException thrown = skipAndSuppress(in); - if (thrown == null) return; - InputStream err = connection.getErrorStream(); - if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped - throw thrown; - } - - static IOException skipAndSuppress(InputStream in) { - try { - while (in.read() != -1) ; // skip - return null; - } catch (IOException e) { - return e; - } finally { - try { - in.close(); - } catch (IOException suppressed) { - } - } - } - - @Override public void close() { - if (!closeCalled.compareAndSet(false, true)) return; // already closed - connectionSupplier.close(); - } - - @Override public String toString() { - return "URLConnectionSender{" + connectionSupplier + "}"; - } - - class HttpPostCall extends Call.Base { - private final byte[] message; - - HttpPostCall(byte[] message) { - this.message = message; - } - - @Override protected Void doExecute() throws IOException { - send(message); - return null; - } - - @Override protected void doEnqueue(Callback callback) { - try { - send(message); - callback.onSuccess(null); - } catch (Throwable t) { - Call.propagateIfFatal(t); - callback.onError(t); - } - } - - @Override public Call clone() { - return new HttpPostCall(message); - } + @Override protected BytesMessageSender delegate() { + return delegate; } } diff --git a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java index e6729de8..cf5ae522 100644 --- a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java +++ b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java @@ -63,130 +63,6 @@ class URLConnectionSenderTest { .hasToString("URLConnectionSender{http://localhost:29092}"); } - @Test void sendFailsWhenEndpointIsDown() { - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(ConnectException.class); - } - - @Test void illegalToSendWhenClosed() { - sender.close(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(ClosedSenderException.class); - } - - @Test void endpointSupplierFactory_defaultsToConstant() throws MalformedURLException { - // The default connection supplier returns a constant URL - assertThat(sender) - .extracting("connectionSupplier.url") - .isEqualTo(URI.create("http://localhost:19092").toURL()); - } - - @Test void endpointSupplierFactory_constant() throws MalformedURLException { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("http://localhost:29092")) - .build(); - - // The connection supplier has a constant URL - assertThat(sender) - .extracting("connectionSupplier.url") - .isEqualTo(URI.create("http://localhost:29092").toURL()); - } - - @Test void endpointSupplierFactory_constantBad() { - URLConnectionSender.Builder builder = sender.toBuilder() - .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans")); - - assertThatThrownBy(builder::build) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("unknown protocol: htp"); - } - - @Test void endpointSupplierFactory_dynamic() { - AtomicInteger closeCalled = new AtomicInteger(); - HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { - @Override public String get() { - throw new UnsupportedOperationException(); - } - - @Override public void close() { - closeCalled.incrementAndGet(); - } - }; - - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> dynamicEndpointSupplier) - .build(); - - // The connection supplier is deferred until send - assertThat(sender) - .extracting("connectionSupplier.endpointSupplier") - .isEqualTo(dynamicEndpointSupplier); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(UnsupportedOperationException.class); - - // Ensure that closing the sender closes the endpoint supplier - sender.close(); - sender.close(); // check only closed once - assertThat(closeCalled).hasValue(1); - } - - @Test void endpointSupplierFactory_ignoresCloseFailure() { - AtomicInteger closeCalled = new AtomicInteger(); - HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() { - @Override public String get() { - throw new UnsupportedOperationException(); - } - - @Override public void close() throws IOException { - closeCalled.incrementAndGet(); - throw new IOException("unexpected"); - } - }; - - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> dynamicEndpointSupplier) - .build(); - - // Ensure that an exception closing the endpoint supplier doesn't propagate. - sender.close(); - assertThat(closeCalled).hasValue(1); - } - - @Test void endpointSupplierFactory_dynamicNull() { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { - @Override public String get() { - return null; - } - }) - .build(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(NullPointerException.class) - .hasMessage("endpointSupplier.get() returned null"); - } - - @Test void endpointSupplierFactory_dynamicBad() { - sender.close(); - sender = sender.toBuilder() - .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() { - @Override public String get() { - return "htp://localhost:9411/api/v1/spans"; - } - }) - .build(); - - assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("unknown protocol: htp"); - } - /** * The output of toString() on {@link BytesMessageSender} implementations appears in thread names * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other