crosscluster: deflake TestStreamIngestionJobWithRandomClient
TestStreamIngestionJobWithRandomClient is flaking under stress with the
following errors:

```
pq: cutover time 1736520389.999999000,0 is before earliest safe cutover time 1736520390.771027375,0
```

```
"0" is not greater than "0"
```

The first error occurred because the frontier would sometimes include a
time earlier than the PTS created for the replication job. The second
error occurred when the picked cutover time was earlier than any data
written to the cluster.

The test now uses a different strategy for picking a cutover time: it
ensures the cutover time is after both the retained time and at least
one checkpoint.
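
For illustration, here is a minimal sketch of the new selection logic, which
the diff below embeds in a testutils.SucceedsSoon retry loop. The helper name
pickCutoverTime is hypothetical; the query, the tenant name "30", and the
one-microsecond offset mirror the test code.

```go
package physical // sketch only; the test file already declares these imports

import (
	gosql "database/sql"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// pickCutoverTime returns a cutover time that is strictly after both the
// replication job's retained time and the highest checkpoint observed so
// far. Cutting over at or before the retained time reproduces the "before
// earliest safe cutover time" error, and cutting over before any data was
// written reproduces the `"0" is not greater than "0"` error; waiting for
// a checkpoint guarantees data exists at the chosen time.
func pickCutoverTime(
	conn *gosql.DB, maxCheckpointHlc *atomic.Pointer[hlc.Timestamp],
) (time.Time, error) {
	checkpointHlc := maxCheckpointHlc.Load()
	if checkpointHlc == nil {
		return time.Time{}, errors.New("no checkpoint has been received")
	}

	var retainedTime time.Time
	row := conn.QueryRow(
		`SELECT retained_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
	if err := row.Scan(&retainedTime); err != nil {
		return time.Time{}, err
	}

	// Only accept a checkpoint that lands after the retained time; the
	// caller retries until this holds.
	if retainedTime.Before(checkpointHlc.GoTime()) {
		return checkpointHlc.GoTime().Add(time.Microsecond), nil
	}
	return time.Time{}, errors.New("waiting for a checkpoint after the retained time")
}
```

The committed test wraps this check in testutils.SucceedsSoon so it keeps
retrying until a qualifying checkpoint arrives.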

Release note: none
Epic: CRDB-40896
jeffswenson committed Jan 10, 2025
1 parent 4cf765b commit c2338be
Showing 1 changed file with 63 additions and 36 deletions: pkg/crosscluster/physical/replication_random_client_test.go
@@ -9,15 +9,16 @@ import (
"context"
gosql "database/sql"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/backup"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants.
"github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -37,27 +38,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
).Scan(&progressBytes); err != nil {
return hlc.Timestamp{}, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
return hlc.Timestamp{}, err
}
return replicationutils.ReplicatedTimeFromProgress(&progress), nil
}

func getTestRandomClientURI(
tenantID roachpb.TenantID, tenantName roachpb.TenantName,
) streamclient.ClusterUri {
@@ -120,6 +105,28 @@ func (sv *streamClientValidator) getValuesForKeyBelowTimestamp(
return sv.StreamValidator.GetValuesForKeyBelowTimestamp(key, timestamp)
}

// watchMaxCheckpointTimestamp updates an atomic pointer every time a new
// checkpoint with a higher HLC timestamp is observed.
func watchMaxCheckpointTimestamp() (*atomic.Pointer[hlc.Timestamp], streamclient.InterceptFn) {
mu := &syncutil.Mutex{}
ts := &atomic.Pointer[hlc.Timestamp]{}
return ts, func(event crosscluster.Event, _ streamclient.SubscriptionToken) {
mu.Lock()
defer mu.Unlock()
switch event.Type() {
case crosscluster.CheckpointEvent:
maxTimestamp := ts.Load()
for _, rs := range event.GetCheckpoint().ResolvedSpans {
if maxTimestamp == nil || rs.Timestamp.After(*maxTimestamp) {
copy := rs.Timestamp
maxTimestamp = &copy
}
}
ts.Store(maxTimestamp)
}
}
}

// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
// fed KVs from the random stream client. After receiving a certain number of
// resolved timestamp events the test completes the job to tear down the flow,
@@ -134,12 +141,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {

ctx := context.Background()

canBeCompletedCh := make(chan struct{})
const threshold = 10
mu := syncutil.Mutex{}
completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
canBeCompletedCh <- struct{}{}
})
maxCheckpointHlc, watchInterceptor := watchMaxCheckpointTimestamp()

// Register interceptors on the random stream client, which will be used by
// the processors.
@@ -162,7 +164,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
}()

client.ClearInterceptors()
client.RegisterInterception(completeJobAfterCheckpoints)
client.RegisterInterception(watchInterceptor)
client.RegisterInterception(validateFnWithValidator(t, streamValidator))
client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
return replicationtestutils.SSTMaker(t, keyValues)
@@ -237,28 +239,53 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
}
close(allowResponse)

// Wait for the job to signal that it is ready to be cutover, after it has
// received `threshold` resolved ts events.
<-canBeCompletedCh
close(canBeCompletedCh)
// Pick a cutover time based on when we observe a checkpoint that is newer
// than the retained time. We wait for a checkpoint to make sure there is
// actually data written at the time we cut over to.
var cutoverTime time.Time
testutils.SucceedsSoon(t, func() error {
checkpointHlc := maxCheckpointHlc.Load()
if checkpointHlc == nil {
return errors.New("no checkpoint has been received")
}

var retainedTime time.Time
row := conn.QueryRow(
`SELECT retained_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
if err := row.Scan(&retainedTime); err != nil {
return err
}

if retainedTime.Before(checkpointHlc.GoTime()) {
cutoverTime = checkpointHlc.GoTime().Add(time.Microsecond)
return nil
}

return errors.New("waiting for a checkpoint that happens after the retained time")
})

// Ensure that the job has made some progress.
var replicatedTime hlc.Timestamp
// Wait for the replicated time to pass the cutover time. This ensures the
// test rolls back some data.
testutils.SucceedsSoon(t, func() error {
var err error
replicatedTime, err = getReplicatedTime(ingestionJobID, conn)
require.NoError(t, err)
if replicatedTime.IsEmpty() {
var replicatedTime gosql.NullTime
row := conn.QueryRow(
`SELECT replicated_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
if err := row.Scan(&replicatedTime); err != nil {
return err
}
if !replicatedTime.Valid {
return errors.New("ReplicatedTime is unset, no progress has been reported")
}
return nil
if cutoverTime.Before(replicatedTime.Time) {
return nil
}
return errors.New("replicated time has not yet passed the cutover time")
})

// Cutting over the job should shut down the ingestion processors via a context
// cancellation, and subsequently roll back data above our frontier timestamp.
//
// Pick a cutover time just before the latest resolved timestamp.
cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
_, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime)
require.NoError(t, err)
