[KERNEL] Compile kernel-defaults tests on Java 17 and set up cross-compilation with Spark 4 #4

Open · wants to merge 6 commits into base: add_variant_type
55 changes: 34 additions & 21 deletions build.sbt
@@ -38,6 +38,8 @@ val LATEST_RELEASED_SPARK_VERSION = "3.5.0"
val SPARK_MASTER_VERSION = "4.0.0-SNAPSHOT"
val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
kernelDefaults / sparkVersion := getSparkVersion()
goldenTables / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
@@ -126,6 +128,25 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

/**
 * Java-/Scala-/Uni-Doc settings aren't working yet against Spark Master:
 *  1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
 *     generating delta-spark unidoc compiles delta-iceberg.
 *  2) delta-spark unidoc fails to compile. Spark 3.5 is on its classpath, likely due to the
 *     iceberg issue above.
 */
def crossSparkProjectSettings(): Seq[Setting[_]] = getSparkVersion() match {
case LATEST_RELEASED_SPARK_VERSION => Seq(
// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq()
}

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
@@ -138,13 +159,8 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
// For adding staged Spark RC versions, e.g.:
// resolvers += "Apache Spark 3.5.0 (RC1) Staging" at "https://repository.apache.org/content/repositories/orgapachespark-1444/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",

// Java-/Scala-/Uni-Doc Settings
scalacOptions ++= Seq(
"-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
),
unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq(
@@ -153,6 +169,7 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
targetJvm := "17",
resolvers += "Spark master staging" at "https://repository.apache.org/content/groups/snapshots/",
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-master",
Test / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "test" / "scala-spark-master",
Antlr4 / antlr4Version := "4.13.1",
Test / javaOptions ++= Seq(
// Copied from SparkBuild.scala to support Java 17 for unit tests (see apache/spark#34153)
@@ -168,13 +185,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)

// Java-/Scala-/Uni-Doc Settings
// This isn't working yet against Spark Master.
// 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
// generating delta-spark unidoc compiles delta-iceberg
// 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
// issue above.
)
}
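Context for the settings above: as the note before crossSparkSettings says, sparkVersion.value cannot be read at this point (it only works inside a task or setting macro), so the build switches on a plain getSparkVersion() helper whose definition is outside this hunk. A minimal sketch of what that switch could look like, assuming a -DsparkVersion JVM property with values like "latest" or "master" (the property name and accepted values are assumptions, not taken from this diff):

def getSparkVersion(): String = sys.props.getOrElse("sparkVersion", "latest") match {
  // Hypothetical selector: default to the released line, opt in to Spark master.
  case "latest" | LATEST_RELEASED_SPARK_VERSION => LATEST_RELEASED_SPARK_VERSION // "3.5.0"
  case "master" | SPARK_MASTER_VERSION          => SPARK_MASTER_VERSION          // "4.0.0-SNAPSHOT"
  case other => throw new IllegalArgumentException(s"Unsupported sparkVersion: $other")
}

With the kernelDefaults / sparkVersion and goldenTables / sparkVersion wiring above, the test dependencies then resolve against whichever Spark line was selected, e.g. something like build/sbt -DsparkVersion=4.0.0-SNAPSHOT kernelDefaults/test (exact spelling depends on the real helper).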

@@ -188,6 +198,7 @@ lazy val spark = (project in file("spark"))
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
crossSparkProjectSettings(),
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
@@ -355,6 +366,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
scalaStyleSettings,
javaOnlyReleaseSettings,
Test / javaOptions ++= Seq("-ea"),
crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
@@ -371,10 +383,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
"org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",

"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
@@ -1069,14 +1081,15 @@ lazy val goldenTables = (project in file("connectors/golden-tables"))
name := "golden-tables",
commonSettings,
skipReleaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
// Test Dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests"
)
)

@@ -617,7 +617,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// corrupt incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 10, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
val deltas = deltaFileStatuses(10L to 13L)
testExpectedError[RuntimeException](
corruptedCheckpointStatuses ++ deltas,
@@ -666,7 +666,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// _last_checkpoint refers to incomplete multi-part checkpoint
val corruptedCheckpointStatuses = FileNames.checkpointFileWithParts(logPath, 20, 5).asScala
.map(p => FileStatus.of(p.toString, 10, 10))
.take(4)
.take(4).toSeq
testExpectedError[RuntimeException](
files = corruptedCheckpointStatuses ++ deltaFileStatuses(10L to 20L) ++
singularCheckpointFileStatuses(Seq(10L)),
@@ -296,12 +296,12 @@ class DeltaTableReadsSuite extends AnyFunSuite with TestUtils {
Seq(TestRow(2), TestRow(2), TestRow(2)),
TestRow("2", "2", TestRow(2, 2L)),
"2"
) :: Nil)
) :: Nil).toSeq

checkTable(
path = path,
expectedAnswer = expectedAnswer,
readCols = readCols
readCols = readCols.toSeq
)
}

@@ -342,7 +342,7 @@ trait FileReadMetrics { self: Object =>
}
}

def getVersionsRead: Seq[Long] = versionsRead
def getVersionsRead: Seq[Long] = versionsRead.toSeq

def resetMetrics(): Unit = {
versionsRead.clear()
@@ -16,6 +16,7 @@
package io.delta.kernel.defaults.utils

import scala.collection.JavaConverters._
import scala.collection.mutable.{Seq => MutableSeq}
import org.apache.spark.sql.{types => sparktypes}
import org.apache.spark.sql.{Row => SparkRow}
import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue, Row}
@@ -110,7 +111,7 @@ object TestRow {
case _: StructType => TestRow(row.getStruct(i))
case _ => throw new UnsupportedOperationException("unrecognized data type")
}
})
}.toSeq)
}

def apply(row: SparkRow): TestRow = {
@@ -133,7 +134,7 @@
case _: sparktypes.BinaryType => obj.asInstanceOf[Array[Byte]]
case _: sparktypes.DecimalType => obj.asInstanceOf[java.math.BigDecimal]
case arrayType: sparktypes.ArrayType =>
obj.asInstanceOf[Seq[Any]]
obj.asInstanceOf[MutableSeq[Any]]
.map(decodeCellValue(arrayType.elementType, _))
case mapType: sparktypes.MapType => obj.asInstanceOf[Map[Any, Any]].map {
case (k, v) =>
@@ -73,7 +73,7 @@ trait TestUtils extends Assertions with SQLHelper {
while (iter.hasNext) {
result.append(iter.next())
}
result
result.toSeq
} finally {
iter.close()
}
@@ -153,7 +153,7 @@ trait TestUtils extends Assertions with SQLHelper {
// for all primitive types
Seq(new Column((basePath :+ field.getName).asJava.toArray(new Array[String](0))));
case _ => Seq.empty
}
}.toSeq
}

def collectScanFileRows(scan: Scan, tableClient: TableClient = defaultTableClient): Seq[Row] = {
@@ -231,7 +231,7 @@ trait TestUtils extends Assertions with SQLHelper {
}
}
}
result
result.toSeq
}

/**
@@ -626,7 +626,7 @@ trait TestUtils extends Assertions with SQLHelper {
toSparkType(field.getDataType),
field.isNullable
)
})
}.toSeq)
}
}

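The .toSeq calls and the MutableSeq alias added across these test files are collection-compatibility fixes for Scala 2.13, which the Spark 4 build requires: there, scala.Seq is an alias for immutable.Seq, so mutable buffers and the mutable sequences Spark hands back no longer satisfy a Seq return type or argument. A standalone sketch of the pattern (not taken from the diff):

import scala.collection.mutable.ArrayBuffer

def collectValues(): Seq[Int] = {
  val result = ArrayBuffer[Int]()
  result += 1
  // Compiles on Scala 2.12, where scala.Seq is scala.collection.Seq, but fails on
  // Scala 2.13, where scala.Seq is immutable.Seq; the explicit copy works on both.
  result.toSeq
}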
26 changes: 26 additions & 0 deletions spark/src/main/scala-spark-3.5/shims/VariantShim.scala
@@ -0,0 +1,26 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

object VariantShim {

/**
* Spark's variant type is only implemented in Spark 4.0, not in Spark 3.5, so no Spark 3.5
* DataType can be a variant type.
*/
def isTypeVariant(dt: DataType): Boolean = false
}
23 changes: 23 additions & 0 deletions spark/src/main/scala-spark-master/shims/VariantShim.scala
@@ -0,0 +1,23 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

object VariantShim {

/** Spark's variant type is only implemented in Spark 4.0 and above. */
def isTypeVariant(dt: DataType): Boolean = dt.isInstanceOf[VariantType]
}
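The two copies of VariantShim give the rest of delta-spark a single, version-independent entry point: the same caller code compiles against whichever source tree (scala-spark-3.5 or scala-spark-master) is selected, returning constant false on 3.5 and a real check on master. A small caller sketch (the helper name is illustrative only; the real caller in this PR is the PartitionUtils change further down):

import org.apache.spark.sql.types._

// Illustrative helper, not part of this diff: detect a top-level variant column;
// the identical source compiles against Spark 3.5 and Spark master.
def hasTopLevelVariant(schema: StructType): Boolean =
  schema.fields.exists(field => VariantShim.isTypeVariant(field.dataType))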
@@ -359,7 +359,7 @@ object TableFeature {
// managed-commits are under development and only available in testing.
ManagedCommitTableFeature,
InCommitTimestampTableFeature,
TypeWideningTableFeature)
TypeWideningTableFeature,
VariantTypeTableFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
Expand Down Expand Up @@ -494,6 +495,14 @@ object IdentityColumnsTableFeature
}
}

object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-dev")
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata, spark: SparkSession): Boolean = {
SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema)
}
}

object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
with FeatureAutomaticallyEnabledByMetadata {
override def metadataRequiresFeatureToBeEnabled(
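Because VariantTypeTableFeature is registered above and mixes in FeatureAutomaticallyEnabledByMetadata, committing a schema that contains a variant column should enable the feature without any explicit table property. A heavily hedged end-to-end sketch (assumes a Spark master / 4.0 session with Delta configured, and that the VARIANT DDL type and parse_json are available there; none of this is taken from the diff):

// Creating a Delta table whose schema contains a VariantType should trip
// metadataRequiresFeatureToBeEnabled and add "variantType-dev" to the protocol.
spark.sql("CREATE TABLE t (id INT, v VARIANT) USING delta")
spark.sql("""INSERT INTO t SELECT 1, parse_json('{"k": 1}')""")
spark.sql("DESCRIBE DETAIL t").show(truncate = false) // table features should now list variantType-dev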
@@ -1263,6 +1263,13 @@ def normalizeColumnNamesInDataType(
unsupportedDataTypes.toSeq
}

/**
* Find VariantType columns in the table schema.
*/
def checkForVariantTypeColumnsRecursively(schema: StructType): Boolean = {
SchemaUtils.typeExistsRecursively(schema)(VariantShim.isTypeVariant(_))
}

/**
* Find TimestampNTZ columns in the table schema.
*/
@@ -1302,6 +1309,7 @@ def normalizeColumnNamesInDataType(
case DateType =>
case TimestampType =>
case TimestampNTZType =>
case dt if VariantShim.isTypeVariant(dt) =>
case BinaryType =>
case _: DecimalType =>
case a: ArrayType =>
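checkForVariantTypeColumnsRecursively leans on SchemaUtils.typeExistsRecursively, whose body is outside this diff. As a rough standalone sketch of what that kind of recursive scan does (the name and shape below are assumptions, not the actual Delta helper):

import org.apache.spark.sql.types._

// Walk a DataType tree and report whether any nested type satisfies the predicate.
def typeExists(dt: DataType)(f: DataType => Boolean): Boolean = dt match {
  case s: StructType => f(s) || s.fields.exists(field => typeExists(field.dataType)(f))
  case a: ArrayType  => f(a) || typeExists(a.elementType)(f)
  case m: MapType    => f(m) || typeExists(m.keyType)(f) || typeExists(m.valueType)(f)
  case other         => f(other)
}

// Mirrors the new helper: typeExists(schema)(VariantShim.isTypeVariant)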
@@ -605,7 +605,7 @@ private[delta] object PartitionUtils {

partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case a: AtomicType if !VariantShim.isTypeVariant(a) => // OK
case _ => throw DeltaErrors.cannotUseDataTypeForPartitionColumnError(field)
}
}
@@ -0,0 +1,19 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

trait DeltaVariantSparkOnlyTests { self: DeltaVariantSuite => }
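This trait is deliberately empty on Spark 3.5; presumably a second copy under the scala-spark-master test tree contributes the actual variant tests, so DeltaVariantSuite compiles on both Spark lines. A hedged sketch of the suite side (DeltaVariantSuite itself is outside this diff, so its base class here is an assumption):

package org.apache.spark.sql.delta

import org.scalatest.funsuite.AnyFunSuite

// Assumed shape: the suite mixes in the shim trait; only the Spark-master copy
// of DeltaVariantSparkOnlyTests adds test cases, the Spark 3.5 copy adds none.
class DeltaVariantSuite extends AnyFunSuite with DeltaVariantSparkOnlyTests {
  // version-independent variant tests (if any) would live here
}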