From 7d7afb06f682c10f3900eb8adeab9fad6d49cb24 Mon Sep 17 00:00:00 2001
From: Phil Dakin
Date: Thu, 26 Oct 2023 14:24:09 +0900
Subject: [PATCH] [SPARK-38723][SS][TESTS] Add test for streaming query resume
 race condition

### What changes were proposed in this pull request?

Add a test for the CONCURRENT_QUERY error raised when multiple sessions try to simultaneously resume the same streaming query from its checkpoint.

### Why are the changes needed?

Improve test coverage per https://issues.apache.org/jira/browse/SPARK-38723.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The change is itself a test; ran it locally and confirmed the suite passes.

```
[info] All tests passed.
[success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43405 from PhilDakin/pdakin.SPARK-38723.

Authored-by: Phil Dakin
Signed-off-by: Jungtaek Lim
---
 .../errors/QueryExecutionErrorsSuite.scala | 48 +++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 78bbabb1a3fda..fb1d05f2a9a05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -25,6 +25,7 @@ import java.util.{Locale, Properties, ServiceConfigurationError}
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.mockito.Mockito.{mock, spy, when}
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode}
@@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{DataType, DecimalType, LongType, MetadataBuilder, StructType}
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils
 
 class QueryExecutionErrorsSuite
@@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite
     assert(e.getCause.isInstanceOf[NullPointerException])
   }
 
+  test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val ds = spark.readStream.format("rate").load()
+
+          // Queries have the same ID when they are resumed from the same checkpoint.
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          // Run an initial query to set up the checkpoint.
+          val initialQuery = ds.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+
+          // Error is thrown due to a race condition. Ensure it happens with high likelihood in
+          // the test by spawning many threads.
+          val exceptions = ThreadUtils.parmap(Seq.range(1, 50), "QueryExecutionErrorsSuite", 50)
+          { _ =>
+            var exception: Option[SparkConcurrentModificationException] = None
+            try {
+              val restartedQuery = ds.writeStream.format("parquet")
+                .option("checkpointLocation", chkLocation).start(dataLocation)
+              restartedQuery.stop()
+              restartedQuery.awaitTermination()
+            } catch {
+              case e: SparkConcurrentModificationException =>
+                exception = Some(e)
+            }
+            exception
+          }
+          assert(exceptions.exists(_.isDefined))
+          exceptions.foreach { e =>
+            if (e.isDefined) {
+              checkError(
+                e.get,
+                errorClass = "CONCURRENT_QUERY",
+                sqlState = Some("0A000")
+              )
+            }
+          }
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+  }
+
   test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") {
     withTable("t") {
       sql("CREATE TABLE t(c String) USING parquet")
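For readers skimming the patch, here is a minimal standalone sketch of the scenario the new test exercises: two restarts of the same streaming query racing on a single checkpoint location. It is illustrative only and not part of the change; the `ConcurrentQuerySketch` object, local-mode session, temp directory, and two-thread race are assumptions of the sketch (the test itself forces the race across 50 threads with `ThreadUtils.parmap`), and a single run is not guaranteed to hit the race.

```
import java.io.File
import java.nio.file.Files

import org.apache.spark.SparkConcurrentModificationException
import org.apache.spark.sql.SparkSession

// Hypothetical driver program, not part of the patch.
object ConcurrentQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ConcurrentQuerySketch")
      .getOrCreate()

    val dir = Files.createTempDirectory("concurrent-query").toFile
    val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
    val dataLocation = new File(dir, "data").getCanonicalPath

    val ds = spark.readStream.format("rate").load()

    // Initial run takes ownership of the checkpoint, mirroring `initialQuery` in the test.
    ds.writeStream.format("parquet")
      .option("checkpointLocation", chkLocation).start(dataLocation)

    // A restart from the same checkpoint normally just stops the active run (the behavior the
    // test pins via STREAMING_STOP_ACTIVE_RUN_ON_RESTART); CONCURRENT_QUERY only surfaces when
    // two restarts race, so the sketch starts two restarts on separate threads.
    def restart(): Unit =
      try {
        ds.writeStream.format("parquet")
          .option("checkpointLocation", chkLocation).start(dataLocation)
      } catch {
        case e: SparkConcurrentModificationException =>
          println(s"Caught error class: ${e.getErrorClass}") // expected: CONCURRENT_QUERY
      }

    val racers = Seq.fill(2)(new Thread(() => restart()))
    racers.foreach(_.start())
    racers.foreach(_.join())

    spark.streams.active.foreach(_.stop())
    spark.stop()
  }
}
```

Because the race is probabilistic, the test asserts only that at least one of the 50 concurrent restarts raised CONCURRENT_QUERY with SQLSTATE 0A000, and stops any queries left active afterwards.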