changefeedccl: correct doc and rename changefeed.min_highwater_advance
Release note (ops change): The `changefeed.min_highwater_advance`
cluster setting has been renamed to
`changefeed.resolved_timestamp.min_update_interval`
to more accurately reflect its function. Its description has also
been updated. The old name remains usable for backwards-compatibility.
andyyang890 committed Jan 10, 2025
1 parent 84f7903 commit c7a4fe3
Showing 5 changed files with 25 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -19,7 +19,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
changefeed.frontier_highwater_lag_checkpoint_threshold duration 10m0s controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled application
changefeed.memory.per_changefeed_limit byte size 512 MiB controls amount of data that can be buffered per changefeed application
-changefeed.min_highwater_advance duration 0s minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes application
+changefeed.resolved_timestamp.min_update_interval (alias: changefeed.min_highwater_advance) duration 0s minimum amount of time that must have elapsed since the last time a changefeed's resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress application
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration application
changefeed.protect_timestamp_interval duration 10m0s controls how often the changefeed forwards its protected timestamp to the resolved timestamp application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -24,7 +24,7 @@
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-frontier-highwater-lag-checkpoint-threshold" class="anchored"><code>changefeed.frontier_highwater_lag_checkpoint_threshold</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls the maximum the high-water mark is allowed to lag behind the leading spans of the frontier before per-span checkpointing is enabled; if 0, checkpointing due to high-water lag is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-memory-per-changefeed-limit" class="anchored"><code>changefeed.memory.per_changefeed_limit</code></div></td><td>byte size</td><td><code>512 MiB</code></td><td>controls amount of data that can be buffered per changefeed</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
-<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.min_highwater_advance</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time the changefeed high water mark must advance for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier advances, as long as the rate of checkpointing keeps up with the rate of frontier changes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
+<tr><td><div id="setting-changefeed-min-highwater-advance" class="anchored"><code>changefeed.resolved_timestamp.min_update_interval<br />(alias: changefeed.min_highwater_advance)</code></div></td><td>duration</td><td><code>0s</code></td><td>minimum amount of time that must have elapsed since the last time a changefeed&#39;s resolved timestamp was updated before it is eligible to be updated again; default of 0 means no minimum interval is enforced but updating will still be limited by the average time it takes to checkpoint progress</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-interval" class="anchored"><code>changefeed.protect_timestamp_interval</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls how often the changefeed forwards its protected timestamp to the resolved timestamp</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
21 changes: 11 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1105,19 +1105,21 @@ func (j *jobState) canCheckpointSpans() bool {
 }

 // canCheckpointHighWatermark returns true if we should update job high water mark (i.e. progress).
-// Normally, whenever frontier changes, we update high water mark.
+// Normally, whenever the frontier changes, we update the high water mark.
 // However, if the rate of frontier changes is too high, we want to slow down
-// the frequency of job progress updates. We do this by skipping some updates
-// if the time to update the job progress is greater than the delta between
-// previous and the current progress update time.
+// the frequency of job progress updates. We do this by enforcing a minimum
+// interval between high water updates, which is the greater of the average time
+// it takes to update the job progress and changefeed.resolved_timestamp.min_update_interval.
 func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
 	if !(frontierChanged || j.progressUpdatesSkipped) {
 		return false
 	}

-	minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
-	if j.checkpointDuration > 0 &&
-		j.ts.Now().Before(j.lastProgressUpdate.Add(j.checkpointDuration+minAdvance)) {
+	minInterval := max(
+		j.checkpointDuration,
+		changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV),
+	)
+	if j.ts.Now().Before(j.lastProgressUpdate.Add(minInterval)) {
 		// Updates are too rapid; skip some.
 		j.progressUpdatesSkipped = true
 		return false
@@ -1129,16 +1131,15 @@ func (j *jobState) canCheckpointHighWatermark(frontierChanged bool) bool {
 // checkpointCompleted must be called when job checkpoint completes.
 // checkpointDuration indicates how long the checkpoint took.
 func (j *jobState) checkpointCompleted(ctx context.Context, checkpointDuration time.Duration) {
-	minAdvance := changefeedbase.MinHighWaterMarkCheckpointAdvance.Get(&j.settings.SV)
 	if j.progressUpdatesSkipped {
 		// Log message if we skipped updates for some time.
-		warnThreshold := 2 * minAdvance
+		warnThreshold := 2 * changefeedbase.ResolvedTimestampMinUpdateInterval.Get(&j.settings.SV)
 		if warnThreshold < 60*time.Second {
 			warnThreshold = 60 * time.Second
 		}
 		behind := j.ts.Now().Sub(j.lastProgressUpdate)
 		if behind > warnThreshold {
-			log.Warningf(ctx, "high water mark update delayed by %s; mean checkpoint duration %s",
+			log.Warningf(ctx, "high water mark update was delayed by %s; mean checkpoint duration %s",
 				behind, j.checkpointDuration)
 		}
 	}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -7537,7 +7537,7 @@ func TestCheckpointFrequency(t *testing.T) {
 	// If we also specify minimum amount of time between updates, we would skip updates
 	// until enough time has elapsed.
 	minAdvance := 10 * time.Minute
-	changefeedbase.MinHighWaterMarkCheckpointAdvance.Override(ctx, &js.settings.SV, minAdvance)
+	changefeedbase.ResolvedTimestampMinUpdateInterval.Override(ctx, &js.settings.SV, minAdvance)

 	require.False(t, js.canCheckpointHighWatermark(frontierAdvanced))
 	ts.Advance(minAdvance)
18 changes: 11 additions & 7 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -165,17 +165,21 @@ func validateSinkThrottleConfig(values *settings.Values, configStr string) error
 	return json.Unmarshal([]byte(configStr), config)
 }

-// MinHighWaterMarkCheckpointAdvance specifies the minimum amount of time the
-// changefeed high water mark must advance for it to be eligible for checkpointing.
-var MinHighWaterMarkCheckpointAdvance = settings.RegisterDurationSetting(
+// ResolvedTimestampMinUpdateInterval specifies the minimum amount of time that
+// must have elapsed since the last time a changefeed's resolved timestamp was
+// updated before it is eligible to be updated again.
+var ResolvedTimestampMinUpdateInterval = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"changefeed.min_highwater_advance",
-	"minimum amount of time the changefeed high water mark must advance "+
-		"for it to be eligible for checkpointing; Default of 0 will checkpoint every time frontier "+
-		"advances, as long as the rate of checkpointing keeps up with the rate of frontier changes",
+	"minimum amount of time that must have elapsed since the last time "+
+		"a changefeed's resolved timestamp was updated before it is eligible to be "+
+		"updated again; default of 0 means no minimum interval is enforced but "+
+		"updating will still be limited by the average time it takes to checkpoint progress",
 	0,
 	settings.NonNegativeDuration,
-	settings.WithPublic)
+	settings.WithPublic,
+	settings.WithName("changefeed.resolved_timestamp.min_update_interval"),
+)

 // EventMemoryMultiplier is the multiplier for the amount of memory needed to process an event.
 //
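
The registration above keeps the original string `changefeed.min_highwater_advance` and adds the new name through `settings.WithName`, and the release note says the old name remains usable. One way such a lookup could behave is sketched below. The `registry` and `setting` types and their methods are entirely hypothetical and do not reflect how the CockroachDB `settings` package is implemented.

```go
package main

import "fmt"

// setting is a hypothetical, stripped-down record. A real settings package
// would also track type, default value, validation, and visibility.
type setting struct {
	internalKey string // original registration key, kept stable
	name        string // user-visible name shown in generated docs
}

// registry resolves either identifier to the same setting (illustrative only).
type registry struct {
	byIdentifier map[string]*setting
}

func newRegistry() *registry {
	return &registry{byIdentifier: make(map[string]*setting)}
}

// register records the setting under its internal key and, when a different
// user-visible name is supplied, under that name as well.
func (r *registry) register(internalKey, name string) *setting {
	if name == "" {
		name = internalKey
	}
	s := &setting{internalKey: internalKey, name: name}
	r.byIdentifier[internalKey] = s
	r.byIdentifier[name] = s
	return s
}

// lookup returns the setting registered under identifier, if any.
func (r *registry) lookup(identifier string) (*setting, bool) {
	s, ok := r.byIdentifier[identifier]
	return s, ok
}

func main() {
	r := newRegistry()
	r.register(
		"changefeed.min_highwater_advance",
		"changefeed.resolved_timestamp.min_update_interval",
	)
	for _, id := range []string{
		"changefeed.resolved_timestamp.min_update_interval",
		"changefeed.min_highwater_advance",
	} {
		if s, ok := r.lookup(id); ok {
			fmt.Println(id, "resolves to", s.name)
		}
	}
}
```

Both identifiers resolve to the same record in this sketch, which is the behavior the release note describes for the renamed setting.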