Skip to content

Commit

Permalink
Add the Link type
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Dec 6, 2024
1 parent 6886223 commit c648a98
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 58 deletions.
37 changes: 33 additions & 4 deletions src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,15 @@ KJ_TEST("Read/Write Onset works") {

tracing::FetchEventInfo fetchInfo(
kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr);
tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)), ExecutionModel::STATELESS,
kj::str("foo"), kj::none, kj::none, kj::none, kj::none);

FakeEntropySource entropy;
auto trigger = InvocationSpanContext::newForInvocation(kj::none, entropy);

tracing::Onset info(tracing::Onset::Info(kj::mv(fetchInfo)),
{
.scriptName = kj::str("foo"),
},
tracing::Onset::TriggerContext(trigger));
info.copyTo(infoBuilder);

auto reader = infoBuilder.asReader();
Expand All @@ -488,14 +495,19 @@ KJ_TEST("Read/Write Onset works") {
KJ_ASSERT_NONNULL(info2.info.tryGet<tracing::FetchEventInfo>());
KJ_ASSERT(fetchInfo2.method == kj::HttpMethod::GET);
KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj);
KJ_ASSERT(info2.executionModel == ExecutionModel::STATELESS);
KJ_ASSERT(info2.workerInfo.executionModel == ExecutionModel::STATELESS);

auto& triggerCtx = KJ_ASSERT_NONNULL(info2.trigger);
KJ_ASSERT(triggerCtx.traceId == trigger.getTraceId());
KJ_ASSERT(triggerCtx.invocationId == trigger.getInvocationId());
KJ_ASSERT(triggerCtx.spanId == trigger.getSpanId());

tracing::Onset info3 = info.clone();
tracing::FetchEventInfo& fetchInfo3 =
KJ_ASSERT_NONNULL(info3.info.tryGet<tracing::FetchEventInfo>());
KJ_ASSERT(fetchInfo3.method == kj::HttpMethod::GET);
KJ_ASSERT(fetchInfo3.url == "https://example.com"_kj);
KJ_ASSERT(info3.executionModel == ExecutionModel::STATELESS);
KJ_ASSERT(info3.workerInfo.executionModel == ExecutionModel::STATELESS);
}

KJ_TEST("Read/Write Outcome works") {
Expand All @@ -517,6 +529,23 @@ KJ_TEST("Read/Write Outcome works") {
KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS);
}

KJ_TEST("Read/Write Link works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Link>();

FakeEntropySource entropy;
auto context = tracing::InvocationSpanContext::newForInvocation(kj::none, entropy);

tracing::Link link(context, kj::str("foo"));
link.copyTo(infoBuilder);

tracing::Link link2(infoBuilder.asReader());
KJ_ASSERT(KJ_ASSERT_NONNULL(link2.label) == "foo"_kj);
KJ_ASSERT(link2.traceId == context.getTraceId());
KJ_ASSERT(link2.invocationId == context.getInvocationId());
KJ_ASSERT(link2.spanId == context.getSpanId());
}

KJ_TEST("Read/Write TailEvent works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::TailEvent>();
Expand Down
142 changes: 112 additions & 30 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,59 @@ tracing::SpanClose tracing::SpanClose::clone() {
return SpanClose(outcome);
}

namespace {
kj::Maybe<kj::String> readLabelFromReader(const rpc::Trace::Link::Reader& reader) {
if (!reader.hasLabel()) return kj::none;
return kj::str(reader.getLabel());
}
tracing::TraceId readTraceIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::TraceId::fromCapnp(context.getTraceId());
}
tracing::TraceId readInvocationIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::TraceId::fromCapnp(context.getInvocationId());
}
tracing::SpanId readSpanIdFromReader(const rpc::Trace::Link::Reader& reader) {
KJ_ASSERT(reader.hasContext());
auto context = reader.getContext();
return tracing::SpanId(context.getSpanId());
}
} // namespace

