From 4094cab16e992b7a05af52e3f47729fc01e1eb9b Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Fri, 13 Jun 2014 09:58:34 +0800
Subject: [PATCH] clean code for writeColumnPartition

---
 src/main/scala/shark/execution/SparkLoadTask.scala | 13 ++++---------
 .../shark/memstore2/OffHeapStorageClient.scala     |  4 +---
 .../shark/tachyon/TachyonOffHeapTableWriter.scala  |  9 ---------
 3 files changed, 5 insertions(+), 21 deletions(-)

diff --git a/src/main/scala/shark/execution/SparkLoadTask.scala b/src/main/scala/shark/execution/SparkLoadTask.scala
index 64ae4567..0138c941 100644
--- a/src/main/scala/shark/execution/SparkLoadTask.scala
+++ b/src/main/scala/shark/execution/SparkLoadTask.scala
@@ -231,18 +231,13 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
         OffHeapStorageClient.client.dropTablePartition(tableKey, hivePartitionKeyOpt)
       }
       offHeapWriter.createTable()
-      transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
-        val partition = iter.next()
-        partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
-          offHeapWriter.writeColumnPartition(column, part, buf)
-        }
-        Iterator(partition)
-      }
+      transformedRdd.context.runJob(
+        transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
     } else {
       transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
+      transformedRdd.context.runJob(
+        transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     }
-    transformedRdd.context.runJob(
-      transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
     if (work.cacheMode == CacheType.OFFHEAP) {
       offHeapWriter.setStats(statsAcc.value.toMap)
     }
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 0435de5a..082ac651 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -74,9 +74,7 @@ abstract class OffHeapTableWriter extends Serializable {
   /** Sets stats on this table. Called only on the driver node. */
   def setStats(indexToStats: collection.Map[Int, TablePartitionStats])
 
-  /** Write the data of a partition of a given column. Called only on worker nodes. */
-  def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)
-
+  /** Write the data of a partition of a given column. Called only on worker nodes. */
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
 
   def commitPartition(part: Int, numColumns: Int, tempDir: String)
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index d1533241..85a8d125 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -48,15 +48,6 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
   // This is only used on worker nodes.
   @transient lazy val rawTable = tfs.getRawTable(rawTableId)
 
-  override def writeColumnPartition(column: Int, part: Int, data: ByteBuffer) {
-    val rawColumn = rawTable.getRawColumn(column)
-    rawColumn.createPartition(part)
-    val file = rawColumn.getPartition(part)
-    val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
-    outStream.write(data.array(), 0, data.limit())
-    outStream.close()
-  }
-
   override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
     val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
     val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
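
Note (appended for context; not part of the original patch): the SparkLoadTask
hunk drops a mapPartitionsWithIndex transformation that existed only for its
write side effect and had to be forced by a separate no-op runJob. Instead, the
writer is now submitted directly as the job's partition function. A minimal,
self-contained sketch of that pattern, where writePartition is a hypothetical
stand-in for MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter):

    import org.apache.spark.{SparkConf, SparkContext}

    object RunJobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("runJob-sketch").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 100, numSlices = 4)

        // Hypothetical writer: consumes a partition purely for its side
        // effect, the role processOffHeapSinkPartition plays in the patch.
        def writePartition(iter: Iterator[Int]): Unit =
          iter.foreach(_ => ())

        // Running the writer as the job itself makes the write eager; no
        // placeholder transformation or follow-up no-op job is needed.
        sc.runJob(rdd, writePartition _)
        sc.stop()
      }
    }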