Skip to content

Commit

Permalink
add queue type and length to queue events
Browse files Browse the repository at this point in the history
  • Loading branch information
richardstartin committed Jan 23, 2025
1 parent ae1aa30 commit 0a24a91
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,31 @@ private static final class RateLimiterHolder {
}

public static <T> void startQueuingTimer(
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
ContextStore<T, State> taskContextStore,
Class<?> schedulerClass,
Class<?> queueClass,
int queueLength,
T task) {
State state = taskContextStore.get(task);
startQueuingTimer(state, schedulerClass, task);
startQueuingTimer(state, schedulerClass, queueClass, queueLength, task);
}

public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
/**
 * Starts a queueing timer for {@code task} and attaches it to {@code state}.
 *
 * <p>Nothing happens on a Graal native image, when {@code task} or {@code state} is {@code null},
 * or while JFR is not ready yet.
 *
 * @param state the state object which receives the started timing
 * @param schedulerClass the scheduler class recorded on the timing
 * @param queueClass the queue class recorded on the timing
 * @param queueLength the queue length recorded on the timing
 * @param task the task recorded on the timing
 */
public static void startQueuingTimer(
    State state, Class<?> schedulerClass, Class<?> queueClass, int queueLength, Object task) {
  if (Platform.isNativeImage()) {
    // explicitly not supported for Graal native image
    return;
  }
  // TODO consider queue length based sampling here to reduce overhead
  if (task == null || state == null) {
    return;
  }
  // avoid calling this before JFR is initialised because it will lead to reading the wrong
  // TSC frequency before JFR has set it up properly
  if (!InstrumentationBasedProfiling.isJFRReady()) {
    return;
  }
  QueueTiming queueTiming =
      (QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
  queueTiming.setTask(task);
  queueTiming.setScheduler(schedulerClass);
  queueTiming.setQueue(queueClass);
  queueTiming.setQueueLength(queueLength);
  state.setTiming(queueTiming);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class QueueTimeEvent extends Event implements QueueTiming {
@Label("Scheduler")
private Class<?> scheduler;

@Label("Queue")
private Class<?> queueType;

@Label("Queue Length on Entry")
private int queueLength;

public QueueTimeEvent() {
this.origin = Thread.currentThread();
AgentSpan activeSpan = AgentTracer.activeSpan();
Expand All @@ -55,6 +61,16 @@ public void setScheduler(Class<?> scheduler) {
this.scheduler = scheduler;
}

/** Sets the queue class stored in this event's {@code queueType} field (labelled "Queue"). */
@Override
public void setQueue(Class<?> queueType) {
this.queueType = queueType;
}

/**
 * Sets the value stored in this event's {@code queueLength} field (labelled "Queue Length on
 * Entry").
 */
@Override
public void setQueueLength(int queueLength) {
this.queueLength = queueLength;
}

@Override
public void report() {
commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class QueueTimeTracker implements QueueTiming {
// FIXME this can be eliminated by altering the instrumentation
// since it is known when the item is polled from the queue
private Class<?> scheduler;
private Class<?> queue;
private int queueLength;

public QueueTimeTracker(DatadogProfiler profiler, long startTicks) {
this.profiler = profiler;
Expand All @@ -31,6 +33,16 @@ public void setScheduler(Class<?> scheduler) {
this.scheduler = scheduler;
}

/**
 * Stores the queue class on this tracker.
 *
 * <p>NOTE(review): how the stored value is consumed is not visible here - confirm that {@code
 * report()} uses it.
 */
@Override
public void setQueue(Class<?> queue) {
this.queue = queue;
}

/**
 * Stores the queue length on this tracker.
 *
 * <p>NOTE(review): how the stored value is consumed is not visible here - confirm that {@code
 * report()} uses it.
 */
@Override
public void setQueueLength(int queueLength) {
this.queueLength = queueLength;
}

@Override
public void report() {
assert weakTask != null && scheduler != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.channels.Channel;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
Expand Down Expand Up @@ -66,7 +67,14 @@ public static final class Construct {
public static void after(@Advice.This Object command) {
ContextStore<Object, State> contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE);
capture(contextStore, command);
QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command);
// FIXME hard to handle both the lifecycle and get access to the queue instance in the same
// frame within the WriteQueue class.
// This means we can't get the queue length. A (bad) alternative would be to instrument
// ConcurrentLinkedQueue broadly,
// or we could write more brittle instrumentation targeting code patterns in different gRPC
// versions.
QueueTimerHelper.startQueuingTimer(
contextStore, Channel.class, ConcurrentLinkedQueue.class, 0, command);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -162,12 +163,20 @@ public static void capture(
// excluded as
// Runnables but it is not until now that they will be put on the executor's queue
if (!exclude(RUNNABLE, task)) {
Queue<?> queue = tpe.getQueue();
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(Runnable.class, State.class), tpe.getClass(), task);
InstrumentationContext.get(Runnable.class, State.class),
tpe.getClass(),
queue.getClass(),
queue.size(),
task);
} else if (!exclude(RUNNABLE_FUTURE, task) && task instanceof RunnableFuture) {
Queue<?> queue = tpe.getQueue();
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(RunnableFuture.class, State.class),
tpe.getClass(),
queue.getClass(),
queue.size(),
(RunnableFuture<?>) task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import net.bytebuddy.asm.Advice;

Expand Down Expand Up @@ -53,13 +51,11 @@ public void methodAdvice(MethodTransformer transformer) {
public static final class ExternalPush {
@SuppressWarnings("rawtypes")
@Advice.OnMethodEnter
public static <T> void externalPush(
@Advice.This ForkJoinPool pool, @Advice.Argument(0) ForkJoinTask<T> task) {
public static <T> void externalPush(@Advice.Argument(0) ForkJoinTask<T> task) {
if (!exclude(FORK_JOIN_TASK, task)) {
ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
capture(contextStore, task);
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
}
}

Expand All @@ -74,13 +70,11 @@ public static <T> void cleanup(

public static final class PoolSubmit {
@Advice.OnMethodEnter
public static <T> void poolSubmit(
@Advice.This ForkJoinPool pool, @Advice.Argument(1) ForkJoinTask<T> task) {
public static <T> void poolSubmit(@Advice.Argument(1) ForkJoinTask<T> task) {
if (!exclude(FORK_JOIN_TASK, task)) {
ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
capture(contextStore, task);
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package datadog.trace.instrumentation.java.concurrent.forkjoin;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.FORK_JOIN_POOL_INSTRUMENTATION_NAME;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.fieldType;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.config.ProfilingConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.config.provider.ConfigProvider;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import net.bytebuddy.asm.Advice;

/**
 * Profiling instrumentation for {@code java.util.concurrent.ForkJoinPool$WorkQueue}.
 *
 * <p>Applies {@link PushTask} to the work queue's {@code push} method so that a queueing timer is
 * started for every pushed {@link ForkJoinTask} which is not excluded. The timer is tagged with
 * the work queue's class and the value of {@code top - base} read on method entry.
 */
@AutoService(InstrumenterModule.class)
public class JavaForkJoinWorkQueueInstrumentation extends InstrumenterModule.Profiling
    implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

  /** Registers the module under the executor, fork-join-pool and work-queue specific names. */
  public JavaForkJoinWorkQueueInstrumentation() {
    super(
        EXECUTOR_INSTRUMENTATION_NAME,
        FORK_JOIN_POOL_INSTRUMENTATION_NAME,
        FORK_JOIN_POOL_INSTRUMENTATION_NAME + "-workqueue");
  }

  /** @return the name of the single type this module instruments */
  @Override
  public String instrumentedType() {
    return "java.util.concurrent.ForkJoinPool$WorkQueue";
  }

  /**
   * @return {@code true} only when the parent module is enabled and the queueing-time profiling
   *     setting resolves to {@code true}
   */
  @Override
  public boolean isEnabled() {
    if (!super.isEnabled()) {
      return false;
    }
    return ConfigProvider.getInstance()
        .getBoolean(
            ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED,
            ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED_DEFAULT);
  }

  /** @return the context store mapping: {@code ForkJoinTask} keys to {@link State} values */
  @Override
  public Map<String, String> contextStore() {
    return singletonMap("java.util.concurrent.ForkJoinTask", State.class.getName());
  }

  /**
   * Applies {@link PushTask} to methods named {@code push} whose first argument is a {@code
   * ForkJoinTask} and whose declaring type has {@code int} fields named {@code top} and {@code
   * base}.
   */
  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isMethod()
            .and(named("push"))
            .and(takesArgument(0, named("java.util.concurrent.ForkJoinTask")))
            .and(
                isDeclaredBy(
                    declaresField(fieldType(int.class).and(named("top")))
                        .and(declaresField(fieldType(int.class).and(named("base")))))),
        getClass().getName() + "$PushTask");
  }

  /** Advice run on entry to the matched {@code push} method. */
  public static final class PushTask {
    /**
     * Starts the queueing timer for {@code task} unless the task is excluded.
     *
     * @param workQueue the instrumented work queue; its class is reported as the queue type
     * @param top value of the work queue's {@code top} field on entry
     * @param base value of the work queue's {@code base} field on entry
     * @param task the task being pushed
     */
    @SuppressWarnings("rawtypes")
    @Advice.OnMethodEnter
    public static <T> void push(
        @Advice.This Object workQueue,
        @Advice.FieldValue("top") int top,
        @Advice.FieldValue("base") int base,
        @Advice.Argument(0) ForkJoinTask<T> task) {
      if (exclude(FORK_JOIN_TASK, task)) {
        return;
      }
      // reported as the queue length at the time of the push
      int lengthOnEntry = top - base;
      ContextStore<ForkJoinTask, State> taskStates =
          InstrumentationContext.get(ForkJoinTask.class, State.class);
      QueueTimerHelper.startQueuingTimer(
          taskStates, ForkJoinPool.class, workQueue.getClass(), lengthOnEntry, task);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper.startQueuingTimer;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.RUNNABLE_INSTRUMENTATION_NAME;
import static java.util.Collections.singletonMap;
Expand All @@ -21,7 +20,6 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import net.bytebuddy.asm.Advice;

Expand Down Expand Up @@ -67,7 +65,6 @@ public static void before(@Advice.Argument(0) TimerTask task, @Advice.Argument(2
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
capture(contextStore, task);
startQueuingTimer(contextStore, Timer.class, task);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import datadog.trace.agent.test.TestProfilingContextIntegration
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling

import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.LinkedBlockingQueue

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

Expand All @@ -20,21 +22,34 @@ class QueueTimingForkedTest extends AgentTestRunner {
def "test queue timing with submit"() {
setup:
def executor = Executors.newSingleThreadExecutor()
def fjp = new ForkJoinPool(1)

when:
runUnderTrace("parent", {
executor.submit(new TestRunnable()).get()
})

then:
verify()
verify(LinkedBlockingQueue.name)

when:
runUnderTrace("parent", {
for (int i = 0; i < 99; i++) {
fjp.submit(new TestRunnable())
}
fjp.submit(new TestRunnable()).get()
})

then:
verify("java.util.concurrent.ForkJoinPool\$WorkQueue")

cleanup:
executor.shutdown()
fjp.shutdown()
TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.clear()
}

void verify() {
void verify(expectedQueueType) {
assert TEST_PROFILING_CONTEXT_INTEGRATION.isBalanced()
assert !TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.isEmpty()
int numAsserts = 0
Expand All @@ -45,6 +60,8 @@ class QueueTimingForkedTest extends AgentTestRunner {
assert timing.task == TestRunnable
assert timing.scheduler != null
assert timing.origin == Thread.currentThread()
assert timing.queueLength >= 0
assert timing.queue.name == expectedQueueType
numAsserts++
}
}
Expand Down
Loading

0 comments on commit 0a24a91

Please sign in to comment.