This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

add spark.kubernetes.hadoop.conf.configmap.name conf to use exist had… #599

Open · wants to merge 1 commit into base: branch-2.2-kubernetes

Changes from all commits
9 changes: 9 additions & 0 deletions docs/running-on-kubernetes.md
@@ -838,6 +838,15 @@ from the other deployment modes. See the [configuration page](configuration.html
in the executor Pods. The user can specify multiple instances of this for multiple secrets.
  </td>
</tr>
+<tr>
+  <td><code>spark.kubernetes.hadoop.conf.configmap.name</code></td>
+  <td>(none)</td>
+  <td>
+    If this is specified, Spark will not create a new ConfigMap to store the Hadoop
+    configuration files and will instead reuse the existing ConfigMap. The ConfigMap will be
+    mounted into the driver and executor pods, and <code>HADOOP_CONF_DIR</code> will be set.
+  </td>
+</tr>
</table>
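
For illustration, a minimal sketch of how a submission might opt into an existing ConfigMap. The master URL, namespace, and ConfigMap name below are hypothetical; the assumption is that a ConfigMap holding files such as core-site.xml already exists in the target namespace:

```scala
import org.apache.spark.SparkConf

// Minimal sketch, assuming a pre-created ConfigMap named "shared-hadoop-conf"
// that holds the Hadoop conf files (core-site.xml, hdfs-site.xml, ...).
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:443") // hypothetical API server
  .set("spark.kubernetes.namespace", "default")
  // Reuse the existing ConfigMap instead of creating a per-application one:
  .set("spark.kubernetes.hadoop.conf.configmap.name", "shared-hadoop-conf")
```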


@@ -39,25 +39,18 @@ private[spark] trait HadoopConfBootstrap {
}

private[spark] class HadoopConfBootstrapImpl(
-    hadoopConfConfigMapName: String,
-    hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging {
+    hadoopConfConfigMapName: String) extends HadoopConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer = {
-    logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
-    val keyPaths = hadoopConfigFiles.map { file =>
-      val fileStringPath = file.toPath.getFileName.toString
-      new KeyToPathBuilder()
-        .withKey(fileStringPath)
-        .withPath(fileStringPath)
-        .build() }
+    logInfo("HADOOP_CONF_DIR or spark.kubernetes.hadoop.conf.configmap.name defined. " +
+      "Mounting Hadoop specific files")
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
-            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()
@@ -539,6 +539,13 @@ package object config extends Logging {
      .stringConf
      .createOptional

+  private[spark] val KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME =
+    ConfigBuilder("spark.kubernetes.hadoop.conf.configmap.name")
+      .doc("Specify the name of the existing ConfigMap that holds the Hadoop conf files. " +
+        "It will be mounted into the Spark pods.")
+      .stringConf
+      .createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
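
As a brief aside: an entry built with `.stringConf.createOptional` reads back as an `Option[String]`, which is what the orchestrator below relies on for its fallback logic. A small illustrative sketch (the key's value is hypothetical):

```scala
import org.apache.spark.SparkConf

// Sketch: read the key back as an Option via the public string API.
val sparkConf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.hadoop.conf.configmap.name", "shared-hadoop-conf")

val maybeName: Option[String] =
  Option(sparkConf.get("spark.kubernetes.hadoop.conf.configmap.name", null))
// maybeName == Some("shared-hadoop-conf"); it is None when the key is unset.
```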
@@ -53,7 +53,10 @@ private[spark] class DriverConfigurationStepsOrchestrator(
  private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
  private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
  private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
-  private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
+  private val hadoopConfigMapName = submissionSparkConf.get(KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME)
+    .getOrElse(s"$kubernetesResourceNamePrefix-hadoop-config")
+  private val noNeedUploadHadoopConf = submissionSparkConf.get(
+    KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME).isDefined

def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val additionalMainAppJar = mainAppResource match {
@@ -123,18 +126,23 @@
None
}

-    val hadoopConfigSteps =
-      hadoopConfDir.map { conf =>
+    val hadoopConfigSteps = if (hadoopConfDir.isDefined || noNeedUploadHadoopConf) {
      val hadoopStepsOrchestrator =
        new HadoopStepsOrchestrator(
          kubernetesResourceNamePrefix,
          namespace,
          hadoopConfigMapName,
          submissionSparkConf,
-          conf)
+          hadoopConfDir,
+          noNeedUploadHadoopConf)
      val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps()
-      Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))}
-      .getOrElse(Option.empty[DriverConfigurationStep])
+      Some(new HadoopConfigBootstrapStep(
+        hadoopConfSteps,
+        hadoopConfigMapName,
+        noNeedUploadHadoopConf))
+    } else {
+      Option.empty[DriverConfigurationStep]
+    }
val resourceStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
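
The decision logic above is the heart of the change: the ConfigMap name comes from the new setting when present (otherwise the generated `$kubernetesResourceNamePrefix-hadoop-config` name is used), and `noNeedUploadHadoopConf` records whether the upload can be skipped. A standalone sketch of the same rule, with illustrative inputs:

```scala
// Sketch mirroring hadoopConfigMapName / noNeedUploadHadoopConf above.
def resolveHadoopConfigMap(
    existingConfigMap: Option[String],  // spark.kubernetes.hadoop.conf.configmap.name
    resourceNamePrefix: String): (String, Boolean) = {
  val name = existingConfigMap.getOrElse(s"$resourceNamePrefix-hadoop-config")
  val reuseExisting = existingConfigMap.isDefined
  (name, reuseExisting)
}

// resolveHadoopConfigMap(Some("shared-hadoop-conf"), "app-123")
//   == ("shared-hadoop-conf", true)     // mount only, nothing is uploaded
// resolveHadoopConfigMap(None, "app-123")
//   == ("app-123-hadoop-config", false) // ConfigMap built from HADOOP_CONF_DIR
```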
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps

import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.ConfigMapBuilder
+import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
@@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigS
*/
private[spark] class HadoopConfigBootstrapStep(
    hadoopConfigurationSteps: Seq[HadoopConfigurationStep],
-    hadoopConfigMapName: String )
+    hadoopConfigMapName: String,
+    noNeedUploadHadoopConf: Boolean = false)
  extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
@@ -46,13 +47,16 @@
for (nextStep <- hadoopConfigurationSteps) {
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
}
-    val configMap =
-      new ConfigMapBuilder()
+    val configMap = if (noNeedUploadHadoopConf) {
+      Option.empty[HasMetadata]
+    } else {
+      Some(new ConfigMapBuilder()
        .withNewMetadata()
-        .withName(hadoopConfigMapName)
-        .endMetadata()
+          .withName(hadoopConfigMapName)
+          .endMetadata()
        .addToData(currentHadoopSpec.configMapProperties.asJava)
-        .build()
+        .build())
+    }
val driverSparkConfWithExecutorSetup = driverSpec.driverSparkConf.clone()
.set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
.setAll(currentHadoopSpec.additionalDriverSparkConf)
@@ -62,7 +66,7 @@
driverSparkConf = driverSparkConfWithExecutorSetup,
otherKubernetesResources =
driverSpec.otherKubernetesResources ++
-          Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
+          configMap.toSeq ++ currentHadoopSpec.dtSecret.toSeq
)
}
}
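
The step now emits the ConfigMap resource only when it actually has to be created; `Option#toSeq` then splices it into the resource list without a special case. A self-contained sketch of the pattern (the `Resource` type is a stand-in for fabric8's `HasMetadata`):

```scala
// Stand-in for io.fabric8.kubernetes.api.model.HasMetadata in this sketch.
final case class Resource(name: String)

def resourcesToCreate(reuseExisting: Boolean, configMapName: String): Seq[Resource] = {
  val configMap: Option[Resource] =
    if (reuseExisting) None                // pre-existing ConfigMap: emit nothing
    else Some(Resource(configMapName))     // otherwise create it alongside the driver
  configMap.toSeq                          // Some(r) -> Seq(r), None -> Seq.empty
}
```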
@@ -35,7 +35,8 @@ private[spark] class HadoopConfMounterStep(
hadoopConfigMapName: String,
hadoopConfigurationFiles: Seq[File],
hadoopConfBootstrapConf: HadoopConfBootstrap,
-    hadoopConfDir: String)
+    hadoopConfDir: Option[String],
+    noNeedUploadHadoopConf: Boolean = false)
extends HadoopConfigurationStep {

override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
@@ -48,10 +49,11 @@
driverPod = bootstrappedPodAndMainContainer.pod,
driverContainer = bootstrappedPodAndMainContainer.mainContainer,
configMapProperties =
-        hadoopConfigurationFiles.map(file =>
+        hadoopConfigurationFiles.filter(_ => !noNeedUploadHadoopConf).map(file =>
          (file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))).toMap,
-      additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++
-        Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir)
+      additionalDriverSparkConf = hadoopConfDir.filter(_ => !noNeedUploadHadoopConf)
+        .foldLeft(hadoopConfigSpec.additionalDriverSparkConf)((sparkConf, conf) =>
+          sparkConf ++ Map(HADOOP_CONF_DIR_LOC -> conf))
)
}
}
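
The `filter`/`foldLeft` combination above keeps both the file contents and the `HADOOP_CONF_DIR_LOC` entry out of the spec when an existing ConfigMap is reused. A compact sketch of the same Option pattern (the config key below is a placeholder for the `HADOOP_CONF_DIR_LOC` constant):

```scala
// foldLeft over an Option: adds the entry for Some, is a no-op for None.
def extraDriverConf(
    hadoopConfDir: Option[String],
    reuseExisting: Boolean,
    base: Map[String, String]): Map[String, String] =
  hadoopConfDir.filter(_ => !reuseExisting)
    .foldLeft(base)((conf, dir) => conf + ("hadoop.conf.dir.location" -> dir))

// extraDriverConf(Some("/etc/hadoop/conf"), reuseExisting = false, Map.empty)
//   == Map("hadoop.conf.dir.location" -> "/etc/hadoop/conf")
// extraDriverConf(Some("/etc/hadoop/conf"), reuseExisting = true, Map.empty) == Map.empty
```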
@@ -32,7 +32,8 @@ private[spark] class HadoopStepsOrchestrator(
namespace: String,
hadoopConfigMapName: String,
submissionSparkConf: SparkConf,
-    hadoopConfDir: String) extends Logging {
+    hadoopConfDir: Option[String],
+    noNeedUploadHadoopConf: Boolean = false) extends Logging {

private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
@@ -43,7 +44,8 @@
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
private val maybeRenewerPrincipal =
submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
-  private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir)
+  private val hadoopConfigurationFiles = hadoopConfDir.map(HadoopConfUtils.getHadoopConfFiles)
+    .getOrElse(Seq.empty[File])
private val hadoopUGI = new HadoopUGIUtilImpl
logInfo(s"Hadoop Conf directory: $hadoopConfDir")

@@ -68,14 +70,13 @@
" you must also specify the name of the secret")

def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
-    val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
-      hadoopConfigMapName,
-      hadoopConfigurationFiles)
+    val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(hadoopConfigMapName)
    val hadoopConfMounterStep = new HadoopConfMounterStep(
      hadoopConfigMapName,
      hadoopConfigurationFiles,
      hadoopConfBootstrapImpl,
-      hadoopConfDir)
+      hadoopConfDir,
+      noNeedUploadHadoopConf)
val maybeKerberosStep =
if (isKerberosEnabled) {
maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep(
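
With `hadoopConfDir` now an `Option[String]`, the file listing degrades gracefully to an empty `Seq` when only the ConfigMap name was supplied. A rough sketch, using `java.io.File` directly in place of `HadoopConfUtils.getHadoopConfFiles`:

```scala
import java.io.File

// Sketch: list conf files when a directory is given, else no files at all.
def confFiles(hadoopConfDir: Option[String]): Seq[File] =
  hadoopConfDir
    .map(dir => Option(new File(dir).listFiles()).map(_.toSeq).getOrElse(Seq.empty))
    .getOrElse(Seq.empty[File])

// confFiles(Some("/etc/hadoop/conf")) -> Seq(core-site.xml, hdfs-site.xml, ...)
// confFiles(None)                     -> Seq.empty
```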
@@ -88,11 +88,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
}

val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap =>
-    val hadoopConfigurations = maybeHadoopConfDir.map(
-      conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File])
-    new HadoopConfBootstrapImpl(
-      hadoopConfigMap,
-      hadoopConfigurations)
+    new HadoopConfBootstrapImpl(hadoopConfigMap)
}

val kerberosBootstrap =
@@ -34,29 +34,20 @@ import org.apache.spark.util.Utils
private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{
private val CONFIG_MAP_NAME = "config-map"
private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
-  private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
private val SPARK_USER_VALUE = "sparkUser"

before {
MockitoAnnotations.initMocks(this)
}

test("Test of bootstrapping hadoop_conf_dir files") {
-    val hadoopConfStep = new HadoopConfBootstrapImpl(
-      CONFIG_MAP_NAME,
-      HADOOP_FILES)
-    val expectedKeyPaths = Seq(
-      new KeyToPathBuilder()
-        .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)
-        .withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString)
-        .build())
+    val hadoopConfStep = new HadoopConfBootstrapImpl(CONFIG_MAP_NAME)
val expectedPod = new PodBuilder()
.editOrNewSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(CONFIG_MAP_NAME)
-          .withItems(expectedKeyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
@@ -73,7 +73,7 @@ private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with Befor
CONFIG_MAP_NAME,
HADOOP_FILES,
hadoopConfBootstrap,
-      HADOOP_CONF_DIR_VAL)
+      Some(HADOOP_CONF_DIR_VAL))
val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL)
val expectedConfigMap = Map(
TEMP_HADOOP_FILE.toPath.getFileName.toString ->
@@ -33,7 +33,7 @@ private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
-      HADOOP_CONF_DIR_VAL)
+      Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -50,7 +50,7 @@
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
-      HADOOP_CONF_DIR_VAL)
+      Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -65,7 +65,7 @@
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
-      HADOOP_CONF_DIR_VAL)
+      Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -82,7 +82,7 @@
NAMESPACE,
HADOOP_CONFIG_MAP,
sparkTestConf,
-      HADOOP_CONF_DIR_VAL)
+      Some(HADOOP_CONF_DIR_VAL))
val steps = hadoopOrchestrator.getHadoopSteps()
assert(steps.length === 2)
assert(steps.head.isInstanceOf[HadoopConfMounterStep])
@@ -349,11 +349,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
-    val hadoopFile = createTempFile
-    val hadoopFiles = Seq(hadoopFile)
-    val hadoopBootsrap = new HadoopConfBootstrapImpl(
-      hadoopConfConfigMapName = configName,
-      hadoopConfigFiles = hadoopFiles)
+    val hadoopBootsrap = new HadoopConfBootstrapImpl(configName)

val factory = new ExecutorPodFactoryImpl(
conf,
@@ -376,20 +372,15 @@
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
-      configName,
-      hadoopFile.toPath.getFileName.toString)
+      configName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
}

test("check that hadoop bootstrap mounts files w/ SPARK_USER") {
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
-    val hadoopFile = createTempFile
-    val hadoopFiles = Seq(hadoopFile)
-    val hadoopBootstrap = new HadoopConfBootstrapImpl(
-      hadoopConfConfigMapName = configName,
-      hadoopConfigFiles = hadoopFiles)
+    val hadoopBootstrap = new HadoopConfBootstrapImpl(configName)
val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI)

val factory = new ExecutorPodFactoryImpl(
@@ -414,20 +405,15 @@
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
-      configName,
-      hadoopFile.toPath.getFileName.toString)
+      configName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
}

test("check that hadoop and kerberos bootstrap function properly") {
when(hadoopUGI.getShortUserName).thenReturn("test-user")
val conf = baseConf.clone()
val configName = "hadoop-test"
-    val hadoopFile = createTempFile
-    val hadoopFiles = Seq(hadoopFile)
-    val hadoopBootstrap = new HadoopConfBootstrapImpl(
-      hadoopConfConfigMapName = configName,
-      hadoopConfigFiles = hadoopFiles)
+    val hadoopBootstrap = new HadoopConfBootstrapImpl(configName)
val secretName = "secret-test"
val secretItemKey = "item-test"
val userName = "sparkUser"
@@ -459,8 +445,7 @@
checkOwnerReferences(executor, driverPodUid)
checkConfigMapVolumes(executor,
HADOOP_FILE_VOLUME,
-      configName,
-      hadoopFile.toPath.getFileName.toString)
+      configName)
checkSecretVolumes(executor, SPARK_APP_HADOOP_SECRET_VOLUME_NAME, secretName)
checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)
checkVolumeMounts(executor,
@@ -505,15 +490,10 @@

  private def checkConfigMapVolumes(executor: Pod,
      volName: String,
-      configMapName: String,
-      content: String) : Unit = {
+      configMapName: String) : Unit = {
val volume = executor.getSpec.getVolumes.asScala.find(_.getName == volName)
assert(volume.nonEmpty)
assert(volume.get.getConfigMap.getName == configMapName)
-    assert(volume.get.getConfigMap.getItems.asScala.find(_.getKey == content).get ==
-      new KeyToPathBuilder()
-        .withKey(content)
-        .withPath(content).build() )
}

private def checkSecretVolumes(executor: Pod, volName: String, secretName: String) : Unit = {