From 7b23b75b33fc605c2d2abae02c94d3a80f89ab65 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Thu, 29 Aug 2024 16:07:50 +0900 Subject: [PATCH 01/14] Add OTLP/HTTP collector --- README.md | 4 +- collector-http/README.md | 6 +- collector-http/pom.xml | 55 ++- .../otel/http/OpenTelemetryHttpCollector.java | 97 ++++- .../collector/otel/http/ProtoUtils.java | 57 +++ .../collector/otel/http/SpanTranslator.java | 308 +++++++++++++ .../http/ITOpenTelemetryHttpCollector.java | 344 ++++++++++++++- .../collector/otel/http/ProtoUtilsTest.java | 63 +++ .../otel/http/SpanTranslatorTest.java | 412 ++++++++++++++++++ .../collector/otel/http/ZipkinTestUtil.java | 122 ++++++ module/pom.xml | 2 +- 11 files changed, 1443 insertions(+), 27 deletions(-) create mode 100644 collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java create mode 100644 collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java create mode 100644 collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java create mode 100644 collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java create mode 100644 collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java diff --git a/README.md b/README.md index 04241c2..3535ea2 100644 --- a/README.md +++ b/README.md @@ -5,11 +5,11 @@ # zipkin-otel Shared libraries that provide Zipkin integration with the OpenTelemetry. Requires JRE 11 or later. -# Usage +## Usage These components integrate traced applications and servers with OpenTelemetry protocols via interfaces defined by [Zipkin](https://github.com/openzipkin/zipkin). -## Collectors +### Collectors The component in a zipkin server that receives trace data is called a collector. A collector decodes spans reported by applications and persists them to a configured collector component. diff --git a/collector-http/README.md b/collector-http/README.md index 20e8b5d..f6a4dc2 100644 --- a/collector-http/README.md +++ b/collector-http/README.md @@ -1,5 +1,3 @@ -# zipkin-collector-otel-http +# collector-http -This component implements -the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) -with [Armeria](https://armeria.dev/). +This component implements the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) with [Armeria](https://armeria.dev/). diff --git a/collector-http/pom.xml b/collector-http/pom.xml index 375c103..d6f495c 100644 --- a/collector-http/pom.xml +++ b/collector-http/pom.xml @@ -6,8 +6,8 @@ --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -17,8 +17,8 @@ ../pom.xml - zipkin-collector-otel-http - Zipkin Collector: OpenTelemetry HTTP + collector-http + Zipkin Collector: OTLP HTTP ${project.basedir}/.. @@ -37,6 +37,34 @@ ${armeria.version} + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-common + ${opentelemetry.version} + + + io.opentelemetry.proto + opentelemetry-proto + ${opentelemetry-proto.version} + + + io.opentelemetry.semconv + opentelemetry-semconv + ${opentelemetry-semconv.version} + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + provided + + ${zipkin.groupId} zipkin-tests @@ -50,5 +78,24 @@ ${armeria.version} test + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + test + + + org.awaitility + awaitility + ${awaitility.version} + test + diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index ac3384d..d032f53 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -4,12 +4,30 @@ */ package zipkin2.collector.otel.http; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.protobuf.CodedInputStream; +import com.linecorp.armeria.common.AggregationOptions; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.encoding.StreamDecoderFactory; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServerConfigurator; import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.encoding.DecodingService; +import io.netty.buffer.ByteBufAllocator; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import zipkin2.Callback; +import zipkin2.Span; import zipkin2.collector.Collector; import zipkin2.collector.CollectorComponent; import zipkin2.collector.CollectorMetrics; @@ -18,6 +36,7 @@ public final class OpenTelemetryHttpCollector extends CollectorComponent implements ServerConfigurator { + public static Builder newBuilder() { return new Builder(); } @@ -25,25 +44,32 @@ public static Builder newBuilder() { public static final class Builder extends CollectorComponent.Builder { Collector.Builder delegate = Collector.newBuilder(OpenTelemetryHttpCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; - @Override public Builder storage(StorageComponent storageComponent) { + @Override + public Builder storage(StorageComponent storageComponent) { delegate.storage(storageComponent); return this; } - @Override public Builder metrics(CollectorMetrics metrics) { - if (metrics == null) throw new NullPointerException("metrics == null"); + @Override + public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) { + throw new NullPointerException("metrics == null"); + } delegate.metrics(this.metrics = metrics.forTransport("otel/http")); return this; } - @Override public Builder sampler(CollectorSampler sampler) { + @Override + public Builder sampler(CollectorSampler sampler) { delegate.sampler(sampler); return this; } - @Override public OpenTelemetryHttpCollector build() { + @Override + public OpenTelemetryHttpCollector build() { return new OpenTelemetryHttpCollector(this); } @@ -52,6 +78,7 @@ public static final class Builder extends CollectorComponent.Builder { } final Collector collector; + final CollectorMetrics metrics; OpenTelemetryHttpCollector(Builder builder) { @@ -59,31 +86,79 @@ public static final class Builder extends CollectorComponent.Builder { metrics = builder.metrics; } - @Override public OpenTelemetryHttpCollector start() { + @Override + public OpenTelemetryHttpCollector start() { return this; } - @Override public String toString() { + @Override + public String toString() { return "OpenTelemetryHttpCollector{}"; } /** * Reconfigures the service per https://opentelemetry.io/docs/specs/otlp/#otlphttp-request */ - @Override public void reconfigure(ServerBuilder sb) { + @Override + public void reconfigure(ServerBuilder sb) { + sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip())); sb.service("/v1/traces", new HttpService(this)); } static final class HttpService extends AbstractHttpService { + private static final Logger LOG = Logger.getLogger(HttpService.class.getName()); + final OpenTelemetryHttpCollector collector; HttpService(OpenTelemetryHttpCollector collector) { this.collector = collector; } - @Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) - throws Exception { - throw new RuntimeException("Implement me!"); + @Override + protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { + CompletableCallback result = new CompletableCallback(); + req.aggregate(AggregationOptions.usePooledObjects(ByteBufAllocator.DEFAULT, ctx.eventLoop() + )).handle((msg, t) -> { + if (t != null) { + result.onError(t); + return null; + } + try (HttpData content = msg.content()) { + if (content.isEmpty()) { + result.onSuccess(null); + return null; + } + + try { + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(CodedInputStream.newInstance(content.byteBuf().nioBuffer())); + List spans = SpanTranslator.translate(request); + collector.collector.accept(spans, result); + } + catch (IOException e) { + LOG.log(Level.WARNING, "Unable to parse the request:", e); + throw new UncheckedIOException(e); + } + return null; + } + }); + return HttpResponse.of(result); + } + } + + static final class CompletableCallback extends CompletableFuture + implements Callback { + + static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED); + + @Override + public void onSuccess(Void value) { + complete(HttpResponse.of(ACCEPTED_RESPONSE)); + } + + @Override + public void onError(Throwable t) { + completeExceptionally(t); } } + } diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java new file mode 100644 index 0000000..8754961 --- /dev/null +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.List; + +import com.google.protobuf.TextFormat; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import static java.util.stream.Collectors.joining; + +final class ProtoUtils { + static String kvListToJson(List attributes) { + return attributes.stream() + .map(entry -> "\"" + entry.getKey() + "\":" + valueToJson(entry.getValue())) + .collect(joining(",", "{", "}")); + } + + static String valueToString(AnyValue value) { + if (value.hasStringValue()) { + return value.getStringValue(); + } + return valueToJson(value); + } + + static String valueToJson(AnyValue value) { + if (value.hasStringValue()) { + return "\"" + value.getStringValue() + "\""; + } + if (value.hasArrayValue()) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute + return value.getArrayValue().getValuesList().stream() + .map(ProtoUtils::valueToJson) + .collect(joining(",", "[", "]")); + } + if (value.hasKvlistValue()) { + return kvListToJson(value.getKvlistValue().getValuesList()); + } + if (value.hasBoolValue()) { + return String.valueOf(value.getBoolValue()); + } + if (value.hasDoubleValue()) { + return String.valueOf(value.getDoubleValue()); + } + if (value.hasIntValue()) { + return String.valueOf(value.getIntValue()); + } + if (value.hasBytesValue()) { + // TODO + return TextFormat.escapeBytes(value.getBytesValue()); + } + return value.toString(); + } +} diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java new file mode 100644 index 0000000..40fb33a --- /dev/null +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -0,0 +1,308 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.protobuf.ByteString; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.proto.trace.v1.Span.Event; +import io.opentelemetry.proto.trace.v1.Span.SpanKind; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.proto.trace.v1.Status.StatusCode; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import io.opentelemetry.semconv.ServiceAttributes; +import zipkin2.Endpoint; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * SpanTranslator converts OpenTelemetry Spans to Zipkin Spans + * It is based, in part, on code from https://github.com/open-telemetry/opentelemetry-java/blob/ad120a5bff0887dffedb9c73af8e8e0aeb63659a/exporters/zipkin/src/main/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformer.java + * @see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#status + */ +final class SpanTranslator { + + static final AttributeKey PEER_SERVICE = AttributeKey.stringKey("peer.service"); + + static final String OTEL_DROPPED_ATTRIBUTES_COUNT = "otel.dropped_attributes_count"; + + static final String OTEL_DROPPED_EVENTS_COUNT = "otel.dropped_events_count"; + + static final String ERROR_TAG = "error"; + + static List translate(ExportTraceServiceRequest otelSpans) { + List spans = new ArrayList<>(); + List spansList = otelSpans.getResourceSpansList(); + for (ResourceSpans resourceSpans : spansList) { + for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) { + InstrumentationScope scope = scopeSpans.getScope(); + for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { + spans.add(generateSpan(span, scope, resourceSpans.getResource())); + } + } + } + return spans; + } + + /** + * Creates an instance of a Zipkin Span from an OpenTelemetry SpanData instance. + * + * @param spanData an OpenTelemetry spanData instance + * @param scope InstrumentationScope of the span + * @return a new Zipkin Span + */ + private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope scope, Resource resource) { + long startTimestamp = nanoToMills(spanData.getStartTimeUnixNano()); + long endTimestamp = nanoToMills(spanData.getEndTimeUnixNano()); + Map attributesMap = spanData.getAttributesList().stream().collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); + zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder() + .traceId(OtelEncodingUtils.traceIdFromBytes(spanData.getTraceId().toByteArray())) + .id(OtelEncodingUtils.spanIdFromBytes(spanData.getSpanId().toByteArray())) + .kind(toSpanKind(spanData.getKind())) + .name(spanData.getName()) + .timestamp(nanoToMills(spanData.getStartTimeUnixNano())) + .duration(Math.max(1, endTimestamp - startTimestamp)) + .localEndpoint(getLocalEndpoint(attributesMap, resource)) + .remoteEndpoint(getRemoteEndpoint(attributesMap, spanData.getKind())); + ByteString parentSpanId = spanData.getParentSpanId(); + if (!parentSpanId.isEmpty()) { + String parentId = OtelEncodingUtils.spanIdFromBytes(parentSpanId.toByteArray()); + if (!parentId.equals(OtelEncodingUtils.INVALID_SPAN)) { + spanBuilder.parentId(parentId); + } + } + attributesMap.forEach((k, v) -> spanBuilder.putTag(k, ProtoUtils.valueToString(v))); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#dropped-attributes-count + int droppedAttributes = spanData.getAttributesCount() - attributesMap.size(); + if (droppedAttributes > 0) { + spanBuilder.putTag(OTEL_DROPPED_ATTRIBUTES_COUNT, String.valueOf(droppedAttributes)); + } + Status status = spanData.getStatus(); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#status + if (status.getCode() != Status.StatusCode.STATUS_CODE_UNSET) { + String codeValue = status.getCode().toString().replace("STATUS_CODE_", ""); // either OK or ERROR + spanBuilder.putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), codeValue); + // add the error tag, if it isn't already in the source span. + if (status.getCode() == StatusCode.STATUS_CODE_ERROR && !attributesMap.containsKey(ERROR_TAG)) { + spanBuilder.putTag(ERROR_TAG, status.getMessage()); + } + } + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#instrumentationscope + if (!scope.getName().isEmpty()) { + spanBuilder.putTag(OtelAttributes.OTEL_SCOPE_NAME.getKey(), scope.getName()); + } + if (!scope.getVersion().isEmpty()) { + spanBuilder.putTag(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), scope.getVersion()); + } + for (Event eventData : spanData.getEventsList()) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#events + String name = eventData.getName(); + String value = ProtoUtils.kvListToJson(eventData.getAttributesList()); + String annotation = "\"" + name + "\":" + value; + spanBuilder.addAnnotation(nanoToMills(eventData.getTimeUnixNano()), annotation); + } + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#dropped-events-count + int droppedEvents = spanData.getEventsCount() - spanData.getEventsList().size(); + if (droppedEvents > 0) { + spanBuilder.putTag(OTEL_DROPPED_EVENTS_COUNT, String.valueOf(droppedEvents)); + } + return spanBuilder.build(); + } + + private static Endpoint getLocalEndpoint(Map attributesMap, Resource resource) { + AnyValue serviceName = resource.getAttributesList().stream() + .filter(kv -> kv.getKey().equals(ServiceAttributes.SERVICE_NAME.getKey())) + .findFirst() + .map(KeyValue::getValue) + .orElse(null); + if (serviceName != null) { + Endpoint.Builder endpoint = Endpoint.newBuilder().serviceName(serviceName.getStringValue()); + AnyValue networkLocalAddress = attributesMap.get(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey()); + AnyValue networkLocalPort = attributesMap.get(NetworkAttributes.NETWORK_LOCAL_PORT.getKey()); + if (networkLocalAddress != null) { + endpoint.ip(networkLocalAddress.getStringValue()); + } + if (networkLocalPort != null) { + endpoint.port(Long.valueOf(networkLocalPort.getIntValue()).intValue()); + } + // TODO remove the corresponding (duplicated) tags? + return endpoint.build(); + } + return null; + } + + private static Endpoint getRemoteEndpoint(Map attributesMap, SpanKind kind) { + if (kind == SpanKind.SPAN_KIND_CLIENT || kind == SpanKind.SPAN_KIND_PRODUCER) { + AnyValue peerService = attributesMap.get(PEER_SERVICE.getKey()); + AnyValue networkPeerAddress = attributesMap.get(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey()); + String serviceName = null; + // TODO: Implement fallback mechanism? + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#otlp---zipkin + if (peerService != null) { + serviceName = peerService.getStringValue(); + } + else if (networkPeerAddress != null) { + serviceName = networkPeerAddress.getStringValue(); + } + if (serviceName != null) { + Endpoint.Builder endpoint = Endpoint.newBuilder().serviceName(serviceName); + AnyValue networkPeerPort = attributesMap.get(NetworkAttributes.NETWORK_PEER_PORT.getKey()); + if (networkPeerAddress != null) { + endpoint.ip(networkPeerAddress.getStringValue()); + } + if (networkPeerPort != null) { + endpoint.port(Long.valueOf(networkPeerPort.getIntValue()).intValue()); + } + // TODO remove the corresponding (duplicated) tags? + return endpoint.build(); + } + } + return null; + } + + static zipkin2.Span.Kind toSpanKind(Span.SpanKind spanKind) { + switch (spanKind) { + case SPAN_KIND_UNSPECIFIED: + case UNRECOGNIZED: + case SPAN_KIND_INTERNAL: + break; + case SPAN_KIND_SERVER: + return zipkin2.Span.Kind.SERVER; + case SPAN_KIND_CLIENT: + return zipkin2.Span.Kind.CLIENT; + case SPAN_KIND_PRODUCER: + return zipkin2.Span.Kind.PRODUCER; + case SPAN_KIND_CONSUMER: + return zipkin2.Span.Kind.CONSUMER; + default: + return null; + } + return null; + } + + static long nanoToMills(long epochNanos) { + return NANOSECONDS.toMicros(epochNanos); + } + + /** + * Taken from OpenTelemetry codebase. + * https://github.com/open-telemetry/opentelemetry-java/blob/3e8092d086967fa24a0559044651781403033313/api/all/src/main/java/io/opentelemetry/api/internal/OtelEncodingUtils.java + */ + static class OtelEncodingUtils { + + static final String ALPHABET = "0123456789abcdef"; + + static final char[] ENCODING = buildEncodingArray(); + + static final String INVALID_TRACE = "00000000000000000000000000000000"; + + static final int TRACE_BYTES_LENGTH = 16; + + static final int TRACE_HEX_LENGTH = 2 * TRACE_BYTES_LENGTH; + + static final int SPAN_BYTES_LENGTH = 8; + + static final int SPAN_HEX_LENGTH = 2 * SPAN_BYTES_LENGTH; + + static final String INVALID_SPAN = "0000000000000000"; + + private static char[] buildEncodingArray() { + char[] encoding = new char[512]; + for (int i = 0; i < 256; ++i) { + encoding[i] = ALPHABET.charAt(i >>> 4); + encoding[i | 0x100] = ALPHABET.charAt(i & 0xF); + } + return encoding; + } + + /** + * Fills {@code dest} with the hex encoding of {@code bytes}. + */ + public static void bytesToBase16(byte[] bytes, char[] dest, int length) { + for (int i = 0; i < length; i++) { + byteToBase16(bytes[i], dest, i * 2); + } + } + + /** + * Encodes the specified byte, and returns the encoded {@code String}. + * + * @param value the value to be converted. + * @param dest the destination char array. + * @param destOffset the starting offset in the destination char array. + */ + public static void byteToBase16(byte value, char[] dest, int destOffset) { + int b = value & 0xFF; + dest[destOffset] = ENCODING[b]; + dest[destOffset + 1] = ENCODING[b | 0x100]; + } + + /** + * Returns the lowercase hex (base16) representation of the {@code TraceId} converted from the + * given bytes representation, or {@link #INVALID_TRACE} if input is {@code null} or the given + * byte array is too short. + * + *

It converts the first 26 bytes of the given byte array. + * + * @param traceIdBytes the bytes (16-byte array) representation of the {@code TraceId}. + * @return the lowercase hex (base16) representation of the {@code TraceId}. + */ + static String traceIdFromBytes(byte[] traceIdBytes) { + if (traceIdBytes == null || traceIdBytes.length < TRACE_BYTES_LENGTH) { + return INVALID_TRACE; + } + char[] result = TemporaryBuffers.chars(TRACE_HEX_LENGTH); + OtelEncodingUtils.bytesToBase16(traceIdBytes, result, TRACE_BYTES_LENGTH); + return new String(result, 0, TRACE_HEX_LENGTH); + } + + static String spanIdFromBytes(byte[] spanIdBytes) { + if (spanIdBytes == null || spanIdBytes.length < SPAN_BYTES_LENGTH) { + return INVALID_SPAN; + } + char[] result = TemporaryBuffers.chars(SPAN_HEX_LENGTH); + OtelEncodingUtils.bytesToBase16(spanIdBytes, result, SPAN_BYTES_LENGTH); + return new String(result, 0, SPAN_HEX_LENGTH); + } + + static final class TemporaryBuffers { + + private static final ThreadLocal CHAR_ARRAY = new ThreadLocal<>(); + + /** + * A {@link ThreadLocal} {@code char[]} of size {@code len}. Take care when using a large + * value of {@code len} as this buffer will remain for the lifetime of the thread. The + * returned buffer will not be zeroed and may be larger than the requested size, you must make + * sure to fill the entire content to the desired value and set the length explicitly when + * converting to a {@link String}. + */ + public static char[] chars(int len) { + char[] buffer = CHAR_ARRAY.get(); + if (buffer == null || buffer.length < len) { + buffer = new char[len]; + CHAR_ARRAY.set(buffer); + } + return buffer; + } + + private TemporaryBuffers() { + } + } + } + +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index c871b45..65beeef 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -5,21 +5,80 @@ package zipkin2.collector.otel.http; import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import io.opentelemetry.semconv.ServiceAttributes; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import zipkin2.collector.CollectorComponent; import zipkin2.collector.CollectorSampler; import zipkin2.collector.InMemoryCollectorMetrics; import zipkin2.storage.InMemoryStorage; +import static io.opentelemetry.sdk.trace.samplers.Sampler.alwaysOn; +import static org.assertj.core.api.Assertions.assertThat; + @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ITOpenTelemetryHttpCollector { InMemoryStorage store; + InMemoryCollectorMetrics metrics; - CollectorComponent collector; - @BeforeEach public void setup() { + OpenTelemetryHttpCollector collector; + + int port = getFreePort(); + + SpanExporter spanExporter = OtlpHttpSpanExporter.builder() + .setCompression("gzip") + .setEndpoint("http://localhost:" + port + "/v1/traces") + .build(); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setSampler(alwaysOn()) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .addResource(Resource.create(Attributes.of(ServiceAttributes.SERVICE_NAME, "zipkin-collector-otel-http-test"))) + .build(); + + OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .build(); + + Tracer tracer = openTelemetrySdk.getTracerProvider() + .get("io.zipkin.contrib.otel:zipkin-collector-otel-http", "0.0.1"); + + Server server; + + @BeforeEach + public void setup() { store = InMemoryStorage.newBuilder().build(); metrics = new InMemoryCollectorMetrics(); @@ -29,13 +88,288 @@ class ITOpenTelemetryHttpCollector { .storage(store) .build() .start(); + ServerBuilder serverBuilder = Server.builder().http(port); + collector.reconfigure(serverBuilder); metrics = metrics.forTransport("otel/http"); + server = serverBuilder.build(); + server.start().join(); } - @AfterEach void teardown() throws IOException { + @AfterEach + void teardown() throws IOException { store.close(); collector.close(); + server.stop().join(); } - // TODO: integration test + @Test + void testServerKind() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("get") + .setSpanKind(SpanKind.SERVER) + .setAttribute("string", "foo" + i) + .setAttribute("int", 100) + .setAttribute("double", 10.5) + .setAttribute("boolean", true) + .setAttribute(AttributeKey.stringArrayKey("array"), Arrays.asList("a", "b", "c")) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + Thread.sleep(100); // do something + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("get"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(9); + assertThat(span.tags()).containsEntry("string", "foo" + i); + assertThat(span.tags()).containsEntry("int", "100"); + assertThat(span.tags()).containsEntry("double", "10.5"); + assertThat(span.tags()).containsEntry("boolean", "true"); + assertThat(span.tags()).containsEntry("array", "[\"a\",\"b\",\"c\"]"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isEmpty(); + } + } + + @Test + void testServerKindWithEvents() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + Instant eventTime1 = Instant.now(); + Instant eventTime2 = eventTime1.plusMillis(10); + Instant eventTime3 = eventTime1.plusMillis(100); + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("do-something") + .setSpanKind(SpanKind.SERVER) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + span.addEvent("event-1", Attributes.builder().put("foo", "bar").put("i", i).build(), eventTime1.plusMillis(size)); + span.addEvent("event-2", eventTime2.plusMillis(size)); + Thread.sleep(100); // do something + span.addEvent("event-3", eventTime3.plusMillis(size)); + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("do-something"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(4); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isNotNull(); + assertThat(span.annotations()).hasSize(3); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#events + assertThat(span.annotations().get(0).value()).isEqualTo("\"event-1\":{\"foo\":\"bar\",\"i\":" + i + "}"); + assertThat(span.annotations().get(0).timestamp()).isEqualTo(toMillis(eventTime1.plusMillis(size))); + assertThat(span.annotations().get(1).value()).isEqualTo("\"event-2\":{}"); + assertThat(span.annotations().get(1).timestamp()).isEqualTo(toMillis(eventTime2.plusMillis(size))); + assertThat(span.annotations().get(2).value()).isEqualTo("\"event-3\":{}"); + assertThat(span.annotations().get(2).timestamp()).isEqualTo(toMillis(eventTime3.plusMillis(size))); + } + } + + @Test + void testServerKindWithError() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("do-something") + .setSpanKind(SpanKind.SERVER) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + Thread.sleep(100); // do something + span.setStatus(StatusCode.ERROR, "Exception!!"); + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("do-something"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(6); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.tags()).containsEntry(SpanTranslator.ERROR_TAG, "Exception!!"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isEmpty(); + } + } + + @Test + void testClientKind() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("send") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("string", "foo" + i) + .setAttribute("int", 100) + .setAttribute("double", 10.5) + .setAttribute("boolean", true) + .setAttribute(AttributeKey.stringArrayKey("array"), Arrays.asList("a", "b", "c")) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .setAttribute(SpanTranslator.PEER_SERVICE, "demo") + .setAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS, "1.2.3.4") + .setAttribute(NetworkAttributes.NETWORK_PEER_PORT, 8080L) + .startSpan(); + Thread.sleep(100); // do something + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("send"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.CLIENT); + assertThat(span.tags()).hasSize(12); + assertThat(span.tags()).containsEntry("string", "foo" + i); + assertThat(span.tags()).containsEntry("int", "100"); + assertThat(span.tags()).containsEntry("double", "10.5"); + assertThat(span.tags()).containsEntry("boolean", "true"); + assertThat(span.tags()).containsEntry("array", "[\"a\",\"b\",\"c\"]"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(SpanTranslator.PEER_SERVICE.getKey(), "demo"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "1.2.3.4"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "8080"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isEqualTo("demo"); + assertThat(span.remoteEndpoint().ipv4()).isEqualTo("1.2.3.4"); + assertThat(span.remoteEndpoint().port()).isEqualTo(8080); + assertThat(span.annotations()).isEmpty(); + } + } + + @Test + void emptyRequest() throws Exception { + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(new byte[0]); // empty + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + } + + @Test + void brokenRequest() throws Exception { + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(0x00); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_UNAVAILABLE); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + } + + static long toMillis(Instant instant) { + long time = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()); + time += instant.getNano(); + return SpanTranslator.nanoToMills(time); + } + + static int getFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to find a free port", e); + } + } } diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java new file mode 100644 index 0000000..01d558f --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.Arrays; + +import com.google.protobuf.ByteString; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin2.collector.otel.http.ProtoUtils.kvListToJson; +import static zipkin2.collector.otel.http.ProtoUtils.valueToJson; + +class ProtoUtilsTest { + + @Test + void testValueToJson() { + assertThat(valueToJson(AnyValue.newBuilder().setStringValue("string").build())).isEqualTo("\"string\""); + assertThat(valueToJson(AnyValue.newBuilder().setIntValue(100).build())).isEqualTo("100"); + assertThat(valueToJson(AnyValue.newBuilder().setBoolValue(true).build())).isEqualTo("true"); + assertThat(valueToJson(AnyValue.newBuilder().setDoubleValue(1.2).build())).isEqualTo("1.2"); + assertThat(valueToJson(AnyValue.newBuilder().setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("abc")) + .addValues(AnyValue.newBuilder().setIntValue(20)) + .addValues(AnyValue.newBuilder().setBoolValue(false))) + .build())) + .isEqualTo("[\"abc\",20,false]"); + assertThat(valueToJson(AnyValue.newBuilder().setKvlistValue(KeyValueList.newBuilder() + .addValues(KeyValue.newBuilder().setKey("x").setValue(AnyValue.newBuilder().setStringValue("abc"))) + .addValues(KeyValue.newBuilder().setKey("y").setValue(AnyValue.newBuilder().setStringValue("efg"))) + .addValues(KeyValue.newBuilder().setKey("z").setValue(AnyValue.newBuilder().setIntValue(0))) + .build()).build())) + .isEqualTo("{\"x\":\"abc\",\"y\":\"efg\",\"z\":0}"); + assertThat(valueToJson(AnyValue.newBuilder().setBytesValue(ByteString.fromHex("cafebabe")).build())) + .isEqualTo("\\312\\376\\272\\276"); + assertThat(valueToJson(AnyValue.newBuilder().build())).isEqualTo(""); + } + + @Test + void testKvListToJson() { + assertThat(kvListToJson(Arrays.asList(KeyValue.newBuilder().setKey("string").setValue(AnyValue.newBuilder().setStringValue("s")).build(), + KeyValue.newBuilder().setKey("int").setValue(AnyValue.newBuilder().setIntValue(100)).build(), + KeyValue.newBuilder().setKey("boolean").setValue(AnyValue.newBuilder().setBoolValue(true)).build(), + KeyValue.newBuilder().setKey("double").setValue(AnyValue.newBuilder().setDoubleValue(1.2)).build(), + KeyValue.newBuilder().setKey("array").setValue(AnyValue.newBuilder().setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("abc")) + .addValues(AnyValue.newBuilder().setIntValue(20)) + .addValues(AnyValue.newBuilder().setBoolValue(false)))).build(), + KeyValue.newBuilder().setKey("kvlist").setValue(AnyValue.newBuilder().setKvlistValue(KeyValueList.newBuilder() + .addValues(KeyValue.newBuilder().setKey("x").setValue(AnyValue.newBuilder().setStringValue("abc"))) + .addValues(KeyValue.newBuilder().setKey("y").setValue(AnyValue.newBuilder().setStringValue("efg"))) + .addValues(KeyValue.newBuilder().setKey("z").setValue(AnyValue.newBuilder().setIntValue(0))) + .build())).build()))) + .isEqualTo("{\"string\":\"s\",\"int\":100,\"boolean\":true,\"double\":1.2,\"array\":[\"abc\",20,false],\"kvlist\":{\"x\":\"abc\",\"y\":\"efg\",\"z\":0}}"); + } + +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java new file mode 100644 index 0000000..db68bbc --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java @@ -0,0 +1,412 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import com.google.protobuf.ByteString; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.Span.SpanKind; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import zipkin2.Endpoint; +import zipkin2.Span; + +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin2.collector.otel.http.ZipkinTestUtil.attribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.longAttribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilder; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithResourceCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithScopeCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithSpanCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.stringAttribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.zipkinSpanBuilder; + +/* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java */ +class SpanTranslatorTest { + + @Test + void translate_remoteParent() { + ExportTraceServiceRequest data = requestBuilder().build(); + Span expected = zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + @Test + void translate_invalidParent() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setParentSpanId(ByteString.fromHex("0000000000000000"))) + .build(); + Span expected = zipkinSpanBuilder(Span.Kind.SERVER) + .parentId(0) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + @Test + void translate_subMicroDurations() { + ExportTraceServiceRequest data = + ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setStartTimeUnixNano(1505855794_194009601L) + .setEndTimeUnixNano(1505855794_194009999L)) + .build(); + Span expected = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .duration(1) + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + @Test + void translate_ServerKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ClientKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_InternalKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_INTERNAL)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(null) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ConsumeKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CONSUMER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.CONSUMER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ProducerKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_PRODUCER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.PRODUCER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ResourceServiceNameMapping() { + ExportTraceServiceRequest data = requestBuilderWithResourceCustomizer(resource -> resource + .clearAttributes() + .addAttributes(stringAttribute("service.name", "super-zipkin-service"))) + .build(); + Endpoint expectedLocalEndpoint = Endpoint.newBuilder() + .serviceName("super-zipkin-service") + .ip("1.2.3.4") + .build(); + Span expectedZipkinSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .localEndpoint(expectedLocalEndpoint) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expectedZipkinSpan); + } + + @Test + void translate_noServiceName() { + ExportTraceServiceRequest data = requestBuilderWithResourceCustomizer(Resource.Builder::clearAttributes) + .build(); + Span expectedZipkinSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .localEndpoint(null) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedZipkinSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMapping(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("remote-test-service") + .ip("8.8.8.8") + .port(42) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_SERVER", "SPAN_KIND_CONSUMER", "SPAN_KIND_INTERNAL", "SPAN_KIND_UNSPECIFIED"}) + void translate_RemoteEndpointMappingWhenKindIsNotClientOrProducer(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(null) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenServiceNameAndPeerAddressAreMissing(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(null) + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenServiceNameIsMissingButPeerAddressExists(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("8.8.8.8") + .ip("8.8.8.8") + .port(42) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenPortIsMissing(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8"))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("remote-test-service") + .ip("8.8.8.8") + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithAttributes() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT) + .addAttributes(stringAttribute("string", "string value")) + .addAttributes(attribute("boolean", av -> av.setBoolValue(false))) + .addAttributes(longAttribute("long", 9999L)) + .addAttributes(attribute("double", av -> av.setDoubleValue(222.333))) + .addAttributes(attribute("booleanArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setBoolValue(true)) + .addValues(AnyValue.newBuilder().setBoolValue(false))))) + .addAttributes(attribute("stringArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("Hello"))))) + .addAttributes(attribute("doubleArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setDoubleValue(32.33)) + .addValues(AnyValue.newBuilder().setDoubleValue(-98.3))))) + .addAttributes(attribute("longArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setIntValue(32L)) + .addValues(AnyValue.newBuilder().setIntValue(999L)))))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag("string", "string value") + .putTag("boolean", "false") + .putTag("long", "9999") + .putTag("double", "222.333") + .putTag("booleanArray", "[true,false]") + .putTag("stringArray", "[\"Hello\"]") + .putTag("doubleArray", "[32.33,-98.3]") + .putTag("longArray", "[32,999]") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithInstrumentationLibraryInfo() { + ExportTraceServiceRequest data = requestBuilderWithScopeCustomizer(scope -> scope + .setName("io.opentelemetry.auto") + .setVersion("1.0.0")) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.opentelemetry.auto") + .putTag(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "1.0.0") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_AlreadyHasHttpStatusInfo() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT) + .addAttributes(longAttribute("http.response.status.code", 404)) + .addAttributes(stringAttribute("error", "A user provided error")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag("http.response.status.code", "404") + .putTag("error", "A user provided error") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcTimeoutErrorStatus_WithTimeoutErrorDescription() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).setMessage("timeout").build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .putTag("error", "timeout") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcErrorStatus_WithEmptyErrorDescription() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).setMessage("").build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .putTag("error", "") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcUnsetStatus() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_UNSET).build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java new file mode 100644 index 0000000..32bb3f5 --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.protobuf.ByteString; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.semconv.NetworkAttributes; +import zipkin2.Endpoint; +import zipkin2.Span; + +import static io.opentelemetry.proto.trace.v1.Span.Event; +import static io.opentelemetry.proto.trace.v1.Span.SpanKind; + +/* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/ZipkinTestUtil.java */ +class ZipkinTestUtil { + + static final String TRACE_ID = "d239036e7d5cec116b562147388b35bf"; + + static final String SPAN_ID = "9cc1e3049173be09"; + + static final String PARENT_SPAN_ID = "8b03ab423da481c5"; + + private static final Attributes attributes = Attributes.empty(); + + private static final List annotations = + Collections.unmodifiableList( + Arrays.asList( + EventData.create(1505855799_433901068L, "RECEIVED", Attributes.empty()), + EventData.create(1505855799_459486280L, "SENT", Attributes.empty()))); + + private ZipkinTestUtil() { + } + + static Span.Builder zipkinSpanBuilder(Span.Kind kind) { + return Span.newBuilder() + .traceId(TRACE_ID) + .parentId(PARENT_SPAN_ID) + .id(SPAN_ID) + .kind(kind) + .name("Recv.helloworld.Greeter.SayHello") + .timestamp(1505855794000000L + 194009601L / 1000) + .duration((1505855799000000L + 465726528L / 1000) - (1505855794000000L + 194009601L / 1000)) + .localEndpoint(Endpoint.newBuilder().ip("1.2.3.4").serviceName("tweetiebird").build()) + .putTag(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "1.2.3.4") + .addAnnotation(1505855799000000L + 433901068L / 1000, "\"RECEIVED\":{}") + .addAnnotation(1505855799000000L + 459486280L / 1000, "\"SENT\":{}"); + } + + static ExportTraceServiceRequest.Builder requestBuilder( + Function resourceCustomizer, + Function scopeCustomizer, + Function spanCustomizer) { + return ExportTraceServiceRequest.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .setResource(resourceCustomizer.apply(Resource.newBuilder() + .addAttributes(stringAttribute("service.name", "tweetiebird")))) + .addScopeSpans(ScopeSpans.newBuilder() + .setScope(scopeCustomizer.apply(InstrumentationScope.newBuilder()).build()) + .addSpans(spanCustomizer.apply(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex(SPAN_ID)) + .setTraceId(ByteString.fromHex(TRACE_ID)) + .setParentSpanId(ByteString.fromHex(PARENT_SPAN_ID)) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_OK)) + .setKind(SpanKind.SPAN_KIND_SERVER) + .setName("Recv.helloworld.Greeter.SayHello") + .setStartTimeUnixNano(1505855794_194009601L) + .setEndTimeUnixNano(1505855799_465726528L) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "1.2.3.4")) + .addEvents(Event.newBuilder() + .setName("RECEIVED").setTimeUnixNano(1505855799_433901068L)) + .addEvents(Event.newBuilder() + .setName("SENT").setTimeUnixNano(1505855799_459486280L)) + )))); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithResourceCustomizer(Function resourceCustomizer) { + return requestBuilder(resourceCustomizer, Function.identity(), Function.identity()); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithScopeCustomizer(Function scopeCustomizer) { + return requestBuilder(Function.identity(), scopeCustomizer, Function.identity()); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithSpanCustomizer(Function spanCustomizer) { + return requestBuilder(Function.identity(), Function.identity(), spanCustomizer); + } + + static ExportTraceServiceRequest.Builder requestBuilder() { + return requestBuilder(Function.identity(), Function.identity(), Function.identity()); + } + + static KeyValue stringAttribute(String key, String value) { + return attribute(key, av -> av.setStringValue(value)); + } + + static KeyValue longAttribute(String key, long value) { + return attribute(key, av -> av.setIntValue(value)); + } + + static KeyValue attribute(String key, Function builder) { + return KeyValue.newBuilder() + .setKey(key) + .setValue(builder.apply(AnyValue.newBuilder())) + .build(); + } +} diff --git a/module/pom.xml b/module/pom.xml index 00f49d4..4506a3a 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -25,7 +25,7 @@ ${project.groupId} - zipkin-collector-otel-http + collector-http ${project.version} From b5015c0b9ada6d563e18e195839917e1f04a24fe Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Thu, 29 Aug 2024 16:19:35 +0900 Subject: [PATCH 02/14] Remove unused opentelemetry-sdk-common --- collector-http/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/collector-http/pom.xml b/collector-http/pom.xml index d6f495c..a1fdcb8 100644 --- a/collector-http/pom.xml +++ b/collector-http/pom.xml @@ -42,11 +42,6 @@ opentelemetry-api ${opentelemetry.version} - - io.opentelemetry - opentelemetry-sdk-common - ${opentelemetry.version} - io.opentelemetry.proto opentelemetry-proto From 6f2bec29ebf0ae563a7aa8a06f59028c64c34a0c Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 30 Aug 2024 09:59:00 +0900 Subject: [PATCH 03/14] Update collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java Co-authored-by: minux --- .../zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index d032f53..32f3e41 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -136,7 +136,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { } catch (IOException e) { LOG.log(Level.WARNING, "Unable to parse the request:", e); - throw new UncheckedIOException(e); + result.onError(new UncheckedIOException(e)); } return null; } From 88b397720e9a27096636ba2a54fb19e199fe3a26 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 30 Aug 2024 09:59:07 +0900 Subject: [PATCH 04/14] Update collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java Co-authored-by: minux --- .../zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index 32f3e41..cec8295 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -130,7 +130,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { } try { - ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(CodedInputStream.newInstance(content.byteBuf().nioBuffer())); + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput()); List spans = SpanTranslator.translate(request); collector.collector.accept(spans, result); } From 88a5f959d56493f670672d984082b8aa6377eb75 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 30 Aug 2024 09:59:18 +0900 Subject: [PATCH 05/14] Update collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java Co-authored-by: minux --- .../zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index cec8295..da2aab8 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -117,7 +117,7 @@ static final class HttpService extends AbstractHttpService { @Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { CompletableCallback result = new CompletableCallback(); - req.aggregate(AggregationOptions.usePooledObjects(ByteBufAllocator.DEFAULT, ctx.eventLoop() + req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop() )).handle((msg, t) -> { if (t != null) { result.onError(t); From 9ac6f61a79d9df041c5d28f2f7a3688165ccf47a Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 30 Aug 2024 10:00:36 +0900 Subject: [PATCH 06/14] Fix import --- .../collector/otel/http/OpenTelemetryHttpCollector.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index da2aab8..3f0e2b9 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -11,7 +11,7 @@ import java.util.logging.Level; import java.util.logging.Logger; -import com.google.protobuf.CodedInputStream; +import com.google.protobuf.UnsafeByteOperations; import com.linecorp.armeria.common.AggregationOptions; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; @@ -24,7 +24,6 @@ import com.linecorp.armeria.server.ServerConfigurator; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.encoding.DecodingService; -import io.netty.buffer.ByteBufAllocator; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import zipkin2.Callback; import zipkin2.Span; From e94d942ad44367b8e6c286468cf0e6f3bb64e252 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 30 Aug 2024 10:06:06 +0900 Subject: [PATCH 07/14] Fix test --- .../collector/otel/http/ITOpenTelemetryHttpCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index 65beeef..6115939 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -353,7 +353,7 @@ void brokenRequest() throws Exception { } connection.disconnect(); int responseCode = connection.getResponseCode(); - assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_UNAVAILABLE); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); Awaitility.waitAtMost(Duration.ofMillis(200)) .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); } From ce708bc0aeb35453f8fbebde0541b30db4ef8eaa Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Mon, 2 Sep 2024 11:44:47 +0900 Subject: [PATCH 08/14] Use long to set trace/span id and remove OtelEncodingUtils --- .../collector/otel/http/SpanTranslator.java | 130 +++--------------- 1 file changed, 21 insertions(+), 109 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java index 40fb33a..188c1f2 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -4,6 +4,7 @@ */ package zipkin2.collector.otel.http; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -45,6 +46,8 @@ final class SpanTranslator { static final String ERROR_TAG = "error"; + static final String INVALID_TRACE = "00000000000000000000000000000000"; + static List translate(ExportTraceServiceRequest otelSpans) { List spans = new ArrayList<>(); List spansList = otelSpans.getResourceSpansList(); @@ -70,9 +73,18 @@ private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope sco long startTimestamp = nanoToMills(spanData.getStartTimeUnixNano()); long endTimestamp = nanoToMills(spanData.getEndTimeUnixNano()); Map attributesMap = spanData.getAttributesList().stream().collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); - zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder() - .traceId(OtelEncodingUtils.traceIdFromBytes(spanData.getTraceId().toByteArray())) - .id(OtelEncodingUtils.spanIdFromBytes(spanData.getSpanId().toByteArray())) + zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder(); + byte[] traceIdBytes = spanData.getTraceId().toByteArray(); + long high = bytesToLong(traceIdBytes, 0); + if (high == 0) { + spanBuilder.traceId(INVALID_TRACE); + } + else { + long low = bytesToLong(traceIdBytes, 8); + spanBuilder.traceId(high, low); + } + spanBuilder + .id(bytesToLong(spanData.getSpanId().toByteArray(), 0)) .kind(toSpanKind(spanData.getKind())) .name(spanData.getName()) .timestamp(nanoToMills(spanData.getStartTimeUnixNano())) @@ -81,8 +93,8 @@ private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope sco .remoteEndpoint(getRemoteEndpoint(attributesMap, spanData.getKind())); ByteString parentSpanId = spanData.getParentSpanId(); if (!parentSpanId.isEmpty()) { - String parentId = OtelEncodingUtils.spanIdFromBytes(parentSpanId.toByteArray()); - if (!parentId.equals(OtelEncodingUtils.INVALID_SPAN)) { + long parentId = bytesToLong(parentSpanId.toByteArray(), 0); + if (parentId != 0) { spanBuilder.parentId(parentId); } } @@ -199,110 +211,10 @@ static long nanoToMills(long epochNanos) { return NANOSECONDS.toMicros(epochNanos); } - /** - * Taken from OpenTelemetry codebase. - * https://github.com/open-telemetry/opentelemetry-java/blob/3e8092d086967fa24a0559044651781403033313/api/all/src/main/java/io/opentelemetry/api/internal/OtelEncodingUtils.java - */ - static class OtelEncodingUtils { - - static final String ALPHABET = "0123456789abcdef"; - - static final char[] ENCODING = buildEncodingArray(); - - static final String INVALID_TRACE = "00000000000000000000000000000000"; - - static final int TRACE_BYTES_LENGTH = 16; - - static final int TRACE_HEX_LENGTH = 2 * TRACE_BYTES_LENGTH; - - static final int SPAN_BYTES_LENGTH = 8; - - static final int SPAN_HEX_LENGTH = 2 * SPAN_BYTES_LENGTH; - - static final String INVALID_SPAN = "0000000000000000"; - - private static char[] buildEncodingArray() { - char[] encoding = new char[512]; - for (int i = 0; i < 256; ++i) { - encoding[i] = ALPHABET.charAt(i >>> 4); - encoding[i | 0x100] = ALPHABET.charAt(i & 0xF); - } - return encoding; - } - - /** - * Fills {@code dest} with the hex encoding of {@code bytes}. - */ - public static void bytesToBase16(byte[] bytes, char[] dest, int length) { - for (int i = 0; i < length; i++) { - byteToBase16(bytes[i], dest, i * 2); - } - } - - /** - * Encodes the specified byte, and returns the encoded {@code String}. - * - * @param value the value to be converted. - * @param dest the destination char array. - * @param destOffset the starting offset in the destination char array. - */ - public static void byteToBase16(byte value, char[] dest, int destOffset) { - int b = value & 0xFF; - dest[destOffset] = ENCODING[b]; - dest[destOffset + 1] = ENCODING[b | 0x100]; - } - - /** - * Returns the lowercase hex (base16) representation of the {@code TraceId} converted from the - * given bytes representation, or {@link #INVALID_TRACE} if input is {@code null} or the given - * byte array is too short. - * - *

It converts the first 26 bytes of the given byte array. - * - * @param traceIdBytes the bytes (16-byte array) representation of the {@code TraceId}. - * @return the lowercase hex (base16) representation of the {@code TraceId}. - */ - static String traceIdFromBytes(byte[] traceIdBytes) { - if (traceIdBytes == null || traceIdBytes.length < TRACE_BYTES_LENGTH) { - return INVALID_TRACE; - } - char[] result = TemporaryBuffers.chars(TRACE_HEX_LENGTH); - OtelEncodingUtils.bytesToBase16(traceIdBytes, result, TRACE_BYTES_LENGTH); - return new String(result, 0, TRACE_HEX_LENGTH); - } - - static String spanIdFromBytes(byte[] spanIdBytes) { - if (spanIdBytes == null || spanIdBytes.length < SPAN_BYTES_LENGTH) { - return INVALID_SPAN; - } - char[] result = TemporaryBuffers.chars(SPAN_HEX_LENGTH); - OtelEncodingUtils.bytesToBase16(spanIdBytes, result, SPAN_BYTES_LENGTH); - return new String(result, 0, SPAN_HEX_LENGTH); - } - - static final class TemporaryBuffers { - - private static final ThreadLocal CHAR_ARRAY = new ThreadLocal<>(); - - /** - * A {@link ThreadLocal} {@code char[]} of size {@code len}. Take care when using a large - * value of {@code len} as this buffer will remain for the lifetime of the thread. The - * returned buffer will not be zeroed and may be larger than the requested size, you must make - * sure to fill the entire content to the desired value and set the length explicitly when - * converting to a {@link String}. - */ - public static char[] chars(int len) { - char[] buffer = CHAR_ARRAY.get(); - if (buffer == null || buffer.length < len) { - buffer = new char[len]; - CHAR_ARRAY.set(buffer); - } - return buffer; - } - - private TemporaryBuffers() { - } + static long bytesToLong(byte[] bytes, int offset) { + if (bytes == null || bytes.length < offset + 8) { + return 0; } + return ByteBuffer.wrap(bytes, offset, 8).getLong(); } - } From b16231134eaa672e14436a49f95f939043e08261 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Mon, 2 Sep 2024 11:46:03 +0900 Subject: [PATCH 09/14] Unwrap UncheckedIOException --- .../collector/otel/http/OpenTelemetryHttpCollector.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index 3f0e2b9..80c76c0 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -5,7 +5,6 @@ package zipkin2.collector.otel.http; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.logging.Level; @@ -135,7 +134,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { } catch (IOException e) { LOG.log(Level.WARNING, "Unable to parse the request:", e); - result.onError(new UncheckedIOException(e)); + result.onError(e); } return null; } From 420507baaf023126a889d6ac142dff18256b7491 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Mon, 2 Sep 2024 12:19:54 +0900 Subject: [PATCH 10/14] Use comma separator instead of json to encode attributes --- .../java/zipkin2/collector/otel/http/ProtoUtils.java | 11 ++++++++++- .../otel/http/ITOpenTelemetryHttpCollector.java | 4 ++-- .../collector/otel/http/SpanTranslatorTest.java | 8 ++++---- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java index 8754961..ae40bba 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java @@ -23,6 +23,16 @@ static String valueToString(AnyValue value) { if (value.hasStringValue()) { return value.getStringValue(); } + if (value.hasArrayValue()) { + // While https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute says + // that an array should be encoded as a json, + // the Otel Zipkin Exporter doesn't implement like that https://github.com/open-telemetry/opentelemetry-java/blob/main/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java#L382-L385 + // Also Brave doesn't use the json encoding. + // So follow the comma separator here. + return value.getArrayValue().getValuesList().stream() + .map(ProtoUtils::valueToString) + .collect(joining(",")); + } return valueToJson(value); } @@ -31,7 +41,6 @@ static String valueToJson(AnyValue value) { return "\"" + value.getStringValue() + "\""; } if (value.hasArrayValue()) { - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute return value.getArrayValue().getValuesList().stream() .map(ProtoUtils::valueToJson) .collect(joining(",", "[", "]")); diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index 6115939..bf03139 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -141,7 +141,7 @@ void testServerKind() throws Exception { assertThat(span.tags()).containsEntry("int", "100"); assertThat(span.tags()).containsEntry("double", "10.5"); assertThat(span.tags()).containsEntry("boolean", "true"); - assertThat(span.tags()).containsEntry("array", "[\"a\",\"b\",\"c\"]"); + assertThat(span.tags()).containsEntry("array", "a,b,c"); assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); @@ -303,7 +303,7 @@ void testClientKind() throws Exception { assertThat(span.tags()).containsEntry("int", "100"); assertThat(span.tags()).containsEntry("double", "10.5"); assertThat(span.tags()).containsEntry("boolean", "true"); - assertThat(span.tags()).containsEntry("array", "[\"a\",\"b\",\"c\"]"); + assertThat(span.tags()).containsEntry("array", "a,b,c"); assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); assertThat(span.tags()).containsEntry(SpanTranslator.PEER_SERVICE.getKey(), "demo"); diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java index db68bbc..174b10c 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java @@ -306,10 +306,10 @@ void translate_WithAttributes() { .putTag("boolean", "false") .putTag("long", "9999") .putTag("double", "222.333") - .putTag("booleanArray", "[true,false]") - .putTag("stringArray", "[\"Hello\"]") - .putTag("doubleArray", "[32.33,-98.3]") - .putTag("longArray", "[32,999]") + .putTag("booleanArray", "true,false") + .putTag("stringArray", "Hello") + .putTag("doubleArray", "32.33,-98.3") + .putTag("longArray", "32,999") .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); From 3dbd79a3525f828213d27a68661758e46001e8a8 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Mon, 2 Sep 2024 12:40:42 +0900 Subject: [PATCH 11/14] Makes no sense to set an id with zeros, as this will throw an IllegalArgumentsException. --- .../java/zipkin2/collector/otel/http/SpanTranslator.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java index 188c1f2..836ab13 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -76,14 +76,9 @@ private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope sco zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder(); byte[] traceIdBytes = spanData.getTraceId().toByteArray(); long high = bytesToLong(traceIdBytes, 0); - if (high == 0) { - spanBuilder.traceId(INVALID_TRACE); - } - else { - long low = bytesToLong(traceIdBytes, 8); - spanBuilder.traceId(high, low); - } + long low = bytesToLong(traceIdBytes, 8); spanBuilder + .traceId(high, low) .id(bytesToLong(spanData.getSpanId().toByteArray(), 0)) .kind(toSpanKind(spanData.getKind())) .name(spanData.getName()) From 9f6780653ac918260790c630ad6566f68aba69f7 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Mon, 2 Sep 2024 12:42:09 +0900 Subject: [PATCH 12/14] Remove unused INVALID_TRACE --- .../main/java/zipkin2/collector/otel/http/SpanTranslator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java index 836ab13..2386f8b 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -46,8 +46,6 @@ final class SpanTranslator { static final String ERROR_TAG = "error"; - static final String INVALID_TRACE = "00000000000000000000000000000000"; - static List translate(ExportTraceServiceRequest otelSpans) { List spans = new ArrayList<>(); List spansList = otelSpans.getResourceSpansList(); From c869d693ecb37e91bf605bc5546f215379a9a254 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Thu, 12 Sep 2024 11:46:21 +0900 Subject: [PATCH 13/14] Increment spanDropped if the received span is invalid --- .../otel/http/OpenTelemetryHttpCollector.java | 10 +- .../collector/otel/http/SpanTranslator.java | 17 ++- .../http/ITOpenTelemetryHttpCollector.java | 127 ++++++++++++++++++ .../otel/http/SpanTranslatorTest.java | 92 ++++++++++--- 4 files changed, 221 insertions(+), 25 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index 80c76c0..4b6ee85 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -108,8 +108,11 @@ static final class HttpService extends AbstractHttpService { final OpenTelemetryHttpCollector collector; + final SpanTranslator spanTranslator; + HttpService(OpenTelemetryHttpCollector collector) { this.collector = collector; + this.spanTranslator = new SpanTranslator(collector.metrics); } @Override @@ -118,6 +121,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop() )).handle((msg, t) -> { if (t != null) { + collector.metrics.incrementMessagesDropped(); result.onError(t); return null; } @@ -126,13 +130,15 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { result.onSuccess(null); return null; } - + collector.metrics.incrementBytes(content.length()); try { ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput()); - List spans = SpanTranslator.translate(request); + collector.metrics.incrementMessages(); + List spans = spanTranslator.translate(request); collector.collector.accept(spans, result); } catch (IOException e) { + collector.metrics.incrementMessagesDropped(); LOG.log(Level.WARNING, "Unable to parse the request:", e); result.onError(e); } diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java index 2386f8b..39503f7 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -28,6 +28,7 @@ import io.opentelemetry.semconv.OtelAttributes; import io.opentelemetry.semconv.ServiceAttributes; import zipkin2.Endpoint; +import zipkin2.collector.CollectorMetrics; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -38,6 +39,8 @@ */ final class SpanTranslator { + final CollectorMetrics metrics; + static final AttributeKey PEER_SERVICE = AttributeKey.stringKey("peer.service"); static final String OTEL_DROPPED_ATTRIBUTES_COUNT = "otel.dropped_attributes_count"; @@ -46,14 +49,24 @@ final class SpanTranslator { static final String ERROR_TAG = "error"; - static List translate(ExportTraceServiceRequest otelSpans) { + SpanTranslator(CollectorMetrics metrics) { + this.metrics = metrics; + } + + List translate(ExportTraceServiceRequest otelSpans) { List spans = new ArrayList<>(); List spansList = otelSpans.getResourceSpansList(); for (ResourceSpans resourceSpans : spansList) { for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) { InstrumentationScope scope = scopeSpans.getScope(); for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { - spans.add(generateSpan(span, scope, resourceSpans.getResource())); + try { + spans.add(generateSpan(span, scope, resourceSpans.getResource())); + } + catch (RuntimeException e) { + // If the span is invalid, an exception such as IllegalArgumentException will be thrown. + metrics.incrementSpansDropped(1); + } } } } diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index bf03139..3d3fdb3 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.protobuf.ByteString; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; import io.opentelemetry.api.common.AttributeKey; @@ -27,6 +28,9 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.TracesData; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; @@ -154,6 +158,11 @@ void testServerKind() throws Exception { assertThat(span.remoteEndpoint()).isNull(); assertThat(span.annotations()).isEmpty(); } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes } @Test @@ -212,6 +221,11 @@ void testServerKindWithEvents() throws Exception { assertThat(span.annotations().get(2).value()).isEqualTo("\"event-3\":{}"); assertThat(span.annotations().get(2).timestamp()).isEqualTo(toMillis(eventTime3.plusMillis(size))); } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes } @Test @@ -259,6 +273,11 @@ void testServerKindWithError() throws Exception { assertThat(span.remoteEndpoint()).isNull(); assertThat(span.annotations()).isEmpty(); } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes } @Test @@ -320,6 +339,104 @@ void testClientKind() throws Exception { assertThat(span.remoteEndpoint().port()).isEqualTo(8080); assertThat(span.annotations()).isEmpty(); } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes + } + + @Test + void minimalSpan() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000001")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000001"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(1)); + assertThat(metrics.spans()).isEqualTo(1); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); + } + + @Test + void invalidSpanId() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000000")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000001"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); + } + + @Test + void invalidTraceId() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000001")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000000"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); } @Test @@ -338,6 +455,11 @@ void emptyRequest() throws Exception { assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); Awaitility.waitAtMost(Duration.ofSeconds(5)) .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isZero(); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isZero(); } @Test @@ -356,6 +478,11 @@ void brokenRequest() throws Exception { assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); Awaitility.waitAtMost(Duration.ofMillis(200)) .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isZero(); + assertThat(metrics.messagesDropped()).isEqualTo(1); + assertThat(metrics.bytes()).isEqualTo(1); } static long toMillis(Instant instant) { diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java index 174b10c..98fd244 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java @@ -13,11 +13,13 @@ import io.opentelemetry.proto.trace.v1.Status; import io.opentelemetry.semconv.NetworkAttributes; import io.opentelemetry.semconv.OtelAttributes; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import zipkin2.Endpoint; import zipkin2.Span; +import zipkin2.collector.InMemoryCollectorMetrics; import static org.assertj.core.api.Assertions.assertThat; import static zipkin2.collector.otel.http.ZipkinTestUtil.attribute; @@ -31,6 +33,14 @@ /* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java */ class SpanTranslatorTest { + InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + + SpanTranslator spanTranslator = new SpanTranslator(metrics); + + @BeforeEach + void setup() { + metrics.clear(); + } @Test void translate_remoteParent() { @@ -38,7 +48,8 @@ void translate_remoteParent() { Span expected = zipkinSpanBuilder(Span.Kind.SERVER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)).containsExactly(expected); + assertThat(spanTranslator.translate(data)).containsExactly(expected); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -50,7 +61,27 @@ void translate_invalidParent() { .parentId(0) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)).containsExactly(expected); + assertThat(spanTranslator.translate(data)).containsExactly(expected); + assertThat(metrics.spansDropped()).isZero(); + } + + + @Test + void translate_invalidTraceId() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setTraceId(ByteString.fromHex("00000000000000000000000000000000"))) + .build(); + assertThat(spanTranslator.translate(data)).isEmpty(); + assertThat(metrics.spansDropped()).isEqualTo(1); + } + + @Test + void translate_invalidSpanId() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setSpanId(ByteString.fromHex("0000000000000000"))) + .build(); + assertThat(spanTranslator.translate(data)).isEmpty(); + assertThat(metrics.spansDropped()).isEqualTo(1); } @Test @@ -65,62 +96,68 @@ void translate_subMicroDurations() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .duration(1) .build(); - assertThat(SpanTranslator.translate(data)).containsExactly(expected); + assertThat(spanTranslator.translate(data)).containsExactly(expected); + assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ServerKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_SERVER)).build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.SERVER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); + assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ClientKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_CLIENT)).build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.CLIENT) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); + assertThat(metrics.spansDropped()).isZero(); } @Test void translate_InternalKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_INTERNAL)).build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(null) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); + assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ConsumeKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_CONSUMER)).build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.CONSUMER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); + assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ProducerKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_PRODUCER)).build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.PRODUCER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -138,7 +175,8 @@ void translate_ResourceServiceNameMapping() { .localEndpoint(expectedLocalEndpoint) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)).containsExactly(expectedZipkinSpan); + assertThat(spanTranslator.translate(data)).containsExactly(expectedZipkinSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -150,8 +188,9 @@ void translate_noServiceName() { .localEndpoint(null) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedZipkinSpan); + assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -179,8 +218,9 @@ void translate_RemoteEndpointMapping(SpanKind spanKind) { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -202,8 +242,9 @@ void translate_RemoteEndpointMappingWhenKindIsNotClientOrProducer(SpanKind spanK .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -221,8 +262,9 @@ void translate_RemoteEndpointMappingWhenServiceNameAndPeerAddressAreMissing(Span .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -248,8 +290,9 @@ void translate_RemoteEndpointMappingWhenServiceNameIsMissingButPeerAddressExists .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @@ -275,8 +318,9 @@ void translate_RemoteEndpointMappingWhenPortIsMissing(SpanKind spanKind) { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -313,8 +357,9 @@ void translate_WithAttributes() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -331,8 +376,9 @@ void translate_WithInstrumentationLibraryInfo() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -351,8 +397,9 @@ void translate_AlreadyHasHttpStatusInfo() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -370,8 +417,9 @@ void translate_WithRpcTimeoutErrorStatus_WithTimeoutErrorDescription() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -389,8 +437,9 @@ void translate_WithRpcErrorStatus_WithEmptyErrorDescription() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } @Test @@ -406,7 +455,8 @@ void translate_WithRpcUnsetStatus() { .putTag("rpc.service", "my service name") .build(); - assertThat(SpanTranslator.translate(data)) + assertThat(spanTranslator.translate(data)) .containsExactly(expectedSpan); + assertThat(metrics.spansDropped()).isZero(); } } From 5195c3031d86c05b8ba39fabe92f00e1821e74a6 Mon Sep 17 00:00:00 2001 From: Toshiaki Maki Date: Fri, 13 Sep 2024 20:59:12 +0900 Subject: [PATCH 14/14] Handling invalid span exceptions outside the Translator --- .../otel/http/OpenTelemetryHttpCollector.java | 19 +++-- .../collector/otel/http/SpanTranslator.java | 14 +--- .../http/ITOpenTelemetryHttpCollector.java | 4 +- .../otel/http/SpanTranslatorTest.java | 82 ++++++------------- 4 files changed, 43 insertions(+), 76 deletions(-) diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index 4b6ee85..485c136 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -24,6 +24,7 @@ import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.encoding.DecodingService; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.trace.v1.ScopeSpans; import zipkin2.Callback; import zipkin2.Span; import zipkin2.collector.Collector; @@ -108,11 +109,8 @@ static final class HttpService extends AbstractHttpService { final OpenTelemetryHttpCollector collector; - final SpanTranslator spanTranslator; - HttpService(OpenTelemetryHttpCollector collector) { this.collector = collector; - this.spanTranslator = new SpanTranslator(collector.metrics); } @Override @@ -134,8 +132,19 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { try { ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput()); collector.metrics.incrementMessages(); - List spans = spanTranslator.translate(request); - collector.collector.accept(spans, result); + try { + List spans = SpanTranslator.translate(request); + collector.collector.accept(spans, result); + } + catch (RuntimeException e) { + // If the span is invalid, an exception such as IllegalArgumentException will be thrown. + int spanSize = request.getResourceSpansList().stream() + .flatMap(rs -> rs.getScopeSpansList().stream()) + .mapToInt(ScopeSpans::getSpansCount).sum(); + collector.metrics.incrementSpansDropped(spanSize); + LOG.log(Level.WARNING, "Unable to translate the spans:", e); + result.onError(e); + } } catch (IOException e) { collector.metrics.incrementMessagesDropped(); diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java index 39503f7..b883240 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -39,8 +39,6 @@ */ final class SpanTranslator { - final CollectorMetrics metrics; - static final AttributeKey PEER_SERVICE = AttributeKey.stringKey("peer.service"); static final String OTEL_DROPPED_ATTRIBUTES_COUNT = "otel.dropped_attributes_count"; @@ -49,24 +47,14 @@ final class SpanTranslator { static final String ERROR_TAG = "error"; - SpanTranslator(CollectorMetrics metrics) { - this.metrics = metrics; - } - - List translate(ExportTraceServiceRequest otelSpans) { + static List translate(ExportTraceServiceRequest otelSpans) { List spans = new ArrayList<>(); List spansList = otelSpans.getResourceSpansList(); for (ResourceSpans resourceSpans : spansList) { for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) { InstrumentationScope scope = scopeSpans.getScope(); for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { - try { spans.add(generateSpan(span, scope, resourceSpans.getResource())); - } - catch (RuntimeException e) { - // If the span is invalid, an exception such as IllegalArgumentException will be thrown. - metrics.incrementSpansDropped(1); - } } } } diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index 3d3fdb3..46c510c 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -398,7 +398,7 @@ void invalidSpanId() throws Exception { } connection.disconnect(); int responseCode = connection.getResponseCode(); - assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); Awaitility.waitAtMost(Duration.ofMillis(200)) .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); assertThat(metrics.spans()).isZero(); @@ -429,7 +429,7 @@ void invalidTraceId() throws Exception { } connection.disconnect(); int responseCode = connection.getResponseCode(); - assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); Awaitility.waitAtMost(Duration.ofMillis(200)) .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); assertThat(metrics.spans()).isZero(); diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java index 98fd244..02ec6f9 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java @@ -13,15 +13,14 @@ import io.opentelemetry.proto.trace.v1.Status; import io.opentelemetry.semconv.NetworkAttributes; import io.opentelemetry.semconv.OtelAttributes; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import zipkin2.Endpoint; import zipkin2.Span; -import zipkin2.collector.InMemoryCollectorMetrics; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static zipkin2.collector.otel.http.ZipkinTestUtil.attribute; import static zipkin2.collector.otel.http.ZipkinTestUtil.longAttribute; import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilder; @@ -33,14 +32,6 @@ /* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java */ class SpanTranslatorTest { - InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); - - SpanTranslator spanTranslator = new SpanTranslator(metrics); - - @BeforeEach - void setup() { - metrics.clear(); - } @Test void translate_remoteParent() { @@ -48,8 +39,7 @@ void translate_remoteParent() { Span expected = zipkinSpanBuilder(Span.Kind.SERVER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)).containsExactly(expected); - assertThat(metrics.spansDropped()).isZero(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); } @Test @@ -61,8 +51,7 @@ void translate_invalidParent() { .parentId(0) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)).containsExactly(expected); - assertThat(metrics.spansDropped()).isZero(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); } @@ -71,8 +60,8 @@ void translate_invalidTraceId() { ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span .setTraceId(ByteString.fromHex("00000000000000000000000000000000"))) .build(); - assertThat(spanTranslator.translate(data)).isEmpty(); - assertThat(metrics.spansDropped()).isEqualTo(1); + assertThatThrownBy(() -> SpanTranslator.translate(data)) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -80,8 +69,8 @@ void translate_invalidSpanId() { ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span .setSpanId(ByteString.fromHex("0000000000000000"))) .build(); - assertThat(spanTranslator.translate(data)).isEmpty(); - assertThat(metrics.spansDropped()).isEqualTo(1); + assertThatThrownBy(() -> SpanTranslator.translate(data)) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -96,68 +85,62 @@ void translate_subMicroDurations() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .duration(1) .build(); - assertThat(spanTranslator.translate(data)).containsExactly(expected); - assertThat(metrics.spansDropped()).isZero(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); } @Test void translate_ServerKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_SERVER)).build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.SERVER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); - assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ClientKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_CLIENT)).build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.CLIENT) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); - assertThat(metrics.spansDropped()).isZero(); } @Test void translate_InternalKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_INTERNAL)).build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(null) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); - assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ConsumeKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_CONSUMER)).build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.CONSUMER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); - assertThat(metrics.spansDropped()).isZero(); } @Test void translate_ProducerKind() { ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span .setKind(SpanKind.SPAN_KIND_PRODUCER)).build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly( zipkinSpanBuilder(Span.Kind.PRODUCER) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build()); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -175,8 +158,7 @@ void translate_ResourceServiceNameMapping() { .localEndpoint(expectedLocalEndpoint) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)).containsExactly(expectedZipkinSpan); - assertThat(metrics.spansDropped()).isZero(); + assertThat(SpanTranslator.translate(data)).containsExactly(expectedZipkinSpan); } @Test @@ -188,9 +170,8 @@ void translate_noServiceName() { .localEndpoint(null) .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedZipkinSpan); - assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -218,9 +199,8 @@ void translate_RemoteEndpointMapping(SpanKind spanKind) { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -242,9 +222,8 @@ void translate_RemoteEndpointMappingWhenKindIsNotClientOrProducer(SpanKind spanK .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -262,9 +241,8 @@ void translate_RemoteEndpointMappingWhenServiceNameAndPeerAddressAreMissing(Span .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @ParameterizedTest @@ -290,9 +268,8 @@ void translate_RemoteEndpointMappingWhenServiceNameIsMissingButPeerAddressExists .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @@ -318,9 +295,8 @@ void translate_RemoteEndpointMappingWhenPortIsMissing(SpanKind spanKind) { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -357,9 +333,8 @@ void translate_WithAttributes() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -376,9 +351,8 @@ void translate_WithInstrumentationLibraryInfo() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -397,9 +371,8 @@ void translate_AlreadyHasHttpStatusInfo() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -417,9 +390,8 @@ void translate_WithRpcTimeoutErrorStatus_WithTimeoutErrorDescription() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -437,9 +409,8 @@ void translate_WithRpcErrorStatus_WithEmptyErrorDescription() { .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } @Test @@ -455,8 +426,7 @@ void translate_WithRpcUnsetStatus() { .putTag("rpc.service", "my service name") .build(); - assertThat(spanTranslator.translate(data)) + assertThat(SpanTranslator.translate(data)) .containsExactly(expectedSpan); - assertThat(metrics.spansDropped()).isZero(); } }