Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClickHouse: Normalize one batch at a time #2219

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Amogh-Bharadwaj
Copy link
Contributor

This helps with:

  • Memory utilisation on ClickHouse side
  • Observability
  • Debuggability

rawTbl := c.getRawTableName(flowJobName)

q := fmt.Sprintf(
`SELECT DISTINCT _peerdb_destination_table_name FROM %s WHERE _peerdb_batch_id = %d`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to craft a query to aggregate and fetch distinct tables for all batches at once

Copy link
Contributor Author

@Amogh-Bharadwaj Amogh-Bharadwaj Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the current behaviour and this PR changes it to get it for each batch
Do you want this function to be called higher up for all batches and pass a map of batchId:[table1,table2..] to syncTablesInThisBatch()

Copy link
Contributor

@heavycrystal heavycrystal Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, reduces queries we run

}

rawTbl := c.getRawTableName(req.FlowJobName)

// model the raw table data as inserts.
for _, tbl := range destinationTableNames {
// SELECT projection FROM raw_table WHERE _peerdb_batch_id > normalize_batch_id AND _peerdb_batch_id <= sync_batch_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated comment

Comment on lines +272 to +273
err = c.syncTablesInThisBatch(ctx, req, rawTbl, batchID)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err = c.syncTablesInThisBatch(ctx, req, rawTbl, batchID)
if err != nil {
if err := c.syncTablesInThisBatch(ctx, req, rawTbl, batchID); err != nil {

Comment on lines +279 to +280
err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err = c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID)
if err != nil {
if err := c.UpdateNormalizeBatchID(ctx, req.FlowJobName, batchID); err != nil {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants