[BUG] Spark UT framework: select one deep nested complex field after join, IOException parsing parquet #11628

Open
Tracked by #11405
Feng-Jiang28 opened this issue Oct 18, 2024 · 0 comments
Labels: bug

The contacts parquet dataset is defined by the schema below and is attached here: contacts.zip

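Both transcripts below come from spark-shell. For the GPU run the RAPIDS Accelerator has to be loaded when the session starts; as a minimal sketch, the equivalent programmatic setup looks like this (the app name is a placeholder; the config keys are the standard plugin settings):

import org.apache.spark.sql.SparkSession

// Sketch: enable the RAPIDS Accelerator when building the session. In
// spark-shell the same is done at launch with
// --conf spark.plugins=com.nvidia.spark.SQLPlugin
val spark = SparkSession.builder()
  .appName("contacts-repro")                              // placeholder name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  // load the GPU plugin
  .config("spark.rapids.sql.enabled", "true")             // default; shown explicitly
  .getOrCreate()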
Reproduce:

val dataSourceName = "parquet" 
val path = "/home/fejiang/Desktop"
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
  "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
  "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>,STRING>,`p` INT")
spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments").createOrReplaceTempView("departments")
val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query.show()
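
If the attached zip is not to hand, a dataset shaped like the one above can be generated with the sketch below. It reuses the `schema`, `departmentSchema`, and `path` vals from the snippet; the row values are illustrative (loosely echoing the X./Y. middle names visible in the CPU output), not the actual contents of contacts.zip:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Contacts table built from the DDL string above; `p` becomes the partition
// directory on disk (contacts/p=1, contacts/p=2, as in the error path).
val contactsRows = Seq(
  Row(0, Row("Jane", "X.", "Doe"), "123 Main Street", 1,
    Seq(Row("Susan", "Z.", "Smith")),
    Map("brother" -> Row("John", "Y.", "Doe")),
    Row(0, Row("abc", "123 Business Street")),
    Map(Row("John", "Y.", "Doe") -> "brother"), 1),
  Row(1, Row("John", "Y.", "Doe"), "321 Wall Street", 3,
    Seq(Row("Susan", "Z.", "Smith")),
    Map("sister" -> Row("Jane", "X.", "Doe")),
    Row(1, null),
    Map(Row("Jane", "X.", "Doe") -> "sister"), 2))
spark.createDataFrame(spark.sparkContext.parallelize(contactsRows), StructType.fromDDL(schema))
  .write.mode("overwrite").partitionBy("p").parquet(path + "/contacts")

// Departments table joining back to contacts on contactId.
val departmentRows = Seq(
  Row(0, "Engineering", 0, Row(0, Row("abc", "123 Business Street"))),
  Row(1, "Marketing", 1, Row(1, Row("abc", "123 Business Street"))))
spark.createDataFrame(spark.sparkContext.parallelize(departmentRows), StructType.fromDDL(departmentSchema))
  .write.mode("overwrite").parquet(path + "/departments")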

CPU:

scala> val dataSourceName = "parquet" 
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> 

scala> val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
departmentSchema: String = `depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>

scala> spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments")
res15: org.apache.spark.sql.DataFrame = [depId: int, depName: string ... 2 more fields]

scala>      .createOrReplaceTempView("departments")  

scala> val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
+------+                                                                        
|middle|
+------+
|    X.|
|    Y.|
+------+

GPU:

scala> val dataSourceName = "parquet" 
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> 

scala> val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
departmentSchema: String = `depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>

scala> spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments")
res2: org.apache.spark.sql.DataFrame = [depId: int, depName: string ... 2 more fields]

scala>      .createOrReplaceTempView("departments")  

scala> val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
*Exec <BroadcastExchangeExec> will run on GPU
  *Exec <FilterExec> will run on GPU
    *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:48 WARN GpuOverrides:                                            
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:48 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:49 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 4)    
java.io.IOException: Error when processing path: file:///home/fejiang/Desktop/contacts/p=2/part-00000-000fbc57-9d4a-4d07-a5fe-1c8c0815d1f8-c000.snappy.parquet, range: 0-991, partition values: [empty row]
	at com.nvidia.spark.rapids.ParquetTableReader.$anonfun$next$1(GpuParquetScan.scala:2709)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2696)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2668)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.$anonfun$apply$1(GpuDataProducer.scala:159)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.apply(GpuDataProducer.scala:156)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$4(GpuMultiFileReader.scala:1066)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapidsRetryIterator.scala:477)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:132)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$1(GpuMultiFileReader.scala:1059)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readBatch(GpuMultiFileReader.scala:1032)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.next(GpuMultiFileReader.scala:1012)
