This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

In-cluster client mode #456

Open
wants to merge 5 commits into branch-2.2-kubernetes

Conversation

sahilprasad

What changes were proposed in this pull request?

As per the discussion on my original PR (#402), this allows client mode if the submission environment is within a Kubernetes cluster. I erroneously stated in the above PR that the application resolves to the submission client at org.apache.spark.deploy.kubernetes.Client. However, this is not the case: submitted applications resolve to the user-submitted class for Java/Scala execution and to org.apache.spark.deploy.PythonRunner for Python execution. Since execution happens inside a Kubernetes driver pod started via kubectl run, I was able to get this to work by setting spark.kubernetes.driver.pod.name to the HOSTNAME environment variable within the pod. With this configuration, once KubernetesClusterSchedulerBackend is invoked by the application class, the pod the user submitted the application from is recognized as the driver of the application and execution proceeds as normal, with no extra driver pod being unnecessarily constructed.

The in-cluster use case is more of a stopgap for full client mode support on this project, but it enables client-mode applications like the PySpark and Scala shells, Jupyter, etc. The logic that checks whether the application was submitted from within a Kubernetes cluster simply checks whether KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT are set.
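
For reference, a minimal sketch of that check (the helper name matches the SparkSubmit diff below; the body here is an assumption of how it could look):

def inK8sCluster(): Boolean =
  sys.env.contains("KUBERNETES_SERVICE_HOST") &&
    sys.env.contains("KUBERNETES_SERVICE_PORT")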

Would definitely appreciate comments or questions, especially around how to better detect in-cluster execution!

How was this patch tested?

Ran provided unit and integration tests. Manual testing done through spark-submit, standalone PySpark scripts, PySpark on Jupyter, and both shells.

@tristanz

@sahilprasad
Author

@tristanz added docs. Did I leave anything out?

@foxish
Member

foxish commented Aug 24, 2017

The docs are actually published separately out of https://github.com/apache-spark-on-k8s/userdocs.
I think this change is useful; the in-cluster client mode is especially valuable for Jupyter notebooks and other interactive use cases.

@mccheah, I think creating a separate ClientSchedulerBackend should not be required here for this more limited pseudo-client mode. Thoughts?

This will open up a shell into the specified driver pod from which you can run client mode applications. In order to appropriately configure
these in-cluster applications, be sure to set the following configuration value, which essentially tells the cluster manager to refer back to the current driver pod as the driver for any applications you submit:

spark.kubernetes.driver.pod.name=$HOSTNAME
Member

is there a reason spark.kubernetes.driver.pod.name is being set prior to the spark-submit command, instead of using --conf ... ?

Author

The way I worded it makes it seem like that is the case. I was going for making the user aware that spark.kubernetes.driver.pod.name must be set for all client mode applications executed in-cluster.

Perhaps appending to "be sure to set the following configuration value" with "in all client-mode applications you run, either through --conf or spark-defaults.conf" would help clarify the point?

Member

It is also in the example command, so maybe it is clear enough. Possibly add in "as in the following example spark-submit command"

Author

Amended for clarity.

@@ -344,8 +344,8 @@ object SparkSubmit extends CommandLineUtils {

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) if !inK8sCluster() =>


I agree it's probably best to limit this to in-cluster now, but generally speaking I don't see why we shouldn't allow client mode with the appropriate networking caveats. Some network configurations allow the pod network to be fully routable (e.g. Calico).

Author

I'm not sure how difficult it would be to detect pod network connectivity upon application submission. The logic would probably have to be abstracted out to the cluster manager or one of the initial steps, and in that case we would have to throw a validation error much later in the process, since we would need to allow client mode through unobstructed in SparkSubmit. Definitely feasible though.

@@ -240,6 +240,38 @@ the command may then look like the following:

## Advanced

### Running in-cluster client mode applications

While Spark on Kubernetes does not officially support client mode applications, such as the PySpark shell, there is a workaround that


Suggest rephrasing as:

"While Spark on Kubernetes does not support client mode applications, such as the PySpark shell, when launched from outside Kubernetes, Spark on Kubernetes does support client mode applications launched within the cluster. This in-cluster..."

Author

Agreed.


This will open up a shell into the specified driver pod from which you can run client mode applications. In order to appropriately configure
these in-cluster applications, be sure to set the following configuration value for all applications, as in the following `spark-submit` example,
which essentially tells the cluster manager to refer back to the current driver pod as the driver for any applications you submit:


nit; drop "essentially"

With that set, you should be able to run the following example from within the pod:

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \


where is --deploy-mode client? If this is the default, I would add it explicitly in case the user has a global spark.conf that defaults to cluster mode instead.

Author

It is default, but I'll include it to be more explicit.

these in-cluster applications, be sure to set the following configuration value for all applications, as in the following `spark-submit` example,
which essentially tells the cluster manager to refer back to the current driver pod as the driver for any applications you submit:

spark.kubernetes.driver.pod.name=$HOSTNAME


Why is this parameter special compared to other spark kubernetes options you use below?

Author

Specifying the driver pod's name tells the cluster manager that the application being submitted with this configuration should refer back to a pod in the k8s cluster with the provided name. See the validation and reference logic in the cluster scheduler backend class.

Author

By setting the driver pod name to the hostname of the user's pod, every application uses the user's pod as its driver in client mode, which means that a new driver pod isn't allocated.
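
As a rough illustration (not the actual backend code), the lookup this enables could be sketched with the fabric8 client like so; sparkConf, the namespace default, and the client construction are assumptions:

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
// The no-arg client picks up the pod's mounted service-account credentials in-cluster.
val kubernetesClient = new DefaultKubernetesClient()
// Resolve the pod named by spark.kubernetes.driver.pod.name; in in-cluster client mode
// this is the very pod the application was launched from, so no new driver pod is created.
val driverPod = kubernetesClient.pods()
  .inNamespace(sparkConf.get("spark.kubernetes.namespace", "default"))
  .withName(sparkConf.get("spark.kubernetes.driver.pod.name"))
  .get()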


While Spark on Kubernetes does not officially support client mode applications, such as the PySpark shell, there is a workaround that
allows for execution of these apps from within an existing Kubernetes cluster. This _in-cluster_ client mode bypasses some of the networking and
dependency issues inherent to running a client from outside of a cluster while allowing much of the same functionality in terms of interactive use cases.


interactive use cases like PySpark shell and Jupyter.

@erikerlandson
Member

this needs to be rebased

@erikerlandson
Member

@mccheah @foxish my recollection from the last meeting was a desire to create a ClientSchedulerBackend; was that a precondition for this?

@foxish
Member

foxish commented Aug 30, 2017

The method of finding whether we're running in the cluster LGTM; there may be a better method using the fabric8 client, but that isn't the best fit for the spark-submit code.
Thinking about this a bit more, the only hacky part here is having to manually set the driver pod name. An InClusterClientSchedulerBackend could do that for us, making the experience a bit cleaner.

@foxish
Member

foxish commented Aug 30, 2017

An alternate method of finding if we're running in-cluster is to check for /var/run/secrets/kubernetes.io/serviceaccount and see if we have credentials there that can talk to the apiserver. This is common, but both env-vars and checking that path are heuristics and there can be situations where we can't tell apart in-cluster and out-of-cluster modes. It should cover most cases though.
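
A sketch of the combined heuristic (env vars plus mounted service-account token); as noted above it remains a best-effort guess, and the names here are assumptions:

import java.nio.file.{Files, Paths}

def likelyInK8sCluster(): Boolean = {
  val hasEnv = sys.env.contains("KUBERNETES_SERVICE_HOST") &&
    sys.env.contains("KUBERNETES_SERVICE_PORT")
  // Pods normally have the service-account token mounted at this path.
  val hasToken = Files.exists(
    Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token"))
  hasEnv && hasToken
}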

@sahilprasad
Author

@foxish adding the extra check and including a lightweight InClusterSchedulerBackend (which should actually just be KubernetesClientSchedulerBackend, now that I think about it) sounds good to me. I'm thinking of keeping the existing env variable check and incorporating the credentials check as part of the new scheduler backend class.

An alternative might be to determine whether the pod network is routable (as per @tristanz's earlier comment). I'm not sure how best to do this, but it might cover more cases than the other option.


In order to run in client mode, use `kubectl attach` to attach to an existing driver pod on the cluster, or the following to run a new driver:

kubectl run -it --image=<driver image> --restart=Never -- /bin/bash


use kubespark/spark-driver:latest as image in the commandline?

Author

Did not want to specify, but I see the benefit.

--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \


I feel perhaps we should leave

--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true

out of the example? These are not default or required when running k8s, right?

Author

Agreed.

--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.kubernetes.shuffle.namespace=default \
--conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \


2.2.0?

Author

Yup, will change later today.

@paulreimer

I'm not sure how to use this correctly, or it doesn't seem to be working for me.

I tried submitting jobs from the Jupyter web UI, as well as via exec within the Jupyter pod.

spark-submit --deploy-mode client --master k8s://http://127.0.0.1:8001 --kubernetes-namespace spark --conf spark.kubernetes.driver.docker.image=<> --conf spark.kubernetes.executor.docker.image=<> --conf spark.kubernetes.initcontainer.docker.image=<>  --conf spark.executor.instances=3 --conf spark.app.name=spark-pi --conf=spark.kubernetes.driver.pod.name=$HOSTNAME gs://spark-resource-staging/pi.py 10

...

17/10/19 21:10:02 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
17/10/19 21:10:02 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /tmp/tmp8817217172572762368/pi.py:43) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
17/10/19 21:10:02 INFO KubernetesTaskSchedulerImpl: Adding task set 0.0 with 10 tasks
17/10/19 21:10:17 WARN KubernetesTaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

And it repeats that last message indefinitely. I also do not see any executor pods created, through either method. The same command works in cluster mode within the jupyter pod, just not client mode. My jupyter pod is in the default namespace and I would like the spark executors in the spark namespace.

@sahilprasad
Author

It's been a while since I've looked at this, but can you try enabling dynamic allocation when running in client mode?

@paulreimer

Oh! I tried just now with everything in the default namespace and it worked.

@paulreimer

It all works now, and is glorious.

Two important things I needed (my bad): I was using a kubectl-proxy sidecar container with --master=k8s://http://127.0.0.1:8001; now I have no sidecar and I am using --master k8s://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT.

The second thing is that I thought/hoped those environment variables would be expanded in a k8s Deployment's args: ["$FOO", "$BAR"], but I suppose that is not the case or I'm getting the syntax wrong. With the exact same command baked into my Jupyter docker image CMD, it all worked great. I am able to run within the Jupyter notebook in "client" mode, and I do see executors launched and results coming back to Jupyter. It's really cool, tbh.

@paulreimer

I should note that dynamic allocation + shuffle service also works in this setup.

The only restriction I can see so far is that I had to locate all the resources within the same namespace as Jupyter (I would have preferred the spark executors to be in a separate, isolated namespace).

@sahilprasad
Author

@paulreimer Let me know if there are any documentation changes that can be made to make the process of starting up with "client" mode easier!

As for scheduling executors in a separate namespace than the Jupyter notebook, I am not sure how to go about resolving that. If you make any progress on this front, let me know!

--master k8s://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT \
--kubernetes-namespace default \
--conf spark.app.name=spark-pi \
--conf spark.kubernetes.driver.pod.name=$HOSTNAME \

I wonder if pods can infer their own name via the environment?

@mccheah

mccheah commented Nov 9, 2017

Was thinking about this a little more and it makes sense. But there's some more food for thought here.

I recently learned that TCP connections in Spark are only initiated from executor->executor and executor->driver. There is no connection from driver->executor. That means that theoretically, if the driver is reachable over a hostname or IP address from the executor pods, then client mode can be supported as long as the driver is not behind a firewall. So it may not be the case that the driver has to be running from within the context of a pod. In this case, the idea of client mode would be a lot closer to the notion of client mode in the other cluster managers.

We should adjust this accordingly so that:

  • client mode is not opinionated to the idea of running in a pod,
  • integration tests also test client mode.

And then we should try to test this on real clusters where the driver is just on a host that is in the cluster, not in a container.
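
To make the idea concrete, a hypothetical client-mode configuration along these lines, using standard Spark properties with placeholder values, could look like:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443")    // API server URL (placeholder)
  .set("spark.driver.host", "driver-host.example.internal")  // an address the executor pods can reach
  .set("spark.driver.port", "7078")                          // fixed port so it can be opened through a firewall
  .set("spark.driver.bindAddress", "0.0.0.0")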

@mccheah

mccheah commented Nov 9, 2017

I also think that the cluster-mode scheduler backend has some conventions that are specific to cluster mode, such as the init container that downloads dependencies that the submission client sent over. A KubernetesClientSchedulerBackend should remove all components from the KubernetesClusterSchedulerBackend that only apply to cluster mode and, more importantly, do not apply when no submission client deploys the application. Kubernetes credentials provisioning should also be reworked in client mode. The spark.kubernetes.authenticate.driver.mounted prefix doesn't make as much sense in client mode; it should just be a straight spark.kubernetes.authenticate.oauthToken without any prefix.

@echarles
Member

I have tried this branch running in client deploy mode from an in-cluster pod (after fixing the small merge conflict) on a local k8s cluster created with kubeadm.

I have run interactive spark-shell, spark-submit and Zeppelin with 3 executors (will open a PR to document the needed configuration for Zeppelin).

It works well overall for what I have done (basic dataframe processing in Scala, no additional external dependencies).

Three questions (mainly around pod naming):

  1. Executor pod names all have the same pattern (spark-exec-1, spark-exec-2, ...), which does not follow the classical behavior (e.g. spark-submit creates names like spark-pi-1510391597235-exec-3).

This leads to an "AlreadyExists" exception if you want to instantiate multiple Spark REPLs on the same K8S cluster.

2017-11-11 09:57:27 ERROR KubernetesClusterSchedulerBackend:91 - Failed to allocate executor pod.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/default/pods. Message: pods "spark-exec-1" already exists. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=null, kind=pods, name=spark-exec-1, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=pods "spark-exec-1" already exists, metadata=ListMeta(resourceVersion=null, selfLink=null, additionalProperties={}), reason=AlreadyExists, status=Failure, additionalProperties={}).
  2. I receive a warning message when running in client mode with the shuffle service enabled (no warning on my setup while running in cluster mode):
org.apache.spark.SparkException: Ambiguous specification of shuffle service pod. Found multiple matching pods: shuffle-n6qpv, 192.168.93.173 on datalayer-001
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl.org$apache$spark$scheduler$cluster$k8s$KubernetesExternalShuffleManagerImpl$$addShufflePodToCache(KubernetesExternalShuffleManager.scala:115)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl$$anon$1.eventReceived(KubernetesExternalShuffleManager.scala:100)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExternalShuffleManagerImpl$$anon$1.eventReceived(KubernetesExternalShuffleManager.scala:94)

It is just a warning, and the REPL is then correctly created - Just mentioning as the behavior is different in cluster mode.

  3. I receive an error when looking at executor pod details from the web dashboard (Error_outline Internal Server Error (500) Unknown reference kind Pod - see screenshots). This error only shows for pods created by the Spark driver in in-cluster client mode.

(screenshots: 2017-11-11 10-11-24 and 2017-11-11 10-10-48)

@echarles
Member

PS: We can forget my 3rd point (Error_outline Internal Server Error (500)). If I click via the page showing the list of pods, I receive the logs; if I click on the logs icon via the detail page, I receive the 500 error. I guess this is a dashboard or k8s issue.

@erikerlandson
Member

@echarles when you did the tests you described above, did you try non-pod clients, vis a vis @mccheah's comment about one-way executor-to-driver connections?

@echarles
Member

echarles commented Nov 18, 2017

@erikerlandson @mccheah I have recompiled to relax the condition on the k8s client mode and made a few (unsuccessful) tests with submitting to a remote spark-k8s from my laptop (out of pod, spark-submit + spark-shell). I receive:

Exception in thread "main" org.apache.spark.SparkException: External scheduler cannot be instantiated
	at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2770)
(snip)
Caused by: java.io.FileNotFoundException: /var/run/secrets/kubernetes.io/serviceaccount/token (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)

I have tested with/without the spark.kubernetes.driver.pod.name property and with spark.dynamicAllocation.enabled=true/false and spark.shuffle.service.enabled=true/false.

@mccheah

mccheah commented Dec 4, 2017

Are we still continuing work on this? I still have some concerns - see #456 (comment)

@tristanz

tristanz commented Dec 5, 2017

@mccheah I agree with your comment, it is a very good point. @sahilprasad can speak for himself, but I know he's back in school so may not have bandwidth to pick this up. Is anybody else interested?

@foxish
Member

foxish commented Dec 5, 2017

I know @echarles was working on related things - would you be willing to take over?

@echarles
Member

echarles commented Dec 5, 2017

Yep, this is listed in my todos, just after #463

@sahilprasad
Author

@tristanz @echarles Tristan is right — since I'm back in school, I don't have the time to take this on right now. All you, Eric! I'll definitely be keeping track of the progress of this feature.

@echarles
Member

echarles commented Dec 14, 2017

@sahilprasad enjoy, and good luck with your exams (if any? I think it is the period for those).

The good news is that I have been able to quickly hack the code to have a running spark k8s client out-of-cluster.

The bad news is that the executor initiates the connection to the driver. When I run from my laptop against a cluster in the cloud, the executor pod is created but fails immediately when trying to connect to my local IP address (192.168.1.7).

2017-12-14 10:37:53 INFO  SecurityManager:54 - Changing modify acls groups to: 
2017-12-14 10:37:53 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1904)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
2017-12-14 10:39:53 ERROR RpcOutboxMessage:70 - Ask timeout before connecting successfully
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from 192.168.1.7:35789 in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
	at scala.util.Try$.apply(Try.scala:192)

I have then created a separate local cluster on my laptop and, from my IDE, I can run SparkPi and the Spark shell in k8s client mode.

I haven't tried on a separate machine, but if the cluster nodes have ip:port reachability, I guess it will be ok (need to test...).

As a first iteration, I'd like to open a PR based on your feedback on these questions:

  1. I am used to running Spark in yarn-client mode; in that case, the YARN resource manager exposes the Application Master, which proxies the Spark UI (so you browse e.g. http://<resource-manager>:8088/proxy/application_1513256521420_0002/jobs). As with YARN, the driver process here remains on the local host, but the Spark UI (typically on port 4040) is not proxied, and a user has to browse http://<localhost>:4040/application_1513256521420_0002/jobs. Not sure how Mesos behaves here (I remember I had to create an ssh tunnel to browse the UI, so I would say there is no proxy, but I no longer have access to a Mesos-based cluster).

  2. The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

  3. Regarding the in-cluster client, the logic should reside in our KubernetesSchedulerBackend, so the kubernetes client (if it supports it) could do the job. Or should we test for /var/run/secrets/kubernetes.io/serviceaccount? Any pointers and concrete methods welcome.

  4. For now I disable the OAuth properties. @mccheah, can you shed some light on this? (I could point to a file, but I don't see any information about OAuth in my local Kubernetes configuration).

  5. The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

  6. The executor pod name must be updated (for now, it is always spark-executor-1...)

  7. Anything else I'm missing?

@liyinan926
Member

liyinan926 commented Dec 14, 2017

The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

What do you mean by passing a null driver pod? KubernetesClusterSchedulerBackend currently gets the driver pod object from the API server and uses it to create an OwnerReference for the executor pods. Do you mean skipping this logic if the driver pod does not exist?

The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

Yes, the client can be pointed to .kube/config for configuration and use that to create a client to the API server.

There's one thing that's essential for in-cluster client mode: the driver headless service for the executors to connect to the driver. In the current implementation, the submission client is solely responsible for creating the service. We need to think about who's in charge of creating it in in-cluster client mode.

@echarles
Member

In client mode there is indeed no driver pod, so yes, I simply skip the current logic (OwnerReference...).

I will default to ~/.kube/config to get the credentials to create the k8s client.

For the in-cluster client mode, what do you mean by the driver headless service? In-cluster is nearly the same as out-of-cluster client mode; the only difference I see is the way to get the credentials needed to create the k8s client (the in-cluster client mode does not have ~/.kube/config).
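
For what it's worth, the fabric8 client this project already uses resolves its configuration automatically, checking $KUBECONFIG / ~/.kube/config out of cluster and the mounted service-account credentials in cluster, so a single construction path could serve both modes; a minimal sketch:

import io.fabric8.kubernetes.client.DefaultKubernetesClient

// The no-arg constructor auto-configures from ~/.kube/config (or $KUBECONFIG) when run
// outside the cluster, and from the mounted service-account token when run inside a pod.
val kubernetesClient = new DefaultKubernetesClient()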

@liyinan926
Member

liyinan926 commented Dec 14, 2017

For the in-cluster client mode, what do you mean by the driver headless service? In-cluster is nearly the same as out-of-cluster client mode; the only difference I see is the way to get the credentials needed to create the k8s client (the in-cluster client mode does not have ~/.kube/config).

The driver headless service is a Kubernetes headless service that the executors use to connect to the driver pod. The service basically gives a DNS name for the driver. Executors connect to the driver through the service instead of the driver IP address. See more details in #483.

In the in-cluster client mode as described in this PR, the headless service won't be created because the submission client is not invoked. This is problematic and should be addressed.

@echarles
Member

In the in-cluster client mode as described in this PR, the headless service won't be created because the submission client is not invoked. This is problematic and should be addressed.

I now see your point. I guess we would then have 2 different step-paths: the current one for cluster mode, and a slightly updated one for client mode with a ClientDriverConfigurationStep that simply returns a value that could have been set [1] as a k8s env var or [2] as a property in the SparkContext. Any thoughts?

@mccheah

mccheah commented Dec 14, 2017

I am used to running Spark in yarn-client mode; in that case, the YARN resource manager exposes the Application Master, which proxies the Spark UI (so you browse e.g. http://<resource-manager>:8088/proxy/application_1513256521420_0002/jobs). As with YARN, the driver process here remains on the local host, but the Spark UI (typically on port 4040) is not proxied, and a user has to browse http://<localhost>:4040/application_1513256521420_0002/jobs. Not sure how Mesos behaves here (I remember I had to create an ssh tunnel to browse the UI, so I would say there is no proxy, but I no longer have access to a Mesos-based cluster).

Don't think we should have to do anything special here, it's up to the JVM that runs the Spark application as to how to expose it, but we can't enforce anything here AFAIK.

The needed changes are about creating the Kubernetes client with the correct credentials, passing a null driver pod, and disabling a few properties. So instead of creating a separate KubernetesClientSchedulerBackend, what about configuring and renaming the existing KubernetesClusterSchedulerBackend (let's say simply KubernetesSchedulerBackend)?

I'm worried that introducing too many of these - edit: "these" being properties that are present or null based on cluster/client mode - will make it opaque as to what properties are specific to cluster mode and which properties are specific to client mode. Having the separate KubernetesClusterSchedulerBackend and KubernetesClientSchedulerBackend would make it clear what aspects of the schedulers and their properties that are specific to client versus cluster mode. I'd expect most of the code to be shared in the parent class, KubernetesSchedulerBackend, with cluster mode and client mode each setting up their own bootstraps. You could accomplish something similar with composition instead of inheritance, which is usually preferred. For example one could have a KubernetesSchedulerBackend which is a concrete class, that takes in a KubernetesDeployModeSpecificHandlers that is a trait and subclassed in Client and Cluster implementations, and go from there. But I think using Option or null everywhere would result in a proliferation of properties that are ambiguous in which mode they should apply to and why.
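
A compile-level sketch of that composition idea; every name here is illustrative rather than an actual class in the codebase:

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

trait KubernetesDeployModeSpecificHandlers {
  def driverPod: Option[Pod]              // Some(pod) in cluster mode, None in client mode
  def kubernetesClient: KubernetesClient  // pod credentials vs. kubeconfig-based client
}

class ClientModeHandlers extends KubernetesDeployModeSpecificHandlers {
  override def driverPod: Option[Pod] = None
  override def kubernetesClient: KubernetesClient = new DefaultKubernetesClient()
}

class KubernetesSchedulerBackend(handlers: KubernetesDeployModeSpecificHandlers) {
  // Shared executor-allocation logic would live here and consult handlers.driverPod
  // when deciding whether to attach an OwnerReference to executor pods.
}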

Regarding the in-cluster client, the logic should reside in our KubernetesSchedulerBackend, so the kubernetes client (if it supports it) could do the job. Or should we test for /var/run/secrets/kubernetes.io/serviceaccount? Any pointers and concrete methods welcome.

Are you referring specifically to how we should configure the Kubernetes client? If so, I think we should just configure everything using Spark properties. The naming of these properties is tricky, because cluster mode namespaces properties between submission and driver, whereas no such division should exist in client mode. But maybe just using the driver prefix for everything will be sufficient even if it's a redundant label in client mode, so to speak.

Edit: We should also support loading from on-disk, and it's similar to spark.hadoop. properties vs. using HADOOP_CONF_DIR for Hadoop configuration - this is noted again below.

For now I disable the OAuth properties. @mccheah, can you shed some light on this? (I could point to a file, but I don't see any information about OAuth in my local Kubernetes configuration).

Kubernetes clusters can support OAuth tokens for identity verification and authorization. We need to allow users to pass in either a token file or token text itself. The latter should be redacted somehow so that we don't print it in e.g. spark.logConf.

The place to load the certificate and keys must also be defined. ~/.kube seems a reasonable choice to me to start with.

We should support loading configuration from both the Spark config and the on-disk kube configs. This is similar to how Spark allows passing Hadoop properties via the files in HADOOP_CONF_DIR and also by setting properties prefixed with spark.hadoop..

The executor pod name must be updated (for now, it is always spark-executor-1...)

Why does this have to change? It should contain the app id and the executor id.

@mccheah

mccheah commented Dec 14, 2017

The driver headless service is a Kubernetes headless service that the executors use to connect to the driver pod. The service basically gives a DNS name for the driver. Executors connect to the driver through the service instead of the driver IP address. See more details in #483.

To take a step back here, I don't think we should be creating a headless service at all for client mode. The idea behind client mode is that there's the inherent assumption that the driver is available over some hostname that is routable from all of the executors. @echarles - to your point that this code is unusable from a laptop - if we think about it, if one had an equivalent YARN cluster with the same firewall settings and your laptop was sitting in the same place with its own firewalls, then I'd expect our very same client mode applications that don't work with that YARN cluster to also not work in Kubernetes.

For client mode our measure of success should be: If I colocated my Kubernetes kubelets with existing YARN nodemanagers that I've been using for my client mode YARN applications, would I also be able to run my client mode Kubernetes applications from the same host where I run my YARN client mode applications? Conversely, if there was a networking setup that would have prohibited us from running in YARN client mode, then we also don't have to be concerned with the analogous scenarios in Kubernetes - such as e.g. the laptop->cluster case.

The situation I was concerned about before I found out what I did in #456 (comment) was that the driver would need to reach out to the executors to hand them work. That would create a discrepancy between YARN and Kubernetes, because executors running in YARN usually are running on a fleet of hosts that have a unified firewall setting and are either all exposed at once to the driver or none are. In the latter case, it's expected that the client mode application shouldn't work anyways, but one could deploy their application inside the YARN cluster to get inside the firewall and work from there. In Kubernetes that would have been extremely difficult to do because any given individual pod is not routable from outside the cluster by default.

But given our findings, the expectation is now only that the driver needs to be reachable from the pods, which is far more manageable. How that driver would be exposed I would expect to be different case by case, considering a laptop versus deploying onto a server, vs. running the application inside a pod that exposes a headless service, etc. Thus the user that submits the application should have decided a-priori how their driver would be reachable by the executors. It's not immediately obvious that Spark itself has to determine that connectivity mechanism.

@echarles
Member

Don't think we should have to do anything special here, it's up to the JVM that runs the Spark application as to how to expose it, but we can't enforce anything here AFAIK.

spark.driver.memory is an example of a property we want to be sure is taken into account. I guess spark-core does the job for us, but it is worth double-checking.

You could accomplish something similar with composition instead of inheritance, which is usually preferred. For example one could have a KubernetesSchedulerBackend which is a concrete class, that takes in a KubernetesDeployModeSpecificHandlers that is a trait and subclassed in Client and Cluster implementations, and go from there. But I think using Option or null everywhere would result in a proliferation of properties that are ambiguous in which mode they should apply to and why.

+1

Are you referring specifically to how we should configure the Kubernetes client? If so, I think we should just configure everything using Spark properties. The naming of these properties is tricky, because cluster mode namespaces properties between submission and driver, whereas no such division should exist in client mode. But maybe just using the driver prefix for everything will be sufficient even if it's a redundant label in client mode, so to speak.

I imagine the list of spark.driver.kubernetes... properties will need to be documented; we will iterate on this. But I was more referring to an automatic way to detect whether the client is in or out of the cluster. This state should not be passed via a property, as the system can detect it by itself.

Kubernetes clusters can support OAuth tokens for identity verification and authorization. We need to allow users to pass in either a token file or token text itself. The latter should be redacted somehow so that we don't print it in e.g. spark.logConf.

So this is optional and should be passed via property.

We should support loading configuration from both the Spark config and the on-disk kube configs. This is similar to how Spark allows passing Hadoop properties via the files in HADOOP_CONF_DIR and also by setting properties prefixed with spark.hadoop..

Back to the property list...

Why does this have to change? It should contain the app id and the executor id.

It should, but it does not: all executors created in client mode have the strange name spark-executor-X (see the screenshot a bit above).
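
For illustration, the kind of naming being asked for (app id plus executor id) might look like the following; the exact format is an assumption, not what the branch currently does:

def executorPodName(appId: String, executorId: Long): String =
  s"$appId-exec-$executorId"

e.g. spark-pi-1510391597235-exec-3, which matches the cluster-mode behavior mentioned earlier and avoids the AlreadyExists collisions.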

@mccheah

mccheah commented Dec 14, 2017

But I was more referring to an automatic way to detect whether the client is in or out of the cluster. This state should not be passed via a property, as the system can detect it by itself.

I think my idea here is similar in spirit to #456 (comment) - though I see we almost posted at the same time =). I don't think Spark itself should need to determine whether the application is in-cluster vs. out-of-cluster; it just says that the driver running in client mode needs to be reachable by the executor pods, and it's up to the user to determine how to provide that connectivity.

@liyinan926
Member

To take a step back here, I don't think we should be creating a headless service at all for client mode. The idea behind client mode is that there's the inherent assumption that the driver is available over some hostname that is routable from all of the executors. @echarles - to your point that this code is unusable from a laptop - if we think about it, if one had an equivalent YARN cluster with the same firewall settings and your laptop was sitting in the same place with its own firewalls, then I'd expect our very same client mode applications that don't work with that YARN cluster to also not work in Kubernetes.

Yes, I meant this only applies to in-cluster client mode. And I think for in-cluster client mode, users should not need to worry about how to setup the connectivity between the executors and the driver.

@mccheah

mccheah commented Dec 14, 2017

Yes, I meant this only applies to in-cluster client mode. And I think for in-cluster client mode, users should not need to worry about how to setup the connectivity between the executors and the driver.

But do we need to distinguish in-cluster client mode as a separate case? Again, if we treat client mode as the contract that "the driver needs to be reachable by its hostname", it's left up to the user to determine how to do that.

@liyinan926
Member

But do we need to distinguish in-cluster client mode as a separate case? Again, if we treat client mode as the contract that "the driver needs to be reachable by its hostname", it's left up to the user to determine how to do that.

Ideally, yes. In the case of the headless service in in-cluster client mode, are you suggesting users manually create it as part of starting the client pod?

@mccheah

mccheah commented Dec 14, 2017

It would be a first for Spark in client mode to determine its own connectivity. I don't think it's strictly necessary for Spark to do this. Users can create the headless service themselves.
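
For example, a user-created headless service for an in-cluster client-mode driver pod could be sketched with the fabric8 builders as below; the service name, selector label, and port are all placeholders:

import io.fabric8.kubernetes.api.model.ServiceBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

val client = new DefaultKubernetesClient()
val driverService = new ServiceBuilder()
  .withNewMetadata()
    .withName("my-driver-svc")
  .endMetadata()
  .withNewSpec()
    .withClusterIP("None")                        // headless: DNS resolves directly to the pod IP
    .addToSelector("app", "my-driver-pod-label")  // must match a label on the driver pod
    .addNewPort().withName("driver-rpc").withPort(7078).endPort()
  .endSpec()
  .build()
client.services().inNamespace("default").create(driverService)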

@liyinan926
Member

OK, I can buy that. This needs to be documented clearly.

@echarles
Member

echarles commented Jan 2, 2018

FYI I made a first step a few days ago but had no time to finalize it (you know, end of year...). It works for me in out-of-cluster mode, and I will make it work in-cluster in the coming days.

branch-2.2-kubernetes...datalayer-contrib:client-mode-datalayer-2

@foxish
Member

foxish commented Jan 2, 2018

Thanks @echarles.
This item is a P0 once we rebase our fork on upstream since it's a very popular feature request.

@echarles
Member

I have a working version that covers the 6 scenarios:

  1. spark-submit cluster-mode in-cluster
  2. spark-submit cluster-mode out-cluster
  3. spark-submit client-mode in-cluster
  4. spark-submit client-mode out-cluster
  5. spark-shell client-mode in-cluster
  6. spark-shell client-mode out-cluster

This can be viewed on branch-2.2-kubernetes...datalayer-contrib:client-mode-datalayer-2

Note to myself:

  • Get rid of the hardcoded path for cert and key in case of out-cluster
  • Polish format
  • Document to ensure understanding of the behavior.
  • Unit test
  • Integration test

Questions to the community:

  1. It works without any change to the driver headless service. I would say that if it does the job, we don't have to worry about this... This may sound a bit naive, so I will try to bring clarity and evidence in the document I will write.
  2. Happy to get feedback on the already developed code.
  3. Once I have something more final, should I open a PR on apache-spark-on-k8s/spark so we can prepare something, or directly engage on the Apache JIRA, repos and mailing lists?

@foxish
Member

foxish commented Jan 12, 2018 via email

@echarles
Member

@foxish The initial shot of the doc can be read on apache-spark-on-k8s/userdocs#25.

It is a bit dense, but the reviewed topics are IMHO needed. I will update based on your feedback (comment on the PR or reply here). Thx.

@echarles
Member

Just pushed an update to the doc apache-spark-on-k8s/userdocs#25

@fanzhen

fanzhen commented Jan 25, 2018

@echarles Thanks for providing the code; I'm studying it. As far as my team is concerned, it's necessary to implement and thoroughly test the client mode code. Our proposal is to make a huge production-level migration from YARN to k8s, which may involve thousands of machines.

ashetkar pushed a commit to TIBCOSoftware/snappy-spark that referenced this pull request Apr 5, 2018
* Author @sahilprasad
* Enables spark applications to be submitted in 'in-cluster client' mode.
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

Not filed in upstream, touches code for conda.

## What changes were proposed in this pull request?

rLibDir contains a sequence of possible paths for the SparkR package on the executor and is passed on to the R daemon with the SPARKR_RLIBDIR environment variable. This PR filters rLibDir for paths that exist before setting SPARKR_RLIBDIR, retaining existing functionality to preferentially choose a YARN or local SparkR install over conda if both are present.

See daemon.R: https://github.com/palantir/spark/blob/master/R/pkg/inst/worker/daemon.R#L23

Fixes apache-spark-on-k8s#456 

## How was this patch tested?

Manually testing cherry picked on older version

Please review http://spark.apache.org/contributing.html before opening a pull request.