Skip to content

Commit

Permalink
adding support for scripts + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sravotto committed Jan 23, 2024
1 parent ae6cff3 commit 90362f8
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 203 deletions.
2 changes: 1 addition & 1 deletion .github/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ services:
# TODO (silvano) improve start up time
test: bash -c '[ -f /asncdctools/src/asncdc.nlk ] '
interval: 1s
retries: 300
retries: 600
mysql-v5.7:
image: mysql:5.7
platform: linux/x86_64
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

PWD = $(shell pwd)
COVER_OUT ?= cover.out
IBM_DRIVER = ${PWD}/drivers/clidriver
Expand All @@ -6,7 +7,11 @@ export CGO_LDFLAGS=-L${IBM_DRIVER}/lib
export DYLD_LIBRARY_PATH=${IBM_DRIVER}/lib
export LD_LIBRARY_PATH=${IBM_DRIVER}/lib

all:
.PHONY: all clidriver db2 clean realclean testdb2

all: cdc-sink

cdc-sink:
go build

clidriver:
Expand Down
55 changes: 55 additions & 0 deletions internal/sinktest/scripttest/testdata/logical_test_db2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 The Cockroach Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

import * as api from "cdc-sink@v1";
import {Document, Table} from "cdc-sink@v1";

// The sentinel name will be replaced by the test rig. It would normally be
// "my_db.public" or "my_db" depending on the target product.
api.configureSource("{{ SCHEMA }}", {
dispatch: (doc: Document, meta: Document): Record<Table, Document[]> => {
console.trace(JSON.stringify(doc), JSON.stringify(meta));
let ret: Record<Table, Document> = {};
ret[meta.table] = [
{
PK: doc.PK,
ignored: 'by_configuration_below',
v_dispatched: doc.V, // Rename the property
}
];
return ret
}
})

// We introduce an unknown column in the dispatch function above.
// We'll add an extra configuration here to ignore it.
// The sentinel name would be replaced by "my_table".
api.configureTable("{{ TABLE }}", {
map: (doc: Document): Document => {
console.trace("map", JSON.stringify(doc));
if (doc.v_dispatched === undefined) {
throw "did not find expected property";
}
doc.v_mapped = doc.v_dispatched;
delete doc.v_dispatched; // Avoid schema-drift error due to unknown column.
return doc;
},
ignore: {
"ignored": true,
}
})
19 changes: 10 additions & 9 deletions internal/source/db2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,28 @@ type Config struct {
logical.BaseConfig
logical.LoopConfig

SourceConn string // Connection string for the source db.
SourceSchema ident.Schema
// The fields below are extracted by Preflight.
SourceConn string // Connection string for the source db.
SourceSchema ident.Schema // The schema we are replicating.

// The fields below are extracted by Preflight.
host string
password string
port uint16
user string
database string
}

var schema string

// Bind adds flags to the set. It delegates to the embedded Config.Bind.
func (c *Config) Bind(f *pflag.FlagSet) {
c.BaseConfig.Bind(f)
c.LoopConfig.LoopName = "db2"
c.LoopConfig.Bind(f)
f.StringVar(&c.SourceConn, "sourceConn", "",
"the source database's connection string")
var schema string
f.StringVar(&schema, "sourceSchema", "",
"the source schema")
c.SourceSchema = ident.MustSchema(ident.New(schema))
f.StringVar(&schema, "sourceSchema", "", "the source schema")

}

