diff --git a/.chloggen/single-callback-k8sresolver.yaml b/.chloggen/single-callback-k8sresolver.yaml new file mode 100644 index 000000000000..b0eb561b1d46 --- /dev/null +++ b/.chloggen/single-callback-k8sresolver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The k8sresolver in loadbalancingexporter was triggering exporter churn in the way the change event was handled. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35658] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go index 0eac62ea40d2..186111eba4d5 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_handler.go +++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go @@ -25,7 +25,7 @@ type handler struct { } func (h handler) OnAdd(obj any, _ bool) { - var endpoints []string + var endpoints map[string]bool switch object := obj.(type) { case *corev1.Endpoints: @@ -36,7 +36,7 @@ func (h handler) OnAdd(obj any, _ bool) { return } changed := false - for _, ep := range endpoints { + for ep := range endpoints { if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded { changed = true } @@ -49,28 +49,36 @@ func (h handler) OnAdd(obj any, _ bool) { func (h handler) OnUpdate(oldObj, newObj any) { switch oldEps := oldObj.(type) { case *corev1.Endpoints: - epRemove := convertToEndpoints(oldEps) - for _, ep := range epRemove { - h.endpoints.Delete(ep) - } - if len(epRemove) > 0 { - _, _ = h.callback(context.Background()) - } - newEps, ok := newObj.(*corev1.Endpoints) if !ok { h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", newObj)) h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet)) return } + + oldEndpoints := convertToEndpoints(oldEps) + newEndpoints := convertToEndpoints(newEps) changed := false - for _, ep := range convertToEndpoints(newEps) { + + // Iterate through old endpoints and remove those that are not in the new list. + for ep := range oldEndpoints { + if _, ok := newEndpoints[ep]; !ok { + h.endpoints.Delete(ep) + changed = true + } + } + + // Iterate through new endpoints and add those that are not in the endpoints map already. + for ep := range newEndpoints { if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded { changed = true } } + if changed { _, _ = h.callback(context.Background()) + } else { + h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps)) } default: // unsupported h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj)) @@ -80,7 +88,7 @@ func (h handler) OnUpdate(oldObj, newObj any) { } func (h handler) OnDelete(obj any) { - var endpoints []string + var endpoints map[string]bool switch object := obj.(type) { case *cache.DeletedFinalStateUnknown: h.OnDelete(object.Obj) @@ -95,19 +103,19 @@ func (h handler) OnDelete(obj any) { return } if len(endpoints) != 0 { - for _, endpoint := range endpoints { + for endpoint := range endpoints { h.endpoints.Delete(endpoint) } _, _ = h.callback(context.Background()) } } -func convertToEndpoints(eps ...*corev1.Endpoints) []string { - var ipAddress []string +func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool { + ipAddress := map[string]bool{} for _, ep := range eps { for _, subsets := range ep.Subsets { for _, addr := range subsets.Addresses { - ipAddress = append(ipAddress, addr.IP) + ipAddress[addr.IP] = true } } } diff --git a/exporter/loadbalancingexporter/resolver_k8s_test.go b/exporter/loadbalancingexporter/resolver_k8s_test.go index 71cec20f9bfd..5a4e77dd593b 100644 --- a/exporter/loadbalancingexporter/resolver_k8s_test.go +++ b/exporter/loadbalancingexporter/resolver_k8s_test.go @@ -77,6 +77,7 @@ func TestK8sResolve(t *testing.T) { name string args args simulateFn func(*suiteContext, args) error + onChangeFn func([]string) verifyFn func(*suiteContext, args) error }{ { @@ -116,6 +117,41 @@ func TestK8sResolve(t *testing.T) { return nil }, }, + { + name: "simulate re-list that does not change endpoints", + args: args{ + logger: zap.NewNop(), + service: "lb", + namespace: "default", + ports: []int32{8080, 9090}, + }, + simulateFn: func(suiteCtx *suiteContext, args args) error { + exist := suiteCtx.endpoint.DeepCopy() + patch := client.MergeFrom(exist) + data, err := patch.Data(exist) + if err != nil { + return err + } + _, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace). + Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{}) + return err + }, + onChangeFn: func([]string) { + assert.Fail(t, "should not call onChange") + }, + verifyFn: func(ctx *suiteContext, _ args) error { + if _, err := ctx.resolver.resolve(context.Background()); err != nil { + return err + } + + assert.Equal(t, []string{ + "192.168.10.100:8080", + "192.168.10.100:9090", + }, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal") + + return nil + }, + }, { name: "simulate change the backend ip address", args: args{ @@ -177,6 +213,10 @@ func TestK8sResolve(t *testing.T) { suiteCtx, teardownSuite := setupSuite(t, tt.args) defer teardownSuite(t) + if tt.onChangeFn != nil { + suiteCtx.resolver.onChange(tt.onChangeFn) + } + err := tt.simulateFn(suiteCtx, tt.args) assert.NoError(t, err)