diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index 232aca4f2f55c..d5710fab5a207 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -131,21 +131,27 @@ private[spark] class DriverConfigurationStepsOrchestrator( submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ => (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep]) }.getOrElse { - // Else - use a small files bootstrap that submits the local files via a secret. - // Then, indicate to the outer block that the init-container should not handle - // those local files simply by filtering them out. - val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles) - val smallFilesSecretName = s"$kubernetesAppId-submitted-files" - val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl( + // Otherwise, if there are any submitter local files, use a small files bootstrap that + // submits the local files via a secret. If this is the case, indicate to the outer + // block that the init-container should not handle those local files simply by filtering + // them out. + val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles) + if (submitterLocalFiles.nonEmpty) { + val nonSubmitterLocalFiles = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles) + val smallFilesSecretName = s"$kubernetesAppId-submitted-files" + val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl( smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH) - val mountSmallLocalFilesStep = new MountSmallLocalFilesStep( - sparkFiles, - smallFilesSecretName, - MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, - mountSmallFilesBootstrap) - (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, - sparkFilesWithoutLocal.toArray, - Some(mountSmallLocalFilesStep)) + val mountSmallLocalFilesStep = new MountSmallLocalFilesStep( + submitterLocalFiles.toSeq, + smallFilesSecretName, + MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + mountSmallFilesBootstrap) + (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH, + nonSubmitterLocalFiles.toArray, + Some(mountSmallLocalFilesStep)) + } else { + (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep]) + } } val initContainerBootstrapStep = @@ -169,6 +175,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( initContainerConfigMapName, INIT_CONTAINER_CONFIG_MAP_KEY)) } else Option.empty[DriverConfigurationStep] + (submittedLocalFilesDownloadPath, mountSmallFilesWithoutInitContainerStep.toSeq ++ initContainerBootstrapStep.toSeq) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStep.scala index 52503b1b3910e..c8c14f01be060 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStep.scala @@ -27,15 +27,16 @@ import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSmallFilesB import org.apache.spark.util.Utils private[spark] class MountSmallLocalFilesStep( - sparkFiles: Seq[String], + submitterLocalFiles: Seq[String], smallFilesSecretName: String, smallFilesSecretMountPath: String, mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep { import MountSmallLocalFilesStep._ override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { - val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles) - .map(localFileUri => new File(Utils.resolveURI(localFileUri).getPath)) + val localFiles = submitterLocalFiles.map { localFileUri => + new File(Utils.resolveURI(localFileUri).getPath) + } val totalSizeBytes = localFiles.map(_.length()).sum val totalSizeBytesString = Utils.bytesToString(totalSizeBytes) require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index 7af6613fcc9b3..c3ec943e3e87f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -128,7 +128,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS } - test("Only local files without a resource staging server.") { + test("Only submitter local files without a resource staging server.") { val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") val orchestrator = new DriverConfigurationStepsOrchestrator( @@ -151,6 +151,30 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[MountSmallLocalFilesStep]) } + test("No submitter local files without a resource staging server") { + val sparkConf = new SparkConf(false).set( + "spark.files", "hdfs://localhost:9000/var/foo.txt,https://localhost:8080/var/bar.txt") + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigurationStepsOrchestrator( + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + Seq.empty[String], + sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], + classOf[InitContainerBootstrapStep]) + } + test("Submission steps with driver secrets to mount") { val sparkConf = new SparkConf(false) .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepSuite.scala index 812031a306063..b65862f7569eb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepSuite.scala @@ -38,7 +38,6 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be private val SECOND_TEMP_FILE_NAME = "file2.txt" private val FIRST_TEMP_FILE_CONTENTS = "123" private val SECOND_TEMP_FILE_CONTENTS = "456" - private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt" private val SECRET_NAME = "secret" private var tempFolder: File = _ @@ -61,8 +60,7 @@ private[spark] class MountSmallLocalFilesStepSuite extends SparkFunSuite with Be tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS) val sparkFiles = Seq( s"file://${firstTempFile.getAbsolutePath}", - secondTempFile.getAbsolutePath, - REMOTE_FILE_URI) + secondTempFile.getAbsolutePath) val configurationStep = new MountSmallLocalFilesStep( sparkFiles, SECRET_NAME,