Skip to content

Commit

Permalink
[o11y] Provide OTel-compatible user span representation in trace
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Dec 18, 2024
1 parent 6812dae commit 956a297
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 54 deletions.
61 changes: 52 additions & 9 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <workerd/util/uuid.h>

#include <capnp/schema.h>
#include <kj/encoding.h>

namespace workerd::api {

Expand Down Expand Up @@ -74,15 +75,11 @@ jsg::V8Ref<v8::Object> getTraceLogMessage(jsg::Lock& js, const tracing::Log& log
}

kj::Array<jsg::Ref<TraceLog>> getTraceLogs(jsg::Lock& js, const Trace& trace) {
auto builder = kj::heapArrayBuilder<jsg::Ref<TraceLog>>(trace.logs.size() + trace.spans.size());
for (auto i: kj::indices(trace.logs)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.logs[i]));
}
// Add spans represented as logs to the logs object.
for (auto i: kj::indices(trace.spans)) {
builder.add(jsg::alloc<TraceLog>(js, trace, trace.spans[i]));
}
return builder.finish();
return KJ_MAP(x, trace.logs) -> jsg::Ref<TraceLog> { return jsg::alloc<TraceLog>(js, trace, x); };
}

kj::Array<jsg::Ref<OTelSpan>> getTraceSpans(const Trace& trace) {
return KJ_MAP(x, trace.spans) -> jsg::Ref<OTelSpan> { return jsg::alloc<OTelSpan>(x); };
}

kj::Array<jsg::Ref<TraceDiagnosticChannelEvent>> getTraceDiagnosticChannelEvents(
Expand Down Expand Up @@ -209,6 +206,7 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace)
dispatchNamespace(trace.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })),
scriptTags(getTraceScriptTags(trace)),
executionModel(enumToStr(trace.executionModel)),
spans(getTraceSpans(trace)),
outcome(enumToStr(trace.outcome)),
cpuTime(trace.cpuTime / kj::MILLISECONDS),
wallTime(trace.wallTime / kj::MILLISECONDS),
Expand Down Expand Up @@ -290,6 +288,10 @@ kj::StringPtr TraceItem::getExecutionModel() {
return executionModel;
}

kj::ArrayPtr<jsg::Ref<OTelSpan>> TraceItem::getSpans() {
return spans;
}

kj::StringPtr TraceItem::getOutcome() {
return outcome;
}
Expand Down Expand Up @@ -553,6 +555,47 @@ bool TraceItem::HibernatableWebSocketEventInfo::Close::getWasClean() {
return eventInfo.wasClean;
}

kj::StringPtr OTelSpan::getOperation() {
return operation;
}

kj::Date OTelSpan::getStartTime() {
return startTime;
}

kj::StringPtr OTelSpan::getSpanID() {
return spanId;
}
kj::StringPtr OTelSpan::getParentSpanID() {
return parentSpanId;
}

kj::Date OTelSpan::getEndTime() {
return endTime;
}

kj::ArrayPtr<OTelSpanTag> OTelSpan::getTags() {
return tags;
}

OTelSpan::OTelSpan(const CompleteSpan& span)
: operation(kj::str(span.operationName)),
startTime(span.startTime),
endTime(span.endTime),
tags(kj::heapArray<OTelSpanTag>(span.tags.size())) {
// IDs are represented as network-order hex strings.
uint64_t netSpanId = __builtin_bswap64(span.spanId);
uint64_t netParentSpanId = __builtin_bswap64(span.parentSpanId);
spanId = kj::encodeHex(kj::ArrayPtr<byte>((kj::byte*)&netSpanId, sizeof(uint64_t)));
parentSpanId = kj::encodeHex(kj::ArrayPtr<byte>((kj::byte*)&netParentSpanId, sizeof(uint64_t)));
uint32_t i = 0;
for (auto& tag: span.tags) {
tags[i].key = kj::str(tag.key);
tags[i].value = spanTagClone(tag.value);
i++;
}
}

TraceLog::TraceLog(jsg::Lock& js, const Trace& trace, const tracing::Log& log)
: timestamp(getTraceLogTimestamp(log)),
level(getTraceLogLevel(log)),
Expand Down
60 changes: 57 additions & 3 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,54 @@ struct ScriptVersion {
}
};

struct OTelSpanTag final: public jsg::Object {
kj::String key;
kj::OneOf<bool, int64_t, double, kj::String> value;
JSG_STRUCT(key, value);
};

