Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Spark on Kubernetes - basic scheduler backend #498

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f6fdd6a
Spark on Kubernetes - basic scheduler backend
foxish Sep 15, 2017
75e31a9
Adding to modules.py and SparkBuild.scala
foxish Oct 17, 2017
cf82b21
Exclude from unidoc, update travis
foxish Oct 17, 2017
488c535
Address a bunch of style and other comments
foxish Oct 17, 2017
82b79a7
Fix some style concerns
foxish Oct 18, 2017
c052212
Clean up YARN constants, unit test updates
foxish Oct 20, 2017
c565c9f
Couple of more style comments
foxish Oct 20, 2017
2fb596d
Address CR comments.
mccheah Oct 25, 2017
992acbe
Extract initial executor count to utils class
mccheah Oct 25, 2017
b0a5839
Fix scalastyle
mccheah Oct 25, 2017
a4f9797
Fix more scalastyle
mccheah Oct 25, 2017
2b5dcac
Pin down app ID in tests. Fix test style.
mccheah Oct 26, 2017
018f4d8
Address comments.
mccheah Nov 1, 2017
4b32134
Various fixes to the scheduler
mccheah Nov 1, 2017
6cf4ed7
Address comments
mccheah Nov 4, 2017
1f271be
Update fabric8 client version to 3.0.0
foxish Nov 13, 2017
71a971f
Addressed more comments
liyinan926 Nov 13, 2017
0ab9ca7
One more round of comments
liyinan926 Nov 14, 2017
7f14b71
Added a comment regarding how failed executor pods are handled
liyinan926 Nov 15, 2017
7afce3f
Addressed more comments
liyinan926 Nov 21, 2017
b75b413
Fixed Scala style error
liyinan926 Nov 21, 2017
3b587b4
Removed unused parameter in parsePrefixedKeyValuePairs
liyinan926 Nov 22, 2017
cb12fec
Another round of comments
liyinan926 Nov 22, 2017
ae396cf
Addressed latest comments
liyinan926 Nov 27, 2017
f8e3249
Addressed comments around licensing on new dependencies
liyinan926 Nov 27, 2017
a44c29e
Fixed unit tests and made maximum executor lost reason checks configu…
liyinan926 Nov 27, 2017
4bed817
Removed default value for executor Docker image
liyinan926 Nov 27, 2017
c386186
Close the executor pod watcher before deleting the executor pods
liyinan926 Nov 27, 2017
b85cfc4
Addressed more comments
liyinan926 Nov 28, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
Expand Down
6 changes: 6 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation

This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
Copyright (C) 2015 Red Hat, Inc.

This product includes/uses OkHttp (https://github.com/square/okhttp)
Copyright (C) 2012 The Android Open Source Project

=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the DataNucleus distribution. ==
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.util.Utils

private[spark] object SchedulerBackendUtils {
val DEFAULT_NUMBER_EXECUTORS = 2

/**
* Getting the initial target number of executors depends on whether dynamic allocation is
* enabled.
* If not using dynamic allocation it gets the number of executors requested by the user.
*/
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")

initialNumExecutors
} else {
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}
}
8 changes: 8 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ def __hash__(self):
sbt_test_goals=["mesos/test"]
)

kubernetes = Module(
name="kubernetes",
dependencies=[],
source_file_regexes=["resource-managers/kubernetes/core"],
build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1397,10 +1397,10 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
<td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
<td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
<td>
The minimum ratio of registered resources (registered resources / total expected resources)
(resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained
(resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarsed-grained
mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
Regardless of whether the minimum ratio of resources has been reached,
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2648,6 +2648,13 @@
</modules>
</profile>

<profile>
<id>kubernetes</id>
<modules>
<module>resource-managers/kubernetes/core</module>
</modules>
</profile>

<profile>
<id>hive-thriftserver</id>
<modules>
Expand Down
8 changes: 4 additions & 4 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

val optionallyEnabledProjects@Seq(mesos, yarn,
val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn",
Seq("kubernetes", "mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
Expand Down Expand Up @@ -671,9 +671,9 @@ object Unidoc {
publish := {},

unidocProjectFilter in(ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
Expand Down
100 changes: 100 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Required by kubernetes-client but we exclude it -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
</dependency>

</dependencies>


<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods. When using " +
"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
"--kubernetes-namespace command line argument.")
.stringConf
.createWithDefault("default")

val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
"format.")
.stringConf
.createOptional

val DOCKER_IMAGE_PULL_POLICY =
ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
.doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
.stringConf
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")

val APISERVER_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
"credentials are given for the driver pod to use, the driver will favor " +
"using those credentials instead.")
.stringConf
.createOptional

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
"is memory that accounts for things like VM overheads, interned strings, other native " +
"overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createWithDefault("spark")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
.intConf
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Number of seconds to wait between each round of executor allocation.")
.longConf
.checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
.createWithDefault(1)

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
"before it is assumed that the executor failed.")
.intConf
.checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
"must be a positive integer")
.createWithDefault(10)

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}
Loading