Skip to content

Commit

Permalink
Misc fixes and performance improvements
Browse files Browse the repository at this point in the history
- quote table names in all store DDL/DML/query strings to allow for special characters
  and keywords in table names
- add retries in Utils.mapExecutors to correct cases where partitions may not get routed
  to all executors due to unavailable cores for long
- use a separate MemoryOwner object instead of Tuple2 for better and more efficient hash
- commented out execution memory acquire for COMPRESS/DECOMPRESS that cause frequent LME
  for some reason (needs to be looked further) and instead use store memory for all causes
- use a single "addValue" operation when adding/subtracting from memory object pool
  instead of two operations of lookup and update as done by previous "addTo" method
- changed TrieMap to ConcurrentHashMap in SnappyContext static executor tracker map
- renamed SparkConnectorRDDHelper to SmartConnectorRDDHelper
- correct scalastyle errors in TPCH_Queries, TPCH_Memsql, QueryExecutionJob
- add JDODataStoreException to expected exception strings to correct occasional failures
  in quickstart precheckin
  • Loading branch information
Sumedh Wale committed Feb 27, 2018
1 parent 2042e4b commit adf4664
Show file tree
Hide file tree
Showing 51 changed files with 546 additions and 452 deletions.
2 changes: 2 additions & 0 deletions cluster/bin/snappy-job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,5 @@ case $cmd in
curl -X DELETE ${jobServerURL} $CURL_OPTS ${securePart}
;;
esac

echo
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ class QueryRoutingDUnitTest(val s: String)

