Skip to content

Commit

Permalink
Add the Link type
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Dec 6, 2024
1 parent 6886223 commit 7add95d
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 22 deletions.
29 changes: 28 additions & 1 deletion src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,13 @@ KJ_TEST("Read/Write Onset works") {

tracing::FetchEventInfo fetchInfo(
kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr);

FakeEntropySource entropy;
auto trigger = InvocationSpanContext::newForInvocation(kj::none, entropy);

tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)), ExecutionModel::STATELESS,
kj::str("foo"), kj::none, kj::none, kj::none, kj::none);
kj::str("foo"), kj::none, kj::none, kj::none, kj::none,
tracing::Onset::TriggerContext(trigger));
info.copyTo(infoBuilder);

auto reader = infoBuilder.asReader();
Expand All @@ -490,6 +495,11 @@ KJ_TEST("Read/Write Onset works") {
KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj);
KJ_ASSERT(info2.executionModel == ExecutionModel::STATELESS);

auto& triggerCtx = KJ_ASSERT_NONNULL(info2.trigger);
KJ_ASSERT(triggerCtx.traceId == trigger.getTraceId());
KJ_ASSERT(triggerCtx.invocationId == trigger.getInvocationId());
KJ_ASSERT(triggerCtx.spanId == trigger.getSpanId());

tracing::Onset info3 = info.clone();
tracing::FetchEventInfo& fetchInfo3 =
KJ_ASSERT_NONNULL(info3.info.tryGet<tracing::FetchEventInfo>());
Expand Down Expand Up @@ -517,6 +527,23 @@ KJ_TEST("Read/Write Outcome works") {
KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS);
}

KJ_TEST("Read/Write Link works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Link>();

FakeEntropySource entropy;
auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy);

tracing::Link link(context, kj::str("foo"));
link.copyTo(infoBuilder);

tracing::Link link2(infoBuilder.asReader());
KJ_ASSERT(KJ_ASSERT_NONNULL(link2.label) == "foo"_kj);
KJ_ASSERT(link2.traceId == context.getTraceId());
KJ_ASSERT(link2.invocationId == context.getInvocationId());
KJ_ASSERT(link2.spanId == context.getSpanId());
}

KJ_TEST("Read/Write TailEvent works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::TailEvent>();
Expand Down
88 changes: 84 additions & 4 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,59 @@ tracing::SpanClose tracing::SpanClose::clone() {
return SpanClose(outcome);
}

namespace {
kj::Maybe<kj::String> readLabelFromReader(const rpc::Trace::Link::Reader& reader) {
if (!reader.hasLabel()) return kj::none;
return kj::str(reader.getLabel());
}
tracing::TraceId readTraceIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::TraceId::fromCapnp(context.getTraceId());
}
tracing::TraceId readInvocationIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::TraceId::fromCapnp(context.getInvocationId());
}
tracing::SpanId readSpanIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::SpanId(context.getSpanId());
}
} // namespace

tracing::Link::Link(const InvocationSpanContext& other, kj::Maybe<kj::String> label)
: Link(kj::mv(label), other.getTraceId(), other.getInvocationId(), other.getSpanId()) {}

tracing::Link::Link(
kj::Maybe<kj::String> label, TraceId traceId, TraceId invocationId, SpanId spanId)
: label(kj::mv(label)),
traceId(kj::mv(traceId)),
invocationId(kj::mv(invocationId)),
spanId(kj::mv(spanId)) {}

tracing::Link::Link(rpc::Trace::Link::Reader reader)
: label(readLabelFromReader(reader)),
traceId(readTraceIdFromReader(reader)),
invocationId(readInvocationIdFromReader(reader)),
spanId(readSpanIdFromReader(reader)) {}

void tracing::Link::copyTo(rpc::Trace::Link::Builder builder) {
KJ_IF_SOME(l, label) {
builder.setLabel(l);
}
auto ctx = builder.initContext();
traceId.toCapnp(ctx.initTraceId());
invocationId.toCapnp(ctx.initInvocationId());
ctx.setSpanId(spanId.getId());
}

tracing::Link tracing::Link::clone() {
return Link(
label.map([](kj::String& str) { return kj::str(str); }), traceId, invocationId, spanId);
}

namespace {
tracing::Onset::Info getInfoFromReader(const rpc::Trace::Onset::Reader& reader) {
auto info = reader.getInfo();
Expand Down Expand Up @@ -1175,6 +1228,13 @@ kj::Maybe<kj::String> getEntrypointFromReader(const rpc::Trace::Onset::Reader& r
}
return kj::none;
}
kj::Maybe<tracing::Onset::TriggerContext> getTriggerContextFromReader(
const rpc::Trace::Onset::Reader& reader) {
if (!reader.hasTrigger()) return kj::none;
auto trigger = reader.getTrigger();
return tracing::Onset::TriggerContext(tracing::TraceId::fromCapnp(trigger.getTraceId()),
tracing::TraceId::fromCapnp(trigger.getInvocationId()), tracing::SpanId(trigger.getSpanId()));
}
} // namespace

