Kubernetes ThirdPartyResource for tracking Spark Jobs #3

Open
foxish opened this issue Nov 10, 2016 · 39 comments

@foxish
Owner

foxish commented Nov 10, 2016

Issues:

  • We need a way for users to find their Spark jobs, monitor their status, and kill them.
  • Other cluster managers provide an interface to view all of a user's spark-jobs in a cluster, along with their state (QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED).

Shortcomings of the current method:

  • It is not possible to get sufficient detail about every spark job just by looking at the pods and their states.
  • As the Spark framework currently performs the entire function of a Kubernetes controller, there is no way to find the state of the various running Spark jobs other than to implement spark-submit --status or spark-submit --kill, which would have to encode special logic to find the drivers in a particular namespace and look up their status. This would be bad for cluster administrators, who deal with Kubernetes abstractions and not application specifics.
  • Accessing arbitrary data such as progress from the driver pod, or the names of its associated executors is difficult.

Proposed Solution:

A ThirdPartyResource to keep track of the execution state (pending/running/failed) of each SparkJob, failure reasons (if any), the identities of the associated driver and executor pods, and the configuration metadata for that job (number of executors, memory per executor, etc.).

metadata:
  name: spark-job.kubernetes.io
  labels:
    resource: spark-job
    object: spark
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A resource that manages a spark job"
versions:
  - name: v1

The cluster administrator is responsible for creating this resource, which makes a new API endpoint available in Kubernetes. This TPR would enable us to create objects of the kind SparkJob and store JSON within them. Each such object would be associated with a single spark job, and would store all the status and metadata associated with it. The driver pod is responsible for the life-cycle of the SparkJob object, from creation till deletion.
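
For illustration, a hedged sketch of those admin-side steps (assuming the manifest above is saved as spark-job-tpr.yaml):

# Register the ThirdPartyResource (cluster-wide, done once by an administrator).
kubectl create -f spark-job-tpr.yaml
# Verify that the new resource is registered.
kubectl get thirdpartyresources
# Once the API server serves the new endpoint, SparkJob objects live under
#   /apis/kubernetes.io/v1/namespaces/<namespace>/sparkjobs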

A sample object of the above kind looks like the following:

{
    "apiVersion": "kubernetes.io/v1",
    "image": "driver-image",
    "kind": "SparkJob",
    "metadata": {
        "name": "spark-driver-1924",
        "namespace": "default",
        "selfLink": "/apis/kubernetes.io/v1/namespaces/default/sparkjobs/spark-driver-1924",
        "uid": "91022bc2-a71d-11e6-a4be-42010af00002",
        "resourceVersion": "765519",
        "creationTimestamp": "2016-11-10T08:13:31Z"
    },
    "num-executors": 10,
    "state": "completed",
    "driver-pod": "driver-2ds9f"
    ...
    ...
}

The driver pod has complete visibility into progress of the job, and can set the status of its SparkJob object. The driver can also watch this resource for configuration changes which may be triggered by the user/cluster administrator. Killing a spark-job can be performed by destroying the associated SparkJob object, which will cause the driver pod to terminate its executors and clean up gracefully.

  • It makes the state of each SparkJob available outside Spark itself and provides some visibility to the cluster administrator regarding various running spark jobs in the system.
  • This SparkJob object can be consumed by spark-submit --status, or by a dashboard, to display various details about Spark jobs in the system (see the sketch below).
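
As a hedged sketch of that consumption path (object name taken from the sample above), a user or dashboard script could do something like:

# List all SparkJob objects in the current namespace.
kubectl get sparkjobs
# Inspect the status/metadata recorded for one job.
kubectl get sparkjob spark-driver-1924 -o json
# "Kill" the job: the driver is expected to notice the deletion and clean up its executors.
kubectl delete sparkjob spark-driver-1924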

Further thought:

  • What if the driver pods do not exit cleanly (or are force-killed)? Who is responsible for cleaning up the SparkJob object?
@foxish
Owner Author

foxish commented Nov 10, 2016

cc @tnachen @erikerlandson @erictune

@tnachen

tnachen commented Nov 10, 2016

It sounds like a great way to provide visibility into Spark and to operate it.
I need to think through it a bit more, but some questions for now:

  • Are we going to have a TPR per namespace, so that not all Spark jobs across all namespaces are visible and manageable?
  • Is this TPR going to have a UI and persistent storage somewhere to actually store the state of each job? Would it then also become the place where a driver can queue jobs up as well?

@erikerlandson
Collaborator

@foxish TPR looks like a cool kube feature! Regarding lifecycle visibility and mgmt, I'm increasingly interested in adding some new published metrics to Spark, particularly having executors report metrics that reflect the size of their queued job backlog. My main interest is that these could be harvested for use by an HPA for executor pods. In other words, it would allow external scaling logic to use the same flavor of information that Spark's dynamic-executor logic uses, but since the logic could be external, it opens up a large solution space of scaling logic without requiring that logic to live inside Spark (let Spark do what it does well, and not try to be in the business of advanced scaling and sharing features). (*)

But you could take this idea further and publish other metrics that provide increased visibility into job progress. Some of this may already be available from Spark's existing REST API? Spark's web UI gets its job progress information from somewhere.

(*) This would also require the ability to externally signal a Spark executor into graceful shutdown, but that is outside the scope of this topic

@tnachen

tnachen commented Nov 10, 2016

Job progress information is all available through the Spark Listener interface that gets reported from the job itself. I think most of the dynamic allocation information is published through JMX already.

Anyhow, this seems like an interesting idea, but I don't know how it fits with the proposed TPR.

@iyanuobidele
Collaborator

It sure sounds like a great way to monitor the state of spark jobs!

Are we going to have a TPR per namespace.....

I believe k8s doesn't currently support non-namespaced objects, so for this to work we need to create this TPR in the same namespace as the Spark job about to be deployed. So this might answer some part of @tnachen's first question. Since we already depend on the namespace specified at runtime or in the Spark conf file, I agree with @foxish's initial idea of creating the TPR ahead of time in this same namespace.

so not all Spark jobs across all namespaces is visible and managable?

By this, if you mean the new kind just created by the TPR, I believe you should be able to do kubectl get sparkjob --all-namespaces, provided that the TPRs in all namespaces have the same resource type name. (I haven't really tried this out.)

@erikerlandson
Collaborator

erikerlandson commented Nov 10, 2016

@tnachen I was proposing some possible alternatives for increasing visibility into driver+executor(s) state from the k8s layer (alternatives that might be aligned with some of my other interests).

But TPR is idiomatic w.r.t. a kube environment. I might turn it around, and consider publishing TPR objects to represent the loading state of each executor.

Can HPA using custom metrics be aimed at one of these ThirdPartyResource endpoints?

@iyanuobidele
Collaborator

Some questions:

The driver pod has complete visibility into progress of the job, and can set the status of its SparkJob object.

How do you see implementing this? Maybe explain a little further what the relationship between the pod and the sparkjob resource type is. Is this like a pod-and-job kind of relationship, and if so, are there additional reasons why we can't use the existing Job kind instead of creating a new resource type?

@erikerlandson
Collaborator

@iyanuobidele, IIUC the idea is to have the scheduler (and/or scheduler backend) publish these objects into the namespace the app is running in, presumably using the fabric8 client, with information on the current state of the application.

@foxish
Owner Author

foxish commented Nov 10, 2016

@iyanuobidele
The TPR object is meant to store state about the running SparkJob and act as a data model. There is benefit in looking at individual entities as "SparkJobs" in the kubernetes system, as opposed to simply driver/executor pods IMO. Exposing the state of the spark job to kubernetes also allows us to write custom controllers which can make cluster-level decisions as @erikerlandson mentioned.

Is this like a pod-and-job kind of relationship, and if so, are there additional reasons why we can't use the existing Job kind instead of creating a new resource type? Maybe explain a little further what the relationship between the pod and the sparkjob resource type is

A Kubernetes Job is managed by the JobController, which deals with pod creation and deletion decisions. With native integration, we let Spark act as the "controller". So, it's not quite the pod-job type of relationship. The TPR is really just being used as a key-value store which the driver writes to, and all other components read from. I'm unsure if there is a use-case where we want to change the configuration of a job after launching the driver.

The driver pod would create the TPR object when it comes up and update it at regular intervals with metrics and state. Since we are using namespaces as the mechanism to isolate users, it makes sense for the TPR object to live in the same namespace as that user's Spark driver and executor pods. The Spark "JobID" could be associated with the TPR object and used for finding status, summarizing results, etc., without our needing to talk to the pods directly.
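
As a rough, hedged sketch of that driver-side write path (field names follow the sample object earlier in this issue; a kubectl proxy on localhost:8001 is assumed, and the JSON is abbreviated):

# Create the SparkJob object for this driver by POSTing to the TPR-backed endpoint.
curl -s -X POST -H "Content-Type: application/json" \
  --data '{"apiVersion": "kubernetes.io/v1", "kind": "SparkJob",
           "metadata": {"name": "spark-driver-1924"},
           "state": "submitted", "num-executors": 10, "driver-pod": "driver-2ds9f"}' \
  http://localhost:8001/apis/kubernetes.io/v1/namespaces/default/sparkjobs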

@tnachen

tnachen commented Nov 10, 2016

I'm also wondering if we should make it a hard requirement for the TPR to exist for Spark jobs to work on k8s. I don't think there is any reason why the TPR should be a problem for anyone, but I just want to make sure this is explicit; we should probably check from the driver side and print a sensible message.

@erikerlandson
Collaborator

I'd prefer at least the option for the submission client to create the TPR automatically, unless it requires admin privs like the service acct does. It should be easy to do in the fabric8 client, assuming it has a fluent constructor for it.

@foxish
Owner Author

foxish commented Nov 10, 2016

The ThirdPartyResource itself (which creates the additional API endpoint) needs to be created exactly once for the entire cluster. That would be a prerequisite I imagine. The TPR objects on the other hand could be created automatically by the submission client.

@erikerlandson
Collaborator

@foxish, if it's not idempotent that makes it slightly harder, but I assume the code can check for existence first if need be. If I'm reading correctly, a TPR gets created on a per-namespace basis.

@foxish
Owner Author

foxish commented Nov 10, 2016

A TPR (for example, the one of kind 'SparkJob' in this issue) is itself not namespaced. The third-party objects created from the SparkJob resource we introduce, however, are namespaced. If the TPR has not been created for the cluster, Spark will be unable to create the third-party objects in which we want to store state; as you said, it can detect this and complain about it.

@iyanuobidele
Collaborator

@foxish @erikerlandson Thanks for the clarification.

@iyanuobidele
Collaborator

A TPR (for example, the one of kind 'SparkJob' in the issue) itself is not namespaced. The third party objects which are created of that SparkJob we introduce, however, are namespaced. If the TPR is not created for that cluster, Spark will be unable to create Third Party objects in which we want to store state and as you said, it can detect this and complain about it.

We could add a conf value spark.kubernetes.failOnUncreatedTPR (a better name can be used) with true or false values, and make a decision based on that value when the TPR hasn't been created. IMO it's okay to create it as long as the user explicitly asks for that; otherwise the user can create it before running the Spark job.
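
For example (the flag name here is only the proposal above, not an existing Spark conf):

spark-submit \
  --conf spark.kubernetes.failOnUncreatedTPR=true \
  ...  # the rest of the usual kubernetes submission arguments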

@tnachen

tnachen commented Nov 11, 2016

I'd rather we keep it simple for now and not introduce a flag until later, since I don't know if TPR creation is idempotent, and it seems like this TPR needs to be configured correctly as it needs to store state, etc.

@erikerlandson
Collaborator

erikerlandson commented Nov 11, 2016

If a TPR is created cluster-wide, it almost certainly requires admin privs. On those grounds (and because it is only created once per cluster anyway) I'm in agreement that it's sanest to assume it was created by an admin, prior to submission

yeah, it requires admin privs:

[eje@localhost ~]$ kubectl create -f resource.yaml 
Error from server: error when creating "resource.yaml": User "developer" cannot create extensions.thirdpartyresources at the cluster scope

[eje@localhost ~]$ oc login -u system:admin
Logged into "https://10.0.1.36:8443" as "system:admin" using existing credentials.

[eje@localhost ~]$ kubectl create -f resource.yaml 
thirdpartyresource "cron-tab.stable.example.com" created

@erictune

Thoughts:

  • Spark jobs can and should still run if the TPR-schema is not installed in the cluster, or TPR creation fails for some other reason (misconfigured auth). No need for flag. I can still run spark if I am not an admin and can't get my admin to install the TPR; I just lose some kubernetes integration, but still get my results out.
  • Using the TPR to control the spark driver/pods sounds promising long term (such as scaling, as Erik suggests). I don't know that we want to tackle that first.
  • Still, does it make sense to separate spec and status when we start, so that we have the option to do that later?
  • We need a general way in kubectl and Dashboard to say "show me all the top-level objects, regardless of type". These are my "applications", whether they are SparkJobs, Deployments, Helm Charts, or what not. I'll file a separate issue to track this.
  • While the driver is running, there should be a link to the driver's dashboard page. When it is about to exit, it can move its state to FINISHED and remove the link.
  • The TPR-object should be the parent of all its resources (using controller-refs; a sketch follows this list). The Kubernetes dashboard needs to have a view that shows each "root" of the "controller-ref" forest as an "app" the user is running.
  • If the driver is somehow wedged, deleting the TPR can cause kubernetes GC to clean up all the pods. That will be developed in parallel by SIG-api-machinery, and we can use it when it is ready.
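
A hedged sketch of what such a controller-ref from a driver pod to its parent SparkJob object might look like (name/uid reused from the sample object above; whether TPR kinds are accepted as owners is an assumption here):

metadata:
  ownerReferences:
  - apiVersion: kubernetes.io/v1
    kind: SparkJob
    name: spark-driver-1924
    uid: 91022bc2-a71d-11e6-a4be-42010af00002
    controller: true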

@tnachen

tnachen commented Nov 11, 2016

@erictune mostly agree, but to your point about the link to the driver's Spark UI and states: either that is just console output telling you where it is, or we start simple with the TPR, let it just act as a UI for querying state in the namespace, and then eventually move to more.

@erikerlandson
Collaborator

@erictune if the TPR is global, can you delete it without clobbering every spark driver running on the cluster?

Agreed that if TPR isn't defined, then child objects just don't get published.

With some basic loading stats we can do kube-level executor scaling now, as long as we're willing to lose some work due to non-graceful scale down. It's wasteful to some degree, but Spark is designed to work around it. Graceful executor shutdown could be added later, although I'm hoping to try it out to see if it is as straightforward as I believe.

@erictune

Currently, deleting the TPR (the "schema" for all SparkJobs) will remove the API endpoint and leave the individual SparkJob objects in etcd, but inaccessible. I suspect in time we will make this do a cascading delete, like namespace deletion does; we just haven't gotten to it yet. If we do, we will need to document to admins not to delete TPRs carelessly.

@erictune

Open question of whether TPRs will be describable via kubectl, either with or without an extension.

@iyanuobidele
Collaborator

iyanuobidele commented Nov 18, 2016

I'm working on a minimal PR that pushes the SparkJob objects into etcd and also updates them at intervals. Here are some of the things I've found out:

  • After creating a TPR, there can be a lag between the time of creation and the availability of the API endpoint, so we are required to always check before making any requests to the endpoint. The implication is that unavailability of the endpoint doesn't necessarily mean the TPR does not exist. We can implement some type of backoff mechanism for this, which is not a problem (a rough sketch follows this list). See the part labeled note here.
    I'm thinking of completely skipping this process if the TPR does not exist at all. I believe the fabric8 client has a way of getting a ThirdPartyResourceList.
  • The fabric8 client currently used in the project doesn't expose an interface for creating custom objects, at least I haven't seen a way to (I might be wrong, open to suggestions). I know for a fact there's no way to query a TPR instance, per this.
    Also, I have tried creating a sample object using kubectl create .. and getting the object using kubectl get sparkJobs. I found it more reliable to create or get an object by making HTTP requests to the API endpoint (with a JSON payload in the case of a POST). Ultimately, the objects (TPR instances) are pushed to etcd, which is how they are persisted.
    (I believe this is natural, since the creation of the TPR returns an API endpoint for reaching the objects.) The implication is that we would be adding additional dependencies for an HTTP client and JSON serialization/deserialization, and we would need kubectl proxy running as well.
  • I also found out that objects, once created, cannot be modified through PATCH or UPDATE; K8s currently supports only the DELETE and CREATE methods. The implication is that we would need to delete the existing object and create a new one every time we need to update it.
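
A minimal sketch of the backoff check mentioned in the first bullet, assuming a kubectl proxy on localhost:8001 and the spark-job TPR from this issue:

# Poll the TPR-backed endpoint until the API server starts serving it, giving up after a few tries.
for i in 1 2 3 4 5; do
  curl -sf http://localhost:8001/apis/kubernetes.io/v1/namespaces/default/sparkjobs >/dev/null && break
  sleep $((i * 2))   # simple increasing backoff
done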

Please share some of your thoughts on these findings. @foxish @tnachen @erikerlandson @erictune

@erikerlandson
Collaborator

My take is that it's not a show-stopper to just PUT some json to the appropriate TPR endpoint. The kubernetes scheduler is intended to be a sub-project of Spark, and so that limits the scope of any additional deps for doing REST calls to the TPR endpoints.

Not sure if it could be done fast enough to help us in the near term, but we could reach out to @prashantchitta or @tazjin, who both offered to work on a PR for fabric8io/kubernetes-client#299

@erikerlandson
Collaborator

@iyanuobidele I'm not following the reference to "carrying state on etcd," can you expand on that?

@iyanuobidele
Collaborator

@erikerlandson What I meant is: ultimately all TPR instances are stored in etcd, which is how the objects are persisted.

@iyanuobidele
Collaborator

Here's the behavior I noticed that made me think about an http client.

Kubernetes version:

[screenshot: kubectl version output]

When I try kubectl get SparkJobs, kubectl only works intermittently:

[screenshot: kubectl get SparkJobs output]

even though the TPR clearly exists and an instance has been created. (Here, I tried to post again.)

[screenshot: POST request to the TPR endpoint]

But doing a GET to the endpoint always returns the object:

[screenshot: GET request returning the SparkJob object]

@tazjin

tazjin commented Nov 19, 2016

@erikerlandson I haven't had the time to put in a PR to the fabric8 client, but because we had a need to do exactly this (coincidentally for Apache Flink jobs) what we ended up doing is reflecting the configured HTTP client out of the Fabric8 client instance and using that directly.

@erikerlandson
Collaborator

@tazjin that makes sense - I was thinking we might use whatever HTTP libs fabric8 is using to avoid adding any additional deps

@tazjin

tazjin commented Nov 19, 2016

@erikerlandson You may want to actually reflect the instance out of the Fabric8 one; it has the appropriate authorisation and TLS settings configured (which you probably don't want to set up manually). See my comment over on the other issue.

@erikerlandson
Collaborator

erikerlandson commented Nov 19, 2016

@tazjin agreed, it addresses the dependency surface and also benefits from the auth and configuration already established in the client cc/ @iyanuobidele

@foxish
Owner Author

foxish commented Nov 22, 2016

@iyanuobidele It appears that the behavior you hit with kubectl get is a bug. You can run the same at higher verbosity (-v=6 or -v=9) to try and see what's happening.
I can't seem to reproduce it with newer versions of the client/server.

After creating a TPR, there could be a lag between the time of creation and availability of the api endpoint. So we are required to always check before making any requests to the endpoint. The implication of this is that, unavailability of the endpoint doesn't necessarily mean the TPR does not exist. We can implement some type of backoff mechanism for this which is not a problem. See the part labeled note here.

The creation of the TPR itself is a cluster-wide operation and we can assume that it will occur separately and prior to any user trying to launch Spark Jobs. I agree that we should check for its existence, but I don't think we need to retry or wait for it to be available.

While it is not possible (due to a bug to be fixed in 1.6) to use kubectl edit/patch on TPRs, it is still possible to PUT/PATCH the resource. The following works, for example:

TPR:

metadata:
  name: spark-job.apache.io
  labels:
    resource: spark-job
    object: spark
apiVersion: extensions/v1beta1
kind: ThirdPartyResource
description: "A resource that manages a spark job"
versions:
  - name: v1

Sample object

apiVersion: "apache.io/v1"
kind: "SparkJob"
metadata:
  name: "spark-job-1"
spec:
  image: "driver-image"
  state: "completed"
  num-executors: 10

curl -k -H "Authorization: Bearer XXX" -H "Content-Type: application/json-patch+json" -X PATCH --data '[{"op": "replace", "path": "/spec/num-executors", "value": 13}]' https://<k8s-master>/apis/apache.io/v1/namespaces/default/sparkjobs/spark-job-1

It should be possible to use the reflected HTTP client for this as @tazjin mentioned.

@iyanuobidele
Collaborator

Moving on to the next set of things addressing this issue:

  1. completing the two-way relationship we discussed for the TPR instance, i.e., deleting/destroying the SparkJob object should kill the Spark job.

  2. adding support for spark-submit --status/kill

My initial thoughts on (1) above:

  • I'm very certain a polling mechanism is not at all efficient for this purpose.
  • An event-based approach, if possible, seems promising for this problem.
  • I was also curious whether there's a notion of a watcher for TPR instances; that might also solve the problem (a rough sketch follows this list).
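
A minimal sketch of that event-based idea, assuming the API server supports watches on TPR-backed endpoints and a kubectl proxy on localhost:8001; the driver could hold a streaming request open and react to a DELETED event for its own SparkJob object instead of polling:

# Stream change events for SparkJob objects in the namespace (one JSON event per line).
curl -s "http://localhost:8001/apis/kubernetes.io/v1/namespaces/default/sparkjobs?watch=true"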

Adding support for spark-submit --status/kill should be straightforward, and I will start looking into that.

what are your thoughts on (1)?
/cc @foxish @erikerlandson

@foxish
Owner Author

foxish commented Dec 8, 2016

(1) seems like something that should be done using a watch. I'm not sure if the fabric8 library allows us to watch TPR instances, but if not, we should be able to roll our own, similar to the current code surrounding TPRs. However, we may be able to get by without this watching/two-way relationship for implementing (2), which is necessary for the first PR. We can have the SparkJob instance record the names of the driver and executor pods which are launched onto the cluster. Upon executing spark-submit --kill, we look up the SparkJob instance, find the corresponding driver (and executor pods if they've been created), and issue DELETE calls to k8s directly (a sketch follows). If a cluster administrator does not create the TPR, then the user will be unable to use spark-submit to find the status of or control their jobs, and will have to fall back to using kubectl or the kubernetes dashboard directly.
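
A hedged sketch of what that --kill path could look like under the hood, using direct API calls (field and object names follow the earlier examples in this issue; jq and a kubectl proxy on localhost:8001 are assumed):

BASE=http://localhost:8001
JOB=spark-driver-1924
NS=default
# Look up the SparkJob instance to find the driver pod it recorded.
DRIVER=$(curl -s $BASE/apis/kubernetes.io/v1/namespaces/$NS/sparkjobs/$JOB | jq -r '."driver-pod"')
# Delete the driver pod; the driver is expected to tear down its executors.
curl -s -X DELETE $BASE/api/v1/namespaces/$NS/pods/$DRIVER
# Finally remove the SparkJob object itself.
curl -s -X DELETE $BASE/apis/kubernetes.io/v1/namespaces/$NS/sparkjobs/$JOB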

I've been thinking about the SparkJob resource. Tying it to the driver's lifecycle may not be the right solution, because we want to report on the status of a SparkJob even when the driver has yet to enter the running state. In the current implementation, we don't create a SparkJob object until the driver starts to run, which may not be for a while in a cluster with resource constraints. I think the right way would be to create the SparkJob object at the time we create the driver (from the client side), but allow the driver to clean it up eventually when it exits.

@foxish
Owner Author

foxish commented Jan 9, 2017

@iyanuobidele We're now moving all development to our own org. You're one of the owners of the org as well.
Future PRs should be made against https://github.com/apache-spark-on-k8s/spark

Thanks!

@iyanuobidele
Collaborator

Got it. Thanks !

@foxish is there a SIG or some avenue for meetings/discussions for the org?

@foxish
Owner Author

foxish commented Jan 18, 2017

oops, missed this.
@iyanuobidele Not yet. But I think we can reuse and reinvigorate https://groups.google.com/forum/#!forum/kubernetes-sig-big-data. Will work on getting it set up and active again.

@iyanuobidele
Collaborator

@foxish sounds great. Thanks.
