Skip to content

Commit

Permalink
Kafka connector integration test
Browse files Browse the repository at this point in the history
A simple integration test is part of this commit. It verifies that change events
originated by a CockroachDB changefeed and routed via a single-node Kafka
cluster are received by the connector and applied to a target CockroachDB database.
  • Loading branch information
sravotto committed Mar 19, 2024
1 parent 21810d3 commit 601a361
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 0 deletions.
20 changes: 20 additions & 0 deletions .github/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,26 @@ services:
entrypoint: /cockroach/cockroach
command: start-single-node --insecure --store type=mem,size=2G --listen-addr :5401 --http-addr :8082

# Single broker kafka cluster
kafka-v7.6:
image: confluentinc/cp-kafka:7.6.0
depends_on:
- zookeeper-v7.6
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-v7.6:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-v7.6:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
zookeeper-v7.6:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

mysql-v5.7:
image: mysql:5.7
platform: linux/x86_64
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/go-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ jobs:
- cockroachdb: v23.2
target: oracle-v21.3
targetConn: "oracle://system:[email protected]:1521/XEPDB1"
- cockroachdb: v23.2
integration: kafka-v7.6

# Test CRDB -> PostgreSQL for migration backfill use cases.
- cockroachdb: v23.2
Expand Down
5 changes: 5 additions & 0 deletions internal/sinktest/all/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (
// integration tests for some databases. We expect this to be of the
// format "database-v123".
IntegrationEnvName = "CDC_INTEGRATION"
// KafkaName must be kept in alignment with the
// .github/docker-compose.yml file and the integration matrix
// variable in .github/workflows/go-tests.yaml.
KafkaName = "kafka"

// MySQLName must be kept in alignment with the
// .github/docker-compose.yml file and the integration matrix
// variable in workflows/tests.yaml.
Expand Down
200 changes: 200 additions & 0 deletions internal/source/kafka/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2024 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package kafka

import (
"fmt"
"testing"
"time"

"github.com/cockroachdb/cdc-sink/internal/script"
"github.com/cockroachdb/cdc-sink/internal/sinkprod"
"github.com/cockroachdb/cdc-sink/internal/sinktest"
"github.com/cockroachdb/cdc-sink/internal/sinktest/all"
"github.com/cockroachdb/cdc-sink/internal/sinktest/base"
"github.com/cockroachdb/cdc-sink/internal/sinktest/scripttest"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// fixtureConfig selects which variant of the integration test to run.
// The zero value runs the plain test with no optional behavior enabled.
type fixtureConfig struct {
	// chaos, when set, makes getConfig assign a non-zero
	// Sequencer.Chaos value to the connector configuration.
	chaos bool
	// script, when set, makes getConfig attach a user-script
	// configuration, and makes the test name the target value column
	// "v_mapped" instead of "v".
	script bool
}

// broker is the address of the single Kafka broker used by the test.
// It matches the host-facing listener (localhost:29092) that the
// kafka service publishes in .github/docker-compose.yml.
const broker = "localhost:29092"

// TestMain is the entry point for this package's tests. It hands
// control to all.IntegrationMain, identifying this suite as the Kafka
// integration via all.KafkaName.
func TestMain(m *testing.M) {
	all.IntegrationMain(m, all.KafkaName)
}

// TestKafka verifies that we can process simple messages from Kafka.
// The kafka messages are generated by a CockroachDB changefeed in JSON format.
func TestKafka(t *testing.T) {
t.Run("immediate", func(t *testing.T) { testIntegration(t, &fixtureConfig{}) })
t.Run("immediate chaos", func(t *testing.T) { testIntegration(t, &fixtureConfig{chaos: true}) })
t.Run("immediate script", func(t *testing.T) { testIntegration(t, &fixtureConfig{script: true}) })
}

// testIntegration runs one end-to-end pass of the Kafka connector for
// the given fixture variant:
//
//  1. create a source table and a same-named target table,
//  2. start the connector against the target,
//  3. create a changefeed on the source table that publishes to the
//     Kafka broker,
//  4. insert, update and insert rows in the source, and
//  5. poll the target until the row counts and the updated value appear.
//
// Any setup or query error fails the test immediately. The polling
// loops carry no deadline of their own.
func testIntegration(t *testing.T, fc *fixtureConfig) {
	a := assert.New(t)
	r := require.New(t)

	// Create a basic fixture to represent a source database.
	sourceFixture, err := base.NewFixture(t)
	r.NoError(err)

	ctx := sourceFixture.Context

	// Create a basic destination database connection.
	destFixture, err := base.NewFixture(t)
	r.NoError(err)

	targetDB := destFixture.TargetSchema.Schema()
	targetPool := destFixture.TargetPool

	// Set up source and target tables.
	source, err := sourceFixture.CreateSourceTable(ctx, "CREATE TABLE %s (pk INT PRIMARY KEY, v STRING)")
	r.NoError(err)

	// The target table shares the source table's name, but lives in the
	// destination schema. The script variant uses a different name for
	// the value column.
	target := ident.NewTable(targetDB, source.Name().Table())
	targetCol := "v"
	if fc.script {
		targetCol = "v_mapped"
	}
	_, err = targetPool.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (pk INT PRIMARY KEY, %s VARCHAR(2048))", target, targetCol))
	r.NoError(err)

	serverCfg, err := getConfig(destFixture, fc, target)
	r.NoError(err)

	kafka, err := Start(ctx, serverCfg)
	r.NoError(err)

	// Since the target table was created without using the helper
	// CreateTable(), we need to manually refresh the target's Watcher.
	watcher, err := kafka.Conn.watchers.Get(targetDB)
	r.NoError(err)
	r.NoError(watcher.Refresh(ctx, targetPool))

	// The topic prefix is the target schema name followed by a dot.
	createStmt := "CREATE CHANGEFEED FOR TABLE %s INTO" +
		fmt.Sprintf("'kafka://%s?topic_prefix=%s.'", broker, targetDB.Raw()) +
		" WITH updated,diff,resolved='5s',min_checkpoint_frequency='5s'"
	r.NoError(source.Exec(ctx, createStmt))

	// waitForRows polls the target table until it holds at least want
	// rows, pausing between attempts. A query error fails the test.
	waitForRows := func(want int, phase string, pause time.Duration) {
		for {
			got, err := base.GetRowCount(ctx, targetPool, target)
			r.NoError(err)
			if got >= want {
				return
			}
			log.Infof("waiting for %s %s", phase, target)
			time.Sleep(pause)
		}
	}

	// Add base data to the source table.
	r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (1, 'one')"))
	ct, err := source.RowCount(ctx)
	r.NoError(err)
	a.Equal(1, ct)

	// Wait for the backfilled value.
	waitForRows(1, "backfill", time.Second)

	// Update the first value
	r.NoError(source.Exec(ctx, "UPSERT INTO %s (pk, v) VALUES (1, 'updated')"))

	// Insert an additional value
	r.NoError(source.Exec(ctx, "INSERT INTO %s (pk, v) VALUES (2, 'two')"))
	ct, err = source.RowCount(ctx)
	r.NoError(err)
	a.Equal(2, ct)

	// Wait for the streamed value.
	waitForRows(2, "stream", 100*time.Millisecond)

	// Also wait to see that the update was applied.
	for {
		var val string
		r.NoError(targetPool.QueryRowContext(ctx,
			fmt.Sprintf("SELECT %s FROM %s WHERE pk = 1", targetCol, target),
		).Scan(&val))
		if val == "updated" {
			break
		}
		log.Debug("waiting for update")
		time.Sleep(100 * time.Millisecond)
	}

	// Gathering the default Prometheus metrics must succeed; the result
	// is only logged at trace level.
	metrics, err := prometheus.DefaultGatherer.Gather()
	a.NoError(err)
	log.WithField("metrics", metrics).Trace()
	sinktest.CheckDiagnostics(ctx, t, kafka.Diagnostics)
}

// getConfig builds the connector Config for the integration test and
// runs its Preflight check.
//
// The staging schema and target connection string come from fixture;
// the consumer reads the single topic named after tgt from the test
// broker, using the target schema name as the consumer group. The fc
// flags optionally enable chaos and a user script.
//
// The Config is returned together with whatever error Preflight
// reports, so it is non-nil even when the error is non-nil.
func getConfig(fixture *base.Fixture, fc *fixtureConfig, tgt ident.Table) (*Config, error) {
	schema := fixture.TargetSchema.Schema()

	cfg := &Config{
		TargetSchema: schema,

		batchSize: 100,
		brokers:   []string{broker},
		group:     schema.Raw(),
		strategy:  "sticky",
		topics:    []string{tgt.Raw()},
	}
	cfg.Staging = sinkprod.StagingConfig{
		Schema: fixture.StagingDB.Schema(),
	}
	cfg.Target = sinkprod.TargetConfig{
		CommonConfig: sinkprod.CommonConfig{
			Conn: fixture.TargetPool.ConnectionString,
		},
		// Increase to make using the debugger easier.
		ApplyTimeout: 2 * time.Minute,
	}

	if fc.script {
		cfg.Script = script.Config{
			FS:       scripttest.ScriptFSFor(tgt),
			MainPath: "/testdata/logical_test.ts",
		}
	}
	if fc.chaos {
		cfg.Sequencer.Chaos = 0.0005
	}

	err := cfg.Preflight()
	return cfg, err
}

0 comments on commit 601a361

Please sign in to comment.