Skip to content

Commit

Permalink
fix(server): Fix netty memory leak when removeBuffer and cacheShuffle…
Browse files Browse the repository at this point in the history
…Data happen concurrent (merge request !278)
  • Loading branch information
wenlongwlli committed Oct 10, 2024
1 parent ba2302c commit 30229b5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public abstract class AbstractShuffleBuffer implements ShuffleBuffer {

protected long size;

protected volatile boolean evicted;
public static final long BUFFER_EVICTED = -1L;

public AbstractShuffleBuffer() {
this.size = 0;
this.evicted = false;
}

/** Only for test */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ public StatusCode cacheShuffleData(

ShuffleBuffer buffer = entry.getValue();
long size = buffer.append(spd);
if (size == AbstractShuffleBuffer.BUFFER_EVICTED) {
return StatusCode.NO_REGISTER;
}
if (!isPreAllocated) {
updateUsedMemory(size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@ public ShuffleBufferWithLinkedList() {
}

@Override
public long append(ShufflePartitionedData data) {
long size = 0;
public synchronized long append(ShufflePartitionedData data) {
if (evicted) {
return BUFFER_EVICTED;
}
long currentSize = 0;

synchronized (this) {
for (ShufflePartitionedBlock block : data.getBlockList()) {
blocks.add(block);
size += block.getSize();
}
this.size += size;
for (ShufflePartitionedBlock block : data.getBlockList()) {
blocks.add(block);
currentSize += block.getSize();
}
this.size += currentSize;

return size;
return currentSize;
}

@Override
Expand Down Expand Up @@ -110,7 +111,8 @@ public int getBlockCount() {
}

@Override
public void release() {
public synchronized void release() {
evicted = true;
blocks.forEach(spb -> spb.getData().release());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,20 @@ private ConcurrentSkipListMap<Long, ShufflePartitionedBlock> newConcurrentSkipLi
}

@Override
public long append(ShufflePartitionedData data) {
long size = 0;

synchronized (this) {
for (ShufflePartitionedBlock block : data.getBlockList()) {
blocksMap.put(block.getBlockId(), block);
blockCount++;
size += block.getSize();
}
this.size += size;
public synchronized long append(ShufflePartitionedData data) {
if (evicted) {
return BUFFER_EVICTED;
}
long currentSize = 0;

for (ShufflePartitionedBlock block : data.getBlockList()) {
blocksMap.put(block.getBlockId(), block);
blockCount++;
currentSize += block.getSize();
}
this.size += currentSize;

return size;
return currentSize;
}

@Override
Expand Down Expand Up @@ -111,7 +112,8 @@ public int getBlockCount() {
}

@Override
public void release() {
public synchronized void release() {
evicted = true;
blocksMap.values().forEach(spb -> spb.getData().release());
}

Expand Down

0 comments on commit 30229b5

Please sign in to comment.