From 0b52b431ac4ae6c47022293526b7fb0dbaad3a38 Mon Sep 17 00:00:00 2001
From: Silvano Ravotto
Date: Thu, 14 Mar 2024 22:31:17 -0400
Subject: [PATCH] Kafka connector integration test

This commit includes a simple integration test. It verifies that change
events originated by a CockroachDB changefeed and routed through a
single-node Kafka cluster are received by the connector and applied to
a target CockroachDB database.
---
 .github/docker-compose.yml                |  20 +++
 .github/workflows/go-tests.yaml           |   4 +-
 internal/sinktest/all/integration.go      |   5 +
 internal/source/kafka/integration_test.go | 203 ++++++++++++++++++++++
 internal/source/kafka/provider.go         |  12 +-
 internal/source/kafka/wire_gen.go         |  12 +-
 6 files changed, 241 insertions(+), 15 deletions(-)
 create mode 100644 internal/source/kafka/integration_test.go

diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml
index 488d7f9d1..ee466cbe7 100644
--- a/.github/docker-compose.yml
+++ b/.github/docker-compose.yml
@@ -90,6 +90,26 @@ services:
       - "5401:5401"
       - "8082:8082"
 
+  # Single-broker Kafka cluster.
+  kafka-v7.6:
+    image: confluentinc/cp-kafka:7.6.0
+    depends_on:
+      - zookeeper-v7.6
+    ports:
+      - "29092:29092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper-v7.6:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-v7.6:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+  zookeeper-v7.6:
+    image: confluentinc/cp-zookeeper:7.6.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
   mysql-v5.7:
     image: mysql:5.7
     platform: linux/x86_64
diff --git a/.github/workflows/go-tests.yaml b/.github/workflows/go-tests.yaml
index cbe222332..b55f395dc 100644
--- a/.github/workflows/go-tests.yaml
+++ b/.github/workflows/go-tests.yaml
@@ -135,7 +135,9 @@
             integration: postgresql-v14
           - cockroachdb: v23.2
             integration: postgresql-v15
-
+          # Test CRDB via a Kafka cluster.
+          - cockroachdb: v23.2
+            integration: kafka-v7.6
           # Test CRDB -> PostgreSQL for migration backfill use cases.
           - cockroachdb: v23.2
             target: postgresql-v11
diff --git a/internal/sinktest/all/integration.go b/internal/sinktest/all/integration.go
index d05d07a0b..00cd6f06f 100644
--- a/internal/sinktest/all/integration.go
+++ b/internal/sinktest/all/integration.go
@@ -29,6 +29,11 @@ const (
 	// integration tests for some databases. We expect this to be of the
 	// format "database-v123".
 	IntegrationEnvName = "CDC_INTEGRATION"
+	// KafkaName must be kept in alignment with the
+	// .github/docker-compose.yml file and the integration matrix
+	// variable in workflows/tests.yaml.
+	KafkaName = "kafka"
+
 	// MySQLName must be kept in alignment with the
 	// .github/docker-compose.yml file and the integration matrix
 	// variable in workflows/tests.yaml.
diff --git a/internal/source/kafka/integration_test.go b/internal/source/kafka/integration_test.go
new file mode 100644
index 000000000..ce9c3f86e
--- /dev/null
+++ b/internal/source/kafka/integration_test.go
@@ -0,0 +1,203 @@
+// Copyright 2024 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package kafka
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cdc-sink/internal/script"
+	"github.com/cockroachdb/cdc-sink/internal/sinkprod"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/all"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/base"
+	"github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
+	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/prometheus/client_golang/prometheus"
+	log "github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+type fixtureConfig struct {
+	chaos  bool
+	script bool
+}
+
+const (
+	broker = "localhost:29092"
+)
+
+// TestMain verifies that we can run the integration test for Kafka.
+func TestMain(m *testing.M) {
+	all.IntegrationMain(m, all.KafkaName)
+}
+
+// TODO (silvano): add a test that's driven by a seqtest.Generator.
+// https://github.com/cockroachdb/cdc-sink/issues/789
+
+// TestKafka verifies that we can process simple messages from Kafka.
+// The Kafka messages are generated by a CockroachDB changefeed in JSON format.
+func TestKafka(t *testing.T) {
+	t.Run("immediate", func(t *testing.T) { testIntegration(t, &fixtureConfig{}) })
+	t.Run("immediate chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true}) })
+	t.Run("immediate script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true}) })
+}
+
+func testIntegration(t *testing.T, fc *fixtureConfig) {
+	a := assert.New(t)
+	r := require.New(t)
+	var stopped <-chan struct{}
+	defer func() {
+		if stopped != nil {
+			<-stopped
+		}
+	}()
+
+	// Create a basic fixture to represent a source database.
+	sourceFixture, err := base.NewFixture(t)
+	r.NoError(err)
+
+	ctx := sourceFixture.Context
+
+	// Create a basic destination database connection.
+	destFixture, err := base.NewFixture(t)
+	r.NoError(err)
+
+	targetDB := destFixture.TargetSchema.Schema()
+	targetPool := destFixture.TargetPool
+
+	// Set up source and target tables.
+	source, err := sourceFixture.CreateSourceTable(ctx, "CREATE TABLE %s (pk INT PRIMARY KEY, v STRING)")
+	r.NoError(err)
+
+	// Since we're creating the target table without using the helper
+	// CreateTable(), we need to manually refresh the target's Watcher.
+	target := ident.NewTable(targetDB, source.Name().Table())
+	targetCol := "v"
+	if fc.script {
+		targetCol = "v_mapped"
+	}
+	_, err = targetPool.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (pk INT PRIMARY KEY, %s VARCHAR(2048))", target, targetCol))
+	r.NoError(err)
+
+	serverCfg, err := getConfig(destFixture, fc, target)
+	r.NoError(err)
+
+	kafka, err := Start(ctx, serverCfg)
+	r.NoError(err)
+
+	watcher, err := kafka.Conn.watchers.Get(targetDB)
+	r.NoError(err)
+	r.NoError(watcher.Refresh(ctx, targetPool))
+
+	createStmt := "CREATE CHANGEFEED FOR TABLE %s INTO " +
+		fmt.Sprintf("'kafka://%s?topic_prefix=%s.'", broker, targetDB.Raw()) +
+		" WITH updated,diff,resolved='5s',min_checkpoint_frequency='5s'"
+	r.NoError(source.Exec(ctx, createStmt))
+
+	// Add base data to the source table.
+	r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (1, 'one')"))
+	ct, err := source.RowCount(ctx)
+	r.NoError(err)
+	a.Equal(1, ct)
+
+	// Wait for the backfilled value.
+	for {
+		ct, err := base.GetRowCount(ctx, targetPool, target)
+		r.NoError(err)
+		if ct >= 1 {
+			break
+		}
+		log.Infof("waiting for backfill %s", target)
+		time.Sleep(time.Second)
+	}
+
+	// Update the first value.
+	r.NoError(source.Exec(ctx, "UPSERT INTO %s (pk, v) VALUES (1, 'updated')"))
+
+	// Insert an additional value.
+	r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (2, 'two')"))
+	ct, err = source.RowCount(ctx)
+	r.NoError(err)
+	a.Equal(2, ct)
+
+	// Wait for the streamed value.
+	for {
+		ct, err := base.GetRowCount(ctx, targetPool, target)
+		r.NoError(err)
+		if ct >= 2 {
+			break
+		}
+		log.Infof("waiting for stream %s", target)
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Also wait to see that the update was applied.
+	for {
+		var val string
+		r.NoError(targetPool.QueryRowContext(ctx,
+			fmt.Sprintf("SELECT %s FROM %s WHERE pk = 1", targetCol, target),
+		).Scan(&val))
+		if val == "updated" {
+			break
+		}
+		log.Debug("waiting for update")
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	metrics, err := prometheus.DefaultGatherer.Gather()
+	a.NoError(err)
+	log.WithField("metrics", metrics).Trace()
+	sinktest.CheckDiagnostics(ctx, t, kafka.Diagnostics)
+
+}
+
+// getConfig is a helper function to create a configuration for the connector.
+func getConfig(fixture *base.Fixture, fc *fixtureConfig, tgt ident.Table) (*Config, error) {
+	dbName := fixture.TargetSchema.Schema()
+	crdbPool := fixture.TargetPool
+	config := &Config{
+		Staging: sinkprod.StagingConfig{
+			Schema: fixture.StagingDB.Schema(),
+		},
+		Target: sinkprod.TargetConfig{
+			CommonConfig: sinkprod.CommonConfig{
+				Conn: crdbPool.ConnectionString,
+			},
+			ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier.
+		},
+		TargetSchema: dbName,
+
+		BatchSize: 100,
+		Brokers:   []string{broker},
+		Group:     dbName.Raw(),
+		Strategy:  "sticky",
+		Topics:    []string{tgt.Raw()},
+	}
+	if fc.chaos {
+		config.Sequencer.Chaos = 0.0005
+	}
+	if fc.script {
+		config.Script = script.Config{
+			FS:       scripttest.ScriptFSFor(tgt),
+			MainPath: "/testdata/logical_test.ts",
+		}
+	}
+	return config, config.Preflight()
+}
diff --git a/internal/source/kafka/provider.go b/internal/source/kafka/provider.go
index 1b2cf89c0..eb428b154 100644
--- a/internal/source/kafka/provider.go
+++ b/internal/source/kafka/provider.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cdc-sink/internal/script"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
-	scriptSequencer "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/switcher"
 	"github.com/cockroachdb/cdc-sink/internal/target/apply"
 	"github.com/cockroachdb/cdc-sink/internal/types"
@@ -47,7 +46,6 @@ func ProvideConn(
 	chaos *chaos.Chaos,
 	config *Config,
 	memo types.Memo,
-	scriptSeq *scriptSequencer.Sequencer,
 	stagingPool *types.StagingPool,
 	targetPool *types.TargetPool,
 	watchers types.Watchers,
@@ -58,11 +56,11 @@
 	// ModeImmediate is the only mode supported for now.
 	mode := notify.VarOf(switcher.ModeImmediate)
 	sw = sw.WithMode(mode)
-	seq, err := scriptSeq.Wrap(ctx, sw)
-	if err != nil {
-		return nil, err
-	}
-	seq, err = chaos.Wrap(ctx, seq) // No-op if probability is 0.
+	// The script-sequencer wrap, i.e.
+	//   seq, err := scriptSeq.Wrap(ctx, sw)
+	// is intentionally not applied here: wire hands the script
+	// sequencer directly to the switcher (see wire_gen.go).
+	seq, err := chaos.Wrap(ctx, sw) // No-op if probability is 0.
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/source/kafka/wire_gen.go b/internal/source/kafka/wire_gen.go
index 788ee2500..888fdb2b7 100644
--- a/internal/source/kafka/wire_gen.go
+++ b/internal/source/kafka/wire_gen.go
@@ -10,11 +10,10 @@ import (
 	"github.com/cockroachdb/cdc-sink/internal/script"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/besteffort"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
+	"github.com/cockroachdb/cdc-sink/internal/sequencer/core"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/immediate"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/scheduler"
 	script2 "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
-	"github.com/cockroachdb/cdc-sink/internal/sequencer/serial"
-	"github.com/cockroachdb/cdc-sink/internal/sequencer/shingle"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/switcher"
 	"github.com/cockroachdb/cdc-sink/internal/sinkprod"
 	"github.com/cockroachdb/cdc-sink/internal/staging/leases"
@@ -83,11 +82,10 @@ func Start(ctx *stopper.Context, config *Config) (*Kafka, error) {
 	}
 	stagers := stage.ProvideFactory(stagingPool, stagingSchema, ctx)
 	bestEffort := besteffort.ProvideBestEffort(sequencerConfig, typesLeases, schedulerScheduler, stagingPool, stagers, targetPool, watchers)
-	immediateImmediate := &immediate.Immediate{}
+	coreCore := core.ProvideCore(sequencerConfig, typesLeases, schedulerScheduler, stagers, stagingPool, targetPool)
+	immediateImmediate := immediate.ProvideImmediate(targetPool)
 	sequencer := script2.ProvideSequencer(loader, targetPool, watchers)
-	serialSerial := serial.ProvideSerial(sequencerConfig, typesLeases, stagers, stagingPool, targetPool)
-	shingleShingle := shingle.ProvideShingle(sequencerConfig, schedulerScheduler, stagers, stagingPool, targetPool)
-	switcherSwitcher := switcher.ProvideSequencer(bestEffort, diagnostics, immediateImmediate, sequencer, serialSerial, shingleShingle, stagingPool, targetPool)
+	switcherSwitcher := switcher.ProvideSequencer(bestEffort, coreCore, diagnostics, immediateImmediate, sequencer, stagingPool, targetPool)
 	chaosChaos := &chaos.Chaos{
 		Config: sequencerConfig,
 	}
@@ -95,7 +93,7 @@ func Start(ctx *stopper.Context, config *Config) (*Kafka, error) {
 	if err != nil {
 		return nil, err
 	}
-	conn, err := ProvideConn(ctx, acceptor, switcherSwitcher, chaosChaos, config, memoMemo, sequencer, stagingPool, targetPool, watchers)
+	conn, err := ProvideConn(ctx, acceptor, switcherSwitcher, chaosChaos, config, memoMemo, stagingPool, targetPool, watchers)
 	if err != nil {
 		return nil, err
 	}
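
--
Reviewer note: a sketch for running this test locally, assuming the
compose services above and the same CockroachDB test fixtures used by
the other integration tests (adjust connection details to your
environment):

    # Bring up the single-broker Kafka cluster; zookeeper-v7.6 starts
    # automatically via depends_on.
    docker compose -f .github/docker-compose.yml up -d kafka-v7.6

    # Opt in to the Kafka integration test. IntegrationMain matches the
    # "kafka" prefix of CDC_INTEGRATION (see internal/sinktest/all).
    CDC_INTEGRATION=kafka-v7.6 go test -v ./internal/source/kafka/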