From 6c64bf2400e0e3b5d936d11481f6d39d15a4550b Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Tue, 23 Apr 2024 20:50:52 +0800
Subject: [PATCH] [SPARK-47954][K8S] Support creating ingress entry for
 external UI access

---
 .../org/apache/spark/deploy/k8s/Config.scala  | 24 +++++
 .../spark/deploy/k8s/KubernetesConf.scala     | 17 +++-
 .../features/DriverIngressFeatureStep.scala   | 96 +++++++++++++++++++
 .../features/DriverServiceFeatureStep.scala   | 14 +--
 .../k8s/submit/KubernetesDriverBuilder.scala  |  3 +-
 5 files changed, 140 insertions(+), 14 deletions(-)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index a25876de5aee5..43df42e3107ac 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -749,6 +749,28 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")
 
+  val KUBERNETES_INGRESS_ENABLED =
+    ConfigBuilder("spark.kubernetes.driver.ingress.enabled")
+      .doc("Whether to create an ingress entry for the driver service to allow external UI " +
+        "access in cluster mode. This only takes effect when `spark.ui.enabled` is true.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val KUBERNETES_INGRESS_HOST_PATTERN =
+    ConfigBuilder("spark.kubernetes.driver.ingress.host")
+      .doc("Host for the driver UI ingress; {{APP_ID}} will be replaced by the application ID.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_INGRESS_CLASS_NAME =
+    ConfigBuilder("spark.kubernetes.driver.ingress.ingressClassName")
+      .doc("Name of the IngressClass that should be used to implement the ingress for the Spark UI.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_LABEL_PREFIX = "spark.kubernetes.driver.service.label."
@@ -756,6 +778,8 @@ private[spark] object Config extends Logging {
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
   val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."
   val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."
+  val KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX = "spark.kubernetes.driver.ingress.label."
+  val KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX = "spark.kubernetes.driver.ingress.annotation."
 
   val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
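Taken together, these options might be set as follows from application code. This is only a minimal sketch; the host, ingress class, and label values below are illustrative placeholders, not values defined by the patch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.driver.ingress.enabled", "true")
      // {{APP_ID}} is replaced with the submitted application's ID.
      .set("spark.kubernetes.driver.ingress.host", "{{APP_ID}}.spark-ui.example.com")
      // Assumes an ingress controller registered under this class name.
      .set("spark.kubernetes.driver.ingress.ingressClassName", "nginx")
      // Arbitrary labels/annotations can be attached via the new prefixes.
      .set("spark.kubernetes.driver.ingress.label.example.com/team", "data-platform")
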
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 9fdd9518d2d81..ac4a046839cb2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKey.{CONFIG, EXECUTOR_ENV_REGEX}
@@ -84,7 +85,7 @@ private[spark] class KubernetesDriverConf(
     val mainClass: String,
     val appArgs: Array[String],
     val proxyUser: Option[String])
-  extends KubernetesConf(sparkConf) {
+  extends KubernetesConf(sparkConf) with Logging {
 
   def driverNodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX)
@@ -94,6 +95,20 @@ private[spark] class KubernetesDriverConf(
     custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
   }
 
+  val driverServiceName: String = {
+    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
+    if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+      preferredServiceName
+    } else {
+      val randomServiceId = KubernetesUtils.uniqueID()
+      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
+        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
+        s"$shorterServiceName as the driver service's name.")
+      shorterServiceName
+    }
+  }
+
   override def labels: Map[String, String] = {
     val presetLabels = Map(
       SPARK_VERSION_LABEL -> SPARK_VERSION,
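The service-name resolution above is moved out of DriverServiceFeatureStep so that the service step and the new ingress step agree on the driver service's name. A minimal standalone sketch of that rule, with Spark's constants inlined as assumptions (the "-driver-svc" postfix and the default 63-character DNS label limit behind MAX_SERVICE_NAME_LENGTH; uniqueId stands in for KubernetesUtils.uniqueID()):

    def resolveDriverServiceName(resourceNamePrefix: String, uniqueId: => String): String = {
      val postfix = "-driver-svc"   // mirrors DRIVER_SVC_POSTFIX
      val maxLen = 63               // assumed default behind MAX_SERVICE_NAME_LENGTH
      val preferred = s"$resourceNamePrefix$postfix"
      if (preferred.length <= maxLen) {
        preferred
      } else {
        // Too long for a DNS label: fall back to a generated short name.
        s"spark-$uniqueId$postfix"
      }
    }
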
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala
new file mode 100644
index 0000000000000..a5bf25a95eedb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverIngressFeatureStep.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.{HTTPIngressPathBuilder, IngressBuilder, IngressRuleBuilder, ServiceBackendPortBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.SPARK_APP_ID_LABEL
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
+
+class DriverIngressFeatureStep(kubernetesConf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private lazy val driverServiceName: String = kubernetesConf.driverServiceName
+
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (!kubernetesConf.get(UI_ENABLED) || !kubernetesConf.get(KUBERNETES_INGRESS_ENABLED)) {
+      logInfo(
+        s"Skip building ingress for Spark UI because " +
+        s"${UI_ENABLED.key} or ${KUBERNETES_INGRESS_ENABLED.key} is false.")
+      return Seq.empty
+    }
+
+    val appId = kubernetesConf.appId
+    val uiPort = kubernetesConf.get(UI_PORT)
+    val ingressHost = kubernetesConf.get(KUBERNETES_INGRESS_HOST_PATTERN) match {
+      case Some(ingressHostPattern) =>
+        ingressHostPattern.replace("{{APP_ID}}", appId)
+      case None =>
+        logWarning(s"Skip building ingress for Spark UI because " +
+          s"${KUBERNETES_INGRESS_HOST_PATTERN.key} is not set.")
+        return Seq.empty
+    }
+
+    val customLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+      kubernetesConf.sparkConf,
+      KUBERNETES_DRIVER_INGRESS_LABEL_PREFIX)
+    val labels = customLabels ++ Map(SPARK_APP_ID_LABEL -> appId)
+
+    val annotations = KubernetesUtils.parsePrefixedKeyValuePairs(
+      kubernetesConf.sparkConf,
+      KUBERNETES_DRIVER_INGRESS_ANNOTATION_PREFIX)
+
+    val path = new HTTPIngressPathBuilder()
+      .withPath("/")
+      .withPathType("Prefix")
+      .withNewBackend()
+      .withNewService()
+      .withName(driverServiceName)
+      .withPort(new ServiceBackendPortBuilder().withNumber(uiPort).build())
+      .endService()
+      .endBackend()
+      .build()
+
+    val uiRule = new IngressRuleBuilder()
+      .withHost(ingressHost)
+      .withNewHttp()
+      .addToPaths(path)
+      .endHttp()
+      .build()
+
+    val ingress = new IngressBuilder()
+      .withNewMetadata()
+      .withName(s"$driverServiceName-ingress")
+      .addToLabels(labels.asJava)
+      .addToAnnotations(annotations.asJava)
+      .endMetadata()
+      .withNewSpec()
+      .withIngressClassName(kubernetesConf.get(KUBERNETES_INGRESS_CLASS_NAME).orNull)
+      .withRules(uiRule)
+      .endSpec()
+      .build()
+    Seq(ingress)
+  }
+}
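For concreteness, a small sketch of the host substitution the step performs; all values here are assumptions, not defaults shipped by the patch. Requests that reach the ingress controller for the resolved host on path prefix "/" are forwarded to the driver service on spark.ui.port (4040 by default):

    // Hypothetical values illustrating the {{APP_ID}} substitution above.
    val hostPattern = "{{APP_ID}}.spark-ui.example.com"  // assumed spark.kubernetes.driver.ingress.host
    val appId = "spark-0a1b2c3d4e5f"                     // assumed application ID
    val ingressHost = hostPattern.replace("{{APP_ID}}", appId)
    // ingressHost == "spark-0a1b2c3d4e5f.spark-ui.example.com"
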
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index cba4f442371c9..9adfb2b8de491 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
@@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep(
     s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
     "managed via a Kubernetes service.")
 
-  private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
-  private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
-    preferredServiceName
-  } else {
-    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
-    val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-    logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
-      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
-      s"$shorterServiceName as the driver service's name.")
-    shorterServiceName
-  }
+  private val resolvedServiceName = kubernetesConf.driverServiceName
   private val ipFamilyPolicy =
     kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
   private val ipFamilies =
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 3b69754b9cdf1..2067e9d95735c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -73,7 +73,8 @@ private[spark] class KubernetesDriverBuilder {
       new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
       new PodTemplateConfigMapStep(conf),
-      new LocalDirsFeatureStep(conf)) ++ userFeatures
+      new LocalDirsFeatureStep(conf),
+      new DriverIngressFeatureStep(conf)) ++ userFeatures
 
     val spec = KubernetesDriverSpec(
       initialPod,
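Registering the step in KubernetesDriverBuilder above is all that is required for the Ingress to be created with the rest of the driver resources. A simplified sketch, not the builder's actual code, of the aggregation it performs over its feature steps:

    import io.fabric8.kubernetes.api.model.HasMetadata
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    // Each feature step may contribute extra Kubernetes resources; the driver
    // spec collects them all, so DriverIngressFeatureStep's Ingress is submitted
    // alongside the driver pod, service, and config maps.
    def collectDriverResources(steps: Seq[KubernetesFeatureConfigStep]): Seq[HasMetadata] =
      steps.flatMap(_.getAdditionalKubernetesResources())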