diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 29671788ed2..1a03452b1a9 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -35,6 +35,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "hash/crc32" "net/http" @@ -97,6 +98,9 @@ var ( // HealthCheckHealthyTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Healthy Tablets` title to // create the HTML code required to render the list of healthy tablets from the HealthCheck. HealthCheckHealthyTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Healthy Tablets") + + // errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined. + errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time") ) // See the documentation for NewHealthCheck below for an explanation of these parameters. @@ -289,6 +293,24 @@ type HealthCheckImpl struct { loadTabletsTrigger chan struct{} } +// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. +func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) { + if len(tabletFilters) > 0 { + if len(KeyspacesToWatch) > 0 { + return nil, errKeyspacesToWatchAndTabletFilters + } + + fbs, err := NewFilterByShard(tabletFilters) + if err != nil { + return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err) + } + filters = append(filters, fbs) + } else if len(KeyspacesToWatch) > 0 { + filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch)) + } + return filters, nil +} + // NewHealthCheck creates a new HealthCheck object. // Parameters: // retryDelay. @@ -310,10 +332,14 @@ type HealthCheckImpl struct { // // The localCell for this healthcheck // -// callback. +// cellsToWatch. // -// A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering. -func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { +// Is a list of cells to watch for tablets. +// +// filters. +// +// Is one or more filters to apply when determining what tablets we want to stream healthchecks from. +func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl { log.Infof("loading tablets for cells: %v", cellsToWatch) hc := &HealthCheckImpl{ @@ -329,7 +355,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur loadTabletsTrigger: make(chan struct{}), } var topoWatchers []*TopologyWatcher - var filter TabletFilter cells := strings.Split(cellsToWatch, ",") if cellsToWatch == "" { cells = append(cells, localCell) @@ -340,20 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur if c == "" { continue } - if len(tabletFilters) > 0 { - if len(KeyspacesToWatch) > 0 { - log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time") - } - - fbs, err := NewFilterByShard(tabletFilters) - if err != nil { - log.Exitf("Cannot parse tablet_filters parameter: %v", err) - } - filter = fbs - } else if len(KeyspacesToWatch) > 0 { - filter = NewFilterByKeyspace(KeyspacesToWatch) - } - topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 8fd63b009cb..962200a6a3b 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -63,6 +63,62 @@ func init() { refreshInterval = time.Minute } +func TestNewVTGateHealthCheckFilters(t *testing.T) { + defer func() { + KeyspacesToWatch = nil + tabletFilters = nil + }() + + testCases := []struct { + name string + keyspacesToWatch []string + tabletFilters []string + expectedError string + expectedFilterTypes []any + }{ + { + name: "noFilters", + }, + { + name: "tabletFilters", + tabletFilters: []string{"ks1|-80"}, + expectedFilterTypes: []any{&FilterByShard{}}, + }, + { + name: "keyspacesToWatch", + keyspacesToWatch: []string{"ks1"}, + expectedFilterTypes: []any{&FilterByKeyspace{}}, + }, + { + name: "failKeyspacesToWatchAndFilters", + tabletFilters: []string{"ks1|-80"}, + keyspacesToWatch: []string{"ks1"}, + expectedError: errKeyspacesToWatchAndTabletFilters.Error(), + }, + { + name: "failInvalidTabletFilters", + tabletFilters: []string{"shouldfail|"}, + expectedError: "failed to parse tablet_filters value \"shouldfail|\": error parsing shard name : Code: INVALID_ARGUMENT\nempty name\n", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + KeyspacesToWatch = testCase.keyspacesToWatch + tabletFilters = testCase.tabletFilters + + filters, err := NewVTGateHealthCheckFilters() + if testCase.expectedError != "" { + assert.EqualError(t, err, testCase.expectedError) + } + assert.Len(t, filters, len(testCase.expectedFilterTypes)) + for i, filter := range filters { + assert.IsType(t, testCase.expectedFilterTypes[i], filter) + } + }) + } +} + func TestHealthCheck(t *testing.T) { ctx := utils.LeakCheckContext(t) // reset error counters @@ -1121,7 +1177,7 @@ func TestPrimaryInOtherCell(t *testing.T) { ts := memorytopo.NewServer(ctx, "cell1", "cell2") defer ts.Close() - hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as primary in different cell @@ -1181,7 +1237,7 @@ func TestReplicaInOtherCell(t *testing.T) { ts := memorytopo.NewServer(ctx, "cell1", "cell2") defer ts.Close() - hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() // add a tablet as replica @@ -1286,7 +1342,7 @@ func TestCellAliases(t *testing.T) { ts := memorytopo.NewServer(ctx, "cell1", "cell2") defer ts.Close() - hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2") + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil) defer hc.Close() cellsAlias := &topodatapb.CellsAlias{ @@ -1437,7 +1493,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic } func createTestHc(ctx context.Context, ts *topo.Server) *HealthCheckImpl { - return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "") + return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "", nil) } type fakeConn struct { diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index af60479a42b..b7124ec3c13 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -41,7 +41,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(ctx, factory) ts2 := &fakeTopoServer{} - hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) kss := &keyspaceState{ @@ -80,7 +80,7 @@ func TestKeyspaceEventTypes(t *testing.T) { factory.AddCell(cell) ts := faketopo.NewFakeTopoServer(ctx, factory) ts2 := &fakeTopoServer{} - hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "") + hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil) defer hc.Close() kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 0b69ecb6a63..338bbce5a24 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -274,6 +274,19 @@ type TabletFilter interface { IsIncluded(tablet *topodata.Tablet) bool } +// TabletFilters contains filters for tablets. +type TabletFilters []TabletFilter + +// IsIncluded returns true if a tablet passes all filters. +func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool { + for _, filter := range tf { + if !filter.IsIncluded(tablet) { + return false + } + } + return true +} + // FilterByShard is a filter that filters tablets by // keyspace/shard. type FilterByShard struct { diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 1dfdf946e5a..01543d04a41 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -122,10 +122,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { defer ts.Close() fhc := NewFakeHealthCheck(nil) defer fhc.Close() + filter := NewFilterByKeyspace([]string{"keyspace"}) logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -172,10 +173,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias) tw.loadTablets() + // Confirm second tablet triggers ListTablets + AddTablet calls. counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 2762153755) - // Check the new tablet is returned by GetAllTablets(). + // Add a third tablet in a filtered keyspace to the topology. + tablet3 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 3, + }, + Hostname: "host3", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "excluded", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias) + tw.loadTablets() + + // Confirm filtered tablet did not trigger an AddTablet call. + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0}) + checkChecksum(t, tw, 3177315266) + + // Check the second tablet is returned by GetAllTablets(). This should not contain the filtered tablet. allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet2) assert.Len(t, allTablets, 2) @@ -207,14 +229,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { assert.Contains(t, allTablets, key) assert.True(t, proto.Equal(tablet, allTablets[key])) assert.NotContains(t, allTablets, origKey) - checkChecksum(t, tw, 2762153755) + checkChecksum(t, tw, 3177315266) } else { counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0}) assert.Len(t, allTablets, 2) assert.Contains(t, allTablets, origKey) assert.True(t, proto.Equal(origTablet, allTablets[origKey])) assert.NotContains(t, allTablets, key) - checkChecksum(t, tw, 2762153755) + checkChecksum(t, tw, 3177315266) } // Both tablets restart on different hosts. @@ -269,7 +291,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { require.Nil(t, err, "FixShardReplication failed") tw.loadTablets() counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) - checkChecksum(t, tw, 789108290) + checkChecksum(t, tw, 852159264) allTablets = fhc.GetAllTablets() assert.Len(t, allTablets, 1) @@ -280,8 +302,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { assert.Contains(t, allTablets, key) assert.True(t, proto.Equal(tablet2, allTablets[key])) - // Remove the other and check that it is detected as being gone. + // Remove the other tablets and check that it is detected as being gone. + // Deleting the filtered tablet should not trigger a RemoveTablet call. require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) + require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias)) _, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") require.Nil(t, err, "FixShardReplication failed") tw.loadTablets() diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index 15228475bfb..4276763961b 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -239,7 +239,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo log.Fatal(err) } - healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "") + healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "", nil) c := &client{ primary: primary, healthCheck: healthCheck, diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index d1846168a43..cafe8b706b2 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -84,7 +84,11 @@ type TabletGateway struct { } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { - return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch) + filters, err := discovery.NewVTGateHealthCheckFilters() + if err != nil { + log.Exit(err) + } + return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters) } // NewTabletGateway creates and returns a new TabletGateway diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 4a682ffd298..fc149fc8b87 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -41,7 +41,7 @@ import ( // These vars store the functions used to create the topo server, healthcheck, // and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. -type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck +type healthCheckFactoryFunc func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( @@ -50,8 +50,13 @@ var ( ) func resetTxThrottlerFactories() { - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + // discovery.NewFilterByShard expects a single-shard filter to be in "keyspace|shard" format. + filter, err := discovery.NewFilterByShard([]string{keyspace + "|" + shard}) + if err != nil { + return nil, err + } + return discovery.NewHealthCheck(ctx, discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","), filter), nil } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) @@ -158,9 +163,11 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler ThrottlerInterface + + ctx context.Context + cancel context.CancelFunc healthCheck discovery.HealthCheck healthCheckChan chan *discovery.TabletHealth @@ -284,7 +291,10 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } + ctx, cancel := context.WithCancel(context.Background()) state := &txThrottlerStateImpl{ + ctx: ctx, + cancel: cancel, config: config, healthCheckCells: config.TxThrottlerHealthCheckCells, tabletTypes: tabletTypes, @@ -295,38 +305,41 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi // get cells from topo if none defined in tabletenv config if len(state.healthCheckCells) == 0 { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cellsCancel() + state.healthCheckCells = fetchKnownCells(cellsCtx, txThrottler.topoServer, target) state.cellsFromTopo = true } - ctx, cancel := context.WithCancel(context.Background()) - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + if err := state.initHealthCheckStream(txThrottler.topoServer, target); err != nil { + return nil, err + } + go state.healthChecksProcessor(txThrottler.topoServer, target) state.waitForTermination.Add(1) go state.updateMaxLag() return state, nil } -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) +func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) (err error) { + ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells) + if err != nil { + return err + } ts.healthCheckChan = ts.healthCheck.Subscribe() - + return nil } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { if ts.healthCheck == nil { return } - ts.stopHealthCheck() + ts.cancel() ts.healthCheck.Close() } -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { - fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) +func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, target *querypb.Target) error { + fetchCtx, cancel := context.WithTimeout(ts.ctx, topo.RemoteOperationTimeout) defer cancel() knownCells := fetchKnownCells(fetchCtx, topoServer, target) @@ -334,11 +347,12 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) + return ts.initHealthCheckStream(topoServer, target) } + return nil } -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, target *querypb.Target) { var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -347,10 +361,12 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS } for { select { - case <-ctx.Done(): + case <-ts.ctx.Done(): return case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) + if err := ts.updateHealthCheckCells(topoServer, target); err != nil { + log.Errorf("txThrottler: failed to update cell list: %+v", err) + } case th := <-ts.healthCheckChan: ts.StatsUpdate(th) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index fe352cf96f4..babe71a6135 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -71,8 +71,8 @@ func TestEnabledThrottler(t *testing.T) { hcCall1.Do(func() {}) hcCall2 := mockHealthCheck.EXPECT().Close() hcCall2.After(hcCall1) - healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { - return mockHealthCheck + healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) { + return mockHealthCheck, nil } mockThrottler := NewMockThrottlerInterface(mockCtrl)