From f2ca60129b4c7232247a1826c6b7fba5f02f1f01 Mon Sep 17 00:00:00 2001 From: Sebastian Woehrl Date: Mon, 10 Jun 2024 08:46:24 +0200 Subject: [PATCH] Disable http client connection reuse to prevent memory leak Signed-off-by: Sebastian Woehrl --- .../api/v1/zz_generated.deepcopy.go | 22 ++++++++++++- .../opensearch-gateway/services/os_client.go | 4 +++ opensearch-operator/pkg/reconcilers/scaler.go | 33 +++++++------------ 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/opensearch-operator/api/v1/zz_generated.deepcopy.go b/opensearch-operator/api/v1/zz_generated.deepcopy.go index da195040..055616a0 100644 --- a/opensearch-operator/api/v1/zz_generated.deepcopy.go +++ b/opensearch-operator/api/v1/zz_generated.deepcopy.go @@ -402,7 +402,7 @@ func (in *Condition) DeepCopyInto(out *Condition) { if in.Cron != nil { in, out := &in.Cron, &out.Cron *out = new(Cron) - **out = **in + (*in).DeepCopyInto(*out) } if in.MinDocCount != nil { in, out := &in.MinDocCount, &out.MinDocCount @@ -454,6 +454,11 @@ func (in *ConfMgmt) DeepCopy() *ConfMgmt { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Cron) DeepCopyInto(out *Cron) { *out = *in + if in.CronDetails != nil { + in, out := &in.CronDetails, &out.CronDetails + *out = new(CronDetails) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Cron. @@ -466,6 +471,21 @@ func (in *Cron) DeepCopy() *Cron { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CronDetails) DeepCopyInto(out *CronDetails) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CronDetails. +func (in *CronDetails) DeepCopy() *CronDetails { + if in == nil { + return nil + } + out := new(CronDetails) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DashboardsConfig) DeepCopyInto(out *DashboardsConfig) { *out = *in diff --git a/opensearch-operator/opensearch-gateway/services/os_client.go b/opensearch-operator/opensearch-gateway/services/os_client.go index b12729f9..081c6fa2 100644 --- a/opensearch-operator/opensearch-gateway/services/os_client.go +++ b/opensearch-operator/opensearch-gateway/services/os_client.go @@ -73,6 +73,10 @@ func NewOsClusterClient(clusterUrl string, username string, password string, opt } return &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + // These options are needed as otherwise connections would be kept and leak memory + // Connection reuse is not really possible due to each reconcile run being independent + DisableKeepAlives: true, + MaxIdleConns: 1, } }(), Addresses: []string{clusterUrl}, diff --git a/opensearch-operator/pkg/reconcilers/scaler.go b/opensearch-operator/pkg/reconcilers/scaler.go index 297e53d0..3223a80a 100644 --- a/opensearch-operator/pkg/reconcilers/scaler.go +++ b/opensearch-operator/pkg/reconcilers/scaler.go @@ -10,6 +10,7 @@ import ( "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/builders" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util" "github.com/cisco-open/operator-tools/pkg/reconciler" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/tools/record" @@ -25,6 +26,7 @@ type ScalerReconciler struct { recorder record.EventRecorder reconcilerContext *ReconcilerContext instance *opsterv1.OpenSearchCluster + ReconcilerOptions } func NewScalerReconciler( @@ -33,14 +35,17 @@ func NewScalerReconciler( recorder record.EventRecorder, reconcilerContext *ReconcilerContext, instance *opsterv1.OpenSearchCluster, - opts ...reconciler.ResourceReconcilerOption, + opts ...ReconcilerOption, ) *ScalerReconciler { + options := ReconcilerOptions{} + options.apply(opts...) return &ScalerReconciler{ - client: k8s.NewK8sClient(client, ctx, append(opts, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler")))...), + client: k8s.NewK8sClient(client, ctx, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler"))), ctx: ctx, recorder: recorder, reconcilerContext: reconcilerContext, instance: instance, + ReconcilerOptions: options, } } @@ -187,11 +192,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu if !smartDecrease { return false, err } - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return true, err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "WARN", "failed to remove node exclude", "Group-%s . failed to remove node exclude %s", nodePoolGroupName, lastReplicaNodeName) @@ -209,13 +210,9 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error { lg := log.FromContext(r.ctx) - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) annotations := map[string]string{"cluster-name": r.instance.GetName()} - if err != nil { - return err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client for scaling") @@ -272,12 +269,8 @@ func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, cur lg := log.FromContext(r.ctx) annotations := map[string]string{"cluster-name": r.instance.GetName()} lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1) - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return err - } - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { return err } @@ -328,12 +321,8 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu } // Gracefully remove nodes - username, password, err := helpers.UsernameAndPassword(r.client, r.instance) - if err != nil { - return nil, err - } annotations := map[string]string{"cluster-name": r.instance.GetName()} - clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password) + clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport) if err != nil { lg.Error(err, "failed to create os client") r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client")