// OpenTelemetry-compatible span data exposed as part of the trace. Loosely based on https://github.com/open-telemetry/opentelemetry-js/blob/v1.28.0/experimental/packages/otlp-transformer/src/trace/types.ts#L64
class OTelSpan final: public jsg::Object {
public:
OTelSpan(const CompleteSpan& span);
kj::StringPtr getSpanID();
kj::StringPtr getParentSpanID();
kj::StringPtr getOperation();
kj::ArrayPtr<OTelSpanTag> getTags();
kj::Date getStartTime();
kj::Date getEndTime();

JSG_RESOURCE_TYPE(OTelSpan) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(spanId, getSpanID);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(parentSpanId, getParentSpanID);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(operation, getOperation);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(tags, getTags);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(startTime, getStartTime);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(endTime, getEndTime);
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("operation", operation);
for (const OTelSpanTag& tag: tags) {
tracker.trackField("key", tag.key);
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
tracker.trackField("value", str);
}
KJ_CASE_ONEOF_DEFAULT break;
}
}
}

private:
kj::String spanId;
kj::String parentSpanId;
kj::String operation;
kj::Date startTime;
kj::Date endTime;
kj::Array<OTelSpanTag> tags;
};

class TraceItem final: public jsg::Object {
public:
class FetchEventInfo;
Expand Down Expand Up @@ -102,16 +150,20 @@ class TraceItem final: public jsg::Object {
jsg::Optional<kj::StringPtr> getDispatchNamespace();
jsg::Optional<kj::Array<kj::StringPtr>> getScriptTags();
kj::StringPtr getExecutionModel();
kj::ArrayPtr<jsg::Ref<OTelSpan>> getSpans();
kj::StringPtr getOutcome();

uint getCpuTime();
uint getWallTime();
bool getTruncated();

JSG_RESOURCE_TYPE(TraceItem) {
JSG_RESOURCE_TYPE(TraceItem, CompatibilityFlags::Reader flags) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(event, getEvent);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(eventTimestamp, getEventTimestamp);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(logs, getLogs);
if (flags.getTailWorkerUserSpans()) {
JSG_LAZY_READONLY_INSTANCE_PROPERTY(spans, getSpans);
}
JSG_LAZY_READONLY_INSTANCE_PROPERTY(exceptions, getExceptions);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(diagnosticsChannelEvents, getDiagnosticChannelEvents);
JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptName, getScriptName);
Expand All @@ -138,6 +190,7 @@ class TraceItem final: public jsg::Object {
kj::Maybe<kj::String> dispatchNamespace;
jsg::Optional<kj::Array<kj::String>> scriptTags;
kj::String executionModel;
kj::Array<jsg::Ref<OTelSpan>> spans;
kj::String outcome;
uint cpuTime;
uint wallTime;
Expand Down Expand Up @@ -647,8 +700,9 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {
api::TraceItem::HibernatableWebSocketEventInfo, \
api::TraceItem::HibernatableWebSocketEventInfo::Message, \
api::TraceItem::HibernatableWebSocketEventInfo::Close, \
api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \
api::TraceDiagnosticChannelEvent, api::TraceMetrics, api::UnsafeTraceMetrics
api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::OTelSpan, \
api::OTelSpanTag, api::TraceException, api::TraceDiagnosticChannelEvent, api::TraceMetrics, \
api::UnsafeTraceMetrics
// The list of trace.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ wd_capnp_library(
deps = [
":outcome_capnp",
":script-version_capnp",
":trace_capnp",
"@capnp-cpp//src/capnp/compat:http-over-capnp_capnp",
],
)
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/io/compatibility-date.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -668,4 +668,9 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef {
# A bug in the original implementation of TransformStream failed to apply backpressure
# correctly. The fix, however, can break existing implementations that don't account
# for the bug so we need to put the fix behind a compat flag.

# Experimental support for exporting user spans to tail worker.
tailWorkerUserSpans @69 :Bool
$compatEnableFlag("tail_worker_user_spans")
$experimental;
}
129 changes: 100 additions & 29 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -582,13 +582,17 @@ Trace::~Trace() noexcept(false) {}

void Trace::copyTo(rpc::Trace::Builder builder) {
{
auto list = builder.initLogs(logs.size() + spans.size());
auto list = builder.initLogs(logs.size());
for (auto i: kj::indices(logs)) {
logs[i].copyTo(list[i]);
}
// Add spans represented as logs to the logs object.
}

{
// Add spans to the builder.
auto list = builder.initSpans(spans.size());
for (auto i: kj::indices(spans)) {
spans[i].copyTo(list[i + logs.size()]);
spans[i].copyTo(list[i]);
}
}

Expand Down Expand Up @@ -719,6 +723,7 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev
// "full", so we may need to filter out the extra data after receiving the traces back.
if (pipelineLogLevel != PipelineLogLevel::NONE) {
logs.addAll(reader.getLogs());
spans.addAll(reader.getSpans());
exceptions.addAll(reader.getExceptions());
diagnosticChannelEvents.addAll(reader.getDiagnosticChannelEvents());
}
Expand Down Expand Up @@ -1682,7 +1687,10 @@ WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel exe
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)),
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}

void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) {
kj::LiteralStringConst logSizeExceeded =
"[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"_kjc;

void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) {
if (trace->exceededLogLimit) {
return;
}
Expand All @@ -1694,47 +1702,80 @@ void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String mess
trace->exceededLogLimit = true;
trace->truncated = true;
// We use a JSON encoded array/string to match other console.log() recordings:
trace->logs.add(timestamp, LogLevel::WARN,
kj::str(
"[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"));
trace->logs.add(timestamp, LogLevel::WARN, kj::str(logSizeExceeded));
return;
}
trace->bytesUsed = newSize;
if (isSpan) {
trace->spans.add(timestamp, logLevel, kj::mv(message));
trace->numSpans++;
return;
}
trace->logs.add(timestamp, logLevel, kj::mv(message));
}

