diff --git a/internal/sequencer/decorators/provider.go b/internal/sequencer/decorators/provider.go
index d33ab36d..6ec7934f 100644
--- a/internal/sequencer/decorators/provider.go
+++ b/internal/sequencer/decorators/provider.go
@@ -25,6 +25,7 @@ import (
 var Set = wire.NewSet(
 	ProvideMarker,
 	ProvideOnce,
+	ProvideRekey,
 	ProvideRetryTarget,
 )
@@ -44,6 +45,11 @@ func ProvideOnce(pool *types.StagingPool, stagers types.Stagers) *Once {
 	}
 }
+// ProvideRekey is called by Wire.
+func ProvideRekey(watchers types.Watchers) *Rekey {
+	return &Rekey{watchers: watchers}
+}
+
 // ProvideRetryTarget is called by Wire.
 func ProvideRetryTarget(target *types.TargetPool) *RetryTarget {
 	return &RetryTarget{
diff --git a/internal/sequencer/decorators/rekey.go b/internal/sequencer/decorators/rekey.go
new file mode 100644
index 00000000..720c25fd
--- /dev/null
+++ b/internal/sequencer/decorators/rekey.go
@@ -0,0 +1,224 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package decorators
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+
+	"github.com/cockroachdb/field-eng-powertools/notify"
+	"github.com/cockroachdb/field-eng-powertools/stopper"
+	"github.com/cockroachdb/replicator/internal/sequencer"
+	"github.com/cockroachdb/replicator/internal/types"
+	"github.com/cockroachdb/replicator/internal/util/hlc"
+	"github.com/cockroachdb/replicator/internal/util/merge"
+	"github.com/pkg/errors"
+)
+
+// Rekey reencodes the [types.Mutation.Key] field as mutations are
+// processed, based on the destination table. This is primarily to
+// support the pglogical REPLICA IDENTITY FULL option, but it could be
+// used for any other case where the replication key does not actually
+// preserve replication identity.
+type Rekey struct {
+	watchers types.Watchers
+}
+
+var _ sequencer.Shim = (*Rekey)(nil)
+
+// MultiAcceptor returns a rekeying facade around the acceptor.
+func (r *Rekey) MultiAcceptor(acceptor types.MultiAcceptor) types.MultiAcceptor {
+	return &rekeyAcceptor{
+		base: base{
+			multiAcceptor:    acceptor,
+			tableAcceptor:    acceptor,
+			temporalAcceptor: acceptor,
+		},
+		Rekey: r,
+	}
+}
+
+// TableAcceptor returns a rekeying facade around the delegate.
+func (r *Rekey) TableAcceptor(acceptor types.TableAcceptor) types.TableAcceptor {
+	return &rekeyAcceptor{
+		base: base{
+			tableAcceptor: acceptor,
+		},
+		Rekey: r,
+	}
+}
+
+// TemporalAcceptor returns a rekeying facade around the delegate.
+func (r *Rekey) TemporalAcceptor(acceptor types.TemporalAcceptor) types.TemporalAcceptor {
+	return &rekeyAcceptor{
+		base: base{
+			tableAcceptor:    acceptor,
+			temporalAcceptor: acceptor,
+		},
+		Rekey: r,
+	}
+}
+
+// Wrap implements [sequencer.Shim].
+func (r *Rekey) Wrap(
+	_ *stopper.Context, delegate sequencer.Sequencer,
+) (sequencer.Sequencer, error) {
+	return &rekeyShim{r, delegate}, nil
+}
+
+type rekeyShim struct {
+	*Rekey
+	delegate sequencer.Sequencer
+}
+
+var _ sequencer.Sequencer = (*rekeyShim)(nil)
+
+// Start will inject the facade at the top of the stack.
+func (r *rekeyShim) Start(
+	ctx *stopper.Context, opts *sequencer.StartOptions,
+) (types.MultiAcceptor, *notify.Var[sequencer.Stat], error) {
+	acc, stat, err := r.delegate.Start(ctx, opts)
+	return r.MultiAcceptor(acc), stat, err
+}
+
+type rekeyAcceptor struct {
+	*Rekey
+	base
+}
+
+var _ types.MultiAcceptor = (*rekeyAcceptor)(nil)
+
+func (r *rekeyAcceptor) AcceptMultiBatch(
+	ctx context.Context, batch *types.MultiBatch, opts *types.AcceptOptions,
+) error {
+	if r.multiAcceptor == nil {
+		return errors.New("not a MultiAcceptor")
+	}
+	next, err := r.multi(batch)
+	if err != nil {
+		return err
+	}
+	return r.multiAcceptor.AcceptMultiBatch(ctx, next, opts)
+}
+
+func (r *rekeyAcceptor) AcceptTableBatch(
+	ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions,
+) error {
+	if r.tableAcceptor == nil {
+		return errors.New("not a TableAcceptor")
+	}
+	next, err := r.table(batch)
+	if err != nil {
+		return err
+	}
+	return r.tableAcceptor.AcceptTableBatch(ctx, next, opts)
+}
+
+func (r *rekeyAcceptor) AcceptTemporalBatch(
+	ctx context.Context, batch *types.TemporalBatch, opts *types.AcceptOptions,
+) error {
+	if r.temporalAcceptor == nil {
+		return errors.New("not a TemporalAcceptor")
+	}
+	next, err := r.temporal(batch)
+	if err != nil {
+		return err
+	}
+	return r.temporalAcceptor.AcceptTemporalBatch(ctx, next, opts)
+}
+
+func (r *rekeyAcceptor) multi(batch *types.MultiBatch) (*types.MultiBatch, error) {
+	ret := batch.Empty()
+	ret.ByTime = make(map[hlc.Time]*types.TemporalBatch, len(batch.Data))
+	ret.Data = make([]*types.TemporalBatch, len(batch.Data))
+
+	for idx, temp := range batch.Data {
+		next, err := r.temporal(temp)
+		if err != nil {
+			return nil, err
+		}
+		ret.ByTime[next.Time] = next
+		ret.Data[idx] = next
+	}
+	return ret, nil
+}
+
+func (r *rekeyAcceptor) table(batch *types.TableBatch) (*types.TableBatch, error) {
+	watcher, err := r.watchers.Get(batch.Table.Schema())
+	if err != nil {
+		return nil, err
+	}
+	colData, ok := watcher.Get().Columns.Get(batch.Table)
+	if !ok {
+		return nil, errors.Errorf("unknown table %s", batch.Table)
+	}
+	bagSpec := &merge.BagSpec{Columns: colData}
+
+	ret := batch.Empty()
+
+	for table, mut := range batch.Mutations() {
+		if err := func() error {
+			// Shortest useful json we could decode is {"a":0}
+			if mut.IsDelete() && len(mut.Data) < 7 {
+				return ret.Accumulate(table, mut)
+			}
+			bag := merge.NewBag(bagSpec)
+			dec := json.NewDecoder(bytes.NewReader(mut.Data))
+			dec.UseNumber()
+			if err := dec.Decode(&bag); err != nil {
+				return errors.WithStack(err)
+			}
+			var jsKey []any
+			for _, col := range colData {
+				if !col.Primary {
+					break
+				}
+				keyVal, ok := bag.Get(col.Name)
+				if !ok {
+					return errors.Errorf("could not rekey mutation; missing PK column %s", col.Name)
+				}
+				jsKey = append(jsKey, keyVal)
+			}
+			var err error
+			mut.Key, err = json.Marshal(jsKey)
+			if err != nil {
+				return errors.WithStack(err)
+			}
+			ret.Data = append(ret.Data, mut)
+			return nil
+		}(); err != nil {
+			return ret, err
+		}
+	}
+	return ret, nil
+}
+
+func (r *rekeyAcceptor) temporal(batch *types.TemporalBatch) (*types.TemporalBatch, error) {
+	ret := batch.Empty()
+	for table, tableBatch := range batch.Data.All() {
+		var err error
+		tableBatch, err = r.table(tableBatch)
+		if err != nil {
+			return nil, err
+		}
+		ret.Data.Put(table, tableBatch)
+	}
+	return ret, nil
+}
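Note: the rekeying step above re-derives a mutation's key from its payload. With REPLICA IDENTITY FULL, the source sends every column value as the replication key, so the decorator decodes the row JSON and re-marshals just the primary-key columns, in column order. The following standalone sketch illustrates the transformation only; it stands in for the merge.Bag and schemawatch plumbing the real code uses, and the rekey helper is hypothetical:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // rekey decodes a full-row JSON payload and re-marshals only the
    // primary-key columns, in order, as the replacement mutation key.
    func rekey(data []byte, pkCols []string) ([]byte, error) {
    	row := map[string]any{}
    	if err := json.Unmarshal(data, &row); err != nil {
    		return nil, err
    	}
    	key := make([]any, 0, len(pkCols))
    	for _, col := range pkCols {
    		val, ok := row[col]
    		if !ok {
    			return nil, fmt.Errorf("missing PK column %s", col)
    		}
    		key = append(key, val)
    	}
    	return json.Marshal(key)
    }

    func main() {
    	// Under REPLICA IDENTITY FULL the incoming key covers every
    	// column; the payload still lets us recover the true primary key.
    	data := []byte(`{"pk":42,"v":"hello"}`)
    	key, err := rekey(data, []string{"pk"})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(string(key)) // [42]
    }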
diff --git a/internal/source/mylogical/conn.go b/internal/source/mylogical/conn.go
index 7a39ba36..ccc0eacf 100644
--- a/internal/source/mylogical/conn.go
+++ b/internal/source/mylogical/conn.go
@@ -23,7 +23,6 @@ package mylogical
 import (
 	"bytes"
 	"context"
-	"database/sql"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -50,13 +49,10 @@ import (
 	log "github.com/sirupsen/logrus"
 )
-// Conn exports the package-internal type.
-type Conn conn
-
-// conn encapsulates all wire-connection behavior. It is
+// Conn encapsulates all wire-connection behavior. It is
 // responsible for receiving replication messages and replying with
 // status updates.
-type conn struct {
+type Conn struct {
 	// The destination for writes.
 	acceptor types.TemporalAcceptor
 	// Columns, as ordered by the source database.
@@ -71,8 +67,6 @@ type conn struct {
 	monotonic hlc.Clock
 	// Map source ids to target tables.
 	relations map[uint64]ident.Table
-	// Progress reports from the underlying sequencer.
-	stat *notify.Var[sequencer.Stat]
 	// The configuration for opening replication connections.
 	sourceConfig replication.BinlogSyncerConfig
 	// Access to the staging cluster.
@@ -81,6 +75,8 @@ type conn struct {
 	target ident.Schema
 	// Access to the target database.
 	targetDB *types.TargetPool
+	// The memo key that holds the WAL offset.
+	walMemoKey string
-	// Managed by persistWALOffset.
+	// Managed by loadWALOffset.
 	walOffset notify.Var[*consistentPoint]
 }
@@ -97,10 +93,10 @@ const (
 //go:generate go run golang.org/x/tools/cmd/stringer -type=mutationType
-var _ diag.Diagnostic = (*conn)(nil)
+var _ diag.Diagnostic = (*Conn)(nil)
 // Diagnostic implements [diag.Diagnostic].
-func (c *conn) Diagnostic(_ context.Context) any {
+func (c *Conn) Diagnostic(_ context.Context) any {
 	return map[string]any{
 		"columns": c.columns,
 		"flavor":  c.flavor,
@@ -108,30 +104,41 @@ func (c *conn) Diagnostic(_ context.Context) any {
 	}
 }
-func (c *conn) Start(ctx *stopper.Context) error {
+func (c *Conn) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, error) {
 	// Call this first to load the previous offset. We want to reset our
 	// state before starting the main copier routine.
-	if err := c.persistWALOffset(ctx); err != nil {
-		return err
+	if err := c.loadWALOffset(ctx); err != nil {
+		return nil, err
 	}
+	// Small channel for backpressure.
+	ret := make(chan *types.BatchCursor, 2)
+
 	// Start a process to copy data to the target.
 	ctx.Go(func(ctx *stopper.Context) error {
+		defer close(ret)
 		for !ctx.IsStopping() {
-			if err := c.copyMessages(ctx); err != nil {
-				log.WithError(err).Warn("error while copying messages; will retry")
+			if err := c.copyMessages(ctx, ret); err != nil {
+				cursor := &types.BatchCursor{Error: err}
 				select {
+				case ret <- cursor:
 				case <-ctx.Stopping():
-				case <-time.After(time.Second):
 				}
+				return nil
 			}
 		}
 		return nil
 	})
+
+	return ret, nil
+}
+
+// ReportProgress reports the sequencer's progress.
+func (c *Conn) ReportProgress(ctx *stopper.Context, stat *notify.Var[sequencer.Stat]) error {
 	// Sync the sequencer's progress back to our GTID value.
 	ctx.Go(func(ctx *stopper.Context) error {
 		// Inner callback returns nil.
- _, _ = stopvar.DoWhenChanged(ctx, nil, c.stat, + _, _ = stopvar.DoWhenChanged(ctx, nil, stat, func(ctx *stopper.Context, old, next sequencer.Stat) error { oldProgress := sequencer.CommonProgress(old).Max() progress := sequencer.CommonProgress(next).Max() @@ -141,19 +148,29 @@ func (c *conn) Start(ctx *stopper.Context) error { cp := progress.External().(*consistentPoint) log.Debugf("progressed to consistent point: %s", cp) c.walOffset.Set(cp) + + if err := c.memo.Put(ctx, c.stagingDB, c.walMemoKey, []byte(cp.String())); err == nil { + log.Tracef("stored WAL offset %s: %s", c.walMemoKey, cp) + } else { + log.WithError(err).Error("could not persist WAL offset") + } } return nil }) return nil }) - return nil } -// Process implements logical.Dialect and receives a sequence of logical -// replication messages, or possibly a rollbackMessage. -func (c *conn) accumulateBatch( - ctx *stopper.Context, ev *replication.BinlogEvent, batch *types.TemporalBatch, +// accumulateBatch folds replication messages into the batch and sends +// it to the channel when a complete transaction has been read. Any +// returned batch should be passed to the next invocation of +// accumulateBatch. +func (c *Conn) accumulateBatch( + ctx *stopper.Context, + ev *replication.BinlogEvent, + batch *types.TemporalBatch, + out chan<- *types.BatchCursor, ) (*types.TemporalBatch, error) { // See https://dev.mysql.com/doc/internals/en/binlog-event.html // Assumptions: @@ -183,33 +200,15 @@ func (c *conn) accumulateBatch( if batch.Count() == 0 { log.Trace("skipping empty transaction") } else { - tx, err := c.targetDB.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, errors.WithStack(err) - } - defer tx.Rollback() - - if err := c.acceptor.AcceptTemporalBatch(ctx, batch, &types.AcceptOptions{ - TargetQuerier: tx, - }); err != nil { - return nil, err + cursor := &types.BatchCursor{ + Batch: batch, + Progress: hlc.RangeIncluding(hlc.Zero(), batch.Time), } - if err := tx.Commit(); err != nil { - return nil, errors.WithStack(err) + select { + case out <- cursor: + case <-ctx.Stopping(): + return nil, nil } - - // TODO(bob): This is a temporary hack until this frontend - // is switched to using the core sequencer. Very shortly, - // the sequencer stat will reflect the progress of - // transactions that have been committed to the target. In - // the meantime, we're in immediate operation, so we'll fake - // one up. - fakeProgress := &ident.TableMap[hlc.Range]{} - fakeTable := ident.NewTable(c.target, ident.New("fake")) - fakeProgress.Put(fakeTable, hlc.RangeIncluding(hlc.Zero(), batch.Time)) - c.stat.Set(sequencer.NewStat(&types.TableGroup{ - Tables: []ident.Table{fakeTable}, - }, fakeProgress)) } return nil, nil @@ -288,7 +287,7 @@ func (c *conn) accumulateBatch( // copyMessages is the main replication loop. It will open a connection // to the source, accumulate messages, and commit data to the target. 
-func (c *conn) copyMessages(ctx *stopper.Context) error {
+func (c *Conn) copyMessages(ctx *stopper.Context, out chan<- *types.BatchCursor) error {
 	syncer := replication.NewBinlogSyncer(c.sourceConfig)
 	defer syncer.Close()
@@ -332,7 +331,7 @@ func (c *conn) copyMessages(ctx *stopper.Context) error {
 			*replication.QueryEvent,
 			*replication.MariadbGTIDEvent,
 			*replication.MariadbAnnotateRowsEvent:
-			batch, err = c.accumulateBatch(ctx, ev, batch)
+			batch, err = c.accumulateBatch(ctx, ev, batch, out)
 			if err != nil {
 				return err
 			}
@@ -360,11 +359,11 @@ func (c *conn) copyMessages(ctx *stopper.Context) error {
 }
 // ZeroStamp implements logical.Dialect.
-func (c *conn) ZeroStamp() stamp.Stamp {
+func (c *Conn) ZeroStamp() stamp.Stamp {
 	return newConsistentPoint(c.flavor)
 }
-func (c *conn) onDataTuple(
+func (c *Conn) onDataTuple(
 	batch *types.TemporalBatch, tuple *replication.RowsEvent, operation mutationType,
 ) error {
 	tbl, ok := c.relations[tuple.TableID]
@@ -437,7 +436,7 @@ func (c *conn) onDataTuple(
 // getTableMetadata fetches table metadata from the database
 // if binlog_row_metadata = minimal
-func (c *conn) getColNames(table ident.Table) ([][]byte, []uint64, error) {
+func (c *Conn) getColNames(table ident.Table) ([][]byte, []uint64, error) {
 	cl, err := getConnection(c.config)
 	if err != nil {
 		return nil, nil, err
@@ -469,7 +468,7 @@ func (c *conn) getColNames(table ident.Table) ([][]byte, []uint64, error) {
 // onRelation updates the source database namespace mappings.
 // Columns names are only available if
 // set global binlog_row_metadata = full;
-func (c *conn) onRelation(msg *replication.TableMapEvent) error {
+func (c *Conn) onRelation(msg *replication.TableMapEvent) error {
 	targetTbl := ident.NewTable(c.target, ident.New(string(msg.Table)))
 	log.Tracef("Learned %+v", targetTbl)
 	columnNames, primaryKeys := msg.ColumnName, msg.PrimaryKey
@@ -599,7 +598,7 @@ func getFlavor(config *Config) (string, string, error) {
-// initializes to a user-provided value. It will also start a goroutine
-// in the stopper to occasionally write an updated value back to the
-// memo.
-func (c *conn) persistWALOffset(ctx *stopper.Context) error {
+// initializes to a user-provided value.
+func (c *Conn) loadWALOffset(ctx *stopper.Context) error {
 	key := fmt.Sprintf("mysql-wal-offset-%s", c.target.Raw())
 	found, err := c.memo.Get(ctx, c.stagingDB, key)
 	if err != nil {
@@ -625,19 +624,6 @@ func (c *conn) persistWALOffset(ctx *stopper.Context) error {
 	// transaction logs.
 	c.monotonic.External(cp.clone())
 	c.walOffset.Set(cp)
-
-	ctx.Go(func(ctx *stopper.Context) error {
-		_, err := stopvar.DoWhenChanged(ctx, cp, &c.walOffset,
-			func(ctx *stopper.Context, _, cp *consistentPoint) error {
-				if err := c.memo.Put(ctx, c.stagingDB, key, []byte(cp.String())); err == nil {
-					log.Tracef("stored WAL offset %s: %s", key, cp)
-				} else {
-					log.WithError(err).Error("could not persist WAL offset")
-				}
-				return nil
-			})
-		return err
-	})
 	return nil
 }
diff --git a/internal/source/mylogical/conn_test.go b/internal/source/mylogical/conn_test.go
index 5d110b56..61868f53 100644
--- a/internal/source/mylogical/conn_test.go
+++ b/internal/source/mylogical/conn_test.go
@@ -76,7 +76,7 @@ func TestOnDataTuple(t *testing.T) {
 	columns := &ident.TableMap[[]types.ColData]{}
 	columns.Put(kvTable, kvCols)
 	columns.Put(noKeyTable, noKeyCols)
-	c := &conn{
+	c := &Conn{
 		columns: columns,
 		relations: map[uint64]ident.Table{
 			kvTableID: kvTable,
@@ -323,7 +323,7 @@ func TestOnRelation(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			a := assert.New(t)
 			r := require.New(t)
-			c := &conn{
+			c := &Conn{
 				columns:   &ident.TableMap[[]types.ColData]{},
 				config:    &Config{},
 				relations: make(map[uint64]ident.Table),
@@ -435,7 +435,7 @@ func TestInitialConsistentPoint(t *testing.T) {
 			a := assert.New(t)
 			m := &mockMemo{}
-			c := &conn{
+			c := &Conn{
 				config: &Config{
 					InitialGTID: tt.config,
 				},
@@ -447,7 +447,7 @@ func TestInitialConsistentPoint(t *testing.T) {
 			if tt.stored != "" {
 				a.NoError(m.Put(stop, nil, key, []byte(tt.stored)))
 			}
-			a.NoError(c.persistWALOffset(stop))
+			a.NoError(c.loadWALOffset(stop))
 			a.Equal(c.monotonic.Last().External().(*consistentPoint).String(), tt.want)
 		})
 	}
diff --git a/internal/source/mylogical/injector.go b/internal/source/mylogical/injector.go
index a78db55b..d4013c3e 100644
--- a/internal/source/mylogical/injector.go
+++ b/internal/source/mylogical/injector.go
@@ -25,8 +25,8 @@ import (
 	"github.com/cockroachdb/field-eng-powertools/stopper"
 	scriptRuntime "github.com/cockroachdb/replicator/internal/script"
 	"github.com/cockroachdb/replicator/internal/sequencer/chaos"
-	"github.com/cockroachdb/replicator/internal/sequencer/decorators"
-	"github.com/cockroachdb/replicator/internal/sequencer/immediate"
+	"github.com/cockroachdb/replicator/internal/sequencer/core"
+	"github.com/cockroachdb/replicator/internal/sequencer/scheduler"
 	scriptSequencer "github.com/cockroachdb/replicator/internal/sequencer/script"
 	"github.com/cockroachdb/replicator/internal/sinkprod"
 	"github.com/cockroachdb/replicator/internal/staging"
@@ -42,12 +42,12 @@ func Start(ctx *stopper.Context, config *Config) (*MYLogical, error) {
 		wire.Bind(new(context.Context), new(*stopper.Context)),
 		wire.Struct(new(MYLogical), "*"),
 		wire.FieldsOf(new(*Config), "Script"),
-		wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Stage", "Staging", "Target"),
+		wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Staging", "Target"),
 		Set,
 		chaos.Set,
-		decorators.Set,
+		core.Set,
 		diag.New,
-		immediate.Set,
+		scheduler.Set,
 		scriptRuntime.Set,
 		scriptSequencer.Set,
 		sinkprod.Set,
diff --git a/internal/source/mylogical/integration_test.go b/internal/source/mylogical/integration_test.go
index dc52f76c..7b19b8f5 100644
--- a/internal/source/mylogical/integration_test.go
+++ b/internal/source/mylogical/integration_test.go
@@ -241,7 +241,7 @@ func TestColumNames(t *testing.T) {
 	defer cancel()
 	flavor, _, err := getFlavor(config)
 	r.NoError(err)
-	myConn := &conn{
+	myConn := &Conn{
 		flavor: flavor,
 		config: config,
 	}
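Note: both frontends now expose a pull-based contract. Read hands back a channel of *types.BatchCursor values whose Batch, Progress, and Error fields appear in the hunks above, and the sequencer drains that channel instead of having transactions pushed at it through AcceptTemporalBatch. A consumer loop might look roughly like this sketch; the consume helper is hypothetical and only the BatchCursor fields visible in this diff are assumed:

    package example

    import (
    	"github.com/cockroachdb/field-eng-powertools/stopper"
    	"github.com/cockroachdb/replicator/internal/types"
    )

    // consume drains cursors until the reader closes the channel or the
    // stopper begins shutting down.
    func consume(ctx *stopper.Context, cursors <-chan *types.BatchCursor) error {
    	for {
    		select {
    		case cursor, ok := <-cursors:
    			if !ok {
    				return nil // reader exited and closed the channel
    			}
    			if cursor.Error != nil {
    				return cursor.Error // errors now arrive in-band
    			}
    			// Apply cursor.Batch here, then record cursor.Progress so
    			// a restart resumes from the last committed position.
    		case <-ctx.Stopping():
    			return nil
    		}
    	}
    }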
diff --git a/internal/source/mylogical/provider.go b/internal/source/mylogical/provider.go
index afbc2aae..38acce7f 100644
--- a/internal/source/mylogical/provider.go
+++ b/internal/source/mylogical/provider.go
@@ -17,12 +17,14 @@
 package mylogical
 import (
+	"fmt"
+
 	"github.com/cockroachdb/field-eng-powertools/notify"
 	"github.com/cockroachdb/field-eng-powertools/stopper"
 	"github.com/cockroachdb/replicator/internal/script"
 	"github.com/cockroachdb/replicator/internal/sequencer"
 	"github.com/cockroachdb/replicator/internal/sequencer/chaos"
-	"github.com/cockroachdb/replicator/internal/sequencer/immediate"
+	"github.com/cockroachdb/replicator/internal/sequencer/core"
 	scriptSeq "github.com/cockroachdb/replicator/internal/sequencer/script"
 	"github.com/cockroachdb/replicator/internal/target/apply"
 	"github.com/cockroachdb/replicator/internal/target/schemawatch"
@@ -48,7 +50,7 @@ func ProvideConn(
 	acc *apply.Acceptor,
 	chaos *chaos.Chaos,
 	config *Config,
-	imm *immediate.Immediate,
+	core *core.Core,
 	memo types.Memo,
 	scriptSeq *scriptSeq.Sequencer,
 	stagingPool *types.StagingPool,
@@ -74,7 +76,22 @@
 		TLSConfig: config.tlsConfig,
 	}
-	seq, err := scriptSeq.Wrap(ctx, imm)
+	ret := &Conn{
+		columns:      &ident.TableMap[[]types.ColData]{},
+		config:       config,
+		memo:         memo,
+		flavor:       flavor,
+		relations:    make(map[uint64]ident.Table),
+		sourceConfig: cfg,
+		stagingDB:    stagingPool,
+		target:       config.TargetSchema,
+		targetDB:     targetPool,
+		walMemoKey:   fmt.Sprintf("mysql-wal-offset-%s", config.TargetSchema.Raw()),
+		walOffset:    notify.Var[*consistentPoint]{},
+	}
+
+	seq := sequencer.Sequencer(core)
+	seq, err = scriptSeq.Wrap(ctx, seq)
 	if err != nil {
 		return nil, err
 	}
@@ -82,9 +99,10 @@
 	if err != nil {
 		return nil, err
 	}
-	connAcceptor, stat, err := seq.Start(ctx, &sequencer.StartOptions{
-		Delegate: types.OrderedAcceptorFrom(acc, watchers),
-		Bounds:   &notify.Var[hlc.Range]{}, // Not currently used.
+	_, statVar, err := seq.Start(ctx, &sequencer.StartOptions{
+		BatchReader: ret,
+		Delegate:    types.OrderedAcceptorFrom(acc, watchers),
+		Bounds:      &notify.Var[hlc.Range]{}, // Not currently used.
 		Group: &types.TableGroup{
 			Name:      ident.New(config.TargetSchema.Raw()),
 			Enclosing: config.TargetSchema,
@@ -94,22 +112,11 @@
 		return nil, err
 	}
-	ret := &conn{
-		acceptor:     connAcceptor,
-		columns:      &ident.TableMap[[]types.ColData]{},
-		config:       config,
-		memo:         memo,
-		flavor:       flavor,
-		relations:    make(map[uint64]ident.Table),
-		sourceConfig: cfg,
-		stagingDB:    stagingPool,
-		stat:         stat,
-		target:       config.TargetSchema,
-		targetDB:     targetPool,
-		walOffset:    notify.Var[*consistentPoint]{},
+	if err := ret.ReportProgress(ctx, statVar); err != nil {
+		return nil, err
 	}
-	return (*Conn)(ret), ret.Start(ctx)
+	return ret, nil
 }
 // ProvideEagerConfig is a hack to move up the evaluation of the user
diff --git a/internal/source/mylogical/wire_gen.go b/internal/source/mylogical/wire_gen.go
index 3432e52a..ac664fa8 100644
--- a/internal/source/mylogical/wire_gen.go
+++ b/internal/source/mylogical/wire_gen.go
@@ -10,12 +10,12 @@ import (
 	"github.com/cockroachdb/field-eng-powertools/stopper"
 	"github.com/cockroachdb/replicator/internal/script"
 	"github.com/cockroachdb/replicator/internal/sequencer/chaos"
-	"github.com/cockroachdb/replicator/internal/sequencer/decorators"
-	"github.com/cockroachdb/replicator/internal/sequencer/immediate"
+	"github.com/cockroachdb/replicator/internal/sequencer/core"
+	"github.com/cockroachdb/replicator/internal/sequencer/scheduler"
 	script2 "github.com/cockroachdb/replicator/internal/sequencer/script"
 	"github.com/cockroachdb/replicator/internal/sinkprod"
+	"github.com/cockroachdb/replicator/internal/staging/leases"
 	"github.com/cockroachdb/replicator/internal/staging/memo"
-	"github.com/cockroachdb/replicator/internal/staging/stage"
 	"github.com/cockroachdb/replicator/internal/staging/version"
 	"github.com/cockroachdb/replicator/internal/target/apply"
 	"github.com/cockroachdb/replicator/internal/target/dlq"
@@ -87,19 +87,22 @@ func Start(ctx *stopper.Context, config *Config) (*MYLogical, error) {
 	chaosChaos := &chaos.Chaos{
 		Config: sequencerConfig,
 	}
-	stageConfig := &eagerConfig.Stage
-	stagers := stage.ProvideFactory(stageConfig, stagingPool, stagingSchema, ctx)
-	marker := decorators.ProvideMarker(stagingPool, stagers)
-	once := decorators.ProvideOnce(stagingPool, stagers)
-	retryTarget := decorators.ProvideRetryTarget(targetPool)
-	immediateImmediate := immediate.ProvideImmediate(sequencerConfig, targetPool, marker, once, retryTarget, stagers)
+	typesLeases, err := leases.ProvideLeases(ctx, stagingPool, stagingSchema)
+	if err != nil {
+		return nil, err
+	}
+	schedulerScheduler, err := scheduler.ProvideScheduler(ctx, sequencerConfig)
+	if err != nil {
+		return nil, err
+	}
+	coreCore := core.ProvideCore(sequencerConfig, typesLeases, schedulerScheduler, targetPool)
 	sequencer := script2.ProvideSequencer(loader, targetPool, watchers)
-	mylogicalConn, err := ProvideConn(ctx, acceptor, chaosChaos, config, immediateImmediate, memoMemo, sequencer, stagingPool, targetPool, watchers)
+	conn, err := ProvideConn(ctx, acceptor, chaosChaos, config, coreCore, memoMemo, sequencer, stagingPool, targetPool, watchers)
 	if err != nil {
 		return nil, err
 	}
 	myLogical := &MYLogical{
-		Conn:        mylogicalConn,
+		Conn:        conn,
 		Diagnostics: diagnostics,
 	}
 	return myLogical, nil
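Note: with buffering moved into the core sequencer, the connections no longer persist their WAL offsets from a dedicated goroutine. Instead, ReportProgress watches the sequencer's Stat variable and writes the offset inline whenever progress advances. A condensed form of that pattern follows; the watchProgress helper is hypothetical, the stopvar import path is an assumption, and the call shapes are taken from the hunks in this patch:

    package example

    import (
    	"github.com/cockroachdb/field-eng-powertools/notify"
    	"github.com/cockroachdb/field-eng-powertools/stopper"
    	"github.com/cockroachdb/field-eng-powertools/stopvar" // import path assumed
    	"github.com/cockroachdb/replicator/internal/sequencer"
    	"github.com/cockroachdb/replicator/internal/util/hlc"
    )

    // watchProgress mirrors the ReportProgress pattern in this patch:
    // wake on each Stat change, debounce with hlc.Compare, and persist
    // the new offset inline rather than from a separate goroutine.
    func watchProgress(
    	ctx *stopper.Context, stat *notify.Var[sequencer.Stat], save func(hlc.Time),
    ) {
    	ctx.Go(func(ctx *stopper.Context) error {
    		_, _ = stopvar.DoWhenChanged(ctx, nil, stat,
    			func(ctx *stopper.Context, old, next sequencer.Stat) error {
    				progress := sequencer.CommonProgress(next).Max()
    				if hlc.Compare(progress, sequencer.CommonProgress(old).Max()) > 0 {
    					save(progress) // e.g. memo.Put of the marshaled offset
    				}
    				return nil
    			})
    		return nil
    	})
    }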
diff --git a/internal/source/pglogical/conn.go b/internal/source/pglogical/conn.go
index 3da6cd2b..cac5325d 100644
--- a/internal/source/pglogical/conn.go
+++ b/internal/source/pglogical/conn.go
@@ -20,7 +20,6 @@ package pglogical
 import (
 	"context"
-	"database/sql"
 	"encoding/json"
 	"fmt"
 	"time"
@@ -45,8 +44,6 @@ import (
 // responsible for receiving replication messages and replying with
 // status updates.
 type Conn struct {
-	// The destination for writes.
-	acceptor types.TemporalAcceptor
 	// Columns, as ordered by the source database.
 	columns *ident.TableMap[[]types.ColData]
 	// Persistent storage for WAL data.
@@ -65,51 +62,69 @@ type Conn struct {
 	standbyTimeout time.Duration
 	// Access to the staging cluster.
 	stagingDB *types.StagingPool
-	// Progress reports from the underlying sequencer.
-	stat *notify.Var[sequencer.Stat]
 	// The destination for writes.
 	target ident.Schema
-	// Access to the target database.
-	targetDB *types.TargetPool
+	// The memo key that holds the WAL offset.
+	walMemoKey string
 	// Holds the guaranteed-committed LSN.
 	walOffset notify.Var[pglogrepl.LSN]
 }
-// Start launches goroutines into the context.
-func (c *Conn) Start(ctx *stopper.Context) error {
+// Read emits transaction batches read from the replication stream.
+func (c *Conn) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, error) {
 	// Call this first to load the previous offset. We want to reset our
 	// state before starting the main copier routine.
-	if err := c.persistWALOffset(ctx); err != nil {
-		return err
+	if err := c.loadWALOffset(ctx); err != nil {
+		return nil, err
 	}
+	// Small channel for backpressure.
+	ret := make(chan *types.BatchCursor, 2)
+
 	// Start a process to copy data to the target.
 	ctx.Go(func(ctx *stopper.Context) error {
+		defer close(ret)
 		for !ctx.IsStopping() {
-			if err := c.copyMessages(ctx); err != nil {
-				log.WithError(err).Warn("error while copying messages; will retry")
+			if err := c.copyMessages(ctx, ret); err != nil {
+				cursor := &types.BatchCursor{Error: err}
 				select {
+				case ret <- cursor:
 				case <-ctx.Stopping():
-				case <-time.After(100 * time.Millisecond):
 				}
+				return nil
 			}
 		}
 		return nil
 	})
-	// Sync the sequencer's progress back to our LSN value.
+
+	return ret, nil
+}
+
+// ReportProgress reports the sequencer's progress.
+func (c *Conn) ReportProgress(ctx *stopper.Context, stat *notify.Var[sequencer.Stat]) error {
+	// Sync the sequencer's progress back to our LSN value and keep the
+	// memo table up to date.
 	ctx.Go(func(ctx *stopper.Context) error {
 		// Inner callback returns nil.
-		_, _ = stopvar.DoWhenChanged(ctx, nil, c.stat,
+		_, _ = stopvar.DoWhenChanged(ctx, nil, stat,
 			func(ctx *stopper.Context, old, next sequencer.Stat) error {
 				oldProgress := sequencer.CommonProgress(old).Max()
 				progress := sequencer.CommonProgress(next).Max()
 				// Debounce intermediate progress updates (e.g. partial
 				// table progress).
-				if hlc.Compare(progress, oldProgress) > 0 {
-					lsn := progress.External().(pglogrepl.LSN)
-					log.Debugf("progressed to LSN: %s", lsn)
-					c.walOffset.Set(lsn)
+				if hlc.Compare(progress, oldProgress) <= 0 {
+					return nil
+				}
+				lsn := progress.External().(pglogrepl.LSN)
+				log.Debugf("progressed to LSN: %s", lsn)
+				c.walOffset.Set(lsn)
+
+				if err := c.memo.Put(ctx, c.stagingDB, c.walMemoKey, []byte(lsn.String())); err == nil {
+					log.Tracef("stored WAL offset %s: %s", c.walMemoKey, lsn)
+				} else {
+					log.WithError(err).Warn("could not persist LSN offset")
 				}
+
 				return nil
 			})
 		return nil
@@ -118,11 +133,15 @@
 	return nil
 }
-// accumulateBatch folds replication messages into the batch and sends it to
-// the acceptor when a complete transaction has been read. The returned
-// batch should be passed to the next invocation of accumulateBatch.
+// accumulateBatch folds replication messages into the batch and sends
Any +// returned batch should be passed to the next invocation of +// accumulateBatch. func (c *Conn) accumulateBatch( - ctx *stopper.Context, msg pglogrepl.Message, batch *types.TemporalBatch, + ctx *stopper.Context, + msg pglogrepl.Message, + batch *types.TemporalBatch, + out chan<- *types.BatchCursor, ) (*types.TemporalBatch, error) { log.Tracef("message %T", msg) switch msg := msg.(type) { @@ -149,33 +168,15 @@ func (c *Conn) accumulateBatch( emptyTransactionCount.Inc() log.Trace("skipping empty transaction") } else { - tx, err := c.targetDB.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, errors.WithStack(err) - } - defer tx.Rollback() - - if err := c.acceptor.AcceptTemporalBatch(ctx, batch, &types.AcceptOptions{ - TargetQuerier: tx, - }); err != nil { - return nil, err + cursor := &types.BatchCursor{ + Batch: batch, + Progress: hlc.RangeIncluding(hlc.Zero(), batch.Time), } - - if err := tx.Commit(); err != nil { - return nil, errors.WithStack(err) + select { + case out <- cursor: + case <-ctx.Stopping(): + return nil, nil } - // TODO(bob): This is a temporary hack until this frontend - // is switched to using the core sequencer. Very shortly, - // the sequencer stat will reflect the progress of - // transactions that have been committed to the target. In - // the meantime, we're in immediate operation, so we'll fake - // one up. - fakeProgress := &ident.TableMap[hlc.Range]{} - fakeTable := ident.NewTable(c.target, ident.New("fake")) - fakeProgress.Put(fakeTable, hlc.RangeIncluding(hlc.Zero(), batch.Time)) - c.stat.Set(sequencer.NewStat(&types.TableGroup{ - Tables: []ident.Table{fakeTable}, - }, fakeProgress)) } return nil, nil @@ -204,7 +205,7 @@ func (c *Conn) accumulateBatch( // copyMessages is the main replication loop. It will open a connection // to the source, accumulate messages, and commit data to the target. -func (c *Conn) copyMessages(ctx *stopper.Context) error { +func (c *Conn) copyMessages(ctx *stopper.Context, out chan<- *types.BatchCursor) error { replConn, err := pgconn.ConnectConfig(ctx, c.sourceConfig) if err != nil { return errors.WithStack(err) @@ -306,7 +307,7 @@ func (c *Conn) copyMessages(ctx *stopper.Context) error { }).Debug("xlog data") // Update our accumulator with the received message. - batch, err = c.accumulateBatch(ctx, logicalMsg, batch) + batch, err = c.accumulateBatch(ctx, logicalMsg, batch, out) if err != nil { return err } @@ -444,12 +445,9 @@ func (c *Conn) onRelation(msg *pglogrepl.RelationMessage) { }).Trace("learned relation") } -// persistWALOffset loads an existing value from memo into walOffset. It -// will also start a goroutine in the stopper to occasionally write an -// updated value back to the memo. -func (c *Conn) persistWALOffset(ctx *stopper.Context) error { - key := fmt.Sprintf("pglogical-wal-offset-%s", c.target.Raw()) - found, err := c.memo.Get(ctx, c.stagingDB, key) +// loadWALOffset loads an existing value from memo into walOffset and initializes the local clock. 
+func (c *Conn) loadWALOffset(ctx *stopper.Context) error { + found, err := c.memo.Get(ctx, c.stagingDB, c.walMemoKey) if err != nil { return err } @@ -461,27 +459,7 @@ func (c *Conn) persistWALOffset(ctx *stopper.Context) error { c.monotonic.External(lsn) c.walOffset.Set(lsn) } - store := func(ctx context.Context, lsn pglogrepl.LSN) { - if err := c.memo.Put(ctx, c.stagingDB, key, []byte(lsn.String())); err == nil { - log.Tracef("stored WAL offset %s: %s", key, lsn) - } else { - log.WithError(err).Warn("could not persist LSN offset") - } - } - ctx.Go(func(ctx *stopper.Context) error { - _, err := stopvar.DoWhenChanged(ctx, lsn, &c.walOffset, - func(ctx *stopper.Context, _, lsn pglogrepl.LSN) error { - store(ctx, lsn) - return nil - }) - return err - }) - // Make a final update on the way out. - ctx.Defer(func() { - last, _ := c.walOffset.Get() - // Use background because the stopper has stopped. - store(context.Background(), last) - }) + return nil } diff --git a/internal/source/pglogical/injector.go b/internal/source/pglogical/injector.go index abf52d54..dc1729d5 100644 --- a/internal/source/pglogical/injector.go +++ b/internal/source/pglogical/injector.go @@ -25,8 +25,9 @@ import ( "github.com/cockroachdb/field-eng-powertools/stopper" scriptRuntime "github.com/cockroachdb/replicator/internal/script" "github.com/cockroachdb/replicator/internal/sequencer/chaos" + "github.com/cockroachdb/replicator/internal/sequencer/core" "github.com/cockroachdb/replicator/internal/sequencer/decorators" - "github.com/cockroachdb/replicator/internal/sequencer/immediate" + "github.com/cockroachdb/replicator/internal/sequencer/scheduler" scriptSequencer "github.com/cockroachdb/replicator/internal/sequencer/script" "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/staging" @@ -42,12 +43,13 @@ func Start(*stopper.Context, *Config) (*PGLogical, error) { wire.Bind(new(context.Context), new(*stopper.Context)), wire.Struct(new(PGLogical), "*"), wire.FieldsOf(new(*Config), "Script"), - wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Stage", "Staging", "Target"), + wire.FieldsOf(new(*EagerConfig), "DLQ", "Sequencer", "Staging", "Target"), Set, chaos.Set, + core.Set, decorators.Set, diag.New, - immediate.Set, + scheduler.Set, scriptRuntime.Set, scriptSequencer.Set, sinkprod.Set, diff --git a/internal/source/pglogical/integration_test.go b/internal/source/pglogical/integration_test.go index 92ea6556..9d987bc2 100644 --- a/internal/source/pglogical/integration_test.go +++ b/internal/source/pglogical/integration_test.go @@ -31,23 +31,26 @@ import ( "testing" "time" + "github.com/cockroachdb/field-eng-powertools/stopper" "github.com/cockroachdb/replicator/internal/script" + "github.com/cockroachdb/replicator/internal/sequencer" "github.com/cockroachdb/replicator/internal/sinkprod" "github.com/cockroachdb/replicator/internal/sinktest" "github.com/cockroachdb/replicator/internal/sinktest/all" "github.com/cockroachdb/replicator/internal/sinktest/base" "github.com/cockroachdb/replicator/internal/sinktest/scripttest" - "github.com/cockroachdb/replicator/internal/util/batches" + "github.com/cockroachdb/replicator/internal/types" + "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" + "github.com/cockroachdb/replicator/internal/util/workload" "github.com/google/uuid" - "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pglogrepl" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" dto 
"github.com/prometheus/client_model/go" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) var ( @@ -58,6 +61,7 @@ var ( ) func TestMain(m *testing.M) { + base.XXXSetSourceConn(*pgConnString) all.IntegrationMain(m, all.PostgreSQLName) } @@ -68,10 +72,8 @@ type fixtureConfig struct { script bool } -// This is a general smoke-test of the logical replication feed. -// -// The probabilities are chosen to make the tests pass within a -// reasonable timeframe, given the large number of rows that we insert. +// This is a general smoke-test of the logical replication feed that +// uses the workload checker to validate the replicated data. func TestPGLogical(t *testing.T) { t.Run("consistent", func(t *testing.T) { testPGLogical(t, &fixtureConfig{}) @@ -97,209 +99,183 @@ func TestPGLogical(t *testing.T) { } func testPGLogical(t *testing.T, fc *fixtureConfig) { + const transactionCount = 1024 a := assert.New(t) r := require.New(t) - // Create a basic test fixture. - fixture, err := base.NewFixture(t) + + partitionCount := 1 + if fc.partition { + partitionCount = 10 + } + + // This fixture points at the target. It will, eventually, be used + // to validate that the target contains the data expected by the + // workload checker. + crdbFixture, err := all.NewFixture(t) if !a.NoError(err) { return } + ctx := crdbFixture.Context - ctx := fixture.Context - dbSchema := fixture.TargetSchema.Schema() - dbName := dbSchema.Idents(nil)[0] // Extract first name part. - crdbPool := fixture.TargetPool + sourceSchema := crdbFixture.SourceSchema.Schema() + sourceDB := sourceSchema.Idents(nil)[0] // Extract first name part. - pgPool, cancel, err := setupPGPool(dbName) + // These generators will act as sources of mutations to apply later + // on and will then be used to validate the information in the + // target. + targetChecker, _, err := crdbFixture.NewWorkload(ctx, &all.WorkloadConfig{}) r.NoError(err) - - defer cancel() - cancel, err = setupPublication(ctx, pgPool, dbName, "ALL TABLES") - r.NoError(err) - defer cancel() - // Create the schema in both locations. - tgts := []ident.Table{ - ident.NewTable(dbSchema, ident.New("t1")), - ident.NewTable(dbSchema, ident.New("t2")), - } - srcs := tgts - if fc.partition { - srcs = []ident.Table{ - ident.NewTable(dbSchema, ident.New("t1_P_0")), - ident.NewTable(dbSchema, ident.New("t2_P_0")), + sourceGenerators := make([]*workload.GeneratorBase, partitionCount) + for i := range sourceGenerators { + parent := ident.NewTable(sourceSchema, targetChecker.Parent.Name().Table()) + child := ident.NewTable(sourceSchema, targetChecker.Child.Name().Table()) + if fc.partition { + part := i % partitionCount + parent = ident.NewTable(parent.Schema(), + ident.New(fmt.Sprintf("%s_P_%d", parent.Table().Raw(), part))) + child = ident.NewTable(child.Schema(), + ident.New(fmt.Sprintf("%s_P_%d", child.Table().Raw(), part))) } + sourceGenerators[i] = workload.NewGeneratorBase(parent, child) } - var crdbCol string - for _, src := range srcs { - pgSchema := fmt.Sprintf(`CREATE TABLE %s (pk INT PRIMARY KEY, v TEXT)`, src) - if _, err := pgPool.Exec(ctx, pgSchema); !a.NoError(err) { - return - } + // Set up the source database with a replication publication and + // create the workload table schema. + pgPool := crdbFixture.SourcePool + err = setupPublication(ctx, pgPool, sourceDB, "ALL TABLES") + r.NoError(err) + + // Create the tables in the source database. 
We may simulate the + // case where there are multiple partitioned source tables that fan + // into a single target table. There's an extra test case here for + // REPLICA IDENTITY FULL, which makes the source behave as though + // all columns in the table are the primary key. + for _, generator := range sourceGenerators { + parent := generator.Parent + child := generator.Child + + parentSQL, childSQL := all.WorkloadSchema( + &all.WorkloadConfig{}, types.ProductPostgreSQL, + parent, child) + _, err = pgPool.ExecContext(ctx, parentSQL) + r.NoError(err) + _, err = pgPool.ExecContext(ctx, childSQL) + r.NoError(err) if fc.rif { - if _, err := pgPool.Exec(ctx, fmt.Sprintf( - `ALTER TABLE %s REPLICA IDENTITY FULL`, src)); !a.NoError(err) { - return - } - } - } - for _, tgt := range tgts { - crdbSchema := fmt.Sprintf(`CREATE TABLE %s (pk INT PRIMARY KEY, v TEXT)`, tgt) - crdbCol = "v" - if fc.script { - // Ensure that script is wired up by renaming a column. - crdbSchema = fmt.Sprintf(`CREATE TABLE %s (pk INT PRIMARY KEY, v_mapped TEXT)`, tgt) - crdbCol = "v_mapped" - } - if _, err := crdbPool.ExecContext(ctx, crdbSchema); !a.NoError(err) { - return + _, err := pgPool.ExecContext(ctx, fmt.Sprintf( + `ALTER TABLE %s REPLICA IDENTITY FULL`, parent)) + r.NoError(err) + _, err = pgPool.ExecContext(ctx, fmt.Sprintf( + `ALTER TABLE %s REPLICA IDENTITY FULL`, child)) + r.NoError(err) } } - // We want enough rows here to make sure that the batching and - // coalescing logic gets exercised. - rowCount := 10 * batches.Size() - keys := make([]int, rowCount) - vals := make([]string, rowCount) - - // Insert data into source tables with overlapping transactions. - eg, egCtx := errgroup.WithContext(ctx) - eg.SetLimit(128) - for _, src := range srcs { - for i := 0; i < len(vals); i += 2 { - eg.Go(func() error { - keys[i] = i - keys[i+1] = i + 1 - vals[i] = fmt.Sprintf("v=%d", i) - vals[i+1] = fmt.Sprintf("v=%d", i+1) - - tx, err := pgPool.Begin(egCtx) - if err != nil { - return err - } - _, err = tx.Exec(egCtx, - fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", src), - keys[i], vals[i], - ) - if err != nil { - return err - } - _, err = tx.Exec(egCtx, - fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", src), - keys[i+1], vals[i+1], - ) - if err != nil { - return err - } - return tx.Commit(egCtx) - }) - } - } + // Create a new test fixture using the source database as the + // target. This will allow us to use the apply package to insert the + // generated workload data into the source tables. + pgFixture, err := all.NewFixtureFromBase(crdbFixture.Swapped()) r.NoError(err) + acc := types.OrderedAcceptorFrom(pgFixture.ApplyAcceptor, pgFixture.Watchers) - pubNameRaw := publicationName(dbName).Raw() - // Start the connection, to demonstrate that we can backfill pending mutations. + // Build the runtime configuration for the replication loop, which + // we'll run as though it were being started from the command line + // (i.e. it's going to dial the source and target itself, etc). + pubNameRaw := publicationName(sourceDB).Raw() cfg := &Config{ + Sequencer: sequencer.Config{ + QuiescentPeriod: 100 * time.Millisecond, + }, Staging: sinkprod.StagingConfig{ - Schema: fixture.StagingDB.Schema(), + Schema: crdbFixture.StagingDB.Schema(), }, Target: sinkprod.TargetConfig{ CommonConfig: sinkprod.CommonConfig{ - Conn: crdbPool.ConnectionString, + Conn: crdbFixture.TargetPool.ConnectionString, }, ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier. 
}, Publication: pubNameRaw, Slot: pubNameRaw, - SourceConn: *pgConnString + dbName.Raw(), + SourceConn: *pgConnString + sourceDB.Raw(), StandbyTimeout: 100 * time.Millisecond, - TargetSchema: dbSchema, + TargetSchema: crdbFixture.TargetSchema.Schema(), } if fc.chaos { cfg.Sequencer.Chaos = 2 } - if fc.script { + if fc.partition || fc.script { cfg.Script = script.Config{ - FS: scripttest.ScriptFSFor(tgts[0]), - MainPath: "/testdata/logical_test.ts", + FS: scripttest.ScriptFSParentChild( + targetChecker.Parent.Name(), targetChecker.Child.Name()), + MainPath: "/testdata/logical_test_parent_child.ts", } } r.NoError(cfg.Preflight()) - repl, err := Start(ctx, cfg) - r.NoError(err) - - // Wait for backfill. - for _, tgt := range tgts { - for { - var count int - if err := crdbPool.QueryRowContext(ctx, - fmt.Sprintf("SELECT count(*) FROM %s", tgt)).Scan(&count); !a.NoError(err) { - return - } - log.Trace("backfill count", count) - if count == rowCount { - break - } - time.Sleep(100 * time.Millisecond) - } - } - // Let's perform an update in a single transaction. - tx, err := pgPool.Begin(ctx) + // We control the lifecycle of the replication loop. + connCtx := stopper.WithContext(ctx) + repl, err := Start(connCtx, cfg) r.NoError(err) - for _, src := range srcs { - if _, err := tx.Exec(ctx, fmt.Sprintf("UPDATE %s SET v = 'updated'", src)); !a.NoError(err) { - return + log.Info("started pglogical conn") + + // This will be filled in on the last insert transaction below, so + // that we can monitor the replication loop until it proceeds to + // this point in the WAL. + var expectLSN pglogrepl.LSN + + // At this point, the source database has been configured with a + // publication, replication slot and table schema. The replication + // loop has started and established connections to the source and + // the target. We can finally start generating data to apply. + log.Info("starting to insert data into source") + for i := range transactionCount { + batch := &types.MultiBatch{} + sourceGenerator := sourceGenerators[i%partitionCount] + sourceGenerator.GenerateInto(batch, hlc.New(int64(i+1), i)) + + tx, err := pgFixture.TargetPool.BeginTx(ctx, &sql.TxOptions{}) + r.NoError(err) + r.NoError(acc.AcceptMultiBatch(ctx, batch, &types.AcceptOptions{TargetQuerier: tx})) + if i == transactionCount-1 { + // On our final transaction, capture the LSN that + // replication should advance to. + r.NoError(tx.QueryRowContext(ctx, + "SELECT pg_current_wal_insert_lsn()").Scan(&expectLSN)) } + r.NoError(tx.Commit()) } - r.NoError(tx.Commit(ctx)) + log.Info("finished inserting data into source") - // Wait for the update to propagate. - for _, tgt := range tgts { - for { - count, err := base.GetRowCountWithPredicate(ctx, fixture.TargetPool, tgt, fmt.Sprintf("%s = 'updated'", crdbCol)) - r.NoError(err) - log.Trace("update count", count) - if count == rowCount { - break - } - time.Sleep(100 * time.Millisecond) - } + // Merge source generators into the target checker. + for _, gen := range sourceGenerators { + targetChecker.CopyFrom(gen) } - // Delete some rows. - tx, err = pgPool.Begin(ctx) - r.NoError(err) - for _, src := range srcs { - if _, err := tx.Exec(ctx, fmt.Sprintf("DELETE FROM %s WHERE pk < 50", src)); !a.NoError(err) { - return + // Monitor the connection's committed walOffset. It should advance + // beyond the insert point that was captured in the loop above. 
+ for { + pos, posChanged := repl.Conn.walOffset.Get() + if pos >= expectLSN { + break } - } - r.NoError(tx.Commit(ctx)) - - // Wait for the deletes to propagate. - for _, tgt := range tgts { - for { - count, err := base.GetRowCountWithPredicate(ctx, fixture.TargetPool, tgt, fmt.Sprintf("%s = 'updated'", crdbCol)) - r.NoError(err) - log.Trace("delete count", count) - if count == rowCount-50 { - break - } - time.Sleep(100 * time.Millisecond) + log.Infof("waiting for lsn: %s vs %s", pos, expectLSN) + select { + case <-posChanged: + case <-ctx.Done(): + log.Errorf("timed out waiting for lsn: %s vs %s", pos, expectLSN) + r.NoError(ctx.Err()) } } - sinktest.CheckDiagnostics(ctx, t, repl.Diagnostics) - - // Verify that a WAL offset was indeed recorded. - key := fmt.Sprintf("pglogical-wal-offset-%s", fixture.TargetSchema.Raw()) - if off, err := repl.Memo.Get(ctx, fixture.StagingPool, key); a.NoError(err) { - a.NotEmpty(off) - } + // Validate the data in the target tables. + r.True(targetChecker.CheckConsistent(ctx, t)) - ctx.Stop(time.Second) - a.NoError(ctx.Wait()) + // We need to wait for the connection to shut down, otherwise the + // database cleanup callbacks (to drop the publication, etc.) from + // the test code above can't succeed. + connCtx.Stop(time.Minute) + <-connCtx.Done() } // https://www.postgresql.org/docs/current/datatype.html @@ -367,19 +343,15 @@ func TestDataTypes(t *testing.T) { dbSchema := fixture.TargetSchema.Schema() dbName := dbSchema.Idents(nil)[0] // Extract first name part. crdbPool := fixture.TargetPool + pgPool := fixture.SourcePool - pgPool, cancel, err := setupPGPool(dbName) + err = setupPublication(ctx, pgPool, dbName, "ALL TABLES") r.NoError(err) - defer cancel() - - cancel, err = setupPublication(ctx, pgPool, dbName, "ALL TABLES") - r.NoError(err) - defer cancel() enumQ := fmt.Sprintf(`CREATE TYPE %s."Simple-Enum" AS ENUM ('foo', 'bar')`, dbSchema) _, err = crdbPool.ExecContext(ctx, enumQ) r.NoError(err, enumQ) - _, err = pgPool.Exec(ctx, enumQ) + _, err = pgPool.ExecContext(ctx, enumQ) r.NoError(err, enumQ) tcs = append(tcs, tc{ @@ -407,18 +379,18 @@ func TestDataTypes(t *testing.T) { // Substitute the created table name to send to postgres. schema = fmt.Sprintf(schema, tgt) - if _, err := pgPool.Exec(ctx, schema); !a.NoErrorf(err, "PG %s", tc.name) { + if _, err := pgPool.ExecContext(ctx, schema); !a.NoErrorf(err, "PG %s", tc.name) { return } // Insert dummy data into the source in a single transaction. - tx, err := pgPool.Begin(ctx) + tx, err := pgPool.BeginTx(ctx, &sql.TxOptions{}) if !a.NoError(err) { return } for valIdx, value := range tc.values { - if _, err := tx.Exec(ctx, + if _, err := tx.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2::%s)`, tgt, tc.name), valIdx, value, ); !a.NoErrorf(err, "%s %d %s", tc.name, valIdx, value) { return @@ -426,18 +398,21 @@ func TestDataTypes(t *testing.T) { } // Also insert a null value. - if _, err := tx.Exec(ctx, + if _, err := tx.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES ($1, NULL::%s)`, tgt, tc.name), -1, ); !a.NoError(err) { return } - a.NoError(tx.Commit(ctx)) + a.NoError(tx.Commit()) } log.Info(tgts) pubNameRaw := publicationName(dbName).Raw() // Start the connection, to demonstrate that we can backfill pending mutations. 
cfg := &Config{ + Sequencer: sequencer.Config{ + QuiescentPeriod: 100 * time.Millisecond, + }, Staging: sinkprod.StagingConfig{ Schema: fixture.StagingDB.Schema(), }, @@ -508,12 +483,9 @@ func TestEmptyTransactions(t *testing.T) { ctx := fixture.Context dbSchema := fixture.TargetSchema.Schema() dbName := dbSchema.Idents(nil)[0] // Extract first name part. + pgPool := fixture.SourcePool - pgPool, cancel, err := setupPGPool(dbName) - r.NoError(err) - defer cancel() - - rows, err := pgPool.Query(ctx, "select version()") + rows, err := pgPool.QueryContext(ctx, "select version()") r.NoError(err) defer rows.Close() var pgVersion string @@ -536,22 +508,24 @@ func TestEmptyTransactions(t *testing.T) { return } - if _, err := pgPool.Exec(ctx, schema); !a.NoErrorf(err, "PG %s", schema) { + if _, err := pgPool.ExecContext(ctx, schema); !a.NoErrorf(err, "PG %s", schema) { return } var localSchema = fmt.Sprintf("CREATE TABLE %s (k INT PRIMARY KEY, v int)", localTable.Table()) - if _, err := pgPool.Exec(ctx, localSchema); !a.NoErrorf(err, "PG %s", localTable) { + if _, err := pgPool.ExecContext(ctx, localSchema); !a.NoErrorf(err, "PG %s", localTable) { return } // setup replication for only the replTable - cancel, err = setupPublication(ctx, pgPool, dbName, fmt.Sprintf(`TABLE %s`, replTable)) + err = setupPublication(ctx, pgPool, dbName, fmt.Sprintf(`TABLE %s`, replTable)) r.NoError(err) - defer cancel() pubNameRaw := publicationName(dbName).Raw() // Start the connection, to demonstrate that we can backfill pending mutations. cfg := &Config{ + Sequencer: sequencer.Config{ + QuiescentPeriod: 100 * time.Millisecond, + }, Staging: sinkprod.StagingConfig{ Schema: fixture.StagingDB.Schema(), }, @@ -572,18 +546,18 @@ func TestEmptyTransactions(t *testing.T) { r.NoError(err) // Insert a value in the replTable - if _, err := pgPool.Exec(ctx, + if _, err := pgPool.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES (1,1)`, replTable)); !a.NoErrorf(err, "%s", replTable) { return } // Insert a value in the localTable - if _, err := pgPool.Exec(ctx, + if _, err := pgPool.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES (1,1)`, localTable)); !a.NoErrorf(err, "%s", replTable) { return } // Insert a value in the replTable - if _, err := pgPool.Exec(ctx, + if _, err := pgPool.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES (2,2)`, replTable)); !a.NoErrorf(err, "%s", replTable) { return } @@ -633,14 +607,10 @@ func TestToast(t *testing.T) { dbSchema := fixture.TargetSchema.Schema() dbName := dbSchema.Idents(nil)[0] // Extract first name part. crdbPool := fixture.TargetPool + pgPool := fixture.SourcePool - pgPool, cancel, err := setupPGPool(dbName) + err = setupPublication(ctx, pgPool, dbName, "ALL TABLES") r.NoError(err) - defer cancel() - - cancel, err = setupPublication(ctx, pgPool, dbName, "ALL TABLES") - r.NoError(err) - defer cancel() name := ident.NewTable(dbSchema, ident.New("toast")) // Create the schema in both locations. @@ -663,19 +633,19 @@ func TestToast(t *testing.T) { s text, t text, deleted bool not null default false)`, name) - if _, err := pgPool.Exec(ctx, schema); !a.NoErrorf(err, "PG %s", schema) { + if _, err := pgPool.ExecContext(ctx, schema); !a.NoErrorf(err, "PG %s", schema) { return } // Inserting an initial value. The t,s,j columns contains a large object that // will be toast-ed in the Postgres side. 
- _, err = pgPool.Exec(ctx, + _, err = pgPool.ExecContext(ctx, fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2, $3, $4, $5)`, name), 1, 0, longObj, longString, longString) r.NoError(err) // We update the "i" column few times without changing the j column. updates := 10 for i := 1; i <= updates; i++ { - if _, err := pgPool.Exec(ctx, + if _, err := pgPool.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET i = $2 WHERE k = $1`, name), 1, i, ); !a.NoErrorf(err, "%s", name) { return @@ -685,6 +655,9 @@ func TestToast(t *testing.T) { pubNameRaw := publicationName(dbName).Raw() cfg := &Config{ + Sequencer: sequencer.Config{ + QuiescentPeriod: 100 * time.Millisecond, + }, Staging: sinkprod.StagingConfig{ Schema: fixture.StagingDB.Schema(), }, @@ -749,71 +722,34 @@ func publicationName(database ident.Ident) ident.Ident { return ident.New(strings.ReplaceAll(database.Raw(), "-", "_")) } -func setupPGPool(database ident.Ident) (*pgxpool.Pool, func(), error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - baseConn, err := pgxpool.New(ctx, *pgConnString) - if err != nil { - return nil, func() {}, err - } - - if _, err := baseConn.Exec(ctx, - fmt.Sprintf("CREATE DATABASE %s", database), - ); err != nil { - return nil, func() {}, err - } - - // Open the pool, using the newly-created database. - next := baseConn.Config().Copy() - next.ConnConfig.Database = database.Raw() - retConn, err := pgxpool.NewWithConfig(ctx, next) - if err != nil { - return nil, func() {}, err - } - - return retConn, func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Can't drop the default database from its own connection. - _, err = baseConn.Exec(ctx, fmt.Sprintf("DROP DATABASE %s", database)) - if err != nil { - log.WithError(err).Error("could not drop database") - } - baseConn.Close() - log.Trace("finished pg pool cleanup") - }, nil -} - func setupPublication( - ctx context.Context, retConn *pgxpool.Pool, database ident.Ident, scope string, -) (func(), error) { + ctx *stopper.Context, pool *types.SourcePool, database ident.Ident, scope string, +) error { pubName := publicationName(database) - if _, err := retConn.Exec(ctx, + if _, err := pool.ExecContext(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR %s", pubName, scope), ); err != nil { - return func() {}, err + return err } - if _, err := retConn.Exec(ctx, + if _, err := pool.ExecContext(ctx, "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", pubName.Raw(), ); err != nil { - return func() {}, err + return err } - return func() { + ctx.Defer(func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, err := retConn.Exec(ctx, "SELECT pg_drop_replication_slot($1)", pubName.Raw()) + _, err := pool.ExecContext(ctx, "SELECT pg_drop_replication_slot($1)", pubName.Raw()) if err != nil { log.WithError(err).Error("could not drop replication slot") } - _, err = retConn.Exec(ctx, fmt.Sprintf("DROP PUBLICATION %s", pubName)) + _, err = pool.ExecContext(ctx, fmt.Sprintf("DROP PUBLICATION %s", pubName)) if err != nil { log.WithError(err).Error("could not drop publication") } - retConn.Close() - log.Trace("finished pg pool cleanup") - }, nil + }) + return nil } diff --git a/internal/source/pglogical/provider.go b/internal/source/pglogical/provider.go index 58649984..159a9b6d 100644 --- a/internal/source/pglogical/provider.go +++ b/internal/source/pglogical/provider.go @@ -17,12 +17,15 @@ package pglogical import ( + "fmt" + "github.com/cockroachdb/field-eng-powertools/notify" 
"github.com/cockroachdb/field-eng-powertools/stopper" scriptRT "github.com/cockroachdb/replicator/internal/script" "github.com/cockroachdb/replicator/internal/sequencer" "github.com/cockroachdb/replicator/internal/sequencer/chaos" - "github.com/cockroachdb/replicator/internal/sequencer/immediate" + "github.com/cockroachdb/replicator/internal/sequencer/core" + "github.com/cockroachdb/replicator/internal/sequencer/decorators" "github.com/cockroachdb/replicator/internal/sequencer/script" "github.com/cockroachdb/replicator/internal/target/apply" "github.com/cockroachdb/replicator/internal/target/schemawatch" @@ -52,11 +55,11 @@ func ProvideConn( acc *apply.Acceptor, chaos *chaos.Chaos, config *Config, - imm *immediate.Immediate, + core *core.Core, memo types.Memo, + rekey *decorators.Rekey, scriptSeq *script.Sequencer, stagingPool *types.StagingPool, - targetPool *types.TargetPool, watchers types.Watchers, ) (*Conn, error) { if err := config.Preflight(); err != nil { @@ -105,7 +108,28 @@ func ProvideConn( sourceConfig := source.Config().Config.Copy() sourceConfig.RuntimeParams["replication"] = "database" - seq, err := scriptSeq.Wrap(ctx, imm) + conn := &Conn{ + columns: &ident.TableMap[[]types.ColData]{}, + memo: memo, + publicationName: config.Publication, + relations: make(map[uint32]ident.Table), + slotName: config.Slot, + sourceConfig: sourceConfig, + standbyTimeout: config.StandbyTimeout, + stagingDB: stagingPool, + target: config.TargetSchema, + walMemoKey: fmt.Sprintf("pglogical-wal-offset-%s", config.TargetSchema.Raw()), + } + + seq := sequencer.Sequencer(core) + // Handle REPLICA IDENTITY FULL by rekeying the mutations based on + // destination table before they hit the buffer. This ensures the + // transaction scheduler is able to guarantee ordering invariants. + seq, err = rekey.Wrap(ctx, seq) + if err != nil { + return nil, err + } + seq, err = scriptSeq.Wrap(ctx, seq) if err != nil { return nil, err } @@ -113,33 +137,27 @@ func ProvideConn( if err != nil { return nil, err } - connAcceptor, statVar, err := seq.Start(ctx, &sequencer.StartOptions{ - Delegate: types.OrderedAcceptorFrom(acc, watchers), - Bounds: ¬ify.Var[hlc.Range]{}, // Not currently used. + _, statVar, err := seq.Start(ctx, &sequencer.StartOptions{ + BatchReader: conn, + Bounds: notify.VarOf(hlc.RangeEmpty()), // Gating by checkpoint not required. 
diff --git a/internal/source/pglogical/wire_gen.go b/internal/source/pglogical/wire_gen.go
index a8ed2f2e..5e08c2c0 100644
--- a/internal/source/pglogical/wire_gen.go
+++ b/internal/source/pglogical/wire_gen.go
@@ -10,12 +10,13 @@ import (
 	"github.com/cockroachdb/field-eng-powertools/stopper"
 	"github.com/cockroachdb/replicator/internal/script"
 	"github.com/cockroachdb/replicator/internal/sequencer/chaos"
+	"github.com/cockroachdb/replicator/internal/sequencer/core"
 	"github.com/cockroachdb/replicator/internal/sequencer/decorators"
-	"github.com/cockroachdb/replicator/internal/sequencer/immediate"
+	"github.com/cockroachdb/replicator/internal/sequencer/scheduler"
 	script2 "github.com/cockroachdb/replicator/internal/sequencer/script"
 	"github.com/cockroachdb/replicator/internal/sinkprod"
+	"github.com/cockroachdb/replicator/internal/staging/leases"
 	"github.com/cockroachdb/replicator/internal/staging/memo"
-	"github.com/cockroachdb/replicator/internal/staging/stage"
 	"github.com/cockroachdb/replicator/internal/staging/version"
 	"github.com/cockroachdb/replicator/internal/target/apply"
 	"github.com/cockroachdb/replicator/internal/target/dlq"
@@ -87,14 +88,18 @@ func Start(context *stopper.Context, config *Config) (*PGLogical, error) {
 	chaosChaos := &chaos.Chaos{
 		Config: sequencerConfig,
 	}
-	stageConfig := &eagerConfig.Stage
-	stagers := stage.ProvideFactory(stageConfig, stagingPool, stagingSchema, context)
-	marker := decorators.ProvideMarker(stagingPool, stagers)
-	once := decorators.ProvideOnce(stagingPool, stagers)
-	retryTarget := decorators.ProvideRetryTarget(targetPool)
-	immediateImmediate := immediate.ProvideImmediate(sequencerConfig, targetPool, marker, once, retryTarget, stagers)
+	typesLeases, err := leases.ProvideLeases(context, stagingPool, stagingSchema)
+	if err != nil {
+		return nil, err
+	}
+	schedulerScheduler, err := scheduler.ProvideScheduler(context, sequencerConfig)
+	if err != nil {
+		return nil, err
+	}
+	coreCore := core.ProvideCore(sequencerConfig, typesLeases, schedulerScheduler, targetPool)
+	rekey := decorators.ProvideRekey(watchers)
 	sequencer := script2.ProvideSequencer(loader, targetPool, watchers)
-	conn, err := ProvideConn(context, acceptor, chaosChaos, config, immediateImmediate, memoMemo, sequencer, stagingPool, targetPool, watchers)
+	conn, err := ProvideConn(context, acceptor, chaosChaos, config, coreCore, memoMemo, rekey, sequencer, stagingPool, watchers)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/types/acceptors.go b/internal/types/acceptors.go
index cd5cbc57..895fe10c 100644
--- a/internal/types/acceptors.go
+++ b/internal/types/acceptors.go
@@ -116,7 +116,8 @@ func (t *orderedAdapter)
AcceptMultiBatch( ) error { // Peek to find the current schema. var commonSchema ident.Schema - for table := range batch.Mutations() { + for table, mutation := range batch.Mutations() { + _ = mutation commonSchema = table.Schema() break } diff --git a/internal/types/batches.go b/internal/types/batches.go index 7b267022..e05f13f3 100644 --- a/internal/types/batches.go +++ b/internal/types/batches.go @@ -26,6 +26,7 @@ import ( "iter" "slices" "sort" + "strings" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" @@ -55,6 +56,9 @@ type Batch[B any] interface { // Copy returns a deep copy of the Batch. Copy() B + // CopyInto copies the contents of the batch into accumulator. + CopyInto(acc Batch[any]) error + // Empty returns a copy of the Batch, but with no enclosed // mutations. This is useful when wanting to transform or filter a // batch. @@ -147,6 +151,17 @@ func (b *MultiBatch) Copy() *MultiBatch { return ret } +// CopyInto copies the batch into the Accumulator. The data will +// be ordered by time, table, and key. +func (b *MultiBatch) CopyInto(acc Batch[any]) error { + for _, sub := range b.Data { + if err := sub.CopyInto(acc); err != nil { + return err + } + } + return nil +} + // Count returns the number of enclosed mutations. func (b *MultiBatch) Count() int { ret := 0 @@ -223,6 +238,20 @@ func (b *TableBatch) Copy() *TableBatch { } } +// CopyInto copies the batch into the Accumulator ordered by key. +func (b *TableBatch) CopyInto(acc Batch[any]) error { + sorted := append([]Mutation(nil), b.Data...) + sort.Slice(sorted, func(i, j int) bool { + return bytes.Compare(sorted[i].Key, sorted[j].Key) < 0 + }) + for _, mut := range sorted { + if err := acc.Accumulate(b.Table, mut); err != nil { + return err + } + } + return nil +} + // Count returns the number of enclosed mutations. func (b *TableBatch) Count() int { return len(b.Data) @@ -290,6 +319,24 @@ func (b *TemporalBatch) Copy() *TemporalBatch { return ret } +// CopyInto copies the batch into the Accumulator. The data will be +// sorted by table name and then by key. +func (b *TemporalBatch) CopyInto(acc Batch[any]) error { + var tables []ident.Table + for table := range b.Data.Keys() { + tables = append(tables, table) + } + sort.Slice(tables, func(i, j int) bool { + return strings.Compare(tables[i].Raw(), tables[j].Raw()) < 0 + }) + for _, table := range tables { + if err := b.Data.GetZero(table).CopyInto(acc); err != nil { + return err + } + } + return nil +} + // Count returns the number of enclosed mutations. func (b *TemporalBatch) Count() int { ret := 0