Skip to content

Commit

Permalink
clean code for writeColumnPartition
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingfei committed Jun 13, 2014
1 parent ff56841 commit 4094cab
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 21 deletions.
13 changes: 4 additions & 9 deletions src/main/scala/shark/execution/SparkLoadTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,13 @@ class SparkLoadTask extends HiveTask[SparkLoadWork] with Serializable with LogHe
OffHeapStorageClient.client.dropTablePartition(tableKey, hivePartitionKeyOpt)
}
offHeapWriter.createTable()
transformedRdd = transformedRdd.mapPartitionsWithIndex { case(part, iter) =>
val partition = iter.next()
partition.toOffHeap.zipWithIndex.foreach { case(buf, column) =>
offHeapWriter.writeColumnPartition(column, part, buf)
}
Iterator(partition)
}
transformedRdd.context.runJob(
transformedRdd, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
} else {
transformedRdd.persist(StorageLevel.MEMORY_AND_DISK)
transformedRdd.context.runJob(
transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
}
transformedRdd.context.runJob(
transformedRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
if (work.cacheMode == CacheType.OFFHEAP) {
offHeapWriter.setStats(statsAcc.value.toMap)
}
Expand Down
4 changes: 1 addition & 3 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ abstract class OffHeapTableWriter extends Serializable {
/** Sets stats on this table. Called only on the driver node. */
def setStats(indexToStats: collection.Map[Int, TablePartitionStats])

/** Write the data of a partition of a given column. Called only on worker nodes. */
def writeColumnPartition(column: Int, part: Int, data: ByteBuffer)

/** Write the data of a partition of a given column. Called only on worker nodes. */
def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)

def commitPartition(part: Int, numColumns: Int, tempDir: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,6 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
// This is only used on worker nodes.
@transient lazy val rawTable = tfs.getRawTable(rawTableId)

override def writeColumnPartition(column: Int, part: Int, data: ByteBuffer) {
val rawColumn = rawTable.getRawColumn(column)
rawColumn.createPartition(part)
val file = rawColumn.getPartition(part)
val outStream = file.getOutStream(WriteType.CACHE_THROUGH)
outStream.write(data.array(), 0, data.limit())
outStream.close()
}

override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
Expand Down

0 comments on commit 4094cab

Please sign in to comment.