foundTable = false
while (rSet2.next()) {
if (ColumnFormatRelation.columnBatchTableName(t).
if (ColumnFormatRelation.columnBatchTableName("APP." + t).
equalsIgnoreCase(rSet2.getString("TABLE_NAME"))) {
foundTable = true
assert(rSet2.getString("TABLE_TYPE").equalsIgnoreCase("TABLE"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ object SnappyTableStatsProviderDUnitTest {
left.getCombinedStats(right)
}

val expected = Utils.mapExecutors[RegionStat](snc, () => {
val expected = Utils.mapExecutors[RegionStat](snc.sparkContext, () => {
val result = if (isReplicatedTable) getReplicatedRegionStats(tableName)
else getPartitionedRegionStats(tableName, isColumnTable)
Iterator[RegionStat](convertToSerializableForm(result))
}).collect()
})

expected.map(getRegionStat).reduce(aggregateResults)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CatalogConsistencyDUnitTest(s: String) extends ClusterManagerTestBase(s) {
try {
// make sure that the column buffer does not exist
routeQueryDisabledConn.createStatement().executeQuery(
"select * from " + ColumnFormatRelation.columnBatchTableName("column_table1"))
"select * from " + ColumnFormatRelation.columnBatchTableName("app.column_table1"))
} catch {
case se: SQLException if (se.getSQLState.equals("42X05")) =>
case unknown: Throwable => throw unknown
Expand Down Expand Up @@ -156,7 +156,7 @@ class CatalogConsistencyDUnitTest(s: String) extends ClusterManagerTestBase(s) {
// drop column_table1 from store DD
val routeQueryDisabledConn = getClientConnection(netPort1, false)
routeQueryDisabledConn.createStatement().execute("drop table " +
ColumnFormatRelation.columnBatchTableName("column_table1"))
ColumnFormatRelation.columnBatchTableName("app.column_table1"))
routeQueryDisabledConn.createStatement().execute("drop table column_table1")

// make sure that the table exists in Hive metastore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

// we don't expect any increase in put distribution stats
val columnTableRegionName = ColumnFormatRelation.
columnBatchTableName(tableName).toUpperCase
columnBatchTableName("APP." + tableName).toUpperCase
val getPRMessageCount = new SerializableCallable[AnyRef] {
override def call(): AnyRef = {
Int.box(Misc.getRegionForTable("APP." + columnTableRegionName, true).
Int.box(Misc.getRegionForTable(columnTableRegionName, true).
asInstanceOf[PartitionedRegion].getPrStats.getPartitionMessagesSent)
}
}
Expand Down Expand Up @@ -180,11 +180,11 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {
val p = Map.empty[String, String]
snc.createTable(tableName, "column", dataDF.schema, p)
val columnTableRegionName = ColumnFormatRelation.
columnBatchTableName(tableName).toUpperCase
columnBatchTableName("APP." + tableName).toUpperCase
// we don't expect any increase in put distribution stats
val getPRMessageCount = new SerializableCallable[AnyRef] {
override def call(): AnyRef = {
Int.box(Misc.getRegionForTable("APP." + columnTableRegionName, true).
Int.box(Misc.getRegionForTable(columnTableRegionName, true).
asInstanceOf[PartitionedRegion].getPrStats.getPartitionMessagesSent)
}
}
Expand Down Expand Up @@ -221,12 +221,12 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {
val p = Map.empty[String, String]
snc.createTable(tableName, "column", dataDF.schema, p)

val tName = ColumnFormatRelation.columnBatchTableName(tableName.toUpperCase())
val tName = ColumnFormatRelation.columnBatchTableName("APP." + tableName.toUpperCase())
// we don't expect any increase in put distribution stats
val getTotalEntriesCount = new SerializableCallable[AnyRef] {
override def call(): AnyRef = {
val pr: PartitionedRegion =
Misc.getRegionForTable("APP." + tName, true).asInstanceOf[PartitionedRegion]
Misc.getRegionForTable(tName, true).asInstanceOf[PartitionedRegion]
var buckets = Set.empty[Integer]
0 until pr.getTotalNumberOfBuckets foreach { x =>
buckets = buckets + x
Expand All @@ -246,7 +246,7 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {
val getLocalEntriesCount = new SerializableCallable[AnyRef] {
override def call(): AnyRef = {
val pr: PartitionedRegion =
Misc.getRegionForTable("APP." + tName, true).asInstanceOf[PartitionedRegion]
Misc.getRegionForTable(tName, true).asInstanceOf[PartitionedRegion]
val iter = pr.getAppropriateLocalEntriesIterator(
pr.getDataStore.getAllLocalBucketIds, false, false, true, pr, false)
var count = 0
Expand Down Expand Up @@ -333,9 +333,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

val region = Misc.getRegionForTable(s"APP.${tableName.toUpperCase()}",
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable("APP." + ColumnFormatRelation.
columnBatchTableName(tableName).toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP." + tableName).toUpperCase(), true).asInstanceOf[PartitionedRegion]

println("startSparkJob2 " + region.size())

Expand Down Expand Up @@ -376,10 +375,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {
assert(r.length == 1005, s"Unexpected elements ${r.length}, expected=1005")
val region = Misc.getRegionForTable(s"APP.${tableNameWithPartition.toUpperCase()}",
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(
"APP." +
ColumnFormatRelation.columnBatchTableName(tableNameWithPartition).toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP." + tableNameWithPartition).toUpperCase(), true).asInstanceOf[PartitionedRegion]

println("startSparkJob3 " + region.size())
println("startSparkJob3 " + shadowRegion.size())
Expand Down Expand Up @@ -430,10 +427,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

val region = Misc.getRegionForTable(s"APP.${tableNameWithPartition.toUpperCase()}",
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(
"APP." +
ColumnFormatRelation.columnBatchTableName(tableNameWithPartition).toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP." + tableNameWithPartition).toUpperCase(), true).asInstanceOf[PartitionedRegion]

println("startSparkJob4 " + region.size())
println("startSparkJob4 " + shadowRegion.size())
Expand Down Expand Up @@ -480,10 +475,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

val region = Misc.getRegionForTable(s"APP.${tableNameWithPartition.toUpperCase()}",
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(
"APP." +
ColumnFormatRelation.columnBatchTableName(tableNameWithPartition).toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP." + tableNameWithPartition).toUpperCase(), true).asInstanceOf[PartitionedRegion]

println("startSparkJob5 " + region.size())
println("startSparkJob5 " + shadowRegion.size())
Expand Down Expand Up @@ -534,9 +527,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

val region = Misc.getRegionForTable("APP.COLUMNTABLE4", true).
asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(
"APP." + ColumnFormatRelation.columnBatchTableName("COLUMNTABLE4").toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP.COLUMNTABLE4"), true).asInstanceOf[PartitionedRegion]

println("region.size() " + region.size())
println("shadowRegion.size()" + shadowRegion.size())
Expand Down Expand Up @@ -594,9 +586,8 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {

val region = Misc.getRegionForTable("APP.COLUMNTABLE4", true).
asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable("APP." + ColumnFormatRelation.
columnBatchTableName("COLUMNTABLE4").toUpperCase(),
true).asInstanceOf[PartitionedRegion]
val shadowRegion = Misc.getRegionForTable(ColumnFormatRelation.columnBatchTableName(
"APP.COLUMNTABLE4"), true).asInstanceOf[PartitionedRegion]

println("region.size() " + region.size())
println("shadowRegion.size()" + shadowRegion.size())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ class DynamicJarInstallationDUnitTest(val s: String)

def verifyClassOnExecutors(snc: SnappyContext, className: String,
version: String, count: Int): Unit = {
val countInstances = Utility.mapExecutors(snc,
val countInstances = Utility.mapExecutors[Int](snc.sparkContext,
() => {
if (DynamicJarInstallationDUnitTest.loadClass(className, version)) {
Seq(1).iterator
} else Iterator.empty
}).count
}).length

assert(countInstances == count,
s"Assertion failed as countInstances=$countInstances and count=$count did not match")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ object MemoryManagerRestartDUnitTest {
}

def failTheExecutors(): Unit = {
Utils.mapExecutors(sc, (_, _) => {
Utils.mapExecutors[Unit](sc, () => {
throw new OutOfMemoryError("Some Random message") // See SystemFailure.isJVMFailureError
}).collect()
})
}

private def sc = SnappyContext.globalSparkContext
Expand All @@ -189,9 +189,9 @@ object MemoryManagerRestartDUnitTest {
val mMap = memoryManager.memoryForObject
memoryManager.logStats()
var sum = 0L
mMap.forEach(new ObjLongConsumer[(String, MemoryMode)] {
override def accept(key: (String, MemoryMode), value: Long): Unit = {
if (key._1.toLowerCase().contains(tableName.toLowerCase())) {
mMap.forEach(new ObjLongConsumer[MemoryOwner] {
override def accept(key: MemoryOwner, value: Long): Unit = {
if (key.owner.toLowerCase().contains(tableName.toLowerCase())) {
sum += value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,9 @@ object SnappyUnifiedMemoryManagerDUnitTest {
SparkEnv.get.memoryManager
.asInstanceOf[SnappyUnifiedMemoryManager].logStats()
var sum = 0L
mMap.forEach(new ObjLongConsumer[(String, MemoryMode)] {
override def accept(key: (String, MemoryMode), value: Long): Unit = {
if (key._1.toLowerCase().contains(tableName.toLowerCase())) {
mMap.forEach(new ObjLongConsumer[MemoryOwner] {
override def accept(key: MemoryOwner, value: Long): Unit = {
if (key.owner.toLowerCase().contains(tableName.toLowerCase())) {
sum += value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class NorthWindDUnitTest(s: String) extends ClusterManagerTestBase(s) {
conn.close()
}

private lazy val totalProcessors = Utils.mapExecutors(sc, (_, _) =>
Iterator(Runtime.getRuntime.availableProcessors())).collect().sum
private lazy val totalProcessors = Utils.mapExecutors[Int](sc, () =>
Iterator(Runtime.getRuntime.availableProcessors())).sum

private def validateReplicatedTableQueries(snc: SnappyContext): Unit = {
for (q <- NWQueries.queries) {
Expand Down
Loading

0 comments on commit adf4664

Please sign in to comment.