Skip to content

Commit

Permalink
feat: Reduce DSM public API and merge its extractor (#5228)
Browse files Browse the repository at this point in the history
  • Loading branch information
PerfectSlayer authored Jun 14, 2023
1 parent c28320c commit ce8a316
Show file tree
Hide file tree
Showing 25 changed files with 278 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static datadog.trace.api.cache.RadixTreeCache.UNSET_STATUS;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.decorator.http.HttpResourceDecorator.HTTP_RESOURCE_DECORATOR;

Expand All @@ -24,7 +23,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
import datadog.trace.bootstrap.instrumentation.api.Tags;
Expand Down Expand Up @@ -133,8 +131,6 @@ public AgentSpan startSpan(REQUEST_CARRIER carrier, AgentSpan.Context.Extracted
}
AgentPropagation.ContextVisitor<REQUEST_CARRIER> getter = getter();
if (null != carrier && null != getter) {
PathwayContext pathwayContext = propagate().extractPathwayContext(carrier, getter);
span.mergePathwayContext(pathwayContext);
tracer().setDataStreamCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS);
}
return span;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
Expand Down Expand Up @@ -68,8 +67,6 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
CallbackProvider cbp = tracer.getCallbackProvider(RequestContextSlot.APPSEC);
final AgentSpan span = startSpan(GRPC_SERVER, spanContext).setMeasured(true);

PathwayContext pathwayContext = propagate().extractPathwayContext(headers, GETTER);
span.mergePathwayContext(pathwayContext);
AgentTracer.get().setDataStreamCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS);

RequestContext reqContext = span.getRequestContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.util.Iterator;
import java.util.LinkedHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -84,9 +83,6 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
// The queueSpan will be finished after inner span has been activated to ensure that
// spans are written out together by TraceStructureWriter when running in strict mode
}
PathwayContext pathwayContext =
propagate().extractBinaryPathwayContext(val.headers(), GETTER);
span.mergePathwayContext(pathwayContext);

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -232,8 +231,6 @@ public static void start(
// spans are written out together by TraceStructureWriter when running in strict mode
}

