From 15345cb902bd18587ad25ab77ce0267da1b2e0b2 Mon Sep 17 00:00:00 2001
From: Jeremy Liu
Date: Thu, 13 Dec 2018 10:21:58 -0800
Subject: [PATCH] Copy over (#460)

## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

Not filed upstream; this only touches conda-specific code.

## What changes were proposed in this pull request?

rLibDir contains a sequence of possible paths for the SparkR package on the
executor and is passed to the R daemon through the SPARKR_RLIBDIR environment
variable. This PR filters rLibDir down to the paths that actually exist before
setting SPARKR_RLIBDIR, while retaining the existing behavior of preferring a
YARN or local SparkR install over conda when both are present.

See daemon.R: https://github.com/palantir/spark/blob/master/R/pkg/inst/worker/daemon.R#L23

Fixes #456

## How was this patch tested?

Manually tested; cherry-picked onto an older version.

Please review http://spark.apache.org/contributing.html before opening a pull request.
---
 .../main/scala/org/apache/spark/api/r/RRunner.scala | 12 ++++++++----
 .../main/scala/org/apache/spark/api/r/RUtils.scala  | 10 ++++++----
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index f525b6a5f7786..4b6b19459eb5e 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -360,10 +360,14 @@ private[r] object RRunner {
     val rConnectionTimeout = sparkConf.getInt(
       "spark.r.backendConnectionTimeout", SparkRDefaults.DEFAULT_CONNECTION_TIMEOUT)
     val rOptions = "--vanilla"
-    val rLibDir = condaEnv.map { conda =>
-      RUtils.sparkRPackagePath(isDriver = false) :+ (conda.condaEnvDir + "/lib/R/library")
-    }.getOrElse(RUtils.sparkRPackagePath(isDriver = false))
-    val rExecScript = RUtils.sparkRInstallLocation(rLibDir, "/SparkR/worker/" + script)
+    val rLibDir = condaEnv.map(conda =>
+      RUtils.sparkRPackagePath(isDriver = false) :+ (conda.condaEnvDir + "/lib/R/library"))
+      .getOrElse(RUtils.sparkRPackagePath(isDriver = false))
+      .filter(dir => new File(dir).exists)
+    if (rLibDir.isEmpty) {
+      throw new SparkException("SparkR package is not installed on executor.")
+    }
+    val rExecScript = RUtils.getSparkRScript(rLibDir, "/SparkR/worker/" + script)
     val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
     // Activate the conda environment by setting the right env variables if applicable.
     condaEnv.map(_.activatedEnvironment()).map(_.asJava).foreach(pb.environment().putAll)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 5992509dff9a4..b070380cfdf8e 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -97,10 +97,12 @@ private[spark] object RUtils {
     }
   }
 
-  /** Finds the rLibDir with SparkR installed on it. */
-  def sparkRInstallLocation(rLibDir: Seq[String], scriptPath: String): String = {
-    rLibDir.find(dir => new File(dir + scriptPath).exists)
-      .getOrElse(throw new SparkException("SparkR package not installed on executor.")) + scriptPath
+  /** Finds a script in a sequence of possible SparkR installation directories. */
+  def getSparkRScript(rLibDir: Seq[String], scriptPath: String): String = {
+    rLibDir.find(dir => new File(dir + scriptPath).exists).getOrElse(
+      throw new SparkException(
+        s"Script $scriptPath not found in any SparkR installation directory.")
+    ) + scriptPath
   }
 
   /** Check if R is installed before running tests that use R commands. */
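
For reviewers, a minimal self-contained sketch of the behavior this patch introduces: candidate library directories are filtered to those that exist on the executor, and the worker script is resolved from the first directory that actually contains it. The object and method names, the example paths, and the use of `IllegalStateException` in place of `SparkException` are illustrative assumptions, not part of the patch.

```scala
import java.io.File

object SparkRScriptLookupSketch {
  // Illustrative stand-in for RUtils.getSparkRScript: return the script path
  // rooted at the first candidate directory that actually contains it, and
  // fail loudly otherwise (the patch throws SparkException here).
  def resolveScript(rLibDir: Seq[String], scriptPath: String): String = {
    rLibDir.find(dir => new File(dir + scriptPath).exists)
      .getOrElse(throw new IllegalStateException(
        s"Script $scriptPath not found in any SparkR installation directory.")) + scriptPath
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical candidate directories, in preference order: a YARN/local
    // SparkR install first, the conda environment's R library last.
    val candidates = Seq(
      "/opt/spark/R/lib",                     // assumed local SparkR install
      "/opt/conda/envs/myenv/lib/R/library")  // assumed conda env library

    // The patch drops candidates that do not exist before they are exported
    // to the R daemon via SPARKR_RLIBDIR.
    val existing = candidates.filter(dir => new File(dir).exists)
    println(s"Directories that would be exported as SPARKR_RLIBDIR: $existing")

    // Script resolution then picks the first surviving directory that holds it.
    if (existing.nonEmpty) {
      println(resolveScript(existing, "/SparkR/worker/worker.R"))
    }
  }
}
```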