Skip to content

Commit

Permalink
[SPARK-38723][SS][TESTS] Add test for streaming query resume race con…
Browse files Browse the repository at this point in the history
…dition

### What changes were proposed in this pull request?
Add a test for the CONCURRENT_QUERY error raised when multiple sessions try to simultaneously resume the same streaming query from checkpoint.

### Why are the changes needed?
Improve testing coverage per https://issues.apache.org/jira/browse/SPARK-38723.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Change is itself a test - ran locally and confirmed the suite passes.
```
[info] All tests passed.
[success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#43405 from PhilDakin/pdakin.SPARK-38723.

Authored-by: Phil Dakin <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
PhilDakin authored and HeartSaVioR committed Oct 26, 2023
1 parent a3146c8 commit 7d7afb0
Showing 1 changed file with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.{Locale, Properties, ServiceConfigurationError}
import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.mockito.Mockito.{mock, spy, when}
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode}
Expand All @@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DataType, DecimalType, LongType, MetadataBuilder, StructType}
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.Utils

class QueryExecutionErrorsSuite
Expand Down Expand Up @@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite
assert(e.getCause.isInstanceOf[NullPointerException])
}

test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
failAfter(90 seconds) {
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
withTempDir { dir =>
val ds = spark.readStream.format("rate").load()

// Queries have the same ID when they are resumed from the same checkpoint.
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

// Run an initial query to setup the checkpoint.
val initialQuery = ds.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)

// Error is thrown due to a race condition. Ensure it happens with high likelihood in the
// test by spawning many threads.
val exceptions = ThreadUtils.parmap(Seq.range(1, 50), "QueryExecutionErrorsSuite", 50)
{ _ =>
var exception = None : Option[SparkConcurrentModificationException]
try {
val restartedQuery = ds.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
restartedQuery.stop()
restartedQuery.awaitTermination()
} catch {
case e: SparkConcurrentModificationException =>
exception = Some(e)
}
exception
}
assert(exceptions.map(e => e.isDefined).reduceLeft(_ || _))
exceptions.map { e =>
if (e.isDefined) {
checkError(
e.get,
errorClass = "CONCURRENT_QUERY",
sqlState = Some("0A000")
)
}
}
spark.streams.active.foreach(_.stop())
}
}
}
}

test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") {
withTable("t") {
sql("CREATE TABLE t(c String) USING parquet")
Expand Down

0 comments on commit 7d7afb0

Please sign in to comment.