From d1590073030e654a782aad8dd36878df8798fa48 Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Wed, 3 Sep 2014 12:12:27 +0800
Subject: [PATCH] Some formatting fixes

---
 .../execution/MemoryStoreSinkOperator.scala   | 14 +++++++----
 .../memstore2/OffHeapStorageClient.scala      |  4 ++--
 .../tachyon/TachyonOffHeapTableWriter.scala   | 23 ++++++++-----------
 3 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 092c3413..956b2d27 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,18 +18,22 @@ package shark.execution
 
 import java.nio.ByteBuffer
+
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.BeanProperty
+
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.io.Writable
-import org.apache.spark.rdd.{RDD, UnionRDD}
+
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.{TaskContext, SerializableWritable}
+
 import shark.{SharkConfVars, SharkEnv}
 import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
 import shark.memstore2._
-import org.apache.spark.TaskContext
-import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
+
 
 /**
  * Cache the RDD and force evaluate it (so the cache is filled).
  */
@@ -78,7 +82,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
     localHconf.setInt(SharkConfVars.COLUMN_BUILDER_PARTITION_SIZE.varname, partitionSize)
     localHconf.setBoolean(SharkConfVars.COLUMNAR_COMPRESSION.varname, shouldCompress)
   }
-  
+
   override def execute(): RDD[_] = {
     val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null
 
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 5648968e..364fe89c 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -80,9 +80,9 @@ abstract class OffHeapTableWriter extends Serializable {
 
   /** Write the data of a partition of a given column. Called only on worker nodes. */
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
-  
+
   def commitPartition(part: Int, numColumns: Int, tempDir: String)
-  
+
   def cleanTmpPath()
 }
 
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 776d1377..04c45563 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -26,8 +26,8 @@ import shark.execution.serialization.JavaSerializer
 import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}
 import tachyon.client.WriteType
-import tachyon.Constants
 import tachyon.master.MasterInfo
+import tachyon.util.CommonUtils
 
 class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
   extends OffHeapTableWriter with LogHelper {
 
@@ -52,9 +52,8 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
   @transient lazy val rawTable = tfs.getRawTable(rawTableId)
 
   override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
-      + column + Constants.PATH_SEPARATOR + part)
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    val fid = tfs.createFile(CommonUtils.concat(tmpPath, tempDir, column + "", part + ""))
     val file = tfs.getFile(fid)
     val writeType: WriteType = WriteType.valueOf(
       SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
@@ -62,21 +61,19 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     outStream.write(data.array(), 0, data.limit())
     outStream.close()
   }
-  
+
   override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
     (0 until numColumns).reverse.foreach { column =>
-      val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR +
-        column + Constants.PATH_SEPARATOR + part
-      val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR +
-        MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
+      val srcPath = CommonUtils.concat(tmpPath, tempDir, column + "", part + "")
+      val destPath = CommonUtils.concat(rawTable.getPath(), MasterInfo.COL, column + "", part + "")
       tfs.rename(srcPath, destPath)
     }
-    tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
+    tfs.delete(CommonUtils.concat(tmpPath, tempDir), true)
   }
-  
+
   override def cleanTmpPath() {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
     tfs.delete(tmpPath, true)
   }
 }
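
Note on the CommonUtils.concat change: the patch replaces hand-rolled joins built from
Constants.PATH_SEPARATOR with one variadic helper call per path. The standalone sketch
below illustrates the joining behavior the new call sites rely on; the concat stand-in,
the table path, and the TEMP value are hypothetical illustrations for this note, not
Tachyon's actual implementation.

object PathConcatSketch {
  // Stand-in for tachyon.util.CommonUtils.concat: join path components with "/",
  // tolerating stray leading/trailing separators. Illustrative only.
  def concat(parent: String, children: String*): String =
    children.foldLeft(parent.stripSuffix("/")) { (acc, child) =>
      acc + "/" + child.stripPrefix("/")
    }

  def main(args: Array[String]) {
    val rawTablePath = "/tables/rawtable_42"  // hypothetical rawTable.getPath() value
    val TEMP = "_temporary"                   // hypothetical TEMP constant
    val (tempDir, column, part) = ("task_001", 3, 7)

    // Old style: separator arithmetic, easy to drop or double a "/".
    val oldStyle = rawTablePath + "/" + TEMP + "/" + tempDir + "/" + column + "/" + part

    // New style, mirroring writePartitionColumn after the patch
    // (column + "" converts the Int to a String, as the patch does).
    val tmpPath = concat(rawTablePath, TEMP)
    val newStyle = concat(tmpPath, tempDir, column + "", part + "")

    assert(oldStyle == newStyle)
    println(newStyle)  // /tables/rawtable_42/_temporary/task_001/3/7
  }
}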
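
Note on commitPartition: it follows a temp-then-rename commit sequence. Each column of a
partition is first written under a per-attempt temp directory, the commit renames those
files into the table's final column/partition layout, and the temp tree is deleted last,
so a failed write leaves data only under the temp tree and never half-committed in the
final layout. Below is a local-filesystem sketch of the same sequence, with java.nio.file
standing in for the Tachyon client; all paths are hypothetical, and the "COL_" prefix
merely stands in for MasterInfo.COL.

import java.nio.file.Files

object CommitProtocolSketch {
  def main(args: Array[String]) {
    val table = Files.createTempDirectory("rawTable")
    val temp = Files.createDirectories(table.resolve("_temporary/task_001"))
    val (part, numColumns) = (0, 2)

    // Write phase: each column of the partition lands under the temp dir.
    for (column <- 0 until numColumns) {
      val colDir = Files.createDirectories(temp.resolve(column.toString))
      Files.write(colDir.resolve(part.toString), Array[Byte](1, 2, 3))
    }

    // Commit phase: rename each column file into the final <COL_n>/<part>
    // layout, walking the columns in reverse as the patch does.
    for (column <- (0 until numColumns).reverse) {
      val src = temp.resolve(column.toString).resolve(part.toString)
      val dest = Files.createDirectories(table.resolve("COL_" + column)).resolve(part.toString)
      Files.move(src, dest)
    }

    // Cleanup: drop the temp tree. tfs.delete(..., true) is recursive; here the
    // structure is known, so the now-empty directories are removed directly.
    for (column <- 0 until numColumns) Files.delete(temp.resolve(column.toString))
    Files.delete(temp)            // .../_temporary/task_001
    Files.delete(temp.getParent)  // .../_temporary
    println("committed under " + table)
  }
}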