From 708abee87f08e098444b2f7694b997eefaac8374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Wed, 11 Sep 2024 15:14:35 +0300 Subject: [PATCH] * Memory visibility issue when taking thread dumps. --- CHANGELOG.md | 6 + gradle.properties | 2 +- .../common/baseutils/threads/ThreadUtils.java | 118 ++++++++++-------- .../common/baseutils/clock/TestClockTest.java | 21 ++++ .../baseutils/threads/ThreadUtilsTest.java | 6 +- 5 files changed, 100 insertions(+), 53 deletions(-) create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/clock/TestClockTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6af7b9d..7b86b79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.13.1] - 2024-10-15 + +### Fixed + +* Memory visibility issue when taking thread dumps. + ## [1.13.0] - 2024-10-11 ### Added diff --git a/gradle.properties b/gradle.properties index ef52e60..0188526 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=1.13.0 \ No newline at end of file +version=1.13.1 \ No newline at end of file diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java index 75a6799..1fcf7de 100644 --- a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java @@ -3,11 +3,14 @@ import com.google.common.util.concurrent.MoreExecutors; import com.transferwise.common.baseutils.ExceptionUtils; import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Predicate; import lombok.extern.slf4j.Slf4j; @@ -19,7 +22,7 @@ public static String toString(ThreadInfo threadInfo) { sb.append("\"") .append(threadInfo.getName()) .append("\"") - .append(" group=\"").append(threadInfo.getGroupName()).append("\"") + .append(" group=\"").append(threadInfo.getGroupName() == null ? "system" : threadInfo.getGroupName()).append("\"") .append(" id=").append(threadInfo.getId()) .append(" prio=").append(threadInfo.getPriority()) .append(" daemon=").append(threadInfo.isDaemon()) @@ -34,7 +37,7 @@ public static String toString(ThreadInfo threadInfo) { return sb.toString(); } - public static String toString(ThreadInfo[] threadInfos) { + public static String toString(List threadInfos) { var sb = new StringBuilder(); boolean first = true; for (var threadInfo : threadInfos) { @@ -48,65 +51,78 @@ public static String toString(ThreadInfo[] threadInfos) { return sb.toString(); } - public static ThreadInfo[] getInconsistentThreadDump() { + public static List getInconsistentThreadDump() { return getInconsistentThreadDump(MoreExecutors.newDirectExecutorService(), 1); } - public static ThreadInfo[] getInconsistentThreadDump(ExecutorService executorService, int concurrency) { + public static List getInconsistentThreadDump(ExecutorService executorService, int concurrency) { long startTimeMs = System.currentTimeMillis(); try { - return ExceptionUtils.doUnchecked(() -> { - var threadMxBean = ManagementFactory.getThreadMXBean(); - - var threads = org.apache.commons.lang3.ThreadUtils.findThreads((Predicate) Objects::nonNull); - - var threadInfos = new ThreadInfo[threads.size()]; - - var semaphore = new Semaphore(concurrency); - var latch = new CountDownLatch(threads.size()); - - int i = 0; - for (var thread : threads) { - var threadId = thread.getId(); - semaphore.acquire(); - int finalI = i; - - executorService.submit(() -> { - try { - StackTraceElement[] stackTrace = thread.getStackTrace(); - - var threadState = new ThreadInfo() - .setStackTrace(stackTrace) - .setId(threadId) - .setState(thread.getState()) - .setName(thread.getName()) - .setPriority(thread.getPriority()) - .setCpuTime(threadMxBean.getThreadCpuTime(threadId)) - .setUserTime(threadMxBean.getThreadUserTime(threadId)) - .setGroupName(thread.getThreadGroup().getName()) - .setDaemon(thread.isDaemon()); - - threadInfos[finalI] = threadState; - } finally { - semaphore.release(); - latch.countDown(); - } - }); - i++; - } - - if (!latch.await(1, TimeUnit.MINUTES)) { - throw new IllegalStateException("We were unable to complete inconsistent thread dump."); - } - - return threadInfos; - }); + return ExceptionUtils.doUnchecked(() -> getInconsistentThreadDump0(executorService, concurrency)); } finally { if (log.isDebugEnabled()) { - log.debug("Inconsistent thread dump took {} ms.", (System.currentTimeMillis() - startTimeMs)); + log.debug("Inconsistent thread dump took {} ms to take.", (System.currentTimeMillis() - startTimeMs)); } } } + private static ArrayList getInconsistentThreadDump0(ExecutorService executorService, int concurrency) throws InterruptedException { + var threadMxBean = ManagementFactory.getThreadMXBean(); + + var threads = org.apache.commons.lang3.ThreadUtils.findThreads((Predicate) Objects::nonNull); + + final var threadsCount = threads.size(); + var threadInfos = new AtomicReferenceArray(threadsCount); + + var semaphore = new Semaphore(concurrency); + var latch = new CountDownLatch(threadsCount); + + int i = 0; + for (var thread : threads) { + var threadId = thread.getId(); + semaphore.acquire(); + int finalI = i; + + executorService.submit(() -> { + try { + StackTraceElement[] stackTrace = thread.getStackTrace(); + final var threadGroup = thread.getThreadGroup(); + var threadGroupName = threadGroup == null ? null : threadGroup.getName(); + + var threadInfo = new ThreadInfo() + .setStackTrace(stackTrace) + .setId(threadId) + .setState(thread.getState()) + .setName(thread.getName()) + .setPriority(thread.getPriority()) + .setCpuTime(threadMxBean.getThreadCpuTime(threadId)) + .setUserTime(threadMxBean.getThreadUserTime(threadId)) + .setGroupName(threadGroupName) + .setDaemon(thread.isDaemon()); + + threadInfos.set(finalI, threadInfo); + } catch (Throwable t) { + log.error("Processing thread with id {} failed.", threadId, t); + } finally { + semaphore.release(); + latch.countDown(); + } + }); + i++; + } + + if (!latch.await(1, TimeUnit.MINUTES)) { + throw new IllegalStateException("We were unable to complete inconsistent thread dump."); + } + + var result = new ArrayList(threadsCount); + for (int j = 0; j < threadsCount; j++) { + var threadInfo = threadInfos.get(j); + if (threadInfo != null) { + result.add(threadInfo); + } + } + return result; + } } diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/clock/TestClockTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/clock/TestClockTest.java new file mode 100644 index 0000000..f4a3cfd --- /dev/null +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/clock/TestClockTest.java @@ -0,0 +1,21 @@ +package com.transferwise.common.baseutils.clock; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.time.Duration; +import org.junit.jupiter.api.Test; + +public class TestClockTest { + + // Plus operation is used in Groovy based tests. + @Test + void plusOperationWorks() { + var testClock = new TestClock(); + var startInstant = testClock.instant(); + + testClock.plus("P1D"); + + assertThat(testClock.instant(), equalTo(startInstant.plus(Duration.ofDays(1)))); + } +} diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/threads/ThreadUtilsTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/threads/ThreadUtilsTest.java index 20a5590..5a6a7ad 100644 --- a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/threads/ThreadUtilsTest.java +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/threads/ThreadUtilsTest.java @@ -6,6 +6,7 @@ import static org.hamcrest.Matchers.greaterThan; import com.transferwise.common.baseutils.ExceptionUtils; +import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -17,6 +18,9 @@ public class ThreadUtilsTest { @Test @SneakyThrows void takingThreadDumpWorks() { + ProcessHandle processHandle = ProcessHandle.current(); + long pid = processHandle.pid(); + System.out.println("The PID of this Java process is: " + pid); var executorService = Executors.newCachedThreadPool(); @@ -62,6 +66,6 @@ void takingThreadDumpWorks() { @Test @SneakyThrows void takingThreadDumpWithCurrentThreadWorks() { - assertThat(ThreadUtils.getInconsistentThreadDump().length, greaterThan(0)); + assertThat(ThreadUtils.getInconsistentThreadDump().size(), greaterThan(0)); } }