From a9ce87e2a26c4e3c5b536e2b70a5d0df14ca5cf7 Mon Sep 17 00:00:00 2001 From: Ilias Rinis Date: Mon, 2 Dec 2024 17:01:09 +0100 Subject: [PATCH] workload: add deletion condition func to decide whether to delete a workload --- .../apiserver/controller/workload/workload.go | 121 +++++++++++++++++- .../controllerset/apiservercontrollerset.go | 40 ++++++ 2 files changed, 157 insertions(+), 4 deletions(-) diff --git a/pkg/operator/apiserver/controller/workload/workload.go b/pkg/operator/apiserver/controller/workload/workload.go index e6e0ba8e2f..560401d694 100644 --- a/pkg/operator/apiserver/controller/workload/workload.go +++ b/pkg/operator/apiserver/controller/workload/workload.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/labels" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/kubernetes" + appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -68,6 +69,11 @@ type Controller struct { queue workqueue.RateLimitingInterface versionRecorder status.VersionGetter preRunCachesSynced []cache.InformerSynced + + // deletionConditionFn checks whether the operand workload of the controller should be deleted; + // it also returns the name of said workload to be used for deletion + deletionConditionFn func() (bool, string, error) + deploymentLister appsv1listers.DeploymentLister } // NewController creates a brand new Controller instance. @@ -83,8 +89,51 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera kubeClient kubernetes.Interface, podLister corev1listers.PodLister, informers []factory.Informer, - tagetNamespaceInformers []factory.Informer, + targetNamespaceInformers []factory.Informer, + delegate Delegate, + openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface, + eventRecorder events.Recorder, + versionRecorder status.VersionGetter, +) factory.Controller { + controllerRef := &Controller{ + controllerInstanceName: factory.ControllerInstanceName(instanceName, "Workload"), + operatorNamespace: operatorNamespace, + targetNamespace: targetNamespace, + targetOperandVersion: targetOperandVersion, + operandNamePrefix: operandNamePrefix, + conditionsPrefix: conditionsPrefix, + operatorClient: operatorClient, + kubeClient: kubeClient, + podsLister: podLister, + delegate: delegate, + openshiftClusterConfigClient: openshiftClusterConfigClient, + versionRecorder: versionRecorder, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), instanceName), + } + + return newController(controllerRef, informers, targetNamespaceInformers, eventRecorder) +} + +// NewControllerWithDeletion creates a brand new Controller instance, which includes a deletion condition. +// +// the "instanceName" param will be used to set conditions in the status field. It will be suffixed with "WorkloadController", +// so it can end up in the condition in the form of "OAuthAPIWorkloadControllerDeploymentAvailable" +// +// the "operatorNamespace" is used to set "version-mapping" in the correct namespace +// +// the "targetNamespace" represent the namespace for the managed resource (DaemonSet) +// +// the "deletionConditionFn" will be used to check whether the workload specified by the +// returned name which is part of targetNamespace must be deleted +func NewControllerWithDeletion(instanceName, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string, + operatorClient v1helpers.OperatorClient, + kubeClient kubernetes.Interface, + podLister corev1listers.PodLister, + deploymentLister appsv1listers.DeploymentLister, + informers []factory.Informer, + targetNamespaceInformers []factory.Informer, delegate Delegate, + deletionConditionFn func() (bool, string, error), openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface, eventRecorder events.Recorder, versionRecorder status.VersionGetter, @@ -100,14 +149,25 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera kubeClient: kubeClient, podsLister: podLister, delegate: delegate, + deletionConditionFn: deletionConditionFn, + deploymentLister: deploymentLister, openshiftClusterConfigClient: openshiftClusterConfigClient, versionRecorder: versionRecorder, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), instanceName), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), instanceName), } + return newController(controllerRef, informers, targetNamespaceInformers, eventRecorder) +} + +func newController( + controllerRef *Controller, + informers []factory.Informer, + targetNamespaceInformers []factory.Informer, + eventRecorder events.Recorder, +) factory.Controller { c := factory.New() - for _, nsi := range tagetNamespaceInformers { - c.WithNamespaceInformer(nsi, targetNamespace) + for _, nsi := range targetNamespaceInformers { + c.WithNamespaceInformer(nsi, controllerRef.targetNamespace) } return c.WithSync(controllerRef.sync). @@ -129,6 +189,14 @@ func (c *Controller) sync(ctx context.Context, controllerContext factory.SyncCon return err } + if c.deletionConditionFn != nil { + if conditionMet, workload, err := c.deletionConditionFn(); err != nil { + return err + } else if conditionMet { + return c.deleteWorkload(ctx, workload) + } + } + if fulfilled, err := c.delegate.PreconditionFulfilled(ctx); err != nil { return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, []error{err}) } else if !fulfilled { @@ -356,6 +424,51 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o return nil } +func (c *Controller) deleteWorkload(ctx context.Context, workloadName string) (err error) { + deploymentAvailableCondition := applyoperatorv1.OperatorCondition(). + WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable)) + + workloadDegradedCondition := applyoperatorv1.OperatorCondition(). + WithType(fmt.Sprintf("%sWorkloadDegraded", c.conditionsPrefix)) + + status := applyoperatorv1.OperatorStatus() + defer func() { + status = status.WithConditions(deploymentAvailableCondition, workloadDegradedCondition) + if applyError := c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status); applyError != nil { + err = applyError + } + }() + + if _, err := c.deploymentLister.Deployments(c.targetNamespace).Get(workloadName); err != nil && !apierrors.IsNotFound(err) { + deploymentAvailableCondition = deploymentAvailableCondition. + WithStatus(operatorv1.ConditionFalse). + WithReason("DeletionError") + + workloadDegradedCondition = workloadDegradedCondition. + WithStatus(operatorv1.ConditionTrue). + WithReason("DeletionError"). + WithMessage(err.Error()) + return err + + } else if err == nil { + if err := c.kubeClient.AppsV1().Deployments(c.targetNamespace).Delete(ctx, workloadName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + deploymentAvailableCondition = deploymentAvailableCondition. + WithStatus(operatorv1.ConditionFalse). + WithReason("DeletionError") + + workloadDegradedCondition = workloadDegradedCondition. + WithStatus(operatorv1.ConditionTrue). + WithReason("DeletionError"). + WithMessage(err.Error()) + return err + } + } + + deploymentAvailableCondition = deploymentAvailableCondition.WithStatus(operatorv1.ConditionTrue) + workloadDegradedCondition = workloadDegradedCondition.WithStatus(operatorv1.ConditionFalse) + return nil +} + // isUpdatingTooLong determines if updating operands takes too long. // it returns true if the progressing condition has been set to True for at least 15 minutes func isUpdatingTooLong(operatorStatus *operatorv1.OperatorStatus, progressingConditionType string) (bool, error) { diff --git a/pkg/operator/apiserver/controllerset/apiservercontrollerset.go b/pkg/operator/apiserver/controllerset/apiservercontrollerset.go index 39438998f3..3a0da61302 100644 --- a/pkg/operator/apiserver/controllerset/apiservercontrollerset.go +++ b/pkg/operator/apiserver/controllerset/apiservercontrollerset.go @@ -295,6 +295,46 @@ func (cs *APIServerControllerSet) WithWorkloadController( return cs } +func (cs *APIServerControllerSet) WithWorkloadControllerWithDeletion( + name, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string, + kubeClient kubernetes.Interface, + delegate workload.Delegate, + deletionConditionFn func() (bool, string, error), + openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface, + versionRecorder status.VersionGetter, + kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces, + informers ...factory.Informer) *APIServerControllerSet { + + workloadController := workload.NewControllerWithDeletion( + name, + operatorNamespace, + targetNamespace, + targetOperandVersion, + operandNamePrefix, + conditionsPrefix, + cs.operatorClient, + kubeClient, + kubeInformersForNamespaces.PodLister(), + kubeInformersForNamespaces.InformersFor(targetNamespace).Apps().V1().Deployments().Lister(), + append(informers, + kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().ConfigMaps().Informer(), + kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Secrets().Informer(), + kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Pods().Informer(), + kubeInformersForNamespaces.InformersFor(targetNamespace).Apps().V1().Deployments().Informer(), + kubeInformersForNamespaces.InformersFor(metav1.NamespaceSystem).Core().V1().Nodes().Informer(), + ), + []factory.Informer{kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Namespaces().Informer()}, + + delegate, + deletionConditionFn, + openshiftClusterConfigClient, + cs.eventRecorder, + versionRecorder) + + cs.workloadController.controller = workloadController + return cs +} + func (cs *APIServerControllerSet) WithoutWorkloadController() *APIServerControllerSet { cs.workloadController.controller = nil cs.workloadController.emptyAllowed = true