Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP/HTTP collector #13

Merged
merged 14 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package zipkin2.collector.otel.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
Expand Down Expand Up @@ -135,7 +134,7 @@ protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req) {
}
catch (IOException e) {
LOG.log(Level.WARNING, "Unable to parse the request:", e);
result.onError(new UncheckedIOException(e));
result.onError(e);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package zipkin2.collector.otel.http;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -45,6 +46,8 @@ final class SpanTranslator {

static final String ERROR_TAG = "error";

static final String INVALID_TRACE = "00000000000000000000000000000000";

static List<zipkin2.Span> translate(ExportTraceServiceRequest otelSpans) {
List<zipkin2.Span> spans = new ArrayList<>();
List<ResourceSpans> spansList = otelSpans.getResourceSpansList();
Expand All @@ -70,9 +73,18 @@ private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope sco
long startTimestamp = nanoToMills(spanData.getStartTimeUnixNano());
long endTimestamp = nanoToMills(spanData.getEndTimeUnixNano());
Map<String, AnyValue> attributesMap = spanData.getAttributesList().stream().collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder()
.traceId(OtelEncodingUtils.traceIdFromBytes(spanData.getTraceId().toByteArray()))
.id(OtelEncodingUtils.spanIdFromBytes(spanData.getSpanId().toByteArray()))
zipkin2.Span.Builder spanBuilder = zipkin2.Span.newBuilder();
byte[] traceIdBytes = spanData.getTraceId().toByteArray();
long high = bytesToLong(traceIdBytes, 0);
if (high == 0) {
spanBuilder.traceId(INVALID_TRACE);
Copy link
Contributor

@codefromthecrypt codefromthecrypt Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid spans should end up being dropped instead I think. You can look at collectormetrics and usage for an example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codefromthecrypt

What does "collectormetrics" refer to?

If I create a span with an invalid ID in Zipkin it throws an IllegalArgumentException. Does that mean that catch the exception and increment the "drop" counter metric?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increment spanDropped if the received span is invalid in c869d69

}
else {
long low = bytesToLong(traceIdBytes, 8);
spanBuilder.traceId(high, low);
}
spanBuilder
.id(bytesToLong(spanData.getSpanId().toByteArray(), 0))
.kind(toSpanKind(spanData.getKind()))
.name(spanData.getName())
.timestamp(nanoToMills(spanData.getStartTimeUnixNano()))
Expand All @@ -81,8 +93,8 @@ private static zipkin2.Span generateSpan(Span spanData, InstrumentationScope sco
.remoteEndpoint(getRemoteEndpoint(attributesMap, spanData.getKind()));
ByteString parentSpanId = spanData.getParentSpanId();
if (!parentSpanId.isEmpty()) {
String parentId = OtelEncodingUtils.spanIdFromBytes(parentSpanId.toByteArray());
if (!parentId.equals(OtelEncodingUtils.INVALID_SPAN)) {
long parentId = bytesToLong(parentSpanId.toByteArray(), 0);
if (parentId != 0) {
spanBuilder.parentId(parentId);
}
}
Expand Down Expand Up @@ -199,110 +211,10 @@ static long nanoToMills(long epochNanos) {
return NANOSECONDS.toMicros(epochNanos);
}

/**
* Taken from OpenTelemetry codebase.
* https://github.com/open-telemetry/opentelemetry-java/blob/3e8092d086967fa24a0559044651781403033313/api/all/src/main/java/io/opentelemetry/api/internal/OtelEncodingUtils.java
*/
static class OtelEncodingUtils {

static final String ALPHABET = "0123456789abcdef";

static final char[] ENCODING = buildEncodingArray();

static final String INVALID_TRACE = "00000000000000000000000000000000";

static final int TRACE_BYTES_LENGTH = 16;

static final int TRACE_HEX_LENGTH = 2 * TRACE_BYTES_LENGTH;

static final int SPAN_BYTES_LENGTH = 8;

static final int SPAN_HEX_LENGTH = 2 * SPAN_BYTES_LENGTH;

static final String INVALID_SPAN = "0000000000000000";

private static char[] buildEncodingArray() {
char[] encoding = new char[512];
for (int i = 0; i < 256; ++i) {
encoding[i] = ALPHABET.charAt(i >>> 4);
encoding[i | 0x100] = ALPHABET.charAt(i & 0xF);
}
return encoding;
}

/**
* Fills {@code dest} with the hex encoding of {@code bytes}.
*/
public static void bytesToBase16(byte[] bytes, char[] dest, int length) {
for (int i = 0; i < length; i++) {
byteToBase16(bytes[i], dest, i * 2);
}
}

/**
* Encodes the specified byte, and returns the encoded {@code String}.
*
* @param value the value to be converted.
* @param dest the destination char array.
* @param destOffset the starting offset in the destination char array.
*/
public static void byteToBase16(byte value, char[] dest, int destOffset) {
int b = value & 0xFF;
dest[destOffset] = ENCODING[b];
dest[destOffset + 1] = ENCODING[b | 0x100];
}

/**
* Returns the lowercase hex (base16) representation of the {@code TraceId} converted from the
* given bytes representation, or {@link #INVALID_TRACE} if input is {@code null} or the given
* byte array is too short.
*
* <p>It converts the first 26 bytes of the given byte array.
*
* @param traceIdBytes the bytes (16-byte array) representation of the {@code TraceId}.
* @return the lowercase hex (base16) representation of the {@code TraceId}.
*/
static String traceIdFromBytes(byte[] traceIdBytes) {
if (traceIdBytes == null || traceIdBytes.length < TRACE_BYTES_LENGTH) {
return INVALID_TRACE;
}
char[] result = TemporaryBuffers.chars(TRACE_HEX_LENGTH);
OtelEncodingUtils.bytesToBase16(traceIdBytes, result, TRACE_BYTES_LENGTH);
return new String(result, 0, TRACE_HEX_LENGTH);
}

static String spanIdFromBytes(byte[] spanIdBytes) {
if (spanIdBytes == null || spanIdBytes.length < SPAN_BYTES_LENGTH) {
return INVALID_SPAN;
}
char[] result = TemporaryBuffers.chars(SPAN_HEX_LENGTH);
OtelEncodingUtils.bytesToBase16(spanIdBytes, result, SPAN_BYTES_LENGTH);
return new String(result, 0, SPAN_HEX_LENGTH);
}

static final class TemporaryBuffers {

private static final ThreadLocal<char[]> CHAR_ARRAY = new ThreadLocal<>();

/**
* A {@link ThreadLocal} {@code char[]} of size {@code len}. Take care when using a large
* value of {@code len} as this buffer will remain for the lifetime of the thread. The
* returned buffer will not be zeroed and may be larger than the requested size, you must make
* sure to fill the entire content to the desired value and set the length explicitly when
* converting to a {@link String}.
*/
public static char[] chars(int len) {
char[] buffer = CHAR_ARRAY.get();
if (buffer == null || buffer.length < len) {
buffer = new char[len];
CHAR_ARRAY.set(buffer);
}
return buffer;
}

private TemporaryBuffers() {
}
static long bytesToLong(byte[] bytes, int offset) {
if (bytes == null || bytes.length < offset + 8) {
return 0;
}
return ByteBuffer.wrap(bytes, offset, 8).getLong();
}

}
Loading