tracing::Link::Link(const InvocationSpanContext& other, kj::Maybe<kj::String> label)
: Link(kj::mv(label), other.getTraceId(), other.getInvocationId(), other.getSpanId()) {}

tracing::Link::Link(
kj::Maybe<kj::String> label, TraceId traceId, TraceId invocationId, SpanId spanId)
: label(kj::mv(label)),
traceId(kj::mv(traceId)),
invocationId(kj::mv(invocationId)),
spanId(kj::mv(spanId)) {}

tracing::Link::Link(rpc::Trace::Link::Reader reader)
: label(readLabelFromReader(reader)),
traceId(readTraceIdFromReader(reader)),
invocationId(readInvocationIdFromReader(reader)),
spanId(readSpanIdFromReader(reader)) {}

void tracing::Link::copyTo(rpc::Trace::Link::Builder builder) {
KJ_IF_SOME(l, label) {
builder.setLabel(l);
}
auto ctx = builder.initContext();
traceId.toCapnp(ctx.initTraceId());
invocationId.toCapnp(ctx.initInvocationId());
ctx.setSpanId(spanId.getId());
}

tracing::Link tracing::Link::clone() {
return Link(
label.map([](kj::String& str) { return kj::str(str); }), traceId, invocationId, spanId);
}

namespace {
tracing::Onset::Info getInfoFromReader(const rpc::Trace::Onset::Reader& reader) {
auto info = reader.getInfo();
Expand Down Expand Up @@ -1175,52 +1228,63 @@ kj::Maybe<kj::String> getEntrypointFromReader(const rpc::Trace::Onset::Reader& r
}
return kj::none;
}
kj::Maybe<tracing::Onset::TriggerContext> getTriggerContextFromReader(
const rpc::Trace::Onset::Reader& reader) {
if (!reader.hasTrigger()) return kj::none;
auto trigger = reader.getTrigger();
return tracing::Onset::TriggerContext(tracing::TraceId::fromCapnp(trigger.getTraceId()),
tracing::TraceId::fromCapnp(trigger.getInvocationId()), tracing::SpanId(trigger.getSpanId()));
}
tracing::Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reader) {
return tracing::Onset::WorkerInfo{
.executionModel = reader.getExecutionModel(),
.scriptName = getScriptNameFromReader(reader),
.scriptVersion = getScriptVersionFromReader(reader),
.dispatchNamespace = getDispatchNamespaceFromReader(reader),
.scriptTags = getScriptTagsFromReader(reader),
.entrypoint = getEntrypointFromReader(reader),
};
}
} // namespace

tracing::Onset::Onset(tracing::Onset::Info&& info,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptName,
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion,
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::Array<kj::String>> scriptTags,
kj::Maybe<kj::String> entrypoint)
tracing::Onset::WorkerInfo&& workerInfo,
kj::Maybe<TriggerContext> maybeTrigger)
: info(kj::mv(info)),
executionModel(executionModel),
scriptName(kj::mv(scriptName)),
scriptVersion(kj::mv(scriptVersion)),
dispatchNamespace(kj::mv(dispatchNamespace)),
scriptTags(kj::mv(scriptTags)),
entrypoint(kj::mv(entrypoint)) {}
workerInfo(kj::mv(workerInfo)),
trigger(kj::mv(maybeTrigger)) {}

tracing::Onset::Onset(rpc::Trace::Onset::Reader reader)
: info(getInfoFromReader(reader)),
executionModel(reader.getExecutionModel()),
scriptName(getScriptNameFromReader(reader)),
scriptVersion(getScriptVersionFromReader(reader)),
dispatchNamespace(getDispatchNamespaceFromReader(reader)),
scriptTags(getScriptTagsFromReader(reader)),
entrypoint(getEntrypointFromReader(reader)) {}
workerInfo(getWorkerInfoFromReader(reader)),
trigger(getTriggerContextFromReader(reader)) {}

