diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 1b070973afe7e..e0ddd812b79c6 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -838,6 +838,15 @@ from the other deployment modes. See the [configuration page](configuration.html
in the executor Pods. The user can specify multiple instances of this for multiple secrets.
+
+ spark.kubernetes.hadoop.conf.configmap.name |
+ (none) |
+
+ If this is specified, will not create new configmap to store hadoop conf file and reuse the
+ exist configmap. The configmap will be mounted into driver/executor pod and
+ HADOOP_CONF_DIR will be set.
+ |
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala
index 4e8e1f2499eb2..c493092532a5c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala
@@ -39,25 +39,18 @@ private[spark] trait HadoopConfBootstrap {
}
private[spark] class HadoopConfBootstrapImpl(
- hadoopConfConfigMapName: String,
- hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging {
+ hadoopConfConfigMapName: String) extends HadoopConfBootstrap with Logging {
override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
- logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
- val keyPaths = hadoopConfigFiles.map { file =>
- val fileStringPath = file.toPath.getFileName.toString
- new KeyToPathBuilder()
- .withKey(fileStringPath)
- .withPath(fileStringPath)
- .build() }
+ logInfo("HADOOP_CONF_DIR or spark.kubernetes.hadoop.conf.configmap.name defined. " +
+ "Mounting Hadoop specific files")
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
- .withItems(keyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
index e395fed810a3d..9182139f08a00 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala
@@ -539,6 +539,13 @@ package object config extends Logging {
.stringConf
.createOptional
+ private[spark] val KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME =
+ ConfigBuilder("spark.kubernetes.hadoop.conf.configmap.name")
+ .doc("Specify the configmap name of the config where the hadoop conf exist." +
+ "It will be mounted to spark pods.")
+ .stringConf
+ .createOptional
+
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
index d8210ad87c0f8..5b5d9f333866d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
@@ -53,7 +53,10 @@ private[spark] class DriverConfigurationStepsOrchestrator(
private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
- private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
+ private val hadoopConfigMapName = submissionSparkConf.get(KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME)
+ .getOrElse(s"$kubernetesResourceNamePrefix-hadoop-config")
+ private val noNeedUploadHadoopConf = submissionSparkConf.get(
+ KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME).isDefined
def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val additionalMainAppJar = mainAppResource match {
@@ -123,18 +126,23 @@ private[spark] class DriverConfigurationStepsOrchestrator(
None
}
- val hadoopConfigSteps =
- hadoopConfDir.map { conf =>
+ val hadoopConfigSteps = if (hadoopConfDir.isDefined || noNeedUploadHadoopConf) {
val hadoopStepsOrchestrator =
new HadoopStepsOrchestrator(
kubernetesResourceNamePrefix,
namespace,
hadoopConfigMapName,
submissionSparkConf,
- conf)
+ hadoopConfDir,
+ noNeedUploadHadoopConf)
val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps()
- Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))}
- .getOrElse(Option.empty[DriverConfigurationStep])
+ Some(new HadoopConfigBootstrapStep(
+ hadoopConfSteps,
+ hadoopConfigMapName,
+ noNeedUploadHadoopConf))
+ } else {
+ Option.empty[DriverConfigurationStep]
+ }
val resourceStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala
index 916619475bc2a..3d7353befaa32 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps
import scala.collection.JavaConverters._
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder
+import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
@@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigS
*/
private[spark] class HadoopConfigBootstrapStep(
hadoopConfigurationSteps: Seq[HadoopConfigurationStep],
- hadoopConfigMapName: String )
+ hadoopConfigMapName: String,
+ noNeedUploadHadoopConf: Boolean = false)
extends DriverConfigurationStep {
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
@@ -46,13 +47,16 @@ private[spark] class HadoopConfigBootstrapStep(
for (nextStep <- hadoopConfigurationSteps) {
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
}
- val configMap =
- new ConfigMapBuilder()
+ val configMap = if (noNeedUploadHadoopConf) {
+ Option.empty[HasMetadata]
+ } else {
+ Some(new ConfigMapBuilder()
.withNewMetadata()
- .withName(hadoopConfigMapName)
- .endMetadata()
+ .withName(hadoopConfigMapName)
+ .endMetadata()
.addToData(currentHadoopSpec.configMapProperties.asJava)
- .build()
+ .build())
+ }
val driverSparkConfWithExecutorSetup = driverSpec.driverSparkConf.clone()
.set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
.setAll(currentHadoopSpec.additionalDriverSparkConf)
@@ -62,7 +66,7 @@ private[spark] class HadoopConfigBootstrapStep(
driverSparkConf = driverSparkConfWithExecutorSetup,
otherKubernetesResources =
driverSpec.otherKubernetesResources ++
- Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
+ configMap.toSeq ++ currentHadoopSpec.dtSecret.toSeq
)
}
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala
index 37a41d71ba616..8a5e74c574888 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala
@@ -35,7 +35,8 @@ private[spark] class HadoopConfMounterStep(
hadoopConfigMapName: String,
hadoopConfigurationFiles: Seq[File],
hadoopConfBootstrapConf: HadoopConfBootstrap,
- hadoopConfDir: String)
+ hadoopConfDir: Option[String],
+ noNeedUploadHadoopConf: Boolean = false)
extends HadoopConfigurationStep {
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
@@ -48,10 +49,11 @@ private[spark] class HadoopConfMounterStep(
driverPod = bootstrappedPodAndMainContainer.pod,
driverContainer = bootstrappedPodAndMainContainer.mainContainer,
configMapProperties =
- hadoopConfigurationFiles.map(file =>
+ hadoopConfigurationFiles.filter(_ => !noNeedUploadHadoopConf).map(file =>
(file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))).toMap,
- additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++
- Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir)
+ additionalDriverSparkConf = hadoopConfDir.filter(_ => !noNeedUploadHadoopConf)
+ .foldLeft(hadoopConfigSpec.additionalDriverSparkConf)((sparkConf, conf) =>
+ sparkConf ++ Map(HADOOP_CONF_DIR_LOC -> conf))
)
}
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala
index a3764769f4dcf..e0de6cf21c691 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala
@@ -32,7 +32,8 @@ private[spark] class HadoopStepsOrchestrator(
namespace: String,
hadoopConfigMapName: String,
submissionSparkConf: SparkConf,
- hadoopConfDir: String) extends Logging {
+ hadoopConfDir: Option[String],
+ noNeedUploadHadoopConf: Boolean = false) extends Logging {
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
@@ -43,7 +44,8 @@ private[spark] class HadoopStepsOrchestrator(
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
private val maybeRenewerPrincipal =
submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
- private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir)
+ private val hadoopConfigurationFiles = hadoopConfDir.map(HadoopConfUtils.getHadoopConfFiles)
+ .getOrElse(Seq.empty[File])
private val hadoopUGI = new HadoopUGIUtilImpl
logInfo(s"Hadoop Conf directory: $hadoopConfDir")
@@ -68,14 +70,13 @@ private[spark] class HadoopStepsOrchestrator(
" you must also specify the name of the secret")
def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
- val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
- hadoopConfigMapName,
- hadoopConfigurationFiles)
+ val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(hadoopConfigMapName)
val hadoopConfMounterStep = new HadoopConfMounterStep(
hadoopConfigMapName,
hadoopConfigurationFiles,
hadoopConfBootstrapImpl,
- hadoopConfDir)
+ hadoopConfDir,
+ noNeedUploadHadoopConf)
val maybeKerberosStep =
if (isKerberosEnabled) {
maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index d2f19fda113a0..265fef5667804 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -88,11 +88,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
}
val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap =>
- val hadoopConfigurations = maybeHadoopConfDir.map(
- conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File])
- new HadoopConfBootstrapImpl(
- hadoopConfigMap,
- hadoopConfigurations)
+ new HadoopConfBootstrapImpl(hadoopConfigMap)
}
val kerberosBootstrap =
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala
index 8113a965ecd5a..2b5578319ac09 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{
private val CONFIG_MAP_NAME = "config-map"
private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
- private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
private val SPARK_USER_VALUE = "sparkUser"
before {
@@ -42,21 +41,13 @@ private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeA
}
test("Test of bootstrapping hadoop_conf_dir files") {
- val hadoopConfStep = new HadoopConfBootstrapImpl(
- CONFIG_MAP_NAME,
- HADOOP_FILES)
- val expectedKeyPaths = Seq(
- new KeyToPathBuilder()
- .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)
- .withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString)
- .build())
+ val hadoopConfStep = new HadoopConfBootstrapImpl(CONFIG_MAP_NAME)
val expectedPod = new PodBuilder()
.editOrNewSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(CONFIG_MAP_NAME)
- .withItems(expectedKeyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala
index 18bb3b631cf28..2458a520cd7ff 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala
@@ -73,7 +73,7 @@ private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with Befor
CONFIG_MAP_NAME,
HADOOP_FILES,
hadoopConfBootstrap,
- HADOOP_CONF_DIR_VAL)
+ Some(HADOOP_CONF_DIR_VAL))
val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL)
val expectedConfigMap = Map(
TEMP_HADOOP_FILE.toPath.getFileName.toString ->
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
index b7701b12c5b0c..bedef8ad2a6dd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
@@ -33,7 +33,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
- HADOOP_CONF_DIR_VAL)
+ Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -50,7 +50,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
- HADOOP_CONF_DIR_VAL)
+ Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -65,7 +65,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
- HADOOP_CONF_DIR_VAL)
+ Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -82,7 +82,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
- HADOOP_CONF_DIR_VAL)
+ Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index 6c4c20b4da1ad..e8deea2f93f0d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -349,11 +349,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
- val hadoopFile = createTempFile
- val hadoopFiles = Seq(hadoopFile)
- val hadoopBootsrap = new HadoopConfBootstrapImpl(
- hadoopConfConfigMapName = configName,
- hadoopConfigFiles = hadoopFiles)
+ val hadoopBootsrap = new HadoopConfBootstrapImpl(configName)
val factory = new ExecutorPodFactoryImpl(
conf,
@@ -376,8 +372,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
- configName,
- hadoopFile.toPath.getFileName.toString)
+ configName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
}
@@ -385,11 +380,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
- val hadoopFile = createTempFile
- val hadoopFiles = Seq(hadoopFile)
- val hadoopBootstrap = new HadoopConfBootstrapImpl(
- hadoopConfConfigMapName = configName,
- hadoopConfigFiles = hadoopFiles)
+ val hadoopBootstrap = new HadoopConfBootstrapImpl(configName)
val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI)
val factory = new ExecutorPodFactoryImpl(
@@ -414,8 +405,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
- configName,
- hadoopFile.toPath.getFileName.toString)
+ configName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
}
@@ -423,11 +413,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
- val hadoopFile = createTempFile
- val hadoopFiles = Seq(hadoopFile)
- val hadoopBootstrap = new HadoopConfBootstrapImpl(
- hadoopConfConfigMapName = configName,
- hadoopConfigFiles = hadoopFiles)
+ val hadoopBootstrap = new HadoopConfBootstrapImpl(configName)
val secretName = "secret-test"
val secretItemKey = "item-test"
val userName = "sparkUser"
@@ -459,8 +445,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
- configName,
- hadoopFile.toPath.getFileName.toString)
+ configName)
checkSecretVolumes(executor, SPARK_APP_HADOOP_SECRET_VOLUME_NAME, secretName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
checkVolumeMounts(executor,
@@ -505,15 +490,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
private def checkConfigMapVolumes(executor: Pod,
volName: String,
- configMapName: String,
- content: String) : Unit = {
+ configMapName: String) : Unit = {
val volume = executor.getSpec.getVolumes.asScala.find(_.getName == volName)
assert(volume.nonEmpty)
assert(volume.get.getConfigMap.getName == configMapName)
- assert(volume.get.getConfigMap.getItems.asScala.find(_.getKey == content).get ==
- new KeyToPathBuilder()
- .withKey(content)
- .withPath(content).build() )
}
private def checkSecretVolumes(executor: Pod, volName: String, secretName: String) : Unit = {