Spark multiversion support #1325

Open
wants to merge 85 commits into base: master

Commits (85)
e93e1e7  First cut for multiple Spark version support from same code base (Oct 22, 2018)
fb6de86  Merge remote-tracking branch 'origin/master' into spark-multiversion-… (Nov 6, 2018)
081f8b5  more fixes and additions (Nov 6, 2018)
604ae5a  Merge remote-tracking branch 'origin/master' into spark-multiversion-… (Feb 20, 2019)
d8172d4  more fixes for Spark 2.3.x support (Feb 20, 2019)
7dba6ad  many other fixes for Spark 2.3.x support (Feb 22, 2019)
81a3492  more additions for multiple version support (Apr 1, 2019)
2f31c88  fixes (Apr 1, 2019)
dbe390e  more compatibility fixes (Apr 2, 2019)
d425f8f  more fixes (Apr 2, 2019)
69e8969  more fixes (Apr 2, 2019)
3514705  fixing remaining failures (Apr 3, 2019)
4a21c0d  splitting build for product vs connector (Apr 5, 2019)
55440b3  update submodule links (May 5, 2019)
64b1bd5  Merge remote-tracking branch 'origin/master' into spark-multiversion-… (Jun 7, 2019)
4b06b66  fixing compilation issues after merge (Jun 7, 2019)
b0da14f  minor changes to names (Jun 10, 2019)
803a902  some build changes (Jun 24, 2019)
d3c5fa7  more build cleanups (Jun 25, 2019)
2a5082e  moved compatibility modules inside core (Jun 25, 2019)
c193abd  more build cleanups and fixes (Jun 25, 2019)
5080c86  build fixes (Jun 25, 2019)
c71fb90  more fixes (Jun 27, 2019)
8d6d58c  Merge remote-tracking branch 'origin/master' into spark-multiversion-… (sumwale, Sep 22, 2019)
6a0b22e  Merge remote-tracking branch 'origin/master' into spark-multiversion-… (sumwale, Sep 22, 2019)
fc6d95e  update gradle-scalatest and fixes for failures (sumwale, Sep 23, 2019)
aaad0bc  more fixes (sumwale, Sep 23, 2019)
966821e  more fixes (sumwale, Sep 29, 2019)
3ac0c9d  fixes (sumwale, Sep 30, 2019)
7e77344  fixes (sumwale, Oct 4, 2019)
54ff98a  fix few AQP failures in precheckin (sumwale, Oct 13, 2019)
c20a3be  correct jar names to new ones in SplitClusterDUnitTest (sumwale, Oct 13, 2019)
a725a23  fix JDBC pushdown driver for Spark 2.4 (sumwale, Oct 13, 2019)
1143e9f  fix remaining failure in AQP (sumwale, Oct 13, 2019)
0e299b7  fixes for Spark 2.3/2.4 support (sumwale, Oct 15, 2019)
96eb5d8  more fixes and update to scalatest 3 (sumwale, Nov 18, 2019)
25af34c  test build fixes (sumwale, Nov 18, 2019)
e948fe6  update gradle to 5.6.4 (sumwale, Dec 26, 2019)
ac1e782  support for Spark 2.4.4 (sumwale, Jan 30, 2020)
0928913  more fixes for Spark 2.4.4 (sumwale, Feb 3, 2020)
fe78f8b  refactored for multiple 2.4.x versions (sumwale, Feb 4, 2020)
9e9f2ea  fix kafka incompatibility in tests with new kafka 2.x (sumwale, Feb 4, 2020)
8e97d33  update snappy-spark to snappy/branch-2.4 (v2.4.5) (sumwale, Feb 12, 2020)
6f811fa  update store link (sumwale, Feb 12, 2020)
9c579a1  support for latest 2.1 Spark (2.1.3) and 2.3 Spark (2.3.4) (sumwale, Feb 12, 2020)
0f707e8  fixing build and runtime issues in tests (sumwale, Feb 17, 2020)
a4b7fae  update spark (sumwale, Feb 17, 2020)
c3a07f0  code reorganization (sumwale, Feb 17, 2020)
3c5902a  more reorganization (sumwale, Feb 17, 2020)
f19e845  fixes and updated modules (sumwale, Feb 18, 2020)
4d09cc1  build fixes (sumwale, Feb 18, 2020)
bc4ba1d  update spark module (sumwale, Feb 18, 2020)
39a0b82  more fixes and reorganization (sumwale, Feb 19, 2020)
4f0ab26  fix code generation issues (sumwale, Feb 19, 2020)
bf6dce1  fixing insert failures due to InsertIntoTable -> InsertIntoDataSource… (sumwale, Feb 19, 2020)
da5ab2b  codegeneration fixes (sumwale, Feb 20, 2020)
356ad75  couple more fixes (sumwale, Feb 20, 2020)
ba672aa  fix CTAS behaviour (sumwale, Feb 20, 2020)
35b7e2c  more generated code and other fixes (sumwale, Feb 21, 2020)
add532b  fixing compatibility with Spark 2.3.x (sumwale, Feb 21, 2020)
db0421f  fix duplicate SQLTabs and other test/code fixes (sumwale, Feb 22, 2020)
447b2e9  more fixes and add Spark layer support for INSERT/PUT INTO TABLE(colu… (sumwale, Feb 22, 2020)
f6884de  more fixes for semantic changes in 2.4.x (sumwale, Feb 24, 2020)
215a27b  fixing serialization error in toJSON in recent Spark releases (sumwale, Feb 24, 2020)
61594ff  more fixes (sumwale, Feb 25, 2020)
1afbce0  minor fixes (sumwale, Feb 26, 2020)
0e54726  few more fixes (sumwale, Feb 27, 2020)
4e0376d  fix premature release and acquire of region lock (sumwale, Feb 27, 2020)
cd4fc19  change ctx.addNewFunction to internals.addFunction throughout (sumwale, Mar 2, 2020)
d8d9613  fix dbtable in JDBCOptions (sumwale, Mar 6, 2020)
7fc85af  update spark-jobserver (sumwale, Mar 6, 2020)
b5b3791  fixing initialization issues in smart connector (sumwale, Mar 6, 2020)
d3a0bb2  update store module (sumwale, Mar 6, 2020)
2094103  fix spark-unsafe deps (sumwale, Mar 6, 2020)
db93deb  changes to build.gradle deps (sumwale, Mar 6, 2020)
7086e9e  fixes for regression issues and others (sumwale, Mar 15, 2020)
71fad29  fixes for multiple issues (sumwale, Mar 17, 2020)
b722db2  more fixes; make RETURNS optional in CREATE FUNCTION (sumwale, Mar 17, 2020)
70eda68  more test and GUI plan fixes (sumwale, Mar 18, 2020)
36bab5a  fix VIEW test failure (due to auto-generated alias name mismatch) and… (sumwale, Mar 20, 2020)
f665b0b  fixing few more remaining failures and update spark module link (sumwale, Mar 23, 2020)
bc9c2c9  fixing remaining failures (sumwale, Mar 27, 2020)
d83bf9d  fixing test (sumwale, Mar 27, 2020)
d771acb  correct uncaught handler setting in Executor for Spark 2.4 (sumwale, Mar 27, 2020)
1b2dbb1  skip dynamic cpusPerTask setting with smart connector (sumwale, Mar 28, 2020)
214 changes: 164 additions & 50 deletions build.gradle (large diff not rendered)

55 changes: 29 additions & 26 deletions cluster/build.gradle
@@ -44,16 +44,26 @@ dependencies {
compile project(':snappy-spark:snappy-spark-streaming_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-avro_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-mllib_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-yarn_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-graphx_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-hive-thriftserver_' + scalaBinaryVersion)
if (rootProject.hasProperty('mesos')) {
compile project(':snappy-spark:snappy-spark-mesos_' + scalaBinaryVersion)
}
if (rootProject.hasProperty('k8s')) {
compile project(':snappy-spark:snappy-spark-kubernetes_' + scalaBinaryVersion)
}
if (rootProject.hasProperty('flume')) {
compile project(':snappy-spark:snappy-spark-streaming-flume_' + scalaBinaryVersion)
compile project(':snappy-spark:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion)
}

testCompile project(path: ':snappy-spark:snappy-spark-sql_' + scalaBinaryVersion,
configuration: 'testOutput')
testCompile project(path: ':snappy-spark:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion,
configuration: 'testOutput')
} else {
compile 'io.snappydata:snappy-spark-unsafe_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-core_' + scalaBinaryVersion + ':' + snappySparkVersion
@@ -64,19 +64,29 @@ dependencies {
compile 'io.snappydata:snappy-spark-streaming_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-avro_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-mllib_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-yarn_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-graphx_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-hive-thriftserver_' + scalaBinaryVersion + ':' + snappySparkVersion
if (rootProject.hasProperty('mesos')) {
compile 'io.snappydata:snappy-spark-mesos_' + scalaBinaryVersion + ':' + snappySparkVersion
}
if (rootProject.hasProperty('k8s')) {
compile 'io.snappydata:snappy-spark-kubernetes_' + scalaBinaryVersion + ':' + snappySparkVersion
}
if (rootProject.hasProperty('flume')) {
compile 'io.snappydata:snappy-spark-streaming-flume_' + scalaBinaryVersion + ':' + snappySparkVersion
compile 'io.snappydata:snappy-spark-streaming-flume-sink_' + scalaBinaryVersion + ':' + snappySparkVersion
}

testCompile group: 'io.snappydata', name: 'snappy-spark-sql_' + scalaBinaryVersion,
version: snappySparkVersion, classifier: 'tests'
testCompile group: 'io.snappydata', name: 'snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion,
version: snappySparkVersion, classifier: 'tests'
}

compile (project(':snappy-core_' + scalaBinaryVersion)) {
compile (project(coreProjectName)) {
exclude(group: 'org.apache.spark', module: 'spark-unsafe_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-core_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-catalyst_' + scalaBinaryVersion)
@@ -85,10 +105,13 @@ dependencies {
exclude(group: 'org.apache.spark', module: 'spark-streaming_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-streaming-kafka-0-10_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-sql-kafka-0-10_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-avro_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-mllib_' + scalaBinaryVersion)
exclude(group: 'org.eclipse.jetty', module: 'jetty-servlet')
}
testCompile (project(path: ':snappy-core_' + scalaBinaryVersion, configuration: 'testOutput')) {
compile project(compatProjectName)

testCompile (project(path: coreProjectName, configuration: 'testOutput')) {
exclude(group: 'org.apache.spark', module: 'spark-unsafe_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-core_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-catalyst_' + scalaBinaryVersion)
@@ -97,6 +120,7 @@ dependencies {
exclude(group: 'org.apache.spark', module: 'spark-streaming_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-streaming-kafka-0-10_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-sql-kafka-0-10_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-avro_' + scalaBinaryVersion)
exclude(group: 'org.apache.spark', module: 'spark-mllib_' + scalaBinaryVersion)
exclude(group: 'org.eclipse.jetty', module: 'jetty-servlet')
}
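(For context: the switch from hard-coded ':snappy-core_' project paths to the coreProjectName and compatProjectName variables above is presumably the heart of the multiversion build, letting the same cluster project be wired at configuration time to a core and compatibility module built against Spark 2.1, 2.3, or 2.4.)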
@@ -138,36 +162,15 @@ dependencies {
testCompile "org.scalatest:scalatest_${scalaBinaryVersion}:${scalatestVersion}"

if (new File(rootDir, 'aqp/build.gradle').exists() && rootProject.hasProperty('snappydata.enterprise')) {
testRuntime project(':snappy-aqp_' + scalaBinaryVersion)
testRuntime project(aqpProjectName)
}
testRuntime files("${projectDir}/../tests/common/src/main/resources")
testRuntime "org.pegdown:pegdown:${pegdownVersion}"
}

// Creates the version properties file and writes it to the resources dir
task createVersionPropertiesFile(dependsOn: 'processResources') {
def propertiesDir = file("${sourceSets.main.scala.outputDir}/io/snappydata")
outputs.file "${propertiesDir}/SnappyDataVersion.properties"
inputs.file "${rootProject.projectDir}/build.gradle"

doLast {

def props = [
'Product-Name' : productName,
'Product-Version' : version,
'Build-Id' : buildIdPrefix + buildNumber,
'Build-Date' : buildDate,
'Build-Platform' : osName.getName() + osVersion + osArch,
'Build-Java-Version': jdkVersion,
'Source-Date' : sourceDate,
'Source-Revision' : commitId,
'Source-Repository' : gitBranch,
]

writeProperties(propertiesDir, 'SnappyDataVersion.properties',
"Properties that control what version ${productName} will think it is. Changing these values may cause ${productName} to no longer function.", props)
}
}
createVersionPropertiesTask(project, 'SnappyDataVersion.properties', productName,
sourceDate, commitId, gitBranch)

compileJava.dependsOn createVersionPropertiesFile
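
Two notes on the build changes in this file. First, the mesos, k8s, and flume modules are gated on Gradle project properties, so presumably they join the build only when the matching flag is passed on the command line, e.g. ./gradlew assemble -Pmesos -Pk8s (the property names come from the diff; the task name is illustrative). Second, the inline version-properties task has been folded into a shared createVersionPropertiesTask helper. Since the generated file lands on the classpath under io/snappydata, runtime code could read it along these lines (a minimal Scala sketch using only the resource path and keys visible in the removed block, not SnappyData's actual loader):

import java.util.Properties

object VersionInfo {
  def main(args: Array[String]): Unit = {
    // Load the generated version descriptor from the classpath; the resource
    // path and key names are taken from the removed task above.
    val props = new Properties()
    val in = getClass.getResourceAsStream("/io/snappydata/SnappyDataVersion.properties")
    if (in != null) {
      try props.load(in) finally in.close()
    }
    println(props.getProperty("Product-Name") + " " + props.getProperty("Product-Version"))
  }
}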

SplitSnappyClusterDUnitTest.scala
@@ -26,21 +26,23 @@ import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.{Failure, Success, Try}

import com.gemstone.gemfire.internal.cache.PartitionedRegion
import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.core.{TestData, TestData2}
import io.snappydata.test.dunit.{AvailablePortHelper, SerializableRunnable}
import io.snappydata.util.TestUtils
import io.snappydata.{ColumnUpdateDeleteTests, ConcurrentOpsTests, Property, SnappyTableStatsProviderService}
import org.junit.Assert

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.execution.CatalogStaleException
import org.apache.spark.sql.execution.columnar.impl.ColumnFormatRelation
import org.apache.spark.sql.kafka010.KafkaTestUtils
import org.apache.spark.sql.store.{SnappyJoinSuite, StoreUtils}
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.udf.UserDefinedFunctionsDUnitTest
import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -49,8 +51,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
* Basic tests for non-embedded mode connections to an embedded cluster.
*/
class SplitSnappyClusterDUnitTest(s: String)
extends ClusterManagerTestBase(s) with SplitClusterDUnitTestBase
with Serializable {
extends ClusterManagerTestBase(s) with SplitClusterDUnitTestBase with Serializable {

override val locatorNetPort: Int = testObject.locatorNetPort

@@ -255,7 +256,7 @@ class SplitSnappyClusterDUnitTest(s: String)
if (jars.count() > 0) {
var str = msg
jars.collect().foreach(x => str += s"$x,")
assert(false, str)
assert(assertion = false, str)
}
}
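(A note on the assert change above: Scala's Predef.assert is declared as assert(assertion: Boolean, message: => Any), so naming the parameter leaves behaviour unchanged and simply makes the bare false literal self-describing for readers and style checkers.)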

@@ -291,13 +292,16 @@ class SplitSnappyClusterDUnitTest(s: String)
sns.sql("deploy package testsch.mongo-spark_v1.2 'org.mongodb.spark:mongo-spark" +
"-connector_2.11:2.2.2'")
sns.sql("undeploy testsch.mongo-spark_v1.2")
sns.sql(s"""deploy package "testsch"."mongo-spark_v1.3" 'org.mongodb.spark:mongo""" +
sns.sql(
s"""deploy package "testsch"."mongo-spark_v1.3" 'org.mongodb.spark:mongo""" +
"-spark-connector_2.11:2.2.2'")
sns.sql(s"""undeploy "testsch"."mongo-spark_v1.3" """)
sns.sql(s"""deploy package testsch."mongo-spark_v1.4" 'org.mongodb.spark:mongo""" +
sns.sql(
s"""deploy package testsch."mongo-spark_v1.4" 'org.mongodb.spark:mongo""" +
"-spark-connector_2.11:2.2.2'")
sns.sql(s"""undeploy testsch."mongo-spark_v1.4" """)
sns.sql(s"""deploy package "testsch".mongo-spark_v1.5 'org.mongodb.spark:mongo""" +
sns.sql(
s"""deploy package "testsch".mongo-spark_v1.5 'org.mongodb.spark:mongo""" +
"-spark-connector_2.11:2.2.2'")
sns.sql(s"""undeploy "testsch".mongo-spark_v1.5 """)
assert(sns.sql("list packages").count() == 0)
@@ -346,7 +350,7 @@ class SplitSnappyClusterDUnitTest(s: String)
"Deploy command should have failed because of the duplicate alias.")
case Failure(error) =>
assert(error.getMessage.contains("Name 'akka-v1' specified in" +
" context 'of deploying jars/packages' is not unique."))
" context 'of deploying jars/packages' is not unique."))
}
sns.sql("undeploy akka-v1")
functionCheck(sns, "Some jars/packages are not cleaned up! ")
@@ -927,7 +931,7 @@ object SplitSnappyClusterDUnitTest
.set("snappydata.connection", connectionURL)
.set("snapptdata.sql.planCaching", random.nextBoolean().toString)
.set(Property.TestDisableCodeGenFlag.name, "false")
logInfo("Spark conf:" + conf.getAll.toString)
logInfo("Spark conf: " + conf.getAll.mkString(", "))

val sc = SparkContext.getOrCreate(conf)
// sc.setLogLevel("DEBUG")
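
The logInfo fix above works around array printing semantics: SparkConf.getAll returns Array[(String, String)], and arrays inherit java.lang.Object.toString, so the old code logged an opaque reference like [Lscala.Tuple2;@1f2a3b4c instead of the settings. A small self-contained sketch (property values illustrative):

import org.apache.spark.SparkConf

object ConfPrinting {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set("snappydata.connection", "localhost:1527")
    // Inherited toString prints the array reference, not its elements:
    println(conf.getAll.toString)       // opaque: [Lscala.Tuple2;@...
    // mkString joins the key/value pairs into readable output:
    println(conf.getAll.mkString(", ")) // (snappydata.connection,localhost:1527)
  }
}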
@@ -1411,7 +1415,7 @@ object SplitSnappyClusterDUnitTest
.writeStream
.format("snappysink")
.queryName(tableName)
.trigger(ProcessingTime("1 seconds"))
.trigger(Trigger.ProcessingTime("1 seconds"))
.option("tableName", tableName)
.option("checkpointLocation", s"$testTempDir/checkpoint")
.start()
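
The trigger change above tracks the public API: org.apache.spark.sql.streaming.ProcessingTime was deprecated when the Trigger factory methods arrived in Spark 2.2, and the 2.3/2.4 lines expect Trigger.ProcessingTime. A self-contained sketch against the built-in rate source (sink, names, and paths here are illustrative, not the snappysink pipeline above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("trigger-demo").getOrCreate()
    val query = spark.readStream
      .format("rate") // built-in test source emitting (timestamp, value) rows
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 seconds")) // replaces deprecated ProcessingTime("1 seconds")
      .option("checkpointLocation", "/tmp/trigger-demo-checkpoint")
      .start()
    query.awaitTermination(5000) // run briefly for the demo, then exit
    spark.stop()
  }
}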
ColumnTableDUnitTest.scala
@@ -556,15 +556,15 @@ class ColumnTableDUnitTest(s: String) extends ClusterManagerTestBase(s) {
"BUCKETS '1'," +
"REDUNDANCY '2')")

snc.sql("insert into COLUMNTABLE4 VALUES(1,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(2,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(3,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(1,11,null,null)")
snc.sql("insert into COLUMNTABLE4 VALUES(2,11,null,null)")
snc.sql("insert into COLUMNTABLE4 VALUES(3,11,null,null)")

snc.sql("insert into COLUMNTABLE4 VALUES(4,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(5,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(6,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(4,11,null,null)")
snc.sql("insert into COLUMNTABLE4 VALUES(5,11,null,null)")
snc.sql("insert into COLUMNTABLE4 VALUES(6,11,null,null)")

snc.sql("insert into COLUMNTABLE4 VALUES(7,11)")
snc.sql("insert into COLUMNTABLE4 VALUES(7,11,null,null)")

var data =
Seq(Seq(1, 2, 3, 4), Seq(7, 8, 9, 10), Seq(9, 2, 3, 4), Seq(4, 2, 5, 7), Seq(5, 6, 2, 3))
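
The extra nulls above reflect a stricter arity check in newer Spark releases: a positional INSERT ... VALUES must supply one value per column of the target table, and COLUMNTABLE4 evidently has four columns, so the old two-value rows no longer resolve. A hedged sketch of the failure mode (table name and schema illustrative, using the same snc session as the test):

// Assuming a four-column column table comparable to COLUMNTABLE4:
snc.sql("create table demo_tab (col1 int, col2 int, col3 int, col4 int) using column")
// Rejected on Spark 2.3/2.4: two values supplied for four columns.
// snc.sql("insert into demo_tab values (1, 11)")
// Accepted: arity matches once the trailing columns are spelled out as nulls.
snc.sql("insert into demo_tab values (1, 11, null, null)")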