Skip to content

Commit

Permalink
Adds HttpSender to aid proper handling of HttpEndpointSupplier (#253)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Feb 13, 2024
1 parent be50820 commit ec4e01c
Show file tree
Hide file tree
Showing 20 changed files with 901 additions and 967 deletions.
7 changes: 7 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
<artifactId>zipkin</artifactId>
<version>${zipkin.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
* HTTP-based {@link BytesMessageSender senders} use this to resolve a potentially-pseudo endpoint
* passed by configuration to a real endpoint.
*
* <h3>Usage Notes</h3>
* <h3>Implementation Notes</h3>
*
* <p>{@link BytesMessageSender senders} should implement the following logic:
* {@linkplain HttpSender} is a convenience type that implements the following logic:
* <ul>
* <li>During build, the sender should invoke the {@linkplain Factory}.</li>
* <li>If the result is {@link ConstantHttpEndpointSupplier}, build the sender to use a static
Expand All @@ -32,8 +32,6 @@
* <li>Call {@link #close()} once during {@link BytesMessageSender#close()}.</li>
* </ul>
*
* <h3>Implementation Notes</h3>
*
* <p>Implement friendly {@code toString()} functions, that include the real endpoint or the one
* passed to the {@linkplain Factory}.
*
Expand All @@ -46,6 +44,8 @@
* dependency injection, without limiting an HTTP framework that can handle groups, to a
* single-endpoint supplier.
*
* @see ConstantHttpEndpointSupplier
* @see HttpSender
* @since 3.3
*/
public interface HttpEndpointSupplier extends Closeable {
Expand Down
152 changes: 152 additions & 0 deletions core/src/main/java/zipkin2/reporter/HttpSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import zipkin2.reporter.HttpEndpointSupplier.Factory;

import static zipkin2.reporter.Call.propagateIfFatal;

/**
* Reports spans to Zipkin, using its <a href="https://zipkin.io/zipkin-api/#/">POST</a> endpoint.
*
* <p>Calls to {@linkplain #postSpans(Object, Object)} happen on the same async reporting thread,
* but {@linkplain #close()} might be called from any thread.
*
* @since 3.3
*/
public abstract class HttpSender<U, B> extends BytesMessageSender.Base {
final Logger logger;
final HttpEndpointSupplier endpointSupplier;
final U endpoint;

/** close is typically called from a different thread */
final AtomicBoolean closeCalled = new AtomicBoolean();

/**
* Called each invocation of {@linkplain #postSpans(Object, Object)}, unless the
* {@linkplain HttpEndpointSupplier} is a {@linkplain ConstantHttpEndpointSupplier},
* Implementations should perform any validation needed here.
*
* @since 3.3
*/
protected abstract U newEndpoint(String endpoint);

/**
* Creates a new POST body from the encoded spans.
*
* <p>Below is the simplest implementation, when {@linkplain HttpSender#<B>} is a byte array.
* <pre>{@code
* @Override protected byte[] newBody(List<byte[]> encodedSpans) {
* return encoding.encode(encodedSpans);
* }
* }</pre>
*
* <p>If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}.
*
* @since 3.3
*/
protected abstract B newBody(List<byte[]> encodedSpans);

/**
* Implement to POST spans to the given endpoint.
*
* <p>If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}.
*
* @since 3.3
*/
protected abstract void postSpans(U endpoint, B body) throws IOException;

/**
* Override to close any resources.
*
* @since 3.3
*/
protected void doClose() {
}

protected HttpSender(Encoding encoding, Factory endpointSupplierFactory, String endpoint) {
this(Logger.getLogger(HttpSender.class.getName()), encoding, endpointSupplierFactory, endpoint);
}

HttpSender(Logger logger, Encoding encoding, Factory endpointSupplierFactory, String endpoint) {
super(encoding);
this.logger = logger;
if (endpointSupplierFactory == null) {
throw new NullPointerException("endpointSupplierFactory == null");
}
if (endpoint == null) throw new NullPointerException("endpoint == null");

HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint);
if (endpointSupplier == null) {
throw new NullPointerException("endpointSupplierFactory.create() returned null");
}
if (endpointSupplier instanceof ConstantHttpEndpointSupplier) {
this.endpoint = nextEndpoint(endpointSupplier);
closeQuietly(endpointSupplier);
this.endpointSupplier = null;
} else {
this.endpoint = null;
this.endpointSupplier = endpointSupplier;
}
}

final U nextEndpoint(HttpEndpointSupplier endpointSupplier) {
String endpoint = endpointSupplier.get(); // eagerly resolve the endpoint
if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null");
return newEndpoint(endpoint);
}

/** Defaults to the most common max message size: 512KB. */
@Override public int messageMaxBytes() {
return 512 * 1024;
}

/** Sends spans as an HTTP POST request. */
@Override public final void send(List<byte[]> encodedSpans) throws IOException {
if (closeCalled.get()) throw new ClosedSenderException();
U endpoint = this.endpoint;
if (endpoint == null) endpoint = nextEndpoint(endpointSupplier);
B body = newBody(encodedSpans);
if (body == null) throw new NullPointerException("newBody(encodedSpans) returned null");
postSpans(endpoint, newBody(encodedSpans));
}

@Override public final void close() {
if (!closeCalled.compareAndSet(false, true)) return; // already closed
closeQuietly(endpointSupplier);
doClose();
}

final void closeQuietly(HttpEndpointSupplier endpointSupplier) {
if (endpointSupplier == null) return;
try {
endpointSupplier.close();
} catch (Throwable t) {
propagateIfFatal(t);
logger.fine("ignoring error closing endpoint supplier: " + t.getMessage());
}
}

@Override public String toString() {
String name = getClass().getSimpleName();
if (endpoint != null) {
return name + "{" + endpoint + "}";
}
return name + "{" + endpointSupplier + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter.okhttp3;
package zipkin2.reporter.internal;

import java.io.IOException;
import java.lang.reflect.Constructor;

/** Taken from {@code zipkin2.reporter.internal.Platform} to avoid needing to shade over a single method. */
abstract class Platform {
public abstract class Platform {
private static final Platform PLATFORM = findPlatform();

Platform() {
}

RuntimeException uncheckedIOException(IOException e) {
public RuntimeException uncheckedIOException(IOException e) {
return new RuntimeException(e);
}

static Platform get() {
public static Platform get() {
return PLATFORM;
}

Expand Down
104 changes: 104 additions & 0 deletions core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter.internal;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;

/**
* Reduces burden on types that need to extend {@linkplain Sender}.
*/
public abstract class SenderAdapter extends Sender {
protected abstract BytesMessageSender delegate();

@Override public final int messageSizeInBytes(List<byte[]> encodedSpans) {
return delegate().messageSizeInBytes(encodedSpans);
}

@Override public final int messageSizeInBytes(int encodedSizeInBytes) {
return delegate().messageSizeInBytes(encodedSizeInBytes);
}

@Override public final Encoding encoding() {
return delegate().encoding();
}

@Override public final int messageMaxBytes() {
return delegate().messageMaxBytes();
}

@Override @Deprecated public final Call<Void> sendSpans(List<byte[]> encodedSpans) {
return new SendSpans(encodedSpans);
}

@Override public final void send(List<byte[]> encodedSpans) throws IOException {
delegate().send(encodedSpans);
}

@Override @Deprecated public final CheckResult check() {
try {
delegate().send(Collections.<byte[]>emptyList());
return CheckResult.OK;
} catch (Throwable e) {
Call.propagateIfFatal(e);
return CheckResult.failed(e);
}
}

@Override public final void close() {
try {
delegate().close();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override public final String toString() {
return delegate().toString();
}

final class SendSpans extends Call.Base<Void> {
private final List<byte[]> encodedSpans;

SendSpans(List<byte[]> encodedSpans) {
this.encodedSpans = encodedSpans;
}

@Override protected Void doExecute() throws IOException {
send(encodedSpans);
return null;
}

@Override protected void doEnqueue(Callback<Void> callback) {
try {
send(encodedSpans);
callback.onSuccess(null);
} catch (Throwable t) {
Call.propagateIfFatal(t);
callback.onError(t);
}
}

@Override public Call<Void> clone() {
return new SendSpans(encodedSpans);
}
}
}
67 changes: 67 additions & 0 deletions core/src/test/java/zipkin2/reporter/FakeHttpSender.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Logger;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.reporter.HttpEndpointSupplier.Factory;

class FakeHttpSender extends HttpSender<String, byte[]> {
final String originalEndpoint;
final Consumer<List<Span>> onSpans;

FakeHttpSender(Logger logger, String endpoint, Consumer<List<Span>> onSpans) {
this(logger, endpoint, ConstantHttpEndpointSupplier.FACTORY, onSpans);
}

FakeHttpSender(Logger logger, String endpoint, Factory endpointSupplierFactory,
Consumer<List<Span>> onSpans) {
super(logger, Encoding.JSON, endpointSupplierFactory, endpoint);
this.originalEndpoint = endpoint;
this.onSpans = onSpans;
}

FakeHttpSender withHttpEndpointSupplierFactory(Factory endpointSupplierFactory) {
return new FakeHttpSender(logger, originalEndpoint, endpointSupplierFactory, onSpans);
}

/** close is typically called from a different thread */
volatile boolean closeCalled;

@Override protected String newEndpoint(String endpoint) {
try {
return URI.create(endpoint).toURL().toString(); // validate
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e.getMessage());
}
}

@Override protected byte[] newBody(List<byte[]> encodedSpans) {
return encoding.encode(encodedSpans);
}

@Override protected void postSpans(String endpoint, byte[] body) {
List<Span> decoded = SpanBytesDecoder.JSON_V2.decodeList(body);
onSpans.accept(decoded);
}

@Override protected void doClose() {
closeCalled = true;
}
}
Loading

0 comments on commit ec4e01c

Please sign in to comment.