diff --git a/bin/livy-server b/bin/livy-server
index 71b913440..928424f30 100755
--- a/bin/livy-server
+++ b/bin/livy-server
@@ -83,9 +83,6 @@ start_livy_server() {
 
   LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR"
 
-  if [ -n "$SPARK_CONF_DIR" ]; then
-    LIVY_CLASSPATH="$LIVY_CLASSPATH:$SPARK_CONF_DIR"
-  fi
   if [ -n "$HADOOP_CONF_DIR" ]; then
     LIVY_CLASSPATH="$LIVY_CLASSPATH:$HADOOP_CONF_DIR"
   fi
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index d57717a13..a14951a68 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -98,9 +98,48 @@
 # How often Livy polls YARN to refresh YARN app state.
 # livy.server.yarn.poll-interval = 1s
 #
 # Days to keep Livy server request logs.
 # livy.server.request-log-retain.days = 5
 
 # If the Livy Web UI should be included in the Livy Server. Enabled by default.
 # livy.ui.enabled = true
+
+# Define Spark environments in the Livy server. Users can pre-define multiple Spark environments
+# and pick one at runtime through the session creation request.
+#
+# A Spark environment is composed of several configurations:
+# livy.server.spark-env.${sparkEnv}.spark-home =
+# livy.server.spark-env.${sparkEnv}.spark-conf-dir =
+# livy.server.spark-env.${sparkEnv}.scalaVersion =
+# Whether to enable HiveContext in interpreter sessions; false by default.
+# livy.server.spark-env.${sparkEnv}.enableHiveContext = false
+# livy.server.spark-env.${sparkEnv}.sparkr.package =
+# livy.server.spark-env.${sparkEnv}.pyspark.archives =
+#
+# Only livy.server.spark-env.${sparkEnv}.spark-home is required; the others can be inferred from
+# the provided spark-home.
+#
+# Environment variables like SPARK_HOME and SPARK_CONF_DIR can still be used; their values will be
+# merged into the "default" environment.
+#
+# Users can also define "${SPARK_ENV}_SPARK_HOME" and "${SPARK_ENV}_SPARK_CONF_DIR"; these values
+# will be merged into the ${sparkEnv} environment.
+#
+# ${sparkEnv} can be replaced with any name. When creating a session, users can specify the name
+# of the Spark environment, and the Livy server will internally pick the right Spark environment
+# accordingly; the "default" Spark environment is used if none is specified.
+#
+# For backward compatibility, all the previous configurations:
+#
+# livy.server.spark-home
+# livy.server.spark-conf-dir
+# livy.spark.scalaVersion
+# livy.repl.enableHiveContext
+# livy.sparkr.package
+# livy.pyspark.archives
+#
+# can still be used and will automatically be merged into the "default" Spark environment.
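+#
+# For example (an illustrative sketch only: the environment names "staging" and "production" and
+# the Spark home paths below are assumptions, not shipped defaults):
+#
+# livy.server.spark-env.staging.spark-home = /opt/spark-1.6.2
+# livy.server.spark-env.production.spark-home = /opt/spark-2.1.0
+# livy.server.spark-env.production.enableHiveContext = true
+#
+# A session then selects an environment by name via the "sparkEnv" field of the session creation
+# request, e.g. a JSON body of {"kind": "spark", "sparkEnv": "production"}.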
diff --git a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
index 540236723..fc025a0dd 100644
--- a/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
+++ b/integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala
@@ -39,7 +39,6 @@ import org.scalatest.concurrent.Eventually._
 import com.cloudera.livy.{LivyConf, Logging}
 import com.cloudera.livy.client.common.TestUtils
 import com.cloudera.livy.server.LivyServer
-import com.cloudera.livy.utils.SparkEnvironment
 
 private class MiniClusterConfig(val config: Map[String, String]) {
 
@@ -157,8 +156,8 @@ object MiniLivyMain extends MiniClusterBase {
     var livyConf = Map(
       LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
       LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
       LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
-      SparkEnvironment.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
+      "livy.spark.scalaVersion" -> getSparkScalaVersion(),
       LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
       LivyConf.RECOVERY_MODE.key -> "recovery",
       LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
@@ -172,7 +171,7 @@
     val server = new LivyServer()
     server.start()
-    server.livyConf.set(SparkEnvironment.ENABLE_HIVE_CONTEXT, true)
+    server.livyConf.set("livy.repl.enableHiveContext", "true")
     // Write a serverUrl.conf file to the conf directory with the location of the Livy
     // server. Do it atomically since it's used by MiniCluster to detect when the Livy server
     // is up and ready.
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
index efaced24c..179e9bf88 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
@@ -28,13 +28,7 @@ import java.io.Reader;
 import java.io.Writer;
 import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -171,6 +165,12 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
     }
     merge(conf, SPARK_JARS_KEY, livyJars, ",");
 
+    Map<String, String> childEnv = new HashMap<>();
+    String kind = conf.get(SESSION_KIND);
+    if ("pyspark".equals(kind) && conf.get(RSCConf.Entry.PYSPARK_ARCHIVES) != null) {
+      childEnv.put("PYSPARK_ARCHIVES_PATH", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES));
+    }
+
     // Disable multiple attempts since the RPC server doesn't yet support multiple
     // connections for the same registered app.
     conf.set("spark.yarn.maxAppAttempts", "1");
@@ -212,7 +212,7 @@ public void run() {
         };
       return new ChildProcess(conf, promise, child, confFile);
     } else {
-      final SparkLauncher launcher = new SparkLauncher();
+      final SparkLauncher launcher = new SparkLauncher(childEnv);
 
       // Spark 1.x does not support specifying deploy mode in conf and needs special handling.
      String deployMode = conf.get(SPARK_DEPLOY_MODE);
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
index 078d1c71a..4e069558b 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
@@ -47,6 +47,7 @@ public static enum Entry implements ConfEntry {
    LIVY_JARS("jars", null),
+   PYSPARK_ARCHIVES("pyspark.archives", null),
 
    SPARK_HOME("spark_home", null),
    SPARK_CONF_DIR("spark_conf_dir", null),
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
index bd84b0407..8cba619ec 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
@@ -248,7 +248,7 @@ object InteractiveSession extends Logging {
       kind match {
         case PySpark() | PySpark3() =>
           val pySparkFiles = if (!LivyConf.TEST_MODE) sparkEnv.findPySparkArchives() else Nil
-          mergeConfList(pySparkFiles, SparkEnvironment.SPARK_PY_FILES)
+          mergeConfList(pySparkFiles, RSCConf.Entry.PYSPARK_ARCHIVES.key())
           builderProperties.put(SparkEnvironment.SPARK_YARN_IS_PYTHON, "true")
         case SparkR() =>
           val sparkRArchive = if (!LivyConf.TEST_MODE) Some(sparkEnv.findSparkRArchive()) else None
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala
index a542987a1..32eeedbaa 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkEnvironment.scala
@@ -31,7 +31,7 @@ import com.cloudera.livy.{LivyConf, Logging}
 import com.cloudera.livy.client.common.ClientConf
 import com.cloudera.livy.client.common.ClientConf.ConfEntry
 
-object SparkEnvironment {
+object SparkEnvironment extends Logging {
 
   case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry
 
@@ -40,20 +40,30 @@
   }
 
   val DEFAULT_ENV_NAME = "default"
+  val SPARK_ENV_PREFIX = "livy.server.spark-env"
 
-  val SPARK_HOME = Entry("livy.server.spark-home", null)
-  val SPARK_CONF_DIR = Entry("livy.server.spark-conf-dir", null)
+  val SPARK_HOME = Entry("spark-home", null)
+  val SPARK_CONF_DIR = Entry("spark-conf-dir", null)
 
-  // Two configurations to specify Spark and related Scala version. These are internal
-  // configurations will be set by LivyServer and used in session creation. It is not required to
+  // This configuration is used to specify Spark's Scala version. It is an internal
+  // configuration used in session creation. It is not required to
   // set usually unless running with unofficial Spark + Scala versions
   // (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11)
-  val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scalaVersion", null)
+  val LIVY_SPARK_SCALA_VERSION = Entry("scalaVersion", null)
 
-  val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enableHiveContext", false)
+  val ENABLE_HIVE_CONTEXT = Entry("enableHiveContext", false)
 
-  val SPARKR_PACKAGE = Entry("livy.sparkr.package", null)
-  val PYSPARK_ARCHIVES = Entry("livy.pyspark.archives", null)
+  val SPARKR_PACKAGE = Entry("sparkr.package", null)
+  val PYSPARK_ARCHIVES = Entry("pyspark.archives", null)
+
+  val backwardCompatibleConfs = Map(
+    "livy.server.spark-home" -> SPARK_HOME,
+    "livy.server.spark-conf-dir" -> SPARK_CONF_DIR,
+    "livy.spark.scalaVersion" -> LIVY_SPARK_SCALA_VERSION,
+    "livy.repl.enableHiveContext" -> ENABLE_HIVE_CONTEXT,
+    "livy.sparkr.package" -> SPARKR_PACKAGE,
+    "livy.pyspark.archives" -> PYSPARK_ARCHIVES
+  )
 
   val SPARK_MASTER = "spark.master"
   val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
@@ -82,7 +92,7 @@
   )
 
   @VisibleForTesting
-  private[utils] val sparkEnvironments = new mutable.HashMap[String, SparkEnvironment]
+  private[livy] val sparkEnvironments = new mutable.HashMap[String, SparkEnvironment]
 
   def getSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
     if (sparkEnvironments.contains(env)) {
@@ -102,7 +112,7 @@
   }
 
   @VisibleForTesting
-  private[utils] def createSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
+  private[livy] def createSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
     val livySparkConfKeys = getClass.getMethods.filter {
       _.getReturnType.getCanonicalName == classOf[Entry].getCanonicalName
     }.map(_.invoke(this).asInstanceOf[Entry].key).toSet
@@ -110,17 +120,18 @@
     val sparkEnv = new SparkEnvironment(env)
     if (env == DEFAULT_ENV_NAME) {
       livyConf.asScala
-        .filter { kv =>
-          livySparkConfKeys.contains(kv.getKey) ||
-            livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$DEFAULT_ENV_NAME."))
-        }
-        .foreach(kv => sparkEnv.set(kv.getKey.stripPrefix(s"$DEFAULT_ENV_NAME."), kv.getValue))
-    } else {
-      livyConf.asScala
-        .filter(kv => livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$env.")))
-        .foreach(kv => sparkEnv.set(kv.getKey.stripPrefix(s"$env."), kv.getValue))
+        .filter { kv => backwardCompatibleConfs.contains(kv.getKey) }
+        .foreach { kv => sparkEnv.set(backwardCompatibleConfs(kv.getKey), kv.getValue) }
     }
+    livyConf.asScala
+      .filter { kv => kv.getKey.startsWith(s"$SPARK_ENV_PREFIX.$env.") &&
+        livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env.")) }
+      .foreach {
+        kv => sparkEnv.set(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env."), kv.getValue)
+      }
+
+    info(s"Created Spark environment $env with configuration ${sparkEnv.asScala.mkString(",")}")
     sparkEnv
   }
 }
@@ -130,25 +141,27 @@
  * Livy Can have multiple Spark environments differentiated by name, for example if user
  * configured in Livy conf like:
  *
- * test.livy.server.spark-home = xxx
- * test.livy.server.spark-conf-dir = xxx
+ * livy.server.spark-env.test.spark-home = xxx
+ * livy.server.spark-env.test.spark-conf-dir = xxx
  *
- * production.livy.server.spark-home = yyy
- * production.livy.server.spark-conf-dir = yyy
+ * livy.server.spark-env.production.spark-home = yyy
+ * livy.server.spark-env.production.spark-conf-dir = yyy
  *
  * Livy internally will have two isolated Spark environments "test" and "production". When user
"production". When user * create batch or interactive session, they could specify through "sparkEnv" in json body. Livy * server will honor this env name and pick right Spark environment. This is used for Livy to * support different Spark cluster in runtime. * - * The Default Spark environment is "default" if user configured + * The Default Spark environment is "default". If user configured * * livy.server.spark-home = xxx * or: - * default.livy.server.spark-home = xxx + * livy.server.spark-conf-dir = xxx + * + * Livy server will treat configuration to "default" Spark environment to keep + * backward compatibility. This is equal to: * - * Livy server will treat configuration without prefix to "default" Spark environment to keep - * backward compatibility. + * livy.server.spark-env.default.spark-home = xxx * * Also for environment variable, user's configuration * diff --git a/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala index 8ac46a77e..e5985e421 100644 --- a/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/batch/BatchSessionSpec.scala @@ -33,7 +33,7 @@ import org.scalatest.mock.MockitoSugar.mock import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} import com.cloudera.livy.server.recovery.SessionStore import com.cloudera.livy.sessions.SessionState -import com.cloudera.livy.utils.{AppInfo, SparkApp} +import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkEnvironment} class BatchSessionSpec extends FunSpec @@ -63,6 +63,10 @@ class BatchSessionSpec sessionStore = mock[SessionStore] } + after { + SparkEnvironment.sparkEnvironments.clear() + } + it("should create a process") { val req = new CreateBatchRequest() req.file = script.toString @@ -110,5 +114,26 @@ class BatchSessionSpec verify(sessionStore, atLeastOnce()).save( Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject()) } + + it("should use right Spark environment") { + assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") + val conf = new LivyConf() + .set("livy.server.spark-home", sys.env("SPARK_HOME")) + .set(SparkEnvironment.SPARK_ENV_PREFIX + ".test." 
+          sys.env("SPARK_HOME"))
+
+      val mockApp = mock[SparkApp]
+
+      val req = new CreateBatchRequest()
+      req.sparkEnv = "default"
+      BatchSession.create(0, req, conf, null, None, sessionStore, Some(mockApp))
+
+      val req1 = new CreateBatchRequest()
+      req1.sparkEnv = "test"
+      BatchSession.create(1, req1, conf, null, None, sessionStore, Some(mockApp))
+
+      SparkEnvironment.sparkEnvironments.get("default") should not be (None)
+      SparkEnvironment.sparkEnvironments.get("test") should not be (None)
+    }
   }
 }
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
index ea740b91a..c33f164aa 100644
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
@@ -30,7 +30,7 @@ import org.json4s.jackson.JsonMethods.parse
 import org.mockito.{Matchers => MockitoMatchers}
 import org.mockito.Matchers._
 import org.mockito.Mockito.{atLeastOnce, verify, when}
-import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpec, Matchers}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.mock.MockitoSugar.mock
 
@@ -39,10 +39,10 @@ import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
 import com.cloudera.livy.rsc.driver.StatementState
 import com.cloudera.livy.server.recovery.SessionStore
 import com.cloudera.livy.sessions.{PySpark, SessionState, Spark}
-import com.cloudera.livy.utils.{AppInfo, SparkApp}
+import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkEnvironment}
 
 class InteractiveSessionSpec extends FunSpec
-  with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite {
+  with Matchers with BeforeAndAfterAll with BeforeAndAfter with LivyBaseUnitTestSuite {
 
   private val livyConf = new LivyConf()
@@ -58,12 +58,13 @@ class InteractiveSessionSpec extends FunSpec
   private var session: InteractiveSession = null
 
   private def createSession(
+      livyConf: LivyConf = this.livyConf,
       sessionStore: SessionStore = mock[SessionStore],
-      mockApp: Option[SparkApp] = None): InteractiveSession = {
-    assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
-
+      mockApp: Option[SparkApp] = None,
+      sparkEnv: String = "default"): InteractiveSession = {
     val req = new CreateInteractiveRequest()
     req.kind = PySpark()
+    req.sparkEnv = sparkEnv
     req.driverMemory = Some("512m")
     req.driverCores = Some(1)
     req.executorMemory = Some("512m")
@@ -85,6 +86,11 @@ class InteractiveSessionSpec extends FunSpec
     }
   }
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
+  }
+
   override def afterAll(): Unit = {
     if (session != null) {
       Await.ready(session.stop(), 30 seconds)
@@ -167,7 +173,7 @@ class InteractiveSessionSpec extends FunSpec
     it("should update appId and appInfo and session store") {
       val mockApp = mock[SparkApp]
       val sessionStore = mock[SessionStore]
-      val session = createSession(sessionStore, Some(mockApp))
+      val session = createSession(sessionStore = sessionStore, mockApp = Some(mockApp))
 
       val expectedAppId = "APPID"
       session.appIdKnown(expectedAppId)
@@ -266,4 +272,47 @@ class InteractiveSessionSpec extends FunSpec
       s.logLines().mkString should include("RSCDriver URI is unknown")
     }
   }
+
+  describe("multiple Spark environments") {
+    import SparkEnvironment._
+
+    var session: InteractiveSession = null
+
+    after {
+      if (session != null) {
+        Await.ready(session.stop(), 30 seconds)
+        session = null
+        sparkEnvironments.clear()
+      }
+    }
+
+    it("should honor default Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.LIVY_REPL_JARS, "")
+      session = createSession(livyConf = conf)
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.size should be (1)
+      sparkEnvironments.get("default") should not be (None)
+    }
+
+    it("should use customized Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.LIVY_REPL_JARS, "")
+      session = createSession(livyConf = conf, sparkEnv = "test")
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.get("test") should not be (None)
+    }
+
+    it("should pick the right Spark environment") {
+      val conf = new LivyConf()
+        .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(SPARK_ENV_PREFIX + ".production." + SPARK_HOME.key, sys.env("SPARK_HOME"))
+        .set(LivyConf.LIVY_REPL_JARS, "")
+      session = createSession(livyConf = conf, sparkEnv = "production")
+      session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
+      sparkEnvironments.get("production") should not be (None)
+    }
+  }
 }
diff --git a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
index fae88553d..b9c8b6c2f 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
@@ -33,11 +33,11 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
   private val sparkEnv = SparkEnvironment.createSparkEnv(livyConf, "default")
 
   private val livyConf210 = new LivyConf()
-  livyConf210.set(SparkEnvironment.LIVY_SPARK_SCALA_VERSION, "2.10.6")
+  livyConf210.set("livy.spark.scalaVersion", "2.10.6")
   private val sparkEnv210 = SparkEnvironment.createSparkEnv(livyConf210, "default")
 
   private val livyConf211 = new LivyConf()
-  livyConf211.set(SparkEnvironment.LIVY_SPARK_SCALA_VERSION, "2.11.1")
+  livyConf211.set("livy.spark.scalaVersion", "2.11.1")
   private val sparkEnv211 = SparkEnvironment.createSparkEnv(livyConf211, "default")
 
   test("check for SPARK_HOME") {
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala
new file mode 100644
index 000000000..42cfa5d32
--- /dev/null
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkEnvironmentSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.utils
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class SparkEnvironmentSuite extends FunSuite
+  with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite {
+  import SparkEnvironment._
+
+  override def afterAll(): Unit = {
+    // clean the global data when this test suite is finished.
+    sparkEnvironments.clear()
+    super.afterAll()
+  }
+
+  test("default Spark environment") {
+    val livyConf = new LivyConf(false)
+      .set("livy.server.spark-home", "test-home")
+      .set("livy.server.spark-conf-dir", "test-conf-dir")
+      .set("livy.sparkr.package", "test-sparkr-package")
+      .set("livy.pyspark.archives", "test-pyspark-archives")
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be ("test-home")
+    sparkEnv.get(SPARK_CONF_DIR) should be ("test-conf-dir")
+    sparkEnv.get(PYSPARK_ARCHIVES) should be ("test-pyspark-archives")
+    sparkEnv.get(SPARKR_PACKAGE) should be ("test-sparkr-package")
+
+    sparkEnv.sparkHome() should be ("test-home")
+    sparkEnv.sparkConfDir() should be ("test-conf-dir")
+  }
+
+  test("default Spark environment with environment specified") {
+    val livyConf = new LivyConf(false)
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, "test-default-home")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_CONF_DIR.key, "test-default-conf-dir")
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be ("test-default-home")
+    sparkEnv.get(SPARK_CONF_DIR) should be ("test-default-conf-dir")
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    sparkEnv.sparkHome() should be ("test-default-home")
+    sparkEnv.sparkConfDir() should be ("test-default-conf-dir")
+  }
+
+  test("default Spark environment using SPARK_HOME environment variable") {
+    val livyConf = new LivyConf(false)
+    val sparkEnv = createSparkEnv(livyConf, "default")
+
+    sparkEnv.get(SPARK_HOME) should be (null)
+    sparkEnv.get(SPARK_CONF_DIR) should be (null)
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    sparkEnv.sparkHome() should not be (null)
+    sparkEnv.sparkConfDir() should not be (null)
+  }
+
+  test("specify different Spark environments through configuration") {
+    val livyConf = new LivyConf(false)
+      .set(SPARK_ENV_PREFIX + ".test." + SPARK_HOME.key, "test-home")
+      .set(SPARK_ENV_PREFIX + ".test." + PYSPARK_ARCHIVES.key, "test-home/python/pyspark.tgz")
+      .set(SPARK_ENV_PREFIX + ".production." + SPARK_HOME.key, "production-home")
+      .set(SPARK_ENV_PREFIX + ".production." + SPARKR_PACKAGE.key, "production-home/R/sparkr.zip")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_HOME.key, "default-home")
+      .set(SPARK_ENV_PREFIX + ".default." + SPARK_CONF_DIR.key, "default-conf-dir")
+
+    sparkEnvironments("test") = createSparkEnv(livyConf, "test")
+    sparkEnvironments("production") = createSparkEnv(livyConf, "production")
+    sparkEnvironments("default") = createSparkEnv(livyConf, "default")
+
+    val testSparkEnv = getSparkEnv(livyConf, "test")
+    testSparkEnv.sparkHome() should be ("test-home")
+    testSparkEnv.sparkConfDir() should be ("test-home/conf")
+    testSparkEnv.findPySparkArchives() should be (Seq("test-home/python/pyspark.tgz"))
+
+    val prodSparkEnv = getSparkEnv(livyConf, "production")
+    prodSparkEnv.sparkHome() should be ("production-home")
+    prodSparkEnv.findSparkRArchive() should be ("production-home/R/sparkr.zip")
+    prodSparkEnv.sparkConfDir() should be ("production-home/conf")
+
+    val defaultSparkEnv = getSparkEnv(livyConf, "default")
+    defaultSparkEnv.sparkHome() should be ("default-home")
+    defaultSparkEnv.sparkConfDir() should be ("default-conf-dir")
+  }
+
+  test("create non-existent Spark environment") {
+    val livyConf = new LivyConf(false)
+    val sparkEnv = createSparkEnv(livyConf, "non-exist")
+
+    sparkEnv.get(SPARK_HOME) should be (null)
+    sparkEnv.get(SPARK_CONF_DIR) should be (null)
+    sparkEnv.get(PYSPARK_ARCHIVES) should be (null)
+    sparkEnv.get(SPARKR_PACKAGE) should be (null)
+
+    intercept[Exception](sparkEnv.sparkHome())
+    intercept[Exception](sparkEnv.sparkConfDir())
+  }
+}