From 2844c676c0679fa6b1f0329fa50ba8db90cd22b3 Mon Sep 17 00:00:00 2001
From: Sundeep Narravula
Date: Sun, 13 Oct 2013 18:59:56 -0700
Subject: [PATCH] Fix to ensure tablenames for multi-insert/partitioned cached
 table get reflected on the shark UI

---
 .../execution/MemoryStoreSinkOperator.scala   | 13 ++++++++++++-
 .../memstore2/MemoryMetadataManager.scala     | 19 +++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
index 10e99551..f3e1b3e5 100644
--- a/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
+++ b/src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -129,11 +129,22 @@ class MemoryStoreSinkOperator extends TerminalOperator {
     if (useUnionRDD) {
       // If this is an insert, find the existing RDD and create a union of the two, and then
       // put the union into the meta data tracker.
+
+
+      val nextPartNum = SharkEnv.memoryMetadataManager.getNextPartNum(tableName)
+      if (nextPartNum == 1) {
+        // reset rdd name for existing rdd
+        SharkEnv.memoryMetadataManager.get(tableName).get.asInstanceOf[RDD[TablePartition]]
+          .setName(tableName + ".part0")
+      }
+      rdd.setName(tableName + ".part" + nextPartNum)
+
       rdd = rdd.union(
         SharkEnv.memoryMetadataManager.get(tableName).get.asInstanceOf[RDD[TablePartition]])
+    } else {
+      rdd.setName(tableName)
     }
     SharkEnv.memoryMetadataManager.put(tableName, rdd)
-    rdd.setName(tableName)
 
     // Run a job on the original RDD to force it to go into cache.
origRdd.context.runJob(origRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit)) diff --git a/src/main/scala/shark/memstore2/MemoryMetadataManager.scala b/src/main/scala/shark/memstore2/MemoryMetadataManager.scala index ed6efa49..4c0da09a 100755 --- a/src/main/scala/shark/memstore2/MemoryMetadataManager.scala +++ b/src/main/scala/shark/memstore2/MemoryMetadataManager.scala @@ -33,6 +33,10 @@ class MemoryMetadataManager { private val _keyToRdd: ConcurrentMap[String, RDD[_]] = new ConcurrentHashMap[String, RDD[_]]() + // Tracks number of parts inserted into cached table + private val _keyToNextPart: ConcurrentMap[String, Int] = + new ConcurrentHashMap[String, Int]() + private val _keyToStats: ConcurrentMap[String, collection.Map[Int, TablePartitionStats]] = new ConcurrentHashMap[String, collection.Map[Int, TablePartitionStats]] @@ -52,6 +56,20 @@ class MemoryMetadataManager { _keyToStats.get(key.toLowerCase) } + def getNextPartNum(key: String): Int = { + val currentPartNum = _keyToNextPart.get(key.toLowerCase) + currentPartNum match { + case Some(partNum) => { + _keyToNextPart.put(key, partNum + 1) + partNum + 1 + } + case None => { + _keyToNextPart.put(key, 1) + 1 + } + } + } + def rename(oldKey: String, newKey: String) { if (contains(oldKey)) { val oldKeyToLowerCase = oldKey.toLowerCase @@ -97,6 +115,7 @@ class MemoryMetadataManager { // corresponding to the argument for 'key'. val rddValue = _keyToRdd.remove(key.toLowerCase()) _keyToStats.remove(key) + _keyToNextPart.remove(key) // Unpersist the RDD using the nested helper fn above. rddValue match { case Some(rdd) => unpersistRDD(rdd)