Skip to content

Commit

Permalink
passed for both pg and mysql integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouXing19 committed Nov 7, 2024
1 parent d2e6bec commit ceb8e5b
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .github/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ services:
ports:
- "3306:3306"
mysql-v8:
image: mysql:8.0
image: mysql:8-debian
platform: linux/x86_64
environment:
MYSQL_ROOT_PASSWORD: SoupOrSecret
MYSQL_DATABASE: _replicator
Expand Down
16 changes: 8 additions & 8 deletions internal/sequencer/script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ func (w *wrapper) Start(
}, w.watchers)
}

// Initialize downstream sequencer.
acc, stat, err := w.delegate.Start(ctx, opts)
if err != nil {
return nil, nil, err
}

// Install the source-phase acceptor. This provides the user with
// the opportunity to rewrite mutations before they are presented to
// the upstream sequencer.
Expand All @@ -111,13 +105,19 @@ func (w *wrapper) Start(
return nil, nil, err
}

acc = &sourceAcceptor{
delegate: acc,
opts.Delegate = &sourceAcceptor{
delegate: opts.Delegate,
group: opts.Group,
sourceBindings: sourceBindings,
watcher: watcher,
}
}

// Initialize downstream sequencer.
acc, stat, err := w.delegate.Start(ctx, opts)
if err != nil {
return nil, nil, err
}
return acc, stat, nil
}

Expand Down
23 changes: 23 additions & 0 deletions internal/sinktest/base/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,29 @@ func ProvideSourceSchema(
) (sinktest.SourceSchema, error) {
sch, err := provideSchema(ctx, pool, "src")
log.Infof("source schema: %s", sch)

// In PostgresSQL, connections are tightly coupled to the target
// database. Cross-database queries are generally unsupported, as
// opposed to CockroachDB, which allows any database to be queried
// from any connection.
//
// To resolve this, we're going to re-open the target database
// connection so that the connection uses the schema that we have
// just created. We also need to recreate the statement cache so
// that it's associated with the newly-constructed database
// connection.
if pool.Info().Product == types.ProductPostgreSQL {
db, _ := sch.Split()
conn := fmt.Sprintf("%s/%s", pool.ConnectionString, db.Raw())
next, err := stdpool.OpenPgxAsTarget(ctx, conn)
if err != nil {
return sinktest.SourceSchema{}, err
}

pool.ConnectionString = conn
pool.DB = next.DB
}

return sinktest.SourceSchema(sch), err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/source/mylogical/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func testMYLogical(t *testing.T, fc *fixtureConfig) {
if err := crdbPool.QueryRowContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", tgt)).Scan(&count); !a.NoError(err) {
return
}
log.Trace("backfill count", count)
log.Infof("backfill count %d", count)
if count == rowCount {
break
}
Expand Down
2 changes: 2 additions & 0 deletions internal/types/acceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (t *orderedAdapter) flush(
opts *AcceptOptions,
) error {
for _, table := range order {
// table: t
// acc: t_p_0
if muts, ok := acc.Get(table); ok {
nextBatch := &TableBatch{Table: table, Data: muts}
if err := t.AcceptTableBatch(ctx, nextBatch, opts); err != nil {
Expand Down

0 comments on commit ceb8e5b

Please sign in to comment.