diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index c5bdd1195e6fc..204fdc7fa4afb 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -54,15 +54,7 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-annotations</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
         <exclusion>
           <groupId>com.fasterxml.jackson.dataformat</groupId>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
index 63b28302b1baa..56444c16fd0ee 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
@@ -34,14 +34,7 @@ private[spark] object ConfigurationUtils {
       sparkConf: SparkConf,
       prefix: String,
       configType: String): Map[String, String] = {
-    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
-    fromPrefix.groupBy(_._1).foreach {
-      case (key, values) =>
-        require(values.size == 1,
-          s"Cannot have multiple values for a given $configType key, got key $key with" +
-          s" values $values")
-    }
-    fromPrefix.toMap
+    sparkConf.getAllWithPrefix(prefix).toMap
   }
 
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index ab2d118ca72a1..38cda4565f45b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -40,13 +40,13 @@ private[spark] object SparkKubernetesClientFactory {
       namespace: Option[String],
       kubernetesAuthConfPrefix: String,
       sparkConf: SparkConf,
-      maybeServiceAccountToken: Option[File],
-      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+      defaultServiceAccountToken: Option[File],
+      defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
     val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
     val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
     val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
       .map(new File(_))
-      .orElse(maybeServiceAccountToken)
+      .orElse(defaultServiceAccountToken)
     val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
     ConfigurationUtils.requireNandDefined(
       oauthTokenFile,
@@ -56,7 +56,7 @@ private[spark] object SparkKubernetesClientFactory {
 
     val caCertFile = sparkConf
       .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
-      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+      .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
     val clientKeyFile = sparkConf
       .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
     val clientCertFile = sparkConf
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
index 74cd0c45a98cd..c6eff28d3b87b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
@@ -16,14 +16,14 @@
  */
 package org.apache.spark.deploy.k8s
 
-import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 import org.apache.spark.network.util.ByteUnit
 
-package object config extends Logging {
+private[spark] object config extends Logging {
 
-  private[spark] val KUBERNETES_NAMESPACE =
+  val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods. When using" +
         " spark-submit in cluster mode, this can also be passed to spark-submit via the" +
@@ -31,30 +31,30 @@ package object config extends Logging {
       .stringConf
       .createWithDefault("default")
 
-  private[spark] val EXECUTOR_DOCKER_IMAGE =
+  val EXECUTOR_DOCKER_IMAGE =
     ConfigBuilder("spark.kubernetes.executor.docker.image")
       .doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
         " format.")
       .stringConf
-      .createWithDefault(s"spark-executor:$sparkVersion")
+      .createWithDefault(s"spark-executor:$SPARK_VERSION")
 
-  private[spark] val DOCKER_IMAGE_PULL_POLICY =
+  val DOCKER_IMAGE_PULL_POLICY =
     ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
       .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
       .stringConf
       .createWithDefault("IfNotPresent")
 
-  private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
       "spark.kubernetes.authenticate.driver"
-  private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
       "spark.kubernetes.authenticate.driver.mounted"
-  private[spark] val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
-  private[spark] val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
-  private[spark] val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
-  private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
-  private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
-  private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
     ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
      .doc("Service account that is used when running the driver pod. The driver pod uses" +
        " this service account when requesting executor pods from the API server. If specific" +
@@ -66,7 +66,7 @@ package object config extends Logging {
   // Note that while we set a default for this when we start up the
   // scheduler, the specific default value is dynamically determined
   // based on the executor memory.
-  private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
     ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
       .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" +
         " is memory that accounts for things like VM overheads, interned strings, other native" +
@@ -74,41 +74,41 @@ package object config extends Logging {
       .bytesConf(ByteUnit.MiB)
       .createOptional
 
-  private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
-  private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
 
-  private[spark] val KUBERNETES_DRIVER_POD_NAME =
+  val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
      .stringConf
      .createOptional
 
-  private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
     ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
      .doc("Prefix to use in front of the executor pod names.")
      .internal()
      .stringConf
      .createWithDefault("spark")
 
-  private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
+  val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
      .doc("Number of pods to launch at once in each round of executor allocation.")
      .intConf
      .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
      .createWithDefault(5)
 
-  private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
+  val KUBERNETES_ALLOCATION_BATCH_DELAY =
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
      .doc("Number of seconds to wait between each round of executor allocation.")
      .longConf
      .checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
      .createWithDefault(1)
 
-  private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
     ConfigBuilder("spark.kubernetes.executor.limit.cores")
      .doc("Specify the hard cpu limit for a single executor pod")
      .stringConf
      .createOptional
 
-  private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 }
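
Note on the two configuration changes above: the duplicate-key check in ConfigurationUtils.parsePrefixedKeyValuePairs was effectively dead code, since SparkConf keys are unique and getAllWithPrefix can therefore only return one value per stripped suffix, and the individual entries in config.scala no longer need private[spark] once the enclosing object itself is private[spark]. The following standalone sketch is not part of the patch; the label and node-selector values are invented for illustration, but the config key prefixes are the ones defined above. It shows what the simplified prefix parsing returns:

import org.apache.spark.SparkConf

object PrefixedConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.kubernetes.namespace", "spark-jobs")
      .set("spark.kubernetes.executor.label.team", "data-eng")
      .set("spark.kubernetes.node.selector.disktype", "ssd")

    // What ConfigurationUtils.parsePrefixedKeyValuePairs now does: strip the prefix
    // and return the remaining (suffix -> value) pairs as a Map.
    val executorLabels = conf.getAllWithPrefix("spark.kubernetes.executor.label.").toMap
    val nodeSelector = conf.getAllWithPrefix("spark.kubernetes.node.selector.").toMap

    println(executorLabels) // Map(team -> data-eng)
    println(nodeSelector)   // Map(disktype -> ssd)
  }
}
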
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 87c289e1f8793..764167a68c229 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -27,10 +27,13 @@ import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.util.Utils
 
 /**
- * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
- * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ * A factory class for configuring and creating executor pods.
  */
 private[spark] trait ExecutorPodFactory {
+
+  /**
+   * Configure and construct an executor pod with the given parameters.
+   */
   def createExecutorPod(
       executorId: String,
       applicationId: String,
@@ -161,12 +164,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
-      .map(port => {
+      .map { case (name, port) =>
         new ContainerPortBuilder()
-          .withName(port._1)
-          .withContainerPort(port._2)
+          .withName(name)
+          .withContainerPort(port)
           .build()
-      })
+      }
 
     val executorContainer = new ContainerBuilder()
       .withName(s"executor")
@@ -202,16 +205,15 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
       .endSpec()
       .build()
 
-    val containerWithExecutorLimitCores = executorLimitCores.map {
-      limitCores =>
-        val executorCpuLimitQuantity = new QuantityBuilder(false)
-          .withAmount(limitCores)
-          .build()
-        new ContainerBuilder(executorContainer)
-          .editResources()
-            .addToLimits("cpu", executorCpuLimitQuantity)
-          .endResources()
-          .build()
+    val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+      new ContainerBuilder(executorContainer)
+        .editResources()
+          .addToLimits("cpu", executorCpuLimitQuantity)
+        .endResources()
+        .build()
     }.getOrElse(executorContainer)
 
     new PodBuilder(executorPod)
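
The two hunks in ExecutorPodFactoryImpl are behavior-preserving cleanups: the (name, port) tuples are destructured with a case pattern instead of ._1/._2, and the optional CPU-limit branch is dedented. The standalone sketch below is not part of the patch; the port names, port numbers, and the "1" core limit are invented for illustration. It demonstrates the same fabric8 builder pattern used above:

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, QuantityBuilder}

object ExecutorContainerSketch {
  def main(args: Array[String]): Unit = {
    // Destructure (name, port) pairs directly rather than via tuple accessors.
    val ports = Seq(("executor", 7079), ("blockmanager", 7089)).map { case (name, port) =>
      new ContainerPortBuilder()
        .withName(name)
        .withContainerPort(port)
        .build()
    }

    val baseContainer = new ContainerBuilder()
      .withName("executor")
      .withPorts(ports: _*)
      .withNewResources()
        .endResources()
      .build()

    // Optionally add a hard CPU limit, mirroring the executorLimitCores.map { ... }.getOrElse(...)
    // structure in createExecutorPod. The "1" here is only an example value.
    val limitCores: Option[String] = Some("1")
    val container = limitCores.map { cores =>
      val quantity = new QuantityBuilder(false).withAmount(cores).build()
      new ContainerBuilder(baseContainer)
        .editResources()
          .addToLimits("cpu", quantity)
        .endResources()
        .build()
    }.getOrElse(baseContainer)

    println(container.getPorts.size()) // 2
  }
}
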
$SPARK_EXECUTOR_ID_LABEL.") - val resolvedExecutorIdLabel = executorPod.getMetadata.getLabels.get( - SPARK_EXECUTOR_ID_LABEL) - require(resolvedExecutorIdLabel == executorId, - s"Illegal internal state for pod with name ${executorPod.getMetadata.getName} - all" + - s" executor pods must map the label with key ${SPARK_EXECUTOR_ID_LABEL} to the" + - s" executor's ID. This label mapped instead to: $resolvedExecutorIdLabel.") executorsToAllocate(executorId) = executorPod logInfo( s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}") } } + val allocatedExecutors = executorsToAllocate.mapValues { pod => Utils.tryLog { kubernetesClient.pods().create(pod) } } + RUNNING_EXECUTOR_PODS_LOCK.synchronized { allocatedExecutors.map { case (executorId, attemptedAllocatedExecutor) => @@ -184,8 +178,8 @@ private[spark] class KubernetesClusterSchedulerBackend( } def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = { - deleteExecutorFromDataStructures(executorId) - .foreach(pod => kubernetesClient.pods().delete(pod)) + deleteExecutorFromDataStructures(executorId).foreach { pod => + kubernetesClient.pods().delete(pod) } } def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = { @@ -253,10 +247,10 @@ private[spark] class KubernetesClusterSchedulerBackend( * locality if an executor launches on the cluster node. */ private def getNodesWithLocalTaskCounts() : Map[String, Int] = { - val nodeToLocalTaskCount = mutable.Map[String, Int]() ++ - synchronized { - hostToLocalTaskCount - } + val nodeToLocalTaskCount = synchronized { + mutable.Map[String, Int]() ++ hostToLocalTaskCount + } + for (pod <- executorPodsByIPs.values().asScala) { // Remove cluster nodes that are running our executors already. // TODO: This prefers spreading out executors across nodes. 
In case users want @@ -276,19 +270,20 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { - val podsToDelete = mutable.Buffer[Pod]() - RUNNING_EXECUTOR_PODS_LOCK.synchronized { - for (executor <- executorIds) { - val maybeRemovedExecutor = runningExecutorsToPods.remove(executor) - maybeRemovedExecutor.foreach { executorPod => - disconnectedPodsByExecutorIdPendingRemoval.put(executor, executorPod) - podsToDelete += executorPod - } - if (maybeRemovedExecutor.isEmpty) { - logWarning(s"Unable to remove pod for unknown executor $executor") + val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized { + executorIds.flatMap { executorId => + runningExecutorsToPods.remove(executorId) match { + case Some(pod) => + disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + Some(pod) + + case None => + logWarning(s"Unable to remove pod for unknown executor $executorId") + None } } } + kubernetesClient.pods().delete(podsToDelete: _*) true } @@ -298,27 +293,30 @@ private[spark] class KubernetesClusterSchedulerBackend( private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1 override def eventReceived(action: Action, pod: Pod): Unit = { + val podName = pod.getMetadata.getName + val podIP = pod.getStatus.getPodIP + action match { case Action.MODIFIED if (pod.getStatus.getPhase == "Running" - && pod.getMetadata.getDeletionTimestamp == null) => - val podIP = pod.getStatus.getPodIP + && pod.getMetadata.getDeletionTimestamp == null) => val clusterNodeName = pod.getSpec.getNodeName - logInfo(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") + logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.") executorPodsByIPs.put(podIP, pod) + case Action.DELETED | Action.ERROR => val executorId = getExecutorId(pod) - val podName = pod.getMetadata.getName - val podIP = pod.getStatus.getPodIP logDebug(s"Executor pod $podName at IP $podIP was at $action.") if (podIP != null) { executorPodsByIPs.remove(podIP) } val executorExitReason = if (action == Action.ERROR) { - logWarning(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason) + logWarning(s"Received error event of executor pod $podName. Reason: " + + pod.getStatus.getReason) executorExitReasonOnError(pod) } else if (action == Action.DELETED) { - logWarning(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason) + logWarning(s"Received delete event of executor pod $podName. Reason: " + + pod.getStatus.getReason) executorExitReasonOnDelete(pod) } else { throw new IllegalStateException( @@ -332,6 +330,8 @@ private[spark] class KubernetesClusterSchedulerBackend( s" have failed to start in the first place and never registered with the driver.") } disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod) + + case _ => logDebug(s"Received event of executor pod $podName: " + action) } } @@ -339,7 +339,7 @@ private[spark] class KubernetesClusterSchedulerBackend( logDebug("Executor pod watch closed.", cause) } - def getExecutorExitStatus(pod: Pod): Int = { + private def getExecutorExitStatus(pod: Pod): Int = { val containerStatuses = pod.getStatus.getContainerStatuses if (!containerStatuses.isEmpty) { // we assume the first container represents the pod status. 
This assumption may not hold @@ -349,21 +349,22 @@ private[spark] class KubernetesClusterSchedulerBackend( } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS } - def getExecutorExitStatus(containerStatus: ContainerStatus): Int = { - Option(containerStatus.getState).map(containerState => - Option(containerState.getTerminated).map(containerStateTerminated => - containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE) - ).getOrElse(UNKNOWN_EXIT_CODE) + private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = { + Option(containerStatus.getState).map { containerState => + Option(containerState.getTerminated).map {containerStateTerminated => + containerStateTerminated.getExitCode.intValue() + }.getOrElse(UNKNOWN_EXIT_CODE) + }.getOrElse(UNKNOWN_EXIT_CODE) } - def isPodAlreadyReleased(pod: Pod): Boolean = { + private def isPodAlreadyReleased(pod: Pod): Boolean = { val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) RUNNING_EXECUTOR_PODS_LOCK.synchronized { !runningExecutorsToPods.contains(executorId) } } - def executorExitReasonOnError(pod: Pod): ExecutorExited = { + private def executorExitReasonOnError(pod: Pod): ExecutorExited = { val containerExitStatus = getExecutorExitStatus(pod) // container was probably actively killed by the driver. if (isPodAlreadyReleased(pod)) { @@ -377,17 +378,16 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - def executorExitReasonOnDelete(pod: Pod): ExecutorExited = { + private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = { val exitMessage = if (isPodAlreadyReleased(pod)) { s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request." } else { s"Pod ${pod.getMetadata.getName} deleted or lost." } - ExecutorExited( - getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage) + ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage) } - def getExecutorId(pod: Pod): String = { + private def getExecutorId(pod: Pod): String = { val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) require(executorId != null, "Unexpected pod metadata; expected all executor pods " + s"to have label $SPARK_EXECUTOR_ID_LABEL.")
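
The getExecutorExitStatus(containerStatus) overload that becomes private above is a null-safe walk of the fabric8 container state using Option. The standalone sketch below is not part of the patch; UNKNOWN_EXIT_CODE and the exit code 137 are placeholder values, and the builder calls assume the fabric8 kubernetes-model API already used elsewhere in this diff:

import io.fabric8.kubernetes.api.model.{ContainerState, ContainerStateBuilder, ContainerStateTerminatedBuilder, ContainerStatus, ContainerStatusBuilder}

object ExitStatusSketch {
  // Placeholder standing in for the backend's private UNKNOWN_EXIT_CODE constant.
  private val UNKNOWN_EXIT_CODE = -111

  // Same null-safe Option chaining as the private getExecutorExitStatus(containerStatus) above.
  def exitStatus(containerStatus: ContainerStatus): Int = {
    Option(containerStatus.getState).map { containerState =>
      Option(containerState.getTerminated)
        .map(_.getExitCode.intValue())
        .getOrElse(UNKNOWN_EXIT_CODE)
    }.getOrElse(UNKNOWN_EXIT_CODE)
  }

  def main(args: Array[String]): Unit = {
    // A container status whose container terminated with exit code 137 (illustrative value).
    val terminatedState: ContainerState = new ContainerStateBuilder()
      .withTerminated(new ContainerStateTerminatedBuilder().withExitCode(137).build())
      .build()
    val terminated = new ContainerStatusBuilder().withState(terminatedState).build()

    // A container status carrying no state information at all.
    val unknown = new ContainerStatusBuilder().build()

    println(exitStatus(terminated)) // 137
    println(exitStatus(unknown))    // -111
  }
}
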