Spark 1.0 #312

Open · wants to merge 6 commits into master
2 changes: 1 addition & 1 deletion bin/dev/run-tests-from-scratch
@@ -16,7 +16,7 @@ SBT_OPTS_DEFAULT="-Xms512M -Xmx2048M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:Ma
SPARK_MEM_DEFAULT=4g
SHARK_MASTER_MEM_DEFAULT=4g
SPARK_KV_JAVA_OPTS_DEFAULT=("-Dspark.local.dir=/tmp " "-Dspark.kryoserializer.buffer.mb=10 -XX:MaxPermSize=1g ")
-SPARK_GIT_URL_DEFAULT="https://github.com/apache/incubator-spark.git spark"
+SPARK_GIT_URL_DEFAULT="https://github.com/apache/spark.git spark"
HIVE_GIT_URL_DEFAULT="https://github.com/amplab/hive.git -b shark-0.11"
SPARK_HADOOP_VERSION_DEFAULT="1.0.4"
SPARK_WITH_YARN_DEFAULT=false
4 changes: 2 additions & 2 deletions project/SharkBuild.scala
@@ -29,13 +29,13 @@ import sbtassembly.Plugin.AssemblyKeys._
object SharkBuild extends Build {

// Shark version
-val SHARK_VERSION = "0.9.1"
+val SHARK_VERSION = "1.0.0-SNAPSHOT"

val SHARK_ORGANIZATION = "edu.berkeley.cs.shark"

val HIVE_VERSION = "0.11.0-shark-0.9.1"

-val SPARK_VERSION = "0.9.1"
+val SPARK_VERSION = "1.0.0-SNAPSHOT"

val SCALA_VERSION = "2.10.3"

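For context on the version bump above: SPARK_VERSION is the constant SharkBuild feeds into its sbt dependency list. Below is a minimal, hypothetical sbt Build sketch of how such a constant is typically wired in; the project name, resolver, and module choice are illustrative assumptions, not lines from SharkBuild.scala.

```scala
// Hypothetical sketch (sbt 0.13-era Build style): how a SPARK_VERSION constant
// usually drives library dependencies. Names below are illustrative only.
import sbt._
import sbt.Keys._

object ExampleSharkStyleBuild extends Build {
  val SPARK_VERSION = "1.0.0-SNAPSHOT"
  val SCALA_VERSION = "2.10.3"

  lazy val root = Project("example", file("."), settings = Defaults.defaultSettings ++ Seq(
    scalaVersion := SCALA_VERSION,
    // A -SNAPSHOT Spark must be resolvable, e.g. from a locally published build.
    resolvers += Resolver.mavenLocal,
    libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION
  ))
}
```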
2 changes: 1 addition & 1 deletion src/main/scala/shark/SharkEnv.scala
@@ -129,7 +129,7 @@ object SharkEnv extends LogHelper {

var sc: SharkContext = _

-val shuffleSerializerName = classOf[ShuffleSerializer].getName
+val shuffleSerializer = new ShuffleSerializer(new SparkConf())

val memoryMetadataManager = new MemoryMetadataManager

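A note on why this line changes: Spark 0.9 resolved shuffle serializers by class name through SparkEnv.get.serializerManager (visible in the CoGroupedRDD hunk below), while the Spark 1.0 APIs accept a Serializer object directly, so SharkEnv now holds a constructed instance. The sketch below models that shift with stand-in types; SerializerLike, Registry, and Consumer are made-up names, not Spark or Shark APIs.

```scala
// Stand-in sketch of "look up by class name" vs. "pass the instance" wiring.
trait SerializerLike { def label: String }

class ShuffleSerializerLike extends SerializerLike with Serializable {
  val label = "shuffle"
}

// 0.9-style: call sites carry a String and resolve it through a registry.
object Registry {
  def get(className: String): SerializerLike =
    Class.forName(className).newInstance().asInstanceOf[SerializerLike]
}

// 1.0-style: call sites receive the serializer object itself.
class Consumer(serializer: SerializerLike) {
  def describe(): String = s"using ${serializer.label}"
}

object WiringDemo extends App {
  val byName     = Registry.get(classOf[ShuffleSerializerLike].getName)
  val byInstance = new Consumer(new ShuffleSerializerLike)
  println(byName.label)          // shuffle
  println(byInstance.describe()) // using shuffle
}
```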
2 changes: 1 addition & 1 deletion src/main/scala/shark/api/JavaTableRDD.scala
@@ -54,7 +54,7 @@ class JavaTableRDD(val rdd: RDD[Row], val schema: Array[ColumnDesc])
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: JFunction[Row, java.lang.Boolean]): JavaTableRDD =
-wrapRDD(rdd.filter((x => f(x).booleanValue())))
+wrapRDD(rdd.filter((x => f.call(x).booleanValue())))

/**
* Return a sampled subset of this RDD.
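Context for the f(x) → f.call(x) change: in the Spark 1.0 Java API the function types no longer extend Scala's Function1, so the f(x) application sugar stops compiling and the explicit call method is needed. The traits below are simplified stand-ins to illustrate the difference, not Spark's actual definitions.

```scala
// Simplified stand-ins: f(x) only works when the function type extends Function1.
trait OldStyleJFunction[T, R] extends (T => R) {
  def call(t: T): R
  override def apply(t: T): R = call(t) // enables the f(x) sugar
}

trait NewStyleJFunction[T, R] { // plain SAM-style interface, no apply
  def call(t: T): R
}

object CallSyntaxDemo extends App {
  val oldF = new OldStyleJFunction[Int, java.lang.Boolean] {
    def call(t: Int) = java.lang.Boolean.valueOf(t > 0)
  }
  val newF = new NewStyleJFunction[Int, java.lang.Boolean] {
    def call(t: Int) = java.lang.Boolean.valueOf(t > 0)
  }

  println(oldF(1).booleanValue())      // apply() exists, so f(x) compiles
  println(newF.call(1).booleanValue()) // only the explicit call(...) form compiles here
}
```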
4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/CoGroupedRDD.scala
@@ -81,7 +81,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
-new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializerName)
+new ShuffleDependency[Any, Any](rdd, part, SharkEnv.shuffleSerializer)
}
}
}
@@ -117,7 +117,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
values
}
-val serializer = SparkEnv.get.serializerManager.get(SharkEnv.shuffleSerializerName, SparkEnv.get.conf)
+val serializer = SharkEnv.shuffleSerializer
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
// Read them from the parent
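To illustrate the serializer-as-instance style these hunks move to, the sketch below attaches a serializer object to a shuffle via ShuffledRDD.setSerializer, assuming the Spark 1.0-era API this PR targets; KryoSerializer stands in for Shark's ShuffleSerializer, and the data and names are illustrative.

```scala
// Sketch (Spark 1.0-era API assumed): pass a Serializer instance to a shuffle.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer

object ShuffleSerializerUsage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-demo"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // KryoSerializer stands in for SharkEnv.shuffleSerializer here: the point is
    // that the serializer travels as an object, not as a class-name String.
    val shuffled = new ShuffledRDD[String, Int, (String, Int)](pairs, new HashPartitioner(2))
      .setSerializer(new KryoSerializer(sc.getConf))

    shuffled.collect().foreach(println)
    sc.stop()
  }
}
```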
@@ -201,7 +201,7 @@ class GroupByPostShuffleOperator extends GroupByPreShuffleOperator
val partitioner = new ReduceKeyPartitioner(numReduceTasks)

val repartitionedRDD = new ShuffledRDD[Any, Any, (Any, Any)](inputRdd, partitioner)
-.setSerializer(SharkEnv.shuffleSerializerName)
+.setSerializer(SharkEnv.shuffleSerializer)

if (distinctKeyAggrs.size > 0) {
// If there are distinct aggregations, do sort-based aggregation.
6 changes: 3 additions & 3 deletions src/main/scala/shark/execution/HadoopTableReader.scala
@@ -90,7 +90,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf

val fs = hiveTable.getPath().getFileSystem(hiveConf)
if (!fs.exists(hiveTable.getPath()))
-return new EmptyRDD(SharkEnv.sc)
+return SharkEnv.sc.emptyRDD

// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
@@ -150,7 +150,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf

val fs = partPath.getFileSystem(hiveConf)
if (!fs.exists(partPath))
-return new EmptyRDD(SharkEnv.sc)
+return SharkEnv.sc.emptyRDD

val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
val ifc = partDesc.getInputFileFormatClass
@@ -223,7 +223,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
}.toSeq
// Even if we don't use any partitions, we still need an empty RDD
if (hivePartitionRDDs.size == 0) {
-new EmptyRDD[Object](SharkEnv.sc)
+SharkEnv.sc.emptyRDD[Object]
} else {
new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
}
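Context for the EmptyRDD hunks in this file (and the similar ones further down): on the Spark 1.0 line, an empty RDD comes from the SparkContext.emptyRDD[T] factory rather than from constructing EmptyRDD directly. A minimal standalone sketch of the replacement, with illustrative names:

```scala
// Sketch (Spark 1.0-era API assumed): producing an empty RDD via the factory.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object EmptyRddDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("empty-rdd"))

    // Previously: new EmptyRDD[Object](sc). Now the context hands one out.
    val empty: RDD[Object] = sc.emptyRDD[Object]

    println(empty.partitions.length) // 0
    println(empty.count())           // 0
    sc.stop()
  }
}
```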
2 changes: 1 addition & 1 deletion src/main/scala/shark/execution/LimitOperator.scala
@@ -38,7 +38,7 @@ class LimitOperator extends UnaryOperator[LimitDesc] {
val inputRdd = executeParents().head._2
inputRdd.mapPartitions({ iter => iter.take(limitNum) }, preservesPartitioning = true)
} else {
-new EmptyRDD(SharkEnv.sc)
+SharkEnv.sc.emptyRDD
}
}

4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/RDDUtils.scala
@@ -76,7 +76,7 @@ object RDDUtils {
def repartition[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], part: Partitioner)
: RDD[(K, V)] =
{
-new ShuffledRDD[K, V, (K, V)](rdd, part).setSerializer(SharkEnv.shuffleSerializerName)
+new ShuffledRDD[K, V, (K, V)](rdd, part).setSerializer(SharkEnv.shuffleSerializer)
}

/**
@@ -88,7 +88,7 @@
{
val part = new RangePartitioner(rdd.partitions.length, rdd)
val shuffled = new ShuffledRDD[K, V, (K, V)](rdd, part)
-.setSerializer(SharkEnv.shuffleSerializerName)
+.setSerializer(SharkEnv.shuffleSerializer)
shuffled.mapPartitions(iter => {
val buf = iter.toArray
buf.sortWith((x, y) => x._1.compareTo(y._1) < 0).iterator
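The helper touched above sorts an RDD by shuffling it through a RangePartitioner and then sorting each partition locally. Below is a small standalone sketch of that pattern (Spark 1.0-era API assumed; key/value types and names are illustrative, and the custom shuffle serializer is left out):

```scala
// Sketch: range-partition, shuffle, then sort within each partition.
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD, ShuffledRDD}

object RangeSortSketch {
  def sortedByKey(rdd: RDD[(String, Int)]): RDD[(String, Int)] = {
    // Each partition is assigned a contiguous key range...
    val part = new RangePartitioner(rdd.partitions.length.max(1), rdd)
    val shuffled = new ShuffledRDD[String, Int, (String, Int)](rdd, part)
    // ...so sorting inside every partition yields a globally sorted result.
    shuffled.mapPartitions(iter => iter.toArray.sortBy(_._1).iterator,
      preservesPartitioning = true)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("range-sort"))
    val data = sc.parallelize(Seq("pear" -> 3, "apple" -> 1, "mango" -> 2))
    sortedByKey(data).collect().foreach(println) // apple, mango, pear order
    sc.stop()
  }
}
```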
4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/SharkDDLTask.scala
@@ -105,7 +105,7 @@ private[shark] class SharkDDLTask extends HiveTask[SharkDDLWork]
val memoryTable = SharkEnv.memoryMetadataManager.createMemoryTable(
dbName, tableName, cacheMode)
// An empty table has a MemoryTable table entry with 'tableRDD' referencing an EmptyRDD.
-memoryTable.put(new EmptyRDD(SharkEnv.sc))
+memoryTable.put(SharkEnv.sc.emptyRDD)
}
}
}
@@ -134,7 +134,7 @@ private[shark] class SharkDDLTask extends HiveTask[SharkDDLWork]
MemoryMetadataManager.makeTableKey(dbName, tableName), Some(partKeyStr))
} else {
val partitionedTable = getPartitionedTableWithAssertions(dbName, tableName)
-partitionedTable.putPartition(partKeyStr, new EmptyRDD(SharkEnv.sc))
+partitionedTable.putPartition(partKeyStr, SharkEnv.sc.emptyRDD)
}
}

6 changes: 2 additions & 4 deletions src/main/scala/shark/execution/TableReader.scala
@@ -115,13 +115,11 @@ class OffHeapTableReader(@transient _tableDesc: TableDesc, _storageClient: OffHe
if (hivePartitionRDDs.size > 0) {
new UnionRDD(hivePartitionRDDs.head.context, hivePartitionRDDs)
} else {
-new EmptyRDD[Object](SharkEnv.sc)
+SharkEnv.sc.emptyRDD[Object]
}
}
}



/** Helper class for scanning tables stored in Spark's block manager */
class HeapTableReader(@transient _tableDesc: TableDesc) extends TableReader {

@@ -208,7 +206,7 @@ class HeapTableReader(@transient _tableDesc: TableDesc) extends TableReader {
if (hivePartitionRDDs.size > 0) {
new UnionRDD(hivePartitionRDDs.head.context, hivePartitionRDDs)
} else {
-new EmptyRDD[Object](SharkEnv.sc)
+SharkEnv.sc.emptyRDD[Object]
}
}

@@ -49,7 +49,7 @@ import shark.execution.{ReduceKey, ReduceKeyReduceSide}
* into a hash table. We want to reduce the size of the hash table. Having the BytesWritable wrapper
* would increase the size of the hash table by another 16 bytes per key-value pair.
*/
-class ShuffleSerializer(conf: SparkConf) extends Serializer {
+class ShuffleSerializer(conf: SparkConf) extends Serializer with Serializable {

// A no-arg constructor since conf is not needed in this serializer.
def this() = this(null)
@@ -58,7 +58,7 @@ class ShuffleSerializer(conf: SparkConf) extends Serializer {
}


-class ShuffleSerializerInstance extends SerializerInstance {
+class ShuffleSerializerInstance extends SerializerInstance with Serializable {

override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException

@@ -77,7 +77,7 @@ class ShuffleSerializerInstance extends SerializerInstance {
}


-class ShuffleSerializationStream(stream: OutputStream) extends SerializationStream {
+class ShuffleSerializationStream(stream: OutputStream) extends SerializationStream with Serializable {

override def writeObject[T](t: T): SerializationStream = {
// On the write-side, the ReduceKey should be of type ReduceKeyMapSide.
@@ -108,7 +108,7 @@ class ShuffleSerializationStream(stream: OutputStream) extends SerializationStre
}


-class ShuffleDeserializationStream(stream: InputStream) extends DeserializationStream {
+class ShuffleDeserializationStream(stream: InputStream) extends DeserializationStream with Serializable {

override def readObject[T](): T = {
// Return type is (ReduceKeyReduceSide, Array[Byte])
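A note on the `with Serializable` additions: once the serializer is passed around as an object rather than re-created from a class name on each executor, it can be captured in shuffle dependencies and closures that Spark ships via Java serialization, so the instance has to serialize cleanly. The self-contained sketch below demonstrates that requirement with stand-in classes, not Shark types.

```scala
// Stand-in sketch: objects shipped to executors must survive Java serialization.
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class PlainSerializerLike                          // would fail to ship
class ShippableSerializerLike extends Serializable // mirrors adding `with Serializable`

object SerializableCheck {
  def shipsCleanly(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(shipsCleanly(new PlainSerializerLike))     // false
    println(shipsCleanly(new ShippableSerializerLike)) // true
  }
}
```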