From 91028bbbab937a46f4d4b0397fcab3e77b8bc4b2 Mon Sep 17 00:00:00 2001 From: AdalbertMemSQL Date: Fri, 14 Jun 2024 11:32:51 +0300 Subject: [PATCH] Fix for the thread contention issue (#90) * Fix thread contention issue * Upgraded version * Delete 'beta' from version --- CHANGELOG | 5 +++++ README.md | 6 +++--- build.sbt | 4 ++-- .../notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln | 2 +- demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln | 2 +- .../spark-sql-singlestore-demo_2F7PZ81H6.zpln | 2 +- .../spark/AggregatorParallelReadListener.scala | 9 ++++++++- .../scala/com/singlestore/spark/SinglestoreRDD.scala | 12 +++++++----- 8 files changed, 28 insertions(+), 14 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index f5dbab69..9a876476 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +2024-06-14 Version 4.1.8 + * Changed retry during reading from result table to use exponential backoff + * Used ForkJoinPool instead of FixedThreadPool + * Added more logging + 2024-05-13 Version 4.1.7 * Fixed bug that caused reading from the wrong result table when the task was restarted diff --git a/README.md b/README.md index bae7c582..a2377d13 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # SingleStoreDB Spark Connector -## Version: 4.1.7 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) +## Version: 4.1.8 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) ## Getting Started @@ -13,13 +13,13 @@ spark-packages.org. The group is `com.singlestore` and the artifact is You can add the connector to your Spark application using: spark-shell, pyspark, or spark-submit ``` -$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0 +$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0 ``` We release multiple versions of the `singlestore-spark-connector`, one for each supported Spark version. The connector follows the `x.x.x-spark-y.y.y` naming convention, where `x.x.x` represents the connector version and `y.y.y` represents the corresponding Spark version. -For example, in connector `4.1.7-spark-3.5.0`, 4.1.7 is the version of the connector, +For example, in connector `4.1.8-spark-3.5.0`, 4.1.8 is the version of the connector, compiled and tested against Spark version 3.5.0. It is critical to select the connector version that corresponds to the Spark version in use. 
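For orientation alongside the README bump above, here is what a read through the connector looks like end to end. This is a minimal sketch, assuming placeholder host, credentials, and table names; the config keys mirror the Zeppelin demo configs further down in this patch:

```scala
// Minimal read sketch (placeholders throughout; not part of this patch).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("singlestore-read-sketch")
  .config("spark.datasource.singlestore.ddlEndpoint", "singlestore-host:3306") // placeholder endpoint
  .config("spark.datasource.singlestore.user", "root")
  .config("spark.datasource.singlestore.password", "my_password")
  .getOrCreate()

// "singlestore" is the connector's data source name; the table is a placeholder.
val df = spark.read
  .format("singlestore")
  .load("my_db.my_table")

df.show()
```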
diff --git a/build.sbt b/build.sbt
index d6ec8c96..8752f638 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,7 +4,7 @@ import xerial.sbt.Sonatype._
 To run tests or publish with a specific spark version use this java option:
 -Dspark.version=3.0.0
 */
-val sparkVersion = sys.props.get("spark.version").getOrElse("3.1.3")
+val sparkVersion = sys.props.get("spark.version").getOrElse("3.5.0")
 val scalaVersionStr = "2.12.12"
 val scalaVersionPrefix = scalaVersionStr.substring(0, 4)
 val jacksonDatabindVersion = sparkVersion match {
@@ -30,7 +30,7 @@ lazy val root = project
       case "3.4.2" => "scala-sparkv3.4"
       case "3.5.0" => "scala-sparkv3.5"
     }),
-    version := s"4.1.7-spark-${sparkVersion}",
+    version := s"4.1.8-spark-${sparkVersion}",
     licenses += "Apache-2.0" -> url(
      "http://opensource.org/licenses/Apache-2.0"
    ),
diff --git a/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln b/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln
index 8a66c516..e1ea6eed 100644
--- a/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln
+++ b/demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln
@@ -45,7 +45,7 @@
     },
     {
       "title": "Configure Spark",
-      "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
+      "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
       "user": "anonymous",
"dateUpdated": "2022-07-06 11:26:15.232", "progress": 0, diff --git a/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln b/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln index 3855ba97..fc890dd0 100644 --- a/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln +++ b/demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln @@ -45,7 +45,7 @@ }, { "title": "Configure Spark", - "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", + "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", "user": "anonymous", "dateUpdated": "2022-07-06 11:31:08.311", "progress": 0, diff --git a/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln b/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln index 8c74a1df..256c8ae6 100644 --- a/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln +++ b/demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln @@ -45,7 +45,7 @@ }, { "title": "Configure Spark", - "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// 
singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", + "text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password", "user": "anonymous", "dateUpdated": "2022-07-06 11:32:22.885", "progress": 0, diff --git a/src/main/scala/com/singlestore/spark/AggregatorParallelReadListener.scala b/src/main/scala/com/singlestore/spark/AggregatorParallelReadListener.scala index 0fc389ce..c9cea04d 100644 --- a/src/main/scala/com/singlestore/spark/AggregatorParallelReadListener.scala +++ b/src/main/scala/com/singlestore/spark/AggregatorParallelReadListener.scala @@ -82,6 +82,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene connectionsMap += (tableName -> conn) ) + log.info(s"Creating result table '$tableName'") try { // Create result table JdbcHelpers.createResultTable( @@ -94,9 +95,13 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene singleStoreRDDInfo.needsRepartition, singleStoreRDDInfo.repartitionColumns ) + log.info(s"Successfully created result table '$tableName'") } catch { // Cancel execution if we failed to create a result table - case _: SQLException => singleStoreRDDInfo.sc.cancelStage(stageId) + case e: SQLException => { + singleStoreRDDInfo.sc.cancelStage(stageId) + throw e + } } }) } @@ -117,7 +122,9 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene .get(tableName) .foreach(conn => { // Drop result table + log.info(s"Dropping result table '$tableName'") JdbcHelpers.dropResultTable(conn, tableName) + log.info(s"Successfully dropped result table '$tableName'") // Close connection conn.close() // Delete 
connection from map

diff --git a/src/main/scala/com/singlestore/spark/SinglestoreRDD.scala b/src/main/scala/com/singlestore/spark/SinglestoreRDD.scala
index 1b6fc45d..7ecd403f 100644
--- a/src/main/scala/com/singlestore/spark/SinglestoreRDD.scala
+++ b/src/main/scala/com/singlestore/spark/SinglestoreRDD.scala
@@ -1,8 +1,7 @@
 package com.singlestore.spark
 
-import java.sql.{Connection, PreparedStatement, ResultSet, SQLTransientConnectionException}
-import java.util.concurrent.Executors
-
+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.concurrent.{Executors, ForkJoinPool}
 import com.singlestore.spark.SQLGen.VariableList
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
@@ -54,7 +53,7 @@ case class SinglestoreRDD(query: String,
   override def compute(rawPartition: Partition, context: TaskContext): Iterator[Row] = {
     val multiPartition: SinglestoreMultiPartition =
       rawPartition.asInstanceOf[SinglestoreMultiPartition]
-    val threadPool = Executors.newFixedThreadPool(multiPartition.partitions.size)
+    val threadPool = new ForkJoinPool(multiPartition.partitions.size)
 
     try {
       val executionContext = ExecutionContext.fromExecutor(threadPool)
@@ -121,13 +120,16 @@ case class SinglestoreRDD(query: String,
     }
 
     var lastError: java.sql.SQLException = null
+    var delay = 50
+    val maxDelay = 10000
     while (rs == null && (timeout == 0 || System.currentTimeMillis() - startTime < timeout)) {
       try {
         rs = stmt.executeQuery()
       } catch {
         case e: java.sql.SQLException if e.getErrorCode == ErrResultTableNotExistCode =>
           lastError = e
-          Thread.sleep(10)
+          delay = Math.min(maxDelay, delay * 2)
+          Thread.sleep(delay)
       }
     }
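To make the new retry behavior concrete, here is a self-contained sketch of the capped exponential-backoff loop the hunk above introduces. `queryResultTable` and `ResultTableNotReady` are hypothetical stand-ins for the connector's JDBC call and its `ErrResultTableNotExistCode` check, not connector API:

```scala
// Sketch of the capped exponential-backoff retry (illustrative stand-ins only).
object BackoffRetrySketch {
  final class ResultTableNotReady extends RuntimeException("result table does not exist yet")

  private var calls = 0
  def queryResultTable(): String = {
    calls += 1
    if (calls < 4) throw new ResultTableNotReady // simulate the table appearing late
    "rows"
  }

  def main(args: Array[String]): Unit = {
    val timeout   = 60000L // overall budget in ms (0 would mean "no limit")
    val startTime = System.currentTimeMillis()
    val maxDelay  = 10000  // cap: never sleep longer than 10s between attempts
    var delay     = 50     // doubled before each sleep: 100, 200, 400, ...
    var rs: String = null
    var lastError: Throwable = null

    while (rs == null && System.currentTimeMillis() - startTime < timeout) {
      try rs = queryResultTable()
      catch {
        case e: ResultTableNotReady =>
          lastError = e
          delay = Math.min(maxDelay, delay * 2) // exponential backoff, capped
          Thread.sleep(delay)
      }
    }
    if (rs == null) throw lastError else println(rs)
  }
}
```

On the pool swap in the same file: `Executors.newFixedThreadPool(n)` keeps its `n` non-daemon worker threads alive until an explicit shutdown, while `new ForkJoinPool(n)` treats `n` as a parallelism target and uses daemon threads that are created on demand and reaped when idle, which is presumably what relieves the reported thread contention.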