Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Dec 6, 2024
1 parent c4172e0 commit 9e172e2
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,4 +610,19 @@ private void addShuffleFallbackCounts(Map<String, Long> fallbackCounts) {
fallbackPolicy, (k, v) -> v == null ? fallbackCounts.get(k) : v + fallbackCounts.get(k));
}
}

/**
 * Pushes the reported per-user resource consumption onto the registered worker that matches
 * the given host and ports.
 *
 * <p>The worker is looked up in {@code workersMap} by its unique id. When no entry exists for
 * that id, the call is a no-op.
 *
 * @param host worker host used to build the lookup identity
 * @param rpcPort worker RPC port used to build the lookup identity
 * @param pushPort worker push port used to build the lookup identity
 * @param fetchPort worker fetch port used to build the lookup identity
 * @param replicatePort worker replicate port used to build the lookup identity
 * @param resourceConsumptions consumption per user, handed to the registered worker
 */
public void updateWorkerResourceConsumptions(
    String host,
    int rpcPort,
    int pushPort,
    int fetchPort,
    int replicatePort,
    Map<UserIdentifier, ResourceConsumption> resourceConsumptions) {
  // A throwaway WorkerInfo is built only to derive the map key; the trailing -1/null/null
  // arguments are placeholders (NOTE(review): presumably not part of the unique id - confirm).
  String workerId =
      new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, null, null)
          .toUniqueId();
  // Lookup and update happen under the same lock on workersMap.
  synchronized (workersMap) {
    WorkerInfo registered = workersMap.get(workerId);
    if (registered != null) {
      registered.updateThenGetUserResourceConsumption(resourceConsumptions);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,8 @@ private[celeborn] class Master(
private val hasHDFSStorage = conf.hasHDFSStorage
private val hasS3Storage = conf.hasS3Storage

// workerUniqueId -> ResourceConsumption
private val workerToResourceConsumptions =
JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()
private val quotaManager = new QuotaManager(
workerToResourceConsumptions,
statusSystem,
masterSource,
resourceConsumptionSource,
conf,
Expand Down Expand Up @@ -675,9 +672,15 @@ private[celeborn] class Master(
highWorkload,
workerStatus,
requestId)
statusSystem.updateWorkerResourceConsumptions(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
userResourceConsumption)
}

workerToResourceConsumptions.put(targetWorker.toUniqueId(), userResourceConsumption)
val expiredShuffleKeys = new util.HashSet[String]
activeShuffleKeys.asScala.foreach { shuffleKey =>
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
import org.apache.celeborn.server.common.service.config.ConfigService
import org.apache.celeborn.service.deploy.master.MasterSource
import org.apache.celeborn.service.deploy.master.MasterSource.UPDATE_RESOURCE_CONSUMPTION_TIME
import org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager
import org.apache.celeborn.service.deploy.master.quota.QuotaStatus._

class QuotaManager(
workerToResourceConsumptions: JMap[String, JMap[UserIdentifier, ResourceConsumption]],
statusSystem: AbstractMetaManager,
masterSource: MasterSource,
resourceConsumptionSource: ResourceConsumptionSource,
celebornConf: CelebornConf,
Expand Down Expand Up @@ -189,77 +190,79 @@ class QuotaManager(
masterSource.sample(UPDATE_RESOURCE_CONSUMPTION_TIME, this.getClass.getSimpleName, Map.empty) {
val clusterQuota = getClusterStorageQuota
var clusterResourceConsumption = ResourceConsumption(0, 0, 0, 0)

val tenantResourceConsumption =
workerToResourceConsumptions.asScala.flatMap(_._2.asScala).groupBy(_._1.tenantId).map {
case (tenantId, tenantConsumptionList) =>
var tenantResourceConsumption = ResourceConsumption(0, 0, 0, 0)
val userResourceConsumption =
tenantConsumptionList.groupBy(_._1).map {
case (userIdentifier, userConsumptionList) =>
// Step 1: Compute user consumption and set quota status.
val resourceConsumptionList = userConsumptionList.values.toSeq
val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList)
statusSystem.availableWorkers.asScala.flatMap { workerInfo =>
workerInfo.userResourceConsumption.asScala
}.groupBy(_._1.tenantId).toSeq.map { case (tenantId, tenantConsumptionList) =>
var tenantResourceConsumption = ResourceConsumption(0, 0, 0, 0)
val userResourceConsumption =
tenantConsumptionList.groupBy(_._1).map {
case (userIdentifier, userConsumptionList) =>
// Step 1: Compute user consumption and set quota status.
val resourceConsumptionList = userConsumptionList.map(_._2).toSeq
val resourceConsumption = computeUserResourceConsumption(resourceConsumptionList)

// Step 2: Update user resource consumption metrics.
// For extract metrics
userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
registerUserResourceConsumptionMetrics(userIdentifier)
// Step 2: Update user resource consumption metrics.
// For extract metrics
userResourceConsumptionMap.put(userIdentifier, resourceConsumption)
registerUserResourceConsumptionMetrics(userIdentifier)

// Step 3: Expire user level exceeded app except already expired app
clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption)
tenantResourceConsumption = tenantResourceConsumption.add(resourceConsumption)
val quotaStatus = checkUserQuotaSpace(userIdentifier, resourceConsumption)
userQuotaStatus.put(userIdentifier, quotaStatus)
if (interruptShuffleEnabled && quotaStatus.exceed) {
val subResourceConsumptions = computeSubConsumption(resourceConsumptionList)
// Compute expired size
val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) =>
appQuotaStatus.containsKey(app)
}
val userConsumptions =
expired.values.foldLeft(resourceConsumption)(_.subtract(_))
expireApplication(
userConsumptions,
getUserStorageQuota(userIdentifier),
notExpired.toSeq,
USER_EXHAUSTED)
(Option(subResourceConsumptions), resourceConsumptionList)
} else {
(None, resourceConsumptionList)
// Step 3: Expire user level exceeded app except already expired app
clusterResourceConsumption = clusterResourceConsumption.add(resourceConsumption)
tenantResourceConsumption = tenantResourceConsumption.add(resourceConsumption)
val quotaStatus = checkUserQuotaSpace(userIdentifier, resourceConsumption)
userQuotaStatus.put(userIdentifier, quotaStatus)
if (interruptShuffleEnabled && quotaStatus.exceed) {
val subResourceConsumptions = computeSubConsumption(resourceConsumptionList)
// Compute expired size
val (expired, notExpired) = subResourceConsumptions.partition { case (app, _) =>
appQuotaStatus.containsKey(app)
}
}
val userConsumptions =
expired.values.foldLeft(resourceConsumption)(_.subtract(_))
expireApplication(
userConsumptions,
getUserStorageQuota(userIdentifier),
notExpired.toSeq,
USER_EXHAUSTED)
(Option(subResourceConsumptions), resourceConsumptionList)
} else {
(None, resourceConsumptionList)
}
}

val quotaStatus = checkTenantQuotaSpace(tenantId, tenantResourceConsumption)
tenantQuotaStatus.put(tenantId, quotaStatus)
// Expire tenant level exceeded app except already expired app
if (interruptShuffleEnabled && quotaStatus.exceed) {
val appConsumptions = userResourceConsumption.map {
case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
case (Some(subConsumptions), _) => subConsumptions
}.flatMap(_.toSeq).toSeq
val quotaStatus = checkTenantQuotaSpace(tenantId, tenantResourceConsumption)
tenantQuotaStatus.put(tenantId, quotaStatus)
// Expire tenant level exceeded app except already expired app
if (interruptShuffleEnabled && quotaStatus.exceed) {
val appConsumptions = userResourceConsumption.map {
case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
case (Some(subConsumptions), _) => subConsumptions
}.flatMap(_.toSeq).toSeq

// Compute nonExpired app total usage
val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
appQuotaStatus.containsKey(app)
}
tenantResourceConsumption =
expired.map(_._2).foldLeft(tenantResourceConsumption)(_.subtract(_))
expireApplication(
tenantResourceConsumption,
getTenantStorageQuota(tenantId),
notExpired,
TENANT_EXHAUSTED)
(Option(appConsumptions), tenantConsumptionList.values)
} else {
(None, tenantConsumptionList.values)
// Compute nonExpired app total usage
val (expired, notExpired) = appConsumptions.partition { case (app, _) =>
appQuotaStatus.containsKey(app)
}
tenantResourceConsumption =
expired.map(_._2).foldLeft(tenantResourceConsumption)(_.subtract(_))
expireApplication(
tenantResourceConsumption,
getTenantStorageQuota(tenantId),
notExpired,
TENANT_EXHAUSTED)
(Option(appConsumptions), tenantConsumptionList.map(_._2).toSeq)
} else {
(None, tenantConsumptionList.map(_._2).toSeq)
}
}

// Expire cluster level exceeded app except already expired app
clusterQuotaStatus = checkClusterQuotaSpace(clusterResourceConsumption)
if (interruptShuffleEnabled && clusterQuotaStatus.exceed) {
val appConsumptions = tenantResourceConsumption.map {
case (None, subConsumptionList) => computeSubConsumption(subConsumptionList.toSeq)
case (None, subConsumptionList) => computeSubConsumption(subConsumptionList)
case (Some(subConsumptions), _) => subConsumptions
}.flatMap(_.toSeq).toSeq

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.metrics.source.ResourceConsumptionSource
import org.apache.celeborn.common.protocol.message.ControlMessages.CheckQuotaResponse
import org.apache.celeborn.common.quota.{ResourceConsumption, StorageQuota}
import org.apache.celeborn.common.rpc.RpcEnv
import org.apache.celeborn.common.util.{JavaUtils, Utils}
import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory, FsConfigServiceImpl}
import org.apache.celeborn.service.deploy.master.MasterSource
import org.apache.celeborn.service.deploy.master.clustermeta.{AbstractMetaManager, SingleMasterMetaManager}

class QuotaManagerSuite extends CelebornFunSuite
with BeforeAndAfterAll
Expand All @@ -52,6 +54,10 @@ class QuotaManagerSuite extends CelebornFunSuite
10003,
10004)

var statusSystem: AbstractMetaManager = _

var rpcEnv: RpcEnv = _

val workerToResourceConsumptions =
JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()

Expand All @@ -68,14 +74,27 @@ class QuotaManagerSuite extends CelebornFunSuite
DynamicConfigServiceFactory.reset()
configService = DynamicConfigServiceFactory.getConfigService(conf)

rpcEnv = RpcEnv.create(
"test-rpc",
"rpc",
"localhost",
9001,
conf,
None)
statusSystem = new SingleMasterMetaManager(rpcEnv, conf)
statusSystem.availableWorkers.add(worker)
quotaManager = new QuotaManager(
workerToResourceConsumptions,
statusSystem,
new MasterSource(conf),
resourceConsumptionSource,
conf,
configService)
}

/**
 * Shuts down the suite-level RPC environment once the suite has finished.
 *
 * `rpcEnv` is a `var` that starts out as null and is only assigned during suite setup, so it
 * is wrapped in `Option` here: if setup failed before the assignment, teardown then does
 * nothing instead of throwing a NullPointerException that would hide the original failure.
 */
override def afterAll(): Unit = {
  Option(rpcEnv).foreach(_.shutdown())
}

test("test celeborn quota conf") {
configService.refreshCache()
assertEquals(
Expand Down Expand Up @@ -415,8 +434,17 @@ class QuotaManagerSuite extends CelebornFunSuite
conf1.set(
CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
getTestResourceFile("dynamicConfig-quota-2.yaml").getPath)
val rpcEnv = RpcEnv.create(
"test-rpc",
"rpc",
"localhost",
9002,
conf,
None)
val statusSystem1 = new SingleMasterMetaManager(rpcEnv, conf)
statusSystem1.availableWorkers.add(worker)
val quotaManager1 = new QuotaManager(
workerToResourceConsumptions,
statusSystem1,
new MasterSource(conf1),
resourceConsumptionSource,
conf1,
Expand Down Expand Up @@ -514,8 +542,17 @@ class QuotaManagerSuite extends CelebornFunSuite
conf1.set(
CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH.key,
getTestResourceFile("dynamicConfig-quota-3.yaml").getPath)
val rpcEnv = RpcEnv.create(
"test-rpc",
"rpc",
"localhost",
9003,
conf,
None)
val statusSystem1 = new SingleMasterMetaManager(rpcEnv, conf)
statusSystem1.availableWorkers.add(worker)
val quotaManager1 = new QuotaManager(
workerToResourceConsumptions,
statusSystem1,
new MasterSource(conf1),
resourceConsumptionSource,
conf1,
Expand Down

0 comments on commit 9e172e2

Please sign in to comment.