Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110756: sql,gossip,systemconfigwatcher: miscellaneous cleanup around zone configs r=yuzefovich a=yuzefovich

See individual commits for details.

Epic: None

Release note: None

110986: kvflowcontroller: add trace statement at end of wait r=aadityasondhi a=sumeerbhola

Epic: none

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
3 people committed Sep 25, 2023
3 parents 967fe5e + 6b81d97 + 9379c63 commit b3691a2
Show file tree
Hide file tree
Showing 35 changed files with 97 additions and 210 deletions.
1 change: 0 additions & 1 deletion pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,6 @@ GO_TARGETS = [
"//pkg/server/storage_api:storage_api_test",
"//pkg/server/structlogging:structlogging",
"//pkg/server/structlogging:structlogging_test",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest:systemconfigwatchertest",
"//pkg/server/systemconfigwatcher:systemconfigwatcher",
"//pkg/server/systemconfigwatcher:systemconfigwatcher_test",
"//pkg/server/telemetry:telemetry",
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ go_test(
"//pkg/server",
"//pkg/server/authserver",
"//pkg/server/serverpb",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/lexbase",
Expand Down
6 changes: 0 additions & 6 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
Expand Down Expand Up @@ -417,11 +416,6 @@ func TestTenantInstanceIDReclaimLoop(t *testing.T) {
})
}

func TestSystemConfigWatcherCache(t *testing.T) {
defer leaktest.AfterTest(t)()
systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */)
}

