Skip to content

Commit

Permalink
Rebase and update the code
Browse files Browse the repository at this point in the history
Change-Id: I0e66ff509903ab276ca816818e4b288b45699213
  • Loading branch information
jerryshao committed May 25, 2017
1 parent fd84568 commit 19381f8
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,13 @@ object MiniLivyMain extends MiniClusterBase {
var livyConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
<<<<<<< 5a769d34c7a3538b72fc1f12a4fcd166a6678e4c
<<<<<<< b5ef8c0df0a0fd6c64a36ee33ae30a2a2ee6e89f
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
SparkEnvironment.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
=======
"livy.spark.scalaVersion" -> getSparkScalaVersion(),
>>>>>>> Add unit tests and change docs and scripts
=======
"livy.server.spark-env.default.scalaVersion" -> getSparkScalaVersion(),
>>>>>>> Fix test issue
"livy.server.spark-env.default.scala-version" -> getSparkScalaVersion(),
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store",
"livy.server.spark-env.default.enableHiveContext" -> "true")
"livy.server.spark-env.default.enable-hive-context" -> "true")

if (Cluster.isRunningOnTravis) {
livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
Expand Down
51 changes: 3 additions & 48 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.client.common.ClientConf.ConfEntry
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
import com.cloudera.livy.client.common.ClientConf.DeprecatedConf
=======
import com.cloudera.livy.client.common.ClientConf._
import com.cloudera.livy.utils.SparkEnvironment
>>>>>>> Add SparkEnvironment

object LivyConf {

Expand All @@ -49,23 +45,9 @@ object LivyConf {
val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)

<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
// Two configurations to specify Spark and related Scala version. These are internal
// configurations will be set by LivyServer and used in session creation. It is not required to
// set usually unless running with unofficial Spark + Scala versions
// (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11)
val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null)
val LIVY_SPARK_VERSION = Entry("livy.spark.version", null)

val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null)
val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false)
=======
val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null)
>>>>>>> Add SparkEnvironment

val ENVIRONMENT = Entry("livy.environment", "production")

