* Changes from PR apache-spark-on-k8s#456
* Author @sahilprasad
* Enables spark applications to be submitted in 'in-cluster client' mode.
Amogh Shetkar committed Apr 5, 2018
1 parent 7b8c9f5 commit 73478bf
Showing 2 changed files with 54 additions and 14 deletions.
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -325,7 +325,7 @@ object SparkSubmit extends CommandLineUtils {
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local.
// In Mesos cluster mode, non-local python files are automatically downloaded by Mesos.
-    if (args.isPython && !isYarnCluster && !isMesosCluster) {
+    if (args.isPython && !isYarnCluster && !isMesosCluster && !isKubernetesCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: ${args.primaryResource}")
}
@@ -336,16 +336,16 @@ object SparkSubmit extends CommandLineUtils {
}

// Require all R files to be local
-    if (args.isR && !isYarnCluster && !isMesosCluster) {
+    if (args.isR && !isYarnCluster && !isMesosCluster && !isKubernetesCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local R files are supported: ${args.primaryResource}")
}
}
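Both checks above hinge on `Utils.nonLocalPaths`, which returns the entries of a comma-separated path list that are not local to the submitting machine. A rough Python sketch of that idea (the set of schemes treated as local here is an assumption for illustration, not Spark's exact rules):

```python
from urllib.parse import urlparse

def non_local_paths(paths):
    # Treat schemeless paths plus file:/ and local:/ URIs as local;
    # anything else (hdfs:/, http:/, s3a:/, ...) counts as non-local.
    local_schemes = {"", "file", "local"}
    return [p for p in paths.split(",")
            if p and urlparse(p).scheme not in local_schemes]
```

Under this sketch, `non_local_paths("hdfs://nn/app.py,/tmp/dep.py")` would flag only the HDFS entry, which is the situation the error message above rejects.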

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
-      case (KUBERNETES, CLIENT) =>
-        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
+      case (KUBERNETES, CLIENT) if !inK8sCluster() =>
+        printErrorAndExit("Kubernetes currently only supports in-cluster client mode.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -682,10 +682,10 @@ object SparkSubmit extends CommandLineUtils {
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
val resolvedPyFiles = Utils.resolveURIs(pyFiles)
-      val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
+      val formattedPyFiles = if (!isYarnCluster && !isMesosCluster && !isKubernetesCluster) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
-        // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
+        // Ignoring formatting python path in yarn, mesos and kubernetes cluster mode, these modes
// support dealing with remote python files, they could distribute and add python files
// locally.
resolvedPyFiles
@@ -857,6 +857,14 @@ object SparkSubmit extends CommandLineUtils {
res == SparkLauncher.NO_RESOURCE
}

+  /**
+   * Return whether the submission environment is within a Kubernetes cluster
+   */
+  private[deploy] def inK8sCluster(): Boolean = {
+    !sys.env.get("KUBERNETES_SERVICE_HOST").isEmpty &&
+      !sys.env.get("KUBERNETES_SERVICE_PORT").isEmpty
+  }
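The detection added above relies on Kubernetes injecting `KUBERNETES_SERVICE_HOST` and `KUBERNETES_SERVICE_PORT` into every pod's environment. A minimal Python rendering of the same check (illustrative only; the committed Scala is authoritative):

```python
import os

def in_k8s_cluster(env=None):
    """Return True when the process appears to run inside a Kubernetes pod.

    Kubernetes injects the API server's service-discovery variables into
    every pod, so their joint presence signals an in-cluster submission.
    """
    env = os.environ if env is None else env
    return ("KUBERNETES_SERVICE_HOST" in env
            and "KUBERNETES_SERVICE_PORT" in env)
```

Note that both variables must be present; a single one is not treated as evidence of being in-cluster.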

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
48 changes: 40 additions & 8 deletions docs/running-on-kubernetes.md
@@ -69,7 +69,7 @@ For example, if the registry host is `registry-host` and the registry is listeni
docker push registry-host:5000/spark-driver:latest
docker push registry-host:5000/spark-executor:latest
docker push registry-host:5000/spark-init:latest

Note that `spark-base` is the base image for the other images. It must be built first; afterwards, the other images can be built in any order.

## Submitting Applications to Kubernetes
@@ -198,10 +198,10 @@ is currently supported.

### Running PySpark

Running PySpark on Kubernetes leverages the same spark-submit logic used when launching on Yarn and Mesos.
Python files can be distributed by including them in the conf via `--py-files`.

Below is an example submission:


```
@@ -265,6 +265,37 @@ other cluster managers.

## Advanced

### Running in-cluster client mode applications

While Spark on Kubernetes does not support client mode applications, such as the PySpark shell, launched from outside Kubernetes, it does support client mode applications launched from within the cluster. This _in-cluster_ client mode bypasses some of the networking and dependency issues inherent in running a client from outside a cluster, while preserving much of the same interactive functionality, such as the PySpark shell and Jupyter notebooks.

In order to run in client mode, use `kubectl attach` to attach to an existing driver pod on the cluster, or the following to run a new driver:

    kubectl run <pod name> -it --image=<driver image> --restart=Never -- /bin/bash

This opens a shell in the driver pod, from which you can run client mode applications. To configure these in-cluster
applications appropriately, be sure to set the following configuration value for all applications, as in the
`spark-submit` example further down; it tells the cluster manager to refer back to the current driver pod as the driver for any applications you submit:

    spark.kubernetes.driver.pod.name=$HOSTNAME
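This works because, inside a pod, the hostname equals the pod name. A small hedged helper (illustrative, not part of Spark) that resolves it the same way:

```python
import os
import socket

def current_pod_name(env=None):
    # Kubernetes sets a pod's hostname to the pod name, so HOSTNAME
    # (falling back to the system hostname) identifies the driver pod.
    env = os.environ if env is None else env
    return env.get("HOSTNAME") or socket.gethostname()
```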

With that set, you should be able to run the following example from within the pod:

    bin/spark-submit \
      --deploy-mode client \
      --class org.apache.spark.examples.SparkPi \
      --master k8s://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT \
      --kubernetes-namespace default \
      --conf spark.app.name=spark-pi \
      --conf spark.kubernetes.driver.pod.name=$HOSTNAME \
      --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
      --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.kubernetes.shuffle.namespace=default \
      --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
      local:///opt/spark/examples/jars/spark_examples_2.11-2.2.0.jar 10

### Securing the Resource Staging Server with TLS

The default configuration of the resource staging server is not secured with TLS. It is highly recommended to configure
@@ -742,12 +773,12 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
  <td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
  <td>(none)</td>
  <td>
    Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
    configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
    will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
@@ -808,14 +839,15 @@ from the other deployment modes. See the [configuration page](configuration.html
    We have a default value of <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> should you not include it. But
    you should always include this if you are providing a pre-existing secret containing the delegation token data.
  </td>
</tr>
<tr>
  <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
the Executor process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
  <td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to
