
Support partitioning spec during data file rewrites in Spark. #11368

Draft · wants to merge 1 commit into base: main

Conversation


@rdsarvar rdsarvar commented Oct 21, 2024

Description

Currently, data file rewrites support specifying the output spec ID to be used. This change adds the ability to provide a partition spec itself and have it added as a non-default spec if it does not already exist on the table.

Edit: I've added a GitHub issue to track this potential improvement here: #11459

Benefits

These changes would make it simpler to tier partition granularity by time range. As an example: say your table is heavily used and queries mostly target the most recent data, but you still want people to be able to query back in time. You could achieve additional performance improvements by applying more granular partitions in the base table and then running a compaction job in tiers (see the sketch after this list):

  1. Short term compaction (reuses the table's default spec - high granularity; remove as many small files as possible)
  2. Long term compaction (uses a specified partition spec that is not the default - lower granularity; cuts down the metadata stored for the table)
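
For illustration, a minimal sketch of what the two tiers could look like with the rewrite action's existing output spec ID option. The timestamp column `event_ts`, the cutoff date, and the coarser spec ID `2` are all assumptions, not part of this PR:

```java
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;

// 'spark' (SparkSession) and 'table' (Iceberg Table) are assumed in scope.

// Tier 1: short term compaction over recent data, using the table's
// default (fine-grained) spec.
SparkActions.get(spark)
    .rewriteDataFiles(table)
    .filter(Expressions.greaterThanOrEqual("event_ts", "2024-10-01T00:00:00"))
    .execute();

// Tier 2: long term compaction, rewriting older data into a coarser,
// non-default spec (assumed here to already exist with spec ID 2).
SparkActions.get(spark)
    .rewriteDataFiles(table)
    .filter(Expressions.lessThan("event_ts", "2024-10-01T00:00:00"))
    .option(RewriteDataFiles.OUTPUT_SPEC_ID, "2")
    .execute();
```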

Notes for Reviewers

Note: this is definitely not complete and I am open to all feedback, whether some of this functionality already exists elsewhere or it should be done differently.

The part I'm mostly iffy on is modifying BaseUpdatePartitionSpec.java via table.updateSpec() instead of having something like table.addSpec(partitionSpec).addNonDefaultSpec().commit().

Integer partitionSpecId =
    checkAndPreparePartitionSpec(
        table, partitionedByString, createPartitionIfNotExists, options);
options.put(OUTPUT_SPEC_ID, partitionSpecId.toString());
@rdsarvar (Author):
nitpick: I guess I could set this inside of the checkAndPreparePartitionSpec method

 * @param newSpec partition spec to override what the builder uses during commit
 * @return this for method chaining
 */
default UpdatePartitionSpec useSpec(PartitionSpec newSpec) {
@rdsarvar (Author):
This feels... hacky. Question for folks reading this: would it be overkill to support an AddPartitionSpec operation instead of relying on the UpdatePartitionSpec?
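
For reference, the flow this hook would enable looks roughly like the following. Note that useSpec is the method proposed in this PR, not an existing API, and buildSpecFromOptions is a hypothetical helper:

```java
// Hypothetical usage of the proposed useSpec hook: swap the builder's
// state for a fully-specified spec, then commit it as a non-default spec.
PartitionSpec newSpec = buildSpecFromOptions(table.schema()); // hypothetical helper
table.updateSpec()
    .useSpec(newSpec)
    .addNonDefaultSpec()
    .commit();
```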

Contributor:
I agree, this feels hacky. I'm also not convinced it goes through the right validations. We probably want to walk/diff the specs and make the necessary updates.

@rdsarvar (Author):
> We probably want to walk/diff the specs and make the necessary updates.

Are you thinking something simple like:

  • Iterate through existing partition spec and removeField all fields
  • Iterate through new spec and addField all fields

Or were you thinking something like:

  • Run 'old DIFF new' to find the fields to removeField against
  • Iterate through the 'new' spec and check if the current spec has the field, if not then add it

Though this approach wouldn't guarantee the partition ordering of terms, right? If I were the end user, I'd expect the added spec to match the ordering I provided exactly (see the sketch after this comment).

^ This last point is mostly why I was thinking a 'replace' functionality would make a bit more sense than an 'update', but I don't think I'm ramped up enough yet on the repo and its historical decisions 😄

What are your thoughts on it?
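
For concreteness, a rough sketch of the second (diff-based) option using only the existing UpdatePartitionSpec API. It matches fields by partition field name and handles identity transforms only, so it illustrates the flow rather than being a complete implementation:

```java
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdatePartitionSpec;

// Sketch: evolve the table's current spec toward targetSpec by diffing
// partition field names. Rebuilding non-identity transforms as Terms for
// addField is elided, and the resulting field ordering is NOT guaranteed
// to match targetSpec, which is exactly the concern raised above.
static void evolveSpecTowards(Table table, PartitionSpec targetSpec) {
  Set<String> currentNames =
      table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet());
  Set<String> targetNames =
      targetSpec.fields().stream().map(PartitionField::name).collect(Collectors.toSet());

  UpdatePartitionSpec update = table.updateSpec();

  // Remove fields that exist in the current spec but not in the target.
  for (PartitionField field : table.spec().fields()) {
    if (!targetNames.contains(field.name())) {
      update.removeField(field.name());
    }
  }

  // Add target fields missing from the current spec (identity only here).
  for (PartitionField field : targetSpec.fields()) {
    if (!currentNames.contains(field.name())) {
      update.addField(targetSpec.schema().findColumnName(field.sourceId()));
    }
  }

  update.commit();
}
```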

@amogh-jahagirdar (Contributor) left a comment:

Thanks @rdsarvar, the part I'm a bit confused about is why we need a new useSpec API. I think the use case you described could be solved by adding a new spec without setting it as the default (which we recently added support for). Then the compaction could be performed using the spec ID of that added spec.
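
A hedged sketch of this suggested flow. The column and field names, the cutoff date, and the "highest spec ID is the newest" lookup are all assumptions:

```java
import java.util.Collections;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.actions.SparkActions;

// 'spark' (SparkSession) and 'table' (Iceberg Table) are assumed in scope.

// 1) Add a coarser monthly spec WITHOUT making it the default
//    (assumes the current spec has a daily field named "event_ts_day").
table.updateSpec()
    .removeField("event_ts_day")
    .addField("event_ts_month", Expressions.month("event_ts"))
    .addNonDefaultSpec()
    .commit();

// 2) Assume the highest spec ID belongs to the spec just added.
int coarseSpecId = Collections.max(table.specs().keySet());

// 3) Compact older data into that spec by ID.
SparkActions.get(spark)
    .rewriteDataFiles(table)
    .filter(Expressions.lessThan("event_ts", "2024-01-01T00:00:00"))
    .option(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(coarseSpecId))
    .execute();
```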


rdsarvar commented Nov 5, 2024

> Thanks @rdsarvar, the part I'm a bit confused about is why we need a new useSpec API. I think the use case you described could be solved by adding a new spec without setting it as the default (which we recently added support for). Then the compaction could be performed using the spec ID of that added spec.

I was looking at the public APIs, and there doesn't seem to be an available method that would allow me to parse and directly add a new partition spec. Am I missing an API that would simplify this? If I were to use the updateSpec functionality with addNonDefaultSpec, then I'd need to iteratively removeField + addField over the partition spec provided in the action.

My thinking was that, between transactions and the methods provided in the Table interface, we'd still need some form of addSpec functionality, since there's only updateSpec right now.

Let me know if that makes sense or if I'm missing something 😄