void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) {
builder.setExecutionModel(executionModel);
KJ_IF_SOME(name, scriptName) {
builder.setExecutionModel(workerInfo.executionModel);
KJ_IF_SOME(name, workerInfo.scriptName) {
builder.setScriptName(name);
}
KJ_IF_SOME(version, scriptVersion) {
KJ_IF_SOME(version, workerInfo.scriptVersion) {
builder.setScriptVersion(*version);
}
KJ_IF_SOME(name, dispatchNamespace) {
KJ_IF_SOME(name, workerInfo.dispatchNamespace) {
builder.setDispatchNamespace(name);
}
KJ_IF_SOME(tags, scriptTags) {
KJ_IF_SOME(tags, workerInfo.scriptTags) {
auto list = builder.initScriptTags(tags.size());
for (size_t i = 0; i < tags.size(); i++) {
list.set(i, tags[i]);
}
}
KJ_IF_SOME(e, entrypoint) {
KJ_IF_SOME(e, workerInfo.entrypoint) {
builder.setEntryPoint(e);
}
KJ_IF_SOME(t, trigger) {
auto ctx = builder.initTrigger();
t.traceId.toCapnp(ctx.initTraceId());
t.invocationId.toCapnp(ctx.getInvocationId());
ctx.setSpanId(t.spanId.getId());
}
auto infoBuilder = builder.initInfo();
KJ_SWITCH_ONEOF(info) {
KJ_CASE_ONEOF(fetch, FetchEventInfo) {
Expand Down Expand Up @@ -1256,6 +1320,18 @@ void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) {
}
}

tracing::Onset::WorkerInfo tracing::Onset::WorkerInfo::clone() const {
return WorkerInfo{
.executionModel = executionModel,
.scriptName = scriptName.map([](auto& str) { return kj::str(str); }),
.scriptVersion = scriptVersion.map([](auto& version) { return capnp::clone(*version); }),
.dispatchNamespace = dispatchNamespace.map([](auto& str) { return kj::str(str); }),
.scriptTags =
scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }),
.entrypoint = entrypoint.map([](auto& str) { return kj::str(str); }),
};
}

tracing::Onset tracing::Onset::clone() {
constexpr auto cloneInfo = [](Info& info) -> tracing::Onset::Info {
KJ_SWITCH_ONEOF(info) {
Expand Down Expand Up @@ -1292,12 +1368,9 @@ tracing::Onset tracing::Onset::clone() {
}
KJ_UNREACHABLE;
};
return Onset(cloneInfo(info), executionModel,
scriptName.map([](auto& name) { return kj::str(name); }),
scriptVersion.map([](auto& version) { return capnp::clone(*version); }),
dispatchNamespace.map([](auto& ns) { return kj::str(ns); }),
scriptTags.map([](auto& tags) { return KJ_MAP(tag, tags) { return kj::str(tag); }; }),
entrypoint.map([](auto& e) { return kj::str(e); }));
return Onset(cloneInfo(info), workerInfo.clone(), trigger.map([](TriggerContext& ctx) {
return TriggerContext(ctx.traceId, ctx.invocationId, ctx.spanId);
}));
}

tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime)
Expand Down Expand Up @@ -1383,6 +1456,9 @@ tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Re
case rpc::Trace::TailEvent::Event::LOG: {
return tracing::Mark(tracing::Log(event.getLog()));
}
case rpc::Trace::TailEvent::Event::LINK: {
return tracing::Mark(tracing::Link(event.getLink()));
}
}
KJ_UNREACHABLE;
}
Expand Down Expand Up @@ -1434,6 +1510,9 @@ void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) {
KJ_CASE_ONEOF(ret, Return) {
ret.copyTo(eventBuilder.initReturn());
}
KJ_CASE_ONEOF(link, Link) {
link.copyTo(eventBuilder.initLink());
}
KJ_CASE_ONEOF(attrs, kj::Array<Attribute>) {
// Mark is a collection of attributes.
auto attrBuilder = eventBuilder.initAttribute(attrs.size());
Expand Down Expand Up @@ -1478,6 +1557,9 @@ tracing::TailEvent tracing::TailEvent::clone() {
KJ_CASE_ONEOF(ret, Return) {
return Mark(ret.clone());
}
KJ_CASE_ONEOF(link, Link) {
return Mark(link.clone());
}
KJ_CASE_ONEOF(attrs, tracing::CustomInfo) {
return Mark(KJ_MAP(attr, attrs) { return attr.clone(); });
}
Expand Down
65 changes: 51 additions & 14 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,26 @@ struct Return final {
Return clone();
};

using Mark = kj::OneOf<DiagnosticChannelEvent, Exception, Log, Return, kj::Array<Attribute>>;
// A Link mark is used to establish a link from one span to another.
// The optional label can be used to identify the link.
struct Link final {
explicit Link(const InvocationSpanContext& other, kj::Maybe<kj::String> label = kj::none);
explicit Link(kj::Maybe<kj::String> label, TraceId traceId, TraceId invocationId, SpanId spanId);
Link(rpc::Trace::Link::Reader reader);
Link(Link&&) = default;
Link& operator=(Link&&) = default;
KJ_DISALLOW_COPY(Link);

kj::Maybe<kj::String> label;
TraceId traceId;
TraceId invocationId;
SpanId spanId;

void copyTo(rpc::Trace::Link::Builder builder);
Link clone();
};

using Mark = kj::OneOf<DiagnosticChannelEvent, Exception, Log, Return, Link, kj::Array<Attribute>>;

// Marks the opening of a child span within the streaming tail session.
struct SpanOpen final {
Expand Down Expand Up @@ -656,25 +675,43 @@ struct SpanClose final {
struct Onset final {
using Info = EventInfo;

explicit Onset(Info&& info,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptName,
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion,
kj::Maybe<kj::String> dispatchNamespace,
kj::Maybe<kj::Array<kj::String>> scriptTags,
kj::Maybe<kj::String> entrypoint);
// Information about the worker that is being tailed.
struct WorkerInfo final {
ExecutionModel executionModel = ExecutionModel::STATELESS;
kj::Maybe<kj::String> scriptName;
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion;
kj::Maybe<kj::String> dispatchNamespace;
kj::Maybe<kj::Array<kj::String>> scriptTags;
kj::Maybe<kj::String> entrypoint;

WorkerInfo clone() const;
};

struct TriggerContext final {
TraceId traceId;
TraceId invocationId;
SpanId spanId;

TriggerContext(TraceId traceId, TraceId invocationId, SpanId spanId)
: traceId(kj::mv(traceId)),
invocationId(kj::mv(invocationId)),
spanId(kj::mv(spanId)) {}

TriggerContext(const InvocationSpanContext& ctx)
: TriggerContext(ctx.getTraceId(), ctx.getInvocationId(), ctx.getSpanId()) {}
};

explicit Onset(
Info&& info, WorkerInfo&& workerInfo, kj::Maybe<TriggerContext> maybeTrigger = kj::none);

Onset(rpc::Trace::Onset::Reader reader);
Onset(Onset&&) = default;
Onset& operator=(Onset&&) = default;
KJ_DISALLOW_COPY(Onset);

Info info;
ExecutionModel executionModel;
kj::Maybe<kj::String> scriptName;
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion;
kj::Maybe<kj::String> dispatchNamespace;
kj::Maybe<kj::Array<kj::String>> scriptTags;
kj::Maybe<kj::String> entrypoint;
WorkerInfo workerInfo;
kj::Maybe<TriggerContext> trigger;

void copyTo(rpc::Trace::Onset::Builder builder);
Onset clone();
Expand Down
Loading

0 comments on commit c648a98

Please sign in to comment.