Skip to content

Commit

Permalink
Remove PreAllocated buffer earlier by unregisterPureEvent
Browse files: browse the repository at this point in the history
  • Loading branch information
maobaolong committed Jul 27, 2024
1 parent 457c865 commit a473a27
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,16 @@ public void sendShuffleData(
ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
}
}
int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);
int requireSize =
shuffleServer.getShuffleTaskManager().getRequireBufferSize(appId, requireBufferId);

StatusCode ret = StatusCode.SUCCESS;
String responseMessage = "OK";
if (req.getShuffleDataCount() > 0) {
ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
PreAllocatedBufferInfo info =
manager.getAndRemovePreAllocatedBuffer(appId, requireBufferId);
boolean isPreAllocated = info != null;
if (!isPreAllocated) {
String errorMsg =
Expand Down
112 changes: 82 additions & 30 deletions server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -114,7 +115,9 @@ public class ShuffleTaskManager {
private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap();
// appId -> {requireBufferId -> PreAllocatedBufferInfo}
private Map<String, Map<Long, PreAllocatedBufferInfo>> appIdToRequireBufferIdsMap =
JavaUtils.newConcurrentMap();
private Thread clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
private final Cache<String, ReentrantReadWriteLock> appLocks;
Expand Down Expand Up @@ -319,17 +322,21 @@ public StatusCode cacheShuffleData(
return shuffleBufferManager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
}

public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(long requireBufferId) {
return requireBufferIds.remove(requireBufferId);
public PreAllocatedBufferInfo getAndRemovePreAllocatedBuffer(String appId, long requireBufferId) {
Map<Long, PreAllocatedBufferInfo> requireBufferIdMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdMap == null) {
return null;
}
return requireBufferIdMap.remove(requireBufferId);
}

/**
 * Releases pre-allocated buffer space by delegating to
 * {@code shuffleBufferManager.releasePreAllocatedSize}.
 *
 * <p>This method only forwards the size; it does not look up or remove any
 * {@code PreAllocatedBufferInfo} entry itself.
 *
 * @param requireSize the amount to release, passed through unchanged; presumably the size
 *     recorded when the buffer was required (unit not visible here; TODO confirm against
 *     ShuffleBufferManager)
 */
public void releasePreAllocatedSize(long requireSize) {
shuffleBufferManager.releasePreAllocatedSize(requireSize);
}

@VisibleForTesting
void removeAndReleasePreAllocatedBuffer(long requireBufferId) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(requireBufferId);
void removeAndReleasePreAllocatedBuffer(String appId, long requireBufferId) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(appId, requireBufferId);
if (info != null) {
releasePreAllocatedSize(info.getRequireSize());
}
Expand Down Expand Up @@ -541,9 +548,18 @@ public long requireBuffer(
public long requireBuffer(String appId, int requireSize) {
if (shuffleBufferManager.requireMemory(requireSize, true)) {
long requireId = requireBufferId.incrementAndGet();
requireBufferIds.put(
requireId,
new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize));
ReentrantReadWriteLock.WriteLock appLock = getAppWriteLock(appId);
try {
// preAllocatedBufferCheck will obtain lock and remove the empty appId
appLock.lock();
Map<Long, PreAllocatedBufferInfo> requireBufferMaps =
appIdToRequireBufferIdsMap.computeIfAbsent(appId, x -> JavaUtils.newConcurrentMap());
requireBufferMaps.put(
requireId,
new PreAllocatedBufferInfo(appId, requireId, System.currentTimeMillis(), requireSize));
} finally {
appLock.unlock();
}
return requireId;
} else {
LOG.warn("Failed to require buffer, require size: {}", requireSize);
Expand Down Expand Up @@ -829,6 +845,12 @@ public void removeResources(String appId, boolean checkAppExpired) {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
Map<Long, PreAllocatedBufferInfo> requireBufferIdsMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdsMap != null) {
for (PreAllocatedBufferInfo info : requireBufferIdsMap.values()) {
removeAndReleasePreAllocatedBuffer(appId, info.getRequireId());
}
}

String operationMsg = String.format("removing storage data for appId:%s", appId);
withTimeoutExecution(
Expand Down Expand Up @@ -896,34 +918,64 @@ public void refreshAppId(String appId) {
private void preAllocatedBufferCheck() {
try {
long current = System.currentTimeMillis();
List<Long> removeIds = Lists.newArrayList();
for (PreAllocatedBufferInfo info : requireBufferIds.values()) {
if (current - info.getTimestamp() > preAllocationExpired) {
removeIds.add(info.getRequireId());
for (Map.Entry<String, Map<Long, PreAllocatedBufferInfo>> entry :
appIdToRequireBufferIdsMap.entrySet()) {
String appId = entry.getKey();
if (MapUtils.isEmpty(entry.getValue())) {
ReentrantReadWriteLock.WriteLock appLock = getAppWriteLock(appId);
try {
appLock.lock();
// After double check, remove empty map related this appId from
// appIdToRequireBufferIdsMap
if (MapUtils.isEmpty(entry.getValue())) {
// Keep single point remove appId from appIdToRequireBufferIdsMap
appIdToRequireBufferIdsMap.remove(appId);
continue;
}
} finally {
appLock.unlock();
}
}
}
for (Long requireId : removeIds) {
PreAllocatedBufferInfo info = requireBufferIds.remove(requireId);
if (info != null) {
// move release memory code down to here as the requiredBuffer could be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
LOG.warn(
"Remove expired preAllocatedBuffer[id={}] that required by app: {}",
requireId,
info.getAppId());
ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
LOG.info("PreAllocatedBuffer[id={}] has already be used", requireId);
List<Long> toRemoveIds = Lists.newArrayList();
for (PreAllocatedBufferInfo info : entry.getValue().values()) {
if (current - info.getTimestamp() > preAllocationExpired) {
toRemoveIds.add(info.getRequireId());
}
}
List<Long> removedIds = Lists.newArrayList();
List<Long> usedIds = Lists.newArrayList();
for (Long requireId : toRemoveIds) {
PreAllocatedBufferInfo info = getAndRemovePreAllocatedBuffer(appId, requireId);
if (info != null) {
// move release memory code down to here as the requiredBuffer could be consumed during
// removing processing.
shuffleBufferManager.releaseMemory(info.getRequireSize(), false, true);
removedIds.add(requireId);
ShuffleServerMetrics.counterPreAllocatedBufferExpired.inc();
} else {
usedIds.add(requireId);
}
if (removedIds.size() > 0) {
LOG.info(
"Remove expired preAllocatedBuffer[id={}] for app[{}], removedIds: {}, usedIds: {}",
requireId,
appId,
removedIds,
usedIds);
}
}
}
} catch (Exception e) {
LOG.warn("Error happened in preAllocatedBufferCheck", e);
}
}

public int getRequireBufferSize(long requireId) {
PreAllocatedBufferInfo pabi = requireBufferIds.get(requireId);
public int getRequireBufferSize(String appId, long requireId) {
Map<Long, PreAllocatedBufferInfo> requireBufferIdMap = appIdToRequireBufferIdsMap.get(appId);
if (requireBufferIdMap == null) {
return 0;
}
PreAllocatedBufferInfo pabi = requireBufferIdMap.get(requireId);
if (pabi == null) {
return 0;
}
Expand All @@ -940,8 +992,8 @@ public Set<String> getAppIds() {
}

@VisibleForTesting
Map<Long, PreAllocatedBufferInfo> getRequireBufferIds() {
return requireBufferIds;
Map<Long, PreAllocatedBufferInfo> getRequireBufferIds(String appId) {
return appIdToRequireBufferIdsMap.get(appId);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// thread,
// otherwise we need to release the required size.
PreAllocatedBufferInfo info =
shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
shuffleTaskManager.getAndRemovePreAllocatedBuffer(appId, requireBufferId);
int requireSize = info == null ? 0 : info.getRequireSize();
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ public void writeProcessTest() throws Exception {
StringUtils.EMPTY);
final List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
final List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
final Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
final Map<Long, PreAllocatedBufferInfo> bufferIds =
shuffleTaskManager.getRequireBufferIds(appId);

shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
Expand All @@ -391,7 +392,7 @@ public void writeProcessTest() throws Exception {
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// manually release the pre allocate buffer
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
assertEquals(
Expand All @@ -404,7 +405,7 @@ public void writeProcessTest() throws Exception {
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData1);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);

// won't flush for partition 1-1
Expand All @@ -421,7 +422,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData3);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
assertEquals(StatusCode.SUCCESS, sc);

// flush for partition 2-2
Expand All @@ -430,7 +431,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData4);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);
assertEquals(StatusCode.SUCCESS, sc);

shuffleTaskManager.commitShuffle(appId, shuffleId);
Expand All @@ -444,7 +445,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
Expand All @@ -460,7 +461,7 @@ public void writeProcessTest() throws Exception {
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(appId, bufferId);

// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
Expand Down

0 comments on commit a473a27

Please sign in to comment.