
HybridParquetScan: Refine filter push down to avoid double evaluation #12000

Open · wants to merge 17 commits into base: branch-25.02
Conversation

@thirtiseven (Collaborator) commented on Jan 22, 2025:

Closes #11892

In the current code, a HybridParquetScan followed by a Filter pushes all filter conditions down to the CPU while also leaving them in the Filter, so every condition is evaluated twice. The second evaluation is usually fast, so on its own this is not a big problem. But if some conditions are not supported by the CPU or the GPU, it can cause real problems.

This PR adds a rule that checks each condition in the FilterExec before overriding (a sketch follows the list):

  • If a filter condition is supported by neither the CPU nor the GPU, it falls back to the CPU in the FilterExec and is not pushed down to the scan.
  • If a filter condition is only supported by the CPU, this PR pushes it down to the scan and removes it from the FilterExec.
  • If a filter condition is only supported by the GPU, this PR keeps it in the filter.
  • If all conditions are pushed down to the scan, the FilterExec is removed.
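A minimal sketch of this split, with the support check passed in as a predicate (all names here are illustrative stand-ins, not the PR's actual helpers):

import org.apache.spark.sql.catalyst.expressions.{And, Expression}

// Break a conjunctive condition (a AND b AND c) into its top-level parts.
def splitConjunctivePredicates(cond: Expression): Seq[Expression] = cond match {
  case And(left, right) =>
    splitConjunctivePredicates(left) ++ splitConjunctivePredicates(right)
  case other => Seq(other)
}

// Decide where each condition lives. `supportedByHybrid` stands in for the
// PR's real check against the Gluten support list.
def splitFilterCondition(
    cond: Expression,
    supportedByHybrid: Expression => Boolean): (Seq[Expression], Seq[Expression]) = {
  val parts = splitConjunctivePredicates(cond)
  // `pushed` goes into the hybrid (CPU) scan and leaves FilterExec;
  // `kept` stays in FilterExec (on the GPU if supported, else CPU fallback).
  // If `kept` is empty, the FilterExec itself can be removed.
  parts.partition(supportedByHybrid)
}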

The supportedByHybridFilters list is taken from velox-backend-support-progress in Gluten. The script used to extract the CPU-supported expressions is in a gist.

For example:

scala> val df = spark.read.parquet("parse_url_protocol")
df: org.apache.spark.sql.DataFrame = [url: string, pr: string ... 2 more fields]


scala> df.filter("startswith(pr, 'h') == False and ascii(url) >= 16").show()
25/01/23 16:32:31 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(count#2L as string) AS count#32 will run on GPU
      *Expression <Cast> cast(count#2L as string) will run on GPU
    *Exec <FilterExec> will run on GPU
      *Expression <Not> NOT StartsWith(pr#1, h) will run on GPU
        *Expression <StartsWith> StartsWith(pr#1, h) will run on GPU
      *Exec <FileSourceScanExec> will run on GPU

startswith is not supported on the CPU, so it stays on the GPU, while ascii is pushed down to the CPU. The check is recursive.
[Screenshot 2025-01-23 at 16:38:42]
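Conceptually, the recursive check walks the whole expression tree; a hedged sketch (the whitelist here is illustrative, standing in for supportedByHybridFilters):

import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative recursive check: an expression is only eligible for pushdown
// if it and every one of its sub-expressions are on the hybrid whitelist.
def isExprSupportedByHybridScan(e: Expression, whitelist: Set[String]): Boolean =
  whitelist.contains(e.prettyName) &&
    e.children.forall(isExprSupportedByHybridScan(_, whitelist))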

And a second example:

scala> df.filter("url >= 'http' and ascii(url) >= 16").show()
25/01/23 16:37:45 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> cast(count#2L as string) AS count#66 will run on GPU
      *Expression <Cast> cast(count#2L as string) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

If all filter conditions are supported, the FilterExec is removed entirely.
[Screenshot 2025-01-23 at 16:38:29]

@thirtiseven thirtiseven self-assigned this Jan 22, 2025
@thirtiseven thirtiseven marked this pull request as ready for review January 22, 2025 16:56
}

val supportedByHybridFilters = {
// Only fully supported functions are listed here
Collaborator: Have a link to the supporting list in the comments.

Collaborator Author: done

}
}

def recursivelySupportsHybridFilters(condition: Expression): Boolean = {
Collaborator: nit: would isExprSupportedByHybridScan be a better name?

Collaborator Author: done

}
}

def recursivelySupportsHybridFilters(condition: Expression): Boolean = {
Collaborator: There is a way to register UDFs in Gluten as well (link). Hmm, probably we can have a whitelist configuration allowing pre-registered functions into the pushed-down filters.

Collaborator Author: Added a whitelist config, as the Gluten doc seems to be out of date for some expressions. Maybe this will also allow UDFs to be pushed down to the CPU, but I haven't tested it; will file a follow-up for this.
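A rough sketch of how such a whitelist could be read and merged with the built-in list (the config key name is hypothetical, and the built-in set is trimmed for illustration):

import org.apache.spark.sql.internal.SQLConf

// Trimmed illustration of the built-in list of hybrid-supported expressions.
val supportedByHybridFilters: Set[String] = Set("ascii", "substring")

// Hypothetical key; the PR's actual config name may differ.
val whitelistKey = "spark.rapids.sql.hybrid.whitelistExprs"

// Comma-separated user-provided expression names, merged with the built-in
// list before the pushdown check runs.
val userWhitelist: Set[String] = SQLConf.get
  .getConfString(whitelistKey, "")
  .split(",").map(_.trim).filter(_.nonEmpty).toSet

val effectiveWhitelist: Set[String] = supportedByHybridFilters ++ userWhitelist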

sperlingxx previously approved these changes Jan 23, 2025

@sperlingxx: LGTM. But I have not worked on the GpuOverrides for a long time...

@sperlingxx: build

with_cpu_session(
lambda spark: gen_df(spark, [('a', StringGen(pattern='[0-9]{1,5}'))]).write.parquet(data_path),
conf=rebase_write_corrected_conf)
# filter conditions should remain on the GPU
@res-life (Jan 24, 2025): How to verify the executed plan?

Collaborator Author: I assumed that startsWith was not supported on the CPU, but it actually is, just not in their doc. If we pushed an unsupported operator down to the CPU, the test should fail. I will try to find an expression that Hybrid does not support for this test.

Do you think it is necessary to write some UTs to verify the executed plan?

Collaborator: It would be straightforward to check the execution plan in a UT. IMO we could use a UT instead of an IT. @sperlingxx what do you think?

Collaborator Author: Changed this one to a pandas_udf for now. I think some ITs are still necessary; will try to write some UTs later.

Collaborator Author: Updated the integration tests to verify the executed plan, PTAL.
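For reference, a plan assertion in a Scala UT might look roughly like this (harness details omitted; the helper name is illustrative):

import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

// Illustrative check: once all conditions are pushed into the hybrid scan,
// no FilterExec should survive in the executed plan.
def assertFilterRemoved(plan: SparkPlan): Unit = {
  val filters = plan.collect { case f: FilterExec => f }
  assert(filters.isEmpty, s"expected FilterExec to be removed, found: $filters")
}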

lambda spark: gen_df(spark, [('a', StringGen(pattern='[0-9]{1,5}'))]).write.parquet(data_path),
conf=rebase_write_corrected_conf)
# filter conditions should be pushed down to the CPU, so ascii will not fall back to the CPU in the FilterExec
assert_gpu_and_cpu_are_equal_collect(
@res-life (Jan 24, 2025): How to verify the executed plan?

@thirtiseven (Jan 24, 2025): ascii is not supported on the GPU, so if it isn't pushed down the test will fail.


with_cpu_session(lambda spark: spark.udf.register("udf_fallback", udf_fallback))

assert_gpu_and_cpu_are_equal_collect(
Collaborator: How to verify the executed plan?

Collaborator Author: It is a UDF and not supported on the CPU, so if it were pushed down the test would fail.

* support it. After that we can remove the condition from one side to avoid duplicate execution
* or unnecessary fallback/crash.
*/
def applyHybridScanRules(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
Collaborator: Rename to tryToApplyHybridScanRules? And at the first line of this function, check whether the Hybrid feature is enabled, to avoid executing the following code.

Collaborator Author: done
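The suggested rename plus early exit might look like this (a sketch only; `isHybridEnabled` is a stand-in for the plugin's real config check on RapidsConf):

// Bail out immediately when the Hybrid feature is off, so the rewrite
// logic below never runs.
def isHybridEnabled(conf: RapidsConf): Boolean = false // stand-in

def tryToApplyHybridScanRules(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
  if (!isHybridEnabled(conf)) {
    return plan
  }
  // ... rewrite Filter + HybridParquetScan pairs here ...
  plan
}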


def canBePushedToHybrid(child: SparkPlan, conf: RapidsConf): String = {
  child match {
    case fsse: FileSourceScanExec if HybridFileSourceScanExecMeta.useHybridScan(conf, fsse) =>
Collaborator: Better to move HybridFileSourceScanExecMeta.useHybridScan to an outer function.

Collaborator Author: How about moving the function useHybridScan to HybridExecutionUtils? And maybe all the other functions in object HybridFileSourceScanExecMeta could be moved there too, what do you think?

Collaborator:

> How about moving the function useHybridScan to HybridExecutionUtils?

Yes, good idea.

Collaborator Author: done

@res-life: Did an NDS test; total time improved from 709s to 704s.

@thirtiseven: build

1 similar comment

@thirtiseven: build

sperlingxx previously approved these changes Jan 27, 2025

@sperlingxx: LGTM! Good job!

res-life previously approved these changes Jan 27, 2025

@res-life: LGTM

@thirtiseven thirtiseven dismissed stale reviews from res-life and sperlingxx via 0c24b7a January 27, 2025 03:59
@thirtiseven: build

@thirtiseven: Addressed some comments from @res-life in an offline sync, please take another look.

res-life previously approved these changes Jan 27, 2025
@thirtiseven: build

@thirtiseven: build

Successfully merging this pull request may close these issues.

[FEA] [FOLLOW-UP] [Hybrid/C2C] Validate predicate push down and filtering