Skip to content

Commit

Permalink
[SPARK-48038][K8S] Promote driverServiceName to KubernetesDriverConf
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Apr 29, 2024
1 parent eaed585 commit 5d6eba5
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
Expand Down Expand Up @@ -83,12 +84,27 @@ private[spark] class KubernetesDriverConf(
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String],
val proxyUser: Option[String])
extends KubernetesConf(sparkConf) {
val proxyUser: Option[String],
clock: Clock = new SystemClock())
extends KubernetesConf(sparkConf) with Logging {

/**
 * Node-selector key/value pairs for the driver pod.
 *
 * Obtained by handing `sparkConf` and `KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX` to
 * `KubernetesUtils.parsePrefixedKeyValuePairs`; re-evaluated on every call.
 */
def driverNodeSelector: Map[String, String] = {
  val selectorPrefix = KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX
  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, selectorPrefix)
}

/**
 * Name used for the driver's Kubernetes service.
 *
 * The preferred name is `resourceNamePrefix` followed by `DRIVER_SVC_POSTFIX`. When that
 * exceeds `MAX_SERVICE_NAME_LENGTH` characters, a shorter `spark-<uniqueID><postfix>` name
 * is generated from `KubernetesUtils.uniqueID(clock)` and a warning is logged.
 *
 * Declared `lazy`, so the value is resolved on first access and then reused.
 */
lazy val driverServiceName: String = {
  val preferredName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
  val exceedsLimit = preferredName.length > MAX_SERVICE_NAME_LENGTH
  if (exceedsLimit) {
    // Too long for a service name: fall back to a short, generated identifier.
    val generatedId = KubernetesUtils.uniqueID(clock)
    val fallbackName = s"spark-$generatedId$DRIVER_SVC_POSTFIX"
    logWarning(s"Driver's hostname would preferably be $preferredName, but this is " +
      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
      s"$fallbackName as the driver service's name.")
    fallbackName
  } else {
    preferredName
  }
}

override val resourceNamePrefix: String = {
val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
Expand All @@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep(
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " +
"managed via a Kubernetes service.")

private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
s"$shorterServiceName as the driver service's name.")
shorterServiceName
}
private val resolvedServiceName = kubernetesConf.driverServiceName
private val ipFamilyPolicy =
kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
private val ipFamilies =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.util.{Clock, SystemClock}

/**
* Builder methods for KubernetesConf that allow easy control over what to return for a few
Expand Down Expand Up @@ -52,7 +53,8 @@ object KubernetesTestConf {
secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
secretNamesToMountPaths: Map[String, String] = Map.empty,
volumes: Seq[KubernetesVolumeSpec] = Seq.empty,
proxyUser: Option[String] = None): KubernetesDriverConf = {
proxyUser: Option[String] = None,
clock: Clock = new SystemClock()): KubernetesDriverConf = {
val conf = sparkConf.clone()

resourceNamePrefix.foreach { prefix =>
Expand All @@ -67,7 +69,7 @@ object KubernetesTestConf {
setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)

new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser, clock)
}
// scalastyle:on argcount

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
}

test("Long prefixes should switch to using a generated unique name.") {
val clock = new ManualClock()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS)
val clock = new ManualClock()

// Ensure that multiple services created at the same time generate unique names.
val services = (1 to 10).map { _ =>
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS,
clock = clock)
val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
val serviceName = configurationStep
.getAdditionalKubernetesResources()
Expand All @@ -130,11 +131,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
val hostAddress = configurationStep
.getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)

(serviceName -> hostAddress)
}.toMap
Tuple3(kconf, serviceName, hostAddress)
}

assert(services.size === 10)
services.foreach { case (name, address) =>
services.foreach { case (kconf, name, address) =>
assert(!name.startsWith(kconf.resourceNamePrefix))
assert(!address.startsWith(kconf.resourceNamePrefix))
assert(InternetDomainName.isValid(address))
Expand Down

0 comments on commit 5d6eba5

Please sign in to comment.