tracing::Onset::Onset(tracing::Onset::Info&& info,
Expand All @@ -1183,14 +1243,16 @@ tracing::Onset::Onset(tracing::Onset::Info&& info,
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion,
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::Array<kj::String>> scriptTags,
kj::Maybe<kj::String> entrypoint)
kj::Maybe<kj::String> entrypoint,
kj::Maybe<TriggerContext> maybeTrigger)
: info(kj::mv(info)),
executionModel(executionModel),
scriptName(kj::mv(scriptName)),
scriptVersion(kj::mv(scriptVersion)),
dispatchNamespace(kj::mv(dispatchNamespace)),
scriptTags(kj::mv(scriptTags)),
entrypoint(kj::mv(entrypoint)) {}
entrypoint(kj::mv(entrypoint)),
trigger(kj::mv(maybeTrigger)) {}

tracing::Onset::Onset(rpc::Trace::Onset::Reader reader)
: info(getInfoFromReader(reader)),
Expand All @@ -1199,7 +1261,8 @@ tracing::Onset::Onset(rpc::Trace::Onset::Reader reader)
scriptVersion(getScriptVersionFromReader(reader)),
dispatchNamespace(getDispatchNamespaceFromReader(reader)),
scriptTags(getScriptTagsFromReader(reader)),
entrypoint(getEntrypointFromReader(reader)) {}
entrypoint(getEntrypointFromReader(reader)),
trigger(getTriggerContextFromReader(reader)) {}

void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) {
builder.setExecutionModel(executionModel);
Expand All @@ -1221,6 +1284,12 @@ void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) {
KJ_IF_SOME(e, entrypoint) {
builder.setEntryPoint(e);
}
KJ_IF_SOME(t, trigger) {
auto ctx = builder.initTrigger();
t.traceId.toCapnp(ctx.initTraceId());
t.invocationId.toCapnp(ctx.getInvocationId());
ctx.setSpanId(t.spanId.getId());
}
auto infoBuilder = builder.initInfo();
KJ_SWITCH_ONEOF(info) {
KJ_CASE_ONEOF(fetch, FetchEventInfo) {
Expand Down Expand Up @@ -1297,7 +1366,9 @@ tracing::Onset tracing::Onset::clone() {
scriptVersion.map([](auto& version) { return capnp::clone(*version); }),
dispatchNamespace.map([](auto& ns) { return kj::str(ns); }),
scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }),
entrypoint.map([](auto& e) { return kj::str(e); }));
entrypoint.map([](auto& e) { return kj::str(e); }), trigger.map([](TriggerContext& ctx) {
return TriggerContext(ctx.traceId, ctx.invocationId, ctx.spanId);
}));
}

tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime)
Expand Down Expand Up @@ -1383,6 +1454,9 @@ tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Re
case rpc::Trace::TailEvent::Event::LOG: {
return tracing::Mark(tracing::Log(event.getLog()));
}
case rpc::Trace::TailEvent::Event::LINK: {
return tracing::Mark(tracing::Link(event.getLink()));
}
}
KJ_UNREACHABLE;
}
Expand Down Expand Up @@ -1434,6 +1508,9 @@ void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) {
KJ_CASE_ONEOF(ret, Return) {
ret.copyTo(eventBuilder.initReturn());
}
KJ_CASE_ONEOF(link, Link) {
link.copyTo(eventBuilder.initLink());
}
KJ_CASE_ONEOF(attrs, kj::Array<Attribute>) {
// Mark is a collection of attributes.
auto attrBuilder = eventBuilder.initAttribute(attrs.size());
Expand Down Expand Up @@ -1478,6 +1555,9 @@ tracing::TailEvent tracing::TailEvent::clone() {
KJ_CASE_ONEOF(ret, Return) {
return Mark(ret.clone());
}
KJ_CASE_ONEOF(link, Link) {
return Mark(link.clone());
}
KJ_CASE_ONEOF(attrs, tracing::CustomInfo) {
return Mark(KJ_MAP(attr, attrs) { return attr.clone(); });
}
Expand Down
51 changes: 44 additions & 7 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,26 @@ struct Return final {
Return clone();
};

using Mark = kj::OneOf<DiagnosticChannelEvent, Exception, Log, Return, kj::Array<Attribute>>;
// A Link mark is used to establish a link from one span to another.
// The optional label can be used to identify the link.
struct Link final {
explicit Link(const InvocationSpanContext& other, kj::Maybe<kj::String> label = kj::none);
explicit Link(kj::Maybe<kj::String> label, TraceId traceId, TraceId invocationId, SpanId spanId);
Link(rpc::Trace::Link::Reader reader);
Link(Link&&) = default;
Link& operator=(Link&&) = default;
KJ_DISALLOW_COPY(Link);

kj::Maybe<kj::String> label;
TraceId traceId;
TraceId invocationId;
SpanId spanId;

void copyTo(rpc::Trace::Link::Builder builder);
Link clone();
};

using Mark = kj::OneOf<DiagnosticChannelEvent, Exception, Log, Return, Link, kj::Array<Attribute>>;

// Marks the opening of a child span within the streaming tail session.
struct SpanOpen final {
Expand Down Expand Up @@ -656,13 +675,29 @@ struct SpanClose final {
struct Onset final {
using Info = EventInfo;

struct TriggerContext {
TraceId traceId;
TraceId invocationId;
SpanId spanId;

TriggerContext(TraceId traceId, TraceId invocationId, SpanId spanId)
: traceId(kj::mv(traceId)),
invocationId(kj::mv(invocationId)),
spanId(kj::mv(spanId)) {}

TriggerContext(const InvocationSpanContext& ctx)
: TriggerContext(ctx.getTraceId(), ctx.getInvocationId(), ctx.getSpanId()) {}
};

explicit Onset(Info&& info,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptName,
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion,
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::Array<kj::String>> scriptTags,
kj::Maybe<kj::String> entrypoint);
ExecutionModel executionModel = ExecutionModel::STATELESS,
kj::Maybe<kj::String> scriptName = kj::none,
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion = kj::none,
kj::Maybe<kj::String> dispatchNamespace = kj::none,
kj::Maybe<kj::Array<kj::String>> scriptTags = kj::none,
kj::Maybe<kj::String> entrypoint = kj::none,
kj::Maybe<TriggerContext> maybeTrigger = kj::none);

Onset(rpc::Trace::Onset::Reader reader);
Onset(Onset&&) = default;
Onset& operator=(Onset&&) = default;
Expand All @@ -676,6 +711,8 @@ struct Onset final {
kj::Maybe<kj::Array<kj::String>> scriptTags;
kj::Maybe<kj::String> entrypoint;

kj::Maybe<TriggerContext> trigger;

void copyTo(rpc::Trace::Onset::Builder builder);
Onset clone();
};
Expand Down
33 changes: 23 additions & 10 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,23 @@ struct Trace @0x8e8d911203762d34 {
scriptTags @4 :List(Text);
entryPoint @5 :Text;

trigger @6 :InvocationSpanContext;
# If this invocation was triggered by a different invocation that
# is being traced, the trigger will identify the triggering span.
# Propagation of the trigger context is not required, and in some
# cases is not desirable.

info :union {
fetch @6 :FetchEventInfo;
jsrpc @7 :JsRpcEventInfo;
scheduled @8 :ScheduledEventInfo;
alarm @9 :AlarmEventInfo;
queue @10 :QueueEventInfo;
email @11 :EmailEventInfo;
trace @12 :TraceEventInfo;
hibernatableWebSocket @13 :HibernatableWebSocketEventInfo;
resume @14 :Resume;
custom @15 :CustomEventInfo;
fetch @7 :FetchEventInfo;
jsrpc @8 :JsRpcEventInfo;
scheduled @9 :ScheduledEventInfo;
alarm @10 :AlarmEventInfo;
queue @11 :QueueEventInfo;
email @12 :EmailEventInfo;
trace @13 :TraceEventInfo;
hibernatableWebSocket @14 :HibernatableWebSocketEventInfo;
resume @15 :Resume;
custom @16 :CustomEventInfo;
}
}

Expand All @@ -257,6 +263,12 @@ struct Trace @0x8e8d911203762d34 {
# A hibernate event indicates that the tail session is being hibernated.
}

struct Link {
# A link to another invocation span context.
label @0 :Text;
context @1 :InvocationSpanContext;
}

struct TailEvent {
# A streaming tail worker receives a series of Tail Events. Tail events always
# occur within an InvocationSpanContext. The first TailEvent delivered to a
Expand All @@ -278,6 +290,7 @@ struct Trace @0x8e8d911203762d34 {
diagnosticChannelEvent @10 :DiagnosticChannelEvent;
exception @11 :Exception;
log @12 :Log;
link @13 :Link;
}
}
}
Expand Down

0 comments on commit 7add95d

Please sign in to comment.