Skip to content

Commit

Permalink
Track allocations within computers
Browse files Browse the repository at this point in the history
  • Loading branch information
SquidDev committed Nov 9, 2023
1 parent 1d365f5 commit 7f87f2a
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ private void addTranslations() {
// Metrics
add(Metrics.COMPUTER_TASKS, "Tasks");
add(Metrics.SERVER_TASKS, "Server tasks");
add(Metrics.JAVA_ALLOCATION, "Java Allocations");
add(Metrics.PERIPHERAL_OPS, "Peripheral calls");
add(Metrics.FS_OPS, "Filesystem operations");
add(Metrics.HTTP_REQUESTS, "HTTP requests");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.metrics.ThreadAllocations;
import dan200.computercraft.core.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.Arrays;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -476,6 +478,9 @@ public void run() {
}

private void runImpl() {
var workerThreadIds = new long[workersReadOnly().length];
Arrays.fill(workerThreadIds, Thread.currentThread().getId());

while (state.get() < CLOSED) {
computerLock.lock();
try {
Expand All @@ -490,12 +495,32 @@ private void runImpl() {
computerLock.unlock();
}

checkRunners();
checkRunners(workerThreadIds);
}
}

private void checkRunners() {
for (@Nullable var runner : workersReadOnly()) {
private void checkRunners(long[] workerThreadIds) {
var workers = workersReadOnly();

long[] allocations;
if (ThreadAllocations.isSupported()) {
// If allocation tracking is supported, update the current thread IDs and then fetch the total allocated
// memory. When dealing with multiple workers, it's more efficient to getAllocatedBytes in bulk rather
// than, hence doing it within the worker loop.
// However, this does mean we need to maintain an array of worker thread IDs. We could have a shared
// array and update it within .addWorker(_), but that's got all sorts of thread-safety issues. It ends
// up being easier (and not too inefficient) to just recompute the array each time.
for (var i = 0; i < workers.length; i++) {
var runner = workers[i];
if (runner != null) workerThreadIds[i] = runner.owner.getId();
}
allocations = ThreadAllocations.getAllocatedBytes(workerThreadIds);
} else {
allocations = null;
}

for (var i = 0; i < workers.length; i++) {
var runner = workers[i];
if (runner == null) continue;

// If the worker has no work, skip
Expand All @@ -505,6 +530,11 @@ private void checkRunners() {
// Refresh the timeout state. Will set the pause/soft timeout flags as appropriate.
executor.timeout.refresh();

// And track the allocated memory.
if (allocations != null) {
executor.updateAllocations(new ThreadAllocation(workerThreadIds[i], allocations[i]));
}

// If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT),
// then we can let the Lua machine do its work.
var remainingTime = executor.timeout.getRemainingTime();
Expand Down Expand Up @@ -732,6 +762,9 @@ private final class ExecutorImpl implements Executor {
public static final AtomicReferenceFieldUpdater<ExecutorImpl, ExecutorState> STATE = AtomicReferenceFieldUpdater.newUpdater(
ExecutorImpl.class, ExecutorState.class, "$state"
);
public static final AtomicReferenceFieldUpdater<ExecutorImpl, ThreadAllocation> THREAD_ALLOCATION = AtomicReferenceFieldUpdater.newUpdater(
ExecutorImpl.class, ThreadAllocation.class, "$threadAllocation"
);

final Worker worker;
private final MetricsObserver metrics;
Expand All @@ -742,6 +775,16 @@ private final class ExecutorImpl implements Executor {
*/
private volatile ExecutorState $state = ExecutorState.IDLE;

/**
* Information about allocations on the currently executing thread.
* <p>
* {@linkplain #beforeWork() Before starting any work}, we set this to the current thread and the current
* {@linkplain ThreadAllocations#getAllocatedBytes(long) amount of allocated memory}. When the computer
* {@linkplain #afterWork()} finishes executing, we set this back to null and compute the difference between the
* two, updating the {@link Metrics#JAVA_ALLOCATION} metric.
*/
private volatile @Nullable ThreadAllocation $threadAllocation = null;

/**
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
*
Expand All @@ -768,6 +811,11 @@ private final class ExecutorImpl implements Executor {
void beforeWork() {
vRuntimeStart = System.nanoTime();
timeout.startTimer(scaledPeriod());

if (ThreadAllocations.isSupported()) {
var current = Thread.currentThread().getId();
THREAD_ALLOCATION.set(this, new ThreadAllocation(current, ThreadAllocations.getAllocatedBytes(current)));
}
}

/**
Expand All @@ -779,10 +827,46 @@ boolean afterWork() {
timeout.reset();
metrics.observe(Metrics.COMPUTER_TASKS, timeout.getExecutionTime());

if (ThreadAllocations.isSupported()) {
var current = Thread.currentThread().getId();
var info = THREAD_ALLOCATION.getAndSet(this, null);
assert info.threadId() == current;

var allocated = ThreadAllocations.getAllocatedBytes(current) - info.allocatedBytes();
if (allocated > 0) {
metrics.observe(Metrics.JAVA_ALLOCATION, allocated);
} else {
LOG.warn("Allocated a negative number of bytes!");
}
}

var state = STATE.getAndUpdate(this, ExecutorState::requeue);
return state == ExecutorState.REPEAT;
}

/**
* Update the per-thread allocation information.
*
* @param allocation The latest allocation information.
*/
void updateAllocations(ThreadAllocation allocation) {
ThreadAllocation current;
long allocated;
do {
// Probe the current information - if it's null or the thread has changed, then the worker has already
// finished and this information is out-of-date, so just abort.
current = THREAD_ALLOCATION.get(this);
if (current == null || current.threadId() != allocation.threadId()) return;

// Then compute the difference since the previous measurement. If the new value is less than the current
// one, then it must be out-of-date. Again, just abort.
allocated = allocation.allocatedBytes() - current.allocatedBytes();
if (allocated <= 0) return;
} while (!THREAD_ALLOCATION.compareAndSet(this, current, allocation));

metrics.observe(Metrics.JAVA_ALLOCATION, allocated);
}

@Override
public void submit() {
var state = STATE.getAndUpdate(this, ExecutorState::enqueue);
Expand Down Expand Up @@ -811,4 +895,13 @@ protected boolean shouldPause() {
return hasPendingWork();
}
}

/**
* Allocation information about a specific thread.
*
* @param threadId The ID of this thread.
* @param allocatedBytes The amount of memory this thread has allocated.
*/
private record ThreadAllocation(long threadId, long allocatedBytes) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ private Metrics() {
public static final Metric.Event COMPUTER_TASKS = new Metric.Event("computer_tasks", "ns", Metric::formatTime);
public static final Metric.Event SERVER_TASKS = new Metric.Event("server_tasks", "ns", Metric::formatTime);

public static final Metric.Event JAVA_ALLOCATION = new Metric.Event("java_allocation", "bytes", Metric::formatBytes);

public static final Metric.Event PERIPHERAL_OPS = new Metric.Event("peripheral", "ns", Metric::formatTime);
public static final Metric.Event FS_OPS = new Metric.Event("fs", "ns", Metric::formatTime);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0

package dan200.computercraft.core.metrics;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/**
* Provides a way to get the memory allocated by a specific thread.
* <p>
* This uses Hotspot-specific functionality, so may not be available on all JVMs. Consumers should call
* {@link #isSupported()} before calling more specific methods.
*
* @see com.sun.management.ThreadMXBean
*/
public final class ThreadAllocations {
private static final Logger LOG = LoggerFactory.getLogger(ThreadAllocations.class);

private static final @Nullable MethodHandle threadAllocatedBytes;
private static final @Nullable MethodHandle threadsAllocatedBytes;

static {
MethodHandle threadAllocatedBytesHandle, threadsAllocatedBytesHandle;
try {
var threadMxBean = Class.forName("com.sun.management.ThreadMXBean").asSubclass(ThreadMXBean.class);
var bean = ManagementFactory.getPlatformMXBean(threadMxBean);

// Enable allocation tracking.
threadMxBean.getMethod("setThreadAllocatedMemoryEnabled", boolean.class).invoke(bean, true);

// Just probe this method once to check it doesn't error.
threadMxBean.getMethod("getCurrentThreadAllocatedBytes").invoke(bean);

threadAllocatedBytesHandle = MethodHandles.publicLookup()
.findVirtual(threadMxBean, "getThreadAllocatedBytes", MethodType.methodType(long.class, long.class))
.bindTo(bean);
threadsAllocatedBytesHandle = MethodHandles.publicLookup()
.findVirtual(threadMxBean, "getThreadAllocatedBytes", MethodType.methodType(long[].class, long[].class))
.bindTo(bean);
} catch (LinkageError | ReflectiveOperationException | RuntimeException e) {
LOG.warn("Cannot track allocated memory of computer threads", e);
threadAllocatedBytesHandle = threadsAllocatedBytesHandle = null;
}

threadAllocatedBytes = threadAllocatedBytesHandle;
threadsAllocatedBytes = threadsAllocatedBytesHandle;
}

private ThreadAllocations() {
}

/**
* Check whether the current JVM provides information about per-thread allocations.
*
* @return Whether per-thread allocation information is available.
*/
public static boolean isSupported() {
return threadAllocatedBytes != null;
}

/**
* Get an approximation the amount of memory a thread has allocated over its lifetime.
*
* @param threadId The ID of the thread.
* @return The allocated memory, in bytes.
* @see com.sun.management.ThreadMXBean#getThreadAllocatedBytes(long)
*/
public static long getAllocatedBytes(long threadId) {
if (threadAllocatedBytes == null) {
throw new UnsupportedOperationException("Allocated bytes are not supported");
}

try {
return (long) threadAllocatedBytes.invokeExact(threadId);
} catch (Throwable t) {
throw throwUnchecked0(t); // Should never occur, but if it does it's guaranteed to be a runtime exception.
}
}

/**
* Get an approximation the amount of memory a thread has allocated over its lifetime.
* <p>
* This is equivalent to calling {@link #getAllocatedBytes(long)} for each thread in {@code threadIds}.
*
* @param threadIds An array of thread IDs.
* @return An array with the same length as {@code threadIds}, containing the allocated memory for each thread.
* @see com.sun.management.ThreadMXBean#getThreadAllocatedBytes(long[])
*/
public static long[] getAllocatedBytes(long[] threadIds) {
if (threadsAllocatedBytes == null) {
throw new UnsupportedOperationException("Allocated bytes are not supported");
}

try {
return (long[]) threadsAllocatedBytes.invokeExact(threadIds);
} catch (Throwable t) {
throw throwUnchecked0(t); // Should never occur, but if it does it's guaranteed to be a runtime exception.
}
}

@SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
private static <T extends Throwable> T throwUnchecked0(Throwable t) throws T {
throw (T) t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package dan200.computercraft.core.computer.computerthread;

import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metric;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import javax.annotation.concurrent.GuardedBy;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -81,14 +83,15 @@ private interface Task {
void run(ComputerScheduler.Executor executor) throws InterruptedException;
}

public final class Worker implements ComputerScheduler.Worker {
public final class Worker implements ComputerScheduler.Worker, MetricsObserver {
private final Task run;
private final ComputerScheduler.Executor executor;
private long[] totals = new long[16];
volatile int executed = 0;

private Worker(ComputerScheduler scheduler, Task run) {
this.run = run;
this.executor = scheduler.createExecutor(this, MetricsObserver.discard());
this.executor = scheduler.createExecutor(this, this);
}

public ComputerScheduler.Executor executor() {
Expand Down Expand Up @@ -138,5 +141,25 @@ public void unload() {
@Override
public void abortWithError() {
}

private synchronized void observeImpl(Metric metric, long value) {
if (metric.id() >= totals.length) totals = Arrays.copyOf(totals, Math.max(metric.id(), totals.length * 2));
totals[metric.id()] += value;
}

@Override
public void observe(Metric.Counter counter) {
observeImpl(counter, 1);
}

@Override
public void observe(Metric.Event event, long value) {
observeImpl(event, value);
}

public long getMetric(Metric metric) {
var totals = this.totals;
return metric.id() < totals.length ? totals[metric.id()] : 0;
}
}
}
Loading

0 comments on commit 7f87f2a

Please sign in to comment.