Support for Nodes with Empty Zone
Nodes can have an empty zone if one has not been assigned to them yet.

The zone was retrieved from the providerID by matching it against the following regular expression and taking the second capture group: ```var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$`)```

In the case of an empty zone, this match failed and produced an error.
To allow an empty zone to match, the regular expression has been updated to:

```var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)```
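
For illustration only, here is a minimal standalone sketch (with a made-up providerID) of why the relaxed pattern now matches a node whose zone segment is empty while the old pattern did not:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Hypothetical providerID for a node whose zone has not been assigned yet.
	providerID := "gce://my-project//my-instance"

	oldRE := regexp.MustCompile(`^gce://([^/]+)/([^/]+)/([^/]+)$`)
	newRE := regexp.MustCompile(`^gce://([^/]+)/([^/]*)/([^/]+)$`)

	// The old pattern does not match at all, which is what surfaced as an error.
	fmt.Println(oldRE.FindStringSubmatch(providerID)) // []
	// The new pattern matches and yields an empty string for the zone group.
	fmt.Println(newRE.FindStringSubmatch(providerID)) // [gce://my-project//my-instance my-project  my-instance]
}
```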

After that, I updated all the call sites that check the error so that they also check for an empty zone.
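
The shared call-site pattern is roughly the one below. This is a self-contained sketch with a stand-in zoneForNode function and made-up node names, not the controller code itself:

```go
package main

import (
	"errors"
	"fmt"
)

// EmptyZone mirrors the new zonegetter.EmptyZone constant.
const EmptyZone = ""

// zoneForNode is a stand-in for zonegetter.ZoneGetter.ZoneForNode: the zone it
// returns may now legitimately be empty instead of triggering an error.
func zoneForNode(name string) (string, error) {
	zones := map[string]string{"n1": "us-central1-a", "n2": EmptyZone}
	zone, ok := zones[name]
	if !ok {
		return "", errors.New("providerID not found")
	}
	return zone, nil
}

func main() {
	for _, name := range []string{"n1", "n2", "n3"} {
		zone, err := zoneForNode(name)
		// Updated call sites skip a node on an error *or* an empty zone.
		if err != nil || zone == EmptyZone {
			fmt.Printf("skipping node %s (zone=%q, err=%v)\n", name, zone, err)
			continue
		}
		fmt.Printf("node %s is in zone %s\n", name, zone)
	}
}
```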

I also updated the Instance Group Manager so that it does not remove nodes that do not have a zone assigned yet.
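
The removal logic in the manager boils down to the set arithmetic sketched below, using made-up node names and assuming the k8s.io/apimachinery dependency already used by the repo:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Hypothetical state: instances currently in the GCE instance group for a zone
	// versus the nodes Kubernetes currently reports for that zone.
	gceNodes := sets.NewString("n1", "n2", "n3")
	kubeNodes := sets.NewString("n1")
	// Nodes grouped under the empty zone because their zone label is not set yet.
	emptyZoneNodes := sets.NewString("n2")

	removalCandidates := gceNodes.Difference(kubeNodes)
	// Keep nodes that are still waiting for a zone assignment in the group.
	removeNodes := removalCandidates.Difference(emptyZoneNodes).List()

	fmt.Println(removeNodes) // [n3]
}
```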
08volt committed Sep 4, 2024
1 parent f19f995 commit 0b2fcc6
Showing 6 changed files with 138 additions and 20 deletions.
17 changes: 16 additions & 1 deletion pkg/instancegroups/manager.go
@@ -310,7 +310,17 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
// https://github.com/kubernetes/cloud-provider-gcp/blob/fca628cb3bf9267def0abb509eaae87d2d4040f3/providers/gce/gce_loadbalancer_internal.go#L606C1-L675C1
// the m.maxIGSize should be set to 1000 as is in the cloud-provider-gcp.
zonedNodes := m.splitNodesByZone(nodes, iglogger)
iglogger.Info(fmt.Sprintf("Syncing nodes: %d nodes over %d zones", len(nodes), len(zonedNodes)))

emptyZoneNodesNames := sets.NewString(zonedNodes[zonegetter.EmptyZone]...)
if len(emptyZoneNodesNames) > 0 {
iglogger.Info(fmt.Sprintf("%d nodes have an empty zone: %v. They will not be removed from the instance group as long as the zone is missing", len(emptyZoneNodesNames), emptyZoneNodesNames))
}

for zone, kubeNodesFromZone := range zonedNodes {
if zone == zonegetter.EmptyZone {
continue // skip ensuring instance group for empty zone
}
igName := m.namer.InstanceGroup()
if len(kubeNodesFromZone) > m.maxIGSize {
sortedKubeNodesFromZone := sets.NewString(kubeNodesFromZone...).List()
@@ -335,7 +345,12 @@ func (m *manager) Sync(nodes []string, logger klog.Logger) (err error) {
gceNodes.Insert(instance)
}

removeNodes := gceNodes.Difference(kubeNodes).List()
removalCandidates := gceNodes.Difference(kubeNodes)
iglogger.V(2).Info("Nodes that are removal candidates", "removalCandidates", events.TruncatedStringList(removalCandidates.List()))

removeNodes := removalCandidates.Difference(emptyZoneNodesNames).List() // Do not remove nodes whose zone label has not been assigned yet
iglogger.V(2).Info("Removing nodes (after ignoring nodes without an assigned zone)", "removeNodes", events.TruncatedStringList(removeNodes))

addNodes := kubeNodes.Difference(gceNodes).List()

iglogger.V(2).Info("Removing nodes", "removeNodes", events.TruncatedStringList(removeNodes))
103 changes: 103 additions & 0 deletions pkg/instancegroups/manager_test.go
@@ -28,6 +28,7 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/zonegetter"
)
@@ -59,6 +60,108 @@ func newNodePool(f Provider, maxIGSize int) Manager {
return pool
}

func TestNodePoolSyncWithEmptyZone(t *testing.T) {
maxIGSize := 1000

names1001 := make([]string, maxIGSize+1)
for i := 1; i <= maxIGSize+1; i++ {
names1001[i-1] = fmt.Sprintf("n%d", i)
}

testCases := []struct {
gceNodes []string
kubeNodes []string
kubeZones []string
shouldSkipSync bool
}{
{
gceNodes: []string{"n1", "n2"},
kubeNodes: []string{"n1", "n2"},
kubeZones: []string{defaultTestZone, ""},
},
}

for _, testCase := range testCases {
// create fake gce node pool with existing gceNodes
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
zonesToIGs := map[string]IGsToInstances{
defaultTestZone: {
ig: sets.NewString(testCase.gceNodes...),
},
}
fakeGCEInstanceGroups := NewFakeInstanceGroups(zonesToIGs, maxIGSize)

pool := newNodePool(fakeGCEInstanceGroups, maxIGSize)
for i, kubeNode := range testCase.kubeNodes {
manager := pool.(*manager)
zonegetter.AddFakeNodes(manager.ZoneGetter, testCase.kubeZones[i], kubeNode)
}

igName := defaultNamer.InstanceGroup()
ports := []int64{80}
_, err := pool.EnsureInstanceGroupsAndPorts(igName, ports, klog.TODO())
if err != nil {
t.Fatalf("pool.EnsureInstanceGroupsAndPorts(%s, %v) returned error %v, want nil", igName, ports, err)
}

// run sync with expected kubeNodes
apiCallsCountBeforeSync := len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes, klog.TODO())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes, err)
}

// run assertions
apiCallsCountAfterSync := len(fakeGCEInstanceGroups.calls)
if testCase.shouldSkipSync && apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}

// should not remove nodes
for _, value := range fakeGCEInstanceGroups.calls {
if value == utils.RemoveInstances {
t.Errorf("Should not remove instances. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
break
}
}

instancesList, err := fakeGCEInstanceGroups.ListInstancesInInstanceGroup(ig.Name, defaultTestZone, allInstances)
if err != nil {
t.Fatalf("fakeGCEInstanceGroups.ListInstancesInInstanceGroup(%s, %s, %s) returned error %v, want nil", ig.Name, defaultTestZone, allInstances, err)
}
instances, err := test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("test.InstancesListToNameSet(%v) returned error %v, want nil", ig, err)
}

expectedInstancesSize := len(testCase.kubeNodes)
if len(testCase.kubeNodes) > maxIGSize {
// If kubeNodes is bigger than the maximum instance group size, the resulting
// instances should be truncated to flags.F.MaxIgSize
expectedInstancesSize = maxIGSize
}
if instances.Len() != expectedInstancesSize {
t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
}

kubeNodeSet := sets.NewString(testCase.kubeNodes...)
if !kubeNodeSet.IsSuperset(instances) {
t.Errorf("kubeNodes = %v is not superset of instances = %v", testCase.kubeNodes, instances)
}

// call sync one more time and check that it will be no-op and will not cause any api calls
apiCallsCountBeforeSync = len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes, klog.TODO())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes, err)
}
apiCallsCountAfterSync = len(fakeGCEInstanceGroups.calls)
if apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync if called second time with the same kubeNodes. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}
}
}

func TestNodePoolSync(t *testing.T) {
maxIGSize := 1000

4 changes: 2 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
@@ -103,7 +103,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == zonegetter.EmptyZone {
l.logger.Error(err, "Unable to find zone for node, skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
@@ -178,7 +178,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
continue
}
zone, err := l.zoneGetter.ZoneForNode(node.Name, l.logger)
if err != nil {
if err != nil || zone == zonegetter.EmptyZone {
l.logger.Error(err, "Unable to find zone for node skipping", "nodeName", node.Name)
metrics.PublishNegControllerErrorCountMetrics(err, true)
continue
14 changes: 8 additions & 6 deletions pkg/neg/syncers/utils.go
@@ -466,12 +466,14 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
continue
}
zone, getZoneErr := zoneGetter.ZoneForNode(nodeName, logger)
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
if getZoneErr != nil || zone == zonegetter.EmptyZone {
if getZoneErr != nil {
metrics.PublishNegControllerErrorCountMetrics(getZoneErr, true)
if enableMultiSubnetCluster && errors.Is(getZoneErr, zonegetter.ErrNodeNotInDefaultSubnet) {
epLogger.Error(getZoneErr, "Detected endpoint not from default subnet. Skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeInNonDefaultSubnet]++
continue
}
}
epLogger.Error(getZoneErr, "Endpoint's corresponding node does not have valid zone information, skipping", "nodeName", nodeName)
localEPCount[negtypes.NodeNotFound]++
8 changes: 3 additions & 5 deletions pkg/utils/zonegetter/zone_getter.go
@@ -48,6 +48,7 @@ const (
AllNodesFilter = Filter("AllNodesFilter")
CandidateNodesFilter = Filter("CandidateNodesFilter")
CandidateAndUnreadyNodesFilter = Filter("CandidateAndUnreadyNodesFilter")
EmptyZone = ""
)

var ErrProviderIDNotFound = errors.New("providerID not found")
@@ -58,7 +59,7 @@ var ErrNodePodCIDRNotSet = errors.New("Node does not have PodCIDR set")

// providerIDRE is the regex to process providerID.
// A providerID is built out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]+)/([^/]+)$`)
var providerIDRE = regexp.MustCompile(`^` + "gce" + `://([^/]+)/([^/]*)/([^/]+)$`)

// ZoneGetter manages lookups for GCE instances to zones.
type ZoneGetter struct {
@@ -161,7 +162,7 @@ func (z *ZoneGetter) ListZones(filter Filter, logger klog.Logger) ([]string, err
zones := sets.String{}
for _, n := range nodes {
zone, err := getZone(n)
if err != nil {
if err != nil || zone == EmptyZone {
filterLogger.Error(err, "Failed to get zone from providerID", "nodeName", n.Name)
continue
}
@@ -325,9 +326,6 @@ func getZone(node *api_v1.Node) (string, error) {
if len(matches) != 4 {
return "", fmt.Errorf("%w: providerID %q of node %s is not valid", ErrSplitProviderID, node.Spec.ProviderID, node.Name)
}
if matches[2] == "" {
return "", fmt.Errorf("%w: node %s has an empty zone", ErrSplitProviderID, node.Name)
}
return matches[2], nil
}

12 changes: 6 additions & 6 deletions pkg/utils/zonegetter/zone_getter_test.go
@@ -67,7 +67,7 @@ func TestListZones(t *testing.T) {
t.Errorf("For test case %q with onlyIncludeDefaultSubnetNodes = %v, got %d zones, want %d zones", tc.desc, enableMultiSubnetCluster, len(zones), tc.expectLen)
}
for _, zone := range zones {
if zone == "" {
if zone == EmptyZone {
t.Errorf("For test case %q with onlyIncludeDefaultSubnetNodes = %v, got an empty zone,", tc.desc, enableMultiSubnetCluster)
}
}
@@ -112,7 +112,7 @@ func TestListZonesMultipleSubnets(t *testing.T) {
t.Errorf("For test case %q with multi subnet cluster enabled, got %d zones, want %d zones", tc.desc, len(zones), tc.expectLen)
}
for _, zone := range zones {
if zone == "" {
if zone == EmptyZone {
t.Errorf("For test case %q with multi subnet cluster enabled, got an empty zone,", tc.desc)
}
}
@@ -239,8 +239,8 @@ func TestZoneForNode(t *testing.T) {
{
desc: "Node with empty zone in providerID",
nodeName: "instance-empty-zone-providerID",
expectZone: "",
expectErr: ErrSplitProviderID,
expectZone: EmptyZone,
expectErr: nil,
},
}
for _, tc := range testCases {
@@ -355,8 +355,8 @@ func TestGetZone(t *testing.T) {
ProviderID: "gce://foo-project//bar-node",
},
},
expectZone: "",
expectErr: ErrSplitProviderID,
expectZone: EmptyZone,
expectErr: nil,
},
} {
zone, err := getZone(&tc.node)
