Add/modify/delete ThirdPartyResources. #8

Merged: 3 commits, Dec 8, 2016
4 changes: 1 addition & 3 deletions dev/make-distribution.sh
@@ -154,9 +154,7 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCac
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)

BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@)
BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
Comment from the owner (author):

This reverts a previous change made on k8s-support.


# Actually build the jar
echo -e "\nBuilding with..."
26 changes: 16 additions & 10 deletions resource-managers/kubernetes/pom.xml
@@ -32,17 +32,23 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>1.4.8</version>
</dependency>

<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.11</version>
</dependency>
<!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
</dependencies>

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.kubernetes
import java.util.concurrent.CountDownLatch

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -44,6 +45,7 @@ private[spark] class Client(val args: ClientArguments,
scheduler.stop()
shutdownLatch.countDown()
System.clearProperty("SPARK_KUBERNETES_MODE")
System.clearProperty("SPARK_IMAGE_PULLSECRET")
}

def awaitShutdown(): Unit = {
@@ -55,31 +55,34 @@ private[spark] class ClientArguments(args: Array[String]) {
args = tail

case Nil =>

case _ =>
throw new IllegalArgumentException(getUsageMessage(args))
}
}

if (primaryPyFile != null && primaryRFile != null) {
throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
" at the same time")
throw new IllegalArgumentException(
"Cannot have primary-py-file and primary-r-file" +
" at the same time")
}
}

private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
val message =
if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n"
else ""
message +
s"""
|Usage: org.apache.spark.deploy.kubernetes.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in kubernetes-cluster
| mode)
| --jar JAR_PATH Path to your application's JAR file (required in
| kubernetes-cluster mode)
| --class CLASS_NAME Name of your application's main class (required)
| --primary-py-file A main Python file
| --primary-r-file A main R file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| Multiple invocations are possible, each will be passed in
| order.
""".stripMargin
}
}
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient}
import okhttp3.{MediaType, OkHttpClient, Request, RequestBody}
import org.json4s.{CustomSerializer, DefaultFormats, JString}
import org.json4s.JsonAST.JNull
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.{read, write}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.kubernetes.JobState
import org.apache.spark.scheduler.cluster.kubernetes.JobState._

/*
* Representation of a Spark Job State in Kubernetes
* */
object SparkJobResource {
case class Metadata(name: String,
uid: Option[String] = None,
labels: Option[Map[String, String]] = None,
annotations: Option[Map[String, String]] = None)

case class SparkJobState(apiVersion: String,
kind: String,
metadata: Metadata,
spec: Map[String, Any])

case object JobStateSerDe
extends CustomSerializer[JobState](format =>
({
case JString("SUBMITTED") => JobState.SUBMITTED
case JString("QUEUED") => JobState.QUEUED
case JString("RUNNING") => JobState.RUNNING
case JString("FINISHED") => JobState.FINISHED
case JString("KILLED") => JobState.KILLED
case JString("FAILED") => JobState.FAILED
case JNull =>
throw new UnsupportedOperationException("No JobState Specified")
}, {
case JobState.FAILED => JString("FAILED")
case JobState.SUBMITTED => JString("SUBMITTED")
case JobState.KILLED => JString("KILLED")
case JobState.FINISHED => JString("FINISHED")
case JobState.QUEUED => JString("QUEUED")
case JobState.RUNNING => JString("RUNNING")
}))
}

class SparkJobResource(client: KubernetesClient) extends Logging {

import SparkJobResource._

implicit val formats = DefaultFormats + JobStateSerDe
private val httpClient = getHttpClient(client.asInstanceOf[BaseClient])
private val kind = "SparkJob"
private val apiVersion = "apache.io/v1"
private val apiEndpoint = s"${client.getMasterUrl}apis/$apiVersion/" +
s"namespaces/${client.getNamespace}/sparkjobs"

private def getHttpClient(client: BaseClient): OkHttpClient = {
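// Reach into the fabric8 BaseClient's private "httpClient" field via reflection so the
// ThirdPartyResource endpoint above can be called with raw REST requests.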
val field = classOf[BaseClient].getDeclaredField("httpClient")
try {
field.setAccessible(true)
field.get(client).asInstanceOf[OkHttpClient]
} finally {
field.setAccessible(false)
}
}

/*
* using a Map as an argument here allows adding more info into the Object if needed
* */
def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = {
val resourceObject =
SparkJobState(apiVersion, kind, Metadata(name), keyValuePairs)
val payload = parse(write(resourceObject))
val requestBody = RequestBody
.create(MediaType.parse("application/json"), compact(render(payload)))
val request =
new Request.Builder().post(requestBody).url(apiEndpoint).build()

val response = httpClient.newCall(request).execute()
if (response.code() == 201) {
logInfo(
s"Successfully posted resource $name: " +
s"${pretty(render(parse(write(resourceObject))))}")
} else {
val msg =
s"Failed to post resource $name. ${response.toString}. ${compact(render(payload))}"
logError(msg)
throw new SparkException(msg)
}
}

def updateJobObject(name: String, value: String, fieldPath: String): Unit = {
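// Build a single RFC 6902 "replace" operation and submit it with the
// application/json-patch+json media type.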
val payload = List(
("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value))
val requestBody = RequestBody.create(
MediaType.parse("application/json-patch+json"),
compact(render(payload)))
val request = new Request.Builder()
.post(requestBody)
.url(s"$apiEndpoint/$name")
.build()
val response = httpClient.newCall(request).execute()
if (response.code() == 200) {
logInfo(s"Successfully patched resource $name")
} else {
val msg =
s"Failed to patch resource $name. ${response.message()}. ${compact(render(payload))}"
logError(msg)
throw new SparkException(msg)
}
}

def deleteJobObject(name: String): Unit = {
val request =
new Request.Builder().delete().url(s"$apiEndpoint/$name").build()
val response = httpClient.newCall(request).execute()
if (response.code() == 200) {
logInfo(s"Successfully deleted resource $name")
} else {
val msg =
s"Failed to delete resource $name. ${response.message()}. ${request}"
logError(msg)
throw new SparkException(msg)
}
}

def getJobObject(name: String): SparkJobState = {
val request =
new Request.Builder().get().url(s"$apiEndpoint/$name").build()
val response = httpClient.newCall(request).execute()
if (response.code() == 200) {
logInfo(s"Successfully retrieved resource $name")
read[SparkJobState](response.body().string())
} else {
val msg = s"Failed to retrieve resource $name. ${response.message()}"
logError(msg)
throw new SparkException(msg)
}
}

}
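For orientation, a minimal usage sketch of the class above. This is not part of the diff: the client construction, job name, and spec keys are made up for illustration, and it assumes the "SparkJob" ThirdPartyResource has already been registered with the cluster under apache.io/v1.

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.SparkJobResource
import org.apache.spark.scheduler.cluster.kubernetes.JobState

val client = new DefaultKubernetesClient()     // hypothetical: default/in-cluster config
val jobResource = new SparkJobResource(client)

// Create the custom object with an initial spec, then patch its state as the job progresses.
jobResource.createJobObject("spark-pi-1234",
  Map("state" -> JobState.QUEUED.toString, "num-executors" -> 2))
jobResource.updateJobObject("spark-pi-1234", JobState.RUNNING.toString, "/spec/state")

// Read the object back and delete it once the job is done.
val current = jobResource.getJobObject("spark-pi-1234")
jobResource.deleteJobObject("spark-pi-1234")
client.close()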
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.kubernetes

private[spark] object JobState extends Enumeration {
type JobState = Value

/*
* QUEUED - Spark Job has been queued to run
* SUBMITTED - Driver Pod deployed but tasks are not yet scheduled on worker pod(s)
* RUNNING - Task(s) have been allocated to worker pod(s) to run and Spark Job is now running
* FINISHED - Spark Job ran and exited cleanly, i.e., worker pod(s) and driver pod were
* gracefully deleted
* FAILED - Spark Job Failed due to error
* KILLED - A user manually killed this Spark Job
*/
val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
}
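As a quick illustration (again not part of the diff), these states serialize to and from plain JSON strings once JobStateSerDe from SparkJobResource is registered with json4s:

import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.JsonAST.JString

import org.apache.spark.deploy.kubernetes.SparkJobResource.JobStateSerDe
import org.apache.spark.scheduler.cluster.kubernetes.JobState

implicit val formats: Formats = DefaultFormats + JobStateSerDe

// Enum value -> JSON string
assert(Extraction.decompose(JobState.RUNNING) == JString("RUNNING"))
// JSON string -> enum value
assert(Extraction.extract[JobState.JobState](JString("FINISHED")) == JobState.FINISHED)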
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**