[CELEBORN-1792] MemoryManager resume should use pinnedDirectMemory instead of usedDirectMemory #3018

Closed
wants to merge 18 commits
@@ -17,8 +17,10 @@

package org.apache.celeborn.common.network.util;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
@@ -47,6 +49,8 @@ public class NettyUtils {
private static final ByteBufAllocator[] _sharedByteBufAllocator = new ByteBufAllocator[2];
private static final ConcurrentHashMap<String, Integer> allocatorsIndex =
JavaUtils.newConcurrentHashMap();
private static final List<PooledByteBufAllocator> pooledByteBufAllocators = new ArrayList<>();

/** Creates a new ThreadFactory which prefixes each thread with the given name. */
public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
return new DefaultThreadFactory(threadPoolPrefix, true);
@@ -141,6 +145,9 @@ public static synchronized ByteBufAllocator getSharedByteBufAllocator(
_sharedByteBufAllocator[index] =
createByteBufAllocator(
conf.networkMemoryAllocatorPooled(), true, allowCache, conf.networkAllocatorArenas());
if (conf.networkMemoryAllocatorPooled()) {
pooledByteBufAllocators.add((PooledByteBufAllocator) _sharedByteBufAllocator[index]);
}
if (source != null) {
new NettyMemoryMetrics(
_sharedByteBufAllocator[index],
@@ -178,6 +185,9 @@ public static ByteBufAllocator getByteBufAllocator(
conf.preferDirectBufs(),
allowCache,
arenas);
if (conf.getCelebornConf().networkMemoryAllocatorPooled()) {
pooledByteBufAllocators.add((PooledByteBufAllocator) allocator);
}
if (source != null) {
String poolName = "default-netty-pool";
Map<String, String> labels = new HashMap<>();
@@ -196,4 +206,8 @@ public static ByteBufAllocator getByteBufAllocator(
}
return allocator;
}

public static List<PooledByteBufAllocator> getAllPooledByteBufAllocators() {
return pooledByteBufAllocators;
}
}
@@ -3858,7 +3858,7 @@ object CelebornConf extends Logging {
.doc("If direct memory usage is less than this limit, worker will resume.")
.version("0.2.0")
.doubleConf
-      .createWithDefault(0.7)
+      .createWithDefault(0.3)
Contributor comment:
Maybe we can add a new conf for pinnedMemoryToResume and keep the existing conf for directMemoryRatioToResume.
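
A minimal sketch of how that split might be consumed on the worker side, assuming a hypothetical separate setting (say, celeborn.worker.pinnedMemoryRatioToResume) alongside the existing celeborn.worker.directMemoryRatioToResume; the helper and every name in it are illustrative only and not part of this PR:

// Illustrative only: resume based on a dedicated pinned-memory threshold when the
// pooled allocator is in use, and fall back to the existing used-memory threshold
// otherwise. None of these names come from this diff.
static boolean shouldResume(
    boolean allocatorPooled,
    long pinnedMemory,
    long usedMemory,
    long maxDirectMemory,
    double pinnedMemoryRatioToResume,
    double directMemoryRatioToResume) {
  double ratio = allocatorPooled
      ? pinnedMemory / (double) maxDirectMemory
      : usedMemory / (double) maxDirectMemory;
  double threshold = allocatorPooled ? pinnedMemoryRatioToResume : directMemoryRatioToResume;
  return ratio < threshold;
}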


val WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.memoryFileStorage.maxFileSize")
docs/configuration/worker.md (2 changes: 1 addition & 1 deletion)
@@ -75,7 +75,7 @@ license: |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | false | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 | |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | false | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | |
-| celeborn.worker.directMemoryRatioToResume | 0.7 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
+| celeborn.worker.directMemoryRatioToResume | 0.3 | false | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | |
| celeborn.worker.disk.clean.threads | 4 | false | Thread number of worker to clean up directories of expired shuffle keys on disk. | 0.3.2 | |
| celeborn.worker.fetch.heartbeat.enabled | false | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | |
| celeborn.worker.fetch.io.threads | &lt;undefined&gt; | false | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | |
@@ -34,7 +34,10 @@ trait MemorySparkTestBase extends AnyFunSuite

override def beforeAll(): Unit = {
logInfo("test initialized , setup Celeborn mini cluster")
-    val workerConfs = Map("celeborn.worker.directMemoryRatioForMemoryFileStorage" -> "0.2")
+    val workerConfs =
+      Map(
+        "celeborn.worker.directMemoryRatioForMemoryFileStorage" -> "0.2",
+        "celeborn.worker.directMemoryRatioToResume" -> "0.4")
setupMiniClusterWithRandomPorts(workerConf = workerConfs, workerNum = 5)
}

@@ -30,12 +30,14 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
@@ -93,6 +95,7 @@ public class MemoryManager {
private long memoryFileStorageThreshold;
private final LongAdder memoryFileStorageCounter = new LongAdder();
private final StorageManager storageManager;
private boolean networkMemoryAllocatorPooled;

@VisibleForTesting
public static MemoryManager initialize(CelebornConf conf) {
@@ -159,6 +162,7 @@ private MemoryManager(CelebornConf conf, StorageManager storageManager, Abstract
readBufferThreshold = (long) (maxDirectMemory * readBufferRatio);
readBufferTarget = (long) (readBufferThreshold * readBufferTargetRatio);
memoryFileStorageThreshold = (long) (maxDirectMemory * memoryFileStorageRatio);
networkMemoryAllocatorPooled = conf.networkMemoryAllocatorPooled();

checkService.scheduleWithFixedDelay(
() -> {
@@ -293,6 +297,18 @@ public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double evic

public ServingState currentServingState() {
long memoryUsage = getMemoryUsage();
long allocatedMemory;
if (networkMemoryAllocatorPooled) {
allocatedMemory = getAllocatedMemory();
} else {
allocatedMemory = memoryUsage;
}
// trigger resume
// CELEBORN-1792: resume should use pinnedDirectMemory instead of usedDirectMemory
Contributor comment:
Although we needn't change to the pause state, it would be better to call trim when Netty's used direct memory is above pausePushDataThreshold/pauseReplicateThreshold, WDYT?

if (allocatedMemory / (double) (maxDirectMemory) < resumeRatio) {
isPaused = false;
return ServingState.NONE_PAUSED;
}
// pause replicate threshold always greater than pause push data threshold
// so when trigger pause replicate, pause both push and replicate
if (memoryUsage > pauseReplicateThreshold) {
@@ -304,11 +320,6 @@ public ServingState currentServingState() {
isPaused = true;
return ServingState.PUSH_PAUSED;
}
-    // trigger resume
-    if (memoryUsage / (double) (maxDirectMemory) < resumeRatio) {
-      isPaused = false;
-      return ServingState.NONE_PAUSED;
-    }
// if isPaused and not trigger resume, then return pause push
// wait for trigger resumeThreshold to resume state
return isPaused ? ServingState.PUSH_PAUSED : ServingState.NONE_PAUSED;
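
The trim suggestion in the contributor comment above might slot into this method roughly as follows. This is only a sketch: trimAllListeners() is a placeholder name for whatever cache-trimming hook MemoryManager exposes, and is an assumption rather than something shown in this diff.

// Sketch only: stay in NONE_PAUSED when pinned memory is below the resume ratio,
// but still ask listeners to trim cached buffers once Netty's used direct memory
// has crossed the push-data pause threshold.
if (allocatedMemory / (double) maxDirectMemory < resumeRatio) {
  if (memoryUsage > pausePushDataThreshold) {
    trimAllListeners(); // placeholder hook: release caches without flipping the serving state
  }
  isPaused = false;
  return ServingState.NONE_PAUSED;
}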
@@ -436,6 +447,16 @@ public long getMemoryUsage() {
return getNettyUsedDirectMemory() + sortMemoryCounter.get();
}

public long getAllocatedMemory() {
Contributor comment:
This method should be renamed to getPinnedMemory. The allocated memory is the Netty memory counter.

return getNettyPinnedDirectMemory() + sortMemoryCounter.get();
}

public long getNettyPinnedDirectMemory() {
return NettyUtils.getAllPooledByteBufAllocators().stream()
.mapToLong(PooledByteBufAllocator::pinnedDirectMemory)
.sum();
}

public AtomicLong getSortMemoryCounter() {
return sortMemoryCounter;
}
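
For context on why the resume check switches metrics: with a pooled allocator, Netty keeps whole chunks reserved even after the buffers carved from them are released, so usedDirectMemory can stay high while pinnedDirectMemory (the bytes backing live ByteBufs) drops. A small standalone snippet, assuming Netty 4.1.72 or newer on the classpath, contrasts the two counters:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PinnedVsUsedDirectMemory {
  public static void main(String[] args) {
    PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
    ByteBuf buf = alloc.directBuffer(1024 * 1024); // hold 1 MiB of direct memory
    // usedDirectMemory: chunk memory the arenas have reserved from the OS.
    // pinnedDirectMemory: bytes currently backing live (unreleased) ByteBufs.
    System.out.printf("held: used=%d pinned=%d%n",
        alloc.metric().usedDirectMemory(), alloc.pinnedDirectMemory());
    buf.release();
    // After release, pinned memory drops back (possibly after thread-local caches are
    // drained), while used memory typically stays at the chunks the allocator retains.
    System.out.printf("released: used=%d pinned=%d%n",
        alloc.metric().usedDirectMemory(), alloc.pinnedDirectMemory());
  }
}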