source: use pull model for mylogical and pglogical #1065
Conversation
This PR is based on the branch https://github.com/cockroachdb/replicator/tree/bob_core_open. It modifies the mylogical and pglogical frontends to use a pull-based model, which is enabled by the BatchReader interface.
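For context, here is a minimal sketch of what a pull-based reader contract can look like. The `Batch`, `BatchCursor`, and `fakeSource` types below are simplified stand-ins invented for illustration; the real `BatchReader` interface in the replicator repo is richer (cursors carry outcomes, checkpoints, and so on).

```go
package main

import "fmt"

// Batch and BatchCursor are hypothetical, simplified stand-ins for the
// types the PR describes; the real definitions differ.
type Batch struct {
	Muts []string // stand-in for a set of mutations
}

type BatchCursor struct {
	Batch *Batch
	Error error
}

// BatchReader is the pull-oriented contract: instead of the frontend
// pushing batches into the sequencer, the consumer reads cursors from a
// channel at its own pace.
type BatchReader interface {
	Read() <-chan *BatchCursor
}

// fakeSource emits a fixed set of batches, then closes the channel.
type fakeSource struct{ batches []*Batch }

func (f *fakeSource) Read() <-chan *BatchCursor {
	out := make(chan *BatchCursor)
	go func() {
		defer close(out)
		for _, b := range f.batches {
			out <- &BatchCursor{Batch: b}
		}
	}()
	return out
}

func main() {
	src := &fakeSource{batches: []*Batch{
		{Muts: []string{"t1/m1"}},
		{Muts: []string{"t2/m1", "t2/m2"}},
	}}
	total := 0
	// The consumer pulls at its own pace; backpressure falls out of the
	// unbuffered channel.
	for cursor := range src.Read() {
		total += len(cursor.Batch.Muts)
	}
	fmt.Println("mutations read:", total) // mutations read: 3
}
```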
```go
	return conveyor.AcceptMultiBatch(ctx, toProcess, &types.AcceptOptions{})

	for _, cursor := range cursorsPushed {
		_, err = stopvar.DoWhenUpdatedOrInterval(ctx, nil, &cursor.Outcome, 2*time.Second, func(ctx *stopper.Context, old, new error) error {
```

Code scanning / CodeQL warning: useless assignment to local variable.
Left some comments here. My remaining thought is this: because this is such an all-encompassing change, we risk regressions across all operating modes.
Once we continue this work, I don't think we should merge and release all of it at once. Here is how I'm thinking about the sequencing (pun intended):
- Make the core changes to the sequencer and relevant interfaces + implementations as their own standalone change
- Make the changes for PG and MySQL first since those will see the most immediate performance benefit
- Perform testing and verification of the PG and MySQL cases
- Slowly add each mode of operation like objstore, kafka, and C2C
I don't want to embroil us in support tickets across the board in case something goes awry.
I'm wondering if it's possible for us to take this piecemeal approach -- starting with the PG and MySQL sources?
```go
			return nil
		})
		return err
	})

	return ret, stats, nil

	ctx.Go(func(ctx *stopper.Context) error {
		return ret.Fanout(ctx)
```
What's the purpose of doing a fanout here? Does this allow us to perform batching across different tables? How is this fan-out affected if the tables have FK constraints?
```go
	if err := destination.AcceptMultiBatch(ctx, acc, opts); err != nil {
		return err
	}
	// AcceptMultiBatch splits the batch based on destination tables.
```
Thanks for the helpful note. So that means that for the PG and MySQL case, previously we would have had to send updates to the target tables single-threaded:

table1/mut1 -> table2/mut1 -> table2/mut2 -> table1/mut2

But now we can fan it out so that each table is processed in a separate goroutine:

table1/mut1 -> table1/mut2
table2/mut1 -> table2/mut2
```go
	// Read from the source input.
	case cursor, ok := <-inputChan:
		cursorCnt++
		for name, mut := range cursor.Batch.Data.All() {
```
Beyond fanning out, do we do any batch applying for the single-table case? That is, within a single table, can we do something like:

(table1/mut1 + table1/mut2 + table1/mut3) as one batch apply
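If that kind of single-table batching is done, it would amount to coalescing a table's mutations into one multi-row statement instead of one round trip per mutation. This is an illustrative sketch only; the `buildBatchUpsert` helper is invented, and the real apply path in the repo is far more involved.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchUpsert sketches flushing N mutations against one table as a
// single statement. It returns the SQL text with positional placeholders
// and the flattened argument list.
func buildBatchUpsert(table string, rows [][]any) (string, []any) {
	var (
		placeholders []string
		args         []any
	)
	i := 1
	for _, row := range rows {
		ph := make([]string, len(row))
		for j, v := range row {
			ph[j] = fmt.Sprintf("$%d", i)
			args = append(args, v)
			i++
		}
		placeholders = append(placeholders, "("+strings.Join(ph, ", ")+")")
	}
	sql := fmt.Sprintf("UPSERT INTO %s VALUES %s",
		table, strings.Join(placeholders, ", "))
	return sql, args
}

func main() {
	// Three mutations against table1 collapse into one statement.
	sql, args := buildBatchUpsert("table1", [][]any{{1, "a"}, {2, "b"}, {3, "c"}})
	fmt.Println(sql)       // UPSERT INTO table1 VALUES ($1, $2), ($3, $4), ($5, $6)
	fmt.Println(len(args)) // 6
}
```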
```diff
@@ -67,7 +67,7 @@ func (f *Fixture) SequencerFor(
 	case switcher.ModeConsistent:
 		return f.Staging.Wrap(ctx, f.Core)
 	case switcher.ModeImmediate:
-		return f.Immediate, nil
+		return f.Core, nil
```
Are we now using `.Core` since there is no more `Immediate` after this change?
```diff
@@ -72,3 +87,70 @@ func (s *staging) Start(
 	_, stats, err := s.delegate.Start(ctx, opts)
 	return &acceptor{s.Staging}, stats, err
 }
+
+func (s *staging) StageMutations(
```
Just to clarify: is this `StageMutations` only relevant for modes that touch the staging table for mutation storage? I mean the C2X cases.
```go
		TargetQuerier: tx,
	}); err != nil {
		return nil, err

	cursor := &types.BatchCursor{
```
So previously, we were accepting a single batch at a time and then flushing it single-threaded on the target. What is the implication of using the `BatchCursor`? Does this now get us to use multibatches?

Who is the receiver of this `out` channel where cursors are written to? Is that the core sequencer that will then do all the batching within and across tables?
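To illustrate the handoff being asked about, here is a toy producer/consumer sketch with entirely hypothetical types: the frontend writes cursors to an `out` channel, a consumer stands in for the core sequencer, and the producer waits on each cursor's outcome before advancing. The real code waits via `stopvar.DoWhenUpdatedOrInterval` over a notification variable rather than a plain channel.

```go
package main

import "fmt"

// BatchCursor is a hypothetical stand-in: a batch of mutations plus a
// per-cursor outcome that the consumer reports back on.
type BatchCursor struct {
	Batch   []string   // stand-in for a multi-table batch of mutations
	Outcome chan error // written when the consumer finishes this batch
}

// consumer drains cursors from out (playing the role of the core
// sequencer), counts applied mutations, and reports success per cursor.
func consumer(out <-chan *BatchCursor, applied *int) {
	for cursor := range out {
		*applied += len(cursor.Batch)
		cursor.Outcome <- nil // report success back to the producer
	}
}

func main() {
	out := make(chan *BatchCursor)
	applied := 0
	go consumer(out, &applied)

	cursors := []*BatchCursor{
		{Batch: []string{"t1/m1", "t2/m1"}, Outcome: make(chan error, 1)},
		{Batch: []string{"t2/m2"}, Outcome: make(chan error, 1)},
	}
	for _, c := range cursors {
		out <- c
	}
	close(out)

	// Wait on each cursor's outcome, mirroring the wait loop in the diff
	// above, before the frontend would advance its checkpoint.
	for _, c := range cursors {
		if err := <-c.Outcome; err != nil {
			panic(err)
		}
	}
	fmt.Println("applied:", applied) // applied: 3
}
```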
That was the plan -- having smaller PRs to make smaller progress. The tricky thing is how the integration tests are written: they're convoluted with the script wrapper. If we change the pg/mysql frontends, then to keep CI green we also need to modify the interface of the script wrapper. However, the script wrapper is shared by the other frontends and thus the other integrations, so we would need to change those frontends to accommodate the change as well. We may break it into smaller commits, but given how the tests are structured, I don't think there's an easy way to break it into smaller PRs.