Skip to content

Commit

Permalink
Add support for virtual threads to universal profiling integration (#326
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JonasKunz authored Jul 19, 2024
1 parent 4300949 commit 20c06c1
Show file tree
Hide file tree
Showing 15 changed files with 538 additions and 38 deletions.
7 changes: 6 additions & 1 deletion jvmti-access/src/main/java/co/elastic/otel/JvmtiAccess.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ static void setProfilingCorrelationCurrentThreadStorage(@Nullable ByteBuffer sto
JvmtiAccessImpl.setThreadProfilingCorrelationBuffer0(storage);
}

static void setProfilingCorrelationVirtualThreadSupportEnabled(boolean enable) {
ensureInitialized();
JvmtiAccessImpl.setProfilingCorrelationVirtualThreadSupportEnabled0(enable);
}

/**
* Starts the socket for receiving universal profiler messages on the given filepath. Note that
* the path has a limitation of about 100 characters, see <a
Expand Down Expand Up @@ -106,7 +111,7 @@ private static synchronized void doInit() {
}
case LOADED:
try {
// TODO: call an initialization method and check the results
JvmtiAccessImpl.init0();
state = State.INITIALIZED;
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to initialize jvmti native library", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@

public class JvmtiAccessImpl {

static native int destroy0();
public static native int init0();

public static native int destroy0();

/**
* @param threadBuffer the buffer whose address will get stored in the native thread-local-storage
* for APM <-> profiling correlation
*/
static native void setThreadProfilingCorrelationBuffer0(ByteBuffer threadBuffer);

static native int setProfilingCorrelationVirtualThreadSupportEnabled0(boolean enable);

/**
* @param byteBuffer the buffer whose address will get stored in the native global variable for
* APM <-> profiling correlation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

public class UniversalProfilingCorrelation {

private static final MethodHandle VIRTUAL_CHECKER = generateVirtualChecker();
private static final MethodHandle THREAD_IS_VIRTUAL = generateVirtualChecker();

private static ThreadLocal<ByteBuffer> threadStorage;
private static volatile boolean virtualThreadSupportEnabled = false;

// We hold a reference to the configured processStorage to make sure it is not GCed
private static ByteBuffer processStorage;
Expand All @@ -60,11 +61,23 @@ public static synchronized void setProcessStorage(@Nullable ByteBuffer buffer) {
processStorage = buffer;
}

public static synchronized void setVirtualThreadSupportEnabled(boolean enable) {
if (THREAD_IS_VIRTUAL == null) {
// JVM does not have virtual threads, so this method is a NoOp
return;
}
if (virtualThreadSupportEnabled == enable) {
return;
}
JvmtiAccess.setProfilingCorrelationVirtualThreadSupportEnabled(enable);
virtualThreadSupportEnabled = enable;
}

@Nullable
public static ByteBuffer getCurrentThreadStorage(
boolean allocateIfRequired, int expectedCapacity) {
if (isVirtual(Thread.currentThread())) {
return null; // virtual threads are not supported yet
if (!virtualThreadSupportEnabled && isVirtual(Thread.currentThread())) {
return null;
}
ByteBuffer buffer = threadStorage.get();
if (buffer == null) {
Expand All @@ -86,9 +99,6 @@ public static ByteBuffer getCurrentThreadStorage(
}

public static void removeCurrentThreadStorage() {
if (isVirtual(Thread.currentThread())) {
return; // virtual threads are not supported yet
}
ByteBuffer buffer = threadStorage.get();
if (buffer != null) {
try {
Expand Down Expand Up @@ -160,11 +170,15 @@ static synchronized void reset() {
processStorage = null;
JvmtiAccess.setProfilingCorrelationProcessStorage(null);
}
setVirtualThreadSupportEnabled(false);
}

private static boolean isVirtual(Thread thread) {
if (THREAD_IS_VIRTUAL == null) {
return false;
}
try {
return (boolean) VIRTUAL_CHECKER.invokeExact(thread);
return (boolean) THREAD_IS_VIRTUAL.invokeExact(thread);
} catch (Throwable e) {
throw new IllegalStateException("isVirtual is not expected to throw exceptions", e);
}
Expand All @@ -184,8 +198,7 @@ private static MethodHandle generateVirtualChecker() {
return MethodHandles.lookup().unreflect(isVirtual);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
// virtual threads are not supported, therefore no thread is virtual
return MethodHandles.dropArguments(
MethodHandles.constant(boolean.class, false), 0, Thread.class);
return null;
}
}
}
84 changes: 76 additions & 8 deletions jvmti-access/src/main/jni/ElasticJvmtiAgent.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "ElasticJvmtiAgent.h"
#include "ProfilerSocket.h"
#include "VirtualThreadSupport.h"

// These two global variables have symbol names which will be recognized by
// the elastic universal profiling host-agent. The host-agent will be able
Expand All @@ -12,19 +13,86 @@ namespace elastic
namespace jvmti_agent
{

static ProfilerSocket profilerSocket;
namespace {
static ProfilerSocket profilerSocket;
static VirtualThreadSupport virtualThreads;
static jvmtiEnv* jvmti;
}


ReturnCode init(JNIEnv* jniEnv) {
if(jvmti != nullptr) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "JVMTI environment is already initialized!");
}
JavaVM* vm;
auto vmError = jniEnv->GetJavaVM(&vm);
if(vmError != JNI_OK) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "jniEnv->GetJavaVM() failed, return code is ", vmError);
}

void destroy() {
elastic_apm_profiling_correlation_process_storage_v1 = nullptr;
profilerSocket.destroy();
auto getEnvErr = vm->GetEnv(reinterpret_cast<void**>(&jvmti), JVMTI_VERSION_21);
if(getEnvErr == JNI_EVERSION) {
getEnvErr = vm->GetEnv(reinterpret_cast<void**>(&jvmti), JVMTI_VERSION_1_2);
}
if(getEnvErr != JNI_OK) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "JavaVM->GetEnv() failed, return code is ", getEnvErr);
}
return virtualThreads.init(jniEnv, jvmti);
}

void setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) {
if(bytebuffer == nullptr) {
elastic_apm_profiling_correlation_tls_v1 = nullptr;
ReturnCode destroy(JNIEnv* jniEnv) {
if(jvmti != nullptr) {
elastic_apm_profiling_correlation_process_storage_v1 = nullptr;
profilerSocket.destroy();

ReturnCode vterror = virtualThreads.destroy(jniEnv);
if (vterror != ReturnCode::SUCCESS) {
return vterror;
}

auto error = jvmti->DisposeEnvironment();
jvmti = nullptr;
if (error != JVMTI_ERROR_NONE) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "jvmti->DisposeEnvironment() failed, return code is: ", error);
}

return ReturnCode::SUCCESS;
} else {
elastic_apm_profiling_correlation_tls_v1 = jniEnv->GetDirectBufferAddress(bytebuffer);
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!");
}
}

void onVirtualThreadMount(JNIEnv*, jthread) {
void* address = nullptr;
jvmti->GetThreadLocalStorage(nullptr, &address);
elastic_apm_profiling_correlation_tls_v1 = address;
}

void onVirtualThreadUnmount(JNIEnv*, jthread){
elastic_apm_profiling_correlation_tls_v1 = nullptr;
}

ReturnCode setVirtualThreadProfilingCorrelationEnabled(JNIEnv* jniEnv, jboolean enable) {
if(jvmti == nullptr) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!");
}
return virtualThreads.setMountCallbacksEnabled(jniEnv, enable == JNI_TRUE);
}

ReturnCode setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) {
if(jvmti == nullptr) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!");
}
void* address = nullptr;
if (bytebuffer != nullptr) {
address = jniEnv->GetDirectBufferAddress(bytebuffer);
}
auto error = jvmti->SetThreadLocalStorage(nullptr, address);
if (error != JVMTI_ERROR_NONE) {
return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "jvmti->SetThreadLocalStorage() returned error code ", error);
}
elastic_apm_profiling_correlation_tls_v1 = address;
return ReturnCode::SUCCESS;
}

void setProcessProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) {
Expand Down
10 changes: 8 additions & 2 deletions jvmti-access/src/main/jni/ElasticJvmtiAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ namespace elastic {
enum class ReturnCode {
SUCCESS = 0,
ERROR = -1,
ERROR_NOT_INITIALIZED = -2,
};

constexpr jint toJint(ReturnCode rc) noexcept {
return static_cast<jint>(rc);
}

void destroy();
ReturnCode init(JNIEnv* jniEnv);
ReturnCode destroy(JNIEnv* jniEnv);

void setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer);
ReturnCode setVirtualThreadProfilingCorrelationEnabled(JNIEnv* jniEnv, jboolean enable);
ReturnCode setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer);
void setProcessProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer);

jobject createThreadProfilingCorrelationBufferAlias(JNIEnv* jniEnv, jlong capacity);
Expand All @@ -31,6 +34,9 @@ namespace elastic {
jint readProfilerSocketMessage(JNIEnv* jniEnv, jobject outputBuffer);
ReturnCode writeProfilerSocketMessage(JNIEnv* jniEnv, jbyteArray message);

void onVirtualThreadMount(JNIEnv* jni, jthread currentThread);
void onVirtualThreadUnmount(JNIEnv* jni, jthread currentThread);


template <typename T>
typename std::enable_if<
Expand Down
16 changes: 11 additions & 5 deletions jvmti-access/src/main/jni/JvmtiAccessImpl.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
#include "co_elastic_otel_JvmtiAccessImpl.h"
#include "ElasticJvmtiAgent.h"
#include <array>

using elastic::jvmti_agent::ReturnCode;
using elastic::jvmti_agent::toJint;

JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_init0(JNIEnv* env, jclass) {
return toJint(elastic::jvmti_agent::init(env));
}

JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_destroy0(JNIEnv* env, jclass) {
return toJint(elastic::jvmti_agent::destroy(env));
}

JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_destroy0(JNIEnv*, jclass) {
elastic::jvmti_agent::destroy();
return toJint(ReturnCode::SUCCESS);
JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_setProfilingCorrelationVirtualThreadSupportEnabled0(JNIEnv* env, jclass, jboolean enabled) {
return toJint(elastic::jvmti_agent::setVirtualThreadProfilingCorrelationEnabled(env, enabled));
}

JNIEXPORT void JNICALL Java_co_elastic_otel_JvmtiAccessImpl_setThreadProfilingCorrelationBuffer0(JNIEnv* env, jclass, jobject bytebuffer) {
Expand Down Expand Up @@ -40,5 +47,4 @@ JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_readProfilerReturnCh

JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_sendToProfilerReturnChannelSocket0(JNIEnv* env, jclass, jbyteArray message) {
return toJint(elastic::jvmti_agent::writeProfilerSocketMessage(env, message));

}
}
Loading

0 comments on commit 20c06c1

Please sign in to comment.