Skip to content


[SPARK-22646][K8S] Spark on Kubernetes - basic submission client
Browse files Browse the repository at this point in the history
This PR contains implementation of the basic submission client for the cluster mode of Spark on Kubernetes. It's step 2 from the step-wise plan documented [here](apache-spark-on-k8s#441 (comment)).
This addition is covered by the [SPIP]( vote which passed on Aug 31.

This PR and #19468 together form a MVP of Spark on Kubernetes that allows users to run Spark applications that use resources locally within the driver and executor containers on Kubernetes 1.6 and up. Some changes on pom and build/test setup are copied over from #19468 to make this PR self contained and testable.

The submission client is mainly responsible for creating the Kubernetes pod that runs the Spark driver. It follows a step-based approach to construct the driver pod, as the code under the `submit.steps` package shows. The steps are orchestrated by `DriverConfigurationStepsOrchestrator`. `Client` creates the driver pod and waits for the application to complete if it's configured to do so, which is the case by default.

This PR also contains Dockerfiles of the driver and executor images. They are included because some of the environment variables set in the code would not make sense without referring to the Dockerfiles.

* The patch contains unit tests which are passing.
* Manual testing: ./build/mvn -Pkubernetes clean package succeeded.
* It is a subset of the entire changelist hosted at which is in active use in several organizations.
* There is integration testing enabled in the fork currently hosted by PepperData which is being moved over to RiseLAB CI.
* Detailed documentation on trying out the patch in its entirety is in:

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926

Author: Yinan Li <[email protected]>

Closes #19717 from liyinan926/spark-kubernetes-4.
  • Loading branch information
liyinan926 authored and Marcelo Vanzin committed Dec 11, 2017
1 parent c235b5f commit 3f4060c
Show file tree
Hide file tree
Showing 39 changed files with 2,566 additions and 67 deletions.
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,11 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))

Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark

import java.lang.reflect.Constructor
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
Expand Down
48 changes: 41 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16

// Deploy modes
private val CLIENT = 1
Expand All @@ -97,6 +98,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

// scalastyle:off println
private[spark] def printVersionAndExit(): Unit = {
Expand Down Expand Up @@ -257,9 +260,10 @@ object SparkSubmit extends CommandLineUtils with Logging {
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")

Expand Down Expand Up @@ -294,6 +298,16 @@ object SparkSubmit extends CommandLineUtils with Logging {

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
Expand All @@ -302,6 +316,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
Expand All @@ -322,6 +342,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
Expand Down Expand Up @@ -557,19 +578,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
Expand Down Expand Up @@ -703,6 +724,19 @@ object SparkSubmit extends CommandLineUtils with Logging {

if (isKubernetesCluster) {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local
| (Default: local[*]).
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
| k8s://https://host:port, or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ package object config {

private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")

private[spark] val EVENT_LOG_COMPRESS =
Expand Down Expand Up @@ -80,6 +84,10 @@ package object config {

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")

private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2744,6 +2744,42 @@ private[spark] object Utils extends Logging {

* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)

// To handle master URLs, e.g., k8s://host:port.
if (!masterWithoutK8sPrefix.contains("://")) {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
val resolvedURL = masterScheme.toLowerCase match {
case "https" =>
case "http" =>
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
case null =>
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
case _ =>
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)

return s"k8s:$resolvedURL"

private[util] object CallerContext extends Logging {
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu

test("client mode with a k8s master url") {
intercept[SparkException] {
sc = new SparkContext("k8s://https://host:port", "test", new SparkConf())

testCancellingTasks("that raise interrupted exception on cancel") {
Expand Down
27 changes: 27 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,33 @@ class SparkSubmitSuite
conf.get("spark.ui.enabled") should be ("false")

test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.namespace=spark",
"--conf", "spark.kubernetes.driver.docker.image=bar",
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,27 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps === "k8s:https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp === "k8s:http://host:port")

val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://")
assert(k8sMasterURLWithoutScheme === "k8s:")

val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://")
assert(k8sMasterURLWithoutScheme2 === "k8s:")

intercept[IllegalArgumentException] {

intercept[IllegalArgumentException] {

private class SimpleExtension
Expand Down
20 changes: 20 additions & 0 deletions docs/
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,33 @@ of the most common options to set are:
or in your default properties file.
<td>driverMemory * 0.10, with minimum of 384 </td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
This tends to grow with the container size (typically 6-10%). This option is currently supported
on YARN and Kubernetes.
Amount of memory to use per executor process (e.g. <code>2g</code>, <code>8g</code>).
<td>executorMemory * 0.10, with minimum of 384 </td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that
accounts for things like VM overheads, interned strings, other native overheads, etc. This tends
to grow with the executor size (typically 6-10%). This option is currently supported on YARN and
Expand Down
16 changes: 1 addition & 15 deletions docs/
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,11 @@ To use a custom for the application master and executors, upd
The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
<td>executorMemory * 0.10, with minimum of 384 </td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
<td>driverMemory * 0.10, with minimum of 384 </td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
<td>AM memory * 0.10, with minimum of 384 </td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SparkSubmitOptionParser {
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
Expand Down

0 comments on commit 3f4060c

Please sign in to comment.