From c2338bedd577483c8e9547e8db5cea1c3ad0f19d Mon Sep 17 00:00:00 2001 From: Jeff Swenson Date: Fri, 10 Jan 2025 15:24:34 +0000 Subject: [PATCH] crosscluster: deflake TestStreamIngestionJobWithRandomClient TestStreamIngestionJobWithRandomClient is flaking under stress with the following errors: ``` pq: cutover time 1736520389.999999000,0 is before earliest safe cutover time 1736520390.771027375,0 ``` ``` "0" is not greater than "0" ``` The first error occurs because the frontier would sometimes include a time that is earlier than the PTS created for the replication job. The second error would occur if a cutover time was picked that is earlier than any data written to the cluster. The test now uses a different strategy for picking a cutover time. The test ensures the cutover time is after the retained time and at least one checkpoint. Release note: none Epic: CRDB-40896 --- .../replication_random_client_test.go | 99 ++++++++++++------- 1 file changed, 63 insertions(+), 36 deletions(-) diff --git a/pkg/crosscluster/physical/replication_random_client_test.go b/pkg/crosscluster/physical/replication_random_client_test.go index b97f283c79ed..e8e2d9e1c114 100644 --- a/pkg/crosscluster/physical/replication_random_client_test.go +++ b/pkg/crosscluster/physical/replication_random_client_test.go @@ -9,6 +9,7 @@ import ( "context" gosql "database/sql" "fmt" + "sync/atomic" "testing" "time" @@ -16,8 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants. 
+ "github.com/cockroachdb/cockroach/pkg/crosscluster" "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -37,27 +38,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) { - var progressBytes []byte - if err := sqlDB.QueryRow( - `SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID, - ).Scan(&progressBytes); err != nil { - return hlc.Timestamp{}, err - } - var progress jobspb.Progress - if err := protoutil.Unmarshal(progressBytes, &progress); err != nil { - return hlc.Timestamp{}, err - } - return replicationutils.ReplicatedTimeFromProgress(&progress), nil -} - func getTestRandomClientURI( tenantID roachpb.TenantID, tenantName roachpb.TenantName, ) streamclient.ClusterUri { @@ -120,6 +105,28 @@ func (sv *streamClientValidator) getValuesForKeyBelowTimestamp( return sv.StreamValidator.GetValuesForKeyBelowTimestamp(key, timestamp) } +// watchMaxCheckpointTimestamp updates an atomic pointer every time a new +// checkpoint with a higher hlc is observed. 
+func watchMaxCheckpointTimestamp() (*atomic.Pointer[hlc.Timestamp], streamclient.InterceptFn) { + mu := &syncutil.Mutex{} + ts := &atomic.Pointer[hlc.Timestamp]{} + return ts, func(event crosscluster.Event, _ streamclient.SubscriptionToken) { + mu.Lock() + defer mu.Unlock() + switch event.Type() { + case crosscluster.CheckpointEvent: + maxTimestamp := ts.Load() + for _, rs := range event.GetCheckpoint().ResolvedSpans { + if maxTimestamp == nil || rs.Timestamp.After(*maxTimestamp) { + copy := rs.Timestamp + maxTimestamp = &copy + } + } + ts.Store(maxTimestamp) + } + } +} + // TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is // fed KVs from the random stream client. After receiving a certain number of // resolved timestamp events the test completes the job to tear down the flow, @@ -134,12 +141,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { ctx := context.Background() - canBeCompletedCh := make(chan struct{}) - const threshold = 10 - mu := syncutil.Mutex{} - completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() { - canBeCompletedCh <- struct{}{} - }) + maxCheckpointHlc, watchIntercepor := watchMaxCheckpointTimestamp() // Register interceptors on the random stream client, which will be used by // the processors. @@ -162,7 +164,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { }() client.ClearInterceptors() - client.RegisterInterception(completeJobAfterCheckpoints) + client.RegisterInterception(watchIntercepor) client.RegisterInterception(validateFnWithValidator(t, streamValidator)) client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable { return replicationtestutils.SSTMaker(t, keyValues) @@ -237,28 +239,53 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { } close(allowResponse) - // Wait for the job to signal that it is ready to be cutover, after it has - // received `threshold` resolved ts events. 
- <-canBeCompletedCh - close(canBeCompletedCh) + // Pick a cutover time based on when we observe a checkpoint that is later + // than the retained time. We wait for a checkpoint to make sure there is + // actually data written at the time we cutover to. + var cutoverTime time.Time + testutils.SucceedsSoon(t, func() error { + checkpointHlc := maxCheckpointHlc.Load() + if checkpointHlc == nil { + return errors.New("no checkpoint has been received") + } + + var retainedTime time.Time + row := conn.QueryRow( + `SELECT retained_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`) + if err := row.Scan(&retainedTime); err != nil { + return err + } + + if retainedTime.Before(checkpointHlc.GoTime()) { + cutoverTime = checkpointHlc.GoTime().Add(time.Microsecond) + return nil + } + + return errors.New("waiting for a checkpoint that happens after the retained time") + }) - // Ensure that the job has made some progress. - var replicatedTime hlc.Timestamp + // Wait for the replicated time to pass the cutover time. This ensures the + // test rolls back some data. testutils.SucceedsSoon(t, func() error { - var err error - replicatedTime, err = getReplicatedTime(ingestionJobID, conn) - require.NoError(t, err) - if replicatedTime.IsEmpty() { + var replicatedTime gosql.NullTime + row := conn.QueryRow( + `SELECT replicated_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`) + if err := row.Scan(&replicatedTime); err != nil { + return err + } + if !replicatedTime.Valid { return errors.New("ReplicatedTime is unset, no progress has been reported") } - return nil + if cutoverTime.Before(replicatedTime.Time) { + return nil + } + return errors.New("replicated time has not yet passed the cutover time") }) // Cutting over the job should shutdown the ingestion processors via a context // cancellation, and subsequently rollback data above our frontier timestamp. // // Pick a cutover time just before the latest resolved timestamp. 
- cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) _, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime) require.NoError(t, err)