137940: sql: allow creating or dropping roles while membership cache is being used r=Dedej-Bergin a=Dedej-Bergin

When `allow_role_memberships_to_change_during_transaction` is set, we should be able to create and drop users even when there are contending transactions on the `system.users` and `system.role_options` tables. These changes release the locks on those system tables when `allow_role_memberships_to_change_during_transaction` is set.

Fixes: #137710
Release note (bug fix): When `allow_role_memberships_to_change_during_transaction` is set, users can now be created and dropped quickly even when there are contending transactions on the `system.users` and `system.role_options` tables.
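
To illustrate, a minimal sketch of the scenario this fixes, mirroring the new regression test below (the role name is illustrative):

```sql
-- Session A: opt in, then hold open a transaction that has read role membership.
SET allow_role_memberships_to_change_during_transaction = true;
BEGIN;
SHOW ROLES; -- reads system.users, taking a lease on it

-- Session B: with the variable set in session A, these statements no longer
-- block behind session A's open transaction.
CREATE ROLE new_role;
DROP ROLE new_role;

-- Session A: the open transaction still commits without issue.
COMMIT;
```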

138673: changefeedccl: correct doc and rename changefeed.min_highwater_advance r=asg0451 a=andyyang890

Epic: CRDB-37337

Release note (ops change): The `changefeed.min_highwater_advance`
cluster setting has been renamed to
`changefeed.resolved_timestamp.min_update_interval`
to more accurately reflect its function. Its description has also
been updated. The old name remains usable for backward compatibility.
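
For example, either name can be used to set the interval (a sketch; the 30-second value is illustrative):

```sql
-- New name:
SET CLUSTER SETTING changefeed.resolved_timestamp.min_update_interval = '30s';
-- Old name, still accepted as an alias:
SET CLUSTER SETTING changefeed.min_highwater_advance = '30s';
```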

138826: crosscluster: deflake TestStreamIngestionJobWithRandomClient r=jeffswenson a=jeffswenson

TestStreamIngestionJobWithRandomClient is flaking under stress with the
following errors:

```
pq: cutover time 1736520389.999999000,0 is before earliest safe cutover time 1736520390.771027375,0
```

```
"0" is not greater than "0"
```

The first error occurred because the frontier would sometimes include a
time earlier than the protected timestamp (PTS) created for the
replication job. The second error occurred when the picked cutover time
was earlier than any data written to the cluster.

The test now uses a different strategy for picking a cutover time: it
ensures the cutover time is after the retained time and after at least
one observed checkpoint.
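
In SQL terms, the new strategy waits on the replication status before cutting over, roughly as follows (the tenant name "30" matches the test; the timestamp literal is illustrative):

```sql
-- Poll until a checkpoint newer than retained_time has been observed and
-- replicated_time has passed the chosen cutover time.
SELECT retained_time, replicated_time
FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS];

-- Then cut over to a time just after that checkpoint.
ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME '2025-01-10 15:26:30.771028';
```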

Release note: none
Epic: [CRDB-40896](https://cockroachlabs.atlassian.net/browse/CRDB-40896)

Co-authored-by: Bergin Dedej <[email protected]>
Co-authored-by: Andy Yang <[email protected]>
Co-authored-by: Jeff Swenson <[email protected]>
4 people committed Jan 10, 2025
4 parents 03250fb + 5d6ebe1 + c7a4fe3 + c2338be commit 44bf6d4
Showing 8 changed files with 159 additions and 63 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -19,7 +19,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
 changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
 changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
 changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
-changefeed.min_highwater_advance duration 0s minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes application
+changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
 changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration application
 changefeed.protect_timestamp_interval duration 10m0s controls how often the changefeed forwards its protected timestamp to the resolved timestamp application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -24,7 +24,7 @@
 <tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.min_highwater_advance</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
 <tr><td><div id="setting-changefeed-protect-timestamp-interval" class="anchored"><code>changefeed.protect_timestamp_interval</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls how often the changefeed forwards its protected timestamp to the resolved timestamp</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
21 changes: 11 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1105,19 +1105,21 @@ func (j *jobState) canCheckpointSpans() bool {
 }
 
 // canCheckpointHighWatermark returns true if we should update job high water mark (i.e. progress).
-// Normally, whenever frontier changes, we update high water mark.
+// Normally, whenever the frontier changes, we update the high water mark.
 // However, if the rate of frontier changes is too high, we want to slow down
-// the frequency of job progress updates. We do this by skipping some updates
-// if the time to update the job progress is greater than the delta between
-// previous and the current progress update time.
+// the frequency of job progress updates. We do this by enforcing a minimum
+// interval between high water updates, which is the greater of the average time
+// it takes to update the job progress and changefeed.resolved_timestamp.min_update_interval.
 func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
 	if !(frontierChanged || j.progressUpdatesSkipped) {
 		return false
 	}
 
-	minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
-	if j.checkpointDuration > 0 &&
-		j.ts.Now().Before(j.lastProgressUpdate.Add(j.checkpointDuration+minAdvance)) {
+	minInterval := max(
+		j.checkpointDuration,
+		changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV),
+	)
+	if j.ts.Now().Before(j.lastProgressUpdate.Add(minInterval)) {
 		// Updates are too rapid; skip some.
 		j.progressUpdatesSkipped = true
 		return false
@@ -1129,16 +1131,15 @@ func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
 // checkpointCompleted must be called when job checkpoint completes.
 // checkpointDuration indicates how long the checkpoint took.
 func (j *jobState) checkpointCompleted(ctx context.Context, checkpointDuration time.Duration) {
-	minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
 	if j.progressUpdatesSkipped {
 		// Log message if we skipped updates for some time.
-		warnThreshold := 2 * minAdvance
+		warnThreshold := 2 * changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV)
 		if warnThreshold < 60*time.Second {
 			warnThreshold = 60 * time.Second
 		}
 		behind := j.ts.Now().Sub(j.lastProgressUpdate)
 		if behind > warnThreshold {
-			log.Warningf(ctx, "high water mark update delayed by %s; mean checkpoint duration %s",
+			log.Warningf(ctx, "high water mark update was delayed by %s; mean checkpoint duration %s",
 				behind, j.checkpointDuration)
 		}
 	}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -7556,7 +7556,7 @@ func TestCheckpointFrequency(t *testing.T) {
 	// If we also specify minimum amount of time between updates, we would skip updates
 	// until enough time has elapsed.
 	minAdvance := 10 * time.Minute
-	changefeedbase.MinHighWaterMarkCheckpointAdvance.Override(ctx, &js.settings.SV, minAdvance)
+	changefeedbase.ResolvedTimestampMinUpdateInterval.Override(ctx, &js.settings.SV, minAdvance)
 
 	require.False(t, js.canCheckpointHighWatermark(frontierAdvanced))
 	ts.Advance(minAdvance)
18 changes: 11 additions & 7 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -165,17 +165,21 @@ func validateSinkThrottleConfig(values *settings.Values, configStr string) error
 	return json.Unmarshal([]byte(configStr), config)
 }
 
-// MinHighWaterMarkCheckpointAdvance specifies the minimum amount of time the
-// changefeed high water mark must advance for it to be eligible for checkpointing.
-var MinHighWaterMarkCheckpointAdvance = settings.RegisterDurationSetting(
+// ResolvedTimestampMinUpdateInterval specifies the minimum amount of time that
+// must have elapsed since the last time a changefeed's resolved timestamp was
+// updated before it is eligible to be updated again.
+var ResolvedTimestampMinUpdateInterval = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"changefeed.min_highwater_advance",
-	"minimum amount of time the changefeed high water mark must advance "+
-		"for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier "+
-		"advances, as long as the rate of checkpointing keeps up with the rate of frontier changes",
+	"minimum amount of time that must have elapsed since the last time "+
+		"a changefeed's resolved timestamp was updated before it is eligible to be "+
+		"updated again; default of 0 means no minimum interval is enforced but "+
+		"updating will still be limited by the average time it takes to checkpoint progress",
 	0,
 	settings.NonNegativeDuration,
-	settings.WithPublic)
+	settings.WithPublic,
+	settings.WithName("changefeed.resolved_timestamp.min_update_interval"),
+)
 
 // EventMemoryMultiplier is the multiplier for the amount of memory needed to process an event.
 //
99 changes: 63 additions & 36 deletions pkg/crosscluster/physical/replication_random_client_test.go
@@ -9,15 +9,16 @@ import (
 	"context"
 	gosql "database/sql"
 	"fmt"
+	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/backup"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
 	_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants.
+	"github.com/cockroachdb/cockroach/pkg/crosscluster"
 	"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
-	"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationutils"
 	"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
 	_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -37,27 +38,11 @@
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
-	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
-func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) {
-	var progressBytes []byte
-	if err := sqlDB.QueryRow(
-		`SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
-	).Scan(&progressBytes); err != nil {
-		return hlc.Timestamp{}, err
-	}
-	var progress jobspb.Progress
-	if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
-		return hlc.Timestamp{}, err
-	}
-	return replicationutils.ReplicatedTimeFromProgress(&progress), nil
-}
-
 func getTestRandomClientURI(
 	tenantID roachpb.TenantID, tenantName roachpb.TenantName,
 ) streamclient.ClusterUri {
@@ -120,6 +105,28 @@ func (sv *streamClientValidator) getValuesForKeyBelowTimestamp(
 	return sv.StreamValidator.GetValuesForKeyBelowTimestamp(key, timestamp)
 }
 
+// watchMaxCheckpointTimestamp updates an atomic pointer every time a new
+// checkpoint with a higher HLC timestamp is observed.
+func watchMaxCheckpointTimestamp() (*atomic.Pointer[hlc.Timestamp], streamclient.InterceptFn) {
+	mu := &syncutil.Mutex{}
+	ts := &atomic.Pointer[hlc.Timestamp]{}
+	return ts, func(event crosscluster.Event, _ streamclient.SubscriptionToken) {
+		mu.Lock()
+		defer mu.Unlock()
+		switch event.Type() {
+		case crosscluster.CheckpointEvent:
+			maxTimestamp := ts.Load()
+			for _, rs := range event.GetCheckpoint().ResolvedSpans {
+				if maxTimestamp == nil || rs.Timestamp.After(*maxTimestamp) {
+					copy := rs.Timestamp
+					maxTimestamp = &copy
+				}
+			}
+			ts.Store(maxTimestamp)
+		}
+	}
+}
+
 // TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
 // fed KVs from the random stream client. After receiving a certain number of
 // resolved timestamp events the test completes the job to tear down the flow,
@@ -134,12 +141,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 
 	ctx := context.Background()
 
-	canBeCompletedCh := make(chan struct{})
-	const threshold = 10
-	mu := syncutil.Mutex{}
-	completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
-		canBeCompletedCh <- struct{}{}
-	})
+	maxCheckpointHlc, watchInterceptor := watchMaxCheckpointTimestamp()
 
 	// Register interceptors on the random stream client, which will be used by
 	// the processors.
@@ -162,7 +164,7 @@
 	}()
 
 	client.ClearInterceptors()
-	client.RegisterInterception(completeJobAfterCheckpoints)
+	client.RegisterInterception(watchInterceptor)
 	client.RegisterInterception(validateFnWithValidator(t, streamValidator))
 	client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable {
 		return replicationtestutils.SSTMaker(t, keyValues)
@@ -237,28 +239,53 @@
 	}
 	close(allowResponse)
 
-	// Wait for the job to signal that it is ready to be cutover, after it has
-	// received `threshold` resolved ts events.
-	<-canBeCompletedCh
-	close(canBeCompletedCh)
+	// Pick a cutover time based on when we observe a checkpoint that is newer
+	// than the retained time. We wait for a checkpoint to make sure there is
+	// actually data written at the time we cut over to.
+	var cutoverTime time.Time
+	testutils.SucceedsSoon(t, func() error {
+		checkpointHlc := maxCheckpointHlc.Load()
+		if checkpointHlc == nil {
+			return errors.New("no checkpoint has been received")
+		}
+
+		var retainedTime time.Time
+		row := conn.QueryRow(
+			`SELECT retained_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
+		if err := row.Scan(&retainedTime); err != nil {
+			return err
+		}
+
+		if retainedTime.Before(checkpointHlc.GoTime()) {
+			cutoverTime = checkpointHlc.GoTime().Add(time.Microsecond)
+			return nil
+		}
+
+		return errors.New("waiting for a checkpoint that happens after the retained time")
+	})
 
-	// Ensure that the job has made some progress.
-	var replicatedTime hlc.Timestamp
+	// Wait for the replicated time to pass the cutover time. This ensures the
+	// test rolls back some data.
 	testutils.SucceedsSoon(t, func() error {
-		var err error
-		replicatedTime, err = getReplicatedTime(ingestionJobID, conn)
-		require.NoError(t, err)
-		if replicatedTime.IsEmpty() {
+		var replicatedTime gosql.NullTime
+		row := conn.QueryRow(
+			`SELECT replicated_time FROM [SHOW VIRTUAL CLUSTER "30" WITH REPLICATION STATUS]`)
+		if err := row.Scan(&replicatedTime); err != nil {
+			return err
+		}
+		if !replicatedTime.Valid {
 			return errors.New("ReplicatedTime is unset, no progress has been reported")
 		}
-		return nil
+		if cutoverTime.Before(replicatedTime.Time) {
+			return nil
+		}
+		return errors.New("replicated time has not yet passed the cutover time")
 	})
 
 	// Cutting over the job should shutdown the ingestion processors via a context
 	// cancellation, and subsequently rollback data above our frontier timestamp.
-	//
-	// Pick a cutover time just before the latest resolved timestamp.
-	cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
 	_, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime)
 	require.NoError(t, err)

43 changes: 37 additions & 6 deletions pkg/sql/rolemembershipcache/cache.go
@@ -121,27 +121,58 @@ func (m *MembershipCache) GetRolesForMember(
 		return nil, errors.AssertionFailedf("cannot use MembershipCache without a txn")
 	}
 
-	// Lookup table version.
-	tableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
+	// Lookup table versions.
+	roleMembersTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
 	if err != nil {
 		return nil, err
 	}
 
-	tableVersion := tableDesc.GetVersion()
-	if tableDesc.IsUncommittedVersion() {
+	tableVersion := roleMembersTableDesc.GetVersion()
+	if roleMembersTableDesc.IsUncommittedVersion() {
 		return resolveRolesForMember(ctx, txn, member)
 	}
 
 	if txn.SessionData().AllowRoleMembershipsToChangeDuringTransaction {
+		systemUsersTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.UsersTableID)
+		if err != nil {
+			return nil, err
+		}
+
+		roleOptionsTableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleOptionsTableID)
+		if err != nil {
+			return nil, err
+		}
+
+		systemUsersTableVersion := systemUsersTableDesc.GetVersion()
+		if systemUsersTableDesc.IsUncommittedVersion() {
+			return resolveRolesForMember(ctx, txn, member)
+		}
+
+		roleOptionsTableVersion := roleOptionsTableDesc.GetVersion()
+		if roleOptionsTableDesc.IsUncommittedVersion() {
+			return resolveRolesForMember(ctx, txn, member)
+		}
+
 		defer func() {
 			if retErr != nil {
 				return
 			}
 			txn.Descriptors().ReleaseSpecifiedLeases(ctx, []lease.IDVersion{
 				{
-					Name:    tableDesc.GetName(),
-					ID:      tableDesc.GetID(),
+					Name:    roleMembersTableDesc.GetName(),
+					ID:      roleMembersTableDesc.GetID(),
 					Version: tableVersion,
 				},
+				{
+					Name:    systemUsersTableDesc.GetName(),
+					ID:      systemUsersTableDesc.GetID(),
+					Version: systemUsersTableVersion,
+				},
+				{
+					Name:    roleOptionsTableDesc.GetName(),
+					ID:      roleOptionsTableDesc.GetID(),
+					Version: roleOptionsTableVersion,
+				},
 			})
 		}()
 	}
@@ -82,11 +82,12 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
 		require.NoError(t, fooTx.Commit())
 		require.NoError(t, <-errCh)
 	})
+
 	// In this test we ensure that we can perform role grant and revoke
 	// operations while the transaction which uses the relevant roles
 	// remains open. We ensure that the transaction still succeeds and
 	// that the operations occur in a timely manner.
-	t.Run("session variable prevents waiting", func(t *testing.T) {
+	t.Run("session variable prevents waiting during GRANT and REVOKE", func(t *testing.T) {
 		fooConn, err := fooDB.Conn(ctx)
 		require.NoError(t, err)
 		defer func() { _ = fooConn.Close() }()
@@ -116,4 +117,36 @@ func TestAllowRoleMembershipsToChangeDuringTransaction(t *testing.T) {
 		// Ensure the transaction we held open commits without issue.
 		require.NoError(t, fooTx.Commit())
 	})
+
+	t.Run("session variable prevents waiting during CREATE and DROP role", func(t *testing.T) {
+		fooConn, err := fooDB.Conn(ctx)
+		require.NoError(t, err)
+		defer func() { _ = fooConn.Close() }()
+		_, err = fooConn.ExecContext(ctx, "SET allow_role_memberships_to_change_during_transaction = true;")
+		require.NoError(t, err)
+		fooTx, err := fooConn.BeginTx(ctx, nil)
+		require.NoError(t, err)
+		// We need to use SHOW ROLES because that accesses the system.users table.
+		_, err = fooTx.Exec("SHOW ROLES")
+		require.NoError(t, err)
+
+		conn, err := sqlDB.Conn(ctx)
+		require.NoError(t, err)
+		defer func() { _ = conn.Close() }()
+		// Set a timeout on the SQL operations to ensure that they both
+		// happen in a timely manner.
+		grantRevokeTimeout, cancel := context.WithTimeout(
+			ctx, testutils.DefaultSucceedsSoonDuration,
+		)
+		defer cancel()
+
+		_, err = conn.ExecContext(grantRevokeTimeout, "CREATE ROLE new_role;")
+		require.NoError(t, err)
+		_, err = conn.ExecContext(grantRevokeTimeout, "DROP ROLE new_role;")
+		require.NoError(t, err)
+
+		// Ensure the transaction we held open commits without issue.
+		require.NoError(t, fooTx.Commit())
+	})
 }
