Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Permalink
Do not add the MountSmallLocalFilesStep when there's no submitter loc…
Browse files Browse the repository at this point in the history
…al files (#557)
  • Loading branch information
liyinan926 authored Nov 28, 2017
1 parent 5fd1304 commit 15a333c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,27 @@ private[spark] class DriverConfigurationStepsOrchestrator(
submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ =>
(filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
}.getOrElse {
// Else - use a small files bootstrap that submits the local files via a secret.
// Then, indicate to the outer block that the init-container should not handle
// those local files simply by filtering them out.
val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
val smallFilesSecretName = s"$kubernetesAppId-submitted-files"
val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
// Otherwise, if there are any submitter local files, use a small files bootstrap that
// submits the local files via a secret. If this is the case, indicate to the outer
// block that the init-container should not handle those local files simply by filtering
// them out.
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
if (submitterLocalFiles.nonEmpty) {
val nonSubmitterLocalFiles = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
val smallFilesSecretName = s"$kubernetesAppId-submitted-files"
val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)
val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
sparkFiles,
smallFilesSecretName,
MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
mountSmallFilesBootstrap)
(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
sparkFilesWithoutLocal.toArray,
Some(mountSmallLocalFilesStep))
val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
submitterLocalFiles.toSeq,
smallFilesSecretName,
MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
mountSmallFilesBootstrap)
(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
nonSubmitterLocalFiles.toArray,
Some(mountSmallLocalFilesStep))
} else {
(filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
}
}

val initContainerBootstrapStep =
Expand All @@ -169,6 +175,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
initContainerConfigMapName,
INIT_CONTAINER_CONFIG_MAP_KEY))
} else Option.empty[DriverConfigurationStep]

(submittedLocalFilesDownloadPath,
mountSmallFilesWithoutInitContainerStep.toSeq ++
initContainerBootstrapStep.toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSmallFilesB
import org.apache.spark.util.Utils

private[spark] class MountSmallLocalFilesStep(
sparkFiles: Seq[String],
submitterLocalFiles: Seq[String],
smallFilesSecretName: String,
smallFilesSecretMountPath: String,
mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep {

import MountSmallLocalFilesStep._
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
.map(localFileUri => new File(Utils.resolveURI(localFileUri).getPath))
val localFiles = submitterLocalFiles.map { localFileUri =>
new File(Utils.resolveURI(localFileUri).getPath)
}
val totalSizeBytes = localFiles.map(_.length()).sum
val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
}


test("Only local files without a resource staging server.") {
test("Only submitter local files without a resource staging server.") {
val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigurationStepsOrchestrator(
Expand All @@ -151,6 +151,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
classOf[MountSmallLocalFilesStep])
}

test("No submitter local files without a resource staging server") {
val sparkConf = new SparkConf(false).set(
"spark.files", "hdfs://localhost:9000/var/foo.txt,https://localhost:8080/var/bar.txt")
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
val orchestrator = new DriverConfigurationStepsOrchestrator(
NAMESPACE,
APP_ID,
LAUNCH_TIME,
mainAppResource,
APP_NAME,
MAIN_CLASS,
APP_ARGS,
Seq.empty[String],
sparkConf)
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverServiceBootstrapStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[LocalDirectoryMountConfigurationStep],
classOf[InitContainerBootstrapStep])
}

test("Submission steps with driver secrets to mount") {
val sparkConf = new SparkConf(false)
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be
private val SECOND_TEMP_FILE_NAME = "file2.txt"
private val FIRST_TEMP_FILE_CONTENTS = "123"
private val SECOND_TEMP_FILE_CONTENTS = "456"
private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt"
private val SECRET_NAME = "secret"

private var tempFolder: File = _
Expand All @@ -61,8 +60,7 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be
tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS)
val sparkFiles = Seq(
s"file://${firstTempFile.getAbsolutePath}",
secondTempFile.getAbsolutePath,
REMOTE_FILE_URI)
secondTempFile.getAbsolutePath)
val configurationStep = new MountSmallLocalFilesStep(
sparkFiles,
SECRET_NAME,
Expand Down

0 comments on commit 15a333c

Please sign in to comment.