diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 0ee693fcbec..a85b34035ee 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.types.StructType import org.apache.kyuubi.sql.KyuubiSQLConf @@ -230,6 +231,39 @@ case class MaxScanStrategy(session: SparkSession) logicalRelation.catalogTable) } } + case ScanOperation( + _, + _, + relation @ DataSourceV2ScanRelation(_, _, _, _)) => + val table = relation.relation.table + if (table.partitioning().nonEmpty) { + val partitionColumnNames = table.partitioning().map(_.describe()) + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${table.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + } else { + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |detail as below: + |Table: ${table.name()} + |""".stripMargin) + } + } case _ => } } diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala new file mode 100644 index 00000000000..670d9ce7e43 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.OptionalLong + +import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform} +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource { + + class MyScanBuilder( + val partitionKeys: Seq[String]) extends SimpleScanBuilder + with SupportsReportStatistics with SupportsReportPartitioning { + + override def estimateStatistics(): Statistics = { + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + + } + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + override def outputPartitioning(): Partitioning = { + new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10) + } + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder(Seq("i")) + } + + override def partitioning(): Array[Transform] = { + Array(Expressions.identity("i")) + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala new file mode 100644 index 00000000000..2035d352562 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.OptionalLong + +import org.apache.spark.sql.connector._ +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class ReportStatisticsDataSource extends SimpleWritableDataSource { + + class MyScanBuilder extends SimpleScanBuilder + with SupportsReportStatistics { + + override def estimateStatistics(): Statistics = { + new Statistics { + override def sizeInBytes(): OptionalLong = OptionalLong.of(80) + + override def numRows(): OptionalLong = OptionalLong.of(10) + } + } + + override def planInputPartitions(): Array[InputPartition] = { + Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10)) + } + + } + + override def getTable(options: CaseInsensitiveStringMap): Table = { + new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new MyScanBuilder + } + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala index 139efd9ca06..22c998d3b0c 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.commons.io.FileUtils import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException} @@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest { assert(e.getMessage == "Script transformation is not allowed") } } + + test("watchdog with scan maxFileSize -- data source v2") { + val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load() + df.createOrReplaceTempView("test") + val logical = df.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val tableSize = logical.computeStats().sizeInBytes.toLong + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) { + sql("SELECT * FROM test").queryExecution.sparkPlan + } + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test").queryExecution.sparkPlan) + } + + val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load() + nonPartDf.createOrReplaceTempView("test_non_part") + val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect { + case d: DataSourceV2ScanRelation => d + }.head + val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) { + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan + } + + withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) { + intercept[MaxFileSizeExceedException]( + sql("SELECT * FROM test_non_part").queryExecution.sparkPlan) + } + } }