One more round of comments
liyinan926 committed Nov 14, 2017
1 parent 71a971f commit 0ab9ca7
Showing 6 changed files with 94 additions and 107 deletions.
10 changes: 1 addition & 9 deletions resource-managers/kubernetes/core/pom.xml
@@ -54,15 +54,7 @@
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
+ <artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
@@ -34,14 +34,7 @@ private[spark] object ConfigurationUtils {
sparkConf: SparkConf,
prefix: String,
configType: String): Map[String, String] = {
- val fromPrefix = sparkConf.getAllWithPrefix(prefix)
- fromPrefix.groupBy(_._1).foreach {
- case (key, values) =>
- require(values.size == 1,
- s"Cannot have multiple values for a given $configType key, got key $key with" +
- s" values $values")
- }
- fromPrefix.toMap
+ sparkConf.getAllWithPrefix(prefix).toMap
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
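For context, a minimal sketch (not part of this commit; names and values are illustrative) of why the duplicate-key check removed above was redundant: SparkConf keeps its settings in a single map keyed by the full configuration name, so getAllWithPrefix can never return the same stripped key twice.

import org.apache.spark.SparkConf

object PrefixExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.kubernetes.executor.label.app", "spark")
      .set("spark.kubernetes.executor.label.team", "data")
    // getAllWithPrefix strips the prefix from each matching key; keys are
    // unique because SparkConf is backed by one map of full setting names.
    val labels = conf.getAllWithPrefix("spark.kubernetes.executor.label.").toMap
    println(labels) // e.g. Map(app -> spark, team -> data)
  }
}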
@@ -40,13 +40,13 @@ private[spark] object SparkKubernetesClientFactory {
namespace: Option[String],
kubernetesAuthConfPrefix: String,
sparkConf: SparkConf,
- maybeServiceAccountToken: Option[File],
- maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+ defaultServiceAccountToken: Option[File],
+ defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
.map(new File(_))
- .orElse(maybeServiceAccountToken)
+ .orElse(defaultServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
@@ -56,7 +56,7 @@

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
- .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+ .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
val clientCertFile = sparkConf
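As an illustration (assumed usage, not code from this commit), the renamed defaultServiceAccountToken parameter makes the precedence easier to read: an explicitly configured token file wins, otherwise the factory falls back to the token Kubernetes mounts into the driver pod.

import java.io.File
import org.apache.spark.SparkConf

object TokenPrecedenceExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(false)
    // Standard path of the service-account token mounted into pods by Kubernetes.
    val defaultServiceAccountToken =
      Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token"))
    // Same orElse chain as in the hunk above: the explicit setting, if present,
    // takes precedence over the mounted default.
    val oauthTokenFile = sparkConf
      .getOption("spark.kubernetes.authenticate.driver.mounted.oauthTokenFile")
      .map(new File(_))
      .orElse(defaultServiceAccountToken)
    println(oauthTokenFile.map(_.getPath))
  }
}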
@@ -16,45 +16,45 @@
*/
package org.apache.spark.deploy.k8s

- import org.apache.spark.{SPARK_VERSION => sparkVersion}
+ import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

- package object config extends Logging {
+ private[spark] object config extends Logging {

- private[spark] val KUBERNETES_NAMESPACE =
+ val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods. When using" +
" spark-submit in cluster mode, this can also be passed to spark-submit via the" +
" --kubernetes-namespace command line argument.")
.stringConf
.createWithDefault("default")

- private[spark] val EXECUTOR_DOCKER_IMAGE =
+ val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
" format.")
.stringConf
.createWithDefault(s"spark-executor:$sparkVersion")
.createWithDefault(s"spark-executor:$SPARK_VERSION")

- private[spark] val DOCKER_IMAGE_PULL_POLICY =
+ val DOCKER_IMAGE_PULL_POLICY =
ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
.doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
.stringConf
.createWithDefault("IfNotPresent")

- private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+ val APISERVER_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
- private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+ val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
- private[spark] val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
- private[spark] val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
- private[spark] val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
- private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
- private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+ val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+ val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+ val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+ val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+ val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

- private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses" +
" this service account when requesting executor pods from the API server. If specific" +
@@ -66,49 +66,49 @@ package object config extends Logging {
// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
- private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" +
" is memory that accounts for things like VM overheads, interned strings, other native" +
" overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

- private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
- private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+ val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+ val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

- private[spark] val KUBERNETES_DRIVER_POD_NAME =
+ val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
.stringConf
.createOptional

- private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+ val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createWithDefault("spark")

- private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
+ val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
.intConf
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

- private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
+ val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Number of seconds to wait between each round of executor allocation.")
.longConf
.checkValue(value => value > 0, s"Allocation batch delay should be a positive integer")
.createWithDefault(1)

- private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

- private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+ val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}
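A brief usage sketch (assumed, not part of this commit) of how entries declared with ConfigBuilder above are read. Note that both the config object and SparkConf.get(ConfigEntry) are private[spark], so this only compiles from within the org.apache.spark package tree.

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.config._

object ConfigReadExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(false)
    // Unset keys fall back to the defaults declared by each ConfigBuilder.
    println(sparkConf.get(KUBERNETES_NAMESPACE))             // "default"
    println(sparkConf.get(DOCKER_IMAGE_PULL_POLICY))         // "IfNotPresent"
    println(sparkConf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)) // 5
  }
}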
@@ -27,10 +27,13 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.util.Utils

/**
- * Configures executor pods. Construct one of these with a SparkConf to set up properties that are
- * common across all executors. Then, pass in dynamic parameters into createExecutorPod.
+ * A factory class for configuring and creating executor pods.
*/
private[spark] trait ExecutorPodFactory {
+
+ /**
+ * Configure and construct an executor pod with the given parameters.
+ */
def createExecutorPod(
executorId: String,
applicationId: String,
@@ -161,12 +164,12 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
val requiredPorts = Seq(
(EXECUTOR_PORT_NAME, executorPort),
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
- .map(port => {
+ .map { case (name, port) =>
new ContainerPortBuilder()
- .withName(port._1)
- .withContainerPort(port._2)
+ .withName(name)
+ .withContainerPort(port)
.build()
- })
+ }

val executorContainer = new ContainerBuilder()
.withName(s"executor")
@@ -202,16 +205,15 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
.endSpec()
.build()

- val containerWithExecutorLimitCores = executorLimitCores.map {
-   limitCores =>
-     val executorCpuLimitQuantity = new QuantityBuilder(false)
-       .withAmount(limitCores)
-       .build()
-     new ContainerBuilder(executorContainer)
-       .editResources()
-       .addToLimits("cpu", executorCpuLimitQuantity)
-       .endResources()
-       .build()
+ val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+   val executorCpuLimitQuantity = new QuantityBuilder(false)
+     .withAmount(limitCores)
+     .build()
+   new ContainerBuilder(executorContainer)
+     .editResources()
+     .addToLimits("cpu", executorCpuLimitQuantity)
+     .endResources()
+     .build()
}.getOrElse(executorContainer)

new PodBuilder(executorPod)
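Finally, a self-contained sketch (fabric8 kubernetes-model assumed on the classpath; port names and numbers are illustrative, not from this commit) of the pattern the port-mapping hunk above switches to: destructuring the (name, port) pair in the closure instead of reaching into tuple fields with _1 and _2.

import io.fabric8.kubernetes.api.model.ContainerPortBuilder

object PortsExample {
  def main(args: Array[String]): Unit = {
    val requiredPorts = Seq(("executor", 7078), ("blockmanager", 7079))
      .map { case (name, port) =>
        new ContainerPortBuilder()
          .withName(name)
          .withContainerPort(port) // Scala Int boxes to java.lang.Integer here
          .build()
      }
    requiredPorts.foreach(p => println(s"${p.getName} -> ${p.getContainerPort}"))
  }
}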