Skip to content

Commit

Permalink
skip dynamic cpusPerTask setting with smart connector
Browse files Browse the repository at this point in the history
also fix few dunit test failures in ColumnBatchAndExternalTableDUnitTest
  • Loading branch information
sumwale committed Mar 28, 2020
1 parent 29f2705 commit f1f675c
Show file tree
Hide file tree
Showing 199 changed files with 93 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ object SplitSnappyClusterDUnitTest
.set("snappydata.connection", connectionURL)
.set("snapptdata.sql.planCaching", random.nextBoolean().toString)
.set(Property.TestDisableCodeGenFlag.name, "false")
logInfo("Spark conf:" + conf.getAll.toString)
logInfo("Spark conf: " + conf.getAll.mkString(", "))

val sc = SparkContext.getOrCreate(conf)
// sc.setLogLevel("DEBUG")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ package org.apache.spark.sql


import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.Property
import io.snappydata.cluster.ClusterManagerTestBase
import io.snappydata.test.dunit.{AvailablePortHelper, SerializableCallable}
import io.snappydata.util.TestUtils
import io.snappydata.{Property, SnappyFunSuite}
import org.scalatest.Assertions

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.execution.ui.SQLExecutionUIData

case class TestRecord(col1: Int, col2: Int, col3: Int)

class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTestBase(s)
with Assertions with Logging with SparkSupport {

private def activeExecutionIds(session: SparkSession): Set[Long] = {
private def sqlExecutionIds(session: SparkSession): Set[Long] = {
session.sharedState.statusStore.executionsList().map(_.executionId).toSet
}

Expand All @@ -53,144 +54,148 @@ class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTest

import session.implicits._

val ds = session.createDataset(sc.range(1, 101).map(i =>
val ds = session.createDataset(sc.range(1, 1001).map(i =>
AirlineData(2015, 2, 15, 1002, i.toInt, "AA" + i)))
ds.write.insertInto("airline")

// ***Check for the case when all the column batches are scanned ****
var previousExecutionIds = activeExecutionIds(session)
var previousExecutionIds = sqlExecutionIds(session)

val df_allColumnBatchesScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where ArrDelay < 101 " +
"from AIRLINE where ArrDelay < 1001 " +
"group by UniqueCarrier order by arrivalDelay")

df_allColumnBatchesScan.count()
df_allColumnBatchesScan.collect()

var executionIds = activeExecutionIds(session).diff(previousExecutionIds)
var executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

var executionId = executionIds.head

val (scanned1, skipped1) =
findColumnBatchStats(df_allColumnBatchesScan, session, executionId)
val (scanned1, skipped1) = findColumnBatchStats(session, executionId)
assert(skipped1 == 0, "All Column batches should have been scanned")
assert(scanned1 > 0, "All Column batches should have been scanned")

// ***Check for the case when all the column batches are skipped****
previousExecutionIds = activeExecutionIds(session)
previousExecutionIds = sqlExecutionIds(session)

val df_noColumnBatchesScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where ArrDelay > 101 " +
"from AIRLINE where ArrDelay > 1001 " +
"group by UniqueCarrier order by arrivalDelay")

df_noColumnBatchesScan.count()
df_noColumnBatchesScan.collect()

executionIds = activeExecutionIds(session).diff(previousExecutionIds)
executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

executionId = executionIds.head

val (scanned2, skipped2) =
findColumnBatchStats(df_allColumnBatchesScan, session, executionId)
val (scanned2, skipped2) = findColumnBatchStats(session, executionId)
assert(scanned2 == skipped2, "No Column batches should have been scanned")
assert(skipped2 > 0, "No Column batches should have been scanned")

// ***Check for the case when some of the column batches are scanned ****
previousExecutionIds = activeExecutionIds(session)
previousExecutionIds = sqlExecutionIds(session)

val df_someColumnBatchesScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where ArrDelay < 20 " +
"group by UniqueCarrier order by arrivalDelay")

df_someColumnBatchesScan.count()
df_someColumnBatchesScan.collect()

executionIds = activeExecutionIds(session).diff(previousExecutionIds)
executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

executionId = executionIds.head

val (scanned3, skipped3) =
findColumnBatchStats(df_allColumnBatchesScan, session, executionId)
val (scanned3, skipped3) = findColumnBatchStats(session, executionId)

assert(skipped3 > 0, "Some Column batches should have been skipped")
assert(scanned3 != skipped3, "Some Column batches should have been skipped - comparison")

// check for StartsWith predicate with MAX/MIN handling

// first all batches chosen
previousExecutionIds = activeExecutionIds(session)
previousExecutionIds = sqlExecutionIds(session)

val df_allColumnBatchesLikeScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA%' " +
"group by UniqueCarrier order by arrivalDelay")

var count = df_allColumnBatchesLikeScan.count()
assert(count == 100, s"Unexpected count = $count, expected 100")
var count = df_allColumnBatchesLikeScan.collect().length
assert(count == 1000, s"Unexpected count = $count, expected 1000")

executionIds = activeExecutionIds(session).diff(previousExecutionIds)
executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

executionId = executionIds.head

val (scanned4, skipped4) =
findColumnBatchStats(df_allColumnBatchesLikeScan, session, executionId)
val (scanned4, skipped4) = findColumnBatchStats(session, executionId)

assert(skipped4 == 0, "No Column batches should have been skipped")
assert(scanned4 > 0, "All Column batches should have been scanned")

