Skip to content

Commit

Permalink
c2c: add external connection tests
Browse files Browse the repository at this point in the history
This patch increases test coverage for replication streams created with an
external connection to source. Specifically, this patch adds a dedicated
External Connection client test and metamorphically creates the c2c e2e tests
with an external connection.

Fixes: cockroachdb#110449

Release note: None
  • Loading branch information
msbutler committed Sep 21, 2023
1 parent 2a23692 commit 370c781
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 4 deletions.
30 changes: 27 additions & 3 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"net/url"
"sort"
"testing"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -72,6 +74,8 @@ type TenantStreamingClustersArgs struct {

MultitenantSingleClusterNumNodes int
MultiTenantSingleClusterTestRegions []string

NoMetamorphicExternalConnection bool
}

var DefaultTenantStreamingClustersArgs = TenantStreamingClustersArgs{
Expand Down Expand Up @@ -112,6 +116,8 @@ type TenantStreamingClusters struct {
DestSysSQL *sqlutils.SQLRunner
DestTenantConn *gosql.DB
DestTenantSQL *sqlutils.SQLRunner

Rng *rand.Rand
}

func (c *TenantStreamingClusters) setupSrcTenant() {
Expand Down Expand Up @@ -242,16 +248,29 @@ func (c *TenantStreamingClusters) Cutover(

// StartStreamReplication producer job ID and ingestion job ID.
func (c *TenantStreamingClusters) StartStreamReplication(ctx context.Context) (int, int) {
c.DestSysSQL.Exec(c.T, c.BuildCreateTenantQuery())

// 50% of time, start replication stream via an external connection.
var externalConnection string
if c.Rng.Intn(2) == 0 && !c.Args.NoMetamorphicExternalConnection {
externalConnection = "replication-source-addr"
c.DestSysSQL.Exec(c.T, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`,
externalConnection, c.SrcURL.String()))
}

c.DestSysSQL.Exec(c.T, c.BuildCreateTenantQuery(externalConnection))
streamProducerJobID, ingestionJobID := GetStreamJobIds(c.T, ctx, c.DestSysSQL, c.Args.DestTenantName)
return streamProducerJobID, ingestionJobID
}

func (c *TenantStreamingClusters) BuildCreateTenantQuery() string {
func (c *TenantStreamingClusters) BuildCreateTenantQuery(externalConnection string) string {
sourceURI := c.SrcURL.String()
if externalConnection != "" {
sourceURI = fmt.Sprintf("external://%s", externalConnection)
}
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'",
c.Args.DestTenantName,
c.Args.SrcTenantName,
c.SrcURL.String())
sourceURI)
if c.Args.RetentionTTLSeconds > 0 {
streamReplStmt = fmt.Sprintf("%s WITH RETENTION = '%ds'", streamReplStmt, c.Args.RetentionTTLSeconds)
}
Expand Down Expand Up @@ -328,6 +347,8 @@ func CreateMultiTenantStreamingCluster(
cluster, url, cleanup := startC2CTestCluster(ctx, t, serverArgs,
args.MultitenantSingleClusterNumNodes, args.MultiTenantSingleClusterTestRegions)

rng, _ := randutil.NewPseudoRand()

destNodeIdx := args.MultitenantSingleClusterNumNodes - 1
tsc := &TenantStreamingClusters{
T: t,
Expand All @@ -340,6 +361,7 @@ func CreateMultiTenantStreamingCluster(
DestCluster: cluster,
DestSysSQL: sqlutils.MakeSQLRunner(cluster.ServerConn(destNodeIdx)),
DestSysServer: cluster.Server(destNodeIdx),
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
Expand Down Expand Up @@ -374,6 +396,7 @@ func CreateTenantStreamingClusters(
})

require.NoError(t, g.Wait())
rng, _ := randutil.NewPseudoRand()

tsc := &TenantStreamingClusters{
T: t,
Expand All @@ -386,6 +409,7 @@ func CreateTenantStreamingClusters(
DestCluster: destCluster,
DestSysSQL: sqlutils.MakeSQLRunner(destCluster.ServerConn(0)),
DestSysServer: destCluster.Server(0),
Rng: rng,
}
tsc.setupSrcTenant()
tsc.init()
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,14 @@ go_test(
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/cancelchecker",
"//pkg/util/ctxgroup",
Expand Down
46 changes: 46 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -138,8 +144,48 @@ func TestGetFirstActiveClientEmpty(t *testing.T) {

}

func TestExternalConnectionClient(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly})
defer srv.Stopper().Stop(ctx)

sql := sqlutils.MakeSQLRunner(db)
pgURL, cleanupSinkCert := sqlutils.PGUrl(t, srv.AdvSQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanupSinkCert()

externalConnection := "replication-source-addr"
sql.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`,
externalConnection, pgURL.String()))
nonExistentConnection := "i-dont-exist"
address := streamingccl.StreamAddress(fmt.Sprintf("external://%s", externalConnection))
dontExistAddress := streamingccl.StreamAddress(fmt.Sprintf("external://%s", nonExistentConnection))

isqlDB := srv.InternalDB().(isql.DB)
client, err := NewStreamClient(ctx, address, isqlDB)
require.NoError(t, err)
require.NoError(t, client.Dial(ctx))
_, err = NewStreamClient(ctx, dontExistAddress, isqlDB)
require.Contains(t, err.Error(), "failed to load external connection object")

externalConnURL, err := address.URL()
require.NoError(t, err)
spanCfgClient, err := NewSpanConfigStreamClient(ctx, externalConnURL, isqlDB)
require.NoError(t, err)
require.NoError(t, spanCfgClient.Dial(ctx))
dontExistURL, err := dontExistAddress.URL()
require.NoError(t, err)
_, err = NewSpanConfigStreamClient(ctx, dontExistURL, isqlDB)
require.Contains(t, err.Error(), "failed to load external connection object")
}

func TestGetFirstActiveClient(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

client := GetRandomStreamClientSingletonForTesting()
defer func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestDataDriven(t *testing.T) {

case "create-replication-clusters":
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.NoMetamorphicExternalConnection = d.HasArg("no-external-conn")
var cleanup func()
ds.replicationClusters, cleanup = replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
ds.cleanupFns = append(ds.cleanupFns, func() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ func TestTenantStreamingShowTenant(t *testing.T) {

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.NoMetamorphicExternalConnection = true

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/testdata/simple
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create-replication-clusters
create-replication-clusters no-external-conn
----

start-replication-stream
Expand Down

0 comments on commit 370c781

Please sign in to comment.