Skip to content

Commit

Permalink
* Memory visibility issue when taking thread dumps. (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo authored Oct 15, 2024
1 parent 22280f4 commit 8a17da8
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 53 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.13.1] - 2024-10-15

### Fixed

* Memory visibility issue when taking thread dumps.

## [1.13.0] - 2024-10-11

### Added
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.13.0
version=1.13.1
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.transferwise.common.baseutils.ExceptionUtils;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Predicate;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -19,7 +22,7 @@ public static String toString(ThreadInfo threadInfo) {
sb.append("\"")
.append(threadInfo.getName())
.append("\"")
.append(" group=\"").append(threadInfo.getGroupName()).append("\"")
.append(" group=\"").append(threadInfo.getGroupName() == null ? "system" : threadInfo.getGroupName()).append("\"")
.append(" id=").append(threadInfo.getId())
.append(" prio=").append(threadInfo.getPriority())
.append(" daemon=").append(threadInfo.isDaemon())
Expand All @@ -34,7 +37,7 @@ public static String toString(ThreadInfo threadInfo) {
return sb.toString();
}

public static String toString(ThreadInfo[] threadInfos) {
public static String toString(List<ThreadInfo> threadInfos) {
var sb = new StringBuilder();
boolean first = true;
for (var threadInfo : threadInfos) {
Expand All @@ -48,65 +51,78 @@ public static String toString(ThreadInfo[] threadInfos) {
return sb.toString();
}

public static ThreadInfo[] getInconsistentThreadDump() {
public static List<ThreadInfo> getInconsistentThreadDump() {
return getInconsistentThreadDump(MoreExecutors.newDirectExecutorService(), 1);
}

public static ThreadInfo[] getInconsistentThreadDump(ExecutorService executorService, int concurrency) {
public static List<ThreadInfo> getInconsistentThreadDump(ExecutorService executorService, int concurrency) {
long startTimeMs = System.currentTimeMillis();

try {
return ExceptionUtils.doUnchecked(() -> {
var threadMxBean = ManagementFactory.getThreadMXBean();

var threads = org.apache.commons.lang3.ThreadUtils.findThreads((Predicate<Thread>) Objects::nonNull);

var threadInfos = new ThreadInfo[threads.size()];

var semaphore = new Semaphore(concurrency);
var latch = new CountDownLatch(threads.size());

int i = 0;
for (var thread : threads) {
var threadId = thread.getId();
semaphore.acquire();
int finalI = i;

executorService.submit(() -> {
try {
StackTraceElement[] stackTrace = thread.getStackTrace();

var threadState = new ThreadInfo()
.setStackTrace(stackTrace)
.setId(threadId)
.setState(thread.getState())
.setName(thread.getName())
.setPriority(thread.getPriority())
.setCpuTime(threadMxBean.getThreadCpuTime(threadId))
.setUserTime(threadMxBean.getThreadUserTime(threadId))
.setGroupName(thread.getThreadGroup().getName())
.setDaemon(thread.isDaemon());

threadInfos[finalI] = threadState;
} finally {
semaphore.release();
latch.countDown();
}
});
i++;
}

if (!latch.await(1, TimeUnit.MINUTES)) {
throw new IllegalStateException("We were unable to complete inconsistent thread dump.");
}

return threadInfos;
});
return ExceptionUtils.doUnchecked(() -> getInconsistentThreadDump0(executorService, concurrency));
} finally {
if (log.isDebugEnabled()) {
log.debug("Inconsistent thread dump took {} ms.", (System.currentTimeMillis() - startTimeMs));
log.debug("Inconsistent thread dump took {} ms to take.", (System.currentTimeMillis() - startTimeMs));
}
}
}

private static ArrayList<ThreadInfo> getInconsistentThreadDump0(ExecutorService executorService, int concurrency) throws InterruptedException {
var threadMxBean = ManagementFactory.getThreadMXBean();

var threads = org.apache.commons.lang3.ThreadUtils.findThreads((Predicate<Thread>) Objects::nonNull);

final var threadsCount = threads.size();
var threadInfos = new AtomicReferenceArray<ThreadInfo>(threadsCount);

var semaphore = new Semaphore(concurrency);
var latch = new CountDownLatch(threadsCount);

int i = 0;
for (var thread : threads) {
var threadId = thread.getId();
semaphore.acquire();
int finalI = i;

executorService.submit(() -> {
try {
StackTraceElement[] stackTrace = thread.getStackTrace();
final var threadGroup = thread.getThreadGroup();
var threadGroupName = threadGroup == null ? null : threadGroup.getName();

var threadInfo = new ThreadInfo()
.setStackTrace(stackTrace)
.setId(threadId)
.setState(thread.getState())
.setName(thread.getName())
.setPriority(thread.getPriority())
.setCpuTime(threadMxBean.getThreadCpuTime(threadId))
.setUserTime(threadMxBean.getThreadUserTime(threadId))
.setGroupName(threadGroupName)
.setDaemon(thread.isDaemon());

threadInfos.set(finalI, threadInfo);
} catch (Throwable t) {
log.error("Processing thread with id {} failed.", threadId, t);
} finally {
semaphore.release();
latch.countDown();
}
});
i++;
}

if (!latch.await(1, TimeUnit.MINUTES)) {
throw new IllegalStateException("We were unable to complete inconsistent thread dump.");
}

var result = new ArrayList<ThreadInfo>(threadsCount);
for (int j = 0; j < threadsCount; j++) {
var threadInfo = threadInfos.get(j);
if (threadInfo != null) {
result.add(threadInfo);
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.transferwise.common.baseutils.clock;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.time.Duration;
import org.junit.jupiter.api.Test;

public class TestClockTest {

// Plus operation is used in Groovy based tests.
@Test
void plusOperationWorks() {
var testClock = new TestClock();
var startInstant = testClock.instant();

testClock.plus("P1D");

assertThat(testClock.instant(), equalTo(startInstant.plus(Duration.ofDays(1))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.hamcrest.Matchers.greaterThan;

import com.transferwise.common.baseutils.ExceptionUtils;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -17,6 +18,9 @@ public class ThreadUtilsTest {
@Test
@SneakyThrows
void takingThreadDumpWorks() {
ProcessHandle processHandle = ProcessHandle.current();
long pid = processHandle.pid();
System.out.println("The PID of this Java process is: " + pid);

var executorService = Executors.newCachedThreadPool();

Expand Down Expand Up @@ -62,6 +66,6 @@ void takingThreadDumpWorks() {
@Test
@SneakyThrows
void takingThreadDumpWithCurrentThreadWorks() {
assertThat(ThreadUtils.getInconsistentThreadDump().length, greaterThan(0));
assertThat(ThreadUtils.getInconsistentThreadDump().size(), greaterThan(0));
}
}

0 comments on commit 8a17da8

Please sign in to comment.