Skip to content

Commit

Permalink
[SPARK-47458][CORE] Fix the problem with calculating the maximum conc…
Browse files Browse the repository at this point in the history
…urrent tasks for the barrier stage

### What changes were proposed in this pull request?

This PR addresses the problem of calculating the maximum concurrent tasks while evaluating the number of slots for barrier stages, specifically for the case when the task resource amount is greater than 1.

### Why are the changes needed?

``` scala
  test("problem of calculating the maximum concurrent task") {
    withTempDir { dir =>
      val discoveryScript = createTempScriptWithExpectedOutput(
        dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")

      val conf = new SparkConf()
        // Setup a local cluster which has only one executor with 6 CPUs and 4 GPUs.
        .setMaster("local-cluster[1, 6, 1024]")
        .setAppName("test-cluster")
        .set(WORKER_GPU_ID.amountConf, "4")
        .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
        .set(EXECUTOR_GPU_ID.amountConf, "4")
        .set(TASK_GPU_ID.amountConf, "2")
        // disable barrier stage retry to fail the application as soon as possible
        .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 1)
      sc = new SparkContext(conf)
      TestUtils.waitUntilExecutorsUp(sc, 1, 60000)

      // Setup a barrier stage which contains 2 tasks, and each task requires 1 CPU and 2 GPUs.
      // The total resource requirement (2 CPUs and 4 GPUs) of this barrier stage can be
      // satisfied since the executor has 6 CPUs and 4 GPUs, so both tasks should launch.
      assert(sc.parallelize(Range(1, 10), 2)
        .barrier()
        .mapPartitions { iter => iter }
        .collect() sameElements Range(1, 10).toArray[Int])
    }
  }
```

In the described test scenario, the executor has 6 CPU cores and 4 GPUs, and each task requires 1 CPU core and 2 GPUs. Consequently, the maximum number of concurrent tasks should be 2. However, when the subsequent 2 barrier tasks are launched, the `checkBarrierStageWithNumSlots` function computes an incorrect concurrent task limit of 1 instead of 2. This PR fixes that bug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

The existing and newly added unit tests should pass

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45528 from wbo4958/2-gpu.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
wbo4958 authored and tgravescs committed Mar 19, 2024
1 parent a6bffcc commit 90560dc
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,6 @@ private[spark] class ExecutorResourceInfo(

// Identity of this resource for the parent accounting trait: forwards the
// resource's name and its concrete addresses (e.g. GPU indices) held by
// this executor. NOTE(review): `name` and `addresses` appear to be
// constructor parameters of ExecutorResourceInfo — header not visible here.
override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses

/**
 * Compute how many task-sized "parts" this executor's resource addresses can
 * be split into, given the resource amount a single task requires.
 *
 * @param taskAmount resource amount required by one task (must be > 0)
 * @return the number of parts the executor can offer
 */
def totalParts(taskAmount: Double): Int = {
  assert(taskAmount > 0.0)
  val numAddresses = addresses.length
  if (taskAmount < 1.0) {
    // Fractional requirement: each address can be shared by
    // floor(1 / taskAmount) tasks simultaneously.
    numAddresses * Math.floor(1.0 / taskAmount).toInt
  } else {
    // Whole (or mixed) requirement: each task consumes ceil(taskAmount)
    // entire addresses.
    numAddresses / taskAmount.ceil.toInt
  }
}
def totalAddressesAmount: Int = this.addresses.length

}
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,11 @@ private[spark] object TaskSchedulerImpl {
numTasksPerExecCores
} else {
val availAddrs = resources.getOrElse(limitingResource, 0)
val resourceLimit = (availAddrs / taskLimit).toInt
val resourceLimit = if (taskLimit >= 1.0) {
availAddrs / taskLimit.ceil.toInt
} else {
availAddrs * Math.floor(1.0 / taskLimit).toInt
}
// when executor cores config isn't set, we can't calculate the real limiting resource
// and number of tasks per executor ahead of time, so calculate it now based on what
// is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executor.resourceProfileId,
executor.totalCores,
executor.resourcesInfo.map { case (name, rInfo) =>
val taskAmount = rp.taskResources.get(name).get.amount
(name, rInfo.totalParts(taskAmount))
(name, rInfo.totalAddressesAmount)
}
)
}.unzip3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ import org.scalatest.concurrent.Eventually
import org.scalatestplus.mockito.MockitoSugar._

import org.apache.spark._
import org.apache.spark.TestUtils.createTempScriptWithExpectedOutput
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
Expand Down Expand Up @@ -137,6 +138,47 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
}
}

