From 579f6ea52613e40e83c4b2d8e532b858d93abdc4 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 3 Sep 2014 12:01:44 +0800
Subject: [PATCH] merge PR327

---
 src/main/scala/shark/SharkConfVars.scala           | 10 ++++++++++
 .../execution/MemoryStoreSinkOperator.scala        | 17 ++++++++++-------
 .../scala/shark/execution/SparkLoadTask.scala      |  3 ++-
 .../shark/memstore2/OffHeapStorageClient.scala     |  4 ++++
 .../tachyon/TachyonOffHeapTableWriter.scala        | 13 +++++++++----
 5 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/shark/SharkConfVars.scala b/src/main/scala/shark/SharkConfVars.scala
index b5592024..d2fe00f2 100755
--- a/src/main/scala/shark/SharkConfVars.scala
+++ b/src/main/scala/shark/SharkConfVars.scala
@@ -57,6 +57,16 @@ object SharkConfVars {
   // Number of mappers to force for table scan jobs
   val NUM_MAPPERS = new ConfVar("shark.map.tasks", -1)
 
+  // WriteType for the Tachyon off-heap table writer, e.g. "TRY_CACHE", "MUST_CACHE",
+  // "CACHE_THROUGH", "THROUGH".
+  // For reliability, we strongly recommend the default "CACHE_THROUGH", which
+  // writes the table synchronously to the under filesystem and also caches it.
+  // "TRY_CACHE" and "MUST_CACHE" only cache the table, which gives better write
+  // performance. Use these two options with care: if the entire table cannot be
+  // fully cached, some of its data will be evicted and lost forever.
+  // "THROUGH" only writes the table to the under filesystem, with no caching at all.
+  val TACHYON_WRITER_WRITETYPE = new ConfVar("shark.tachyon.writetype", "CACHE_THROUGH")
+
   // Add Shark configuration variables and their default values to the given conf,
   // so default values show up in 'set'.
   def initializeWithDefaults(conf: Configuration) {
diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 0afbf043..092c3413 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,20 +18,18 @@ package shark.execution
 
 import java.nio.ByteBuffer
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
-
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
-
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.storage.StorageLevel
-
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
-
 import org.apache.spark.TaskContext
+import org.apache.spark.SerializableWritable
+import org.apache.spark.broadcast.Broadcast
 
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -129,8 +127,11 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       // Put the table in off-heap storage.
op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName)) offHeapWriter.createTable() + val broadcastedHiveConf + = outputRDD.context.broadcast(new SerializableWritable(op.getLocalHconf)) outputRDD.context.runJob( - outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter)) + outputRDD, + MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, broadcastedHiveConf)) offHeapWriter.cleanTmpPath() } else { // Run a job on the RDD that contains the query output to force the data into the memory @@ -203,13 +204,15 @@ class MemoryStoreSinkOperator extends TerminalOperator { } object MemoryStoreSinkOperator { - def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = { + def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter, + broadcastedHiveConf: Broadcast[SerializableWritable[HiveConf]]) = { def writeFiles(context: TaskContext, iter: Iterator[_]): Long = { val partId = context.partitionId val partition = iter.next().asInstanceOf[TablePartition] val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId var writeBytes: Long = 0 partition.toOffHeap.zipWithIndex.foreach { case(buf, column) => + offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value) offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir) writeBytes += buf.limit } diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala index 0138c941..f3b4527a 100644 --- a/src/main/scala/shark/execution/SparkLoadTask.scala +++ b/src/main/scala/shark/execution/SparkLoadTask.scala @@ -232,7 +232,8 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe } offHeapWriter.createTable() transformedRdd.context.runJob( - transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter)) + transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter, + broadcastedHiveConf)) } else { transformedRdd.persist(StorageLevel.MEMORY_AND_DISK) transformedRdd.context.runJob( diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala index 082ac651..5648968e 100644 --- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala +++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala @@ -20,6 +20,9 @@ package shark.memstore2 import java.util import java.nio.ByteBuffer +import scala.reflect.BeanProperty + +import org.apache.hadoop.hive.conf.HiveConf import org.apache.spark.rdd.RDD import shark.LogHelper @@ -67,6 +70,7 @@ abstract class OffHeapStorageClient { } abstract class OffHeapTableWriter extends Serializable { + @transient @BeanProperty var localHconf: HiveConf = _ /** Creates this table. Called only on the driver node. 
*/ def createTable() diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala index 85a8d125..776d1377 100644 --- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala +++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala @@ -19,12 +19,15 @@ package shark.tachyon import java.nio.ByteBuffer +import scala.reflect.BeanProperty + +import shark.{LogHelper, SharkConfVars} +import shark.execution.serialization.JavaSerializer +import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats} + import tachyon.client.WriteType import tachyon.Constants import tachyon.master.MasterInfo -import shark.LogHelper -import shark.execution.serialization.JavaSerializer -import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats} class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int) extends OffHeapTableWriter with LogHelper { @@ -53,7 +56,9 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR + column + Constants.PATH_SEPARATOR + part) val file = tfs.getFile(fid) - val outStream = file.getOutStream(WriteType.CACHE_THROUGH) + val writeType: WriteType = WriteType.valueOf( + SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE)) + val outStream = file.getOutStream(writeType) outStream.write(data.array(), 0, data.limit()) outStream.close() }
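
Note on the mechanism: HiveConf is a Hadoop Writable but not java.io.Serializable, so the
patch wraps the driver's conf in Spark's SerializableWritable and ships it to the executors
as a broadcast variable; each write task then unwraps it via setLocalHconf before writing
columns. A minimal, self-contained sketch of that pattern against the Spark 1.x-era API
(hypothetical example code, not part of the patch):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{SerializableWritable, SparkContext}

object BroadcastHiveConfSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "broadcast-hiveconf-sketch")

    // Driver side: HiveConf is not Serializable, so wrap it before broadcasting.
    val hconf = new HiveConf()
    hconf.set("shark.tachyon.writetype", "CACHE_THROUGH")
    val broadcastedHiveConf = sc.broadcast(new SerializableWritable(hconf))

    // Executor side: each task unwraps the broadcast to obtain a local HiveConf
    // copy, mirroring offHeapWriter.setLocalHconf(broadcastedHiveConf.value.value).
    val writeTypes = sc.parallelize(1 to 4, 2).map { _ =>
      val localHconf = broadcastedHiveConf.value.value
      localHconf.get("shark.tachyon.writetype", "CACHE_THROUGH")
    }.collect()

    writeTypes.foreach(println)
    sc.stop()
  }
}

Because SharkConfVars.getVar reads from the writer's local HiveConf, the write type should be
selectable per session like any other Shark variable, e.g. SET shark.tachyon.writetype=MUST_CACHE;
(assuming the session conf is the one that reaches the writer, as the broadcast above suggests).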