[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468
SchedulerBackendUtils.scala (new file):
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.util.Utils

private[spark] object SchedulerBackendUtils {
  val DEFAULT_NUMBER_EXECUTORS = 2

  /**
   * Getting the initial target number of executors depends on whether dynamic allocation is
   * enabled.
   * If not using dynamic allocation it gets the number of executors requested by the user.
   */
  def getInitialTargetExecutorNumber(
      conf: SparkConf,
      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
    if (Utils.isDynamicAllocationEnabled(conf)) {
      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
        s"initial executor number $initialNumExecutors must be between min executor number " +
          s"$minNumExecutors and max executor number $maxNumExecutors")

      initialNumExecutors
    } else {
      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
    }
  }
}
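For readers unfamiliar with how this helper is consumed, here is a minimal usage sketch. It is not part of the PR: the spark.executor.instances value is illustrative, and the call assumes code living inside the org.apache.spark namespace, since the object is private[spark].

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils

// Illustrative configuration: dynamic allocation off, four executors requested explicitly.
val conf = new SparkConf().set("spark.executor.instances", "4")

// Resolves to 4 here. With spark.executor.instances unset it falls back to
// DEFAULT_NUMBER_EXECUTORS (2); with dynamic allocation enabled it validates and
// returns the dynamic-allocation initial executor count instead.
val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)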
dev/sparktestsupport/modules.py:
@@ -531,6 +531,14 @@ def __hash__(self):
    sbt_test_goals=["mesos/test"]
)

kubernetes = Module(
    name="kubernetes",
    dependencies=[],
    source_file_regexes=["resource-managers/kubernetes/core"],
    build_profile_flags=["-Pkubernetes"],
    sbt_test_goals=["kubernetes/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(

Review comment (on source_file_regexes): remove 'core'
pom.xml (root, new kubernetes profile):
@@ -2648,6 +2648,13 @@
      </modules>
    </profile>

    <profile>
      <id>kubernetes</id>
      <modules>
        <module>resource-managers/kubernetes/core</module>
      </modules>
    </profile>

    <profile>
      <id>hive-thriftserver</id>
      <modules>

Review thread (on the new kubernetes profile):
- This new profile is not used in the build script; shall we add this so that the new contributions in this PR will be built and tested?
- We should also change the sbt file to make it work using sbt.
- Updated. PTAL.

Review thread (on the resource-managers/kubernetes/core module entry):
- Is this going to be a multi-module module later? If so, might want to create a parent pom in …
- We expected it would be multi-module (https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes), the other modules being configuration files for the Docker images and integration tests. The Docker files are pretty much static configuration files, so moving that instead to a directory like …
- Based on a discussion in last week's meeting with Shane Knapp from RISELab, we want to keep the integration tests as a sub-module here, in the interest of keeping test code together. We should have the additional parent pom to facilitate that.
- @vanzin this isn't a multi-module project in the sense that the Kubernetes cluster manager and spark-submit implementation are split across multiple projects, but rather that there is a module for said cluster manager + spark-submit implementation, and then there are modules for integration testing said code. @foxish The Dockerfiles feel more like application code than static configuration, but that might just be a matter of implementation. The structure of the …
- Why are integration tests in a separate module? E.g. Maven has an … That would mean keeping the test code in the same module as the core code, not in a separate module.
- It's not absolutely necessary to have integration tests in a specific separate module. However, there are some nice organizational benefits we can get. For example, integration tests in the same module as the core code would need a specific package namespace that is omitted from the … It's also IMO easier to read the … We definitely don't want to run these during unit tests; they are relatively expensive, require building Docker images, and require Minikube to be pre-installed on the given machine. Having them in at least a separate integration test phase makes these differences clear.
- That (keeping them separate) is actually pretty useful for SBT.
- Both yarn and mesos don't have a sub-directory called …
- See #19468 (comment)
resource-managers/kubernetes/core/pom.xml (new file):
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes</name>
  <properties>
    <sbt.project.name>kubernetes</sbt.project.name>
    <kubernetes.client.version>3.0.0</kubernetes.client.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
      <version>${kubernetes.client.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Required by kubernetes-client but we exclude it -->
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-yaml</artifactId>
      <version>${fasterxml.jackson.version}</version>
    </dependency>

    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <!-- End of shaded deps. -->

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>

Review thread (on kubernetes.client.version):
- Last time I asked about the client version, there were concerns regarding maturity/stability of 3.x compared to the 2.2 (iirc) version which was in use; were they resolved?
- We have been using version 3.0.0 in our forked repo for a couple of months now. We haven't seen any issue with it.

Review thread (on the kubernetes-client dependency):
- These dependencies need to be listed, please see …
- The dependency needs to be added to …
- Listed in f8e3249.
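As background on what the kubernetes-client dependency above provides, the following is a short, hedged sketch of the fabric8 3.0.0 client API that a Kubernetes scheduler backend builds on. It is not code from this PR; the master URL and namespace are assumed values, and ConfigBuilder here is the fabric8 one, renamed on import to avoid confusion with Spark's ConfigBuilder used later in this diff.

import io.fabric8.kubernetes.client.{ConfigBuilder => K8sConfigBuilder, DefaultKubernetesClient}

// Hypothetical example: connect to an API server and list pods in a namespace.
// "https://kubernetes.default.svc" and "default" are illustrative values only.
val k8sConf = new K8sConfigBuilder()
  .withMasterUrl("https://kubernetes.default.svc")
  .build()
val client = new DefaultKubernetesClient(k8sConf)
try {
  val pods = client.pods().inNamespace("default").list()
  println(s"Found ${pods.getItems.size()} pods")
} finally {
  client.close()
}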
Config.scala (new file, package org.apache.spark.deploy.k8s):
@@ -0,0 +1,114 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

  val KUBERNETES_NAMESPACE =
    ConfigBuilder("spark.kubernetes.namespace")
      .doc("The namespace that will be used for running the driver and executor pods. When using" +
        " spark-submit in cluster mode, this can also be passed to spark-submit via the" +
        " --kubernetes-namespace command line argument.")
      .stringConf
      .createWithDefault("default")

  val EXECUTOR_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.executor.docker.image")
      .doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
        " format.")
      .stringConf
      .createWithDefault(s"spark-executor:$SPARK_VERSION")

  val DOCKER_IMAGE_PULL_POLICY =
    ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
      .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
      .stringConf
      .createWithDefault("IfNotPresent")

  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
    "spark.kubernetes.authenticate.driver"
  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
    "spark.kubernetes.authenticate.driver.mounted"
  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

  val KUBERNETES_SERVICE_ACCOUNT_NAME =
    ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
      .doc("Service account that is used when running the driver pod. The driver pod uses" +
        " this service account when requesting executor pods from the API server. If specific" +
        " credentials are given for the driver pod to use, the driver will favor" +
        " using those credentials instead.")
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
    ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
      .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" +
        " is memory that accounts for things like VM overheads, interned strings, other native" +
        " overheads, etc. This tends to grow with the executor size (typically 6-10%).")
      .bytesConf(ByteUnit.MiB)
      .createOptional

  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

  val KUBERNETES_DRIVER_POD_NAME =
    ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("Name of the driver pod.")
      .stringConf
      .createOptional

  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
      .doc("Prefix to use in front of the executor pod names.")
      .internal()
      .stringConf
      .createWithDefault("spark")

  val KUBERNETES_ALLOCATION_BATCH_SIZE =
    ConfigBuilder("spark.kubernetes.allocation.batch.size")
      .doc("Number of pods to launch at once in each round of executor allocation.")
      .intConf
      .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
      .createWithDefault(5)

  val KUBERNETES_ALLOCATION_BATCH_DELAY =
    ConfigBuilder("spark.kubernetes.allocation.batch.delay")
      .doc("Number of seconds to wait between each round of executor allocation.")
      .longConf
      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
      .createWithDefault(1)

  val KUBERNETES_EXECUTOR_LIMIT_CORES =
    ConfigBuilder("spark.kubernetes.executor.limit.cores")
      .doc("Specify the hard cpu limit for a single executor pod")
      .stringConf
      .createOptional

  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}

Review thread (on the EXECUTOR_DOCKER_IMAGE default):
- Should this default …
- I think we should remove this instead of defaulting. WDYT @liyinan926 @mccheah
- I agree. Removed in 4bed817.
- I'm somewhat concerned about this though. Secondly, it is going to be very easy to make a mistake and set the "wrong" Docker images (there are many many ways this could go wrong, including mismatched versions of submission client vs driver and/or executor) that would be very hard to diagnose (by no means will it fail fast; worst case you get some subtle correctness issues in data output). And IMO having this version number set consistently by default is really going to help with that. What's the concern with having the default value?
- …
- IMO we should, as part of releasing Spark with this feature, have a Docker image in the same release. Granted, it is not a guarantee, but given what we know about building Docker images and Docker image publishing supported officially by the ASF / ASF Infra, this shouldn't be a problem. On the flip side, if we couldn't have "official" or default images, then we should consider how to build some validations into the startup path such that "wrong" or mismatched images can be detected quickly / fail fast (e.g. a rudimentary version check).
- @felixcheung That would be a behavior change w.r.t. how we have used Docker till now in Spark; we expect users to explicitly specify the image. I do agree that it is a good idea to validate the version, irrespective of whether we have a default value or not. It will help not just the k8s integration, but also Mesos and Spark on YARN + Docker.

Review thread (on DOCKER_IMAGE_PULL_POLICY):
- Can you add a list of acceptable values for this config if possible and add …
- Done.

Review thread (on KUBERNETES_EXECUTOR_MEMORY_OVERHEAD):
- What about driver memory overhead?
- …
- Perfect!

Review thread (on the KUBERNETES_ALLOCATION_BATCH_DELAY check message):
- nit: remove …
- Done.
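To show how these ConfigEntry definitions are read at runtime, here is a brief illustrative sketch (not from the PR); the namespace value is an assumption, and the reads assume code inside the org.apache.spark namespace since SparkConf.get(ConfigEntry) is private[spark].

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

// Illustrative setting; "spark-jobs" is an assumed namespace, not a default.
val conf = new SparkConf().set("spark.kubernetes.namespace", "spark-jobs")

// ConfigEntry-based settings come back with their declared types and defaults.
val namespace: String = conf.get(KUBERNETES_NAMESPACE)                // "spark-jobs"
val pullPolicy: String = conf.get(DOCKER_IMAGE_PULL_POLICY)           // default "IfNotPresent"
val batchSize: Int = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)       // default 5
val driverPod: Option[String] = conf.get(KUBERNETES_DRIVER_POD_NAME)  // None unless set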
ConfigurationUtils.scala (new file, package org.apache.spark.deploy.k8s):
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils {

  /**
   * Extract and parse Spark configuration properties with a given name prefix and
   * return the result as a Map. Keys must not have more than one value.
   *
   * @param sparkConf Spark configuration
   * @param prefix the given property name prefix
   * @return a Map storing the configuration property keys and values
   */
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String): Map[String, String] = {
    sparkConf.getAllWithPrefix(prefix).toMap
  }

  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
  }
}

Review thread (on parsePrefixedKeyValuePairs):
- We should add a comment to explain what the function does; it not only returns the configs, but also ensures no duplicate configs are set.
- Done.
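A small usage sketch of the two helpers follows, with illustrative property values (not from the PR) and again assuming code inside the org.apache.spark namespace:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

// Two assumed executor labels set through the documented prefix.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.label.team", "data-platform")
  .set("spark.kubernetes.executor.label.env", "staging")

// getAllWithPrefix strips the prefix, so the resulting keys are "team" and "env".
val labels = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.executor.label.")
// labels == Map("team" -> "data-platform", "env" -> "staging")

// requireNandDefined rejects configurations where both options are defined;
// this call passes because the second option is empty.
ConfigurationUtils.requireNandDefined(
  Some("token-value"), None, "Cannot specify both an OAuth token and an OAuth token file.")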
Review thread:
- Should we remove this val from YarnSparkHadoopUtil?
- Done