From ff5684169a3bcce398fc5fd53128d4b9e3e3bb8e Mon Sep 17 00:00:00 2001
From: Mingfei
Date: Fri, 13 Jun 2014 09:54:40 +0800
Subject: [PATCH] solve FileNotExistException when table has an empty partition

---
 .../shark/execution/MemoryStoreSinkOperator.scala    | 10 ++++------
 .../scala/shark/memstore2/OffHeapStorageClient.scala |  2 +-
 .../shark/tachyon/TachyonOffHeapTableWriter.scala    | 12 +++++++-----
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index e1a8eb81..0afbf043 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -130,7 +130,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
       op.logInfo("Putting RDD for %s.%s in off-heap storage".format(databaseName, tableName))
       offHeapWriter.createTable()
       outputRDD.context.runJob(
-        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(op, offHeapWriter))
+        outputRDD, MemoryStoreSinkOperator.processOffHeapSinkPartition(offHeapWriter))
       offHeapWriter.cleanTmpPath()
     } else {
       // Run a job on the RDD that contains the query output to force the data into the memory
@@ -203,10 +203,8 @@ class MemoryStoreSinkOperator extends TerminalOperator {
 }
 
 object MemoryStoreSinkOperator {
-  def processOffHeapSinkPartition(op: OperatorSerializationWrapper[MemoryStoreSinkOperator],
-    offHeapWriter: OffHeapTableWriter) = {
+  def processOffHeapSinkPartition(offHeapWriter: OffHeapTableWriter) = {
     def writeFiles(context: TaskContext, iter: Iterator[_]): Long = {
-      op.logDebug("Started executing writeFiles for operator: " + op)
       val partId = context.partitionId
       val partition = iter.next().asInstanceOf[TablePartition]
       val taskTmpDir = context.stageId + "_" + context.partitionId + "_" + context.attemptId
@@ -215,8 +213,8 @@ object MemoryStoreSinkOperator {
         offHeapWriter.writePartitionColumn(partId, column, buf, taskTmpDir)
         writeBytes += buf.limit
       }
-      offHeapWriter.commitPartition(partId, taskTmpDir)
-      op.logDebug("Finished executing writeFiles for operator: " + op)
+      val numColumns = partition.columns.size + 1
+      offHeapWriter.commitPartition(partId, numColumns, taskTmpDir)
       writeBytes
     }
     writeFiles _
diff --git a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
index 230540d3..0435de5a 100644
--- a/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
+++ b/src/main/scala/shark/memstore2/OffHeapStorageClient.scala
@@ -79,7 +79,7 @@ abstract class OffHeapTableWriter extends Serializable {
 
   def writePartitionColumn(part: Int, column: Int, data: ByteBuffer, tempDir: String)
 
-  def commitPartition(part: Int, tempDir: String)
+  def commitPartition(part: Int, numColumns: Int, tempDir: String)
 
   def cleanTmpPath()
 }
diff --git a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
index 4f8267d1..d1533241 100644
--- a/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
+++ b/src/tachyon_enabled/scala/shark/tachyon/TachyonOffHeapTableWriter.scala
@@ -67,12 +67,14 @@ class TachyonOffHeapTableWriter(@transient path: String, @transient numColumns:
     outStream.close()
   }
 
-  override def commitPartition(part: Int, tempDir: String) {
+  override def commitPartition(part: Int, numColumns: Int, tempDir: String) {
     val tmpPath = rawTable.getPath() + Constants.PATH_SEPARATOR + TEMP
-    (0 until rawTable.getColumns()).reverse.foreach { column =>
-      tfs.rename(tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
-        + column + Constants.PATH_SEPARATOR + part, rawTable.getPath() + Constants.PATH_SEPARATOR
-        + MasterInfo.COL + column + Constants.PATH_SEPARATOR + part)
+    (0 until numColumns).reverse.foreach { column =>
+      val srcPath = tmpPath + Constants.PATH_SEPARATOR + tempDir + Constants.PATH_SEPARATOR
+        + column + Constants.PATH_SEPARATOR + part
+      val destPath = rawTable.getPath() + Constants.PATH_SEPARATOR
+        + MasterInfo.COL + column + Constants.PATH_SEPARATOR + part
+      tfs.rename(srcPath, destPath)
     }
     tfs.delete(tmpPath + Constants.PATH_SEPARATOR + tempDir, true)
   }