// TestStartTenantWithStaleInstance covers the following scenario:
// - a sql server starts up and is assigned port 'a'
// - the sql server shuts down and releases port 'a'
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/gossipsim/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/cmd/gossipsim",
visibility = ["//visibility:private"],
deps = [
"//pkg/config/zonepb",
"//pkg/gossip",
"//pkg/gossip/simulation",
"//pkg/roachpb",
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/gossipsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/gossip/simulation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -302,7 +301,7 @@ func main() {
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

n := simulation.NewNetwork(stopper, nodeCount, true, zonepb.DefaultZoneConfigRef())
n := simulation.NewNetwork(stopper, nodeCount, true)
n.SimulateNetwork(
func(cycle int, network *simulation.Network) bool {
// Output dot graph.
Expand Down
1 change: 0 additions & 1 deletion pkg/gossip/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ go_library(
deps = [
"//pkg/base",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
7 changes: 3 additions & 4 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -61,7 +60,7 @@ func startGossipAtAddr(

server, err := rpc.NewServer(ctx, rpcContext)
require.NoError(t, err)
g := NewTest(nodeID, stopper, registry, zonepb.DefaultZoneConfigRef())
g := NewTest(nodeID, stopper, registry)
RegisterGossipServer(server, g)
ln, err := netutil.ListenAndServeGRPC(stopper, server, addr)
require.NoError(t, err)
Expand Down Expand Up @@ -121,7 +120,7 @@ func startFakeServerGossips(

lserver, err := rpc.NewServer(ctx, lRPCContext)
require.NoError(t, err)
local := NewTest(localNodeID, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
local := NewTest(localNodeID, stopper, metric.NewRegistry())
RegisterGossipServer(lserver, local)
lln, err := netutil.ListenAndServeGRPC(stopper, lserver, util.IsolatedTestAddr)
require.NoError(t, err)
Expand Down Expand Up @@ -479,7 +478,7 @@ func TestClientRegisterWithInitNodeID(t *testing.T) {
server, err := rpc.NewServer(ctx, rpcContext)
require.NoError(t, err)
// node ID must be non-zero
gnode := NewTest(nodeID, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
gnode := NewTest(nodeID, stopper, metric.NewRegistry())
RegisterGossipServer(server, gnode)
g = append(g, gnode)

Expand Down
5 changes: 2 additions & 3 deletions pkg/gossip/convergence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip/simulation"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -58,7 +57,7 @@ func TestConvergence(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

network := simulation.NewNetwork(stopper, testConvergenceSize, true, zonepb.DefaultZoneConfigRef())
network := simulation.NewNetwork(stopper, testConvergenceSize, true)

const maxCycles = 100
if connectedCycle := network.RunUntilFullyConnected(); connectedCycle > maxCycles {
Expand Down Expand Up @@ -89,7 +88,7 @@ func TestNetworkReachesEquilibrium(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

network := simulation.NewNetwork(stopper, testReachesEquilibriumSize, true, zonepb.DefaultZoneConfigRef())
network := simulation.NewNetwork(stopper, testReachesEquilibriumSize, true)

var connsRefused int64
var cyclesWithoutChange int
Expand Down
54 changes: 3 additions & 51 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -242,15 +240,6 @@ type Gossip struct {
bootstrapInterval time.Duration
cullInterval time.Duration

// The system config is treated unlike other info objects.
// It is used so often that we keep an unmarshaled version of it
// here and its own set of callbacks.
// We do not use the infostore to avoid unmarshalling under the
// main gossip lock.
systemConfig *config.SystemConfig
systemConfigMu syncutil.RWMutex
systemConfigChannels []chan<- struct{}

// addresses is a list of bootstrap host addresses for
// connecting to the gossip network.
addressIdx int
Expand All @@ -266,8 +255,6 @@ type Gossip struct {
bootstrapAddrs map[util.UnresolvedAddr]roachpb.NodeID

locality roachpb.Locality

defaultZoneConfig *zonepb.ZoneConfig
}

// New creates an instance of a gossip node.
Expand All @@ -281,7 +268,6 @@ func New(
stopper *stop.Stopper,
registry *metric.Registry,
locality roachpb.Locality,
defaultZoneConfig *zonepb.ZoneConfig,
) *Gossip {
ambient.SetEventLog("gossip", "gossip")
g := &Gossip{
Expand All @@ -298,7 +284,6 @@ func New(
addressExists: map[util.UnresolvedAddr]bool{},
bootstrapAddrs: map[util.UnresolvedAddr]roachpb.NodeID{},
locality: locality,
defaultZoneConfig: defaultZoneConfig,
}

stopper.AddCloser(stop.CloserFn(g.server.AmbientContext.FinishEventLog))
Expand All @@ -307,8 +292,6 @@ func New(

g.mu.Lock()
defer g.mu.Unlock()
// Add ourselves as a SystemConfig watcher.
g.mu.is.registerCallback(KeyDeprecatedSystemConfig, g.updateSystemConfig)
// Add ourselves as a node descriptor watcher.
g.mu.is.registerCallback(MakePrefixPattern(KeyNodeDescPrefix), g.updateNodeAddress)
g.mu.is.registerCallback(MakePrefixPattern(KeyStoreDescPrefix), g.updateStoreMap)
Expand All @@ -318,13 +301,8 @@ func New(

// NewTest is a simplified wrapper around New that creates the
// ClusterIDContainer and NodeIDContainer internally. Used for testing.
func NewTest(
nodeID roachpb.NodeID,
stopper *stop.Stopper,
registry *metric.Registry,
defaultZoneConfig *zonepb.ZoneConfig,
) *Gossip {
return NewTestWithLocality(nodeID, stopper, registry, roachpb.Locality{}, defaultZoneConfig)
func NewTest(nodeID roachpb.NodeID, stopper *stop.Stopper, registry *metric.Registry) *Gossip {
return NewTestWithLocality(nodeID, stopper, registry, roachpb.Locality{})
}

// NewTestWithLocality calls NewTest with an explicit locality value.
Expand All @@ -333,13 +311,12 @@ func NewTestWithLocality(
stopper *stop.Stopper,
registry *metric.Registry,
locality roachpb.Locality,
defaultZoneConfig *zonepb.ZoneConfig,
) *Gossip {
c := &base.ClusterIDContainer{}
n := &base.NodeIDContainer{}
var ac log.AmbientContext
ac.AddLogTag("n", n)
gossip := New(ac, c, n, stopper, registry, locality, defaultZoneConfig)
gossip := New(ac, c, n, stopper, registry, locality)
if nodeID != 0 {
n.Set(context.TODO(), nodeID)
}
Expand Down Expand Up @@ -1086,31 +1063,6 @@ func (g *Gossip) RegisterCallback(pattern string, method Callback, opts ...Callb
}
}

// updateSystemConfig is the raw gossip info callback. Unmarshal the
// system config, and if successful, send on each system config
// channel.
func (g *Gossip) updateSystemConfig(key string, content roachpb.Value) {
ctx := g.AnnotateCtx(context.TODO())
if key != KeyDeprecatedSystemConfig {
log.Fatalf(ctx, "wrong key received on SystemConfig callback: %s", key)
}
cfg := config.NewSystemConfig(g.defaultZoneConfig)
if err := content.GetProto(&cfg.SystemConfigEntries); err != nil {
log.Errorf(ctx, "could not unmarshal system config on callback: %s", err)
return
}

g.systemConfigMu.Lock()
defer g.systemConfigMu.Unlock()
g.systemConfig = cfg
for _, c := range g.systemConfigChannels {
select {
case c <- struct{}{}:
default:
}
}
}

// Incoming returns a slice of incoming gossip client connection
// node IDs.
func (g *Gossip) Incoming() []roachpb.NodeID {
Expand Down
11 changes: 5 additions & 6 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -44,7 +43,7 @@ func TestGossipInfoStore(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
g := NewTest(1, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
g := NewTest(1, stopper, metric.NewRegistry())
slice := []byte("b")
if err := g.AddInfo("s", slice, time.Hour); err != nil {
t.Fatal(err)
Expand All @@ -64,7 +63,7 @@ func TestGossipMoveNode(t *testing.T) {
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
g := NewTest(1, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
g := NewTest(1, stopper, metric.NewRegistry())
var nodes []*roachpb.NodeDescriptor
for i := 1; i <= 3; i++ {
node := &roachpb.NodeDescriptor{
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestGossipGetNextBootstrapAddress(t *testing.T) {
util.MakeUnresolvedAddr("tcp", "localhost:9004"),
}

g := NewTest(0, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
g := NewTest(0, stopper, metric.NewRegistry())
g.setAddresses(addresses)

// Using specified addresses, fetch bootstrap addresses 3 times
Expand Down Expand Up @@ -173,7 +172,7 @@ func TestGossipLocalityResolver(t *testing.T) {
var node2LocalityList []roachpb.LocalityAddress
node2LocalityList = append(node2LocalityList, nodeLocalityAddress2)

g := NewTestWithLocality(1, stopper, metric.NewRegistry(), gossipLocalityAdvertiseList, zonepb.DefaultZoneConfigRef())
g := NewTestWithLocality(1, stopper, metric.NewRegistry(), gossipLocalityAdvertiseList)
node1 := &roachpb.NodeDescriptor{
NodeID: 1,
Address: node1PublicAddressRPC,
Expand Down Expand Up @@ -711,7 +710,7 @@ func TestGossipJoinTwoClusters(t *testing.T) {
require.NoError(t, err)

// node ID must be non-zero
gnode := NewTest(roachpb.NodeID(i+1), stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
gnode := NewTest(roachpb.NodeID(i+1), stopper, metric.NewRegistry())
RegisterGossipServer(server, gnode)
g = append(g, gnode)
gnode.SetStallInterval(interval)
Expand Down
1 change: 0 additions & 1 deletion pkg/gossip/simulation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/gossip/simulation",
visibility = ["//visibility:public"],
deps = [
"//pkg/config/zonepb",
"//pkg/gossip",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
11 changes: 4 additions & 7 deletions pkg/gossip/simulation/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -58,9 +57,7 @@ type Network struct {
}

// NewNetwork creates nodeCount gossip nodes.
func NewNetwork(
stopper *stop.Stopper, nodeCount int, createAddresses bool, defaultZoneConfig *zonepb.ZoneConfig,
) *Network {
func NewNetwork(stopper *stop.Stopper, nodeCount int, createAddresses bool) *Network {
ctx := context.TODO()
log.Infof(ctx, "simulating gossip network with %d nodes", nodeCount)

Expand Down Expand Up @@ -89,7 +86,7 @@ func NewNetwork(
n.RPCContext.StorageClusterID.Set(context.TODO(), uuid.MakeV4())

for i := 0; i < nodeCount; i++ {
node, err := n.CreateNode(defaultZoneConfig)
node, err := n.CreateNode()
if err != nil {
log.Fatalf(context.TODO(), "%v", err)
}
Expand All @@ -103,7 +100,7 @@ func NewNetwork(
}

// CreateNode creates a simulation node and starts an RPC server for it.
func (n *Network) CreateNode(defaultZoneConfig *zonepb.ZoneConfig) (*Node, error) {
func (n *Network) CreateNode() (*Node, error) {
ctx := context.TODO()
server, err := rpc.NewServer(ctx, n.RPCContext)
if err != nil {
Expand All @@ -114,7 +111,7 @@ func (n *Network) CreateNode(defaultZoneConfig *zonepb.ZoneConfig) (*Node, error
return nil, err
}
node := &Node{Server: server, Listener: ln, Registry: metric.NewRegistry()}
node.Gossip = gossip.NewTest(0, n.Stopper, node.Registry, defaultZoneConfig)
node.Gossip = gossip.NewTest(0, n.Stopper, node.Registry)
gossip.RegisterGossipServer(server, node.Gossip)
n.Stopper.AddCloser(stop.CloserFn(server.Stop))
_ = n.Stopper.RunAsyncTask(ctx, "node-wait-quiesce", func(context.Context) {
Expand Down
8 changes: 3 additions & 5 deletions pkg/gossip/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/gossip/simulation"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -96,8 +95,7 @@ func TestGossipStorage(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

defaultZoneConfig := zonepb.DefaultZoneConfigRef()
network := simulation.NewNetwork(stopper, 3, true, defaultZoneConfig)
network := simulation.NewNetwork(stopper, 3, true)

// Set storage for each of the nodes.
addresses := make(unresolvedAddrSlice, len(network.Nodes))
Expand Down Expand Up @@ -154,7 +152,7 @@ func TestGossipStorage(t *testing.T) {

// Create an unaffiliated gossip node with only itself as an address,
// leaving it no way to reach the gossip network.
node, err := network.CreateNode(defaultZoneConfig)
node, err := network.CreateNode()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -211,7 +209,7 @@ func TestGossipStorageCleanup(t *testing.T) {
defer stopper.Stop(context.Background())

const numNodes = 3
network := simulation.NewNetwork(stopper, numNodes, false, zonepb.DefaultZoneConfigRef())
network := simulation.NewNetwork(stopper, numNodes, false)

const notReachableAddr = "localhost:0"
const invalidAddr = "10.0.0.1000:3333333"
Expand Down
Loading

0 comments on commit b3691a2

Please sign in to comment.