diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index e2b612ba6088..b319585a02da 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -19,7 +19,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
-changefeed.min_highwater_advance duration 0s minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes application
+changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration application
changefeed.protect_timestamp_interval duration 10m0s controls how often the changefeed forwards its protected timestamp to the resolved timestamp application
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index ddb8bbe2caef..4c11feeb0ea4 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -24,7 +24,7 @@
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation | Serverless/Dedicated/Self-Hosted |
changefeed.frontier_highwater_lag_checkpoint_threshold
| duration | 10m0s | controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled | Serverless/Dedicated/Self-Hosted |
changefeed.memory.per_changefeed_limit
| byte size | 512 MiB | controls amount of data that can be buffered per changefeed | Serverless/Dedicated/Self-Hosted |
-changefeed.min_highwater_advance
| duration | 0s | minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes | Serverless/Dedicated/Self-Hosted |
+changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance)
| duration | 0s | minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress | Serverless/Dedicated/Self-Hosted |
changefeed.node_throttle_config
| string |
| specifies node level throttling configuration for all changefeeeds | Serverless/Dedicated/Self-Hosted |
changefeed.protect_timestamp.max_age
| duration | 96h0m0s | fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration | Serverless/Dedicated/Self-Hosted |
changefeed.protect_timestamp_interval
| duration | 10m0s | controls how often the changefeed forwards its protected timestamp to the resolved timestamp | Serverless/Dedicated/Self-Hosted |
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 08c3bb6bba0d..01cb2b897ac9 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1105,19 +1105,21 @@ func (j *jobState) canCheckpointSpans() bool {
}
// canCheckpointHighWatermark returns true if we should update job high water mark (i.e. progress).
-// Normally, whenever frontier changes, we update high water mark.
+// Normally, whenever the frontier changes, we update the high water mark.
// However, if the rate of frontier changes is too high, we want to slow down
-// the frequency of job progress updates. We do this by skipping some updates
-// if the time to update the job progress is greater than the delta between
-// previous and the current progress update time.
+// the frequency of job progress updates. We do this by enforcing a minimum
+// interval between high water mark updates, which is the greater of the average time
+// it takes to update the job progress and changefeed.resolved_timestamp.min_update_interval.
func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
if !(frontierChanged || j.progressUpdatesSkipped) {
return false
}
- minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
- if j.checkpointDuration > 0 &&
- j.ts.Now().Before(j.lastProgressUpdate.Add(j.checkpointDuration+minAdvance)) {
+ minInterval := max(
+ j.checkpointDuration,
+ changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV),
+ )
+ if j.ts.Now().Before(j.lastProgressUpdate.Add(minInterval)) {
// Updates are too rapid; skip some.
j.progressUpdatesSkipped = true
return false
@@ -1129,16 +1131,15 @@ func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
// checkpointCompleted must be called when job checkpoint completes.
// checkpointDuration indicates how long the checkpoint took.
func (j *jobState) checkpointCompleted(ctx context.Context, checkpointDuration time.Duration) {
- minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
if j.progressUpdatesSkipped {
// Log message if we skipped updates for some time.
- warnThreshold := 2 * minAdvance
+ warnThreshold := 2 * changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV)
if warnThreshold < 60*time.Second {
warnThreshold = 60 * time.Second
}
behind := j.ts.Now().Sub(j.lastProgressUpdate)
if behind > warnThreshold {
- log.Warningf(ctx, "high water mark update delayed by %s; mean checkpoint duration %s",
+ log.Warningf(ctx, "high water mark update was delayed by %s; mean checkpoint duration %s",
behind, j.checkpointDuration)
}
}
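
The throttling rule above can be read in isolation: a resolved-timestamp (high-water mark) update is skipped unless at least max(mean checkpoint duration, changefeed.resolved_timestamp.min_update_interval) has elapsed since the last persisted update. The following standalone Go sketch illustrates that decision with made-up field names; it is a simplified stand-in for jobState, not the production code.

package main

import (
	"fmt"
	"time"
)

// throttleState mirrors only the fields the decision needs. Names are
// illustrative, not the real jobState fields.
type throttleState struct {
	lastProgressUpdate time.Time     // when progress was last persisted
	checkpointDuration time.Duration // running mean cost of persisting progress
	minUpdateInterval  time.Duration // changefeed.resolved_timestamp.min_update_interval
}

// canUpdate applies the new rule: allow the update only if at least
// max(checkpointDuration, minUpdateInterval) has passed since the last one.
func (s throttleState) canUpdate(now time.Time) bool {
	minInterval := s.checkpointDuration
	if s.minUpdateInterval > minInterval {
		minInterval = s.minUpdateInterval
	}
	return !now.Before(s.lastProgressUpdate.Add(minInterval))
}

func main() {
	s := throttleState{
		lastProgressUpdate: time.Now().Add(-20 * time.Second),
		checkpointDuration: 5 * time.Second,
		minUpdateInterval:  30 * time.Second,
	}
	// Only 20s have elapsed but the configured floor is 30s, so skip.
	fmt.Println(s.canUpdate(time.Now())) // false
}

With the setting left at its default of 0, only the mean checkpoint duration gates updates, which matches the previous behavior whenever checkpointing keeps up with frontier changes.
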
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index d6c8e8f99544..e9ebd82b77cd 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -7556,7 +7556,7 @@ func TestCheckpointFrequency(t *testing.T) {
// If we also specify minimum amount of time between updates, we would skip updates
// until enough time has elapsed.
minAdvance := 10 * time.Minute
- changefeedbase.MinHighWaterMarkCheckpointAdvance.Override(ctx, &js.settings.SV, minAdvance)
+ changefeedbase.ResolvedTimestampMinUpdateInterval.Override(ctx, &js.settings.SV, minAdvance)
require.False(t, js.canCheckpointHighWatermark(frontierAdvanced))
ts.Advance(minAdvance)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 221ef13103f9..ec247e53eb7b 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -165,17 +165,21 @@ func validateSinkThrottleConfig(values *settings.Values, configStr string) error
return json.Unmarshal([]byte(configStr), config)
}
-// MinHighWaterMarkCheckpointAdvance specifies the minimum amount of time the
-// changefeed high water mark must advance for it to be eligible for checkpointing.
-var MinHighWaterMarkCheckpointAdvance = settings.RegisterDurationSetting(
+// ResolvedTimestampMinUpdateInterval specifies the minimum amount of time that
+// must have elapsed since the last time a changefeed's resolved timestamp was
+// updated before it is eligible to be updated again.
+var ResolvedTimestampMinUpdateInterval = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"changefeed.min_highwater_advance",
- "minimum amount of time the changefeed high water mark must advance "+
- "for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier "+
- "advances, as long as the rate of checkpointing keeps up with the rate of frontier changes",
+ "minimum amount of time that must have elapsed since the last time "+
+ "a changefeed's resolved timestamp was updated before it is eligible to be "+
+ "updated again; default of 0 means no minimum interval is enforced but "+
+ "updating will still be limited by the average time it takes to checkpoint progress",
0,
settings.NonNegativeDuration,
- settings.WithPublic)
+ settings.WithPublic,
+ settings.WithName("changefeed.resolved_timestamp.min_update_interval"),
+)
// EventMemoryMultiplier is the multiplier for the amount of memory needed to process an event.
//
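
The rename keeps changefeed.min_highwater_advance as the registration key and attaches the new public name through settings.WithName, which is why the generated docs above report the old key as an alias. Below is a hedged sketch of the same pattern applied to a hypothetical setting; only the option helpers (WithName, WithPublic, NonNegativeDuration) are taken from the change above, while the key and name are invented for illustration.

package example

import "github.com/cockroachdb/cockroach/pkg/settings"

// exampleRenamedSetting shows the rename-with-alias pattern: the original key
// stays as the registration key, so values persisted under the old name should
// keep applying, while WithName supplies the new user-facing name.
var exampleRenamedSetting = settings.RegisterDurationSetting(
	settings.ApplicationLevel,
	"changefeed.old_setting_name", // hypothetical original key, kept for compatibility
	"description shown in generated docs and setting listings",
	0,
	settings.NonNegativeDuration,
	settings.WithPublic,
	settings.WithName("changefeed.new_setting_name"), // hypothetical new public name
)
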
diff --git a/pkg/crosscluster/physical/replication_random_client_test.go b/pkg/crosscluster/physical/replication_random_client_test.go
index b97f283c79ed..e8e2d9e1c114 100644
--- a/pkg/crosscluster/physical/replication_random_client_test.go
+++ b/pkg/crosscluster/physical/replication_random_client_test.go
@@ -9,6 +9,7 @@ import (
"context"
gosql "database/sql"
"fmt"
+ "sync/atomic"
"testing"
"time"
@@ -16,8 +17,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants.
+ "github.com/cockroachdb/cockroach/pkg/crosscluster"
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
- "github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -37,27 +38,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
- "github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
- "github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
-func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) {
- var progressBytes []byte
- if err := sqlDB.QueryRow(
- `SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
- ).Scan(&progressBytes); err != nil {
- return hlc.Timestamp{}, err
- }
- var progress jobspb.Progress
- if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
- return hlc.Timestamp{}, err
- }
- return replicationutils.ReplicatedTimeFromProgress(&progress), nil
-}
-
func getTestRandomClientURI(
tenantID roachpb.TenantID, tenantName roachpb.TenantName,
) streamclient.ClusterUri {
@@ -120,6 +105,28 @@ func (sv *streamClientValidator) getValuesForKeyBelowTimestamp(
return sv.StreamValidator.GetValuesForKeyBelowTimestamp(key, timestamp)
}
+// watchMaxCheckpointTimestamp updates an atomic pointer every time a new
+// checkpoint with a higher HLC timestamp is observed.
+func watchMaxCheckpointTimestamp() (*atomic.Pointer[hlc.Timestamp], streamclient.InterceptFn) {
+ mu := &syncutil.Mutex{}
+ ts := &atomic.Pointer[hlc.Timestamp]{}
+ return ts, func(event crosscluster.Event, _ streamclient.SubscriptionToken) {
+ mu.Lock()
+ defer mu.Unlock()
+ switch event.Type() {
+ case crosscluster.CheckpointEvent:
+ maxTimestamp := ts.Load()
+ for _, rs := range event.GetCheckpoint().ResolvedSpans {
+ if maxTimestamp == nil || rs.Timestamp.After(*maxTimestamp) {
+ copy := rs.Timestamp
+					maxTimestamp = &copy
+ }
+ }
+ ts.Store(maxTimestamp)
+ }
+ }
+}
+
// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
// fed KVs from the random stream client. After receiving a certain number of
// resolved timestamp events the test completes the job to tear down the flow,
@@ -134,12 +141,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
ctx := context.Background()
- canBeCompletedCh := make(chan struct{})
- const threshold = 10
- mu := syncutil.Mutex{}
- completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
- canBeCompletedCh <- struct{}{}
- })
+	maxCheckpointHlc, watchInterceptor := watchMaxCheckpointTimestamp()
// Register interceptors on the random stream client, which will be used by
// the processors.
@@ -162,7 +164,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
}()
client.ClearInterceptors()
- client.RegisterInterception(completeJobAfterCheckpoints)
+	client.RegisterInterception(watchInterceptor)
client.RegisterInterception(validateFnWithValidator(t, streamValidator))
client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
return replicationtestutils.SSTMaker(t, keyValues)
@@ -237,28 +239,53 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
}
close(allowResponse)
- // Wait for the job to signal that it is ready to be cutover, after it has
- // received `threshold` resolved ts events.
- <-canBeCompletedCh
- close(canBeCompletedCh)
+	// Pick a cutover time based on when we observe a checkpoint that is newer
+	// than the retained time. We wait for a checkpoint to make sure there is
+	// actually data written at the time we cut over to.
+ var cutoverTime time.Time
+ testutils.SucceedsSoon(t, func() error {
+ checkpointHlc := maxCheckpointHlc.Load()
+ if checkpointHlc == nil {
+ return errors.New("no checkpoint has been received")
+ }
+
+ var retainedTime time.Time
+ row := conn.QueryRow(
+ `SELECT retained_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
+ if err := row.Scan(&retainedTime); err != nil {
+ return err
+ }
+
+ if retainedTime.Before(checkpointHlc.GoTime()) {
+ cutoverTime = checkpointHlc.GoTime().Add(time.Microsecond)
+ return nil
+ }
+
+ return errors.New("waiting for a checkpoint that happens after the retained time")
+ })
- // Ensure that the job has made some progress.
- var replicatedTime hlc.Timestamp
+ // Wait for the replicated time to pass the cutover time. This ensures the
+ // test rolls back some data.
testutils.SucceedsSoon(t, func() error {
- var err error
- replicatedTime, err = getReplicatedTime(ingestionJobID, conn)
- require.NoError(t, err)
- if replicatedTime.IsEmpty() {
+ var replicatedTime gosql.NullTime
+ row := conn.QueryRow(
+ `SELECT replicated_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
+ if err := row.Scan(&replicatedTime); err != nil {
+ return err
+ }
+ if !replicatedTime.Valid {
return errors.New("ReplicatedTime is unset, no progress has been reported")
}
- return nil
+ if cutoverTime.Before(replicatedTime.Time) {
+ return nil
+ }
+ return errors.New("replicated time has not yet passed the cutover time")
})
// Cutting over the job should shutdown the ingestion processors via a context
// cancellation, and subsequently rollback data above our frontier timestamp.
//
// Pick a cutover time just before the latest resolved timestamp.
- cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
_, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime)
require.NoError(t, err)
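
watchMaxCheckpointTimestamp replaces the fixed-count checkpoint counter with a watcher that publishes the largest checkpoint timestamp seen so far: writes are serialized by a mutex, and readers poll an atomic pointer without blocking the interceptor. Here is a minimal, self-contained sketch of that pattern using plain integers; the names are illustrative and the real helper stores hlc.Timestamp values.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxWatcher returns a pointer that always holds the largest value observed so
// far, plus a callback that feeds in new observations. The mutex serializes
// writers; the atomic pointer lets readers poll without taking the lock.
func maxWatcher() (*atomic.Pointer[int64], func(v int64)) {
	var mu sync.Mutex
	maxSeen := &atomic.Pointer[int64]{}
	return maxSeen, func(v int64) {
		mu.Lock()
		defer mu.Unlock()
		if cur := maxSeen.Load(); cur == nil || v > *cur {
			val := v // store a stable copy
			maxSeen.Store(&val)
		}
	}
}

func main() {
	maxSeen, observe := maxWatcher()
	for _, v := range []int64{3, 7, 5} {
		observe(v)
	}
	fmt.Println(*maxSeen.Load()) // 7
}

The test then polls the pointer inside testutils.SucceedsSoon until the observed maximum satisfies its condition, rather than waiting on a channel signal after a fixed number of checkpoint events.
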
diff --git a/pkg/sql/rolemembershipcache/cache.go b/pkg/sql/rolemembershipcache/cache.go
index c2ff615282ab..33ab53b6f719 100644
--- a/pkg/sql/rolemembershipcache/cache.go
+++ b/pkg/sql/rolemembershipcache/cache.go
@@ -121,27 +121,58 @@ func (m *MembershipCache) GetRolesForMember(
return nil, errors.AssertionFailedf("cannot use MembershipCache without a txn")
}
- // Lookup table version.
- tableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
+ // Lookup table versions.
+ roleMembersTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
if err != nil {
return nil, err
}
- tableVersion := tableDesc.GetVersion()
- if tableDesc.IsUncommittedVersion() {
+ tableVersion := roleMembersTableDesc.GetVersion()
+ if roleMembersTableDesc.IsUncommittedVersion() {
return resolveRolesForMember(ctx, txn, member)
}
+
if txn.SessionData().AllowRoleMembershipsToChangeDuringTransaction {
+ systemUsersTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.UsersTableID)
+ if err != nil {
+ return nil, err
+ }
+
+ roleOptionsTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleOptionsTableID)
+ if err != nil {
+ return nil, err
+ }
+
+ systemUsersTableVersion := systemUsersTableDesc.GetVersion()
+ if systemUsersTableDesc.IsUncommittedVersion() {
+ return resolveRolesForMember(ctx, txn, member)
+ }
+
+ roleOptionsTableVersion := roleOptionsTableDesc.GetVersion()
+ if roleOptionsTableDesc.IsUncommittedVersion() {
+ return resolveRolesForMember(ctx, txn, member)
+ }
+
defer func() {
if retErr != nil {
return
}
txn.Descriptors().ReleaseSpecifiedLeases(ctx, []lease.IDVersion{
{
- Name: tableDesc.GetName(),
- ID: tableDesc.GetID(),
+ Name: roleMembersTableDesc.GetName(),
+ ID: roleMembersTableDesc.GetID(),
Version: tableVersion,
},
+ {
+ Name: systemUsersTableDesc.GetName(),
+ ID: systemUsersTableDesc.GetID(),
+ Version: systemUsersTableVersion,
+ },
+ {
+ Name: roleOptionsTableDesc.GetName(),
+ ID: roleOptionsTableDesc.GetID(),
+ Version: roleOptionsTableVersion,
+ },
})
}()
}
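
The role-membership change extends the early lease release to system.users and system.role_options, so statements that bump those descriptors' versions (CREATE ROLE, DROP ROLE, role option changes) no longer wait on open transactions that have allow_role_memberships_to_change_during_transaction set. The sketch below is a standalone illustration of the bookkeeping only; the struct, IDs, and release callback are stand-ins for lease.IDVersion and ReleaseSpecifiedLeases, not the production API.

package main

import "fmt"

// idVersion mirrors the shape of the lease bookkeeping above: a descriptor
// name, ID, and the specific version that was leased. IDs are placeholders.
type idVersion struct {
	name    string
	id      int
	version int
}

// releaseAll hands every recorded lease back once the cached lookup has
// succeeded, so concurrent schema changes on these tables don't wait for
// the transaction to finish.
func releaseAll(release func(idVersion), leased []idVersion) {
	for _, lv := range leased {
		release(lv)
	}
}

func main() {
	leased := []idVersion{
		{name: "role_members", id: 1, version: 4},
		{name: "users", id: 2, version: 9},
		{name: "role_options", id: 3, version: 2},
	}
	releaseAll(func(lv idVersion) {
		fmt.Printf("released lease on system.%s (id=%d) at version %d\n", lv.name, lv.id, lv.version)
	}, leased)
}
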
diff --git a/pkg/sql/tests/allow_role_memberships_to_change_during_transaction_test.go b/pkg/sql/tests/allow_role_memberships_to_change_during_transaction_test.go
index a959b09cc537..e2e4ef1a0efc 100644
--- a/pkg/sql/tests/allow_role_memberships_to_change_during_transaction_test.go
+++ b/pkg/sql/tests/allow_role_memberships_to_change_during_transaction_test.go
@@ -82,11 +82,12 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
require.NoError(t, fooTx.Commit())
require.NoError(t, <-errCh)
})
+
// In this test we ensure that we can perform role grant and revoke
// operations while the transaction which uses the relevant roles
// remains open. We ensure that the transaction still succeeds and
// that the operations occur in a timely manner.
- t.Run("session variable prevents waiting", func(t *testing.T) {
+ t.Run("session variable prevents waiting during GRANT and REVOKE", func(t *testing.T) {
fooConn, err := fooDB.Conn(ctx)
require.NoError(t, err)
defer func() { _ = fooConn.Close() }()
@@ -116,4 +117,36 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
// Ensure the transaction we held open commits without issue.
require.NoError(t, fooTx.Commit())
})
+
+ t.Run("session variable prevents waiting during CREATE and DROP role", func(t *testing.T) {
+ fooConn, err := fooDB.Conn(ctx)
+ require.NoError(t, err)
+ defer func() { _ = fooConn.Close() }()
+ _, err = fooConn.ExecContext(ctx, "SET allow_role_memberships_to_change_during_transaction = true;")
+ require.NoError(t, err)
+ fooTx, err := fooConn.BeginTx(ctx, nil)
+ require.NoError(t, err)
+		// We need to use SHOW ROLES because it accesses the system.users table.
+ _, err = fooTx.Exec("SHOW ROLES")
+ require.NoError(t, err)
+
+ conn, err := sqlDB.Conn(ctx)
+ require.NoError(t, err)
+ defer func() { _ = conn.Close() }()
+ // Set a timeout on the SQL operations to ensure that they both
+ // happen in a timely manner.
+ grantRevokeTimeout, cancel := context.WithTimeout(
+ ctx, testutils.DefaultSucceedsSoonDuration,
+ )
+ defer cancel()
+
+ _, err = conn.ExecContext(grantRevokeTimeout, "CREATE ROLE new_role;")
+ require.NoError(t, err)
+ _, err = conn.ExecContext(grantRevokeTimeout, "DROP ROLE new_role;")
+ require.NoError(t, err)
+
+ // Ensure the transaction we held open commits without issue.
+ require.NoError(t, fooTx.Commit())
+ })
+
}