PathwayContext pathwayContext = propagate().extractBinaryPathwayContext(record, SR_GETTER);
span.mergePathwayContext(pathwayContext);
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
if (streamTaskContext != null) {
Expand Down Expand Up @@ -297,8 +294,6 @@ public static void start(
// spans are written out together by TraceStructureWriter when running in strict mode
}

PathwayContext pathwayContext = propagate().extractBinaryPathwayContext(record, PR_GETTER);
span.mergePathwayContext(pathwayContext);
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
if (streamTaskContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ContextVisitors;
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator;
Expand Down Expand Up @@ -219,9 +218,6 @@ public static AgentScope startReceivingSpan(
final AgentSpan span = startSpan(OPERATION_AMQP_INBOUND, parentContext, spanStartMicros);

if (null != headers) {
PathwayContext pathwayContext =
propagate().extractPathwayContext(headers, ContextVisitors.objectValuesMap());
span.mergePathwayContext(pathwayContext);
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import datadog.trace.api.time.SystemTimeSource
import datadog.trace.bootstrap.ActiveSubsystems
import datadog.trace.bootstrap.instrumentation.api.AgentSpan
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.TracerAPI
import datadog.trace.bootstrap.instrumentation.api.DataStreamsMonitoring
import datadog.trace.bootstrap.instrumentation.api.NoopDataStreamsMonitoring
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring
import datadog.trace.common.metrics.EventListener
import datadog.trace.common.metrics.Sink
import datadog.trace.common.writer.DDAgentWriter
Expand All @@ -31,6 +30,7 @@ import datadog.trace.core.CoreTracer
import datadog.trace.core.DDSpan
import datadog.trace.core.PendingTrace
import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring
import datadog.trace.core.datastreams.NoopDataStreamsMonitoring
import datadog.trace.test.util.DDSpecification
import datadog.trace.util.Strings
import de.thetaphi.forbiddenapis.SuppressForbidden
Expand Down Expand Up @@ -158,7 +158,7 @@ abstract class AgentTestRunner extends DDSpecification implements AgentBuilder.L

@SuppressWarnings('PropertyName')
@Shared
DataStreamsMonitoring TEST_DATA_STREAMS_MONITORING
AgentDataStreamsMonitoring TEST_DATA_STREAMS_MONITORING

@Shared
ClassFileTransformer activeTransformer
Expand Down
71 changes: 36 additions & 35 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@
import datadog.trace.api.scopemanager.ScopeListener;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentScopeManager;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.DataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.NoopDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration;
import datadog.trace.bootstrap.instrumentation.api.ScopeSource;
Expand All @@ -74,7 +73,9 @@
import datadog.trace.common.writer.WriterFactory;
import datadog.trace.common.writer.ddintake.DDIntakeTraceInterceptor;
import datadog.trace.core.datastreams.DataStreamsContextCarrierAdapter;
import datadog.trace.core.datastreams.DataStreamsMonitoring;
import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring;
import datadog.trace.core.datastreams.NoopDataStreamsMonitoring;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.monitor.MonitoringImpl;
import datadog.trace.core.monitor.TracerHealthMetrics;
Expand Down Expand Up @@ -503,12 +504,6 @@ private CoreTracer(
.setBaggageMapping(baggageMapping)
.apply();
this.sampler = sampler;
this.injector = injector;
if (extractor != null) {
this.extractor = extractor;
} else {
this.extractor = HttpCodec.createExtractor(config, this::captureTraceConfig);
}
this.logs128bTraceIdEnabled = InstrumenterConfig.get().isLogs128bTraceIdEnabled();
this.localRootSpanTags = localRootSpanTags;
this.defaultSpanTags = defaultSpanTags;
Expand Down Expand Up @@ -604,6 +599,16 @@ private CoreTracer(
}
this.dataStreamsMonitoring.start();

this.injector = injector;
HttpCodec.Extractor builtExtractor;
if (extractor != null) {
builtExtractor = extractor;
} else {
builtExtractor = HttpCodec.createExtractor(config, this::captureTraceConfig);
}
builtExtractor = this.dataStreamsMonitoring.decorate(builtExtractor);
this.extractor = builtExtractor;

this.tagInterceptor =
null == tagInterceptor ? new TagInterceptor(new RuleFlags(config)) : tagInterceptor;

Expand Down Expand Up @@ -818,10 +823,12 @@ public <C> void inject(AgentSpan span, C carrier, Setter<C> setter, TracePropaga
public <C> void injectBinaryPathwayContext(
AgentSpan span, C carrier, BinarySetter<C> setter, LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring);

if (pathwayContext == null) {
return;
}
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
try {
byte[] encodedContext = span.context().getPathwayContext().encode();
byte[] encodedContext = pathwayContext.encode();

if (encodedContext != null) {
log.debug("Injecting pathway context {}", pathwayContext);
Expand All @@ -844,7 +851,10 @@ private static void injectPathwayTags(AgentSpan span, PathwayContext pathwayCont
public <C> void injectPathwayContext(
AgentSpan span, C carrier, Setter<C> setter, LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring);
if (pathwayContext == null) {
return;
}
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
try {
String encodedContext = pathwayContext.strEncode();
if (encodedContext != null) {
Expand Down Expand Up @@ -878,21 +888,13 @@ public <C> AgentSpan.Context.Extracted extract(final C carrier, final ContextVis
return extractor.extract(carrier, getter);
}

@Override
public <C> PathwayContext extractBinaryPathwayContext(C carrier, BinaryContextVisitor<C> getter) {
return dataStreamsMonitoring.extractBinaryPathwayContext(carrier, getter);
}

@Override
public <C> PathwayContext extractPathwayContext(C carrier, ContextVisitor<C> getter) {
return dataStreamsMonitoring.extractPathwayContext(carrier, getter);
}

@Override
public void setDataStreamCheckpoint(AgentSpan span, LinkedHashMap<String, String> sortedTags) {
PathwayContext pathwayContext = span.context().getPathwayContext();
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring);
injectPathwayTags(span, pathwayContext);
if (pathwayContext != null) {
pathwayContext.setCheckpoint(sortedTags, dataStreamsMonitoring::add);
injectPathwayTags(span, pathwayContext);
}
}

@Override
Expand All @@ -906,7 +908,7 @@ public void notifyExtensionEnd(AgentSpan span, Object result, boolean isError) {
}

@Override
public DataStreamsMonitoring getDataStreamsMonitoring() {
public AgentDataStreamsMonitoring getDataStreamsMonitoring() {
return dataStreamsMonitoring;
}

Expand Down Expand Up @@ -1068,10 +1070,7 @@ public void setConsumeCheckpoint(String type, String source, DataStreamsContextC
log.warn("SetConsumeCheckpoint is called with no active span");
return;
}

PathwayContext pathwayContext =
extractPathwayContext(carrier, DataStreamsContextCarrierAdapter.INSTANCE);
span.mergePathwayContext(pathwayContext);
this.dataStreamsMonitoring.mergePathwayContextIntoSpan(span, carrier);

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
Expand Down Expand Up @@ -1466,10 +1465,6 @@ private DDSpanContext buildSpanContext() {
requestContextDataIast = null;
ciVisibilityContextData = null;
}
pathwayContext =
ddsc.getPathwayContext().isStarted()
? ddsc.getPathwayContext()
: dataStreamsMonitoring.newPathwayContext();
propagationTags = propagationTagsFactory.empty();
} else {
long endToEndStartTime;
Expand Down Expand Up @@ -1529,10 +1524,16 @@ private DDSpanContext buildSpanContext() {
if (endToEndStartTime > 0) {
parentTrace.beginEndToEnd(endToEndStartTime);
}

pathwayContext = dataStreamsMonitoring.newPathwayContext();
}

// Use parent pathwayContext if present and started
pathwayContext =
parentContext != null
&& parentContext.getPathwayContext() != null
&& parentContext.getPathwayContext().isStarted()
? parentContext.getPathwayContext()
: dataStreamsMonitoring.newPathwayContext();

if (serviceName == null) {
serviceName = CoreTracer.this.serviceName;
}
Expand Down
6 changes: 0 additions & 6 deletions dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AttachableWrapper;
import datadog.trace.bootstrap.instrumentation.api.ErrorPriorities;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand Down Expand Up @@ -536,11 +535,6 @@ public RequestContext getRequestContext() {
return context.getRequestContext();
}

@Override
public void mergePathwayContext(PathwayContext pathwayContext) {
context.mergePathwayContext(pathwayContext);
}

@Override
public Integer forceSamplingDecision() {
PendingTrace trace = this.context.getTrace();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.trace.core.datastreams;

import datadog.trace.api.WellKnownTags;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
import datadog.trace.core.propagation.HttpCodec;

public class DataStreamContextExtractor implements HttpCodec.Extractor {
private final HttpCodec.Extractor delegate;
private final TimeSource timeSource;
private final WellKnownTags wellKnownTags;

public DataStreamContextExtractor(
HttpCodec.Extractor extractor, TimeSource timeSource, WellKnownTags wellKnownTags) {
this.delegate = extractor;
this.timeSource = timeSource;
this.wellKnownTags = wellKnownTags;
}

@Override
public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> getter) {
// Delegate the default HTTP extraction
TagContext extracted = this.delegate.extract(carrier, getter);
// Extract the pathway context
if (extracted != null) {
DefaultPathwayContext pathwayContext =
DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.wellKnownTags);
extracted.withPathwayContext(pathwayContext);
}
// Return merged extracted context
return extracted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datadog.trace.core.datastreams;

import datadog.trace.api.experimental.DataStreamsContextCarrier;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan.Context;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
import datadog.trace.core.propagation.HttpCodec;

public interface DataStreamsMonitoring extends AgentDataStreamsMonitoring, AutoCloseable {
void start();

PathwayContext newPathwayContext();

/**
* Adds DSM context extractor behavior.
*
* @param extractor The extractor to decorate with DSM extraction.
* @return An extractor with DSM context extraction.
*/
HttpCodec.Extractor decorate(HttpCodec.Extractor extractor);

/**
* Injects DSM {@link PathwayContext} into a span {@link Context}.
*
* @param span The span to update.
* @param carrier The carrier of the {@link PathwayContext} to extract and inject.
*/
void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrier carrier);

void add(StatsPoint statsPoint);

void clear();

@Override
void close();
}
Loading

0 comments on commit ce8a316

Please sign in to comment.