From cb12fecb9cc8b6686b08ef1e82de3e62f32b4b73 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Wed, 22 Nov 2017 09:40:30 -0800 Subject: [PATCH] Another round of comments --- .../org/apache/spark/deploy/k8s/Config.scala | 5 ++- .../k8s/SparkKubernetesClientFactory.scala | 4 +- .../cluster/k8s/ExecutorPodFactory.scala | 2 +- .../KubernetesClusterSchedulerBackend.scala | 41 ++++++++++--------- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 7a1963e6ad9f1..a621f3c861cbd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -40,8 +40,9 @@ private[spark] object Config extends Logging { val DOCKER_IMAGE_PULL_POLICY = ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") - .doc("Docker image pull policy when pulling any docker image in Kubernetes integration") + .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.") .stringConf + .checkValues(Set("Always", "Never", "IfNotPresent")) .createWithDefault("IfNotPresent") val APISERVER_AUTH_DRIVER_CONF_PREFIX = @@ -101,7 +102,7 @@ private[spark] object Config extends Logging { ConfigBuilder("spark.kubernetes.allocation.batch.delay") .doc("Number of seconds to wait between each round of executor allocation.") .longConf - .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer") + .checkValue(value => value > 0, "Allocation batch delay should be a positive integer") .createWithDefault(1) val KUBERNETES_EXECUTOR_LIMIT_CORES = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 9f484bc1d3693..1e3f055e05766 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -92,8 +92,8 @@ private[spark] object SparkKubernetesClientFactory { extends AnyVal { def withOption[T] - (option: Option[T]) - (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = { + (option: Option[T]) + (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = { option.map { opt => configurator(opt, configBuilder) }.getOrElse(configBuilder) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 3914d87758fcf..afa95de0260fa 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -94,7 +94,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) - private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) override def createExecutorPod( executorId: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index ac62216efeeb4..47a8c189cd2eb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -103,24 +103,26 @@ private[spark] class KubernetesClusterSchedulerBackend( val currentTotalRegisteredExecutors = totalRegisteredExecutors.get val currentTotalExpectedExecutors = totalExpectedExecutors.get val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts() - if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) { - logDebug("Waiting for pending executors before scaling") - } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) { - logDebug("Maximum allowed executor limit reached. Not scaling up further.") - } else { - for (i <- 0 until math.min( - currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) { - val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString - val executorPod = executorPodFactory.createExecutorPod( - executorId, - applicationId(), - driverUrl, - conf.getExecutorEnv, - driverPod, - currentNodeToLocalTaskCount) - executorsToAllocate(executorId) = executorPod - logInfo( - s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") + RUNNING_EXECUTOR_PODS_LOCK.synchronized { + if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) { + logDebug("Waiting for pending executors before scaling") + } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) { + logDebug("Maximum allowed executor limit reached. Not scaling up further.") + } else { + for (i <- 0 until math.min( + currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) { + val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString + val executorPod = executorPodFactory.createExecutorPod( + executorId, + applicationId(), + driverUrl, + conf.getExecutorEnv, + driverPod, + currentNodeToLocalTaskCount) + executorsToAllocate(executorId) = executorPod + logInfo( + s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") + } } } @@ -182,7 +184,8 @@ private[spark] class KubernetesClusterSchedulerBackend( def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = { deleteExecutorFromDataStructures(executorId).foreach { pod => - kubernetesClient.pods().delete(pod) + } + kubernetesClient.pods().delete(pod) + } } def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {