Skip to content

Commit

Permalink
empty-zones
Browse files Browse the repository at this point in the history
  • Loading branch information
08volt committed Sep 4, 2024
1 parent f19f995 commit 726a46f
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 16 deletions.
12 changes: 11 additions & 1 deletion pkg/instancegroups/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,17 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
// https://github.com/kubernetes/cloud-provider-gcp/blob/fca628cb3bf9267def0abb509eaae87d2d4040f3/providers/gce/gce_loadbalancer_internal.go#L606C1-L675C1
// the m.maxIGSize should be set to 1000 as is in the cloud-provider-gcp.
zonedNodes := m.splitNodesByZone(nodes, iglogger)
iglogger.Info(fmt.Sprintf("Syncing nodes: %d nodes over %d zones", len(nodes), len(zonedNodes)))

emptyZoneNodesNames := sets.NewString(zonedNodes[""]...)
if len(emptyZoneNodesNames) > 0 {
iglogger.Info(fmt.Sprintf("%d nodes have empty zone: %v", len(emptyZoneNodesNames), emptyZoneNodesNames))
}

for zone, kubeNodesFromZone := range zonedNodes {
if zone == "" {
continue // skip ensuring instance group for empty zone
}
igName := m.namer.InstanceGroup()
if len(kubeNodesFromZone) > m.maxIGSize {
sortedKubeNodesFromZone := sets.NewString(kubeNodesFromZone...).List()
Expand All @@ -335,7 +345,7 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
gceNodes.Insert(instance)
}

removeNodes := gceNodes.Difference(kubeNodes).List()
removeNodes := gceNodes.Difference(kubeNodes).Difference(emptyZoneNodesNames).List() // Do not remove nodes which zone label still need to be assigned
addNodes := kubeNodes.Difference(gceNodes).List()

iglogger.V(2).Info("Removing nodes", "removeNodes", events.TruncatedStringList(removeNodes))
Expand Down
103 changes: 103 additions & 0 deletions pkg/instancegroups/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
)
Expand Down Expand Up @@ -59,6 +60,108 @@ func newNodePool(f Provider, maxIGSize int) Manager {
return pool
}

func TestNodePoolSyncWithEmptyZone(t *testing.T) {
maxIGSize := 1000

names1001 := make([]string, maxIGSize+1)
for i := 1; i <= maxIGSize+1; i++ {
names1001[i-1] = fmt.Sprintf("n%d", i)
}

testCases := []struct {
gceNodes []string
kubeNodes []string
kubeZones []string
shouldSkipSync bool
}{
{
gceNodes: []string{"n1", "n2"},
kubeNodes: []string{"n1", "n2"},
kubeZones: []string{defaultTestZone, ""},
},
}

for _, testCase := range testCases {
// create fake gce node pool with existing gceNodes
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
zonesToIGs := map[string]IGsToInstances{
defaultTestZone: {
ig: sets.NewString(testCase.gceNodes...),
},
}
fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)

pool := newNodePool(fakeGCEInstanceGroups, maxIGSize)
for i, kubeNode := range testCase.kubeNodes {
manager := pool.(*manager)
zonegetter.AddFakeNodes(manager.ZoneGetter, testCase.kubeZones[i], kubeNode)
}

igName := defaultNamer.InstanceGroup()
ports := []int64{80}
_, err := pool.EnsureInstanceGroupsAndPorts(igName, ports, klog.TODO())
if err != nil {
t.Fatalf("pool.EnsureInstanceGroupsAndPorts(%s, %v) returned error %v, want nil", igName, ports, err)
}

// run sync with expected kubeNodes
apiCallsCountBeforeSync := len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes, klog.TODO())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes, err)
}

// run assertions
apiCallsCountAfterSync := len(fakeGCEInstanceGroups.calls)
if testCase.shouldSkipSync && apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}

