diff --git a/jvmti-access/src/main/java/co/elastic/otel/JvmtiAccess.java b/jvmti-access/src/main/java/co/elastic/otel/JvmtiAccess.java index 1e42c81d..ee6c353f 100644 --- a/jvmti-access/src/main/java/co/elastic/otel/JvmtiAccess.java +++ b/jvmti-access/src/main/java/co/elastic/otel/JvmtiAccess.java @@ -57,6 +57,11 @@ static void setProfilingCorrelationCurrentThreadStorage(@Nullable ByteBuffer sto JvmtiAccessImpl.setThreadProfilingCorrelationBuffer0(storage); } + static void setProfilingCorrelationVirtualThreadSupportEnabled(boolean enable) { + ensureInitialized(); + JvmtiAccessImpl.setProfilingCorrelationVirtualThreadSupportEnabled0(enable); + } + /** * Starts the socket for receiving universal profiler messages on the given filepath. Note that * the path has a limitation of about 100 characters, see profiling correlation diff --git a/jvmti-access/src/main/java/co/elastic/otel/UniversalProfilingCorrelation.java b/jvmti-access/src/main/java/co/elastic/otel/UniversalProfilingCorrelation.java index ad963d93..54f7730a 100644 --- a/jvmti-access/src/main/java/co/elastic/otel/UniversalProfilingCorrelation.java +++ b/jvmti-access/src/main/java/co/elastic/otel/UniversalProfilingCorrelation.java @@ -31,9 +31,10 @@ public class UniversalProfilingCorrelation { - private static final MethodHandle VIRTUAL_CHECKER = generateVirtualChecker(); + private static final MethodHandle THREAD_IS_VIRTUAL = generateVirtualChecker(); private static ThreadLocal threadStorage; + private static volatile boolean virtualThreadSupportEnabled = false; // We hold a reference to the configured processStorage to make sure it is not GCed private static ByteBuffer processStorage; @@ -60,11 +61,23 @@ public static synchronized void setProcessStorage(@Nullable ByteBuffer buffer) { processStorage = buffer; } + public static synchronized void setVirtualThreadSupportEnabled(boolean enable) { + if (THREAD_IS_VIRTUAL == null) { + // JVM does not have virtual threads, so this method is a NoOp + return; + } + if 
(virtualThreadSupportEnabled == enable) { + return; + } + JvmtiAccess.setProfilingCorrelationVirtualThreadSupportEnabled(enable); + virtualThreadSupportEnabled = enable; + } + @Nullable public static ByteBuffer getCurrentThreadStorage( boolean allocateIfRequired, int expectedCapacity) { - if (isVirtual(Thread.currentThread())) { - return null; // virtual threads are not supported yet + if (!virtualThreadSupportEnabled && isVirtual(Thread.currentThread())) { + return null; } ByteBuffer buffer = threadStorage.get(); if (buffer == null) { @@ -86,9 +99,6 @@ public static ByteBuffer getCurrentThreadStorage( } public static void removeCurrentThreadStorage() { - if (isVirtual(Thread.currentThread())) { - return; // virtual threads are not supported yet - } ByteBuffer buffer = threadStorage.get(); if (buffer != null) { try { @@ -160,11 +170,15 @@ static synchronized void reset() { processStorage = null; JvmtiAccess.setProfilingCorrelationProcessStorage(null); } + setVirtualThreadSupportEnabled(false); } private static boolean isVirtual(Thread thread) { + if (THREAD_IS_VIRTUAL == null) { + return false; + } try { - return (boolean) VIRTUAL_CHECKER.invokeExact(thread); + return (boolean) THREAD_IS_VIRTUAL.invokeExact(thread); } catch (Throwable e) { throw new IllegalStateException("isVirtual is not expected to throw exceptions", e); } @@ -184,8 +198,7 @@ private static MethodHandle generateVirtualChecker() { return MethodHandles.lookup().unreflect(isVirtual); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { // virtual threads are not supported, therefore no thread is virtual - return MethodHandles.dropArguments( - MethodHandles.constant(boolean.class, false), 0, Thread.class); + return null; } } } diff --git a/jvmti-access/src/main/jni/ElasticJvmtiAgent.cpp b/jvmti-access/src/main/jni/ElasticJvmtiAgent.cpp index 90bbc1a0..bda6514c 100644 --- a/jvmti-access/src/main/jni/ElasticJvmtiAgent.cpp +++ 
b/jvmti-access/src/main/jni/ElasticJvmtiAgent.cpp @@ -1,5 +1,6 @@ #include "ElasticJvmtiAgent.h" #include "ProfilerSocket.h" +#include "VirtualThreadSupport.h" // These two global variables have symbol names which will be recognized by // the elastic universal profiling host-agent. The host-agent will be able @@ -12,19 +13,86 @@ namespace elastic namespace jvmti_agent { - static ProfilerSocket profilerSocket; + namespace { + static ProfilerSocket profilerSocket; + static VirtualThreadSupport virtualThreads; + static jvmtiEnv* jvmti; + } + + + ReturnCode init(JNIEnv* jniEnv) { + if(jvmti != nullptr) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "JVMTI environment is already initialized!"); + } + JavaVM* vm; + auto vmError = jniEnv->GetJavaVM(&vm); + if(vmError != JNI_OK) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "jniEnv->GetJavaVM() failed, return code is ", vmError); + } - void destroy() { - elastic_apm_profiling_correlation_process_storage_v1 = nullptr; - profilerSocket.destroy(); + auto getEnvErr = vm->GetEnv(reinterpret_cast(&jvmti), JVMTI_VERSION_21); + if(getEnvErr == JNI_EVERSION) { + getEnvErr = vm->GetEnv(reinterpret_cast(&jvmti), JVMTI_VERSION_1_2); + } + if(getEnvErr != JNI_OK) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "JavaVM->GetEnv() failed, return code is ", getEnvErr); + } + return virtualThreads.init(jniEnv, jvmti); } - void setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) { - if(bytebuffer == nullptr) { - elastic_apm_profiling_correlation_tls_v1 = nullptr; + ReturnCode destroy(JNIEnv* jniEnv) { + if(jvmti != nullptr) { + elastic_apm_profiling_correlation_process_storage_v1 = nullptr; + profilerSocket.destroy(); + + ReturnCode vterror = virtualThreads.destroy(jniEnv); + if (vterror != ReturnCode::SUCCESS) { + return vterror; + } + + auto error = jvmti->DisposeEnvironment(); + jvmti = nullptr; + if (error != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, 
ReturnCode::ERROR, "jvmti->DisposeEnvironment() failed, return code is: ", error); + } + + return ReturnCode::SUCCESS; } else { - elastic_apm_profiling_correlation_tls_v1 = jniEnv->GetDirectBufferAddress(bytebuffer); + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!"); + } + } + + void onVirtualThreadMount(JNIEnv*, jthread) { + void* address = nullptr; + jvmti->GetThreadLocalStorage(nullptr, &address); + elastic_apm_profiling_correlation_tls_v1 = address; + } + + void onVirtualThreadUnmount(JNIEnv*, jthread){ + elastic_apm_profiling_correlation_tls_v1 = nullptr; + } + + ReturnCode setVirtualThreadProfilingCorrelationEnabled(JNIEnv* jniEnv, jboolean enable) { + if(jvmti == nullptr) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!"); + } + return virtualThreads.setMountCallbacksEnabled(jniEnv, enable == JNI_TRUE); + } + + ReturnCode setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) { + if(jvmti == nullptr) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "JVMTI environment has not been initialized yet!"); + } + void* address = nullptr; + if (bytebuffer != nullptr) { + address = jniEnv->GetDirectBufferAddress(bytebuffer); + } + auto error = jvmti->SetThreadLocalStorage(nullptr, address); + if (error != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR_NOT_INITIALIZED, "jvmti->SetThreadLocalStorage() returned error code ", error); } + elastic_apm_profiling_correlation_tls_v1 = address; + return ReturnCode::SUCCESS; } void setProcessProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer) { diff --git a/jvmti-access/src/main/jni/ElasticJvmtiAgent.h b/jvmti-access/src/main/jni/ElasticJvmtiAgent.h index 1cb0244a..51386efc 100644 --- a/jvmti-access/src/main/jni/ElasticJvmtiAgent.h +++ b/jvmti-access/src/main/jni/ElasticJvmtiAgent.h @@ 
-10,15 +10,18 @@ namespace elastic { enum class ReturnCode { SUCCESS = 0, ERROR = -1, + ERROR_NOT_INITIALIZED = -2, }; constexpr jint toJint(ReturnCode rc) noexcept { return static_cast(rc); } - void destroy(); + ReturnCode init(JNIEnv* jniEnv); + ReturnCode destroy(JNIEnv* jniEnv); - void setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer); + ReturnCode setVirtualThreadProfilingCorrelationEnabled(JNIEnv* jniEnv, jboolean enable); + ReturnCode setThreadProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer); void setProcessProfilingCorrelationBuffer(JNIEnv* jniEnv, jobject bytebuffer); jobject createThreadProfilingCorrelationBufferAlias(JNIEnv* jniEnv, jlong capacity); @@ -31,6 +34,9 @@ namespace elastic { jint readProfilerSocketMessage(JNIEnv* jniEnv, jobject outputBuffer); ReturnCode writeProfilerSocketMessage(JNIEnv* jniEnv, jbyteArray message); + void onVirtualThreadMount(JNIEnv* jni, jthread currentThread); + void onVirtualThreadUnmount(JNIEnv* jni, jthread currentThread); + template typename std::enable_if< diff --git a/jvmti-access/src/main/jni/JvmtiAccessImpl.cpp b/jvmti-access/src/main/jni/JvmtiAccessImpl.cpp index 0c31b8a4..a4b8cb76 100644 --- a/jvmti-access/src/main/jni/JvmtiAccessImpl.cpp +++ b/jvmti-access/src/main/jni/JvmtiAccessImpl.cpp @@ -1,13 +1,20 @@ #include "co_elastic_otel_JvmtiAccessImpl.h" #include "ElasticJvmtiAgent.h" +#include using elastic::jvmti_agent::ReturnCode; using elastic::jvmti_agent::toJint; +JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_init0(JNIEnv* env, jclass) { + return toJint(elastic::jvmti_agent::init(env)); +} + +JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_destroy0(JNIEnv* env, jclass) { + return toJint(elastic::jvmti_agent::destroy(env)); +} -JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_destroy0(JNIEnv*, jclass) { - elastic::jvmti_agent::destroy(); - return toJint(ReturnCode::SUCCESS); +JNIEXPORT jint JNICALL 
Java_co_elastic_otel_JvmtiAccessImpl_setProfilingCorrelationVirtualThreadSupportEnabled0(JNIEnv* env, jclass, jboolean enabled) { + return toJint(elastic::jvmti_agent::setVirtualThreadProfilingCorrelationEnabled(env, enabled)); } JNIEXPORT void JNICALL Java_co_elastic_otel_JvmtiAccessImpl_setThreadProfilingCorrelationBuffer0(JNIEnv* env, jclass, jobject bytebuffer) { @@ -40,5 +47,4 @@ JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_readProfilerReturnCh JNIEXPORT jint JNICALL Java_co_elastic_otel_JvmtiAccessImpl_sendToProfilerReturnChannelSocket0(JNIEnv* env, jclass, jbyteArray message) { return toJint(elastic::jvmti_agent::writeProfilerSocketMessage(env, message)); - -} \ No newline at end of file +} diff --git a/jvmti-access/src/main/jni/VirtualThreadSupport.cpp b/jvmti-access/src/main/jni/VirtualThreadSupport.cpp new file mode 100644 index 00000000..6b9e9323 --- /dev/null +++ b/jvmti-access/src/main/jni/VirtualThreadSupport.cpp @@ -0,0 +1,214 @@ +#include "VirtualThreadSupport.h" +#include + +namespace elastic { + namespace jvmti_agent { + + namespace { + + bool isExpectedMountEvent(jvmtiExtensionEventInfo& eventInfo) { + if(strcmp(eventInfo.id, "com.sun.hotspot.events.VirtualThreadMount") != 0) { + return false; + } + if(eventInfo.param_count != 2) { + return false; + } + if(eventInfo.params[0].base_type != JVMTI_TYPE_JNIENV) { + return false; + } + if(eventInfo.params[0].kind != JVMTI_KIND_IN_PTR) { + return false; + } + if(eventInfo.params[0].null_ok) { + return false; + } + if(eventInfo.params[1].base_type != JVMTI_TYPE_JTHREAD) { + return false; + } + if(eventInfo.params[1].kind != JVMTI_KIND_IN) { + return false; + } + if(eventInfo.params[1].null_ok) { + return false; + } + return true; + } + + bool isExpectedUnmountEvent(jvmtiExtensionEventInfo& eventInfo) { + if(strcmp(eventInfo.id, "com.sun.hotspot.events.VirtualThreadUnmount") != 0) { + return false; + } + if(eventInfo.param_count != 2) { + return false; + } + 
if(eventInfo.params[0].base_type != JVMTI_TYPE_JNIENV) { + return false; + } + if(eventInfo.params[0].kind != JVMTI_KIND_IN_PTR) { + return false; + } + if(eventInfo.params[0].null_ok) { + return false; + } + if(eventInfo.params[1].base_type != JVMTI_TYPE_JTHREAD) { + return false; + } + if(eventInfo.params[1].kind != JVMTI_KIND_IN) { + return false; + } + if(eventInfo.params[1].null_ok) { + return false; + } + return true; + } + + void JNICALL vtMountHandler(jvmtiEnv* jvmtiEnv, ...) { + va_list args; + va_start(args, jvmtiEnv); + JNIEnv* jniEnv = va_arg(args, JNIEnv*); + jthread argThread = va_arg(args, jthread); + va_end(args); + + jvmti_agent::onVirtualThreadMount(jniEnv, argThread); + } + + void JNICALL vtUnmountHandler(jvmtiEnv* jvmtiEnv, ...) { + va_list args; + va_start(args, jvmtiEnv); + JNIEnv* jniEnv = va_arg(args, JNIEnv*); + jthread argThread = va_arg(args, jthread); + va_end(args); + + jvmti_agent::onVirtualThreadUnmount(jniEnv, argThread); + } + } + + ReturnCode VirtualThreadSupport::init(JNIEnv* env, jvmtiEnv* jvmti) { + this->jvmti = jvmti; + this->unsupportedReason = "Not yet initialized"; + this->eventsEnabled = false; + + jint version; + auto error = jvmti->GetVersionNumber(&version); + if (error != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(env, ReturnCode::ERROR, "jvmti->GetVersionNumber() returned error code ", error); + } + + if (version < JVMTI_VERSION_21) { + this->unsupportedReason = "This JVM does not support JVMTI version 21+"; + return ReturnCode::SUCCESS; + } + + jvmtiCapabilities supportedCapabilities; + auto supErr =jvmti->GetPotentialCapabilities(&supportedCapabilities); + if(supErr != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(env, ReturnCode::ERROR, "Failed to get JVMTI supported capabilities", supErr); + } + + bool virtualThreadsCapabilitySupported = supportedCapabilities.can_support_virtual_threads != 0; + if (!virtualThreadsCapabilitySupported) { + this->unsupportedReason = "The JVMTI environment can not 
support the can_support_virtual_threads capability"; + return ReturnCode::SUCCESS; + } + + jvmtiCapabilities caps = {}; + caps.can_support_virtual_threads = 1; + auto capErr = jvmti->AddCapabilities(&caps); + if(capErr != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(env, ReturnCode::ERROR, "Failed to add virtual threads capability, return code is", capErr); + } + + jint extensionCount; + jvmtiExtensionEventInfo* extensionInfos; + error = jvmti->GetExtensionEvents(&extensionCount, &extensionInfos); + if (error != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(env, ReturnCode::ERROR, "jvmti->GetExtensionEvents() returned error code ", error); + } + + mountEventIdx = -1; + unmountEventIdx = -1; + for(int i=0; iDeallocate((unsigned char*) extensionInfos); + + if (mountEventIdx == -1) { + this->unsupportedReason = "This JVM does not support the JVMTI com.sun.hotspot.events.VirtualThreadMount event"; + return ReturnCode::SUCCESS; + } + if (unmountEventIdx == -1) { + this->unsupportedReason = "This JVM does not support the JVMTI com.sun.hotspot.events.VirtualThreadUnmount event"; + return ReturnCode::SUCCESS; + } + + this->unsupportedReason = ""; + return ReturnCode::SUCCESS; + } + + ReturnCode VirtualThreadSupport::destroy(JNIEnv* jni){ + if (this->eventsEnabled) { + return this->setMountCallbacksEnabled(jni, false); + } + return ReturnCode::SUCCESS; + } + + ReturnCode VirtualThreadSupport::setMountCallbacksEnabled(JNIEnv* jniEnv, bool enabled) { + if (eventsEnabled == enabled) { + return ReturnCode::SUCCESS; + } + if (unsupportedReason != "") { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, unsupportedReason); + } + + if (enabled) { + auto error = jvmti->SetExtensionEventCallback(mountEventIdx, (jvmtiExtensionEvent) &vtMountHandler); + if (error != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set mount event handler, error code is ", error); + } + error = 
jvmti->SetExtensionEventCallback(unmountEventIdx, (jvmtiExtensionEvent) &vtUnmountHandler); + if (error != JVMTI_ERROR_NONE) { + jvmti->SetExtensionEventCallback(mountEventIdx, nullptr); + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set unmount event handler, error code is ", error); + } + error = jvmti->SetEventNotificationMode(JVMTI_ENABLE, static_cast(mountEventIdx), nullptr); + if (error != JVMTI_ERROR_NONE) { + jvmti->SetExtensionEventCallback(unmountEventIdx, nullptr); + jvmti->SetExtensionEventCallback(mountEventIdx, nullptr); + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set mount event enabled, error code is ", error); + } + error = jvmti->SetEventNotificationMode(JVMTI_ENABLE, static_cast(unmountEventIdx), nullptr); + if (error != JVMTI_ERROR_NONE) { + jvmti->SetEventNotificationMode(JVMTI_DISABLE, static_cast(mountEventIdx), nullptr); + jvmti->SetExtensionEventCallback(unmountEventIdx, nullptr); + jvmti->SetExtensionEventCallback(mountEventIdx, nullptr); + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set unmount event enabled, error code is ", error); + } + eventsEnabled = true; + } else { + auto err1 = jvmti->SetEventNotificationMode(JVMTI_DISABLE, static_cast(mountEventIdx), nullptr); + auto err2 = jvmti->SetEventNotificationMode(JVMTI_DISABLE, static_cast(unmountEventIdx), nullptr); + auto err3 = jvmti->SetExtensionEventCallback(mountEventIdx, nullptr); + auto err4 = jvmti->SetExtensionEventCallback(unmountEventIdx, nullptr); + eventsEnabled = false; + if (err1 != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set mount event mode to disabled, error code is ", err1); + } + if (err2 != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to set unmount event mode to disabled, error code is ", err2); + } + if (err3 != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, 
ReturnCode::ERROR, "Failed to to unset mount event handler, error code is ", err3); + } + if (err4 != JVMTI_ERROR_NONE) { + return raiseExceptionAndReturn(jniEnv, ReturnCode::ERROR, "Failed to to unset unmount event handler, error code is ", err4); + } + } + return ReturnCode::SUCCESS; + } + } +} \ No newline at end of file diff --git a/jvmti-access/src/main/jni/VirtualThreadSupport.h b/jvmti-access/src/main/jni/VirtualThreadSupport.h new file mode 100644 index 00000000..cf27fa1e --- /dev/null +++ b/jvmti-access/src/main/jni/VirtualThreadSupport.h @@ -0,0 +1,30 @@ +#ifndef VIRTUALTHREADSUPPORT_H_ +#define VIRTUALTHREADSUPPORT_H_ + +#include +#include +#include "ElasticJvmtiAgent.h" + +namespace elastic { + namespace jvmti_agent { + + + class VirtualThreadSupport { + private: + std::string unsupportedReason = "Not yet initialized"; + jvmtiEnv* jvmti; + + jint mountEventIdx; + jint unmountEventIdx; + bool eventsEnabled; + public: + + [[nodiscard]] ReturnCode init(JNIEnv* env, jvmtiEnv* jvmti); + [[nodiscard]] ReturnCode destroy(JNIEnv* env); + + [[nodiscard]] ReturnCode setMountCallbacksEnabled(JNIEnv* jniEnv, bool enabled); + }; + } +} + +#endif \ No newline at end of file diff --git a/jvmti-access/src/test/java/co/elastic/otel/UniversalProfilingCorrelationTest.java b/jvmti-access/src/test/java/co/elastic/otel/UniversalProfilingCorrelationTest.java index 68a37053..5ad5d521 100644 --- a/jvmti-access/src/test/java/co/elastic/otel/UniversalProfilingCorrelationTest.java +++ b/jvmti-access/src/test/java/co/elastic/otel/UniversalProfilingCorrelationTest.java @@ -34,7 +34,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -175,9 +177,24 @@ public void testPlatformThreadStorage() throws 
ExecutionException, InterruptedEx } } + @Test + public void enablingVirtualThreadSupportDoesNotThrow() { + UniversalProfilingCorrelation.setVirtualThreadSupportEnabled(true); + } + @Test @EnabledForJreRange(min = JRE.JAVA_21) - public void testVirtualThreadsExcluded() throws Exception { + public void testVirtualThreadsExcludedByDefault() throws Exception { + + if (System.getProperty("java.vm.name").toUpperCase().contains("J9")) { + // We exclude this test on OpenJ9, because it is flaky there + // It seems like sometimes OpenJ9 does not disable the mount/unmount listeners + // even though it didn't report an error + // While this can cause this test to fail, in practice this doesn't cause any problems + // because we never disable the support after enabling it in the real world + return; + } + ExecutorService exec = (ExecutorService) Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null); @@ -192,6 +209,66 @@ public void testVirtualThreadsExcluded() throws Exception { }) .get(); } + + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + public void testVirtualThreadsStoragePropagatedWithMounts() throws Exception { + ExecutorService exec = + (ExecutorService) + Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null); + + UniversalProfilingCorrelation.setVirtualThreadSupportEnabled(true); + + List virtualThreads = Collections.synchronizedList(new ArrayList<>()); + List threadLatches = new ArrayList<>(); + List> threadResults = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + int threadId = i; + CountDownLatch latch = new CountDownLatch(1); + threadLatches.add(latch); + threadResults.add( + exec.submit( + () -> { + virtualThreads.add(Thread.currentThread()); + ByteBuffer buffer = + UniversalProfilingCorrelation.getCurrentThreadStorage(true, 4); + buffer.order(ByteOrder.nativeOrder()); + assertThat(buffer).isNotNull(); + buffer.putInt(threadId); + + try { + // The waiting on the latch will cause this virtual thread to be 
unmounted + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // Read back the current buffer after being re-mounted and check if it is correct + buffer = JvmtiAccessImpl.createThreadProfilingCorrelationBufferAlias(4); + buffer.order(ByteOrder.nativeOrder()); + + assertThat(buffer.getInt()).isEqualTo(threadId); + })); + } + + // Wait until all threads have reached their latch + await() + .atMost(Duration.ofSeconds(10)) + .until( + () -> + virtualThreads.size() == threadLatches.size() + && virtualThreads.stream() + .allMatch(t -> t.getState() == Thread.State.WAITING)); + + // resume all threads + for (CountDownLatch latch : threadLatches) { + latch.countDown(); + } + for (Future future : threadResults) { + future.get(); // this will throw an ExecutionException if any assertions failed + } + } } @Nested diff --git a/universal-profiling-integration/README.md b/universal-profiling-integration/README.md index 7774fa50..671629a1 100644 --- a/universal-profiling-integration/README.md +++ b/universal-profiling-integration/README.md @@ -21,11 +21,12 @@ This extension supports [autoconfiguration](https://github.com/open-telemetry/op So if you are using an autoconfigured OpenTelemetry SDK, you'll only need to add this extension to your class path and configure it via system properties or environment variables: -| Property Name / Environment Variable Name | Default | Description | 
-|-------------------------------------------------------------------------------------------------------------------------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| elastic.otel.universal.profiling.integration.enabled
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_ENABLED | `auto` on supported systems, `false` otherwise | Enables or disables the feature. Possible values are `true`, `false` or `auto`. On `auto` the profiling integration will be installed but remain inactive until the presence of a profiler is detected (Requires a profiling host agent 8.15 or later). This reduces the overhead in the case no profiler is there. When using `auto`, there might be a slight delay until the correlation is activated. So if your application creates spans during startup which you want correlated, you should use `true` instead. | -| elastic.otel.universal.profiling.integration.socket.dir
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_SOCKET_DIR | the value of the `java.io.tmpdir` JVM-property | The extension needs to bind a socket to a file for communicating with the universal profiling host agent. By default, this socket will be placed in the java.io.tmpdir. This configuration option can be used to change the location. Note that the total path name (including the socket) must not exceed 100 characters due to OS restrictions. | -| elastic.otel.universal.profiling.integration.buffer.size
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_BUFFER_SIZE | 8096 | The extension needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received. This configuration options configures the buffer size in number of spans. The higher the number of local root spans per second, the higher this buffer size should be set. The extension will log a warning if it is not capable of buffering a span due to insufficient buffer size. This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data. | +| Property Name / Environment Variable Name | Default | Description | +|-------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| elastic.otel.universal.profiling.integration.enabled
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_ENABLED | `auto` on supported systems, `false` otherwise | Enables or disables the feature. Possible values are `true`, `false` or `auto`. On `auto` the profiling integration will be installed but remain inactive until the presence of a profiler is detected (Requires a profiling host agent 8.15 or later). This reduces the overhead in the case no profiler is there. When using `auto`, there might be a slight delay until the correlation is activated. So if your application creates spans during startup which you want correlated, you should use `true` instead. | +| elastic.otel.universal.profiling.integration.socket.dir
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_SOCKET_DIR | the value of the `java.io.tmpdir` JVM-property | The extension needs to bind a socket to a file for communicating with the universal profiling host agent. By default, this socket will be placed in the java.io.tmpdir. This configuration option can be used to change the location. Note that the total path name (including the socket) must not exceed 100 characters due to OS restrictions. | +| elastic.otel.universal.profiling.integration.buffer.size
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_BUFFER_SIZE | 8096 | The extension needs to buffer ended local-root spans for a short duration to ensure that all of its profiling data has been received. This configuration option configures the buffer size in number of spans. The higher the number of local root spans per second, the higher this buffer size should be set. The extension will log a warning if it is not capable of buffering a span due to insufficient buffer size. This will cause the span to be exported immediately instead with possibly incomplete profiling correlation data. | +| elastic.otel.universal.profiling.integration.virtual.threads.enabled
ELASTIC_OTEL_UNIVERSAL_PROFILING_INTEGRATION_VIRTUAL_THREADS_ENABLED | `true` | Virtual threads need some extra work for correlation: On mount/unmount the span/trace context of the platform thread needs to be kept in sync. This is done by hooking into JVMTI events. This option allows disabling support for virtual threads in case this mechanism causes any problems. | ### Manual SDK setup diff --git a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessor.java b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessor.java index 4c77fa5a..7cc01d8f 100644 --- a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessor.java +++ b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessor.java @@ -78,6 +78,8 @@ public class UniversalProfilingProcessor extends AbstractChainingSpanProcessor { private static boolean anyInstanceActive = false; + final boolean tryEnableVirtualThreadSupport; + // Visible for testing final SpanProfilingSamplesCorrelator correlator; private final ScheduledExecutorService messagePollAndSpanFlushExecutor; @@ -97,10 +99,12 @@ public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Res Resource serviceResource, int bufferSize, boolean activeOnlyAfterProfilerRegistration, + boolean virtualThreadSupportEnabled, String socketDir, LongSupplier nanoClock) { super(next); synchronized (UniversalProfilingProcessor.class) { + this.tryEnableVirtualThreadSupport = virtualThreadSupportEnabled; if (anyInstanceActive) { throw new IllegalStateException( "Another instance has already been started and not stopped yet." 
@@ -110,10 +114,9 @@ public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Res long initialSpanDelay; if (activeOnlyAfterProfilerRegistration) { initialSpanDelay = 0; // do not buffer spans until we know that a profiler is running - tlsPropagationActive = false; } else { initialSpanDelay = INITIAL_SPAN_DELAY_NANOS; // delay conservatively to not miss any data - tlsPropagationActive = true; + enableTlsPropagation(); } correlator = @@ -145,6 +148,25 @@ public static UniversalProfilingProcessorBuilder builder(SpanProcessor next, Res } } + private synchronized void enableTlsPropagation() { + if (tlsPropagationActive) { + return; + } + try { + log.log( + Level.FINE, + "Setting virtual thread support to {0}", + new Object[] {tryEnableVirtualThreadSupport}); + UniversalProfilingCorrelation.setVirtualThreadSupportEnabled(tryEnableVirtualThreadSupport); + } catch (Exception e) { + log.log( + Level.SEVERE, + "Could not enable virtual thread support, correlation will only work for platform threads", + e); + } + tlsPropagationActive = true; + } + private String openProfilerSocket(String socketDir) { Path dir = Paths.get(socketDir); if (!Files.exists(dir) && !dir.toFile().mkdirs()) { @@ -277,7 +299,7 @@ private void handleMessage(ProfilerRegistrationMessage message) { "Received profiler registration message! 
host.id is {0} and the span delay is {1} ms", new Object[] {message.getHostId(), message.getSamplesDelayMillis()}); - tlsPropagationActive = true; + enableTlsPropagation(); long spanDelayNanos = Duration.ofMillis(message.getSamplesDelayMillis() + POLL_FREQUENCY_MS).toNanos(); correlator.setSpanBufferDurationNanos(spanDelayNanos); diff --git a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorAutoConfig.java b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorAutoConfig.java index 03a2e548..0adfcdee 100644 --- a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorAutoConfig.java +++ b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorAutoConfig.java @@ -40,6 +40,8 @@ public class UniversalProfilingProcessorAutoConfig static final String BUFFER_SIZE_OPTION = "elastic.otel.universal.profiling.integration.buffer.size"; static final String SOCKET_DIR_OPTION = "elastic.otel.universal.profiling.integration.socket.dir"; + static final String VIRTUAL_THREAD_SUPPORT_OPTION = + "elastic.otel.universal.profiling.integration.virtual.threads.enabled"; private enum EnabledOptions { TRUE, @@ -85,6 +87,7 @@ public void registerSpanProcessors( builder.delayActivationAfterProfilerRegistration(enabled == EnabledOptions.AUTO); props.applyInt(BUFFER_SIZE_OPTION, builder::bufferSize); props.applyString(SOCKET_DIR_OPTION, builder::socketDir); + props.applyBool(VIRTUAL_THREAD_SUPPORT_OPTION, builder::virtualThreadSupportEnabled); return builder.build(); } catch (Exception e) { logger.log( diff --git a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorBuilder.java b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorBuilder.java index aaec7924..919c1107 100644 --- 
a/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorBuilder.java +++ b/universal-profiling-integration/src/main/java/co/elastic/otel/UniversalProfilingProcessorBuilder.java @@ -29,10 +29,9 @@ public class UniversalProfilingProcessorBuilder { private boolean delayActivationAfterProfilerRegistration = true; private LongSupplier nanoClock = System::nanoTime; - private int bufferSize = 8096; - private String socketDir = System.getProperty("java.io.tmpdir"); + private boolean virtualThreadSupportEnabled = true; UniversalProfilingProcessorBuilder(SpanProcessor next, Resource resource) { this.resource = resource; @@ -45,6 +44,7 @@ public UniversalProfilingProcessor build() { resource, bufferSize, delayActivationAfterProfilerRegistration, + virtualThreadSupportEnabled, socketDir, nanoClock); } @@ -93,4 +93,15 @@ public UniversalProfilingProcessorBuilder socketDir(String path) { this.socketDir = path; return this; } + + /** + * Virtual threads need some extra work for correlation: On mount/unmount the span/trace context + * of the platform thread needs to be kept in sync. This is done by hooking on to JVMTI-events. + * This option allows to disable support for virtual threads in case this mechanism causes any + * problems. 
+ */ + public UniversalProfilingProcessorBuilder virtualThreadSupportEnabled(boolean enable) { + this.virtualThreadSupportEnabled = enable; + return this; + } } diff --git a/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorAutoConfigTest.java b/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorAutoConfigTest.java index 0f159810..651642e8 100644 --- a/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorAutoConfigTest.java +++ b/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorAutoConfigTest.java @@ -21,6 +21,7 @@ import static co.elastic.otel.UniversalProfilingProcessorAutoConfig.BUFFER_SIZE_OPTION; import static co.elastic.otel.UniversalProfilingProcessorAutoConfig.ENABLED_OPTION; import static co.elastic.otel.UniversalProfilingProcessorAutoConfig.SOCKET_DIR_OPTION; +import static co.elastic.otel.UniversalProfilingProcessorAutoConfig.VIRTUAL_THREAD_SUPPORT_OPTION; import static org.assertj.core.api.Assertions.assertThat; import co.elastic.otel.testing.AutoConfigTestProperties; @@ -70,6 +71,7 @@ public void checkEnabledButInactiveByDefault() { .get(); assertThat(processor.tlsPropagationActive).isFalse(); + assertThat(processor.tryEnableVirtualThreadSupport).isEqualTo(true); } } @@ -83,7 +85,8 @@ public void testAllSettings(@TempDir Path tempDir) { .put("otel.service.name", "myservice") .put(ENABLED_OPTION, "true") .put(BUFFER_SIZE_OPTION, "256") - .put(SOCKET_DIR_OPTION, tempDirAbs)) { + .put(SOCKET_DIR_OPTION, tempDirAbs) + .put(VIRTUAL_THREAD_SUPPORT_OPTION, "false")) { OpenTelemetry otel = GlobalOpenTelemetry.get(); List processors = OtelReflectionUtils.getSpanProcessors(otel); UniversalProfilingProcessor processor = @@ -95,6 +98,7 @@ public void testAllSettings(@TempDir Path tempDir) { assertThat(processor.tlsPropagationActive).isTrue(); assertThat(processor.socketPath).startsWith(tempDirAbs); + 
assertThat(processor.tryEnableVirtualThreadSupport).isEqualTo(false); assertThat(processor.correlator.delayedSpans.getBufferSize()).isEqualTo(256); } } diff --git a/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorTest.java b/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorTest.java index 520fae46..90b5d9d4 100644 --- a/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorTest.java +++ b/universal-profiling-integration/src/test/java/co/elastic/otel/UniversalProfilingProcessorTest.java @@ -62,12 +62,16 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -211,6 +215,38 @@ public void testNestedActivations() { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @EnabledForJreRange(min = JRE.JAVA_21) + public void testVirtualThreadSupport(boolean enableSupport) throws Exception { + + ExecutorService exec = + (ExecutorService) + Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null); + + try (OpenTelemetrySdk sdk = + initSdk(builder -> builder.virtualThreadSupportEnabled(enableSupport))) { + + exec.submit( + () -> { + Tracer tracer = sdk.getTracer("test-tracer"); + + Span span = tracer.spanBuilder("first").startSpan(); + + checkTlsIs(Span.getInvalid(), null); + try (Scope s1 = span.makeCurrent()) { + Thread.yield(); + if 
(enableSupport) { + checkTlsIs(span, span); + } else { + checkTlsIs(Span.getInvalid(), null); + } + } + }) + .get(); + } + } + @Test public void testProcessStoragePopulated() { Resource withNamespace =