// next some batches skipped
previousExecutionIds = activeExecutionIds(session)
previousExecutionIds = sqlExecutionIds(session)

val df_someColumnBatchesLikeScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA1%' " +
"group by UniqueCarrier order by arrivalDelay")

count = df_someColumnBatchesLikeScan.count()
assert(count == 12, s"Unexpected count = $count, expected 12")
count = df_someColumnBatchesLikeScan.collect().length
assert(count == 112, s"Unexpected count = $count, expected 112")

executionIds = activeExecutionIds(session).diff(previousExecutionIds)
executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

executionId = executionIds.head

val (scanned5, skipped5) =
findColumnBatchStats(df_someColumnBatchesLikeScan, session, executionId)
val (scanned5, skipped5) = findColumnBatchStats(session, executionId)

assert(skipped5 > 0, "Some Column batches should have been skipped")
assert(scanned5 != skipped5, "Some Column batches should have been skipped - comparison")

// last all batches skipped
previousExecutionIds = activeExecutionIds(session)
previousExecutionIds = sqlExecutionIds(session)

val df_noColumnBatchesLikeScan = session.sql(
"select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier " +
"from AIRLINE where UniqueCarrier like 'AA0%' " +
"group by UniqueCarrier order by arrivalDelay")

count = df_noColumnBatchesLikeScan.count()
count = df_noColumnBatchesLikeScan.collect().length
assert(count == 0, s"Unexpected count = $count, expected 0")

executionIds = activeExecutionIds(session).diff(previousExecutionIds)
executionIds = sqlExecutionIds(session).diff(previousExecutionIds)

executionId = executionIds.head

val (scanned6, skipped6) =
findColumnBatchStats(df_noColumnBatchesLikeScan, session, executionId)
val (scanned6, skipped6) = findColumnBatchStats(session, executionId)

assert(scanned6 == skipped6, "No Column batches should have been returned")
assert(skipped6 > 0, "No Column batches should have been returned")
}

private def findColumnBatchStats(df: DataFrame,
session: SnappySession, executionId: Long): (Long, Long) = {
private def getAccumulatorValue(execData: SQLExecutionUIData, name: String): Long = {
execData.metrics.find(_.name == name) match {
case Some(id) => execData.metricValues.get(id.accumulatorId) match {
case Some(v) => v.toLong
case _ => 0L
}
case _ => 0L
}
}

val execData = session.sharedState.statusStore.executionsList().find(
_.executionId == executionId).get
val seenId = execData.metrics.find(_.name == "column batches seen").get
val skippedId = execData.metrics.find(_.name == "column batches skipped by the predicate").get
private def findColumnBatchStats(session: SnappySession, executionId: Long): (Long, Long) = {
var execData: SQLExecutionUIData = null
SnappyFunSuite.waitForCriterion({
execData = session.sharedState.statusStore.executionsList().find(
_.executionId == executionId).get
execData.metricValues ne null
}, s"waiting for metricValues of executionId = $executionId", 10000, 10)

(execData.metricValues.filter(_._1 == seenId).head._2.toInt,
execData.metricValues.filter(_._1 == skippedId).head._2.toInt)
(getAccumulatorValue(execData, "column batches seen"),
getAccumulatorValue(execData, "column batches skipped by the predicate"))
}

def testCreateColumnTablesFromOtherTables(): Unit = {
Expand Down
17 changes: 17 additions & 0 deletions cluster/src/test/scala/io/snappydata/filodb/FiloDBApp_Column.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

package io.snappydata.filodb

import scala.concurrent.duration.Duration
Expand Down
17 changes: 17 additions & 0 deletions cluster/src/test/scala/io/snappydata/filodb/FiloDBApp_Row.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

package io.snappydata.filodb

import java.sql.{DriverManager, PreparedStatement}
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/internal/session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,23 @@ class SnappyConf(@transient val session: SnappySession)

private[sql] def setDynamicCpusPerTask(): Unit = synchronized {
if (dynamicCpusPerTask != -1) {
val numExecutors = SnappyContext.numExecutors
val totalUsableHeap = SnappyContext.foldLeftBlockIds(0L)(_ + _.usableHeapBytes)

// skip for smart connector where there is no information of physical cores or heap
if (numExecutors == 0 || totalUsableHeap <= 0) return

val sparkCores = session.sparkContext.defaultParallelism.toDouble
// calculate minimum required heap assuming a block size of 128M
val minRequiredHeap = 128.0 * 1024.0 * 1024.0 * sparkCores * 1.2
val totalUsableHeap = SnappyContext.foldLeftBlockIds(0L)(_ + _.usableHeapBytes)

// select bigger among (required heap / available) and (logical cores / physical)
val cpusPerTask0 = math.max(minRequiredHeap / totalUsableHeap,
sparkCores / SnappyContext.totalPhysicalCoreCount.get())
// keep a reasonable upper-limit so tasks can at least be scheduled:
// used below is average logical cores / 2
val cpusPerTask = math.max(1, math.ceil(math.min(sparkCores /
(2 * SnappyContext.numExecutors), cpusPerTask0)).toInt)
(2 * numExecutors), cpusPerTask0)).toInt)
setConfString(Constant.CPUS_PER_TASK_PROP, cpusPerTask.toString)
dynamicCpusPerTask = cpusPerTask
logDebug(s"Set dynamic ${Constant.CPUS_PER_TASK_PROP} to $cpusPerTask")
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit f1f675c

Please sign in to comment.