// should not remove nodes
for _, value := range fakeGCEInstanceGroups.calls {
if value == utils.RemoveInstances {
t.Errorf("Should not remove instances. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
break
}
}

instancesList, err := fakeGCEInstanceGroups.ListInstancesInInstanceGroup(ig.Name, defaultTestZone, allInstances)
if err != nil {
t.Fatalf("fakeGCEInstanceGroups.ListInstancesInInstanceGroup(%s, %s, %s) returned error %v, want nil", ig.Name, defaultTestZone, allInstances, err)
}
instances, err := test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("test.InstancesListToNameSet(%v) returned error %v, want nil", ig, err)
}

expectedInstancesSize := len(testCase.kubeNodes)
if len(testCase.kubeNodes) > maxIGSize {
// If kubeNodes bigger than maximum instance group size, resulted instances
// should be truncated to flags.F.MaxIgSize
expectedInstancesSize = maxIGSize
}
if instances.Len() != expectedInstancesSize {
t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
}

kubeNodeSet := sets.NewString(testCase.kubeNodes...)
if !kubeNodeSet.IsSuperset(instances) {
t.Errorf("kubeNodes = %v is not superset of instances = %v", testCase.kubeNodes, instances)
}

// call sync one more time and check that it will be no-op and will not cause any api calls
apiCallsCountBeforeSync = len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes, klog.TODO())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes, err)
}
apiCallsCountAfterSync = len(fakeGCEInstanceGroups.calls)
if apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync if called second time with the same kubeNodes. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}
}
}

func TestNodePoolSync(t *testing.T) {
maxIGSize := 1000

Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == "" {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
Expand Down Expand Up @@ -178,7 +178,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == "" {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
Expand Down
16 changes: 9 additions & 7 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func getEndpointZone(endpointAddress negtypes.AddressData, zoneGetter *zonegette
return zone, count, fmt.Errorf("%w: %w", negtypes.ErrEPNodePodCIDRNotSet, err)
}
// providerID missing in node or zone information missing in providerID.
if errors.Is(err, zonegetter.ErrProviderIDNotFound) || errors.Is(err, zonegetter.ErrSplitProviderID) {
if errors.Is(err, zonegetter.ErrProviderIDNotFound) || errors.Is(err, zonegetter.ErrSplitProviderID) || zone == "" {
count[negtypes.ZoneMissing]++
return zone, count, fmt.Errorf("%w: zone is missing for node %v", negtypes.ErrEPZoneMissing, *endpointAddress.NodeName)
}
Expand Down Expand Up @@ -466,12 +466,14 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
continue
}
zone, getZoneErr := zoneGetter.ZoneForNode(nodeName, logger)
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
if getZoneErr != nil || zone == "" {
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
}
}
epLogger.Error(getZoneErr, "Endpoint's corresponding node does not have valid zone information, skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeNotFound]++
Expand Down
7 changes: 2 additions & 5 deletions pkg/utils/zonegetter/zone_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var ErrNodePodCIDRNotSet = errors.New("Node does not have PodCIDR set")

// providerIDRE is the regex to process providerID.
// A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$`)
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)

// ZoneGetter manages lookups for GCE instances to zones.
type ZoneGetter struct {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err
zones := sets.String{}
for _, n := range nodes {
zone, err := getZone(n)
if err != nil {
if err != nil || zone == "" {
filterLogger.Error(err, "Failed to get zone from providerID", "nodeName", n.Name)
continue
}
Expand Down Expand Up @@ -325,9 +325,6 @@ func getZone(node *api_v1.Node) (string, error) {
if len(matches) != 4 {
return "", fmt.Errorf("%w: providerID %q of node %s is not valid", ErrSplitProviderID, node.Spec.ProviderID, node.Name)
}
if matches[2] == "" {
return "", fmt.Errorf("%w: node %s has an empty zone", ErrSplitProviderID, node.Name)
}
return matches[2], nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/zonegetter/zone_getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestZoneForNode(t *testing.T) {
desc: "Node with empty zone in providerID",
nodeName: "instance-empty-zone-providerID",
expectZone: "",
expectErr: ErrSplitProviderID,
expectErr: nil,
},
}
for _, tc := range testCases {
Expand Down

0 comments on commit 726a46f

Please sign in to comment.