Kafka connector integration test
A simple integration test is part of this commit. It verifies that change events
originated by a CockroachDB changefeed and routed through a single-node Kafka
cluster are received by the connector and applied to a target CockroachDB database.
sravotto committed Mar 20, 2024
1 parent e637869 commit 0b52b43
Showing 6 changed files with 241 additions and 15 deletions.
20 changes: 20 additions & 0 deletions .github/docker-compose.yml
@@ -90,6 +90,26 @@ services:
      - "5401:5401"
      - "8082:8082"

  # Single broker kafka cluster
  kafka-v7.6:
    image: confluentinc/cp-kafka:7.6.0
    depends_on:
      - zookeeper-v7.6
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-v7.6:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-v7.6:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  zookeeper-v7.6:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  mysql-v5.7:
    image: mysql:5.7
    platform: linux/x86_64
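
For reference (not part of the commit): the broker advertises two listeners, so
containers on the compose network reach it at kafka-v7.6:9092 while the test
process on the host uses localhost:29092. A minimal host-side connectivity
check, assuming the IBM/sarama client, looks roughly like this:

package main

import (
    "log"

    "github.com/IBM/sarama" // assumed client library, not pinned by this commit
)

func main() {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V3_6_0_0 // assumption: cp-kafka 7.6 ships Kafka 3.6
    client, err := sarama.NewClient([]string{"localhost:29092"}, cfg)
    if err != nil {
        log.Fatalf("broker not reachable on the host listener: %v", err)
    }
    defer client.Close()
    topics, err := client.Topics()
    if err != nil {
        log.Fatalf("metadata fetch failed: %v", err)
    }
    log.Printf("connected; topics: %v", topics)
}
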
4 changes: 3 additions & 1 deletion .github/workflows/go-tests.yaml
@@ -135,7 +135,9 @@ jobs:
          integration: postgresql-v14
        - cockroachdb: v23.2
          integration: postgresql-v15

        # Test CRDB via a Kafka cluster
        - cockroachdb: v23.2
          integration: kafka-v7.6
        # Test CRDB -> PostgreSQL for migration backfill use cases.
        - cockroachdb: v23.2
          target: postgresql-v11
5 changes: 5 additions & 0 deletions internal/sinktest/all/integration.go
@@ -29,6 +29,11 @@ const (
    // integration tests for some databases. We expect this to be of the
    // format "database-v123".
    IntegrationEnvName = "CDC_INTEGRATION"
    // KafkaName must be kept in alignment with the
    // .github/docker-compose.yml file and the integration matrix
    // variable in workflows/tests.yaml.
    KafkaName = "kafka"

    // MySQLName must be kept in alignment with the
    // .github/docker-compose.yml file and the integration matrix
    // variable in workflows/tests.yaml.
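
Not part of the commit: TestMain in the new test file below calls
all.IntegrationMain(m, all.KafkaName). The gate's implementation is not shown
in this diff; a hypothetical sketch of the pattern, assuming it matches the
CDC_INTEGRATION value against the registered service name:

package sketch

import (
    "fmt"
    "os"
    "strings"
    "testing"
)

// integrationMain is a hypothetical stand-in for all.IntegrationMain: it
// skips the suite unless CDC_INTEGRATION names the matching service, e.g.
// CDC_INTEGRATION=kafka-v7.6 enables a suite registered as "kafka".
func integrationMain(m *testing.M, name string) {
    if !strings.HasPrefix(os.Getenv("CDC_INTEGRATION"), name) {
        fmt.Printf("skipping %s integration tests; set CDC_INTEGRATION\n", name)
        os.Exit(0)
    }
    os.Exit(m.Run())
}

// A plausible local invocation (assumption; CI wires this through the
// docker-compose file and the workflow matrix above):
//
//	docker compose -f .github/docker-compose.yml up -d kafka-v7.6
//	CDC_INTEGRATION=kafka-v7.6 go test ./internal/source/kafka -v
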
203 changes: 203 additions & 0 deletions internal/source/kafka/integration_test.go
@@ -0,0 +1,203 @@
// Copyright 2024 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package kafka

import (
    "fmt"
    "testing"
    "time"

    "github.com/cockroachdb/cdc-sink/internal/script"
    "github.com/cockroachdb/cdc-sink/internal/sinkprod"
    "github.com/cockroachdb/cdc-sink/internal/sinktest"
    "github.com/cockroachdb/cdc-sink/internal/sinktest/all"
    "github.com/cockroachdb/cdc-sink/internal/sinktest/base"
    "github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
    "github.com/cockroachdb/cdc-sink/internal/util/ident"
    "github.com/prometheus/client_golang/prometheus"
    log "github.com/sirupsen/logrus"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

type fixtureConfig struct {
    chaos  bool
    script bool
}

const (
    broker = "localhost:29092"
)

// TestMain verifies that we can run the integration test for Kafka.
func TestMain(m *testing.M) {
    all.IntegrationMain(m, all.KafkaName)
}

// TODO (silvano): add a test that's driven by a seqtest.Generator
// https://github.com/cockroachdb/cdc-sink/issues/789

// TestKafka verifies that we can process simple messages from Kafka.
// The Kafka messages are generated by a CockroachDB changefeed in JSON format.
func TestKafka(t *testing.T) {
    t.Run("immediate", func(t *testing.T) { testIntegration(t, &fixtureConfig{}) })
    t.Run("immediate chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true}) })
    t.Run("immediate script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true}) })
}

func testIntegration(t *testing.T, fc *fixtureConfig) {
    a := assert.New(t)
    r := require.New(t)
    var stopped <-chan struct{}
    defer func() {
        if stopped != nil {
            <-stopped
        }
    }()

    // Create a basic fixture to represent a source database.
    sourceFixture, err := base.NewFixture(t)
    r.NoError(err)

    ctx := sourceFixture.Context

    // Create a basic destination database connection.
    destFixture, err := base.NewFixture(t)
    r.NoError(err)

    targetDB := destFixture.TargetSchema.Schema()
    targetPool := destFixture.TargetPool

    // Set up source and target tables.
    source, err := sourceFixture.CreateSourceTable(ctx, "CREATE TABLE %s (pk INT PRIMARY KEY, v STRING)")
    r.NoError(err)

    // Since we're creating the target table without using the helper
    // CreateTable(), we need to manually refresh the target's Watcher.
    target := ident.NewTable(targetDB, source.Name().Table())
    targetCol := "v"
    if fc.script {
        targetCol = "v_mapped"
    }
    _, err = targetPool.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (pk INT PRIMARY KEY, %s VARCHAR(2048))", target, targetCol))
    r.NoError(err)

    serverCfg, err := getConfig(destFixture, fc, target)
    r.NoError(err)

    kafka, err := Start(ctx, serverCfg)
    r.NoError(err)

    watcher, err := kafka.Conn.watchers.Get(targetDB)
    r.NoError(err)
    r.NoError(watcher.Refresh(ctx, targetPool))

    createStmt := "CREATE CHANGEFEED FOR TABLE %s INTO " +
        fmt.Sprintf("'kafka://%s?topic_prefix=%s.'", broker, targetDB.Raw()) +
        " WITH updated,diff,resolved='5s',min_checkpoint_frequency='5s'"
    r.NoError(source.Exec(ctx, createStmt))
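
    // Editorial illustration, not in the committed test: with a target
    // schema named "db" and a table named "tbl", the statement above
    // renders approximately as
    //
    //	CREATE CHANGEFEED FOR TABLE tbl INTO
    //	  'kafka://localhost:29092?topic_prefix=db.'
    //	  WITH updated,diff,resolved='5s',min_checkpoint_frequency='5s'
    //
    // so row events arrive on topic "db.tbl", which is presumably what the
    // Topics entry built in getConfig resolves to.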

    // Add base data to the source table.
    r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (1, 'one')"))
    ct, err := source.RowCount(ctx)
    r.NoError(err)
    a.Equal(1, ct)

    // Wait for the backfilled value.
    for {
        ct, err := base.GetRowCount(ctx, targetPool, target)
        r.NoError(err)
        if ct >= 1 {
            break
        }
        log.Infof("waiting for backfill %s", target)
        time.Sleep(time.Second)
    }
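
    // Editorial aside, not in the committed test: this poll and the ones
    // below have no local deadline; they rely on the overall Go test timeout.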

    // Update the first value.
    r.NoError(source.Exec(ctx, "UPSERT INTO %s (pk, v) VALUES (1, 'updated')"))

    // Insert an additional value.
    r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (2, 'two')"))
    ct, err = source.RowCount(ctx)
    r.NoError(err)
    a.Equal(2, ct)

    // Wait for the streamed value.
    for {
        ct, err := base.GetRowCount(ctx, targetPool, target)
        r.NoError(err)
        if ct >= 2 {
            break
        }
        log.Infof("waiting for stream %s", target)
        time.Sleep(100 * time.Millisecond)
    }

    // Also wait to see that the update was applied.
    for {
        var val string
        r.NoError(targetPool.QueryRowContext(ctx,
            fmt.Sprintf("SELECT %s FROM %s WHERE pk = 1", targetCol, target),
        ).Scan(&val))
        if val == "updated" {
            break
        }
        log.Debug("waiting for update")
        time.Sleep(100 * time.Millisecond)
    }

    metrics, err := prometheus.DefaultGatherer.Gather()
    a.NoError(err)
    log.WithField("metrics", metrics).Trace()
    sinktest.CheckDiagnostics(ctx, t, kafka.Diagnostics)
}

// getConfig is a helper function to create a configuration for the connector.
func getConfig(fixture *base.Fixture, fc *fixtureConfig, tgt ident.Table) (*Config, error) {
    dbName := fixture.TargetSchema.Schema()
    crdbPool := fixture.TargetPool
    config := &Config{
        Staging: sinkprod.StagingConfig{
            Schema: fixture.StagingDB.Schema(),
        },
        Target: sinkprod.TargetConfig{
            CommonConfig: sinkprod.CommonConfig{
                Conn: crdbPool.ConnectionString,
            },
            ApplyTimeout: 2 * time.Minute, // Increase to make using the debugger easier.
        },
        TargetSchema: dbName,

        BatchSize: 100,
        Brokers:   []string{broker},
        Group:     dbName.Raw(),
        Strategy:  "sticky",
        Topics:    []string{tgt.Raw()},
    }
    if fc.chaos {
        config.Sequencer.Chaos = 0.0005
    }
    if fc.script {
        config.Script = script.Config{
            FS:       scripttest.ScriptFSFor(tgt),
            MainPath: "/testdata/logical_test.ts",
        }
    }
    return config, config.Preflight()
}
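
Two editorial sketches, not part of the commit. First, given the updated and
diff options on the changefeed above, each per-row Kafka message is a JSON
envelope roughly of the following shape (resolved timestamps arrive as
separate {"resolved": "<hlc>"} messages on each topic):

// envelope approximates the message body the connector decodes before
// applying rows to the target; the field set is an assumption taken from
// the changefeed options, not code from this commit.
type envelope struct {
    After   map[string]any `json:"after"`   // row state after the change
    Before  map[string]any `json:"before"`  // prior state, from the diff option
    Updated string         `json:"updated"` // HLC commit timestamp, from the updated option
}

Second, the Strategy field above presumably selects a consumer-group rebalance
strategy; a hypothetical mapping onto sarama's built-in strategies:

// balanceStrategy is a sketch of how a name like "sticky" could map to a
// sarama rebalance strategy; the connector's actual mapping is not shown here.
func balanceStrategy(name string) sarama.BalanceStrategy {
    switch name {
    case "sticky":
        return sarama.NewBalanceStrategySticky()
    case "roundrobin":
        return sarama.NewBalanceStrategyRoundRobin()
    default: // "range" is Kafka's historical default
        return sarama.NewBalanceStrategyRange()
    }
}
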
12 changes: 5 additions & 7 deletions internal/source/kafka/provider.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cdc-sink/internal/script"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
-	scriptSequencer "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
 	"github.com/cockroachdb/cdc-sink/internal/sequencer/switcher"
 	"github.com/cockroachdb/cdc-sink/internal/target/apply"
 	"github.com/cockroachdb/cdc-sink/internal/types"
@@ -47,7 +46,6 @@ func ProvideConn(
 	chaos *chaos.Chaos,
 	config *Config,
 	memo types.Memo,
-	scriptSeq *scriptSequencer.Sequencer,
 	stagingPool *types.StagingPool,
 	targetPool *types.TargetPool,
 	watchers types.Watchers,
@@ -58,11 +56,11 @@ func ProvideConn(
 	// ModeImmediate is the only mode supported for now.
 	mode := notify.VarOf(switcher.ModeImmediate)
 	sw = sw.WithMode(mode)
-	seq, err := scriptSeq.Wrap(ctx, sw)
-	if err != nil {
-		return nil, err
-	}
-	seq, err = chaos.Wrap(ctx, seq) // No-op if probability is 0.
+	// seq, err := scriptSeq.Wrap(ctx, sw)
+	// if err != nil {
+	// 	return nil, err
+	// }
+	seq, err := chaos.Wrap(ctx, sw) // No-op if probability is 0.
 	if err != nil {
 		return nil, err
 	}
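
As read from this hunk, the script sequencer wrapping is disabled and chaos now
wraps the mode switcher directly. A sketch of the assumed contract behind the
trailing "No-op if probability is 0" comment (hypothetical; the real
implementation lives in internal/sequencer/chaos):

// Wrap is assumed to return its delegate unchanged when the configured
// probability is zero; at the 0.0005 set by the test fixture it would
// inject transient errors to exercise retry paths. chaosSequencer is a
// hypothetical type standing in for the package's real wrapper.
func (c *Chaos) Wrap(ctx context.Context, delegate sequencer.Sequencer) (sequencer.Sequencer, error) {
    if c.Config.Chaos == 0 {
        return delegate, nil
    }
    return &chaosSequencer{delegate: delegate, probability: c.Config.Chaos}, nil
}
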
12 changes: 5 additions & 7 deletions internal/source/kafka/wire_gen.go

Some generated files are not rendered by default.