// Regression test for SPARK-47458: maxNumConcurrentTasks must be computed
// correctly for both fractional (< 1) and whole (>= 1) per-task GPU amounts.
test("SPARK-47458 compute max number of concurrent tasks with resources limiting") {
withTempDir { dir =>
// Fake discovery script reporting 4 GPU addresses for the single worker.
val discoveryScript = createTempScriptWithExpectedOutput(
dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""")
// One executor, 20 cores, 4 GPUs; default profile asks for 0.2 GPU per task.
val conf = new SparkConf()
.set(CPUS_PER_TASK, 1)
.setMaster("local-cluster[1, 20, 1024]")
.setAppName("test")
.set(WORKER_GPU_ID.amountConf, "4")
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(EXECUTOR_GPU_ID.amountConf, "4")
.set(TASK_GPU_ID.amountConf, "0.2")
sc = new SparkContext(conf)
eventually(timeout(executorUpTimeout)) {
// Ensure all executors have been launched.
assert(sc.getExecutorIds().length == 1)
}
// The concurrent tasks should be min of {20/1 CPU slots, 4 * (1/0.2) GPU slots} = 20.
assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 20)

// Per-task GPU amount -> expected max concurrent tasks on 4 GPUs.
// Fractional amounts multiply (each GPU shared), whole amounts divide
// (each task consumes ceil(amount) GPUs); both use integer truncation.
val gpuTaskAmountToExpectedTasks = Map(
0.3 -> 12, // 4 * (1/0.3).toInt
0.4 -> 8, // 4 * (1/0.4).toInt
0.5 -> 8, // 4 * (1/0.5).toInt
0.8 -> 4, // 4 * (1/0.8).toInt
1.0 -> 4, // 4 / 1
2.0 -> 2, // 4 / 2
3.0 -> 1, // 4 / 3
4.0 -> 1 // 4 / 4
)

// It's the GPU resource that limits the concurrent number
// (CPU would allow 20 tasks, GPU allows at most 12 in every case above).
gpuTaskAmountToExpectedTasks.keys.foreach { taskGpu =>
val treqs = new TaskResourceRequests().cpus(1).resource(GPU, taskGpu)
val rp: ResourceProfile = new ResourceProfileBuilder().require(treqs).build()
sc.resourceProfileManager.addResourceProfile(rp)
assert(sc.maxNumConcurrentTasks(rp) == gpuTaskAmountToExpectedTasks(taskGpu))
}
}
}

// Here we just have test for one happy case instead of all cases: other cases are covered in
// FsHistoryProviderSuite.
test("custom log url for Spark UI is applied") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE}
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.status.api.v1.ThreadStackTrace
Expand Down Expand Up @@ -2687,4 +2687,58 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
assert(3 === taskDescriptions.length)
}

// 1 executor with 4 GPUS
// Generates 8 tests (4 whole-number GPU amounts x barrier/non-barrier mode)
// verifying that scheduling with taskAmount >= 1 assigns the right number of
// tasks and the right GPU addresses to each task.
Seq(true, false).foreach { barrierMode =>
val barrier = if (barrierMode) "in barrier" else ""
Seq(1, 2, 3, 4).foreach { gpuTaskAmount =>
test(s"SPARK-47458 GPU fraction resource should work when " +
s"gpu task amount = ${gpuTaskAmount} $barrier") {

val executorCpus = 10 // cpu will not limit the concurrent tasks number

val taskScheduler = setupScheduler(numCores = executorCpus,
config.CPUS_PER_TASK.key -> "1",
TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
EXECUTOR_GPU_ID.amountConf -> "4",
config.EXECUTOR_CORES.key -> executorCpus.toString)

// Integer division: e.g. gpuTaskAmount = 3 yields taskNum = 1.
val taskNum = 4 / gpuTaskAmount
val taskSet = if (barrierMode) {
// Barrier task sets are all-or-nothing, so size them to exactly fit.
FakeTask.createBarrierTaskSet(taskNum)
} else {
FakeTask.createTaskSet(10)
}
// Executor owns 4 whole GPUs (amount 1.0 each) at addresses "0".."3".
val resources = new ExecutorResourcesAmounts(
Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))

val workerOffers = IndexedSeq(
WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)
)

taskScheduler.submitTasks(taskSet)
// Launch tasks on executor that satisfies resource requirements.
var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
taskDescriptions = taskDescriptions.sortBy(t => t.index)
assert(taskNum === taskDescriptions.length)
assert(!failedTaskSet)

// The key is gpuTaskAmount
// The values are the gpu addresses of each task.
val gpuTaskAmountToExpected = Map(
1 -> Seq(Array("0"), Array("1"), Array("2"), Array("3")),
2 -> Seq(Array("0", "1"), Array("2", "3")),
3 -> Seq(Array("0", "1", "2")),
4 -> Seq(Array("0", "1", "2", "3"))
)

// Each launched task must have been assigned the expected GPU addresses,
// in address order (sorted for a deterministic comparison).
taskDescriptions.foreach { task =>
val taskResources = task.resources(GPU).keys.toArray.sorted
val expected = gpuTaskAmountToExpected(gpuTaskAmount)(task.index)
assert(taskResources sameElements expected)
}
}
}
}

}

0 comments on commit 90560dc

Please sign in to comment.