void WorkerTracer::addSpan(const Span& span, kj::String spanContext) {
// This is where we'll actually encode the span for now.
void WorkerTracer::addSpan(CompleteSpan&& span) {
// This is where we'll actually encode the span.
// Drop any spans beyond MAX_USER_SPANS.
if (trace->numSpans >= MAX_USER_SPANS) {
return;
}
if (isPredictableModeForTest()) {
// Do not emit span duration information in predictable mode.
addLog(span.endTime, LogLevel::LOG, kj::str("[\"span: ", span.operationName, "\"]"), true);
} else {
// Time since Unix epoch in seconds, with millisecond precision
double epochSecondsStart = (span.startTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
double epochSecondsEnd = (span.endTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
auto message = kj::str("[\"span: ", span.operationName, " ", kj::mv(spanContext), " ",
epochSecondsStart, " ", epochSecondsEnd, "\"]");
addLog(span.endTime, LogLevel::LOG, kj::mv(message), true);
trace->numSpans++;

if (trace->exceededLogLimit) {
return;
}
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}

// TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e
// String) to avoid having to handle each type explicitly here.
// 48B for traceID, spanID, parentSpanID, start & end time.
const int fixedSpanOverhead = 48;
size_t newSize = trace->bytesUsed + fixedSpanOverhead + span.operationName.size();
for (const Span::TagMap::Entry& tag: span.tags) {
kj::String message = kj::str("[\"tag: "_kj, tag.key, " => "_kj, spanTagStr(tag.value), "\"]");
addLog(span.endTime, LogLevel::LOG, kj::mv(message), true);
newSize += tag.key.size();
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
newSize += str.size();
}
KJ_CASE_ONEOF(val, bool) {
newSize++;
}
// int64_t and double
KJ_CASE_ONEOF_DEFAULT {
newSize += sizeof(int64_t);
}
}
}

if (newSize > MAX_TRACE_BYTES) {
trace->exceededLogLimit = true;
trace->truncated = true;
trace->logs.add(span.endTime, LogLevel::WARN, kj::str(logSizeExceeded));
return;
}
trace->bytesUsed = newSize;
trace->spans.add(kj::mv(span));
trace->numSpans++;
}

Span::TagValue spanTagClone(const Span::TagValue& tag) {
KJ_SWITCH_ONEOF(tag) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
}
KJ_CASE_ONEOF(val, int64_t) {
// TODO(o11y): We can't stringify BigInt, which causes test problems. Export this as hex
// instead? Then again OTel assumes that int values can be represented as JS numbers, so
// representing this as a double/Number might be fine despite the possible precision loss.
return kj::str(val);
}
KJ_CASE_ONEOF(val, double) {
return val;
}
KJ_CASE_ONEOF(val, bool) {
return val;
}
}
KJ_UNREACHABLE;
}

kj::String spanTagStr(const kj::OneOf<bool, int64_t, double, kj::String>& tag) {
kj::String spanTagStr(const Span::TagValue& tag) {
KJ_SWITCH_ONEOF(tag) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
Expand Down Expand Up @@ -1785,6 +1826,36 @@ Span::TagValue deserializeTagValue(RpcValue::Reader value) {
}
}

void CompleteSpan::copyTo(rpc::UserSpanData::Builder builder) {
builder.setOperationName(operationName.asPtr());
builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);

auto tagsParam = builder.initTags(tags.size());
auto i = 0;
for (auto& tag: tags) {
auto tagParam = tagsParam[i++];
tagParam.setKey(tag.key.asPtr());
serializeTagValue(tagParam.initValue(), tag.value);
}
}

CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader)
: spanId(reader.getSpanId()),
parentSpanId(reader.getParentSpanId()),
operationName(kj::str(reader.getOperationName())),
startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS),
endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) {
auto tagsParam = reader.getTags();
tags.reserve(tagsParam.size());
for (auto tagParam: tagsParam) {
tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())),
deserializeTagValue(tagParam.getValue()));
}
}

void WorkerTracer::addException(
kj::Date timestamp, kj::String name, kj::String message, kj::Maybe<kj::String> stack) {
if (trace->exceededExceptionLimit) {
Expand Down
Loading

0 comments on commit 956a297

Please sign in to comment.