// Preflight updates the configuration with sane defaults or returns an
Expand All @@ -70,11 +70,12 @@ func (c *Config) Preflight() error {
if c.LoopName == "" {
return errors.New("no LoopName was configured")
}

if schema != "" {
c.SourceSchema = ident.MustSchema(ident.New(schema))
}
if c.SourceConn == "" {
return errors.New("no SourceConn was configured")
}

u, err := url.Parse(c.SourceConn)
if err != nil {
return err
Expand Down
51 changes: 37 additions & 14 deletions internal/source/db2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@ import (
"github.com/cockroachdb/cdc-sink/internal/util/stamp"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// conn is responsible to pull the mutation from
// the db2 staging tables and applying them to the target
// the DB2 staging tables and applying them to the target
// database via the logical.Batch interface.
// The process leverages the DB2 SQL replication, similar to
// the debezium DB2 connector.
// To set up DB2 for SQL replication, see the instructions at
// https://debezium.io/documentation/reference/stable/connectors/db2.html#setting-up-db2
// Note: In DB2 identifiers (e.g. table names, column names) are converted to
// uppercase, unless they are in quotes.
type conn struct {
columns *ident.TableMap[[]types.ColData]
primaryKeys *ident.TableMap[map[int]int]
Expand All @@ -64,17 +67,20 @@ func New(config *Config) logical.Dialect {
func (c *conn) Process(
ctx *stopper.Context, ch <-chan logical.Message, events logical.Events,
) error {
var startBatch time.Time
var mutInBatch int
var batch logical.Batch
defer func() {
if batch != nil {
rollbackBatchSize.Observe(float64(mutInBatch))
_ = batch.OnRollback(ctx)
}
}()

for msg := range ch {
// Ensure that we resynchronize.
if logical.IsRollback(msg) {
if batch != nil {
rollbackBatchSize.Observe(float64(mutInBatch))
if err := batch.OnRollback(ctx); err != nil {
return err
}
Expand All @@ -88,6 +94,8 @@ func (c *conn) Process(
case beginOp:
var err error
batch, err = events.OnBegin(ctx)
mutInBatch = 0
startBatch = time.Now()
if err != nil {
return err
}
Expand All @@ -97,6 +105,8 @@ func (c *conn) Process(
if err != nil {
return err
}
batchSize.Observe(float64(mutInBatch))
batchLatency.Observe(float64(time.Since(startBatch).Seconds()))
batch = nil
case <-ctx.Done():
return ctx.Err()
Expand All @@ -110,10 +120,16 @@ func (c *conn) Process(
ident.MustSchema(ident.New(ev.table.sourceOwner)),
ident.New(ev.table.sourceTable))
targetTable := ident.NewTable(
c.config.TargetSchema,
c.config.TargetSchema.Schema(),
ident.New(ev.table.sourceTable))
targetCols, _ := c.columns.Get(sourceTable)
primaryKeys, _ := c.primaryKeys.Get(sourceTable)
targetCols, ok := c.columns.Get(sourceTable)
if !ok {
return errors.Errorf("unable to retrieve target columns for %s", sourceTable)
}
primaryKeys, ok := c.primaryKeys.Get(sourceTable)
if !ok {
return errors.Errorf("unable to retrieve primary keys for %s", sourceTable)
}
var err error
var mut types.Mutation
key := make([]any, len(primaryKeys))
Expand All @@ -130,7 +146,6 @@ func (c *conn) Process(
key[primaryKeys[idx]] = sourceCol
}
}
log.Debugf("%s %s %v\n", ev.op, sourceTable.Raw(), key)
mut.Key, err = json.Marshal(key)
if err != nil {
return err
Expand All @@ -141,8 +156,13 @@ func (c *conn) Process(
return err
}
}
script.AddMeta("db2", sourceTable, &mut)
err = batch.OnData(ctx, script.SourceName(sourceTable), targetTable, []types.Mutation{mut})
mutInBatch++
mutationCount.With(prometheus.Labels{
"table": sourceTable.Raw(),
"op": ev.op.String()}).Inc()
log.Tracef("%s %s %s %v\n", ev.op, sourceTable.Raw(), targetTable.Raw(), key)
script.AddMeta("db2", targetTable, &mut)
err = batch.OnData(ctx, script.SourceName(targetTable), targetTable, []types.Mutation{mut})
if err != nil {
return err
}
Expand Down Expand Up @@ -173,7 +193,7 @@ func (c *conn) ReadInto(
if err != nil {
return errors.Wrap(err, "cannot retrieve next sequence number")
}
log.Infof("NEXT [%x]\n", nextLsn.Value)
log.Debugf("NEXT [%x]\n", nextLsn.Value)
if nextLsn.Less(previousLsn) || nextLsn.Equal(previousLsn) || nextLsn.Equal(lsnZero()) {
select {
case <-time.After(1 * time.Second):
Expand All @@ -182,7 +202,7 @@ func (c *conn) ReadInto(
return nil
}
}
log.Infof("BEGIN [%x]\n", previousLsn.Value)
log.Debugf("BEGIN [%x]\n", previousLsn.Value)
ch <- message{
op: beginOp,
lsn: *previousLsn,
Expand All @@ -196,6 +216,7 @@ func (c *conn) ReadInto(
return errors.Wrap(err, "cannot get DB2 CDC tables")
}
for _, t := range tables {
log.Tracef("querying table %+v", t)
err := c.fetchColumnMetadata(ctx, db, &t)
if err != nil {
return errors.Wrap(err, "cannot retrieve colum names")
Expand All @@ -214,21 +235,23 @@ func (c *conn) ReadInto(
op: endOp,
lsn: *nextLsn,
}
log.Infof("COMMIT [%x]\n", nextLsn.Value)
log.Debugf("COMMIT [%x]\n", nextLsn.Value)
previousLsn = nextLsn
}

}

// ZeroStamp implements logical.Dialect.
func (*conn) ZeroStamp() stamp.Stamp {
func (c *conn) ZeroStamp() stamp.Stamp {
return lsnZero()
}

// Diagnostic implements diag.Diagnostic.
func (c *conn) Diagnostic(_ context.Context) any {
return map[string]any{
"hostname": c.config.host,
"database": c.config.database,
"hostname": c.config.host,
"database": c.config.database,
"defaultLsn": c.config.DefaultConsistentPoint,
"columns": c.columns,
}
}
Loading

0 comments on commit 90362f8

Please sign in to comment.