[ENH] Update compactor to flush total records upon compaction #3483

Merged

Conversation

drewkim (Contributor) commented Jan 14, 2025

Description of changes

Updates the compactor to calculate the total record count per collection and flush it to the SysDB upon every compaction.

Summarize the changes made by this PR.

  • New functionality
    • FlushCollectionCompaction struct includes TotalRecordsPostCompaction
    • SysDB populates the total_records_post_compaction column when receiving a flush
    • ArrowBlockfileFlusher contains a new attribute, total_keys
    • ArrowUnorderedBlockfileWriter sums the total keys using the SparseIndexWriter and returns an ArrowBlockfileFlusher with the summed count
    • RegisterInput contains a new attribute, total_records_post_compaction
    • When handling a CommitSegmentWriterOutput, if the CompactOrchestrator receives a ChromaSegmentFlusher::RecordSegment, it reads total_keys() and stores the value on itself
      • ChromaSegmentFlusher::RecordSegment exposes total_keys() through its ArrowBlockfileFlusher
    • The CompactOrchestrator sends its num_records_last_compaction value in a RegisterInput to be flushed to the SysDB (a simplified sketch of this flow follows the list)
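
As a rough illustration of the end-to-end flow described above, here is a minimal standalone Rust sketch; the type and field names (SparseIndexEntry, BlockfileFlusher, the RegisterInput fields) are simplified stand-ins for the actual ArrowBlockfileFlusher, SparseIndexWriter, and RegisterInput, not the real Chroma API:

// Simplified stand-ins for the real Chroma types; illustrative only.
struct SparseIndexEntry {
    key_count: u64, // number of keys recorded for one block
}

struct BlockfileFlusher {
    total_keys: u64, // carried over from the writer at commit time
}

impl BlockfileFlusher {
    fn total_keys(&self) -> u64 {
        self.total_keys
    }
}

struct RegisterInput {
    total_records_post_compaction: u64, // new field flushed to the SysDB
}

// The writer sums per-block key counts from its sparse index and hands the
// total to the flusher it returns on commit.
fn commit(sparse_index: &[SparseIndexEntry]) -> BlockfileFlusher {
    let total_keys = sparse_index.iter().map(|e| e.key_count).sum();
    BlockfileFlusher { total_keys }
}

// The orchestrator copies the record segment flusher's count into the
// register input so it is persisted together with the rest of the flush.
fn build_register_input(flusher: &BlockfileFlusher) -> RegisterInput {
    RegisterInput {
        total_records_post_compaction: flusher.total_keys(),
    }
}

fn main() {
    let index = vec![
        SparseIndexEntry { key_count: 2 },
        SparseIndexEntry { key_count: 3 },
    ];
    let flusher = commit(&index);
    let input = build_register_input(&flusher);
    assert_eq!(input.total_records_post_compaction, 5);
}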

Test plan

How are these changes tested?

  • Tests pass locally with pytest for Python, yarn test for JS, and cargo test for Rust
  • Tested locally and confirmed that compaction correctly updates the column in the SysDB

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs repository?

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality (readability, modularity, intuitiveness)?

drewkim (Contributor, Author) commented Jan 14, 2025

@drewkim drewkim force-pushed the drew/_enh_update_compactor_to_flush_total_records_upon_compaction branch from b0e05a7 to 6b32b1b on January 14, 2025 02:11
@drewkim drewkim requested a review from HammadB January 14, 2025 21:00
@drewkim drewkim marked this pull request as ready for review January 14, 2025 21:57
@@ -133,12 +133,23 @@ impl ArrowUnorderedBlockfileWriter {
Box::new(ArrowBlockfileError::MigrationError(e)) as Box<dyn ChromaError>
})?;

let total_keys = self

Collaborator

We already loop over the blocks above, do we need a second iteration?

Contributor Author

Is the suggestion to calculate the total before the loop and then iteratively add to the total for each delta?
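
For concreteness, here is a rough standalone sketch of that single-pass approach; BlockDelta and its fields are hypothetical stand-ins, not the actual block delta type used by the writer:

// Hypothetical sketch: accumulate the key count inside the loop over block
// deltas that already exists, rather than summing in a second pass afterwards.
struct BlockDelta {
    keys: Vec<String>,
}

fn flush_deltas(deltas: &[BlockDelta]) -> u64 {
    let mut total_keys: u64 = 0;
    for delta in deltas {
        // ... existing per-delta work (migration, flushing) would happen here ...
        total_keys += delta.keys.len() as u64; // add while we are already iterating
    }
    total_keys
}

fn main() {
    let deltas = vec![
        BlockDelta { keys: vec!["a".into(), "b".into()] },
        BlockDelta { keys: vec!["c".into()] },
    ];
    assert_eq!(flush_deltas(&deltas), 3);
}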

@@ -733,6 +744,7 @@ mod tests {
writer.set(prefix_2, key2, value2).await.unwrap();

let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
assert_eq!(Some(2_u64), flusher.total_keys());

Collaborator

We need more testing around this.

Contributor Author

Agreed, wanted to make sure I wasn't missing the mark directionally before going deeper. Will write next

LogPosition int64
CurrentCollectionVersion int32
FlushSegmentCompactions []*FlushSegmentCompaction
TotalRecordsPostCompaction uint64

Collaborator

Will you have to backfill, or is the frontend expected to handle null?

Contributor Author

I don't think we need to backfill as the field defaults to 0 and over time will become correct as collections are updated. The potential overage is bounded by compaction frequency. Happy to discuss here

sanketkedia (Contributor)

Only one main comment: update the record count together with the log offset and collection version in the same query, instead of doing two queries to the DB.
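
For illustration, a hypothetical sketch of what folding the new column into the existing flush statement could look like; the table and column names here are assumptions, not the actual SysDB schema:

// Hypothetical: one UPDATE that advances the log position and version and
// writes the post-compaction record count, instead of a second query.
fn flush_collection_compaction_sql() -> &'static str {
    "UPDATE collections \
     SET log_position = $1, \
         version = $2, \
         total_records_post_compaction = $3 \
     WHERE id = $4"
}

fn main() {
    println!("{}", flush_collection_compaction_sql());
}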

@drewkim drewkim force-pushed the drew/_enh_update_compactor_to_flush_total_records_upon_compaction branch from b794d14 to 25c1fb0 on January 16, 2025 18:35
@drewkim drewkim changed the base branch from drew/_enh_add_num_records_last_compaction_to_sysdb to main January 16, 2025 18:35
@drewkim drewkim merged commit 537f29a into main Jan 16, 2025
79 checks passed