From 30229b551b9e3c8bfd8679bc2d70976797231073 Mon Sep 17 00:00:00 2001 From: wenlongwlli Date: Tue, 13 Aug 2024 17:41:20 +0800 Subject: [PATCH] fix(server): Fix netty memory leak when removeBuffer and cacheShuffleData happen concurrently (merge request !278) --- .../server/buffer/AbstractShuffleBuffer.java | 4 +++ .../server/buffer/ShuffleBufferManager.java | 3 +++ .../buffer/ShuffleBufferWithLinkedList.java | 22 +++++++++------- .../buffer/ShuffleBufferWithSkipList.java | 26 ++++++++++--------- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java index ddbeb21cf2..63fd3a3afb 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java @@ -41,8 +41,12 @@ public abstract class AbstractShuffleBuffer implements ShuffleBuffer { protected long size; + protected volatile boolean evicted; + public static final long BUFFER_EVICTED = -1L; + public AbstractShuffleBuffer() { this.size = 0; + this.evicted = false; } /** Only for test */ diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 1b432abee9..4ee15f4f18 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -194,6 +194,9 @@ public StatusCode cacheShuffleData( ShuffleBuffer buffer = entry.getValue(); long size = buffer.append(spd); + if (size == AbstractShuffleBuffer.BUFFER_EVICTED) { + return StatusCode.NO_REGISTER; + } if (!isPreAllocated) { updateUsedMemory(size); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java index cd9d7ab669..87260990a9 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java @@ -48,18 +48,19 @@ public ShuffleBufferWithLinkedList() { } @Override - public long append(ShufflePartitionedData data) { - long size = 0; + public synchronized long append(ShufflePartitionedData data) { + if (evicted) { + return BUFFER_EVICTED; + } + long currentSize = 0; - synchronized (this) { - for (ShufflePartitionedBlock block : data.getBlockList()) { - blocks.add(block); - size += block.getSize(); - } - this.size += size; + for (ShufflePartitionedBlock block : data.getBlockList()) { + blocks.add(block); + currentSize += block.getSize(); } + this.size += currentSize; - return size; + return currentSize; } @Override @@ -110,7 +111,8 @@ public int getBlockCount() { } @Override - public void release() { + public synchronized void release() { + evicted = true; blocks.forEach(spb -> spb.getData().release()); } diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java index 2419a7dd48..e10fe65681 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java @@ -57,19 +57,20 @@ private ConcurrentSkipListMap newConcurrentSkipLi } @Override - public long append(ShufflePartitionedData data) { - long size = 0; - - synchronized (this) { - for (ShufflePartitionedBlock block : data.getBlockList()) { - blocksMap.put(block.getBlockId(), block); - blockCount++; - size += block.getSize(); - } - this.size += size; + public synchronized long append(ShufflePartitionedData data) { + if (evicted) { + return BUFFER_EVICTED; + } 
+ long currentSize = 0; + + for (ShufflePartitionedBlock block : data.getBlockList()) { + blocksMap.put(block.getBlockId(), block); + blockCount++; + currentSize += block.getSize(); } + this.size += currentSize; - return size; + return currentSize; } @Override @@ -111,7 +112,8 @@ public int getBlockCount() { } @Override - public void release() { + public synchronized void release() { + evicted = true; blocksMap.values().forEach(spb -> spb.getData().release()); }