This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit cb12fec

Another round of comments
liyinan926 committed Nov 22, 2017
1 parent 3b587b4 commit cb12fec
Showing 4 changed files with 28 additions and 24 deletions.
@@ -40,8 +40,9 @@ private[spark] object Config extends Logging {
 
   val DOCKER_IMAGE_PULL_POLICY =
     ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
-      .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+      .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
       .stringConf
+      .checkValues(Set("Always", "Never", "IfNotPresent"))
       .createWithDefault("IfNotPresent")
 
   val APISERVER_AUTH_DRIVER_CONF_PREFIX =
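
A note on the new checkValues call: it validates the configured string against the allowed set as soon as the configuration is read, so a typo such as "always" fails fast with an IllegalArgumentException instead of surfacing later as a pod-creation error. Below is a minimal standalone sketch of the same guard; PullPolicyCheck and validated are illustrative names, not Spark's actual ConfigBuilder API.

  object PullPolicyCheck {
    private val ValidPolicies = Set("Always", "Never", "IfNotPresent")

    // Mirrors what checkValues(Set(...)) enforces: reject any value outside the set.
    def validated(raw: String): String = {
      require(
        ValidPolicies.contains(raw),
        s"'$raw' is not a valid value for spark.kubernetes.docker.image.pullPolicy; " +
          s"valid values are ${ValidPolicies.mkString(", ")}")
      raw
    }
  }

  PullPolicyCheck.validated("IfNotPresent")  // returns "IfNotPresent"
  // PullPolicyCheck.validated("always")     // throws IllegalArgumentException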
@@ -101,7 +102,7 @@ private[spark] object Config extends Logging {
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
       .doc("Number of seconds to wait between each round of executor allocation.")
       .longConf
-      .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
+      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
       .createWithDefault(1)
 
   val KUBERNETES_EXECUTOR_LIMIT_CORES =
@@ -92,8 +92,8 @@ private[spark] object SparkKubernetesClientFactory {
     extends AnyVal {
 
     def withOption[T]
-      (option: Option[T])
-      (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
+        (option: Option[T])
+        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
       option.map { opt =>
         configurator(opt, configBuilder)
       }.getOrElse(configBuilder)
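For readers skimming the re-indented helper above: withOption applies a configuration step only when the Option is defined, which keeps fabric8 ConfigBuilder chains fluent even for optional settings. A hedged usage sketch, assuming the implicit class above is in scope; the option values are made up.

  import io.fabric8.kubernetes.client.ConfigBuilder

  val namespace: Option[String] = Some("spark")
  val oauthToken: Option[String] = None

  val config = new ConfigBuilder()
    .withOption(namespace) { (ns, builder) => builder.withNamespace(ns) }            // applied
    .withOption(oauthToken) { (token, builder) => builder.withOauthToken(token) }    // skipped: None
    .build()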
@@ -94,7 +94,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
 
   private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
-  private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+  private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
 
   override def createExecutorPod(
       executorId: String,
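The substance of this one-line change is a switch from a stringly-typed lookup to a typed one. Assuming KUBERNETES_EXECUTOR_LIMIT_CORES is declared with createOptional in Config.scala, both forms yield Option[String], but the typed form resolves through the ConfigEntry itself, so the key, type, and any validation stay defined in one place. A sketch, in the context of the class above:

  // Before: bypasses the entry and reads the raw key directly.
  val limitByKey: Option[String] = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)

  // After: resolves through the ConfigEntry; an entry built with
  // createOptional yields Option[String] directly.
  val limitTyped: Option[String] = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)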
@@ -103,24 +103,26 @@ private[spark] class KubernetesClusterSchedulerBackend(
       val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
       val currentTotalExpectedExecutors = totalExpectedExecutors.get
       val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
-      if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
-        logDebug("Waiting for pending executors before scaling")
-      } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
-        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-      } else {
-        for (i <- 0 until math.min(
-          currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
-          val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-          val executorPod = executorPodFactory.createExecutorPod(
-            executorId,
-            applicationId(),
-            driverUrl,
-            conf.getExecutorEnv,
-            driverPod,
-            currentNodeToLocalTaskCount)
-          executorsToAllocate(executorId) = executorPod
-          logInfo(
-            s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
-        }
-      }
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
+          logDebug("Waiting for pending executors before scaling")
+        } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
+          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+        } else {
+          for (i <- 0 until math.min(
+            currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
+            val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
+            val executorPod = executorPodFactory.createExecutorPod(
+              executorId,
+              applicationId(),
+              driverUrl,
+              conf.getExecutorEnv,
+              driverPod,
+              currentNodeToLocalTaskCount)
+            executorsToAllocate(executorId) = executorPod
+            logInfo(
+              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+          }
+        }
+      }
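
The restructuring above is more than re-indentation: the check-then-act sequence (read runningExecutorsToPods.size, then decide whether to request more executors) now runs inside RUNNING_EXECUTOR_PODS_LOCK.synchronized, so a concurrent pod-lifecycle callback cannot mutate the map between the read and the decision. A minimal standalone sketch of the pattern, with hypothetical names:

  import scala.collection.mutable

  object AllocationSketch {
    private val LOCK = new Object  // stands in for RUNNING_EXECUTOR_PODS_LOCK
    private val running = mutable.Map.empty[String, String]  // stands in for runningExecutorsToPods

    def maybeAllocate(expected: Int): Unit = LOCK.synchronized {
      // The size check and the mutation are atomic with respect to any
      // other code that synchronizes on LOCK before touching `running`.
      if (running.size < expected) {
        running(s"executor-${running.size + 1}") = "pending"
      }
    }
  }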

@@ -182,7 +184,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
     deleteExecutorFromDataStructures(executorId).foreach { pod =>
-      kubernetesClient.pods().delete(pod) }
+      kubernetesClient.pods().delete(pod)
+    }
   }
 
   def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
