Skip to content

Commit

Permalink
server: fix sync on setting overrides for secondary tenants
Browse files Browse the repository at this point in the history
The previous patch in this area was merely restarting the rangefeed
but did not actually wait for the initial update event to be received.

This patch fixes it.

Release note: None
  • Loading branch information
knz committed Sep 20, 2023
1 parent a499165 commit 10e1ea9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
22 changes: 18 additions & 4 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type SettingsWatcher struct {
// inside secondary tenants. It will be uninitialized in a system
// tenant.
storageClusterVersion clusterversion.ClusterVersion

// Used by TestingRestart.
updateWait chan struct{}
}

// testingWatcherKnobs allows the client to inject testing knobs into
Expand All @@ -85,7 +88,7 @@ func New(
stopper *stop.Stopper,
storage Storage, // optional
) *SettingsWatcher {
return &SettingsWatcher{
s := &SettingsWatcher{
clock: clock,
codec: codec,
settings: settingsToUpdate,
Expand All @@ -94,6 +97,8 @@ func New(
dec: MakeRowDecoder(codec),
storage: storage,
}
s.mu.updateWait = make(chan struct{})
return s
}

// NewWithOverrides constructs a new SettingsWatcher which allows external
Expand Down Expand Up @@ -141,6 +146,9 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
initialScan.done = true
close(initialScan.ch)
}
// Used by TestingRestart().
close(s.mu.updateWait)
s.mu.updateWait = make(chan struct{})
}

s.mu.values = make(map[settings.InternalKey]settingsValue)
Expand Down Expand Up @@ -248,9 +256,15 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

func (w *SettingsWatcher) TestingRestart() {
if w.rfc != nil {
w.rfc.TestingRestart()
// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (s *SettingsWatcher) TestingRestart() {
if s.rfc != nil {
s.mu.Lock()
waitCh := s.mu.updateWait
s.mu.Unlock()
s.rfc.TestingRestart()
<-waitCh
}
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -64,6 +65,11 @@ type Watcher struct {
// rfc provides access to the underlying rangefeedcache.Watcher for
// testing.
rfc *rangefeedcache.Watcher
mu struct {
syncutil.Mutex
// Used by TestingRestart.
updateWait chan struct{}
}
}

// New constructs a new Watcher.
Expand All @@ -78,6 +84,7 @@ func New(
dec: MakeRowDecoder(),
}
w.store.Init()
w.mu.updateWait = make(chan struct{})
return w
}

Expand Down Expand Up @@ -162,6 +169,11 @@ func (w *Watcher) startRangeFeed(
initialScan.done = true
close(initialScan.ch)
}
// Used by TestingRestart().
w.mu.Lock()
defer w.mu.Unlock()
close(w.mu.updateWait)
w.mu.updateWait = make(chan struct{})
}
}

Expand Down Expand Up @@ -220,9 +232,15 @@ func (w *Watcher) WaitForStart(ctx context.Context) error {
}
}

// TestingRestart restarts the rangefeeds and waits for the initial
// update after the rangefeed update to be processed.
func (w *Watcher) TestingRestart() {
if w.rfc != nil {
w.mu.Lock()
waitCh := w.mu.updateWait
w.mu.Unlock()
w.rfc.TestingRestart()
<-waitCh
}
}

Expand Down

0 comments on commit 10e1ea9

Please sign in to comment.