Skip to content

Commit

Permalink
[SPARK-47954][K8S] Support creating ingress entry for external UI access
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Apr 23, 2024
1 parent a97e72c commit 6c64bf2
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -749,13 +749,37 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
.createWithDefaultString("20s")

// Opt-in switch (default: false) for creating an ingress in front of the driver service.
// DriverIngressFeatureStep emits no extra resources unless both this flag and
// `spark.ui.enabled` are true.
val KUBERNETES_INGRESS_ENABLED =
ConfigBuilder("spark.kubernetes.driver.ingress.enabled")
.doc("Whether to create ingress entry of the driver service for external UI access " +
"in cluster mode. This only takes effect when `spark.ui.enabled` is true.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

// Optional host pattern for the driver UI ingress rule. DriverIngressFeatureStep replaces the
// literal token `{{APP_ID}}` with the application ID; when this option is unset it logs a
// warning and creates no ingress, even if ingress creation is enabled.
val KUBERNETES_INGRESS_HOST_PATTERN =
ConfigBuilder("spark.kubernetes.driver.ingress.host")
.doc("Host for driver UI ingress, {{APP_ID}} will be replaced by application ID.")
.version("4.0.0")
.stringConf
.createOptional

// Optional name of the IngressClass for the driver UI ingress. When unset,
// DriverIngressFeatureStep passes null to the ingress spec builder (via `orNull`).
val KUBERNETES_INGRESS_CLASS_NAME =
ConfigBuilder("spark.kubernetes.driver.ingress.ingressClassName")
.doc("Reference the IngressClass that should be used to implement the ingress for spark UI.")
.version("4.0.0")
.stringConf
.createOptional

// Configuration key prefixes for driver-scoped, user-supplied key/value settings.
// NOTE(review): presumably each is consumed via KubernetesUtils.parsePrefixedKeyValuePairs,
// with the text after the prefix used as the key — confirm against the individual callers.
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SERVICE_LABEL_PREFIX = "spark.kubernetes.driver.service.label."
val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."
val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."
// The two ingress prefixes are read by DriverIngressFeatureStep to collect the custom labels
// and annotations attached to the driver UI ingress metadata.
val KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX = "spark.kubernetes.driver.ingress.label."
val KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX = "spark.kubernetes.driver.ingress.annotation."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, EXECUTOR_ENV_REGEX}
Expand Down Expand Up @@ -84,7 +85,7 @@ private[spark] class KubernetesDriverConf(
val mainClass: String,
val appArgs: Array[String],
val proxyUser: Option[String])
extends KubernetesConf(sparkConf) {
extends KubernetesConf(sparkConf) with Logging {

/**
 * Key/value pairs collected from every Spark conf entry whose key starts with
 * `KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX`.
 */
def driverNodeSelector: Map[String, String] = {
  val selectorPrefix = KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX
  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, selectorPrefix)
}
Expand All @@ -94,6 +95,20 @@ private[spark] class KubernetesDriverConf(
custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
}

/**
 * Name of the Kubernetes service associated with the driver.
 *
 * The preferred name is the resource name prefix followed by `DRIVER_SVC_POSTFIX`. If that
 * exceeds `MAX_SERVICE_NAME_LENGTH` characters, a shorter name built from a generated unique
 * ID is used instead and a warning is logged.
 */
val driverServiceName: String = {
  val candidate = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
  if (candidate.length > MAX_SERVICE_NAME_LENGTH) {
    // Too long: fall back to "spark-<unique id><postfix>".
    val uniqueId = KubernetesUtils.uniqueID()
    val fallback = s"spark-$uniqueId$DRIVER_SVC_POSTFIX"
    logWarning(s"Driver's hostname would preferably be $candidate, but this is " +
      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
      s"$fallback as the driver service's name.")
    fallback
  } else {
    candidate
  }
}

override def labels: Map[String, String] = {
val presetLabels = Map(
SPARK_VERSION_LABEL -> SPARK_VERSION,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI._

/**
 * Feature step that adds a Kubernetes Ingress routing external traffic to the Spark UI port
 * of the driver service.
 *
 * The driver pod itself is left untouched. An ingress is produced only when all of the
 * following hold; otherwise no additional resources are returned:
 *  - `spark.ui.enabled` is true,
 *  - `KUBERNETES_INGRESS_ENABLED` is true,
 *  - `KUBERNETES_INGRESS_HOST_PATTERN` is set.
 *
 * @param kubernetesConf driver configuration supplying the app ID, the driver service name
 *                       and the underlying Spark conf
 */
class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
  extends KubernetesFeatureConfigStep with Logging {

  private lazy val driverServiceName: String = kubernetesConf.driverServiceName

  /** This step contributes no changes to the driver pod. */
  override def configurePod(pod: SparkPod): SparkPod = pod

  /**
   * Returns the Spark UI ingress as the only additional resource, or an empty sequence when
   * the UI or ingress creation is disabled, or no ingress host is configured.
   */
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    val uiEnabled = kubernetesConf.get(UI_ENABLED)
    val ingressEnabled = kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)

    if (!uiEnabled || !ingressEnabled) {
      // The trailing space after "due to" keeps the config key from being glued to the
      // preceding word in the rendered message.
      logInfo(
        s"Skip building ingress for Spark UI, due to " +
          s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
      Seq.empty
    } else {
      kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match {
        case Some(ingressHostPattern) =>
          val ingressHost = ingressHostPattern.replace("{{APP_ID}}", kubernetesConf.appId)
          Seq(buildIngress(ingressHost))
        case None =>
          logWarning(s"Skip building ingress for Spark UI, due to " +
            s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is absent.")
          Seq.empty
      }
    }
  }

  /** Builds the ingress that forwards `ingressHost` traffic to the driver service UI port. */
  private def buildIngress(ingressHost: String): HasMetadata = {
    val appId = kubernetesConf.appId
    val uiPort = kubernetesConf.get(UI_PORT)

    val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      kubernetesConf.sparkConf,
      KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX)
    // The app ID label is appended last so it wins over a user-supplied label with the same key.
    val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId)

    val annotations = KubernetesUtils.parsePrefixedKeyValuePairs(
      kubernetesConf.sparkConf,
      KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX)

    // Route every path ("/" with Prefix matching) to the UI port of the driver service.
    val path = new HTTPIngressPathBuilder()
      .withPath("/")
      .withPathType("Prefix")
      .withNewBackend()
        .withNewService()
          .withName(driverServiceName)
          .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build())
        .endService()
      .endBackend()
      .build()

    val uiRule = new IngressRuleBuilder()
      .withHost(ingressHost)
      .withNewHttp()
        .addToPaths(path)
      .endHttp()
      .build()

    new IngressBuilder()
      .withNewMetadata()
        .withName(s"$driverServiceName-ingress")
        .addToLabels(labels.asJava)
        .addToAnnotations(annotations.asJava)
      .endMetadata()
      .withNewSpec()
        // The builder is a Java API: null is passed when no ingress class is configured.
        .withIngressClassName(kubernetesConf.get(KUBERNETES_INGRESS_CLASS_NAME).orNull)
        .withRules(uiRule)
      .endSpec()
      .build()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
Expand All @@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep(
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
"managed via a Kubernetes service.")

private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
// The service name is taken from KubernetesDriverConf.driverServiceName rather than being
// computed in this step.
private val resolvedServiceName = kubernetesConf.driverServiceName
private val ipFamilyPolicy =
kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
private val ipFamilies =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf),
new DriverIngressFeatureStep(conf)) ++ userFeatures

val spec = KubernetesDriverSpec(
initialPod,
Expand Down

0 comments on commit 6c64bf2

Please sign in to comment.