Skip to content

Commit

Permalink
Solve FileNotExistException when table has an empty partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Mingfei committed Jun 13, 2014
1 parent 062a6f2 commit ff56841
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
10 changes: 4 additions & 6 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
offHeapWriter.createTable()
outputRDD.context.runJob(
outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(op, offHeapWriter))
outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
offHeapWriter.cleanTmpPath()
} else {
// Run a job on the RDD that contains the query output to force the data into the memory
Expand Down Expand Up @@ -203,10 +203,8 @@ class MemoryStoreSinkOperator extends TerminalOperator {
}

object MemoryStoreSinkOperator {
def processOffHeapSinkPartition(op: OperatorSerializationWrapper[MemoryStoreSinkOperator],
offHeapWriter: OffHeapTableWriter) = {
def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = {
def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
op.logDebug("Started executing writeFiles for operator: " + op)
val partId = context.partitionId
val partition = iter.next().asInstanceOf[TablePartition]
val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
Expand All @@ -215,8 +213,8 @@ object MemoryStoreSinkOperator {
offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
writeBytes += buf.limit
}
offHeapWriter.commitPartition(partId, taskTmpDir)
op.logDebug("Finished executing writeFiles for operator: " + op)
val numColumns = partition.columns.size + 1
offHeapWriter.commitPartition(partId, numColumns, taskTmpDir)
writeBytes
}
writeFiles _
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/shark/memstore2/OffHeapStorageClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ abstract class OffHeapTableWriter extends Serializable {

def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)

def commitPartition(part: Int, tempDir: String)
def commitPartition(part: Int, numColumns: Int, tempDir: String)

def cleanTmpPath()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
outStream.close()
}

override def commitPartition(part: Int, tempDir: String) {
override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
(0 until rawTable.getColumns()).reverse.foreach { column =>
tfs.rename(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
+ column + Constants.PATH_SEPARATOR + part, rawTable.getPath() + Constants.PATH_SEPARATOR
+ MasterInfo.COL + column + Constants.PATH_SEPARATOR + part)
(0 until numColumns).reverse.foreach { column =>
val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR +
column + Constants.PATH_SEPARATOR + part
val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR +
MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
tfs.rename(srcPath, destPath)
}
tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
}
Expand Down

0 comments on commit ff56841

Please sign in to comment.