SPARK-34212 Parquet should read decimals correctly #11433

Open
Tracked by #11403
Feng-Jiang28 opened this issue Sep 6, 2024 · 0 comments · May be fixed by #11498
Labels
bug (Something isn't working), good first issue (Good for newcomers)

Feng-Jiang28 commented Sep 6, 2024

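Exception: reading a Parquet file on the GPU fails with a schema mismatch error when the read schema declares decimal columns with a larger precision and scale than the ones stored in the file. The CPU reads the same file successfully.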

Reproduce:

Start spark-shell with the plugin:

$SPARK_HOME/bin/spark-shell --master local[*] --jars ${SPARK_RAPIDS_PLUGIN_JAR} --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.rapids.sql.enabled=true

CPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+


scala> df.write.parquet("/home/fejiang/Documents/temp")

scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
+----+-----+-----+
|   a|    b|    c|
+----+-----+-----+
|1.00|1.230|1.230|
+----+-----+-----+
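
The CPU output above is consistent with each stored value being rescaled to the requested decimal type. As a rough sketch of that arithmetic only (it uses plain `java.math.BigDecimal`, not Spark's Parquet reader, and `RescaleSketch` is a hypothetical name introduced here for illustration):

```scala
import java.math.{BigDecimal => JBigDecimal}

// Hypothetical helper for illustration; not part of Spark or spark-rapids.
object RescaleSketch {
  // Raising the scale only appends trailing zeros, so no rounding mode is needed.
  // setScale(Int) throws ArithmeticException if the new scale would drop digits.
  def rescale(value: String, newScale: Int): JBigDecimal =
    new JBigDecimal(value).setScale(newScale)

  def main(args: Array[String]): Unit = {
    println(rescale("1.0", 2))   // column a: 1.00
    println(rescale("1.23", 3))  // columns b and c: 1.230
  }
}
```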


GPU:

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

scala> val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c")
df: org.apache.spark.sql.DataFrame = [a: decimal(2,1), b: decimal(17,2) ... 1 more field]

scala> df.show()
24/09/06 15:30:30 WARN GpuOverrides: 
  ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec

+---+----+----+
|  a|   b|   c|
+---+----+----+
|1.0|1.23|1.23|
+---+----+----+


scala> df.write.parquet("/home/fejiang/Documents/temp")
24/09/06 15:30:31 WARN GpuOverrides: 
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec


scala> val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
schema1: String = a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)

scala> val df2 = spark.read.schema(schema1).parquet("/home/fejiang/Documents/temp")
df2: org.apache.spark.sql.DataFrame = [a: decimal(3,2), b: decimal(18,3) ... 1 more field]

scala> df2.show()
24/09/06 15:30:32 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU

24/09/06 15:30:32 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file file:///home/fejiang/Documents/temp/part-00000-9f83dfb2-74c0-4b2a-9bd4-5faadaeb893d-c000.snappy.parquet. Column: a, Expected: decimal(3,2), Found: required int32 a (DECIMAL(2,1))
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.throwTypeIncompatibleError(GpuParquetScan.scala:1025)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12(GpuParquetScan.scala:757)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$12$adapted(GpuParquetScan.scala:757)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$6(GpuParquetScan.scala:878)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkPrimitiveCompat(GpuParquetScan.scala:1009)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.checkSchemaCompat(GpuParquetScan.scala:878)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3(GpuParquetScan.scala:830)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$3$adapted(GpuParquetScan.scala:821)
	at scala.Option.foreach(Option.scala:407)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2(GpuParquetScan.scala:821)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$checkSchemaCompat$2$adapted(GpuParquetScan.scala:820)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
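
The error reports `Expected: decimal(3,2)` against `Found: ... DECIMAL(2,1)`. One way to reason about whether such a read can be lossless is to compare the scale and the number of integer digits of the two types. The sketch below is an assumption for illustration: `DecimalShape` and `fitsWithoutLoss` are hypothetical names, and this is not the check in `GpuParquetScan.scala` nor necessarily what #11498 implements.

```scala
// Hypothetical illustration; not the check used by spark-rapids or Spark.
final case class DecimalShape(precision: Int, scale: Int) {
  // Digits available to the left of the decimal point.
  def integerDigits: Int = precision - scale
}

object DecimalWideningSketch {
  // The requested type can hold every stored value exactly when it has
  // at least as many fractional digits and at least as many integer digits.
  def fitsWithoutLoss(stored: DecimalShape, requested: DecimalShape): Boolean =
    requested.scale >= stored.scale &&
      requested.integerDigits >= stored.integerDigits
}
```

Under this rule all three columns in the reproduction qualify: `a` (2,1) to (3,2), `b` (17,2) to (18,3), and `c` (36,2) to (37,3) each gain one fractional digit and keep the same number of integer digits.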
Feng-Jiang28 added the "bug" and "? - Needs Triage" labels on Sep 6, 2024
mattahrens added the "good first issue" label and removed the "? - Needs Triage" label on Sep 10, 2024
mattahrens linked a pull request on Sep 25, 2024 that will close this issue