Fix for the thread contention issue (#90)
* Fix thread contention issue

* Upgraded version

* Delete 'beta' from version
AdalbertMemSQL authored Jun 14, 2024
1 parent 86c0ea5 commit 91028bb
Showing 8 changed files with 28 additions and 14 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,3 +1,8 @@
+2024-06-14 Version 4.1.8
+* Changed retry during reading from result table to use exponential backoff
+* Used ForkJoinPool instead of FixedThreadPool
+* Added more logging

2024-05-13 Version 4.1.7
* Fixed bug that caused reading from the wrong result table when the task was restarted

6 changes: 3 additions & 3 deletions README.md
@@ -1,5 +1,5 @@
# SingleStoreDB Spark Connector
-## Version: 4.1.7 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
+## Version: 4.1.8 [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)

## Getting Started

@@ -13,13 +13,13 @@ spark-packages.org. The group is `com.singlestore` and the artifact is

You can add the connector to your Spark application using: spark-shell, pyspark, or spark-submit
```
-$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0
+$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0
```
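The same `--packages` coordinate works unchanged with `pyspark` and `spark-submit`; for example (the application file `my_app.py` is a placeholder, not part of this repository):
```
$SPARK_HOME/bin/pyspark --packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0
$SPARK_HOME/bin/spark-submit --packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0 my_app.py
```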

We release multiple versions of the `singlestore-spark-connector`, one for each supported Spark version.
The connector follows the `x.x.x-spark-y.y.y` naming convention, where `x.x.x` represents the connector version
and `y.y.y` represents the corresponding Spark version.
-For example, in connector `4.1.7-spark-3.5.0`, 4.1.7 is the version of the connector,
+For example, in connector `4.1.8-spark-3.5.0`, 4.1.8 is the version of the connector,
compiled and tested against Spark version 3.5.0.
It is critical to select the connector version that corresponds to the Spark version in use.

4 changes: 2 additions & 2 deletions build.sbt
@@ -4,7 +4,7 @@ import xerial.sbt.Sonatype._
To run tests or publish with a specific spark version use this java option:
-Dspark.version=3.0.0
*/
-val sparkVersion = sys.props.get("spark.version").getOrElse("3.5.0")
+val sparkVersion = sys.props.get("spark.version").getOrElse("3.1.3")
val scalaVersionStr = "2.12.12"
val scalaVersionPrefix = scalaVersionStr.substring(0, 4)
val jacksonDatabindVersion = sparkVersion match {
@@ -30,7 +30,7 @@ lazy val root = project
case "3.4.2" => "scala-sparkv3.4"
case "3.5.0" => "scala-sparkv3.5"
}),
version := s"4.1.7-spark-${sparkVersion}",
version := s"4.1.8-spark-${sparkVersion}",
licenses += "Apache-2.0" -> url(
"http://opensource.org/licenses/Apache-2.0"
),
2 changes: 1 addition & 1 deletion demo/notebook/pyspark-singlestore-demo_2F8XQUKFG.zpln
@@ -45,7 +45,7 @@
},
{
"title": "Configure Spark",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"user": "anonymous",
"dateUpdated": "2022-07-06 11:26:15.232",
"progress": 0,
2 changes: 1 addition & 1 deletion demo/notebook/scala-singlestore-demo_2F6Y3APTX.zpln
@@ -45,7 +45,7 @@
},
{
"title": "Configure Spark",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"user": "anonymous",
"dateUpdated": "2022-07-06 11:31:08.311",
"progress": 0,
2 changes: 1 addition & 1 deletion demo/notebook/spark-sql-singlestore-demo_2F7PZ81H6.zpln
@@ -45,7 +45,7 @@
},
{
"title": "Configure Spark",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.7-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"text": "%spark.conf\n\n// Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths\nspark.jars.packages com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0\n\n// The hostname or IP address of the SingleStore Master Aggregator in the `host[:port]` format, where port is an optional parameter\n// singlestore-ciab-for-zeppelin - hostname of the docker created by https://hub.docker.com/r/singlestore/cluster-in-a-box\n// 3306 - port on which SingleStore Master Aggregator is started\nspark.datasource.singlestore.ddlEndpoint singlestore-ciab-for-zeppelin:3306\n\n// The hostname or IP address of SingleStore Aggregator nodes to run queries against in the `host[:port],host[:port],...` format, \n// where :port is an optional parameter (multiple hosts separated by comma) (default: ddlEndpoint)\n// Example\n// spark.datasource.singlestore.dmlEndpoints child-agg:3308,child-agg2\nspark.datasource.singlestore.dmlEndpoints singlestore-ciab-for-zeppelin:3306\n\n// SingleStore username (default: root)\nspark.datasource.singlestore.user root\n\n// SingleStore password (default: no password)\nspark.datasource.singlestore.password my_password",
"user": "anonymous",
"dateUpdated": "2022-07-06 11:32:22.885",
"progress": 0,
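With the datasource configured as in these notebooks, reading a table through the connector looks roughly like this in Scala (a minimal sketch; the database and table names `foo` and `bar` are placeholders, and `spark` is an existing `SparkSession` such as the one Zeppelin provides):
```
// Read table `bar` from database `foo` via the configured SingleStore datasource
val df = spark.read
  .format("singlestore")
  .load("foo.bar")

df.printSchema()
df.show(5)
```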
9 changes: 8 additions & 1 deletion src/main/scala/com/singlestore/spark/AggregatorParallelReadListener.scala
@@ -82,6 +82,7 @@ class AggregatorParallelReadListener(applicationId: String) extends SparkListene
connectionsMap += (tableName -> conn)
)

log.info(s"Creating result table '$tableName'")
try {
// Create result table
JdbcHelpers.createResultTable(
@@ -94,9 +95,13 @@
singleStoreRDDInfo.needsRepartition,
singleStoreRDDInfo.repartitionColumns
)
log.info(s"Successfully created result table '$tableName'")
} catch {
// Cancel execution if we failed to create a result table
-case _: SQLException => singleStoreRDDInfo.sc.cancelStage(stageId)
+case e: SQLException => {
+  singleStoreRDDInfo.sc.cancelStage(stageId)
+  throw e
+}
}
})
}
@@ -117,7 +122,9 @@
.get(tableName)
.foreach(conn => {
// Drop result table
log.info(s"Dropping result table '$tableName'")
JdbcHelpers.dropResultTable(conn, tableName)
log.info(s"Successfully dropped result table '$tableName'")
// Close connection
conn.close()
// Delete connection from map
12 changes: 7 additions & 5 deletions src/main/scala/com/singlestore/spark/SinglestoreRDD.scala
@@ -1,8 +1,7 @@
package com.singlestore.spark

-import java.sql.{Connection, PreparedStatement, ResultSet, SQLTransientConnectionException}
-import java.util.concurrent.Executors

+import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.util.concurrent.{Executors, ForkJoinPool}
import com.singlestore.spark.SQLGen.VariableList
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -54,7 +53,7 @@ case class SinglestoreRDD(query: String,
override def compute(rawPartition: Partition, context: TaskContext): Iterator[Row] = {
val multiPartition: SinglestoreMultiPartition =
rawPartition.asInstanceOf[SinglestoreMultiPartition]
-val threadPool = Executors.newFixedThreadPool(multiPartition.partitions.size)
+val threadPool = new ForkJoinPool(multiPartition.partitions.size)
try {
val executionContext =
ExecutionContext.fromExecutor(threadPool)
@@ -121,13 +120,16 @@ case class SinglestoreRDD(query: String,
}

var lastError: java.sql.SQLException = null
+var delay = 50
+val maxDelay = 10000
while (rs == null && (timeout == 0 || System.currentTimeMillis() - startTime < timeout)) {
try {
rs = stmt.executeQuery()
} catch {
case e: java.sql.SQLException if e.getErrorCode == ErrResultTableNotExistCode =>
lastError = e
-Thread.sleep(10)
+delay = Math.min(maxDelay, delay * 2)
+Thread.sleep(delay)
}
}

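The pool swap above (`Executors.newFixedThreadPool` → `new ForkJoinPool`) is the core of the contention fix: a fixed pool has exactly `partitions.size` workers, all of which can end up blocked on slow reads at once, while `ForkJoinPool` uses a work-stealing scheduler that copes better when many small tasks block. A minimal, self-contained sketch of the new construction (the partition count of 4 and the task body are illustrative, not from the connector):
```
import java.util.concurrent.ForkJoinPool

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ForkJoinPoolSketch extends App {
  // was: val threadPool = Executors.newFixedThreadPool(4)
  val threadPool = new ForkJoinPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)

  // One future per partition, mirroring the per-partition reads in compute()
  val work = Future.traverse(1 to 4) { i =>
    Future {
      Thread.sleep(100) // stand-in for a blocking JDBC read
      s"partition $i done"
    }
  }

  println(Await.result(work, 1.minute).mkString(", "))
  threadPool.shutdown()
}
```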
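Likewise, the retry loop above no longer polls every 10 ms: the sleep doubles after each failed attempt and is capped at `maxDelay`. A standalone sketch of the resulting schedule (the attempt count of 10 is illustrative):
```
object BackoffSketch extends App {
  // Mirrors the values introduced in SinglestoreRDD: delay starts at 50 ms,
  // doubles after every failed executeQuery, and is capped at 10 s.
  var delay    = 50
  val maxDelay = 10000

  val schedule = (1 to 10).map { _ =>
    delay = Math.min(maxDelay, delay * 2)
    delay
  }

  // Prints: 100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000, 10000
  println(schedule.mkString(", "))
}
```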
