From d2c39ba79c50f2ba7f4b9925352865fdc8cb257d Mon Sep 17 00:00:00 2001 From: Felix Hanau Date: Mon, 25 Nov 2024 15:39:58 +0000 Subject: [PATCH] [o11y] Provide OTel-compatible user span representation in trace --- src/workerd/api/trace.c++ | 61 +++++++++-- src/workerd/api/trace.h | 60 ++++++++++- src/workerd/io/BUILD.bazel | 1 + src/workerd/io/compatibility-date.capnp | 5 + src/workerd/io/trace.c++ | 129 ++++++++++++++++++------ src/workerd/io/trace.capnp | 14 +++ src/workerd/io/trace.h | 42 +++++--- src/workerd/io/worker-interface.capnp | 3 + src/workerd/io/worker.c++ | 2 +- 9 files changed, 263 insertions(+), 54 deletions(-) diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 2ba7ebab889..efd0f66718f 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -15,6 +15,7 @@ #include #include +#include namespace workerd::api { @@ -74,15 +75,11 @@ jsg::V8Ref getTraceLogMessage(jsg::Lock& js, const tracing::Log& log } kj::Array> getTraceLogs(jsg::Lock& js, const Trace& trace) { - auto builder = kj::heapArrayBuilder>(trace.logs.size() + trace.spans.size()); - for (auto i: kj::indices(trace.logs)) { - builder.add(jsg::alloc(js, trace, trace.logs[i])); - } - // Add spans represented as logs to the logs object. 
- for (auto i: kj::indices(trace.spans)) { - builder.add(jsg::alloc(js, trace, trace.spans[i])); - } - return builder.finish(); + return KJ_MAP(x, trace.logs) -> jsg::Ref { return jsg::alloc(js, trace, x); }; +} + +kj::Array> getTraceSpans(const Trace& trace) { + return KJ_MAP(x, trace.spans) -> jsg::Ref { return jsg::alloc(x); }; } kj::Array> getTraceDiagnosticChannelEvents( @@ -209,6 +206,7 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace) dispatchNamespace(trace.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })), scriptTags(getTraceScriptTags(trace)), executionModel(enumToStr(trace.executionModel)), + spans(getTraceSpans(trace)), outcome(enumToStr(trace.outcome)), cpuTime(trace.cpuTime / kj::MILLISECONDS), wallTime(trace.wallTime / kj::MILLISECONDS), @@ -290,6 +288,10 @@ kj::StringPtr TraceItem::getExecutionModel() { return executionModel; } +kj::ArrayPtr> TraceItem::getSpans() { + return spans; +} + kj::StringPtr TraceItem::getOutcome() { return outcome; } @@ -553,6 +555,47 @@ bool TraceItem::HibernatableWebSocketEventInfo::Close::getWasClean() { return eventInfo.wasClean; } +kj::StringPtr OTelSpan::getOperation() { + return operation; +} + +kj::Date OTelSpan::getStartTime() { + return startTime; +} + +kj::StringPtr OTelSpan::getSpanID() { + return spanId; +} +kj::StringPtr OTelSpan::getParentSpanID() { + return parentSpanId; +} + +kj::Date OTelSpan::getEndTime() { + return endTime; +} + +kj::ArrayPtr OTelSpan::getTags() { + return tags; +} + +OTelSpan::OTelSpan(const CompleteSpan& span) + : operation(kj::str(span.operationName)), + startTime(span.startTime), + endTime(span.endTime), + tags(kj::heapArray(span.tags.size())) { + // IDs are represented as network-order hex strings. 
+ uint64_t netSpanId = __builtin_bswap64(span.spanId); + uint64_t netParentSpanId = __builtin_bswap64(span.parentSpanId); + spanId = kj::encodeHex(kj::ArrayPtr((kj::byte*)&netSpanId, sizeof(uint64_t))); + parentSpanId = kj::encodeHex(kj::ArrayPtr((kj::byte*)&netParentSpanId, sizeof(uint64_t))); + uint32_t i = 0; + for (auto& tag: span.tags) { + tags[i].key = kj::str(tag.key); + tags[i].value = spanTagClone(tag.value); + i++; + } +} + TraceLog::TraceLog(jsg::Lock& js, const Trace& trace, const tracing::Log& log) : timestamp(getTraceLogTimestamp(log)), level(getTraceLogLevel(log)), diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index dd9a804fbed..2dc157a5909 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -66,6 +66,54 @@ struct ScriptVersion { } }; +struct OTelSpanTag final: public jsg::Object { + kj::String key; + kj::OneOf value; + JSG_STRUCT(key, value); +}; + +// OpenTelemetry-compatible span data exposed as part of the trace. Loosely based on https://github.com/open-telemetry/opentelemetry-js/blob/v1.28.0/experimental/packages/otlp-transformer/src/trace/types.ts#L64 +class OTelSpan final: public jsg::Object { + public: + OTelSpan(const CompleteSpan& span); + kj::StringPtr getSpanID(); + kj::StringPtr getParentSpanID(); + kj::StringPtr getOperation(); + kj::ArrayPtr getTags(); + kj::Date getStartTime(); + kj::Date getEndTime(); + + JSG_RESOURCE_TYPE(OTelSpan) { + JSG_LAZY_READONLY_INSTANCE_PROPERTY(spanId, getSpanID); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(parentSpanId, getParentSpanID); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(operation, getOperation); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(tags, getTags); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(startTime, getStartTime); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(endTime, getEndTime); + } + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("operation", operation); + for (const OTelSpanTag& tag: tags) { + tracker.trackField("key", tag.key); + 
KJ_SWITCH_ONEOF(tag.value) { + KJ_CASE_ONEOF(str, kj::String) { + tracker.trackField("value", str); + } + KJ_CASE_ONEOF_DEFAULT break; + } + } + } + + private: + kj::String spanId; + kj::String parentSpanId; + kj::String operation; + kj::Date startTime; + kj::Date endTime; + kj::Array tags; +}; + class TraceItem final: public jsg::Object { public: class FetchEventInfo; @@ -102,16 +150,20 @@ class TraceItem final: public jsg::Object { jsg::Optional getDispatchNamespace(); jsg::Optional> getScriptTags(); kj::StringPtr getExecutionModel(); + kj::ArrayPtr> getSpans(); kj::StringPtr getOutcome(); uint getCpuTime(); uint getWallTime(); bool getTruncated(); - JSG_RESOURCE_TYPE(TraceItem) { + JSG_RESOURCE_TYPE(TraceItem, CompatibilityFlags::Reader flags) { JSG_LAZY_READONLY_INSTANCE_PROPERTY(event, getEvent); JSG_LAZY_READONLY_INSTANCE_PROPERTY(eventTimestamp, getEventTimestamp); JSG_LAZY_READONLY_INSTANCE_PROPERTY(logs, getLogs); + if (flags.getTailWorkerUserSpans()) { + JSG_LAZY_READONLY_INSTANCE_PROPERTY(spans, getSpans); + } JSG_LAZY_READONLY_INSTANCE_PROPERTY(exceptions, getExceptions); JSG_LAZY_READONLY_INSTANCE_PROPERTY(diagnosticsChannelEvents, getDiagnosticChannelEvents); JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptName, getScriptName); @@ -138,6 +190,7 @@ class TraceItem final: public jsg::Object { kj::Maybe dispatchNamespace; jsg::Optional> scriptTags; kj::String executionModel; + kj::Array> spans; kj::String outcome; uint cpuTime; uint wallTime; @@ -647,8 +700,9 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent { api::TraceItem::HibernatableWebSocketEventInfo, \ api::TraceItem::HibernatableWebSocketEventInfo::Message, \ api::TraceItem::HibernatableWebSocketEventInfo::Close, \ - api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \ - api::TraceDiagnosticChannelEvent, api::TraceMetrics, api::UnsafeTraceMetrics + api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::OTelSpan, \ + 
api::OTelSpanTag, api::TraceException, api::TraceDiagnosticChannelEvent, api::TraceMetrics, \ + api::UnsafeTraceMetrics // The list of trace.h types that are added to worker.c++'s JSG_DECLARE_ISOLATE_TYPE } // namespace workerd::api diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index c1a28cbae78..9f970614564 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -264,6 +264,7 @@ wd_capnp_library( deps = [ ":outcome_capnp", ":script-version_capnp", + ":trace_capnp", "@capnp-cpp//src/capnp/compat:http-over-capnp_capnp", ], ) diff --git a/src/workerd/io/compatibility-date.capnp b/src/workerd/io/compatibility-date.capnp index de899b9100d..f33139d0752 100644 --- a/src/workerd/io/compatibility-date.capnp +++ b/src/workerd/io/compatibility-date.capnp @@ -668,4 +668,9 @@ struct CompatibilityFlags @0x8f8c1b68151b6cef { # A bug in the original implementation of TransformStream failed to apply backpressure # correctly. The fix, however, can break existing implementations that don't account # for the bug so we need to put the fix behind a compat flag. + + # Experimental support for exporting user spans to tail worker. + tailWorkerUserSpans @69 :Bool + $compatEnableFlag("tail_worker_user_spans") + $experimental; } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 3b6d2bc02cd..e4a331a9c91 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -582,13 +582,17 @@ Trace::~Trace() noexcept(false) {} void Trace::copyTo(rpc::Trace::Builder builder) { { - auto list = builder.initLogs(logs.size() + spans.size()); + auto list = builder.initLogs(logs.size()); for (auto i: kj::indices(logs)) { logs[i].copyTo(list[i]); } - // Add spans represented as logs to the logs object. + } + + { + // Add spans to the builder. 
+ auto list = builder.initSpans(spans.size()); for (auto i: kj::indices(spans)) { - spans[i].copyTo(list[i + logs.size()]); + spans[i].copyTo(list[i]); } } @@ -719,6 +723,7 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev // "full", so we may need to filter out the extra data after receiving the traces back. if (pipelineLogLevel != PipelineLogLevel::NONE) { logs.addAll(reader.getLogs()); + spans.addAll(reader.getSpans()); exceptions.addAll(reader.getExceptions()); diagnosticChannelEvents.addAll(reader.getDiagnosticChannelEvents()); } @@ -1682,7 +1687,10 @@ WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel exe kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)), self(kj::refcounted>(kj::Badge{}, *this)) {} -void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) { +kj::LiteralStringConst logSizeExceeded = + "[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"_kjc; + +void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) { if (trace->exceededLogLimit) { return; } @@ -1694,47 +1702,80 @@ void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String mess trace->exceededLogLimit = true; trace->truncated = true; // We use a JSON encoded array/string to match other console.log() recordings: - trace->logs.add(timestamp, LogLevel::WARN, - kj::str( - "[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. 
Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]")); + trace->logs.add(timestamp, LogLevel::WARN, kj::str(logSizeExceeded)); return; } trace->bytesUsed = newSize; - if (isSpan) { - trace->spans.add(timestamp, logLevel, kj::mv(message)); - trace->numSpans++; - return; - } trace->logs.add(timestamp, logLevel, kj::mv(message)); } -void WorkerTracer::addSpan(const Span& span, kj::String spanContext) { - // This is where we'll actually encode the span for now. +void WorkerTracer::addSpan(CompleteSpan&& span) { + // This is where we'll actually encode the span. // Drop any spans beyond MAX_USER_SPANS. if (trace->numSpans >= MAX_USER_SPANS) { return; } - if (isPredictableModeForTest()) { - // Do not emit span duration information in predictable mode. - addLog(span.endTime, LogLevel::LOG, kj::str("[\"span: ", span.operationName, "\"]"), true); - } else { - // Time since Unix epoch in seconds, with millisecond precision - double epochSecondsStart = (span.startTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0; - double epochSecondsEnd = (span.endTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0; - auto message = kj::str("[\"span: ", span.operationName, " ", kj::mv(spanContext), " ", - epochSecondsStart, " ", epochSecondsEnd, "\"]"); - addLog(span.endTime, LogLevel::LOG, kj::mv(message), true); + // numSpans is incremented once, after the span has actually been stored below. + + if (trace->exceededLogLimit) { + return; + } + if (pipelineLogLevel == PipelineLogLevel::NONE) { + return; } - // TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e - // String) to avoid having to handle each type explicitly here. + // 48B for traceID, spanID, parentSpanID, start & end time. 
+ const int fixedSpanOverhead = 48; + size_t newSize = trace->bytesUsed + fixedSpanOverhead + span.operationName.size(); for (const Span::TagMap::Entry& tag: span.tags) { - kj::String message = kj::str("[\"tag: "_kj, tag.key, " => "_kj, spanTagStr(tag.value), "\"]"); - addLog(span.endTime, LogLevel::LOG, kj::mv(message), true); + newSize += tag.key.size(); + KJ_SWITCH_ONEOF(tag.value) { + KJ_CASE_ONEOF(str, kj::String) { + newSize += str.size(); + } + KJ_CASE_ONEOF(val, bool) { + newSize++; + } + // int64_t and double + KJ_CASE_ONEOF_DEFAULT { + newSize += sizeof(int64_t); + } + } + } + + if (newSize > MAX_TRACE_BYTES) { + trace->exceededLogLimit = true; + trace->truncated = true; + trace->logs.add(span.endTime, LogLevel::WARN, kj::str(logSizeExceeded)); + return; + } + trace->bytesUsed = newSize; + trace->spans.add(kj::mv(span)); + trace->numSpans++; +} + +Span::TagValue spanTagClone(const Span::TagValue& tag) { + KJ_SWITCH_ONEOF(tag) { + KJ_CASE_ONEOF(str, kj::String) { + return kj::str(str); + } + KJ_CASE_ONEOF(val, int64_t) { + // TODO(o11y): We can't stringify BigInt, which causes test problems. Export this as hex + // instead? Then again OTel assumes that int values can be represented as JS numbers, so + // representing this as a double/Number might be fine despite the possible precision loss. 
+ return kj::str(val); + } + KJ_CASE_ONEOF(val, double) { + return val; + } + KJ_CASE_ONEOF(val, bool) { + return val; + } } + KJ_UNREACHABLE; } -kj::String spanTagStr(const kj::OneOf& tag) { +kj::String spanTagStr(const Span::TagValue& tag) { KJ_SWITCH_ONEOF(tag) { KJ_CASE_ONEOF(str, kj::String) { return kj::str(str); @@ -1785,6 +1826,36 @@ Span::TagValue deserializeTagValue(RpcValue::Reader value) { } } +void CompleteSpan::copyTo(rpc::UserSpanData::Builder builder) { + builder.setOperationName(operationName.asPtr()); + builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSpanId(spanId); + builder.setParentSpanId(parentSpanId); + + auto tagsParam = builder.initTags(tags.size()); + auto i = 0; + for (auto& tag: tags) { + auto tagParam = tagsParam[i++]; + tagParam.setKey(tag.key.asPtr()); + serializeTagValue(tagParam.initValue(), tag.value); + } +} + +CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader) + : spanId(reader.getSpanId()), + parentSpanId(reader.getParentSpanId()), + operationName(kj::str(reader.getOperationName())), + startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS), + endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) { + auto tagsParam = reader.getTags(); + tags.reserve(tagsParam.size()); + for (auto tagParam: tagsParam) { + tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())), + deserializeTagValue(tagParam.getValue())); + } +} + void WorkerTracer::addException( kj::Date timestamp, kj::String name, kj::String message, kj::Maybe stack) { if (trace->exceededExceptionLimit) { diff --git a/src/workerd/io/trace.capnp b/src/workerd/io/trace.capnp index c215874958a..d27e2cfe9fc 100644 --- a/src/workerd/io/trace.capnp +++ b/src/workerd/io/trace.capnp @@ -28,3 +28,17 @@ struct Tag { value @1 :TagValue; } +struct UserSpanData { + # Representation of a completed user span + operationName @0 :Text; + + 
startTimeNs @1 :Int64; + endTimeNs @2 :Int64; + # Nanoseconds since Unix epoch + + tags @3 :List(Tag); + + spanId @4 :UInt64; + parentSpanId @5 :UInt64; +} + diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index f684e5c0e97..c7c5385bb8e 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -778,6 +778,7 @@ enum class PipelineLogLevel { }; struct Span; +struct CompleteSpan; // TODO(someday): See if we can merge similar code concepts... Trace fills a role similar to // MetricsCollector::Reporter::StageEvent, and Tracer fills a role similar to @@ -826,8 +827,7 @@ class Trace final: public kj::Refcounted { kj::Maybe entrypoint; kj::Vector logs; - // TODO(o11y): Convert this to actually store spans. - kj::Vector spans; + kj::Vector spans; // A request's trace can have multiple exceptions due to separate request/waitUntil tasks. kj::Vector exceptions; @@ -908,11 +908,9 @@ class PipelineTracer final: public kj::Refcounted, public kj::EnableAddRefToThis class BaseTracer { public: // Adds log line to trace. For Spectre, timestamp should only be as accurate as JS Date.now(). - // The isSpan parameter allows for logging spans, which will be emitted after regular logs. There - // can be at most MAX_USER_SPANS spans in a trace. - virtual void addLog(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) = 0; - // Add a span, which will be represented as a log. - virtual void addSpan(const Span& span, kj::String spanContext) = 0; + virtual void addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) = 0; + // Add a span. There can be at most MAX_USER_SPANS spans in a trace. 
+ virtual void addSpan(CompleteSpan&& span) = 0; virtual void addException( kj::Date timestamp, kj::String name, kj::String message, kj::Maybe stack) = 0; @@ -945,9 +943,8 @@ class WorkerTracer final: public kj::Refcounted, public BaseTracer { } KJ_DISALLOW_COPY_AND_MOVE(WorkerTracer); - void addLog( - kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan = false) override; - void addSpan(const Span& span, kj::String spanContext) override; + void addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) override; + void addSpan(CompleteSpan&& span) override; void addException(kj::Date timestamp, kj::String name, kj::String message, @@ -1048,8 +1045,29 @@ struct Span { // Utility functions for handling span tags. void serializeTagValue(rpc::TagValue::Builder builder, const Span::TagValue& value); Span::TagValue deserializeTagValue(rpc::TagValue::Reader value); -// Stringifier for span tags, getting this to work with KJ_STRINGIFY() appears exceedingly difficult. -kj::String spanTagStr(const kj::OneOf& tag); + +// Stringify and clone for span tags, getting this to work with KJ_STRINGIFY() appears exceedingly +// difficult. +kj::String spanTagStr(const Span::TagValue& tag); +Span::TagValue spanTagClone(const Span::TagValue& tag); + +struct CompleteSpan { + // Represents a completed span within user tracing. + uint64_t spanId = 0; + uint64_t parentSpanId = 0; + + kj::ConstString operationName; + kj::Date startTime; + kj::Date endTime; + Span::TagMap tags; + + CompleteSpan(rpc::UserSpanData::Reader reader); + void copyTo(rpc::UserSpanData::Builder builder); + explicit CompleteSpan(kj::ConstString operationName, kj::Date startTime) + : operationName(kj::mv(operationName)), + startTime(startTime), + endTime(startTime) {} +}; // An opaque token which can be used to create child spans of some parent. 
This is typically // passed down from a caller to a callee when the caller wants to allow the callee to create diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 8a1588cb66a..5b61b320a49 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -14,6 +14,7 @@ using import "/capnp/compat/http-over-capnp.capnp".HttpService; using import "/capnp/compat/byte-stream.capnp".ByteStream; using import "/workerd/io/outcome.capnp".EventOutcome; using import "/workerd/io/script-version.capnp".ScriptVersion; +using import "/workerd/io/trace.capnp".UserSpanData; struct InvocationSpanContext { struct TraceId { @@ -42,6 +43,8 @@ struct Trace @0x8e8d911203762d34 { message @2 :Text; } + spans @26 :List(UserSpanData); + exceptions @1 :List(Exception); struct Exception { timestampNs @0 :Int64; diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 552467699c5..ef162b2f776 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1862,7 +1862,7 @@ void Worker::handleLog(jsg::Lock& js, auto& ioContext = IoContext::current(); KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { auto timestamp = ioContext.now(); - tracer.addLog(timestamp, level, message(), false); + tracer.addLog(timestamp, level, message()); } }