Handle database namespaces for cached tables #196

Closed
wants to merge 3 commits into from
5 changes: 3 additions & 2 deletions src/main/scala/shark/SharkEnv.scala
@@ -114,7 +114,8 @@ object SharkEnv extends LogHelper {
val addedFiles = HashSet[String]()
val addedJars = HashSet[String]()

def unpersist(key: String): Option[RDD[_]] = {
def unpersist(databaseName: String, tableName: String): Option[RDD[_]] = {
val key = databaseName + '.' + tableName
if (SharkEnv.tachyonUtil.tachyonEnabled() && SharkEnv.tachyonUtil.tableExists(key)) {
if (SharkEnv.tachyonUtil.dropTable(key)) {
logInfo("Table " + key + " was deleted from Tachyon.");
@@ -123,7 +124,7 @@
}
}

memoryMetadataManager.unpersist(key)
memoryMetadataManager.unpersist(databaseName, tableName)
}

/** Cleans up and shuts down the Shark environments. */
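
The drop path and the metadata manager now address a cached table by the same dotted key. A minimal sketch of the key construction (example values are hypothetical, not taken from the PR):

val databaseName = "default"               // hypothetical example value
val tableName = "page_views"               // hypothetical example value
val key = databaseName + '.' + tableName   // "default.page_views", used for both the Tachyon lookup and the block-manager metadata
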
27 changes: 20 additions & 7 deletions src/main/scala/shark/execution/MemoryStoreSinkOperator.scala
@@ -42,6 +42,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
@BeanProperty var shouldCompress: Boolean = _
@BeanProperty var storageLevel: StorageLevel = _
@BeanProperty var tableName: String = _
@BeanProperty var databaseName: String = _
@transient var useTachyon: Boolean = _
@transient var useUnionRDD: Boolean = _
@transient var numColumns: Int = _
@@ -100,7 +101,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
// Put the table in Tachyon.
op.logInfo("Putting RDD for %s in Tachyon".format(tableName))

SharkEnv.memoryMetadataManager.put(tableName, rdd)
SharkEnv.memoryMetadataManager.put(databaseName, tableName, rdd)

tachyonWriter.createTable(ByteBuffer.allocate(0))
rdd = rdd.mapPartitionsWithIndex { case(partitionIndex, iter) =>
@@ -114,8 +115,9 @@
rdd.context.runJob(rdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
} else {
// Put the table in Spark block manager.
op.logInfo("Putting %sRDD for %s in Spark block manager, %s %s %s %s".format(
op.logInfo("Putting %sRDD for %s.%s in Spark block manager, %s %s %s %s".format(
if (useUnionRDD) "Union" else "",
databaseName,
tableName,
if (storageLevel.deserialized) "deserialized" else "serialized",
if (storageLevel.useMemory) "in memory" else "",
@@ -129,11 +131,22 @@
if (useUnionRDD) {
// If this is an insert, find the existing RDD and create a union of the two, and then
// put the union into the meta data tracker.


val nextPartNum = SharkEnv.memoryMetadataManager.getNextPartNum(databaseName, tableName)
if (nextPartNum == 1) {
// Reset the name of the existing RDD so that it, too, carries a part suffix.
SharkEnv.memoryMetadataManager.get(databaseName, tableName).get.asInstanceOf[RDD[TablePartition]]
.setName(databaseName + '.' + tableName + ".part0")
}
rdd.setName(databaseName + '.' + tableName + ".part" + nextPartNum)

rdd = rdd.union(
SharkEnv.memoryMetadataManager.get(tableName).get.asInstanceOf[RDD[TablePartition]])
SharkEnv.memoryMetadataManager.get(databaseName, tableName).get.asInstanceOf[RDD[TablePartition]])
} else {
rdd.setName(databaseName + '.' + tableName)
}
SharkEnv.memoryMetadataManager.put(tableName, rdd)
rdd.setName(tableName)
SharkEnv.memoryMetadataManager.put(databaseName, tableName, rdd)

// Run a job on the original RDD to force it to go into cache.
origRdd.context.runJob(origRdd, (iter: Iterator[TablePartition]) => iter.foreach(_ => Unit))
@@ -158,7 +171,7 @@ class MemoryStoreSinkOperator extends TerminalOperator {
// Combine stats for the two tables being combined.
val numPartitions = statsAcc.value.toMap.size
val currentStats = statsAcc.value
val otherIndexToStats = SharkEnv.memoryMetadataManager.getStats(tableName).get
val otherIndexToStats = SharkEnv.memoryMetadataManager.getStats(databaseName, tableName).get
for ((otherIndex, tableStats) <- otherIndexToStats) {
currentStats.append((otherIndex + numPartitions, tableStats))
}
@@ -168,7 +181,7 @@
}

// Get the column statistics back to the cache manager.
SharkEnv.memoryMetadataManager.putStats(tableName, columnStats)
SharkEnv.memoryMetadataManager.putStats(databaseName, tableName, columnStats)

if (tachyonWriter != null) {
tachyonWriter.updateMetadata(ByteBuffer.wrap(JavaSerializer.serialize(columnStats)))
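
For readers following the union-insert changes above, here is a small, self-contained sketch (illustrative only, not Shark code) of the part-numbering scheme: the first INSERT INTO an already-cached table gets part number 1 and the pre-existing RDD is renamed to part 0; every later insert increments the counter.

import scala.collection.mutable

object PartNamingSketch {
  // Per-table counter, keyed by the lower-cased "<database>.<table>" string.
  private val nextPartNum = mutable.Map[String, Int]()

  // Returns 1 on the first INSERT INTO after the initial load, 2 on the next, and so on.
  def getNextPartNum(databaseName: String, tableName: String): Int = {
    val key = (databaseName + '.' + tableName).toLowerCase
    val partNum = nextPartNum.getOrElse(key, 0) + 1
    nextPartNum(key) = partNum
    partNum
  }

  def main(args: Array[String]): Unit = {
    // When part 1 arrives, the pre-existing RDD would be renamed "default.logs.part0";
    // each newly inserted RDD is then named "default.logs.part<N>".
    println("default.logs.part" + getNextPartNum("default", "logs"))  // default.logs.part1
    println("default.logs.part" + getNextPartNum("default", "logs"))  // default.logs.part2
  }
}
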
2 changes: 2 additions & 0 deletions src/main/scala/shark/execution/OperatorFactory.scala
@@ -45,13 +45,15 @@ object OperatorFactory extends LogHelper {
def createSharkMemoryStoreOutputPlan(
hiveTerminalOp: HiveOperator,
tableName: String,
databaseName: String,
storageLevel: StorageLevel,
numColumns: Int,
useTachyon: Boolean,
useUnionRDD: Boolean): TerminalOperator = {
val sinkOp = _newOperatorInstance(
classOf[MemoryStoreSinkOperator], hiveTerminalOp).asInstanceOf[MemoryStoreSinkOperator]
sinkOp.tableName = tableName
sinkOp.databaseName = databaseName
sinkOp.storageLevel = storageLevel
sinkOp.numColumns = numColumns
sinkOp.useTachyon = useTachyon
2 changes: 1 addition & 1 deletion src/main/scala/shark/execution/SharkDDLTask.scala
@@ -60,7 +60,7 @@ private[shark] class SharkDDLTask extends HiveTask[SharkDDLWork] with Serializab
if (alterTableDesc.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
val oldName = alterTableDesc.getOldName
val newName = alterTableDesc.getNewName
SharkEnv.memoryMetadataManager.rename(oldName, newName)
SharkEnv.memoryMetadataManager.rename(hiveMetadataDb.getCurrentDatabase(), oldName, newName)
}
}

15 changes: 8 additions & 7 deletions src/main/scala/shark/execution/TableScanOperator.scala
@@ -111,6 +111,7 @@ class TableScanOperator extends TopOperator[HiveTableScanOperator] with HiveTopO
override def execute(): RDD[_] = {
assert(parentOperators.size == 0)
val tableKey: String = tableDesc.getTableName.split('.')(1)
val databaseName: String = tableDesc.getTableName.split('.')(0)

// There are three places we can load the table from.
// 1. Tachyon table
@@ -120,14 +121,14 @@
tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
if (cacheMode == CacheType.HEAP) {
// Table should be in Spark heap (block manager).
val rdd = SharkEnv.memoryMetadataManager.get(tableKey).getOrElse {
val rdd = SharkEnv.memoryMetadataManager.get(databaseName, tableKey).getOrElse {
logError("""|Table %s not found in block manager.
|Are you trying to access a cached table from a Shark session other than
|the one in which it was created?""".stripMargin.format(tableKey))
throw(new QueryExecutionException("Cached table not found"))
}
logInfo("Loading table " + tableKey + " from Spark block manager")
createPrunedRdd(tableKey, rdd)
createPrunedRdd(databaseName, tableKey, rdd)
} else if (cacheMode == CacheType.TACHYON) {
// Table is in Tachyon.
if (!SharkEnv.tachyonUtil.tableExists(tableKey)) {
@@ -136,26 +137,26 @@ class TableScanOperator extends TopOperator[HiveTableScanOperator] with HiveTopO
logInfo("Loading table " + tableKey + " from Tachyon.")

var indexToStats: collection.Map[Int, TablePartitionStats] =
SharkEnv.memoryMetadataManager.getStats(tableKey).getOrElse(null)
SharkEnv.memoryMetadataManager.getStats(databaseName, tableKey).getOrElse(null)

if (indexToStats == null) {
val statsByteBuffer = SharkEnv.tachyonUtil.getTableMetadata(tableKey)
indexToStats = JavaSerializer.deserialize[collection.Map[Int, TablePartitionStats]](
statsByteBuffer.array())
logInfo("Loading table " + tableKey + " stats from Tachyon.")
SharkEnv.memoryMetadataManager.putStats(tableKey, indexToStats)
SharkEnv.memoryMetadataManager.putStats(databaseName, tableKey, indexToStats)
}
createPrunedRdd(tableKey, SharkEnv.tachyonUtil.createRDD(tableKey))
createPrunedRdd(databaseName, tableKey, SharkEnv.tachyonUtil.createRDD(tableKey))
} else {
// Table is a Hive table on HDFS (or other Hadoop storage).
super.execute()
}
}

private def createPrunedRdd(tableKey: String, rdd: RDD[_]): RDD[_] = {
private def createPrunedRdd(databaseName: String, tableKey: String, rdd: RDD[_]): RDD[_] = {
// Stats used for map pruning.
val indexToStats: collection.Map[Int, TablePartitionStats] =
SharkEnv.memoryMetadataManager.getStats(tableKey).get
SharkEnv.memoryMetadataManager.getStats(databaseName, tableKey).get

// Run map pruning if the flag is set, there exists a filter predicate on
// the input table and we have statistics on the table.
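
As a quick illustration of the name handling at the top of execute(), the fully qualified Hive table name splits into the database and table components like this (example value is hypothetical; the code above assumes the name always has the form "<database>.<table>"):

val qualifiedName = "default.page_views"         // hypothetical example
val databaseName = qualifiedName.split('.')(0)   // "default"
val tableKey = qualifiedName.split('.')(1)       // "page_views"
println(databaseName + " / " + tableKey)
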
49 changes: 41 additions & 8 deletions src/main/scala/shark/memstore2/MemoryMetadataManager.scala
@@ -33,27 +33,58 @@ class MemoryMetadataManager {
private val _keyToRdd: ConcurrentMap[String, RDD[_]] =
new ConcurrentHashMap[String, RDD[_]]()

// Tracks the number of parts (INSERT batches) appended to each cached table, keyed by the lower-cased "database.table" string.
private val _keyToNextPart: ConcurrentMap[String, Int] =
new ConcurrentHashMap[String, Int]()

private val _keyToStats: ConcurrentMap[String, collection.Map[Int, TablePartitionStats]] =
new ConcurrentHashMap[String, collection.Map[Int, TablePartitionStats]]

def contains(key: String) = _keyToRdd.contains(key.toLowerCase)
def contains(databaseName: String, tableName: String) = {
val key = databaseName + '.' + tableName
_keyToRdd.contains(key.toLowerCase)
}

def put(key: String, rdd: RDD[_]) {
def put(databaseName: String, tableName: String, rdd: RDD[_]) {
val key = databaseName + '.' + tableName
_keyToRdd(key.toLowerCase) = rdd
}

def get(key: String): Option[RDD[_]] = _keyToRdd.get(key.toLowerCase)
def get(databaseName: String, tableName: String): Option[RDD[_]] = {
val key = databaseName + '.' + tableName
_keyToRdd.get(key.toLowerCase)
}

def putStats(key: String, stats: collection.Map[Int, TablePartitionStats]) {
def putStats(databaseName: String, tableName: String, stats: collection.Map[Int, TablePartitionStats]) {
val key = databaseName + '.' + tableName
_keyToStats.put(key.toLowerCase, stats)
}

def getStats(key: String): Option[collection.Map[Int, TablePartitionStats]] = {
def getStats(databaseName: String, tableName: String): Option[collection.Map[Int, TablePartitionStats]] = {
val key = databaseName + '.' + tableName
_keyToStats.get(key.toLowerCase)
}

def rename(oldKey: String, newKey: String) {
if (contains(oldKey)) {
def getNextPartNum(databaseName: String, tableName: String): Int = {
val key = databaseName + '.' + tableName
val currentPartNum = _keyToNextPart.get(key.toLowerCase)
currentPartNum match {
case Some(partNum) => {
_keyToNextPart.put(key.toLowerCase, partNum + 1)
partNum + 1
}
case None => {
_keyToNextPart.put(key.toLowerCase, 1)
1
}
}
}

def rename(databaseName: String, oldTableName: String, newTableName: String) {
val oldKey = databaseName + '.' + oldTableName
val newKey = databaseName + '.' + newTableName

if (contains(databaseName, oldTableName)) {
val oldKeyToLowerCase = oldKey.toLowerCase
val newKeyToLowerCase = newKey.toLowerCase

@@ -80,7 +111,8 @@ class MemoryMetadataManager {
* @return Option::isEmpty() is true if there is no RDD value corresponding to 'key' in
* '_keyToRDD'. Otherwise, returns a reference to the RDD that was unpersist()'ed.
*/
def unpersist(key: String): Option[RDD[_]] = {
def unpersist(databaseName: String, tableName: String): Option[RDD[_]] = {
val key = databaseName + '.' + tableName
def unpersistRDD(rdd: RDD[_]): Unit = {
rdd match {
case u: UnionRDD[_] => {
Expand All @@ -97,6 +129,7 @@ class MemoryMetadataManager {
// corresponding to the argument for 'key'.
val rddValue = _keyToRdd.remove(key.toLowerCase())
_keyToStats.remove(key.toLowerCase)
_keyToNextPart.remove(key.toLowerCase)
// Unpersist the RDD using the nested helper fn above.
rddValue match {
case Some(rdd) => unpersistRDD(rdd)
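
To summarize the keying scheme introduced in MemoryMetadataManager, here is a self-contained sketch (not the Shark class; names and values are illustrative) showing how entries keyed by the lower-cased "<database>.<table>" string keep identically named tables in different databases from colliding:

import java.util.concurrent.ConcurrentHashMap

object NamespacedCacheSketch {
  private val cache = new ConcurrentHashMap[String, String]()

  // Both the database and the table name go into one lower-cased lookup key.
  private def makeKey(databaseName: String, tableName: String): String =
    (databaseName + '.' + tableName).toLowerCase

  def put(databaseName: String, tableName: String, value: String): Unit =
    cache.put(makeKey(databaseName, tableName), value)

  def get(databaseName: String, tableName: String): Option[String] =
    Option(cache.get(makeKey(databaseName, tableName)))

  def main(args: Array[String]): Unit = {
    put("default", "users", "rdd-for-default.users")
    put("analytics", "users", "rdd-for-analytics.users")
    // The same table name resolves to different entries under different databases.
    println(get("default", "users"))    // Some(rdd-for-default.users)
    println(get("analytics", "users"))  // Some(rdd-for-analytics.users)
    println(get("default", "missing"))  // None
  }
}
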
4 changes: 2 additions & 2 deletions src/main/scala/shark/parse/SharkDDLSemanticAnalyzer.scala
@@ -21,7 +21,7 @@ class SharkDDLSemanticAnalyzer(conf: HiveConf) extends DDLSemanticAnalyzer(conf)

astNode.getToken.getType match {
case HiveParser.TOK_DROPTABLE => {
SharkEnv.unpersist(getTableName(astNode))
SharkEnv.unpersist(db.getCurrentDatabase(), getTableName(astNode))
}
case HiveParser.TOK_ALTERTABLE_RENAME => {
analyzeAlterTableRename(astNode)
@@ -32,7 +32,7 @@ class SharkDDLSemanticAnalyzer(conf: HiveConf) extends DDLSemanticAnalyzer(conf)

private def analyzeAlterTableRename(astNode: ASTNode) {
val oldTableName = getTableName(astNode)
if (SharkEnv.memoryMetadataManager.contains(oldTableName)) {
if (SharkEnv.memoryMetadataManager.contains(db.getCurrentDatabase(), oldTableName)) {
val newTableName = BaseSemanticAnalyzer.getUnescapedName(
astNode.getChild(1).asInstanceOf[ASTNode])

7 changes: 5 additions & 2 deletions src/main/scala/shark/parse/SharkSemanticAnalyzer.scala
@@ -189,8 +189,9 @@ class SharkSemanticAnalyzer(conf: HiveConf) extends SemanticAnalyzer(conf) with
OperatorFactory.createSharkFileOutputPlan(hiveSinkOp)
} else {
// Otherwise, check if we are inserting into a table that was cached.
val cachedTableName = tableName.split('.')(1) // Ignore the database name
SharkEnv.memoryMetadataManager.get(cachedTableName) match {
val cachedTableName = tableName.split('.')(1) // Ignore the database name
val databaseName = tableName.split('.')(0)
SharkEnv.memoryMetadataManager.get(databaseName, cachedTableName) match {
case Some(rdd) => {
if (hiveSinkOps.size == 1) {
// If useUnionRDD is false, the sink op is for INSERT OVERWRITE.
@@ -199,6 +200,7 @@ class SharkSemanticAnalyzer(conf: HiveConf) extends SemanticAnalyzer(conf) with
OperatorFactory.createSharkMemoryStoreOutputPlan(
hiveSinkOp,
cachedTableName,
databaseName,
storageLevel,
_resSchema.size, // numColumns
cacheMode == CacheType.TACHYON, // use tachyon
@@ -223,6 +225,7 @@ class SharkSemanticAnalyzer(conf: HiveConf) extends SemanticAnalyzer(conf) with
OperatorFactory.createSharkMemoryStoreOutputPlan(
hiveSinkOps.head,
qb.getTableDesc.getTableName,
qb.getTableDesc.getDatabaseName,
storageLevel,
_resSchema.size, // numColumns
cacheMode == CacheType.TACHYON, // use tachyon