From 704430aceaa8bf996e92bd924a296691c5de03e1 Mon Sep 17 00:00:00 2001 From: forrestchen Date: Thu, 11 Jan 2018 09:50:44 +0800 Subject: [PATCH] add spark.kubernetes.hadoop.conf.configmap.name conf to use exist hadoop conf configmap Signed-off-by: forrestchen --- docs/running-on-kubernetes.md | 9 +++++ .../deploy/k8s/HadoopConfBootstrap.scala | 13 ++----- .../org/apache/spark/deploy/k8s/config.scala | 7 ++++ ...DriverConfigurationStepsOrchestrator.scala | 20 +++++++---- .../HadoopConfigBootstrapStep.scala | 20 ++++++----- .../hadoopsteps/HadoopConfMounterStep.scala | 10 +++--- .../hadoopsteps/HadoopStepsOrchestrator.scala | 13 +++---- .../k8s/KubernetesClusterManager.scala | 6 +--- .../k8s/submit/HadoopConfBootstrapSuite.scala | 11 +----- .../HadoopConfMounterStepSuite.scala | 2 +- .../HadoopStepsOrchestratorSuite.scala | 8 ++--- .../cluster/k8s/ExecutorPodFactorySuite.scala | 34 ++++--------------- 12 files changed, 72 insertions(+), 81 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 1b070973afe7e..e0ddd812b79c6 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -838,6 +838,15 @@ from the other deployment modes. See the [configuration page](configuration.html in the executor Pods. The user can specify multiple instances of this for multiple secrets. + + spark.kubernetes.hadoop.conf.configmap.name + (none) + + If this is specified, will not create new configmap to store hadoop conf file and reuse the + exist configmap. The configmap will be mounted into driver/executor pod and + HADOOP_CONF_DIR will be set. + + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala index 4e8e1f2499eb2..c493092532a5c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala @@ -39,25 +39,18 @@ private[spark] trait HadoopConfBootstrap { } private[spark] class HadoopConfBootstrapImpl( - hadoopConfConfigMapName: String, - hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging { + hadoopConfConfigMapName: String) extends HadoopConfBootstrap with Logging { override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) : PodWithMainContainer = { - logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files") - val keyPaths = hadoopConfigFiles.map { file => - val fileStringPath = file.toPath.getFileName.toString - new KeyToPathBuilder() - .withKey(fileStringPath) - .withPath(fileStringPath) - .build() } + logInfo("HADOOP_CONF_DIR or spark.kubernetes.hadoop.conf.configmap.name defined. " + + "Mounting Hadoop specific files") val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod) .editSpec() .addNewVolume() .withName(HADOOP_FILE_VOLUME) .withNewConfigMap() .withName(hadoopConfConfigMapName) - .withItems(keyPaths.asJava) .endConfigMap() .endVolume() .endSpec() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index e395fed810a3d..9182139f08a00 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -539,6 +539,13 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME = + ConfigBuilder("spark.kubernetes.hadoop.conf.configmap.name") + .doc("Specify the configmap name of the config where the hadoop conf exist." + + "It will be mounted to spark pods.") + .stringConf + .createOptional + private[spark] def resolveK8sMaster(rawMasterString: String): String = { if (!rawMasterString.startsWith("k8s://")) { throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index d8210ad87c0f8..5b5d9f333866d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -53,7 +53,10 @@ private[spark] class DriverConfigurationStepsOrchestrator( private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION) private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY) private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config" - private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config" + private val hadoopConfigMapName = submissionSparkConf.get(KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME) + .getOrElse(s"$kubernetesResourceNamePrefix-hadoop-config") + private val noNeedUploadHadoopConf = submissionSparkConf.get( + KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME).isDefined def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = { val additionalMainAppJar = mainAppResource match { @@ -123,18 +126,23 @@ private[spark] class DriverConfigurationStepsOrchestrator( None } - val hadoopConfigSteps = - hadoopConfDir.map { conf => + val hadoopConfigSteps = if (hadoopConfDir.isDefined || noNeedUploadHadoopConf) { val hadoopStepsOrchestrator = new HadoopStepsOrchestrator( kubernetesResourceNamePrefix, namespace, hadoopConfigMapName, submissionSparkConf, - conf) + hadoopConfDir, + noNeedUploadHadoopConf) val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps() - Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))} - .getOrElse(Option.empty[DriverConfigurationStep]) + Some(new HadoopConfigBootstrapStep( + hadoopConfSteps, + hadoopConfigMapName, + noNeedUploadHadoopConf)) + } else { + Option.empty[DriverConfigurationStep] + } val resourceStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala index 916619475bc2a..3d7353befaa32 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.ConfigMapBuilder +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata} import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep} @@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigS */ private[spark] class HadoopConfigBootstrapStep( hadoopConfigurationSteps: Seq[HadoopConfigurationStep], - hadoopConfigMapName: String ) + hadoopConfigMapName: String, + noNeedUploadHadoopConf: Boolean = false) extends DriverConfigurationStep { override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { @@ -46,13 +47,16 @@ private[spark] class HadoopConfigBootstrapStep( for (nextStep <- hadoopConfigurationSteps) { currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec) } - val configMap = - new ConfigMapBuilder() + val configMap = if (noNeedUploadHadoopConf) { + Option.empty[HasMetadata] + } else { + Some(new ConfigMapBuilder() .withNewMetadata() - .withName(hadoopConfigMapName) - .endMetadata() + .withName(hadoopConfigMapName) + .endMetadata() .addToData(currentHadoopSpec.configMapProperties.asJava) - .build() + .build()) + } val driverSparkConfWithExecutorSetup = driverSpec.driverSparkConf.clone() .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName) .setAll(currentHadoopSpec.additionalDriverSparkConf) @@ -62,7 +66,7 @@ private[spark] class HadoopConfigBootstrapStep( driverSparkConf = driverSparkConfWithExecutorSetup, otherKubernetesResources = driverSpec.otherKubernetesResources ++ - Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq + configMap.toSeq ++ currentHadoopSpec.dtSecret.toSeq ) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala index 37a41d71ba616..8a5e74c574888 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala @@ -35,7 +35,8 @@ private[spark] class HadoopConfMounterStep( hadoopConfigMapName: String, hadoopConfigurationFiles: Seq[File], hadoopConfBootstrapConf: HadoopConfBootstrap, - hadoopConfDir: String) + hadoopConfDir: Option[String], + noNeedUploadHadoopConf: Boolean = false) extends HadoopConfigurationStep { override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { @@ -48,10 +49,11 @@ private[spark] class HadoopConfMounterStep( driverPod = bootstrappedPodAndMainContainer.pod, driverContainer = bootstrappedPodAndMainContainer.mainContainer, configMapProperties = - hadoopConfigurationFiles.map(file => + hadoopConfigurationFiles.filter(_ => !noNeedUploadHadoopConf).map(file => (file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))).toMap, - additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++ - Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir) + additionalDriverSparkConf = hadoopConfDir.filter(_ => !noNeedUploadHadoopConf) + .foldLeft(hadoopConfigSpec.additionalDriverSparkConf)((sparkConf, conf) => + sparkConf ++ Map(HADOOP_CONF_DIR_LOC -> conf)) ) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala index a3764769f4dcf..e0de6cf21c691 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala @@ -32,7 +32,8 @@ private[spark] class HadoopStepsOrchestrator( namespace: String, hadoopConfigMapName: String, submissionSparkConf: SparkConf, - hadoopConfDir: String) extends Logging { + hadoopConfDir: Option[String], + noNeedUploadHadoopConf: Boolean = false) extends Logging { private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT) private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL) @@ -43,7 +44,8 @@ private[spark] class HadoopStepsOrchestrator( submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) private val maybeRenewerPrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL) - private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir) + private val hadoopConfigurationFiles = hadoopConfDir.map(HadoopConfUtils.getHadoopConfFiles) + .getOrElse(Seq.empty[File]) private val hadoopUGI = new HadoopUGIUtilImpl logInfo(s"Hadoop Conf directory: $hadoopConfDir") @@ -68,14 +70,13 @@ private[spark] class HadoopStepsOrchestrator( " you must also specify the name of the secret") def getHadoopSteps(): Seq[HadoopConfigurationStep] = { - val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl( - hadoopConfigMapName, - hadoopConfigurationFiles) + val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(hadoopConfigMapName) val hadoopConfMounterStep = new HadoopConfMounterStep( hadoopConfigMapName, hadoopConfigurationFiles, hadoopConfBootstrapImpl, - hadoopConfDir) + hadoopConfDir, + noNeedUploadHadoopConf) val maybeKerberosStep = if (isKerberosEnabled) { maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index d2f19fda113a0..265fef5667804 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -88,11 +88,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit } val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap => - val hadoopConfigurations = maybeHadoopConfDir.map( - conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File]) - new HadoopConfBootstrapImpl( - hadoopConfigMap, - hadoopConfigurations) + new HadoopConfBootstrapImpl(hadoopConfigMap) } val kerberosBootstrap = diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala index 8113a965ecd5a..2b5578319ac09 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala @@ -34,7 +34,6 @@ import org.apache.spark.util.Utils private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{ private val CONFIG_MAP_NAME = "config-map" private val TEMP_HADOOP_FILE = createTempFile("core-site.xml") - private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE) private val SPARK_USER_VALUE = "sparkUser" before { @@ -42,21 +41,13 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeA } test("Test of bootstrapping hadoop_conf_dir files") { - val hadoopConfStep = new HadoopConfBootstrapImpl( - CONFIG_MAP_NAME, - HADOOP_FILES) - val expectedKeyPaths = Seq( - new KeyToPathBuilder() - .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString) - .withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString) - .build()) + val hadoopConfStep = new HadoopConfBootstrapImpl(CONFIG_MAP_NAME) val expectedPod = new PodBuilder() .editOrNewSpec() .addNewVolume() .withName(HADOOP_FILE_VOLUME) .withNewConfigMap() .withName(CONFIG_MAP_NAME) - .withItems(expectedKeyPaths.asJava) .endConfigMap() .endVolume() .endSpec() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala index 18bb3b631cf28..2458a520cd7ff 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala @@ -73,7 +73,7 @@ private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with Befor CONFIG_MAP_NAME, HADOOP_FILES, hadoopConfBootstrap, - HADOOP_CONF_DIR_VAL) + Some(HADOOP_CONF_DIR_VAL)) val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL) val expectedConfigMap = Map( TEMP_HADOOP_FILE.toPath.getFileName.toString -> diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala index b7701b12c5b0c..bedef8ad2a6dd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala @@ -33,7 +33,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite { NAMESPACE, HADOOP_CONFIG_MAP, sparkTestConf, - HADOOP_CONF_DIR_VAL) + Some(HADOOP_CONF_DIR_VAL)) val steps = hadoopOrchestrator.getHadoopSteps() assert(steps.length === 2) assert(steps.head.isInstanceOf[HadoopConfMounterStep]) @@ -50,7 +50,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite { NAMESPACE, HADOOP_CONFIG_MAP, sparkTestConf, - HADOOP_CONF_DIR_VAL) + Some(HADOOP_CONF_DIR_VAL)) val steps = hadoopOrchestrator.getHadoopSteps() assert(steps.length === 2) assert(steps.head.isInstanceOf[HadoopConfMounterStep]) @@ -65,7 +65,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite { NAMESPACE, HADOOP_CONFIG_MAP, sparkTestConf, - HADOOP_CONF_DIR_VAL) + Some(HADOOP_CONF_DIR_VAL)) val steps = hadoopOrchestrator.getHadoopSteps() assert(steps.length === 2) assert(steps.head.isInstanceOf[HadoopConfMounterStep]) @@ -82,7 +82,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite { NAMESPACE, HADOOP_CONFIG_MAP, sparkTestConf, - HADOOP_CONF_DIR_VAL) + Some(HADOOP_CONF_DIR_VAL)) val steps = hadoopOrchestrator.getHadoopSteps() assert(steps.length === 2) assert(steps.head.isInstanceOf[HadoopConfMounterStep]) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 6c4c20b4da1ad..e8deea2f93f0d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -349,11 +349,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef when(hadoopUGI.getShortUserName).thenReturn("test-user") val conf = baseConf.clone() val configName = "hadoop-test" - val hadoopFile = createTempFile - val hadoopFiles = Seq(hadoopFile) - val hadoopBootsrap = new HadoopConfBootstrapImpl( - hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles) + val hadoopBootsrap = new HadoopConfBootstrapImpl(configName) val factory = new ExecutorPodFactoryImpl( conf, @@ -376,8 +372,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) checkConfigMapVolumes(executor, HADOOP_FILE_VOLUME, - configName, - hadoopFile.toPath.getFileName.toString) + configName) checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) } @@ -385,11 +380,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef when(hadoopUGI.getShortUserName).thenReturn("test-user") val conf = baseConf.clone() val configName = "hadoop-test" - val hadoopFile = createTempFile - val hadoopFiles = Seq(hadoopFile) - val hadoopBootstrap = new HadoopConfBootstrapImpl( - hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles) + val hadoopBootstrap = new HadoopConfBootstrapImpl(configName) val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI) val factory = new ExecutorPodFactoryImpl( @@ -414,8 +405,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) checkConfigMapVolumes(executor, HADOOP_FILE_VOLUME, - configName, - hadoopFile.toPath.getFileName.toString) + configName) checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) } @@ -423,11 +413,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef when(hadoopUGI.getShortUserName).thenReturn("test-user") val conf = baseConf.clone() val configName = "hadoop-test" - val hadoopFile = createTempFile - val hadoopFiles = Seq(hadoopFile) - val hadoopBootstrap = new HadoopConfBootstrapImpl( - hadoopConfConfigMapName = configName, - hadoopConfigFiles = hadoopFiles) + val hadoopBootstrap = new HadoopConfBootstrapImpl(configName) val secretName = "secret-test" val secretItemKey = "item-test" val userName = "sparkUser" @@ -459,8 +445,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) checkConfigMapVolumes(executor, HADOOP_FILE_VOLUME, - configName, - hadoopFile.toPath.getFileName.toString) + configName) checkSecretVolumes(executor, SPARK_APP_HADOOP_SECRET_VOLUME_NAME, secretName) checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) checkVolumeMounts(executor, @@ -505,15 +490,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef private def checkConfigMapVolumes(executor: Pod, volName: String, - configMapName: String, - content: String) : Unit = { + configMapName: String) : Unit = { val volume = executor.getSpec.getVolumes.asScala.find(_.getName == volName) assert(volume.nonEmpty) assert(volume.get.getConfigMap.getName == configMapName) - assert(volume.get.getConfigMap.getItems.asScala.find(_.getKey == content).get == - new KeyToPathBuilder() - .withKey(content) - .withPath(content).build() ) } private def checkSecretVolumes(executor: Pod, volName: String, secretName: String) : Unit = {