Add unit tests and change docs and scripts
Change-Id: I8100820a7ac05a6568a1c05fe63f4b05c0cf5278
jerryshao committed May 23, 2017
1 parent b5ef8c0 commit 4147dd2
Showing 11 changed files with 298 additions and 52 deletions.
3 changes: 0 additions & 3 deletions bin/livy-server
@@ -83,9 +83,6 @@ start_livy_server() {

LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR"

if [ -n "$SPARK_CONF_DIR" ]; then
LIVY_CLASSPATH="$LIVY_CLASSPATH:$SPARK_CONF_DIR"
fi
if [ -n "$HADOOP_CONF_DIR" ]; then
LIVY_CLASSPATH="$LIVY_CLASSPATH:$HADOOP_CONF_DIR"
fi
39 changes: 39 additions & 0 deletions conf/livy.conf.template
@@ -98,9 +98,48 @@

# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 1s
<<<<<<< b5ef8c0df0a0fd6c64a36ee33ae30a2a2ee6e89f:conf/livy.conf.template
#
# Days to keep Livy server request logs.
# livy.server.request-log-retain.days = 5

# Whether the Livy Web UI should be included in the Livy Server. Enabled by default.
# livy.ui.enabled = true
=======

# Define Spark environments in the Livy Server. Users can pre-define multiple Spark environments
# and pick one at runtime via the session creation request.
#
# A Spark environment is composed of several configurations:
# livy.server.spark-env.${sparkEnv}.spark-home = <SPARK_HOME>
# livy.server.spark-env.${sparkEnv}.spark-conf-dir = <SPARK_CONF_DIR>
# livy.server.spark-env.${sparkEnv}.scalaVersion = <SPARK_SCALA_VERSION>
# Whether to enable HiveContext in interpreter sessions; false by default:
# livy.server.spark-env.${sparkEnv}.enableHiveContext = false
# livy.server.spark-env.${sparkEnv}.sparkr.package = <PATH_TO_SPARKR_PACKAGE>
# livy.server.spark-env.${sparkEnv}.pyspark.archives = <PATH_TO_PYSPARK_ARCHIVES>
#
# Only livy.server.spark-env.${sparkEnv}.spark-home is required; the others can be inferred from
# the provided spark-home.
#
# Environment variables like SPARK_HOME and SPARK_CONF_DIR can still be used, and their values
# will be merged into the "default" environment.
#
# Users can also define "${SPARK_ENV}_SPARK_HOME" and "${SPARK_ENV}_SPARK_CONF_DIR"; these values
# will be merged into the ${sparkEnv} environment.
#
# ${sparkEnv} can be replaced with any desired name. When creating a session, users can specify
# the name of a Spark environment, and the Livy server will internally pick the right Spark
# environment accordingly; the "default" Spark environment is picked if none is specified.
#
# For backward compatibility, all of the previous configurations:
#
# livy.server.spark-home
# livy.server.spark-conf-dir
# livy.spark.scalaVersion
# livy.repl.enableHiveContext
# livy.sparkr.package
# livy.pyspark.archives
#
# can still be used and will automatically be merged into the "default" Spark environment.
>>>>>>> Add unit tests and change docs and scripts:conf/livy.conf
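
The new template keys above follow the pattern livy.server.spark-env.${sparkEnv}.<key>. As a rough illustration of how such keys group into per-environment settings (a sketch only, not Livy's code; the parsing logic and sample paths are assumptions), in Scala:

// Illustrative sketch: group "livy.server.spark-env.<env>.<key>" entries by <env>.
object SparkEnvKeyGrouping {
  val Prefix = "livy.server.spark-env."

  def group(conf: Map[String, String]): Map[String, Map[String, String]] =
    conf.toSeq.collect {
      case (k, v) if k.startsWith(Prefix) =>
        val rest = k.stripPrefix(Prefix)
        val env = rest.takeWhile(_ != '.')      // environment name
        val key = rest.drop(env.length + 1)     // remaining per-env key
        (env, key, v)
    }.groupBy(_._1)
     .map { case (env, entries) => env -> entries.map { case (_, key, v) => key -> v }.toMap }

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "livy.server.spark-env.test.spark-home" -> "/opt/spark-1.6",             // sample path
      "livy.server.spark-env.production.spark-home" -> "/opt/spark-2.1",       // sample path
      "livy.server.spark-env.production.enableHiveContext" -> "true")
    println(group(conf))
    // Map(test -> Map(spark-home -> /opt/spark-1.6),
    //     production -> Map(spark-home -> /opt/spark-2.1, enableHiveContext -> true))
  }
}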
@@ -39,7 +39,6 @@ import org.scalatest.concurrent.Eventually._
import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.client.common.TestUtils
import com.cloudera.livy.server.LivyServer
import com.cloudera.livy.utils.SparkEnvironment

private class MiniClusterConfig(val config: Map[String, String]) {

@@ -157,8 +156,12 @@ object MiniLivyMain extends MiniClusterBase {
var livyConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
<<<<<<< b5ef8c0df0a0fd6c64a36ee33ae30a2a2ee6e89f
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
SparkEnvironment.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
=======
"livy.spark.scalaVersion" -> getSparkScalaVersion(),
>>>>>>> Add unit tests and change docs and scripts
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
@@ -172,7 +175,7 @@ object MiniLivyMain extends MiniClusterBase {

val server = new LivyServer()
server.start()
server.livyConf.set(SparkEnvironment.ENABLE_HIVE_CONTEXT, true)
server.livyConf.set("livy.repl.enableHiveContext", "true")
// Write a serverUrl.conf file to the conf directory with the location of the Livy
// server. Do it atomically since it's used by MiniCluster to detect when the Livy server
// is up and ready.
16 changes: 8 additions & 8 deletions rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
@@ -28,13 +28,7 @@
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -171,6 +165,12 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
}
merge(conf, SPARK_JARS_KEY, livyJars, ",");

HashMap<String, String> childEnv = new HashMap<>();
String kind = conf.get(SESSION_KIND);
if ("pyspark".equals(kind) && conf.get(RSCConf.Entry.PYSPARK_ARCHIVES) != null) {
childEnv.put("PYSPARK_ARCHIVES_PATH", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES));
}

// Disable multiple attempts since the RPC server doesn't yet support multiple
// connections for the same registered app.
conf.set("spark.yarn.maxAppAttempts", "1");
@@ -212,7 +212,7 @@ public void run() {
};
return new ChildProcess(conf, promise, child, confFile);
} else {
final SparkLauncher launcher = new SparkLauncher();
final SparkLauncher launcher = new SparkLauncher(childEnv);

// Spark 1.x does not support specifying deploy mode in conf and needs special handling.
String deployMode = conf.get(SPARK_DEPLOY_MODE);
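
For context on the childEnv handling above: Spark's launcher library accepts an environment map for the spawned spark-submit process, which is how PYSPARK_ARCHIVES_PATH can reach the driver. A minimal sketch, assuming a made-up application path and archive location:

// Sketch only; mirrors the env-map constructor used in the diff above.
import scala.collection.JavaConverters._
import org.apache.spark.launcher.SparkLauncher

object LauncherEnvSketch {
  def main(args: Array[String]): Unit = {
    val childEnv = Map("PYSPARK_ARCHIVES_PATH" -> "/opt/spark/python/lib/pyspark.zip") // placeholder
    val launcher = new SparkLauncher(childEnv.asJava)   // env applied to the child spark-submit
      .setAppResource("/tmp/example.py")                // placeholder application
      .setMaster("yarn")
      .setDeployMode("cluster")
    val child = launcher.launch()                       // starts spark-submit with the env set
    child.waitFor()
  }
}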
1 change: 1 addition & 0 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
@@ -47,6 +47,7 @@ public static enum Entry implements ConfEntry {

LIVY_JARS("jars", null),

PYSPARK_ARCHIVES("pyspark.archives", null),
SPARK_HOME("spark_home", null),
SPARK_CONF_DIR("spark_conf_dir", null),

@@ -248,7 +248,7 @@ object InteractiveSession extends Logging {
kind match {
case PySpark() | PySpark3() =>
val pySparkFiles = if (!LivyConf.TEST_MODE) sparkEnv.findPySparkArchives() else Nil
mergeConfList(pySparkFiles, SparkEnvironment.SPARK_PY_FILES)
mergeConfList(pySparkFiles, RSCConf.Entry.PYSPARK_ARCHIVES.key())
builderProperties.put(SparkEnvironment.SPARK_YARN_IS_PYTHON, "true")
case SparkR() =>
val sparkRArchive = if (!LivyConf.TEST_MODE) Some(sparkEnv.findSparkRArchive()) else None
@@ -31,7 +31,7 @@ import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.client.common.ClientConf.ConfEntry

object SparkEnvironment {
object SparkEnvironment extends Logging {

case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry

@@ -40,20 +40,30 @@ object SparkEnvironment {
}

val DEFAULT_ENV_NAME = "default"
val SPARK_ENV_PREFIX = "livy.server.spark-env"

val SPARK_HOME = Entry("livy.server.spark-home", null)
val SPARK_CONF_DIR = Entry("livy.server.spark-conf-dir", null)
val SPARK_HOME = Entry("spark-home", null)
val SPARK_CONF_DIR = Entry("spark-conf-dir", null)

// Two configurations to specify Spark and related Scala version. These are internal
// configurations will be set by LivyServer and used in session creation. It is not required to
// This configuration is used to specify Spark's Scala version. It is an internal
// configuration used in session creation. It usually does not need to be set
// unless running with unofficial Spark + Scala version combinations
// (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11)
val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scalaVersion", null)
val LIVY_SPARK_SCALA_VERSION = Entry("scalaVersion", null)

val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enableHiveContext", false)
val ENABLE_HIVE_CONTEXT = Entry("enableHiveContext", false)

val SPARKR_PACKAGE = Entry("livy.sparkr.package", null)
val PYSPARK_ARCHIVES = Entry("livy.pyspark.archives", null)
val SPARKR_PACKAGE = Entry("sparkr.package", null)
val PYSPARK_ARCHIVES = Entry("pyspark.archives", null)

val backwardCompatibleConfs = Map(
"livy.server.spark-home" -> SPARK_HOME,
"livy.server.spark-conf-dir" -> SPARK_CONF_DIR,
"livy.spark.scalaVersion" -> LIVY_SPARK_SCALA_VERSION,
"livy.repl.enableHiveContext" -> ENABLE_HIVE_CONTEXT,
"livy.sparkr.package" -> SPARKR_PACKAGE,
"livy.pyspark.archives" -> PYSPARK_ARCHIVES
)

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
@@ -82,7 +92,7 @@ object SparkEnvironment {
)

@VisibleForTesting
private[utils] val sparkEnvironments = new mutable.HashMap[String, SparkEnvironment]
private[livy] val sparkEnvironments = new mutable.HashMap[String, SparkEnvironment]

def getSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
if (sparkEnvironments.contains(env)) {
@@ -102,25 +112,26 @@
}

@VisibleForTesting
private[utils] def createSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
private[livy] def createSparkEnv(livyConf: LivyConf, env: String): SparkEnvironment = {
val livySparkConfKeys = getClass.getMethods.filter {
_.getReturnType.getCanonicalName == classOf[Entry].getCanonicalName
}.map(_.invoke(this).asInstanceOf[Entry].key).toSet

val sparkEnv = new SparkEnvironment(env)
if (env == DEFAULT_ENV_NAME) {
livyConf.asScala
.filter { kv =>
livySparkConfKeys.contains(kv.getKey) ||
livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$DEFAULT_ENV_NAME."))
}
.foreach(kv => sparkEnv.set(kv.getKey.stripPrefix(s"$DEFAULT_ENV_NAME."), kv.getValue))
} else {
livyConf.asScala
.filter(kv => livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$env.")))
.foreach(kv => sparkEnv.set(kv.getKey.stripPrefix(s"$env."), kv.getValue))
.filter { kv => backwardCompatibleConfs.contains(kv.getKey) }
.foreach { kv => sparkEnv.set(backwardCompatibleConfs(kv.getKey), kv.getValue) }
}

livyConf.asScala
.filter { kv => kv.getKey.startsWith(s"$SPARK_ENV_PREFIX.$env.") &&
livySparkConfKeys.contains(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env.")) }
.foreach {
kv => sparkEnv.set(kv.getKey.stripPrefix(s"$SPARK_ENV_PREFIX.$env."), kv.getValue)
}

info(s"Created Spark environments $env with configuration ${sparkEnv.asScala.mkString(",")}")
sparkEnv
}
}
@@ -130,25 +141,27 @@
* Livy can have multiple Spark environments differentiated by name. For example, if the user
* configures the Livy conf like:
*
* test.livy.server.spark-home = xxx
* test.livy.server.spark-conf-dir = xxx
* livy.server.spark-env.test.spark-home = xxx
* livy.server.spark-env.test.spark-conf-dir = xxx
*
* production.livy.server.spark-home = yyy
* production.livy.server.spark-conf-dir = yyy
* livy.server.spark-env.production.spark-home = yyy
* livy.server.spark-env.production.spark-conf-dir = yyy
*
* Livy will internally have two isolated Spark environments, "test" and "production". When users
* create a batch or interactive session, they can specify the environment through "sparkEnv" in
* the JSON body. The Livy server will honor this env name and pick the right Spark environment.
* This allows Livy to support different Spark clusters at runtime.
*
* The default Spark environment is "default". If the user configured
*
* livy.server.spark-home = xxx
* or:
* default.livy.server.spark-home = xxx
* livy.server.spark-conf-dir = xxx
*
* Livy server will treat configurations without a prefix as belonging to the "default" Spark
* environment to keep backward compatibility. This is equal to:
*
* livy.server.spark-env.default.spark-home = xxx
*
* Also for environment variable, user's configuration
*
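
A small sketch of the backward-compatibility folding described by backwardCompatibleConfs above: legacy flat keys are rewritten to the per-environment key names of the "default" environment. The helper name and sample value are illustrative assumptions, not Livy's implementation:

// Illustrative only: fold legacy flat keys into "default"-environment key names.
object LegacyConfFolding {
  val compat = Map(
    "livy.server.spark-home"      -> "spark-home",
    "livy.server.spark-conf-dir"  -> "spark-conf-dir",
    "livy.spark.scalaVersion"     -> "scalaVersion",
    "livy.repl.enableHiveContext" -> "enableHiveContext",
    "livy.sparkr.package"         -> "sparkr.package",
    "livy.pyspark.archives"       -> "pyspark.archives")

  def toDefaultEnv(livyConf: Map[String, String]): Map[String, String] =
    livyConf.collect { case (k, v) if compat.contains(k) => compat(k) -> v }

  def main(args: Array[String]): Unit = {
    println(toDefaultEnv(Map("livy.server.spark-home" -> "/opt/spark"))) // sample path
    // Map(spark-home -> /opt/spark)
  }
}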
@@ -33,7 +33,7 @@ import org.scalatest.mock.MockitoSugar.mock
import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.SessionState
import com.cloudera.livy.utils.{AppInfo, SparkApp}
import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkEnvironment}

class BatchSessionSpec
extends FunSpec
@@ -63,6 +63,10 @@ class BatchSessionSpec
sessionStore = mock[SessionStore]
}

after {
SparkEnvironment.sparkEnvironments.clear()
}

it("should create a process") {
val req = new CreateBatchRequest()
req.file = script.toString
@@ -110,5 +114,26 @@
verify(sessionStore, atLeastOnce()).save(
Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject())
}

it("should use right Spark environment") {
assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.")
val conf = new LivyConf()
.set("livy.server.spark-home", sys.env("SPARK_HOME"))
.set(SparkEnvironment.SPARK_ENV_PREFIX + ".test." + SparkEnvironment.SPARK_HOME.key,
sys.env("SPARK_HOME"))

val mockApp = mock[SparkApp]

val req = new CreateBatchRequest()
req.sparkEnv = "default"
BatchSession.create(0, req, conf, null, None, sessionStore, Some(mockApp))

val req1 = new CreateBatchRequest()
req1.sparkEnv = "test"
BatchSession.create(1, req1, conf, null, None, sessionStore, Some(mockApp))

SparkEnvironment.sparkEnvironments.get("default") should not be (None)
SparkEnvironment.sparkEnvironments.get("test") should not be (None)
}
}
}
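
For reference, a hypothetical client-side request body that exercises the same selection the test above performs via req.sparkEnv. The "sparkEnv" JSON field name is assumed from the request class field, and the file path is a placeholder:

// Hypothetical sketch of a batch creation body that picks the "test" Spark environment.
object BatchRequestWithSparkEnv {
  val body: String =
    """{
      |  "file": "/user/hadoop/pi.py",
      |  "sparkEnv": "test"
      |}""".stripMargin

  def main(args: Array[String]): Unit = println(body)  // would be POSTed to Livy's /batches endpoint
}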