Disable http client connection reuse to prevent memory leak #842

Merged · 1 commit · Jun 13, 2024
22 changes: 21 additions & 1 deletion opensearch-operator/api/v1/zz_generated.deepcopy.go

@@ -73,6 +73,10 @@ func NewOsClusterClient(clusterUrl string, username string, password string, opt
 	}
 	return &http.Transport{
 		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+		// These options are needed as otherwise connections would be kept and leak memory
+		// Connection reuse is not really possible due to each reconcile run being independent
+		DisableKeepAlives: true,
+		MaxIdleConns:      1,
 	}
 }(),
 Addresses: []string{clusterUrl},
33 changes: 11 additions & 22 deletions opensearch-operator/pkg/reconcilers/scaler.go
@@ -10,6 +10,7 @@ import (
 	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/builders"
 	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers"
 	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
+	"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util"
 	"github.com/cisco-open/operator-tools/pkg/reconciler"
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/client-go/tools/record"
@@ -25,6 +26,7 @@ type ScalerReconciler struct {
 	recorder          record.EventRecorder
 	reconcilerContext *ReconcilerContext
 	instance          *opsterv1.OpenSearchCluster
+	ReconcilerOptions
 }

@@ -33,14 +35,17 @@
 func NewScalerReconciler(
 	recorder record.EventRecorder,
 	reconcilerContext *ReconcilerContext,
 	instance *opsterv1.OpenSearchCluster,
-	opts ...reconciler.ResourceReconcilerOption,
+	opts ...ReconcilerOption,
 ) *ScalerReconciler {
Collaborator:
May I know what this change is doing by adding `ReconcilerOption`?

Collaborator (Author):
That change is needed to, for one, make this reconciler behave more like the others, and also to get the osClientTransport field from the ReconcilerOptions, which is a needed argument for the CreateClientForCluster function. In the end this also helps with testing, as the RoundTripper can be mocked.

+	options := ReconcilerOptions{}
+	options.apply(opts...)
 	return &ScalerReconciler{
-		client:            k8s.NewK8sClient(client, ctx, append(opts, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler")))...),
+		client:            k8s.NewK8sClient(client, ctx, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler"))),
 		ctx:               ctx,
 		recorder:          recorder,
 		reconcilerContext: reconcilerContext,
 		instance:          instance,
+		ReconcilerOptions: options,
 	}
 }

@@ -187,11 +192,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet,
 	if !smartDecrease {
 		return false, err
 	}
-	username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
-	if err != nil {
-		return true, err
-	}
-	clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+	clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
Collaborator:
Also, what is the advantage of switching this to the CreateClientForCluster method?

Collaborator:
I see the following code also changes to CreateClientForCluster — what is this change doing?

Collaborator (Author):
This basically unifies how an OpenSearch client can be created, so there is no duplicated code and only one way to create a client.

Collaborator:
So with this, only one OpenSearch client is created (until the operator is restarted) and used for all the operator operations invoked, like create/update on a cluster?

Collaborator (Author):
A new client is created for each reconciler and reconcile loop, but there is only one codepath through which a client can be created. This gives us better control when we want or need to change the handling of clients.
Strictly speaking this change is not needed for the bugfix, but as I was checking that part of the code anyway for the leak, I used the chance to unify it.
 	if err != nil {
 		lg.Error(err, "failed to create os client")
 		r.recorder.AnnotatedEventf(r.instance, annotations, "WARN", "failed to remove node exclude", "Group-%s . failed to remove node exclude %s", nodePoolGroupName, lastReplicaNodeName)
@@ -209,13 +210,9 @@

 func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error {
 	lg := log.FromContext(r.ctx)
-	username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
 	annotations := map[string]string{"cluster-name": r.instance.GetName()}
-	if err != nil {
-		return err
-	}
 
-	clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+	clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
 	if err != nil {
 		lg.Error(err, "failed to create os client")
 		r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client for scaling")
@@ -272,12 +269,8 @@ func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, cur
 	lg := log.FromContext(r.ctx)
 	annotations := map[string]string{"cluster-name": r.instance.GetName()}
 	lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1)
-	username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
-	if err != nil {
-		return err
-	}
 
-	clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+	clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
 	if err != nil {
 		return err
 	}
@@ -328,12 +321,8 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu
 	}
 
 	// Gracefully remove nodes
-	username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
-	if err != nil {
-		return nil, err
-	}
 	annotations := map[string]string{"cluster-name": r.instance.GetName()}
-	clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+	clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
 	if err != nil {
 		lg.Error(err, "failed to create os client")
 		r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client")