Expand Down Expand Up @@ -130,7 +112,6 @@ object LivyConf {
// How often Livy polls YARN to refresh YARN app state.
val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")

<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
// Days to keep Livy server request logs.
val REQUEST_LOG_RETAIN_DAYS = Entry("livy.server.request-log-retain.days", 5)

Expand All @@ -152,16 +133,6 @@ object LivyConf {
// How long a finished session state will be kept in memory
val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
val SPARK_FILES = "spark.files"
val SPARK_ARCHIVES = "spark.yarn.dist.archives"
val SPARK_PY_FILES = "spark.submit.pyFiles"
=======
val LIVY_REPL_JARS = Entry("livy.repl.jars", null)
>>>>>>> Add SparkEnvironment

/**
* These are Spark configurations that contain lists of files that the user can add to
* their jobs in one way or another. Livy needs to pre-process these to make sure the
Expand All @@ -172,19 +143,6 @@ object LivyConf {
* the hardcoded list, or new versions of Spark add new configs.
*/
val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null)
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f

private val HARDCODED_SPARK_FILE_LISTS = Seq(
SPARK_JARS,
SPARK_FILES,
SPARK_ARCHIVES,
SPARK_PY_FILES,
"spark.yarn.archive",
"spark.yarn.dist.files",
"spark.yarn.dist.jars",
"spark.yarn.jar",
"spark.yarn.jars"
)

case class DepConf(
override val key: String,
Expand All @@ -194,8 +152,8 @@ object LivyConf {

private val configsWithAlternatives: Map[String, DeprecatedConf] = Map[String, DepConf](
LIVY_SPARK_DEPLOY_MODE.key -> DepConf("livy.spark.deployMode", "0.4"),
LIVY_SPARK_SCALA_VERSION.key -> DepConf("livy.spark.scalaVersion", "0.4"),
ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"),
"livy.spark.scala-version" -> DepConf("livy.spark.scalaVersion", "0.4"),
"livy.repl.enable-hive-context" -> DepConf("livy.repl.enableHiveContext", "0.4"),
CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"),
ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"),
ACCESS_CONTROL_USERS.key -> DepConf("livy.server.access_control.users", "0.4"),
Expand All @@ -216,9 +174,6 @@ object LivyConf {

Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}

=======
>>>>>>> Add SparkEnvironment
}

/**
Expand Down
24 changes: 0 additions & 24 deletions server/src/main/scala/com/cloudera/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,6 @@ class LivyServer extends Logging {
maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE))
).toMultipartConfigElement

// Make sure the `spark-submit` program exists, otherwise much of livy won't work.
testSparkHome(livyConf)

// Test spark-submit and get Spark Scala version accordingly.
val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf)
testSparkVersion(sparkVersion)

// If Spark and Scala version is set manually, should verify if they're consistent with
// ones parsed from "spark-submit --version"
val formattedSparkVersion = formatSparkVersion(sparkVersion)
Option(livyConf.get(LIVY_SPARK_VERSION)).map(formatSparkVersion).foreach { version =>
require(formattedSparkVersion == version,
s"Configured Spark version $version is not equal to Spark version $formattedSparkVersion " +
"got from spark-submit -version")
}

// Set formatted Spark and Scala version into livy configuration, this will be used by
// session creation.
// TODO Create a new class to pass variables from LivyServer to sessions and remove these
// internal LivyConfs.
livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString("."))
livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))

if (UserGroupInformation.isSecurityEnabled) {
// If Hadoop security is enabled, run kinit periodically. runKinit() should be called
// before any Hadoop operation, otherwise Kerberos exception will be thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ package com.cloudera.livy.server.interactive
import java.io.{File, InputStream}
import java.net.URI
import java.nio.ByteBuffer
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
=======
>>>>>>> Add SparkEnvironment
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
Expand All @@ -47,12 +43,7 @@ import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions._
import com.cloudera.livy.sessions.Session._
import com.cloudera.livy.sessions.SessionState.Dead
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
import com.cloudera.livy.util.LineBufferedProcess
import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener}
=======
import com.cloudera.livy.utils._
>>>>>>> Add SparkEnvironment

@JsonIgnoreProperties(ignoreUnknown = true)
case class InteractiveRecoveryMetadata(
Expand All @@ -68,11 +59,6 @@ case class InteractiveRecoveryMetadata(
extends RecoveryMetadata

object InteractiveSession extends Logging {
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
private[interactive] val SPARK_YARN_IS_PYTHON = "spark.yarn.isPython"

=======
>>>>>>> Add SparkEnvironment
val RECOVERY_SESSION_TYPE = "interactive"

def create(
Expand Down Expand Up @@ -172,7 +158,6 @@ object InteractiveSession extends Logging {
builderProperties ++= conf

def livyJars(livyConf: LivyConf, scalaVersion: String): List[String] = {
<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
Option(livyConf.get(LivyConf.REPL_JARS)).map { jars =>
val regex = """[\w-]+_(\d\.\d\d).*\.jar""".r
jars.split(",").filter { name => new Path(name).getName match {
Expand All @@ -183,9 +168,6 @@ object InteractiveSession extends Logging {
}
}.toList
}.getOrElse {
=======
Option(livyConf.get(LivyConf.LIVY_REPL_JARS)).map(_.split(",").toList).getOrElse {
>>>>>>> Add SparkEnvironment
val home = sys.env("LIVY_HOME")
val jars = Option(new File(home, s"repl_$scalaVersion-jars"))
.filter(_.isDirectory())
Expand Down Expand Up @@ -261,20 +243,11 @@ object InteractiveSession extends Logging {
builderProperties.put(RSCConf.Entry.SPARK_HOME.key, sparkEnv.sparkHome())
builderProperties.put(RSCConf.Entry.SPARK_CONF_DIR.key, sparkEnv.sparkConfDir())

<<<<<<< 2abb8a3d2850c506ffd2b8a210813f1b8353045f
// Set Livy.rsc.jars from livy conf to rsc conf, RSC conf will take precedence if both are set.
Option(livyConf.get(LivyConf.RSC_JARS)).foreach(
builderProperties.getOrElseUpdate(RSCConf.Entry.LIVY_JARS.key(), _))

require(livyConf.get(LivyConf.LIVY_SPARK_VERSION) != null)
require(livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION) != null)

val (sparkMajorVersion, _) =
LivySparkUtils.formatSparkVersion(livyConf.get(LivyConf.LIVY_SPARK_VERSION))
val scalaVersion = livyConf.get(LivyConf.LIVY_SPARK_SCALA_VERSION)
=======
mergeConfList(livyJars(livyConf, sparkEnv.scalaVersion()), SparkEnvironment.SPARK_JARS)
>>>>>>> Add SparkEnvironment

val enableHiveContext = sparkEnv.getBoolean(SparkEnvironment.ENABLE_HIVE_CONTEXT)
// pass spark.livy.spark_major_version to driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.cloudera.livy.util
package com.cloudera.livy.utils

import com.cloudera.livy.{Logging, Utils}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.cloudera.livy.util
package com.cloudera.livy.utils

import java.io.InputStream
import java.util.concurrent.locks.ReentrantLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.collection.SortedMap
import scala.math.Ordering.Implicits._

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.util.LineBufferedProcess

object LivySparkUtils extends Logging {

Expand Down Expand Up @@ -118,7 +117,7 @@ object LivySparkUtils extends Logging {
formattedSparkVersion: (Int, Int),
scalaVersionFromSparkSubmit: Option[String],
sparkEnv: SparkEnvironment): String = {
val scalaVersionInLivyConf = Option(sparkEnv.get(SparkEnvironment.LIVY_SPARK_SCALA_VERSION))
val scalaVersionInLivyConf = Option(sparkEnv.get(SparkEnvironment.SPARK_SCALA_VERSION))
.filter(_.nonEmpty)
.map(formatScalaVersion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package com.cloudera.livy.utils
import scala.collection.JavaConverters._

import com.cloudera.livy.LivyConf
import com.cloudera.livy.util.LineBufferedProcess

object AppInfo {
val DRIVER_LOG_URL_NAME = "driverLogUrl"
Expand Down
Loading

0 comments on commit 19381f8

Please sign in to comment.