[Fragment of the configuration documentation tables, damaged in extraction; only the config keys
survive: spark.driver.memoryOverhead, spark.executor.memory, spark.executor.memoryOverhead,
spark.extraListeners, spark.dynamicAllocation.enabled, spark.yarn.executor.memoryOverhead,
spark.yarn.driver.memoryOverhead, and spark.yarn.am.memoryOverhead, whose description is updated to:]
+ Same as spark.driver.memoryOverhead, but for the YARN Application Master in client mode.
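
These keys split each container's memory between the JVM heap and off-heap overhead. A minimal sketch of setting them on a SparkConf — the sizes are arbitrary illustrations, not defaults introduced by this patch:

```scala
import org.apache.spark.SparkConf

// Illustrative values only. When the overhead keys are unset, the overhead defaults to
// max(0.10 * container memory, 384 MiB), matching the constants used later in this patch.
val conf = new SparkConf()
  .set("spark.driver.memory", "2g")
  .set("spark.driver.memoryOverhead", "512m")
  .set("spark.executor.memory", "8g")
  .set("spark.executor.memoryOverhead", "1g")
```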
 * Options not listed here nor in the "switch" list below will result in a call to
- * {@link $#handleUnknown(String)}.
+ * {@link #handleUnknown(String)}.
 *
* These two arrays are visible for tests.
*/
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index f0742b91987b6..f35fb38798218 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s
+import java.util.concurrent.TimeUnit
+
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@@ -24,12 +26,16 @@ private[spark] object Config extends Logging {
val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
- .doc("The namespace that will be used for running the driver and executor pods. When using " +
- "spark-submit in cluster mode, this can also be passed to spark-submit via the " +
- "--kubernetes-namespace command line argument.")
+ .doc("The namespace that will be used for running the driver and executor pods.")
.stringConf
.createWithDefault("default")
+ val DRIVER_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.driver.docker.image")
+ .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
+ .stringConf
+ .createOptional
+
val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
@@ -44,9 +50,9 @@ private[spark] object Config extends Logging {
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")
- val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+ val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
- val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+ val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
@@ -55,7 +61,7 @@ private[spark] object Config extends Logging {
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
val KUBERNETES_SERVICE_ACCOUNT_NAME =
- ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+ ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
"credentials are given for the driver pod to use, the driver will favor " +
@@ -63,19 +69,17 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
- // Note that while we set a default for this when we start up the
- // scheduler, the specific default value is dynamically determined
- // based on the executor memory.
- val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
- ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
- .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
- "is memory that accounts for things like VM overheads, interned strings, other native " +
- "overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
- .bytesConf(ByteUnit.MiB)
+ val KUBERNETES_DRIVER_LIMIT_CORES =
+ ConfigBuilder("spark.kubernetes.driver.limit.cores")
+ .doc("Specify the hard cpu limit for the driver pod")
+ .stringConf
.createOptional
- val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
- val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+ val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ ConfigBuilder("spark.kubernetes.executor.limit.cores")
+ .doc("Specify the hard cpu limit for each executor pod")
+ .stringConf
+ .createOptional
val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
@@ -104,12 +108,6 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
.createWithDefault(1)
- val KUBERNETES_EXECUTOR_LIMIT_CORES =
- ConfigBuilder("spark.kubernetes.executor.limit.cores")
- .doc("Specify the hard cpu limit for a single executor pod")
- .stringConf
- .createOptional
-
val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
@@ -119,5 +117,46 @@ private[spark] object Config extends Logging {
"must be a positive integer")
.createWithDefault(10)
+ val WAIT_FOR_APP_COMPLETION =
+ ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+ .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
+ "launcher process.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val REPORT_INTERVAL =
+ ConfigBuilder("spark.kubernetes.report.interval")
+ .doc("Interval between reports of the current app status in cluster mode.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
+ .createWithDefaultString("1s")
+
+ val JARS_DOWNLOAD_LOCATION =
+ ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+ .doc("Location to download jars to in the driver and executors. When using" +
+ " spark-submit, this directory must be empty and will be mounted as an empty directory" +
+        " volume on the driver and executor pods.")
+ .stringConf
+ .createWithDefault("/var/spark-data/spark-jars")
+
+ val FILES_DOWNLOAD_LOCATION =
+ ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+ .doc("Location to download files to in the driver and executors. When using" +
+ " spark-submit, this directory must be empty and will be mounted as an empty directory" +
+ " volume on the driver and executor pods.")
+ .stringConf
+ .createWithDefault("/var/spark-data/spark-files")
+
+ val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
+ "spark.kubernetes.authenticate.submission"
+
val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
+ val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+ val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
+
+ val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+ val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
+ val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
}
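
These entries use Spark's internal ConfigBuilder API, so the submission code reads them as typed values rather than raw strings (as the orchestrator and steps below do). A minimal sketch, assuming access from within the org.apache.spark packages; the namespace value is made up:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

val conf = new SparkConf(false)
  .set("spark.kubernetes.namespace", "spark-jobs")
  .set(WAIT_FOR_APP_COMPLETION.key, "false")

val namespace: String = conf.get(KUBERNETES_NAMESPACE)             // "spark-jobs"
val waitForCompletion: Boolean = conf.get(WAIT_FOR_APP_COMPLETION) // false
val driverImage: Option[String] = conf.get(DRIVER_DOCKER_IMAGE)    // None unless explicitly set
```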
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 4ddeefb15a89d..0b91145405d3a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -25,9 +25,30 @@ private[spark] object Constants {
val SPARK_POD_DRIVER_ROLE = "driver"
val SPARK_POD_EXECUTOR_ROLE = "executor"
+ // Annotations
+ val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
+
+ // Credentials secrets
+ val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
+ "/mnt/secrets/spark-kubernetes-credentials"
+ val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
+ val DRIVER_CREDENTIALS_CA_CERT_PATH =
+ s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
+ val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
+ val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
+ s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
+ val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
+ val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
+ s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
+ val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
+ val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
+ s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
+ val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
+
// Default and fixed ports
val DEFAULT_DRIVER_PORT = 7078
val DEFAULT_BLOCKMANAGER_PORT = 7079
+ val DRIVER_PORT_NAME = "driver-rpc-port"
val BLOCK_MANAGER_PORT_NAME = "blockmanager"
val EXECUTOR_PORT_NAME = "executor"
@@ -42,9 +63,16 @@ private[spark] object Constants {
val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
+ val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
+ val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
+ val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
+ val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
+ val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
+ val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+ val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
new file mode 100644
index 0000000000000..c563fc5bfbadf
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.UUID
+
+import com.google.common.primitives.Longs
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+ namespace: String,
+ kubernetesAppId: String,
+ launchTime: Long,
+ mainAppResource: Option[MainAppResource],
+ appName: String,
+ mainClass: String,
+ appArgs: Array[String],
+ submissionSparkConf: SparkConf) {
+
+ // The resource name prefix is derived from the Spark application name, making it easy to connect
+ // the names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+ // application the user submitted.
+ private val kubernetesResourceNamePrefix = {
+ val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
+ s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
+ }
+
+ private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+ private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
+ private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
+
+ def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
+ val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ submissionSparkConf,
+ KUBERNETES_DRIVER_LABEL_PREFIX)
+ require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
+ s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+ "operations.")
+ require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
+ s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
+ "operations.")
+
+ val allDriverLabels = driverCustomLabels ++ Map(
+ SPARK_APP_ID_LABEL -> kubernetesAppId,
+ SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+
+ val initialSubmissionStep = new BaseDriverConfigurationStep(
+ kubernetesAppId,
+ kubernetesResourceNamePrefix,
+ allDriverLabels,
+ dockerImagePullPolicy,
+ appName,
+ mainClass,
+ appArgs,
+ submissionSparkConf)
+
+ val driverAddressStep = new DriverServiceBootstrapStep(
+ kubernetesResourceNamePrefix,
+ allDriverLabels,
+ submissionSparkConf,
+ new SystemClock)
+
+ val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+ submissionSparkConf, kubernetesResourceNamePrefix)
+
+ val additionalMainAppJar = if (mainAppResource.nonEmpty) {
+ val mayBeResource = mainAppResource.get match {
+ case JavaMainAppResource(resource) if resource != SparkLauncher.NO_RESOURCE =>
+ Some(resource)
+ case _ => None
+ }
+ mayBeResource
+ } else {
+ None
+ }
+
+ val sparkJars = submissionSparkConf.getOption("spark.jars")
+ .map(_.split(","))
+ .getOrElse(Array.empty[String]) ++
+ additionalMainAppJar.toSeq
+ val sparkFiles = submissionSparkConf.getOption("spark.files")
+ .map(_.split(","))
+ .getOrElse(Array.empty[String])
+
+ val maybeDependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
+ Some(new DependencyResolutionStep(
+ sparkJars,
+ sparkFiles,
+ jarsDownloadPath,
+ filesDownloadPath))
+ } else {
+ None
+ }
+
+ Seq(
+ initialSubmissionStep,
+ driverAddressStep,
+ kubernetesCredentialsStep) ++
+ maybeDependencyResolutionStep.toSeq
+ }
+}
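
As a rough illustration with made-up inputs, the resource name prefix derived above is deterministic for a given app name and launch time, and feeds the names of the driver pod, service, and credentials secret created by the steps:

```scala
import java.util.UUID
import com.google.common.primitives.Longs

val appName = "spark-pi"        // hypothetical application name
val launchTime = 1510000000000L // hypothetical launch time in milliseconds
val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)).toString.replaceAll("-", "")
val prefix = s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
// e.g. "spark-pi-<32 hex characters>"; the driver pod name then defaults to s"$prefix-driver"
```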
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
new file mode 100644
index 0000000000000..4d17608c602d8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+ def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+ var mainAppResource: Option[MainAppResource] = None
+ var mainClass: Option[String] = None
+ val driverArgs = mutable.ArrayBuffer.empty[String]
+
+ args.sliding(2, 2).toList.foreach {
+ case Array("--primary-java-resource", primaryJavaResource: String) =>
+ mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+ case Array("--main-class", clazz: String) =>
+ mainClass = Some(clazz)
+ case Array("--arg", arg: String) =>
+ driverArgs += arg
+ case other =>
+ val invalid = other.mkString(" ")
+ throw new RuntimeException(s"Unknown arguments: $invalid")
+ }
+
+ require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ ClientArguments(
+ mainAppResource,
+ mainClass.get,
+ driverArgs.toArray)
+ }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+ submissionSteps: Seq[DriverConfigurationStep],
+ submissionSparkConf: SparkConf,
+ kubernetesClient: KubernetesClient,
+ waitForAppCompletion: Boolean,
+ appName: String,
+ loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+ private val driverJavaOptions = submissionSparkConf.get(
+ org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+ /**
+   * Runs the driver submission: initializes a KubernetesDriverSpec and updates it with each
+   * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+   * is used to build the driver container, the driver pod, and any other Kubernetes resources.
+ */
+ def run(): Unit = {
+ var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+    // submissionSteps contains the steps, created by the orchestrator, that are needed to
+    // resolve the varying client arguments that are passed in.
+ for (nextStep <- submissionSteps) {
+ currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+ }
+
+ val resolvedDriverJavaOpts = currentDriverSpec
+ .driverSparkConf
+ // Remove this as the options are instead extracted and set individually below using
+ // environment variables with prefix SPARK_JAVA_OPT_.
+ .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+ .getAll
+ .map {
+ case (confKey, confValue) => s"-D$confKey=$confValue"
+ } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+ val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
+ case (option, index) =>
+ new EnvVarBuilder()
+ .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+ .withValue(option)
+ .build()
+ }
+
+ val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
+ .addAllToEnv(driverJavaOptsEnvs.asJava)
+ .build()
+ val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
+ .editSpec()
+ .addToContainers(resolvedDriverContainer)
+ .endSpec()
+ .build()
+
+ Utils.tryWithResource(
+ kubernetesClient
+ .pods()
+ .withName(resolvedDriverPod.getMetadata.getName)
+ .watch(loggingPodStatusWatcher)) { _ =>
+ val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+ try {
+ if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
+ val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
+ addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+ kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+ }
+ } catch {
+ case NonFatal(e) =>
+ kubernetesClient.pods().delete(createdDriverPod)
+ throw e
+ }
+
+ if (waitForAppCompletion) {
+ logInfo(s"Waiting for application $appName to finish...")
+ loggingPodStatusWatcher.awaitCompletion()
+ logInfo(s"Application $appName finished.")
+ } else {
+ logInfo(s"Deployed Spark application $appName into Kubernetes.")
+ }
+ }
+ }
+
+  // Add an OwnerReference to the given resources, making the driver pod an owner of them so
+  // that when the driver pod is deleted, the resources are garbage collected.
+ private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
+ val driverPodOwnerReference = new OwnerReferenceBuilder()
+ .withName(driverPod.getMetadata.getName)
+ .withApiVersion(driverPod.getApiVersion)
+ .withUid(driverPod.getMetadata.getUid)
+ .withKind(driverPod.getKind)
+ .withController(true)
+ .build()
+ resources.foreach { resource =>
+ val originalMetadata = resource.getMetadata
+ originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
+ }
+ }
+}
+
+/**
+ * Main class and entry point of application submission in KUBERNETES mode.
+ */
+private[spark] class KubernetesClientApplication extends SparkApplication {
+
+ override def start(args: Array[String], conf: SparkConf): Unit = {
+ val parsedArguments = ClientArguments.fromCommandLineArgs(args)
+ run(parsedArguments, conf)
+ }
+
+ private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
+ val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
+ // For constructing the app ID, we can't use the Spark application name, as the app ID is going
+ // to be added as a label to group resources belonging to the same application. Label values are
+ // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
+ // a unique app ID (captured by spark.app.id) in the format below.
+ val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+ val launchTime = System.currentTimeMillis()
+ val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
+ val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+ // The master URL has been checked for validity already in SparkSubmit.
+ // We just need to get rid of the "k8s:" prefix here.
+ val master = sparkConf.get("spark.master").substring("k8s:".length)
+ val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
+
+ val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
+ kubernetesAppId, loggingInterval)
+
+ val configurationStepsOrchestrator = new DriverConfigurationStepsOrchestrator(
+ namespace,
+ kubernetesAppId,
+ launchTime,
+ clientArguments.mainAppResource,
+ appName,
+ clientArguments.mainClass,
+ clientArguments.driverArgs,
+ sparkConf)
+
+ Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
+ master,
+ Some(namespace),
+ KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
+ sparkConf,
+ None,
+ None)) { kubernetesClient =>
+ val client = new Client(
+ configurationStepsOrchestrator.getAllConfigurationSteps(),
+ sparkConf,
+ kubernetesClient,
+ waitForAppCompletion,
+ appName,
+ loggingPodStatusWatcher)
+ client.run()
+ }
+ }
+}
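
For reference, a small sketch of how ClientArguments.fromCommandLineArgs interprets the paired flags above; the jar path and main class are made-up examples:

```scala
import org.apache.spark.deploy.k8s.submit.{ClientArguments, JavaMainAppResource}

val args = Array(
  "--primary-java-resource", "local:///opt/spark/examples/jars/spark-examples.jar",
  "--main-class", "org.apache.spark.examples.SparkPi",
  "--arg", "100")

val parsed = ClientArguments.fromCommandLineArgs(args)
// parsed.mainAppResource == Some(JavaMainAppResource("local:///opt/spark/examples/jars/spark-examples.jar"))
// parsed.mainClass == "org.apache.spark.examples.SparkPi"
// parsed.driverArgs.toSeq == Seq("100")
```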
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
new file mode 100644
index 0000000000000..db13f09387ef9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverSpec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder}
+
+import org.apache.spark.SparkConf
+
+/**
+ * Represents the components and characteristics of a Spark driver. The driver can be considered
+ * as being comprised of the driver pod itself, any other Kubernetes resources that the driver
+ * pod depends on, and the SparkConf that should be supplied to the Spark application. The driver
+ * container should be operated on via the specific field of this case class as opposed to trying
+ * to edit the container directly on the pod. The driver container should be attached at the
+ * end of executing all submission steps.
+ */
+private[spark] case class KubernetesDriverSpec(
+ driverPod: Pod,
+ driverContainer: Container,
+ otherKubernetesResources: Seq[HasMetadata],
+ driverSparkConf: SparkConf)
+
+private[spark] object KubernetesDriverSpec {
+ def initialSpec(initialSparkConf: SparkConf): KubernetesDriverSpec = {
+ KubernetesDriverSpec(
+ // Set new metadata and a new spec so that submission steps can use
+ // PodBuilder#editMetadata() and/or PodBuilder#editSpec() safely.
+ new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
+ new ContainerBuilder().build(),
+ Seq.empty[HasMetadata],
+ initialSparkConf.clone())
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
new file mode 100644
index 0000000000000..a38cf55fc3d58
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+ /**
+ * For the given collection of file URIs, resolves them as follows:
+ * - File URIs with scheme file:// are resolved to the given download path.
+ * - File URIs with scheme local:// resolve to just the path of the URI.
+ * - Otherwise, the URIs are returned as-is.
+ */
+ def resolveFileUris(
+ fileUris: Iterable[String],
+ fileDownloadPath: String): Iterable[String] = {
+ fileUris.map { uri =>
+ resolveFileUri(uri, fileDownloadPath, false)
+ }
+ }
+
+ /**
+ * If any file uri has any scheme other than local:// it is mapped as if the file
+ * was downloaded to the file download path. Otherwise, it is mapped to the path
+ * part of the URI.
+ */
+ def resolveFilePaths(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] = {
+ fileUris.map { uri =>
+ resolveFileUri(uri, fileDownloadPath, true)
+ }
+ }
+
+ private def resolveFileUri(
+ uri: String,
+ fileDownloadPath: String,
+ assumesDownloaded: Boolean): String = {
+ val fileUri = Utils.resolveURI(uri)
+ val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+ fileScheme match {
+ case "local" =>
+ fileUri.getPath
+ case _ =>
+ if (assumesDownloaded || fileScheme == "file") {
+ val fileName = new File(fileUri.getPath).getName
+ s"$fileDownloadPath/$fileName"
+ } else {
+ uri
+ }
+ }
+ }
+}
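
The two helpers above differ only in how remote URIs are treated; a small sketch of the expected mappings, using made-up URIs and the default download directory:

```scala
import org.apache.spark.deploy.k8s.submit.KubernetesFileUtils

val jarsDownloadPath = "/var/spark-data/spark-jars" // default of JARS_DOWNLOAD_LOCATION
val uris = Seq(
  "local:///opt/deps/dep1.jar",        // already on the image: keep only the path
  "file:///home/user/dep2.jar",        // from the submission client: mapped to the download dir
  "https://repo.example.com/dep3.jar") // remote: left untouched by resolveFileUris

KubernetesFileUtils.resolveFileUris(uris, jarsDownloadPath)
// -> Seq("/opt/deps/dep1.jar", "/var/spark-data/spark-jars/dep2.jar", "https://repo.example.com/dep3.jar")

KubernetesFileUtils.resolveFilePaths(uris, jarsDownloadPath)
// -> Seq("/opt/deps/dep1.jar", "/var/spark-data/spark-jars/dep2.jar", "/var/spark-data/spark-jars/dep3.jar")
```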
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
new file mode 100644
index 0000000000000..173ac541626a7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+ def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+ appId: String,
+ maybeLoggingInterval: Option[Long])
+ extends LoggingPodStatusWatcher with Logging {
+
+ private val podCompletedFuture = new CountDownLatch(1)
+ // start timer for periodic logging
+ private val scheduler =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+ private val logRunnable: Runnable = new Runnable {
+ override def run() = logShortStatus()
+ }
+
+ private var pod = Option.empty[Pod]
+
+ private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+ def start(): Unit = {
+ maybeLoggingInterval.foreach { interval =>
+ scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ override def eventReceived(action: Action, pod: Pod): Unit = {
+ this.pod = Option(pod)
+ action match {
+ case Action.DELETED | Action.ERROR =>
+ closeWatch()
+
+ case _ =>
+ logLongStatus()
+ if (hasCompleted()) {
+ closeWatch()
+ }
+ }
+ }
+
+ override def onClose(e: KubernetesClientException): Unit = {
+ logDebug(s"Stopping watching application $appId with last-observed phase $phase")
+ closeWatch()
+ }
+
+ private def logShortStatus() = {
+ logInfo(s"Application status for $appId (phase: $phase)")
+ }
+
+ private def logLongStatus() = {
+ logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown"))
+ }
+
+ private def hasCompleted(): Boolean = {
+ phase == "Succeeded" || phase == "Failed"
+ }
+
+ private def closeWatch(): Unit = {
+ podCompletedFuture.countDown()
+ scheduler.shutdown()
+ }
+
+ private def formatPodState(pod: Pod): String = {
+ val details = Seq[(String, String)](
+ // pod metadata
+ ("pod name", pod.getMetadata.getName),
+ ("namespace", pod.getMetadata.getNamespace),
+ ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
+ ("pod uid", pod.getMetadata.getUid),
+ ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),
+
+ // spec details
+ ("service account name", pod.getSpec.getServiceAccountName),
+ ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
+ ("node name", pod.getSpec.getNodeName),
+
+ // status
+ ("start time", formatTime(pod.getStatus.getStartTime)),
+ ("container images",
+ pod.getStatus.getContainerStatuses
+ .asScala
+ .map(_.getImage)
+ .mkString(", ")),
+ ("phase", pod.getStatus.getPhase),
+ ("status", pod.getStatus.getContainerStatuses.toString)
+ )
+
+ formatPairsBundle(details)
+ }
+
+ private def formatPairsBundle(pairs: Seq[(String, String)]) = {
+ // Use more loggable format if value is null or empty
+ pairs.map {
+ case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
+ }.mkString("")
+ }
+
+ override def awaitCompletion(): Unit = {
+ podCompletedFuture.await()
+ logInfo(pod.map { p =>
+ s"Container final statuses:\n\n${containersDescription(p)}"
+ }.getOrElse("No containers were found in the driver pod."))
+ }
+
+ private def containersDescription(p: Pod): String = {
+ p.getStatus.getContainerStatuses.asScala.map { status =>
+ Seq(
+ ("Container name", status.getName),
+ ("Container image", status.getImage)) ++
+ containerStatusDescription(status)
+ }.map(formatPairsBundle).mkString("\n\n")
+ }
+
+ private def containerStatusDescription(
+ containerStatus: ContainerStatus): Seq[(String, String)] = {
+ val state = containerStatus.getState
+ Option(state.getRunning)
+ .orElse(Option(state.getTerminated))
+ .orElse(Option(state.getWaiting))
+ .map {
+ case running: ContainerStateRunning =>
+ Seq(
+ ("Container state", "Running"),
+ ("Container started at", formatTime(running.getStartedAt)))
+ case waiting: ContainerStateWaiting =>
+ Seq(
+ ("Container state", "Waiting"),
+ ("Pending reason", waiting.getReason))
+ case terminated: ContainerStateTerminated =>
+ Seq(
+ ("Container state", "Terminated"),
+ ("Exit code", terminated.getExitCode.toString))
+ case unknown =>
+ throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
+ }.getOrElse(Seq(("Container state", "N/A")))
+ }
+
+ private def formatTime(time: Time): String = {
+ if (time != null) time.getTime else "N/A"
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
new file mode 100644
index 0000000000000..cca9f4627a1f6
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+private[spark] sealed trait MainAppResource
+
+private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
new file mode 100644
index 0000000000000..ba2a11b9e6689
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY, DRIVER_MEMORY_OVERHEAD}
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+ kubernetesAppId: String,
+ kubernetesResourceNamePrefix: String,
+ driverLabels: Map[String, String],
+ dockerImagePullPolicy: String,
+ appName: String,
+ mainClass: String,
+ appArgs: Array[String],
+ submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+ private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+ .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+ private val driverExtraClasspath = submissionSparkConf.get(
+ DRIVER_CLASS_PATH)
+
+ private val driverDockerImage = submissionSparkConf
+ .get(DRIVER_DOCKER_IMAGE)
+ .getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+ // CPU settings
+ private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+ private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+ // Memory settings
+ private val driverMemoryMiB = submissionSparkConf.get(
+ DRIVER_MEMORY)
+ private val driverMemoryString = submissionSparkConf.get(
+ DRIVER_MEMORY.key,
+ DRIVER_MEMORY.defaultValueString)
+ private val memoryOverheadMiB = submissionSparkConf
+ .get(DRIVER_MEMORY_OVERHEAD)
+ .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+ MEMORY_OVERHEAD_MIN_MIB))
+ private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+ new EnvVarBuilder()
+ .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+ .withValue(classPath)
+ .build()
+ }
+
+ val driverCustomAnnotations = ConfigurationUtils
+ .parsePrefixedKeyValuePairs(
+ submissionSparkConf,
+ KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+ require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+ s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+ " Spark bookkeeping operations.")
+
+ val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+ .map { env =>
+ new EnvVarBuilder()
+ .withName(env._1)
+ .withValue(env._2)
+ .build()
+ }
+
+ val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
+
+ val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+ val driverCpuQuantity = new QuantityBuilder(false)
+ .withAmount(driverCpuCores)
+ .build()
+ val driverMemoryQuantity = new QuantityBuilder(false)
+ .withAmount(s"${driverMemoryMiB}Mi")
+ .build()
+ val driverMemoryLimitQuantity = new QuantityBuilder(false)
+ .withAmount(s"${driverContainerMemoryWithOverheadMiB}Mi")
+ .build()
+ val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
+ ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
+ }
+
+ val driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+ .withName(DRIVER_CONTAINER_NAME)
+ .withImage(driverDockerImage)
+ .withImagePullPolicy(dockerImagePullPolicy)
+ .addAllToEnv(driverCustomEnvs.asJava)
+ .addToEnv(driverExtraClasspathEnv.toSeq: _*)
+ .addNewEnv()
+ .withName(ENV_DRIVER_MEMORY)
+ .withValue(driverMemoryString)
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_DRIVER_MAIN_CLASS)
+ .withValue(mainClass)
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_DRIVER_ARGS)
+ .withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_DRIVER_BIND_ADDRESS)
+ .withValueFrom(new EnvVarSourceBuilder()
+ .withNewFieldRef("v1", "status.podIP")
+ .build())
+ .endEnv()
+ .withNewResources()
+ .addToRequests("cpu", driverCpuQuantity)
+ .addToRequests("memory", driverMemoryQuantity)
+ .addToLimits("memory", driverMemoryLimitQuantity)
+ .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
+ .endResources()
+ .build()
+
+ val baseDriverPod = new PodBuilder(driverSpec.driverPod)
+ .editOrNewMetadata()
+ .withName(kubernetesDriverPodName)
+ .addToLabels(driverLabels.asJava)
+ .addToAnnotations(allDriverAnnotations.asJava)
+ .endMetadata()
+ .withNewSpec()
+ .withRestartPolicy("Never")
+ .withNodeSelector(nodeSelector.asJava)
+ .endSpec()
+ .build()
+
+ val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+ .setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
+ .set("spark.app.id", kubernetesAppId)
+ .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
+
+ driverSpec.copy(
+ driverPod = baseDriverPod,
+ driverSparkConf = resolvedSparkConf,
+ driverContainer = driverContainer)
+ }
+}
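
A worked example of the memory sizing above, assuming a 4 GiB driver heap and no explicit spark.driver.memoryOverhead:

```scala
import org.apache.spark.deploy.k8s.Constants._

val driverMemoryMiB = 4096L                 // assumed spark.driver.memory of 4g
val memoryOverheadMiB =
  math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)
// = max(409, 384) = 409 MiB
val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
// container memory request: 4096Mi, container memory limit: 4505Mi
```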
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
new file mode 100644
index 0000000000000..44e0ecffc0e93
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, KubernetesFileUtils}
+
+/**
+ * Step that configures the classpath, spark.jars, and spark.files for the driver given that the
+ * user may provide remote files or files with local:// schemes.
+ */
+private[spark] class DependencyResolutionStep(
+ sparkJars: Seq[String],
+ sparkFiles: Seq[String],
+ jarsDownloadPath: String,
+ localFilesDownloadPath: String) extends DriverConfigurationStep {
+
+ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val resolvedSparkJars = KubernetesFileUtils.resolveFileUris(sparkJars, jarsDownloadPath)
+ val resolvedSparkFiles = KubernetesFileUtils.resolveFileUris(
+ sparkFiles, localFilesDownloadPath)
+ val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
+ if (resolvedSparkJars.nonEmpty) {
+ sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
+ }
+ if (resolvedSparkFiles.nonEmpty) {
+ sparkConfResolvedSparkDependencies.set("spark.files", resolvedSparkFiles.mkString(","))
+ }
+ val resolvedClasspath = KubernetesFileUtils.resolveFilePaths(sparkJars, jarsDownloadPath)
+ val driverContainerWithResolvedClasspath = if (resolvedClasspath.nonEmpty) {
+ new ContainerBuilder(driverSpec.driverContainer)
+ .addNewEnv()
+ .withName(ENV_MOUNTED_CLASSPATH)
+ .withValue(resolvedClasspath.mkString(File.pathSeparator))
+ .endEnv()
+ .build()
+ } else {
+ driverSpec.driverContainer
+ }
+ driverSpec.copy(
+ driverContainer = driverContainerWithResolvedClasspath,
+ driverSparkConf = sparkConfResolvedSparkDependencies)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
new file mode 100644
index 0000000000000..c99c0436cf25f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents a step in preparing the Kubernetes driver.
+ */
+private[spark] trait DriverConfigurationStep {
+
+ /**
+ * Apply some transformation to the previous state of the driver to add a new feature to it.
+ */
+ def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
+}
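
Since each step is a pure transformation of the KubernetesDriverSpec, the loop in Client.run above is equivalent to folding the spec through the step sequence; a compact sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep

// Thread the initial spec through every configuration step in order.
def applyAll(steps: Seq[DriverConfigurationStep], conf: SparkConf): KubernetesDriverSpec =
  steps.foldLeft(KubernetesDriverSpec.initialSpec(conf)) { (spec, step) =>
    step.configureDriver(spec)
  }
```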
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
new file mode 100644
index 0000000000000..ccc18908658f1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+ submissionSparkConf: SparkConf,
+ kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+ private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+ private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+ private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+ private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+ private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+ val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+ val oauthTokenBase64 = submissionSparkConf
+ .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+ .map { token =>
+ BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+ }
+ val caCertDataBase64 = safeFileConfToBase64(
+ s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+ "Driver CA cert file")
+ val clientKeyDataBase64 = safeFileConfToBase64(
+ s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+ "Driver client key file")
+ val clientCertDataBase64 = safeFileConfToBase64(
+ s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+ "Driver client cert file")
+
+ val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+ driverSparkConf,
+ oauthTokenBase64,
+ caCertDataBase64,
+ clientKeyDataBase64,
+ clientCertDataBase64)
+
+ val kubernetesCredentialsSecret = createCredentialsSecret(
+ oauthTokenBase64,
+ caCertDataBase64,
+ clientKeyDataBase64,
+ clientCertDataBase64)
+
+ val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+ new PodBuilder(driverSpec.driverPod)
+ .editOrNewSpec()
+ .addNewVolume()
+ .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+ .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+ .endVolume()
+ .endSpec()
+ .build()
+ }.getOrElse(
+ driverServiceAccount.map { account =>
+ new PodBuilder(driverSpec.driverPod)
+ .editOrNewSpec()
+ .withServiceAccount(account)
+ .withServiceAccountName(account)
+ .endSpec()
+ .build()
+ }.getOrElse(driverSpec.driverPod)
+ )
+
+ val driverContainerWithMountedSecretVolume = kubernetesCredentialsSecret.map { secret =>
+ new ContainerBuilder(driverSpec.driverContainer)
+ .addNewVolumeMount()
+ .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+ .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
+ .endVolumeMount()
+ .build()
+ }.getOrElse(driverSpec.driverContainer)
+
+ driverSpec.copy(
+ driverPod = driverPodWithMountedKubernetesCredentials,
+ otherKubernetesResources =
+ driverSpec.otherKubernetesResources ++ kubernetesCredentialsSecret.toSeq,
+ driverSparkConf = driverSparkConfWithCredentialsLocations,
+ driverContainer = driverContainerWithMountedSecretVolume)
+ }
+
+ private def createCredentialsSecret(
+ driverOAuthTokenBase64: Option[String],
+ driverCaCertDataBase64: Option[String],
+ driverClientKeyDataBase64: Option[String],
+ driverClientCertDataBase64: Option[String]): Option[Secret] = {
+ val allSecretData =
+ resolveSecretData(
+ driverClientKeyDataBase64,
+ DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
+ resolveSecretData(
+ driverClientCertDataBase64,
+ DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
+ resolveSecretData(
+ driverCaCertDataBase64,
+ DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
+ resolveSecretData(
+ driverOAuthTokenBase64,
+ DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
+
+ if (allSecretData.isEmpty) {
+ None
+ } else {
+ Some(new SecretBuilder()
+ .withNewMetadata()
+ .withName(s"$kubernetesResourceNamePrefix-kubernetes-credentials")
+ .endMetadata()
+ .withData(allSecretData.asJava)
+ .build())
+ }
+ }
+
+ private def setDriverPodKubernetesCredentialLocations(
+ driverSparkConf: SparkConf,
+ driverOauthTokenBase64: Option[String],
+ driverCaCertDataBase64: Option[String],
+ driverClientKeyDataBase64: Option[String],
+ driverClientCertDataBase64: Option[String]): SparkConf = {
+ val resolvedMountedOAuthTokenFile = resolveSecretLocation(
+ maybeMountedOAuthTokenFile,
+ driverOauthTokenBase64,
+ DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
+ val resolvedMountedClientKeyFile = resolveSecretLocation(
+ maybeMountedClientKeyFile,
+ driverClientKeyDataBase64,
+ DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
+ val resolvedMountedClientCertFile = resolveSecretLocation(
+ maybeMountedClientCertFile,
+ driverClientCertDataBase64,
+ DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
+ val resolvedMountedCaCertFile = resolveSecretLocation(
+ maybeMountedCaCertFile,
+ driverCaCertDataBase64,
+ DRIVER_CREDENTIALS_CA_CERT_PATH)
+
+ val sparkConfWithCredentialLocations = driverSparkConf
+ .setOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+ resolvedMountedCaCertFile)
+ .setOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+ resolvedMountedClientKeyFile)
+ .setOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+ resolvedMountedClientCertFile)
+ .setOption(
+ s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX",
+ resolvedMountedOAuthTokenFile)
+
+ // Redact all OAuth token values
+ sparkConfWithCredentialLocations
+ .getAll
+ .filter(_._1.endsWith(OAUTH_TOKEN_CONF_SUFFIX)).map(_._1)
+ .foreach {
+ sparkConfWithCredentialLocations.set(_, "