diff --git a/README.md b/README.md index 04241c2..3535ea2 100644 --- a/README.md +++ b/README.md @@ -5,11 +5,11 @@ # zipkin-otel Shared libraries that provide Zipkin integration with the OpenTelemetry. Requires JRE 11 or later. -# Usage +## Usage These components integrate traced applications and servers with OpenTelemetry protocols via interfaces defined by [Zipkin](https://github.com/openzipkin/zipkin). -## Collectors +### Collectors The component in a zipkin server that receives trace data is called a collector. A collector decodes spans reported by applications and persists them to a configured collector component. diff --git a/collector-http/README.md b/collector-http/README.md index 20e8b5d..f6a4dc2 100644 --- a/collector-http/README.md +++ b/collector-http/README.md @@ -1,5 +1,3 @@ -# zipkin-collector-otel-http +# collector-http -This component implements -the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) -with [Armeria](https://armeria.dev/). +This component implements the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) with [Armeria](https://armeria.dev/). diff --git a/collector-http/pom.xml b/collector-http/pom.xml index 375c103..a1fdcb8 100644 --- a/collector-http/pom.xml +++ b/collector-http/pom.xml @@ -6,8 +6,8 @@ --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 @@ -17,8 +17,8 @@ ../pom.xml - zipkin-collector-otel-http - Zipkin Collector: OpenTelemetry HTTP + collector-http + Zipkin Collector: OTLP HTTP ${project.basedir}/.. @@ -37,6 +37,29 @@ ${armeria.version} + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry.proto + opentelemetry-proto + ${opentelemetry-proto.version} + + + io.opentelemetry.semconv + opentelemetry-semconv + ${opentelemetry-semconv.version} + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + provided + + ${zipkin.groupId} zipkin-tests @@ -50,5 +73,24 @@ ${armeria.version} test + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + test + + + org.awaitility + awaitility + ${awaitility.version} + test + diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java index ac3384d..485c136 100644 --- a/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/OpenTelemetryHttpCollector.java @@ -4,12 +4,29 @@ */ package zipkin2.collector.otel.http; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.protobuf.UnsafeByteOperations; +import com.linecorp.armeria.common.AggregationOptions; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.encoding.StreamDecoderFactory; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServerConfigurator; import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.encoding.DecodingService; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import zipkin2.Callback; +import zipkin2.Span; import zipkin2.collector.Collector; import zipkin2.collector.CollectorComponent; import zipkin2.collector.CollectorMetrics; @@ -18,6 +35,7 @@ public final class OpenTelemetryHttpCollector extends CollectorComponent implements ServerConfigurator { + public static Builder newBuilder() { return new Builder(); } @@ -25,25 +43,32 @@ public static Builder newBuilder() { public static final class Builder extends CollectorComponent.Builder { Collector.Builder delegate = Collector.newBuilder(OpenTelemetryHttpCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; - @Override public Builder storage(StorageComponent storageComponent) { + @Override + public Builder storage(StorageComponent storageComponent) { delegate.storage(storageComponent); return this; } - @Override public Builder metrics(CollectorMetrics metrics) { - if (metrics == null) throw new NullPointerException("metrics == null"); + @Override + public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) { + throw new NullPointerException("metrics == null"); + } delegate.metrics(this.metrics = metrics.forTransport("otel/http")); return this; } - @Override public Builder sampler(CollectorSampler sampler) { + @Override + public Builder sampler(CollectorSampler sampler) { delegate.sampler(sampler); return this; } - @Override public OpenTelemetryHttpCollector build() { + @Override + public OpenTelemetryHttpCollector build() { return new OpenTelemetryHttpCollector(this); } @@ -52,6 +77,7 @@ public static final class Builder extends CollectorComponent.Builder { } final Collector collector; + final CollectorMetrics metrics; OpenTelemetryHttpCollector(Builder builder) { @@ -59,31 +85,93 @@ public static final class Builder extends CollectorComponent.Builder { metrics = builder.metrics; } - @Override public OpenTelemetryHttpCollector start() { + @Override + public OpenTelemetryHttpCollector start() { return this; } - @Override public String toString() { + @Override + public String toString() { return "OpenTelemetryHttpCollector{}"; } /** * Reconfigures the service per https://opentelemetry.io/docs/specs/otlp/#otlphttp-request */ - @Override public void reconfigure(ServerBuilder sb) { + @Override + public void reconfigure(ServerBuilder sb) { + sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip())); sb.service("/v1/traces", new HttpService(this)); } static final class HttpService extends AbstractHttpService { + private static final Logger LOG = Logger.getLogger(HttpService.class.getName()); + final OpenTelemetryHttpCollector collector; HttpService(OpenTelemetryHttpCollector collector) { this.collector = collector; } - @Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) - throws Exception { - throw new RuntimeException("Implement me!"); + @Override + protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) { + CompletableCallback result = new CompletableCallback(); + req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop() + )).handle((msg, t) -> { + if (t != null) { + collector.metrics.incrementMessagesDropped(); + result.onError(t); + return null; + } + try (HttpData content = msg.content()) { + if (content.isEmpty()) { + result.onSuccess(null); + return null; + } + collector.metrics.incrementBytes(content.length()); + try { + ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(UnsafeByteOperations.unsafeWrap(content.byteBuf().nioBuffer()).newCodedInput()); + collector.metrics.incrementMessages(); + try { + List spans = SpanTranslator.translate(request); + collector.collector.accept(spans, result); + } + catch (RuntimeException e) { + // If the span is invalid, an exception such as IllegalArgumentException will be thrown. + int spanSize = request.getResourceSpansList().stream() + .flatMap(rs -> rs.getScopeSpansList().stream()) + .mapToInt(ScopeSpans::getSpansCount).sum(); + collector.metrics.incrementSpansDropped(spanSize); + LOG.log(Level.WARNING, "Unable to translate the spans:", e); + result.onError(e); + } + } + catch (IOException e) { + collector.metrics.incrementMessagesDropped(); + LOG.log(Level.WARNING, "Unable to parse the request:", e); + result.onError(e); + } + return null; + } + }); + return HttpResponse.of(result); + } + } + + static final class CompletableCallback extends CompletableFuture + implements Callback { + + static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED); + + @Override + public void onSuccess(Void value) { + complete(HttpResponse.of(ACCEPTED_RESPONSE)); + } + + @Override + public void onError(Throwable t) { + completeExceptionally(t); } } + } diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java new file mode 100644 index 0000000..ae40bba --- /dev/null +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/ProtoUtils.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.List; + +import com.google.protobuf.TextFormat; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; + +import static java.util.stream.Collectors.joining; + +final class ProtoUtils { + static String kvListToJson(List attributes) { + return attributes.stream() + .map(entry -> "\"" + entry.getKey() + "\":" + valueToJson(entry.getValue())) + .collect(joining(",", "{", "}")); + } + + static String valueToString(AnyValue value) { + if (value.hasStringValue()) { + return value.getStringValue(); + } + if (value.hasArrayValue()) { + // While https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute says + // that an array should be encoded as a json, + // the Otel Zipkin Exporter doesn't implement like that https://github.com/open-telemetry/opentelemetry-java/blob/main/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java#L382-L385 + // Also Brave doesn't use the json encoding. + // So follow the comma separator here. + return value.getArrayValue().getValuesList().stream() + .map(ProtoUtils::valueToString) + .collect(joining(",")); + } + return valueToJson(value); + } + + static String valueToJson(AnyValue value) { + if (value.hasStringValue()) { + return "\"" + value.getStringValue() + "\""; + } + if (value.hasArrayValue()) { + return value.getArrayValue().getValuesList().stream() + .map(ProtoUtils::valueToJson) + .collect(joining(",", "[", "]")); + } + if (value.hasKvlistValue()) { + return kvListToJson(value.getKvlistValue().getValuesList()); + } + if (value.hasBoolValue()) { + return String.valueOf(value.getBoolValue()); + } + if (value.hasDoubleValue()) { + return String.valueOf(value.getDoubleValue()); + } + if (value.hasIntValue()) { + return String.valueOf(value.getIntValue()); + } + if (value.hasBytesValue()) { + // TODO + return TextFormat.escapeBytes(value.getBytesValue()); + } + return value.toString(); + } +} diff --git a/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java new file mode 100644 index 0000000..b883240 --- /dev/null +++ b/collector-http/src/main/java/zipkin2/collector/otel/http/SpanTranslator.java @@ -0,0 +1,214 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.protobuf.ByteString; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Span; +import io.opentelemetry.proto.trace.v1.Span.Event; +import io.opentelemetry.proto.trace.v1.Span.SpanKind; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.proto.trace.v1.Status.StatusCode; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import io.opentelemetry.semconv.ServiceAttributes; +import zipkin2.Endpoint; +import zipkin2.collector.CollectorMetrics; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * SpanTranslator converts OpenTelemetry Spans to Zipkin Spans + * It is based, in part, on code from https://github.com/open-telemetry/opentelemetry-java/blob/ad120a5bff0887dffedb9c73af8e8e0aeb63659a/exporters/zipkin/src/main/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformer.java + * @see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#status + */ +final class SpanTranslator { + + static final AttributeKey PEER_SERVICE = AttributeKey.stringKey("peer.service"); + + static final String OTEL_DROPPED_ATTRIBUTES_COUNT = "otel.dropped_attributes_count"; + + static final String OTEL_DROPPED_EVENTS_COUNT = "otel.dropped_events_count"; + + static final String ERROR_TAG = "error"; + + static List translate(ExportTraceServiceRequest otelSpans) { + List spans = new ArrayList<>(); + List spansList = otelSpans.getResourceSpansList(); + for (ResourceSpans resourceSpans : spansList) { + for (ScopeSpans scopeSpans : resourceSpans.getScopeSpansList()) { + InstrumentationScope scope = scopeSpans.getScope(); + for (io.opentelemetry.proto.trace.v1.Span span : scopeSpans.getSpansList()) { + spans.add(generateSpan(span, scope, resourceSpans.getResource())); + } + } + } + return spans; + } + + /** + * Creates an instance of a Zipkin Span from an OpenTelemetry SpanData instance. + * + * @param spanData an OpenTelemetry spanData instance + * @param scope InstrumentationScope of the span + * @return a new Zipkin Span + */ + private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope scope, Resource resource) { + long startTimestamp = nanoToMills(spanData.getStartTimeUnixNano()); + long endTimestamp = nanoToMills(spanData.getEndTimeUnixNano()); + Map attributesMap = spanData.getAttributesList().stream().collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); + zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder(); + byte[] traceIdBytes = spanData.getTraceId().toByteArray(); + long high = bytesToLong(traceIdBytes, 0); + long low = bytesToLong(traceIdBytes, 8); + spanBuilder + .traceId(high, low) + .id(bytesToLong(spanData.getSpanId().toByteArray(), 0)) + .kind(toSpanKind(spanData.getKind())) + .name(spanData.getName()) + .timestamp(nanoToMills(spanData.getStartTimeUnixNano())) + .duration(Math.max(1, endTimestamp - startTimestamp)) + .localEndpoint(getLocalEndpoint(attributesMap, resource)) + .remoteEndpoint(getRemoteEndpoint(attributesMap, spanData.getKind())); + ByteString parentSpanId = spanData.getParentSpanId(); + if (!parentSpanId.isEmpty()) { + long parentId = bytesToLong(parentSpanId.toByteArray(), 0); + if (parentId != 0) { + spanBuilder.parentId(parentId); + } + } + attributesMap.forEach((k, v) -> spanBuilder.putTag(k, ProtoUtils.valueToString(v))); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#dropped-attributes-count + int droppedAttributes = spanData.getAttributesCount() - attributesMap.size(); + if (droppedAttributes > 0) { + spanBuilder.putTag(OTEL_DROPPED_ATTRIBUTES_COUNT, String.valueOf(droppedAttributes)); + } + Status status = spanData.getStatus(); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#status + if (status.getCode() != Status.StatusCode.STATUS_CODE_UNSET) { + String codeValue = status.getCode().toString().replace("STATUS_CODE_", ""); // either OK or ERROR + spanBuilder.putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), codeValue); + // add the error tag, if it isn't already in the source span. + if (status.getCode() == StatusCode.STATUS_CODE_ERROR && !attributesMap.containsKey(ERROR_TAG)) { + spanBuilder.putTag(ERROR_TAG, status.getMessage()); + } + } + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#instrumentationscope + if (!scope.getName().isEmpty()) { + spanBuilder.putTag(OtelAttributes.OTEL_SCOPE_NAME.getKey(), scope.getName()); + } + if (!scope.getVersion().isEmpty()) { + spanBuilder.putTag(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), scope.getVersion()); + } + for (Event eventData : spanData.getEventsList()) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#events + String name = eventData.getName(); + String value = ProtoUtils.kvListToJson(eventData.getAttributesList()); + String annotation = "\"" + name + "\":" + value; + spanBuilder.addAnnotation(nanoToMills(eventData.getTimeUnixNano()), annotation); + } + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/mapping-to-non-otlp.md#dropped-events-count + int droppedEvents = spanData.getEventsCount() - spanData.getEventsList().size(); + if (droppedEvents > 0) { + spanBuilder.putTag(OTEL_DROPPED_EVENTS_COUNT, String.valueOf(droppedEvents)); + } + return spanBuilder.build(); + } + + private static Endpoint getLocalEndpoint(Map attributesMap, Resource resource) { + AnyValue serviceName = resource.getAttributesList().stream() + .filter(kv -> kv.getKey().equals(ServiceAttributes.SERVICE_NAME.getKey())) + .findFirst() + .map(KeyValue::getValue) + .orElse(null); + if (serviceName != null) { + Endpoint.Builder endpoint = Endpoint.newBuilder().serviceName(serviceName.getStringValue()); + AnyValue networkLocalAddress = attributesMap.get(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey()); + AnyValue networkLocalPort = attributesMap.get(NetworkAttributes.NETWORK_LOCAL_PORT.getKey()); + if (networkLocalAddress != null) { + endpoint.ip(networkLocalAddress.getStringValue()); + } + if (networkLocalPort != null) { + endpoint.port(Long.valueOf(networkLocalPort.getIntValue()).intValue()); + } + // TODO remove the corresponding (duplicated) tags? + return endpoint.build(); + } + return null; + } + + private static Endpoint getRemoteEndpoint(Map attributesMap, SpanKind kind) { + if (kind == SpanKind.SPAN_KIND_CLIENT || kind == SpanKind.SPAN_KIND_PRODUCER) { + AnyValue peerService = attributesMap.get(PEER_SERVICE.getKey()); + AnyValue networkPeerAddress = attributesMap.get(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey()); + String serviceName = null; + // TODO: Implement fallback mechanism? + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#otlp---zipkin + if (peerService != null) { + serviceName = peerService.getStringValue(); + } + else if (networkPeerAddress != null) { + serviceName = networkPeerAddress.getStringValue(); + } + if (serviceName != null) { + Endpoint.Builder endpoint = Endpoint.newBuilder().serviceName(serviceName); + AnyValue networkPeerPort = attributesMap.get(NetworkAttributes.NETWORK_PEER_PORT.getKey()); + if (networkPeerAddress != null) { + endpoint.ip(networkPeerAddress.getStringValue()); + } + if (networkPeerPort != null) { + endpoint.port(Long.valueOf(networkPeerPort.getIntValue()).intValue()); + } + // TODO remove the corresponding (duplicated) tags? + return endpoint.build(); + } + } + return null; + } + + static zipkin2.Span.Kind toSpanKind(Span.SpanKind spanKind) { + switch (spanKind) { + case SPAN_KIND_UNSPECIFIED: + case UNRECOGNIZED: + case SPAN_KIND_INTERNAL: + break; + case SPAN_KIND_SERVER: + return zipkin2.Span.Kind.SERVER; + case SPAN_KIND_CLIENT: + return zipkin2.Span.Kind.CLIENT; + case SPAN_KIND_PRODUCER: + return zipkin2.Span.Kind.PRODUCER; + case SPAN_KIND_CONSUMER: + return zipkin2.Span.Kind.CONSUMER; + default: + return null; + } + return null; + } + + static long nanoToMills(long epochNanos) { + return NANOSECONDS.toMicros(epochNanos); + } + + static long bytesToLong(byte[] bytes, int offset) { + if (bytes == null || bytes.length < offset + 8) { + return 0; + } + return ByteBuffer.wrap(bytes, offset, 8).getLong(); + } +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java index c871b45..46c510c 100644 --- a/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ITOpenTelemetryHttpCollector.java @@ -5,21 +5,84 @@ package zipkin2.collector.otel.http; import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.protobuf.ByteString; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.TracesData; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import io.opentelemetry.semconv.ServiceAttributes; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import zipkin2.collector.CollectorComponent; import zipkin2.collector.CollectorSampler; import zipkin2.collector.InMemoryCollectorMetrics; import zipkin2.storage.InMemoryStorage; +import static io.opentelemetry.sdk.trace.samplers.Sampler.alwaysOn; +import static org.assertj.core.api.Assertions.assertThat; + @TestInstance(TestInstance.Lifecycle.PER_CLASS) class ITOpenTelemetryHttpCollector { InMemoryStorage store; + InMemoryCollectorMetrics metrics; - CollectorComponent collector; - @BeforeEach public void setup() { + OpenTelemetryHttpCollector collector; + + int port = getFreePort(); + + SpanExporter spanExporter = OtlpHttpSpanExporter.builder() + .setCompression("gzip") + .setEndpoint("http://localhost:" + port + "/v1/traces") + .build(); + + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setSampler(alwaysOn()) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .addResource(Resource.create(Attributes.of(ServiceAttributes.SERVICE_NAME, "zipkin-collector-otel-http-test"))) + .build(); + + OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .build(); + + Tracer tracer = openTelemetrySdk.getTracerProvider() + .get("io.zipkin.contrib.otel:zipkin-collector-otel-http", "0.0.1"); + + Server server; + + @BeforeEach + public void setup() { store = InMemoryStorage.newBuilder().build(); metrics = new InMemoryCollectorMetrics(); @@ -29,13 +92,411 @@ class ITOpenTelemetryHttpCollector { .storage(store) .build() .start(); + ServerBuilder serverBuilder = Server.builder().http(port); + collector.reconfigure(serverBuilder); metrics = metrics.forTransport("otel/http"); + server = serverBuilder.build(); + server.start().join(); } - @AfterEach void teardown() throws IOException { + @AfterEach + void teardown() throws IOException { store.close(); collector.close(); + server.stop().join(); + } + + @Test + void testServerKind() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("get") + .setSpanKind(SpanKind.SERVER) + .setAttribute("string", "foo" + i) + .setAttribute("int", 100) + .setAttribute("double", 10.5) + .setAttribute("boolean", true) + .setAttribute(AttributeKey.stringArrayKey("array"), Arrays.asList("a", "b", "c")) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + Thread.sleep(100); // do something + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("get"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(9); + assertThat(span.tags()).containsEntry("string", "foo" + i); + assertThat(span.tags()).containsEntry("int", "100"); + assertThat(span.tags()).containsEntry("double", "10.5"); + assertThat(span.tags()).containsEntry("boolean", "true"); + assertThat(span.tags()).containsEntry("array", "a,b,c"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isEmpty(); + } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes + } + + @Test + void testServerKindWithEvents() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + Instant eventTime1 = Instant.now(); + Instant eventTime2 = eventTime1.plusMillis(10); + Instant eventTime3 = eventTime1.plusMillis(100); + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("do-something") + .setSpanKind(SpanKind.SERVER) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + span.addEvent("event-1", Attributes.builder().put("foo", "bar").put("i", i).build(), eventTime1.plusMillis(size)); + span.addEvent("event-2", eventTime2.plusMillis(size)); + Thread.sleep(100); // do something + span.addEvent("event-3", eventTime3.plusMillis(size)); + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("do-something"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(4); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isNotNull(); + assertThat(span.annotations()).hasSize(3); + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/zipkin.md#events + assertThat(span.annotations().get(0).value()).isEqualTo("\"event-1\":{\"foo\":\"bar\",\"i\":" + i + "}"); + assertThat(span.annotations().get(0).timestamp()).isEqualTo(toMillis(eventTime1.plusMillis(size))); + assertThat(span.annotations().get(1).value()).isEqualTo("\"event-2\":{}"); + assertThat(span.annotations().get(1).timestamp()).isEqualTo(toMillis(eventTime2.plusMillis(size))); + assertThat(span.annotations().get(2).value()).isEqualTo("\"event-3\":{}"); + assertThat(span.annotations().get(2).timestamp()).isEqualTo(toMillis(eventTime3.plusMillis(size))); + } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes + } + + @Test + void testServerKindWithError() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("do-something") + .setSpanKind(SpanKind.SERVER) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .startSpan(); + Thread.sleep(100); // do something + span.setStatus(StatusCode.ERROR, "Exception!!"); + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("do-something"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.SERVER); + assertThat(span.tags()).hasSize(6); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.tags()).containsEntry(SpanTranslator.ERROR_TAG, "Exception!!"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isNull(); + assertThat(span.remoteEndpoint()).isNull(); + assertThat(span.annotations()).isEmpty(); + } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes + } + + @Test + void testClientKind() throws Exception { + List traceIds = new ArrayList<>(); + List spanIds = new ArrayList<>(); + final int size = 5; + for (int i = 0; i < size; i++) { + Span span = tracer + .spanBuilder("send") + .setSpanKind(SpanKind.CLIENT) + .setAttribute("string", "foo" + i) + .setAttribute("int", 100) + .setAttribute("double", 10.5) + .setAttribute("boolean", true) + .setAttribute(AttributeKey.stringArrayKey("array"), Arrays.asList("a", "b", "c")) + .setAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS, "127.0.0.1") + .setAttribute(NetworkAttributes.NETWORK_LOCAL_PORT, 12345L) + .setAttribute(SpanTranslator.PEER_SERVICE, "demo") + .setAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS, "1.2.3.4") + .setAttribute(NetworkAttributes.NETWORK_PEER_PORT, 8080L) + .startSpan(); + Thread.sleep(100); // do something + span.end(); + spanIds.add(span.getSpanContext().getSpanId()); + traceIds.add(span.getSpanContext().getTraceId()); + } + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(size)); + List> received = store.getTraces(traceIds).execute(); + assertThat(received.size()).isEqualTo(size); + for (int i = 0; i < size; i++) { + assertThat(received.get(i)).hasSize(1); + zipkin2.Span span = received.get(i).get(0); + assertThat(span.id()).isEqualTo(spanIds.get(i)); + assertThat(span.traceId()).isEqualTo(traceIds.get(i)); + assertThat(span.parentId()).isNull(); + assertThat(span.name()).isEqualTo("send"); + assertThat(span.kind()).isEqualTo(zipkin2.Span.Kind.CLIENT); + assertThat(span.tags()).hasSize(12); + assertThat(span.tags()).containsEntry("string", "foo" + i); + assertThat(span.tags()).containsEntry("int", "100"); + assertThat(span.tags()).containsEntry("double", "10.5"); + assertThat(span.tags()).containsEntry("boolean", "true"); + assertThat(span.tags()).containsEntry("array", "a,b,c"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "127.0.0.1"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_LOCAL_PORT.getKey(), "12345"); + assertThat(span.tags()).containsEntry(SpanTranslator.PEER_SERVICE.getKey(), "demo"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "1.2.3.4"); + assertThat(span.tags()).containsEntry(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "8080"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.zipkin.contrib.otel:zipkin-collector-otel-http"); + assertThat(span.tags()).containsEntry(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "0.0.1"); + assertThat(span.duration()).isGreaterThan(100_000 /* 100ms */).isLessThan(110_000 /* 110ms */); + assertThat(span.localServiceName()).isEqualTo("zipkin-collector-otel-http-test"); + assertThat(span.localEndpoint().ipv4()).isEqualTo("127.0.0.1"); + assertThat(span.localEndpoint().port()).isEqualTo(12345); + assertThat(span.remoteServiceName()).isEqualTo("demo"); + assertThat(span.remoteEndpoint().ipv4()).isEqualTo("1.2.3.4"); + assertThat(span.remoteEndpoint().port()).isEqualTo(8080); + assertThat(span.annotations()).isEmpty(); + } + assertThat(metrics.spans()).isEqualTo(size); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + // TODO calculate received bytes } - // TODO: integration test + @Test + void minimalSpan() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000001")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000001"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(1)); + assertThat(metrics.spans()).isEqualTo(1); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); + } + + @Test + void invalidSpanId() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000000")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000001"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); + } + + @Test + void invalidTraceId() throws Exception { + TracesData tracesData = TracesData.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .addScopeSpans(ScopeSpans.newBuilder() + .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex("0000000000000001")) + .setTraceId(ByteString.fromHex("00000000000000000000000000000000"))))) + .build(); + + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(tracesData.toByteArray()); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isEqualTo(1); + assertThat(metrics.messages()).isEqualTo(1); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isEqualTo(tracesData.getSerializedSize()); + } + + @Test + void emptyRequest() throws Exception { + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(new byte[0]); // empty + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_ACCEPTED); + Awaitility.waitAtMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isZero(); + assertThat(metrics.messagesDropped()).isZero(); + assertThat(metrics.bytes()).isZero(); + } + + @Test + void brokenRequest() throws Exception { + URL url = URI.create("http://localhost:" + port + "/v1/traces").toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Content-Type", "application/x-protobuf"); + try (OutputStream os = connection.getOutputStream()) { + os.write(0x00); + os.flush(); + } + connection.disconnect(); + int responseCode = connection.getResponseCode(); + assertThat(responseCode).isEqualTo(HttpURLConnection.HTTP_INTERNAL_ERROR); + Awaitility.waitAtMost(Duration.ofMillis(200)) + .untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(0)); + assertThat(metrics.spans()).isZero(); + assertThat(metrics.spansDropped()).isZero(); + assertThat(metrics.messages()).isZero(); + assertThat(metrics.messagesDropped()).isEqualTo(1); + assertThat(metrics.bytes()).isEqualTo(1); + } + + static long toMillis(Instant instant) { + long time = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()); + time += instant.getNano(); + return SpanTranslator.nanoToMills(time); + } + + static int getFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + catch (IOException e) { + throw new UncheckedIOException("Failed to find a free port", e); + } + } } diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java new file mode 100644 index 0000000..01d558f --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ProtoUtilsTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.Arrays; + +import com.google.protobuf.ByteString; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.common.v1.KeyValueList; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin2.collector.otel.http.ProtoUtils.kvListToJson; +import static zipkin2.collector.otel.http.ProtoUtils.valueToJson; + +class ProtoUtilsTest { + + @Test + void testValueToJson() { + assertThat(valueToJson(AnyValue.newBuilder().setStringValue("string").build())).isEqualTo("\"string\""); + assertThat(valueToJson(AnyValue.newBuilder().setIntValue(100).build())).isEqualTo("100"); + assertThat(valueToJson(AnyValue.newBuilder().setBoolValue(true).build())).isEqualTo("true"); + assertThat(valueToJson(AnyValue.newBuilder().setDoubleValue(1.2).build())).isEqualTo("1.2"); + assertThat(valueToJson(AnyValue.newBuilder().setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("abc")) + .addValues(AnyValue.newBuilder().setIntValue(20)) + .addValues(AnyValue.newBuilder().setBoolValue(false))) + .build())) + .isEqualTo("[\"abc\",20,false]"); + assertThat(valueToJson(AnyValue.newBuilder().setKvlistValue(KeyValueList.newBuilder() + .addValues(KeyValue.newBuilder().setKey("x").setValue(AnyValue.newBuilder().setStringValue("abc"))) + .addValues(KeyValue.newBuilder().setKey("y").setValue(AnyValue.newBuilder().setStringValue("efg"))) + .addValues(KeyValue.newBuilder().setKey("z").setValue(AnyValue.newBuilder().setIntValue(0))) + .build()).build())) + .isEqualTo("{\"x\":\"abc\",\"y\":\"efg\",\"z\":0}"); + assertThat(valueToJson(AnyValue.newBuilder().setBytesValue(ByteString.fromHex("cafebabe")).build())) + .isEqualTo("\\312\\376\\272\\276"); + assertThat(valueToJson(AnyValue.newBuilder().build())).isEqualTo(""); + } + + @Test + void testKvListToJson() { + assertThat(kvListToJson(Arrays.asList(KeyValue.newBuilder().setKey("string").setValue(AnyValue.newBuilder().setStringValue("s")).build(), + KeyValue.newBuilder().setKey("int").setValue(AnyValue.newBuilder().setIntValue(100)).build(), + KeyValue.newBuilder().setKey("boolean").setValue(AnyValue.newBuilder().setBoolValue(true)).build(), + KeyValue.newBuilder().setKey("double").setValue(AnyValue.newBuilder().setDoubleValue(1.2)).build(), + KeyValue.newBuilder().setKey("array").setValue(AnyValue.newBuilder().setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("abc")) + .addValues(AnyValue.newBuilder().setIntValue(20)) + .addValues(AnyValue.newBuilder().setBoolValue(false)))).build(), + KeyValue.newBuilder().setKey("kvlist").setValue(AnyValue.newBuilder().setKvlistValue(KeyValueList.newBuilder() + .addValues(KeyValue.newBuilder().setKey("x").setValue(AnyValue.newBuilder().setStringValue("abc"))) + .addValues(KeyValue.newBuilder().setKey("y").setValue(AnyValue.newBuilder().setStringValue("efg"))) + .addValues(KeyValue.newBuilder().setKey("z").setValue(AnyValue.newBuilder().setIntValue(0))) + .build())).build()))) + .isEqualTo("{\"string\":\"s\",\"int\":100,\"boolean\":true,\"double\":1.2,\"array\":[\"abc\",20,false],\"kvlist\":{\"x\":\"abc\",\"y\":\"efg\",\"z\":0}}"); + } + +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java new file mode 100644 index 0000000..02ec6f9 --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/SpanTranslatorTest.java @@ -0,0 +1,432 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import com.google.protobuf.ByteString; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.ArrayValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.Span.SpanKind; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.OtelAttributes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import zipkin2.Endpoint; +import zipkin2.Span; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static zipkin2.collector.otel.http.ZipkinTestUtil.attribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.longAttribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilder; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithResourceCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithScopeCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.requestBuilderWithSpanCustomizer; +import static zipkin2.collector.otel.http.ZipkinTestUtil.stringAttribute; +import static zipkin2.collector.otel.http.ZipkinTestUtil.zipkinSpanBuilder; + +/* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/OtelToZipkinSpanTransformerTest.java */ +class SpanTranslatorTest { + + @Test + void translate_remoteParent() { + ExportTraceServiceRequest data = requestBuilder().build(); + Span expected = zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + @Test + void translate_invalidParent() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setParentSpanId(ByteString.fromHex("0000000000000000"))) + .build(); + Span expected = zipkinSpanBuilder(Span.Kind.SERVER) + .parentId(0) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + + @Test + void translate_invalidTraceId() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setTraceId(ByteString.fromHex("00000000000000000000000000000000"))) + .build(); + assertThatThrownBy(() -> SpanTranslator.translate(data)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void translate_invalidSpanId() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setSpanId(ByteString.fromHex("0000000000000000"))) + .build(); + assertThatThrownBy(() -> SpanTranslator.translate(data)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void translate_subMicroDurations() { + ExportTraceServiceRequest data = + ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setStartTimeUnixNano(1505855794_194009601L) + .setEndTimeUnixNano(1505855794_194009999L)) + .build(); + Span expected = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .duration(1) + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expected); + } + + @Test + void translate_ServerKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ClientKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_InternalKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_INTERNAL)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(null) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ConsumeKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CONSUMER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.CONSUMER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ProducerKind() { + ExportTraceServiceRequest data = ZipkinTestUtil.requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_PRODUCER)).build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly( + zipkinSpanBuilder(Span.Kind.PRODUCER) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build()); + } + + @Test + void translate_ResourceServiceNameMapping() { + ExportTraceServiceRequest data = requestBuilderWithResourceCustomizer(resource -> resource + .clearAttributes() + .addAttributes(stringAttribute("service.name", "super-zipkin-service"))) + .build(); + Endpoint expectedLocalEndpoint = Endpoint.newBuilder() + .serviceName("super-zipkin-service") + .ip("1.2.3.4") + .build(); + Span expectedZipkinSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .localEndpoint(expectedLocalEndpoint) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)).containsExactly(expectedZipkinSpan); + } + + @Test + void translate_noServiceName() { + ExportTraceServiceRequest data = requestBuilderWithResourceCustomizer(Resource.Builder::clearAttributes) + .build(); + Span expectedZipkinSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .localEndpoint(null) + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedZipkinSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMapping(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("remote-test-service") + .ip("8.8.8.8") + .port(42) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_SERVER", "SPAN_KIND_CONSUMER", "SPAN_KIND_INTERNAL", "SPAN_KIND_UNSPECIFIED"}) + void translate_RemoteEndpointMappingWhenKindIsNotClientOrProducer(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(null) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenServiceNameAndPeerAddressAreMissing(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(null) + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenServiceNameIsMissingButPeerAddressExists(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8")) + .addAttributes(longAttribute(NetworkAttributes.NETWORK_PEER_PORT.getKey(), 42L))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("8.8.8.8") + .ip("8.8.8.8") + .port(42) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(NetworkAttributes.NETWORK_PEER_PORT.getKey(), "42") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + + @ParameterizedTest + @EnumSource(value = SpanKind.class, names = {"SPAN_KIND_CLIENT", "SPAN_KIND_PRODUCER"}) + void translate_RemoteEndpointMappingWhenPortIsMissing(SpanKind spanKind) { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(spanKind) + .addAttributes(stringAttribute(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service")) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8"))) + .build(); + + Endpoint expectedRemoteEndpoint = Endpoint.newBuilder() + .serviceName("remote-test-service") + .ip("8.8.8.8") + .build(); + + Span expectedSpan = + zipkinSpanBuilder(SpanTranslator.toSpanKind(spanKind)) + .remoteEndpoint(expectedRemoteEndpoint) + .putTag(SpanTranslator.PEER_SERVICE.getKey(), "remote-test-service") + .putTag(NetworkAttributes.NETWORK_PEER_ADDRESS.getKey(), "8.8.8.8") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithAttributes() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT) + .addAttributes(stringAttribute("string", "string value")) + .addAttributes(attribute("boolean", av -> av.setBoolValue(false))) + .addAttributes(longAttribute("long", 9999L)) + .addAttributes(attribute("double", av -> av.setDoubleValue(222.333))) + .addAttributes(attribute("booleanArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setBoolValue(true)) + .addValues(AnyValue.newBuilder().setBoolValue(false))))) + .addAttributes(attribute("stringArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setStringValue("Hello"))))) + .addAttributes(attribute("doubleArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setDoubleValue(32.33)) + .addValues(AnyValue.newBuilder().setDoubleValue(-98.3))))) + .addAttributes(attribute("longArray", av -> av.setArrayValue(ArrayValue.newBuilder() + .addValues(AnyValue.newBuilder().setIntValue(32L)) + .addValues(AnyValue.newBuilder().setIntValue(999L)))))) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag("string", "string value") + .putTag("boolean", "false") + .putTag("long", "9999") + .putTag("double", "222.333") + .putTag("booleanArray", "true,false") + .putTag("stringArray", "Hello") + .putTag("doubleArray", "32.33,-98.3") + .putTag("longArray", "32,999") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithInstrumentationLibraryInfo() { + ExportTraceServiceRequest data = requestBuilderWithScopeCustomizer(scope -> scope + .setName("io.opentelemetry.auto") + .setVersion("1.0.0")) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag(OtelAttributes.OTEL_SCOPE_NAME.getKey(), "io.opentelemetry.auto") + .putTag(OtelAttributes.OTEL_SCOPE_VERSION.getKey(), "1.0.0") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "OK") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_AlreadyHasHttpStatusInfo() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_CLIENT) + .addAttributes(longAttribute("http.response.status.code", 404)) + .addAttributes(stringAttribute("error", "A user provided error")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.CLIENT) + .putTag("http.response.status.code", "404") + .putTag("error", "A user provided error") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcTimeoutErrorStatus_WithTimeoutErrorDescription() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).setMessage("timeout").build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .putTag("error", "timeout") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcErrorStatus_WithEmptyErrorDescription() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_ERROR).setMessage("").build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .putTag("error", "") + .putTag(OtelAttributes.OTEL_STATUS_CODE.getKey(), "ERROR") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } + + @Test + void translate_WithRpcUnsetStatus() { + ExportTraceServiceRequest data = requestBuilderWithSpanCustomizer(span -> span + .setKind(SpanKind.SPAN_KIND_SERVER) + .addAttributes(stringAttribute("rpc.service", "my service name")) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_UNSET).build())) + .build(); + + Span expectedSpan = + zipkinSpanBuilder(Span.Kind.SERVER) + .putTag("rpc.service", "my service name") + .build(); + + assertThat(SpanTranslator.translate(data)) + .containsExactly(expectedSpan); + } +} diff --git a/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java b/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java new file mode 100644 index 0000000..32bb3f5 --- /dev/null +++ b/collector-http/src/test/java/zipkin2/collector/otel/http/ZipkinTestUtil.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.collector.otel.http; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.protobuf.ByteString; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.InstrumentationScope; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Status; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.semconv.NetworkAttributes; +import zipkin2.Endpoint; +import zipkin2.Span; + +import static io.opentelemetry.proto.trace.v1.Span.Event; +import static io.opentelemetry.proto.trace.v1.Span.SpanKind; + +/* Based on code from https://github.com/open-telemetry/opentelemetry-java/blob/d37c1c74e7ec20a990e1a0a07a5daa1a2ecf9f0b/exporters/zipkin/src/test/java/io/opentelemetry/exporter/zipkin/ZipkinTestUtil.java */ +class ZipkinTestUtil { + + static final String TRACE_ID = "d239036e7d5cec116b562147388b35bf"; + + static final String SPAN_ID = "9cc1e3049173be09"; + + static final String PARENT_SPAN_ID = "8b03ab423da481c5"; + + private static final Attributes attributes = Attributes.empty(); + + private static final List annotations = + Collections.unmodifiableList( + Arrays.asList( + EventData.create(1505855799_433901068L, "RECEIVED", Attributes.empty()), + EventData.create(1505855799_459486280L, "SENT", Attributes.empty()))); + + private ZipkinTestUtil() { + } + + static Span.Builder zipkinSpanBuilder(Span.Kind kind) { + return Span.newBuilder() + .traceId(TRACE_ID) + .parentId(PARENT_SPAN_ID) + .id(SPAN_ID) + .kind(kind) + .name("Recv.helloworld.Greeter.SayHello") + .timestamp(1505855794000000L + 194009601L / 1000) + .duration((1505855799000000L + 465726528L / 1000) - (1505855794000000L + 194009601L / 1000)) + .localEndpoint(Endpoint.newBuilder().ip("1.2.3.4").serviceName("tweetiebird").build()) + .putTag(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "1.2.3.4") + .addAnnotation(1505855799000000L + 433901068L / 1000, "\"RECEIVED\":{}") + .addAnnotation(1505855799000000L + 459486280L / 1000, "\"SENT\":{}"); + } + + static ExportTraceServiceRequest.Builder requestBuilder( + Function resourceCustomizer, + Function scopeCustomizer, + Function spanCustomizer) { + return ExportTraceServiceRequest.newBuilder() + .addResourceSpans(ResourceSpans.newBuilder() + .setResource(resourceCustomizer.apply(Resource.newBuilder() + .addAttributes(stringAttribute("service.name", "tweetiebird")))) + .addScopeSpans(ScopeSpans.newBuilder() + .setScope(scopeCustomizer.apply(InstrumentationScope.newBuilder()).build()) + .addSpans(spanCustomizer.apply(io.opentelemetry.proto.trace.v1.Span.newBuilder() + .setSpanId(ByteString.fromHex(SPAN_ID)) + .setTraceId(ByteString.fromHex(TRACE_ID)) + .setParentSpanId(ByteString.fromHex(PARENT_SPAN_ID)) + .setStatus(Status.newBuilder().setCode(Status.StatusCode.STATUS_CODE_OK)) + .setKind(SpanKind.SPAN_KIND_SERVER) + .setName("Recv.helloworld.Greeter.SayHello") + .setStartTimeUnixNano(1505855794_194009601L) + .setEndTimeUnixNano(1505855799_465726528L) + .addAttributes(stringAttribute(NetworkAttributes.NETWORK_LOCAL_ADDRESS.getKey(), "1.2.3.4")) + .addEvents(Event.newBuilder() + .setName("RECEIVED").setTimeUnixNano(1505855799_433901068L)) + .addEvents(Event.newBuilder() + .setName("SENT").setTimeUnixNano(1505855799_459486280L)) + )))); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithResourceCustomizer(Function resourceCustomizer) { + return requestBuilder(resourceCustomizer, Function.identity(), Function.identity()); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithScopeCustomizer(Function scopeCustomizer) { + return requestBuilder(Function.identity(), scopeCustomizer, Function.identity()); + } + + static ExportTraceServiceRequest.Builder requestBuilderWithSpanCustomizer(Function spanCustomizer) { + return requestBuilder(Function.identity(), Function.identity(), spanCustomizer); + } + + static ExportTraceServiceRequest.Builder requestBuilder() { + return requestBuilder(Function.identity(), Function.identity(), Function.identity()); + } + + static KeyValue stringAttribute(String key, String value) { + return attribute(key, av -> av.setStringValue(value)); + } + + static KeyValue longAttribute(String key, long value) { + return attribute(key, av -> av.setIntValue(value)); + } + + static KeyValue attribute(String key, Function builder) { + return KeyValue.newBuilder() + .setKey(key) + .setValue(builder.apply(AnyValue.newBuilder())) + .build(); + } +} diff --git a/module/pom.xml b/module/pom.xml index 00f49d4..4506a3a 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -25,7 +25,7 @@ ${project.groupId} - zipkin-collector-otel-http + collector-http ${project.version}