Merge pull request #3 from sameeragarwal/master
Master
sameeragarwal committed Sep 3, 2013
2 parents f843491 + e4c88d3 commit 3a644b4
Showing 65 changed files with 568 additions and 185 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -13,7 +13,6 @@ VAR and STDEV).
#### BlinkDB requires:
* Scala 2.9.3
* Spark 0.8.x
-* OpenJDK 7 or Oracle HotSpot JDK 7 or Oracle HotSpot JDK 6u23+

### For current documentation, see the [BlinkDB Wiki](https://github.com/sameeragarwal/blinkdb/wiki).
### For more information about the BlinkDB Project, see the [BlinkDB Website](http://blinkdb.cs.berkeley.edu).
4 changes: 2 additions & 2 deletions bin/dev/run-tests-from-scratch
@@ -243,8 +243,8 @@ if $SKIP_SPARK ; then
else
# Clean up past Spark artifacts published locally.
rm -rf ./spark
-rm -rf ~/.ivy2/local/org.spark*
-rm -rf ~/.ivy2/cache/org.spark*
+rm -rf ~/.ivy2/local/org.apache.spark*
+rm -rf ~/.ivy2/cache/org.apache.spark*
# Download and build Spark.
git clone $SPARK_GIT_URL
pushd spark
3 changes: 2 additions & 1 deletion conf/blinkdb-env.sh.template
@@ -36,7 +36,8 @@ export HIVE_HOME=""
#export HADOOP_HOME=""
#export SPARK_HOME=""
#export MASTER=""
-#export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so
+# Only required if using Mesos:
+#export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so

# (Optional) Extra classpath
#export SPARK_LIBRARY_PATH=""
3 changes: 3 additions & 0 deletions data/files/test1.txt
@@ -0,0 +1,3 @@
+1 012
+2 345
+3 678
34 changes: 28 additions & 6 deletions project/SharkBuild.scala
@@ -18,6 +18,9 @@
import sbt._
import Keys._

+import sbtassembly.Plugin._
+import AssemblyKeys._

object SharkBuild extends Build {

val BLINKDB_VERSION = "0.1.0-SNAPSHOT"
@@ -39,10 +42,11 @@ object SharkBuild extends Build {
lazy val root = Project(
id = "root",
base = file("."),
-settings = coreSettings)
+settings = coreSettings ++ assemblyProjSettings)

val excludeKyro = ExclusionRule(organization = "de.javakaffee")
val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
+val excludeNetty = ExclusionRule(organization = "org.jboss.netty")

def coreSettings = Defaults.defaultSettings ++ Seq(

@@ -59,7 +63,7 @@
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
-"Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
+"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository"
),

@@ -83,7 +87,10 @@
val baseDirectories = (base / "lib") +++ (hiveFile)
val customJars = (baseDirectories ** "*.jar")
// Hive uses an old version of guava that doesn't have what we want.
-customJars.classpath.filter(!_.toString.contains("guava"))
+customJars.classpath
+.filter(!_.toString.contains("guava"))
+.filter(!_.toString.contains("log4j"))
+.filter(!_.toString.contains("servlet"))
},

unmanagedJars in Test ++= Seq(
@@ -92,10 +99,10 @@
),

libraryDependencies ++= Seq(
-"org.spark-project" %% "spark-core" % SPARK_VERSION,
-"org.spark-project" %% "spark-repl" % SPARK_VERSION,
+"org.apache.spark" %% "spark-core" % SPARK_VERSION,
+"org.apache.spark" %% "spark-repl" % SPARK_VERSION,
"com.google.guava" % "guava" % "14.0.1",
-"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
+"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeNetty),
// See https://code.google.com/p/guava-libraries/issues/detail?id=1095
"com.google.code.findbugs" % "jsr305" % "1.3.+",

@@ -111,4 +118,19 @@
"com.novocode" % "junit-interface" % "0.8" % "test") ++
(if (TACHYON_ENABLED) Some("org.tachyonproject" % "tachyon" % "0.3.0-SNAPSHOT" excludeAll(excludeKyro, excludeHadoop) ) else None).toSeq
)

+def assemblyProjSettings = Seq(
+name := "shark-assembly",
+jarName in assembly <<= version map { v => "shark-assembly-" + v + "-hadoop" + HADOOP_VERSION + ".jar" }
+) ++ assemblySettings ++ extraAssemblySettings
+
+def extraAssemblySettings() = Seq(
+test in assembly := {},
+mergeStrategy in assembly := {
+case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
+case "reference.conf" => MergeStrategy.concat
+case _ => MergeStrategy.first
+}
+)
}
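For context on the assembly settings added above: running sbt's assembly task with these settings should produce a fat jar named shark-assembly-<version>-hadoop<HADOOP_VERSION>.jar, with duplicate entries from overlapping dependency jars resolved by the merge strategy. The standalone Scala sketch below only illustrates those merge decisions; the MergeRules object is hypothetical and not part of the build.

// Illustrative sketch of the merge rules in extraAssemblySettings above:
// manifests and signature files are dropped, reference.conf fragments are
// concatenated, and any other duplicate entry falls back to "first wins".
object MergeRules {
  sealed trait Decision
  case object Discard extends Decision
  case object Concat extends Decision
  case object First extends Decision

  def resolve(path: String): Decision = path match {
    case m if m.toLowerCase.endsWith("manifest.mf") => Discard
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => Discard
    case "reference.conf" => Concat
    case _ => First
  }

  def main(args: Array[String]) {
    assert(resolve("META-INF/MANIFEST.MF") == Discard)  // manifest dropped
    assert(resolve("META-INF/SOMEKEY.SF") == Discard)   // signature file dropped
    assert(resolve("reference.conf") == Concat)         // config fragments merged
    assert(resolve("org/slf4j/Logger.class") == First)  // first copy wins
  }
}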
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -19,6 +19,8 @@ addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0")

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")

+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")

resolvers += Resolver.url(
"sbt-plugin-releases",
new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)
4 changes: 3 additions & 1 deletion src/main/scala/shark/KryoRegistrator.scala
@@ -28,8 +28,10 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.hive.ql.exec.persistence.{MapJoinSingleKey, MapJoinObjectKey,
MapJoinDoubleKeys, MapJoinObjectValue}

+import org.apache.spark.serializer.{KryoRegistrator => SparkKryoRegistrator}

-class KryoRegistrator extends spark.KryoRegistrator {

+class KryoRegistrator extends SparkKryoRegistrator {
def registerClasses(kryo: Kryo) {

kryo.register(classOf[execution.ReduceKey])
11 changes: 6 additions & 5 deletions src/main/scala/shark/LogHelper.scala
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2012 The Regents of The University California.
* Copyright (C) 2012 The Regents of The University California.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,23 +22,24 @@ import java.io.PrintStream
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hive.ql.session.SessionState

+import org.apache.spark.Logging

/**
* Utility trait for classes that want to log data. This wraps around Spark's
* Logging trait. It creates a SLF4J logger for the class and allows logging
* messages at different levels using methods that only evaluate parameters
* lazily if the log level is enabled.
*
*
* It differs from the Spark's Logging trait in that it can print out the
* error to the specified console of the Hive session.
*/
-trait LogHelper extends spark.Logging {
+trait LogHelper extends Logging {

override def logError(msg: => String) = {
errStream().println(msg)
super.logError(msg)
}

def logError(msg: String, detail: String) = {
errStream().println(msg)
super.logError(msg + StringUtils.defaultString(detail))
@@ -50,7 +51,7 @@ trait LogHelper extends spark.Logging {
exception.printStackTrace(err)
super.logError(msg, exception)
}

def outStream(): PrintStream = {
val ss = SessionState.get()
if (ss != null && ss.out != null) ss.out else System.out
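For context on the LogHelper change above: the trait now mixes in org.apache.spark.Logging instead of the pre-0.8 spark.Logging, but its contract is unchanged — messages go to the class's SLF4J logger and are built lazily, while logError additionally echoes to the Hive session's error stream. A minimal usage sketch follows; the QueryValidator class and its logic are hypothetical, for illustration only.

import shark.LogHelper

// Hypothetical consumer of the trait.
class QueryValidator extends LogHelper {
  def validate(tableName: String, rowCount: Long): Boolean = {
    if (rowCount < 0) {
      // Printed on the Hive console (errStream) and logged at ERROR level.
      logError("Negative row count reported for table " + tableName)
      false
    } else {
      // By-name parameter inherited from Spark's Logging: the string is
      // only built if INFO logging is enabled.
      logInfo("Table %s has %d rows".format(tableName, rowCount))
      true
    }
  }
}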
2 changes: 1 addition & 1 deletion src/main/scala/shark/SharkCliDriver.scala
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.io.IOUtils

-import spark.SparkContext
+import org.apache.spark.SparkContext


object SharkCliDriver {
3 changes: 2 additions & 1 deletion src/main/scala/shark/SharkContext.scala
@@ -32,8 +32,9 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.hadoop.hive.ql.session.SessionState

+import org.apache.spark.{SparkContext, SparkEnv}

import shark.api._
-import spark.{SparkContext, SparkEnv}


class SharkContext(
13 changes: 9 additions & 4 deletions src/main/scala/shark/SharkEnv.scala
@@ -19,11 +19,16 @@ package shark

import scala.collection.mutable.{HashMap, HashSet}

+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.StatsReportListener
+import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer}

import shark.api.JavaSharkContext
import shark.memstore2.MemoryMetadataManager
+import shark.execution.serialization.ShuffleSerializer
import shark.tachyon.TachyonUtilImpl
-import spark.{RDD, SparkContext}
-import spark.scheduler.StatsReportListener


/** A singleton object for the master program. The slaves should not access this. */
object SharkEnv extends LogHelper {
@@ -80,7 +85,7 @@ object SharkEnv extends LogHelper {

logInfo("Initializing SharkEnv")

-System.setProperty("spark.serializer", classOf[spark.KryoSerializer].getName)
+System.setProperty("spark.serializer", classOf[SparkKryoSerializer].getName)
System.setProperty("spark.kryo.registrator", classOf[KryoRegistrator].getName)

val executorEnvVars = new HashMap[String, String]
@@ -95,7 +100,7 @@

var sc: SparkContext = _

-val shuffleSerializerName = classOf[shark.execution.serialization.ShuffleSerializer].getName
+val shuffleSerializerName = classOf[ShuffleSerializer].getName

val memoryMetadataManager = new MemoryMetadataManager

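For context on the serializer wiring above: with the Spark 0.8-era API, these system properties are how an application selects Kryo serialization and points Spark at Shark's KryoRegistrator (SparkConf did not exist yet). Below is a minimal standalone sketch of the same wiring against a local SparkContext; the object name and the sample job are illustrative only, not code from this commit.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object KryoWiringSketch {
  def main(args: Array[String]) {
    // Same property-based configuration style used by SharkEnv above.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "shark.KryoRegistrator")

    val sc = new SparkContext("local", "kryo-wiring-sketch")
    // Any shuffle from here on serializes records with Kryo, using the
    // classes registered by shark.KryoRegistrator.
    val sums = sc.parallelize(1 to 10).map(i => (i % 2, i)).reduceByKey(_ + _).collect()
    println(sums.toSeq)
    sc.stop()
  }
}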
2 changes: 1 addition & 1 deletion src/main/scala/shark/SharkServer.scala
@@ -51,7 +51,7 @@ import org.apache.thrift.transport.TServerSocket
import org.apache.thrift.transport.TTransport
import org.apache.thrift.transport.TTransportFactory

-import spark.SparkEnv
+import org.apache.spark.SparkEnv


/**
4 changes: 3 additions & 1 deletion src/main/scala/shark/api/JavaSharkContext.scala
@@ -22,8 +22,10 @@ import java.util.{List => JList}

import scala.collection.JavaConversions._

+import org.apache.spark.api.java.JavaSparkContext

import shark.SharkContext
-import spark.api.java.JavaSparkContext


class JavaSharkContext(val sharkCtx: SharkContext) extends JavaSparkContext(sharkCtx) {

8 changes: 4 additions & 4 deletions src/main/scala/shark/api/JavaTableRDD.scala
@@ -17,10 +17,10 @@

package shark.api

-import spark.api.java.function.{Function => JFunction}
-import spark.api.java.JavaRDDLike
-import spark.RDD
-import spark.storage.StorageLevel
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel


class JavaTableRDD(val rdd: RDD[Row], val schema: Array[ColumnDesc])
3 changes: 2 additions & 1 deletion src/main/scala/shark/api/TableRDD.scala
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, StructObj

import shark.execution.serialization.KryoSerializer

-import spark.{Partition, RDD, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD


class TableRDD(
4 changes: 3 additions & 1 deletion src/main/scala/shark/execution/CoGroupedRDD.scala
@@ -15,14 +15,16 @@
* limitations under the License.
*/

-package spark
+package org.apache.spark

import java.io.{ObjectOutputStream, IOException}
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

+import org.apache.spark.rdd.RDD

import shark.SharkEnv

// A version of CoGroupedRDD with the following changes:
7 changes: 3 additions & 4 deletions src/main/scala/shark/execution/CommonJoinOperator.scala
@@ -31,11 +31,10 @@ import org.apache.hadoop.hive.ql.plan.{ExprNodeDesc, JoinCondDesc, JoinDesc, Tab
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}

-import shark.SharkConfVars
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.SparkContext.rddToPairRDDFunctions

-import spark.RDD
-import spark.rdd.UnionRDD
-import spark.SparkContext.rddToPairRDDFunctions
+import shark.SharkConfVars


abstract class CommonJoinOperator[JOINDESCTYPE <: JoinDesc, T <: HiveCommonJoinOperator[JOINDESCTYPE]]
4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/EmptyRDD.scala
@@ -1,7 +1,7 @@
package shark.execution

-import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}

+import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.rdd.RDD

/**
* An RDD that is empty, i.e. has no element in it.
2 changes: 1 addition & 1 deletion src/main/scala/shark/execution/ExtractOperator.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{ExtractDesc, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.io.BytesWritable

-import spark.RDD
+import org.apache.spark.rdd.RDD


class ExtractOperator extends UnaryOperator[HiveExtractOperator] with HiveTopOperator {
28 changes: 25 additions & 3 deletions src/main/scala/shark/execution/FileSinkOperator.scala
@@ -24,14 +24,15 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator => HiveFileSinkOperator}
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack
+import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.SparkHadoopWriter

-import shark.execution.serialization.OperatorSerializationWrapper
+import org.apache.spark.TaskContext
+import org.apache.spark.rdd.RDD

-import spark.RDD
-import spark.TaskContext
+import shark.execution.serialization.OperatorSerializationWrapper


class FileSinkOperator extends TerminalOperator with Serializable {
@@ -119,6 +120,27 @@ class FileSinkOperator extends TerminalOperator with Serializable {
val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null
val rdd = preprocessRdd(inputRdd)

+val hiveIslocal = ShimLoader.getHadoopShims.isLocalMode(hconf)
+if (!rdd.context.isLocal && hiveIslocal) {
+val warnMessage = "Hive Hadoop shims detected local mode, but Shark is not running locally."
+logWarning(warnMessage)
+
+// Try to fix this without bothering user
+val newValue = "Spark_%s".format(System.currentTimeMillis())
+for (k <- Seq("mapred.job.tracker", "mapreduce.framework.name")) {
+val v = hconf.get(k)
+if (v == null || v == "" || v == "local") {
+hconf.set(k, newValue)
+logWarning("Setting %s to '%s' (was '%s')".format(k, newValue, v))
+}
+}
+
+// If still not fixed, bail out
+if (ShimLoader.getHadoopShims.isLocalMode(hconf)) {
+throw new Exception(warnMessage)
+}
+}

parentOperators.head match {
case op: LimitOperator =>
// If there is a limit operator, let's only run one partition at a time to avoid
4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/ForwardOperator.scala
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2012 The Regents of The University California.
* Copyright (C) 2012 The Regents of The University California.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,7 +19,7 @@ package shark.execution

import org.apache.hadoop.hive.ql.exec.{ForwardOperator => HiveForwardOperator}

-import spark.RDD
+import org.apache.spark.rdd.RDD


class ForwardOperator extends UnaryOperator[HiveForwardOperator] {
(Diffs for the remaining changed files are not shown.)