merge PR327
Mingfei committed Sep 3, 2014
1 parent 4094cab commit 579f6ea
Showing 5 changed files with 35 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/main/scala/shark/SharkConfVars.scala
@@ -57,6 +57,16 @@ object SharkConfVars {
// Number of mappers to force for table scan jobs
val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)

// WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
// "CACHE_THROUGH", "THROUGH".
// For reliability we strongly recommend the default "CACHE_THROUGH", which writes
// the table synchronously to the underlying filesystem and also caches the columns.
// The "TRY_CACHE" and "MUST_CACHE" options only cache the table, which gives better
// write performance, but use them with care: if the entire table cannot be fully
// cached, part of the data will be evicted and lost forever.
// "THROUGH" only writes the table to the underlying filesystem, with no caching at all.
val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")

// Add Shark configuration variables and their default values to the given conf,
// so default values show up in 'set'.
def initializeWithDefaults(conf: Configuration) {
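As a usage sketch (not part of the commit), here is roughly how the new shark.tachyon.writetype variable could be set and read back. The HiveConf instance and the "TRY_CACHE" value are illustrative; SharkConfVars.getVar and TACHYON_WRITER_WRITETYPE are taken from the diff above.

import org.apache.hadoop.hive.conf.HiveConf
import shark.SharkConfVars

object WriteTypeConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new HiveConf()
    // Override the CACHE_THROUGH default for this session (illustrative value).
    conf.set("shark.tachyon.writetype", "TRY_CACHE")
    // Read it back the way TachyonOffHeapTableWriter does after this commit.
    val writeType = SharkConfVars.getVar(conf, SharkConfVars.TACHYON_WRITER_WRITETYPE)
    assert(writeType == "TRY_CACHE")
  }
}

Since initializeWithDefaults registers the default value, a running Shark session would presumably show and change the same knob through the 'set' command.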
17 changes: 10 additions & 7 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,20 +18,18 @@
package shark.execution

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.io.Writable

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.storage.StorageLevel

import shark.{SharkConfVars, SharkEnv}
import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
import shark.memstore2._

import org.apache.spark.TaskContext
import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast

/**
* Cache the RDD and force evaluate it (so the cache is filled).
*/
@@ -129,8 +127,11 @@ class MemoryStoreSinkOperator extends TerminalOperator {
// Put the table in off-heap storage.
op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
offHeapWriter.createTable()
val broadcastedHiveConf
= outputRDD.context.broadcast(new SerializableWritable(op.getLocalHconf))
outputRDD.context.runJob(
- outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
+ outputRDD,
+ MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, broadcastedHiveConf))
offHeapWriter.cleanTmpPath()
} else {
// Run a job on the RDD that contains the query output to force the data into the memory
@@ -203,13 +204,15 @@ }
}

object MemoryStoreSinkOperator {
- def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = {
+ def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter,
+     broadcastedHiveConf: Broadcast[SerializableWritable[HiveConf]]) = {
def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
val partId = context.partitionId
val partition = iter.next().asInstanceOf[TablePartition]
val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
var writeBytes: Long = 0
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value)
offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
writeBytes += buf.limit
}
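For readers unfamiliar with the pattern, the change above amounts to broadcasting the HiveConf once per job and unwrapping it inside each task, because HiveConf is a Hadoop Writable but not java.io.Serializable. A self-contained sketch of just that mechanism follows; the SparkContext, RDD, and task body are illustrative, and only the SerializableWritable wrapping and the .value.value unwrapping mirror the commit.

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}

object BroadcastHconfSketch {
  def run(sc: SparkContext, hconf: HiveConf): Unit = {
    // Wrap the Writable-but-not-Serializable conf so it can be broadcast once.
    val broadcastedHiveConf = sc.broadcast(new SerializableWritable(hconf))
    val rdd = sc.parallelize(1 to 4, numSlices = 2)
    sc.runJob(rdd, (context: TaskContext, iter: Iterator[Int]) => {
      // Each task unwraps its own copy instead of closing over the conf directly.
      val localHconf: HiveConf = broadcastedHiveConf.value.value
      iter.length
    })
  }
}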
3 changes: 2 additions & 1 deletion src/main/scala/shark/execution/SparkLoadTask.scala
@@ -232,7 +232,8 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
}
offHeapWriter.createTable()
transformedRdd.context.runJob(
- transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
+ transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter,
+   broadcastedHiveConf))
} else {
transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
transformedRdd.context.runJob(
4 changes: 4 additions & 0 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -20,6 +20,9 @@ package shark.memstore2
import java.util
import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.rdd.RDD

import shark.LogHelper
@@ -67,6 +70,7 @@ abstract class OffHeapStorageClient {
}

abstract class OffHeapTableWriter extends Serializable {
@transient @BeanProperty var localHconf: HiveConf = _

/** Creates this table. Called only on the driver node. */
def createTable()
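The new localHconf field combines two annotations: @transient keeps the HiveConf out of the serialized writer shipped to executors, while @BeanProperty generates the setLocalHconf/getLocalHconf pair that MemoryStoreSinkOperator calls inside each task. A minimal illustration with a hypothetical stand-in class (DummyWriter is not part of the commit):

import org.apache.hadoop.hive.conf.HiveConf
import scala.reflect.BeanProperty

object BeanPropertySketch {
  // Same shape as the field added to OffHeapTableWriter above.
  class DummyWriter extends Serializable {
    @transient @BeanProperty var localHconf: HiveConf = _
  }

  def main(args: Array[String]): Unit = {
    val writer = new DummyWriter
    writer.setLocalHconf(new HiveConf())  // what each task does with the broadcast value
    assert(writer.getLocalHconf != null)  // available locally, never serialized with the closure
  }
}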
13 changes: 9 additions & 4 deletions src/main/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -19,12 +19,15 @@ package shark.tachyon

import java.nio.ByteBuffer

import scala.reflect.BeanProperty

import shark.{LogHelper, SharkConfVars}
import shark.execution.serialization.JavaSerializer
import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

import tachyon.client.WriteType
import tachyon.Constants
import tachyon.master.MasterInfo
- import shark.LogHelper
- import shark.execution.serialization.JavaSerializer
- import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
extends OffHeapTableWriter with LogHelper {
@@ -53,7 +56,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
+ column + Constants.PATH_SEPARATOR + part)
val file = tfs.getFile(fid)
- val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
+ val writeType: WriteType = WriteType.valueOf(
+   SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
+ val outStream = file.getOutStream(writeType)
outStream.write(data.array(), 0, data.limit())
outStream.close()
}
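For reference, the configured string is resolved through the standard Java enum lookup on tachyon.client.WriteType, so only values naming an actual WriteType constant are accepted; anything else makes WriteType.valueOf throw IllegalArgumentException at write time. A small sketch (the hard-coded strings are illustrative):

import tachyon.client.WriteType

object WriteTypeLookupSketch {
  def main(args: Array[String]): Unit = {
    // The default from SharkConfVars maps to the synchronous, cached write type.
    assert(WriteType.valueOf("CACHE_THROUGH") == WriteType.CACHE_THROUGH)
    // A misspelled setting fails loudly instead of silently falling back.
    try {
      WriteType.valueOf("CACHE_THRU")
    } catch {
      case _: IllegalArgumentException => println("unknown shark.tachyon.writetype value")
    }
  }
}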
