some format fix
Mingfei committed Sep 3, 2014
1 parent 77684be commit d159007
Showing 3 changed files with 21 additions and 20 deletions.
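Before the per-file diffs, the gist of the change: hand-spliced Tachyon paths built with Constants.PATH_SEPARATOR give way to tachyon.util.CommonUtils.concat, and the Spark imports in MemoryStoreSinkOperator are regrouped. A minimal Scala sketch of the path-building difference, assuming only that Tachyon's separator is "/" and that CommonUtils.concat joins its arguments with it; PathSketch and both of its methods are illustrative names, not Shark code:

object PathSketch {
  val SEP = "/"  // assumed value of tachyon.Constants.PATH_SEPARATOR

  // Before: each call site splices the separator in by hand.
  def manualJoin(base: String, segments: Any*): String =
    segments.foldLeft(base)((acc, s) => acc + SEP + s)

  // After: one helper owns the separator logic, as CommonUtils.concat does.
  def concat(segments: Any*): String =
    segments.map(_.toString).mkString(SEP)
}

// Both calls yield "/table/_temporary/tmp0/3/7":
//   PathSketch.manualJoin("/table", "_temporary", "tmp0", 3, 7)
//   PathSketch.concat("/table", "_temporary", "tmp0", 3, 7)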
14 changes: 9 additions & 5 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -18,18 +18,22 @@
package shark.execution

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.reflect.BeanProperty

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.io.Writable
-import org.apache.spark.rdd.{RDD, UnionRDD}
+
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.{TaskContext, SerializableWritable}

import shark.{SharkConfVars, SharkEnv}
import shark.execution.serialization.{OperatorSerializationWrapper, JavaSerializer}
import shark.memstore2._
-import org.apache.spark.TaskContext
-import org.apache.spark.SerializableWritable
-import org.apache.spark.broadcast.Broadcast
+
+/**
+ * Cache the RDD and force evaluate it (so the cache is filled).
+ */
@@ -78,7 +82,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
    localHconf.setInt(SharkConfVars.COLUMN_BUILDER_PARTITION_SIZE.varname, partitionSize)
    localHconf.setBoolean(SharkConfVars.COLUMNAR_COMPRESSION.varname, shouldCompress)
  }

  override def execute(): RDD[_] = {
    val inputRdd = if (parentOperators.size == 1) executeParents().head._2 else null

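The regrouped imports above pull org.apache.spark.broadcast.Broadcast and SerializableWritable together, which is the machinery an operator like this relies on to ship Hadoop objects to executors: a Writable is not java.io.Serializable, so it gets wrapped before being broadcast. A hedged sketch of that pattern; BroadcastConfSketch and broadcastConf are illustrative names, only the Spark and Hadoop classes are real:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast

object BroadcastConfSketch {
  // Configuration is Writable but not Serializable; SerializableWritable
  // makes it broadcastable, so executors fetch it once instead of per task.
  def broadcastConf(sc: SparkContext, conf: Configuration): Broadcast[SerializableWritable[Configuration]] =
    sc.broadcast(new SerializableWritable(conf))
}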
4 changes: 2 additions & 2 deletions src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -80,9 +80,9 @@ abstract class OffHeapTableWriter extends Serializable {

  /** Write the data of a partition of a given column. Called only on worker nodes. */
  def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)

  def commitPartition(part: Int, numColumns: Int, tempDir: String)

  def cleanTmpPath()
}

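The three signatures above define a two-phase contract: stage each column of a partition under a temporary directory on the workers, then publish the whole partition with one commit call. A hypothetical caller, sketched purely against these signatures; WriterContractSketch and its arguments are illustrative:

import java.nio.ByteBuffer
import shark.memstore2.OffHeapTableWriter

object WriterContractSketch {
  def writePartition(writer: OffHeapTableWriter, part: Int,
      columns: Seq[ByteBuffer], tempDir: String): Unit = {
    // Stage every column buffer of this partition under tempDir...
    columns.zipWithIndex.foreach { case (data, col) =>
      writer.writePartitionColumn(part, col, data, tempDir)
    }
    // ...then publish the staged files in one step.
    writer.commitPartition(part, columns.size, tempDir)
  }
}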
23 changes: 10 additions & 13 deletions TachyonOffHeapTableWriter.scala
@@ -26,8 +26,8 @@ import shark.execution.serialization.JavaSerializer
import shark.memstore2.{OffHeapStorageClient, OffHeapTableWriter, TablePartitionStats}

import tachyon.client.WriteType
-import tachyon.Constants
import tachyon.master.MasterInfo
+import tachyon.util.CommonUtils

class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns: Int)
  extends OffHeapTableWriter with LogHelper {
@@ -52,31 +52,28 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
  @transient lazy val rawTable = tfs.getRawTable(rawTableId)

  override def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    val fid = tfs.createFile(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
-      + column + Constants.PATH_SEPARATOR + part)
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
+    val fid = tfs.createFile(CommonUtils.concat(tmpPath, tempDir, column + "", part + ""))
    val file = tfs.getFile(fid)
    val writeType: WriteType = WriteType.valueOf(
      SharkConfVars.getVar(localHconf, SharkConfVars.TACHYON_WRITER_WRITETYPE))
    val outStream = file.getOutStream(writeType)
    outStream.write(data.array(), 0, data.limit())
    outStream.close()
  }

  override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
    (0 until numColumns).reverse.foreach { column =>
-      val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR +
-        column + Constants.PATH_SEPARATOR + part
-      val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR +
-        MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
+      val srcPath = CommonUtils.concat(tmpPath, tempDir, column + "", part + "")
+      val destPath = CommonUtils.concat(rawTable.getPath(), MasterInfo.COL, column + "", part + "")
      tfs.rename(srcPath, destPath)
    }
-    tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
+    tfs.delete(CommonUtils.concat(tmpPath, tempDir), true)
  }

  override def cleanTmpPath() {
-    val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
+    val tmpPath = CommonUtils.concat(rawTable.getPath(), TEMP)
    tfs.delete(tmpPath, true)
  }
}
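Read together, the methods above implement a stage-then-rename commit: column files are staged under <table>/<TEMP>/<tempDir>/<column>/<part>, renamed into the table's per-column directories, and the staging directory is then deleted. A filesystem-agnostic sketch of that protocol; FsOps and CommitSketch are illustrative stand-ins for TachyonFS and the writer, and the "COL" prefix only mirrors the role MasterInfo.COL plays in the diff:

trait FsOps {
  def rename(src: String, dst: String): Boolean
  def delete(path: String, recursive: Boolean): Boolean
}

object CommitSketch {
  private def concat(parts: Any*): String = parts.mkString("/")  // CommonUtils.concat analogue

  def commitPartition(fs: FsOps, tablePath: String, temp: String,
      tempDir: String, numColumns: Int, part: Int): Unit = {
    val tmpPath = concat(tablePath, temp)
    // Reverse column order, matching the writer above.
    (0 until numColumns).reverse.foreach { column =>
      val src = concat(tmpPath, tempDir, column, part)
      val dst = concat(tablePath, "COL" + column, part)  // per-column table directory
      fs.rename(src, dst)                                // publish: move out of staging
    }
    fs.delete(concat(tmpPath, tempDir), recursive = true)  // drop the staging directory
  }
}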
