Skip to content

Commit

Permalink
Tablet throttler: remove cached metric associated with removed tablet (
Browse files Browse the repository at this point in the history
…vitessio#16555)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Aug 11, 2024
1 parent f68e62d commit 9018fef
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 8 deletions.
20 changes: 18 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
primaryStimulatorRateLimiter.Stop()
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
clear(throttler.inventory.TabletMetrics)
}()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

Expand Down Expand Up @@ -842,7 +843,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
}
case probes := <-throttler.clusterProbesChan:
// incoming structural update, sparse, as result of refreshInventory()
throttler.updateClusterProbes(ctx, probes)
throttler.updateClusterProbes(probes)
case <-metricsAggregateTicker.C:
if throttler.IsOpen() {
throttler.aggregateMetrics()
Expand Down Expand Up @@ -1116,10 +1117,25 @@ func (throttler *Throttler) refreshInventory(ctx context.Context) error {
}

// synchronous update of inventory
func (throttler *Throttler) updateClusterProbes(ctx context.Context, clusterProbes *base.ClusterProbes) error {
func (throttler *Throttler) updateClusterProbes(clusterProbes *base.ClusterProbes) error {
throttler.inventory.ClustersProbes = clusterProbes.TabletProbes
throttler.inventory.IgnoreHostsCount = clusterProbes.IgnoreHostsCount
throttler.inventory.IgnoreHostsThreshold = clusterProbes.IgnoreHostsThreshold

for alias := range throttler.inventory.TabletMetrics {
if alias == "" {
// *this* tablet uses the empty alias to identify itself.
continue
}
if _, found := clusterProbes.TabletProbes[alias]; !found {
// There seems to be a metric stored for some alias, say zone1-0000000102,
// but there is no alias for this probe in the new clusterProbes. This
// suggests that the corresponding tablet has been removed, or its type was changed
// (e.g. from REPLICA to RDONLY). We should therefore remove this cached metric.
delete(throttler.inventory.TabletMetrics, alias)
}
}

return nil
}

Expand Down
81 changes: 75 additions & 6 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var (
Value: 5.1,
},
}
nonPrimaryTabletType atomic.Int32
)

const (
Expand Down Expand Up @@ -151,7 +152,11 @@ type FakeTopoServer struct {
func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
tabletType := topodatapb.TabletType_PRIMARY
if alias.Uid != 100 {
tabletType = topodatapb.TabletType_REPLICA
val := topodatapb.TabletType(nonPrimaryTabletType.Load())
if val == topodatapb.TabletType_UNKNOWN {
val = topodatapb.TabletType_REPLICA
}
tabletType = val
}
tablet := &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Expand Down Expand Up @@ -1156,9 +1161,9 @@ func TestRefreshInventory(t *testing.T) {
// validateProbesCount expects number of probes according to cluster name and throttler's leadership status
validateProbesCount := func(t *testing.T, probes base.Probes) {
if throttler.isLeader.Load() {
assert.Equal(t, 3, len(probes))
assert.Len(t, probes, 3)
} else {
assert.Equal(t, 1, len(probes))
assert.Len(t, probes, 1)
}
}
t.Run("waiting for probes", func(t *testing.T) {
Expand All @@ -1171,7 +1176,7 @@ func TestRefreshInventory(t *testing.T) {
// not run, and therefore there is none but us to both populate `clusterProbesChan` as well as
// read from it. We do not compete here with any other goroutine.
assert.NotNil(t, probes)
throttler.updateClusterProbes(ctx, probes)
throttler.updateClusterProbes(probes)
validateProbesCount(t, probes.TabletProbes)
// Achieved our goal
return
Expand Down Expand Up @@ -1488,6 +1493,70 @@ func TestProbesWhileOperating(t *testing.T) {
})
})
})

t.Run("metrics", func(t *testing.T) {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
assert.Len(t, results, 3) // 1 self tablet + 2 shard tablets
assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias
assert.Contains(t, results, "fakezone1-0000000101", "TabletMetrics: %+v", results)
assert.Contains(t, results, "fakezone2-0000000102", "TabletMetrics: %+v", results)
})

t.Run("no REPLICA probes", func(t *testing.T) {
nonPrimaryTabletType.Store(int32(topodatapb.TabletType_RDONLY))
defer nonPrimaryTabletType.Store(int32(topodatapb.TabletType_REPLICA))

t.Run("waiting for inventory metrics", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
if len(results) == 1 {
// That's what we were waiting for. Good.
assert.Contains(t, results, "", "TabletMetrics: %+v", results) // primary self identifies with empty alias
return
}

select {
case <-ticker.C:
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics")
}
}
})
})
t.Run("again with probes", func(t *testing.T) {
t.Run("waiting for inventory metrics", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
var results base.TabletResultMap
<-runSerialFunction(t, ctx, throttler, func(ctx context.Context) {
results = maps.Clone(throttler.inventory.TabletMetrics)
})
if len(results) == 3 {
// That's what we were waiting for. Good.
return
}

select {
case <-ticker.C:
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for inventory metrics")
}
}
})
})
})
}

Expand Down Expand Up @@ -1603,7 +1672,7 @@ func TestProbesPostDisable(t *testing.T) {
})

t.Run("metrics", func(t *testing.T) {
assert.Equal(t, 3, len(throttler.inventory.TabletMetrics)) // 1 self tablet + 2 shard tablets
assert.Empty(t, throttler.inventory.TabletMetrics) // map has been cleared
})

t.Run("aggregated", func(t *testing.T) {
Expand Down Expand Up @@ -2103,7 +2172,7 @@ func TestReplica(t *testing.T) {
defer throttler.appCheckedMetrics.Delete(testAppName.String())
checkResult := throttler.Check(ctx, testAppName.String(), nil, flags)
require.NotNil(t, checkResult)
assert.Equal(t, 3, len(checkResult.Metrics))
assert.Len(t, checkResult.Metrics, 3)
})
t.Run("client, OK", func(t *testing.T) {
client := NewBackgroundClient(throttler, throttlerapp.TestingName, base.UndefinedScope)
Expand Down

0 comments on commit 9018fef

Please sign in to comment.