diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index e28b72418ae1..c5c51702617b 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -60,6 +60,9 @@ type SettingsWatcher struct { // inside secondary tenants. It will be uninitialized in a system // tenant. storageClusterVersion clusterversion.ClusterVersion + + // Used by TestingRestart. + updateWait chan struct{} } // testingWatcherKnobs allows the client to inject testing knobs into @@ -85,7 +88,7 @@ func New( stopper *stop.Stopper, storage Storage, // optional ) *SettingsWatcher { - return &SettingsWatcher{ + s := &SettingsWatcher{ clock: clock, codec: codec, settings: settingsToUpdate, @@ -94,6 +97,8 @@ func New( dec: MakeRowDecoder(codec), storage: storage, } + s.mu.updateWait = make(chan struct{}) + return s } // NewWithOverrides constructs a new SettingsWatcher which allows external @@ -141,6 +146,9 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + close(s.mu.updateWait) + s.mu.updateWait = make(chan struct{}) } s.mu.values = make(map[settings.InternalKey]settingsValue) @@ -248,9 +256,15 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } -func (w *SettingsWatcher) TestingRestart() { - if w.rfc != nil { - w.rfc.TestingRestart() +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. +func (s *SettingsWatcher) TestingRestart() { + if s.rfc != nil { + s.mu.Lock() + waitCh := s.mu.updateWait + s.mu.Unlock() + s.rfc.TestingRestart() + <-waitCh } } diff --git a/pkg/server/tenantsettingswatcher/watcher.go b/pkg/server/tenantsettingswatcher/watcher.go index 9560f4c5b58e..83aadc5effa6 100644 --- a/pkg/server/tenantsettingswatcher/watcher.go +++ b/pkg/server/tenantsettingswatcher/watcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/startup" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -64,6 +65,11 @@ type Watcher struct { // rfc provides access to the underlying rangefeedcache.Watcher for // testing. rfc *rangefeedcache.Watcher + mu struct { + syncutil.Mutex + // Used by TestingRestart. + updateWait chan struct{} + } } // New constructs a new Watcher. @@ -78,6 +84,7 @@ func New( dec: MakeRowDecoder(), } w.store.Init() + w.mu.updateWait = make(chan struct{}) return w } @@ -162,6 +169,11 @@ func (w *Watcher) startRangeFeed( initialScan.done = true close(initialScan.ch) } + // Used by TestingRestart(). + w.mu.Lock() + defer w.mu.Unlock() + close(w.mu.updateWait) + w.mu.updateWait = make(chan struct{}) } } @@ -220,9 +232,15 @@ func (w *Watcher) WaitForStart(ctx context.Context) error { } } +// TestingRestart restarts the rangefeeds and waits for the initial +// update after the rangefeed update to be processed. func (w *Watcher) TestingRestart() { if w.rfc != nil { + w.mu.Lock() + waitCh := w.mu.updateWait + w.mu.Unlock() w.rfc.TestingRestart() + <-waitCh } }