Set up golang context when initiating controller manager
Set up a golang context when initiating the controller manager, and pass it down to the VSphereCluster controller.

Signed-off-by: Gong Zhang <[email protected]>
zhanggbj committed Sep 8, 2023
1 parent 31b8129 commit 9aff8b4
Showing 11 changed files with 90 additions and 240 deletions.
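In short, a single context created at process startup now flows from the manager entry point into each Add*ControllerToManager call, instead of controllers minting their own background contexts. Below is a minimal sketch of that wiring pattern, assuming controller-runtime's signal handler as the context source; it is illustrative only, and the provider's real entry point and hook signatures live in its pkg/manager package.

```go
package main

import (
	"context"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"
)

// addToManager stands in for the provider's AddToManager hook, which this
// commit extends with a context.Context as its first parameter.
func addToManager(ctx context.Context) error {
	// Each controller setup call now receives ctx in addition to the
	// ControllerManagerContext, e.g.:
	//   controllers.AddClusterControllerToManager(ctx, controllerCtx, mgr, ...)
	return ctx.Err()
}

func main() {
	// One context, cancelled on SIGINT/SIGTERM, owns every controller
	// goroutine the manager spawns.
	ctx := ctrl.SetupSignalHandler()
	if err := addToManager(ctx); err != nil {
		fmt.Println(err)
	}
}
```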
2 changes: 1 addition & 1 deletion controllers/controllers_suite_test.go
@@ -95,7 +95,7 @@ func setup() {
panic(fmt.Sprintf("unable to create ClusterCacheReconciler controller: %v", err))
}

- if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
+ if err := AddClusterControllerToManager(ctx, testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}, controllerOpts); err != nil {
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
}
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}, controllerOpts); err != nil {
6 changes: 3 additions & 3 deletions controllers/vmware/test/controllers_test.go
@@ -235,15 +235,15 @@ func getManager(cfg *rest.Config, networkProvider string) manager.Manager {

controllerOpts := controller.Options{MaxConcurrentReconciles: 10}

- opts.AddToManager = func(controllerCtx *capvcontext.ControllerManagerContext, mgr ctrlmgr.Manager) error {
- if err := controllers.AddClusterControllerToManager(controllerCtx, mgr, &vmwarev1.VSphereCluster{}, controllerOpts); err != nil {
+ opts.AddToManager = func(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext, mgr ctrlmgr.Manager) error {
+ if err := controllers.AddClusterControllerToManager(ctx, controllerCtx, mgr, &vmwarev1.VSphereCluster{}, controllerOpts); err != nil {
return err
}

return controllers.AddMachineControllerToManager(controllerCtx, mgr, &vmwarev1.VSphereMachine{}, controllerOpts)
}

- mgr, err := manager.New(opts)
+ mgr, err := manager.New(ctx, opts)
Expect(err).NotTo(HaveOccurred())
return mgr
}
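The AddToManager hook in the test above shows the shape of the new callback: a context.Context now rides in front of the provider-specific ControllerManagerContext. A hedged sketch of that hook type, with the CAPV types stubbed out for illustration (the authoritative definitions live in the provider's pkg/manager and pkg/context packages, and the name AddToManagerFunc is an assumption):

```go
package manager

import (
	"context"

	ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
)

// ControllerManagerContext is stubbed here for illustration; the real type
// is defined in cluster-api-provider-vsphere's pkg/context package.
type ControllerManagerContext struct{}

// AddToManagerFunc mirrors the hook this commit changes: the golang context
// is now the first argument, so controller setup code can hand it to every
// watch, map function, and client call it registers.
type AddToManagerFunc func(ctx context.Context, controllerCtx *ControllerManagerContext, mgr ctrlmgr.Manager) error
```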
8 changes: 4 additions & 4 deletions controllers/vspherecluster_controller.go
@@ -57,7 +57,7 @@ import (

// AddClusterControllerToManager adds the cluster controller to the provided
// manager.
- func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object, options controller.Options) error {
+ func AddClusterControllerToManager(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext, mgr manager.Manager, clusterControlledType client.Object, options controller.Options) error {
supervisorBased, err := util.IsSupervisorType(clusterControlledType)
if err != nil {
return err
@@ -110,7 +110,7 @@ func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerC
ControllerContext: controllerContext,
clusterModuleReconciler: NewReconciler(controllerContext),
}
- clusterToInfraFn := clusterToInfrastructureMapFunc(controllerCtx)
+ clusterToInfraFn := clusterToInfrastructureMapFunc(ctx, controllerCtx)
c, err := ctrl.NewControllerManagedBy(mgr).
// Watch the controlled, infrastructure resource.
For(clusterControlledType).
@@ -173,7 +173,7 @@ func AddClusterControllerToManager(controllerCtx *capvcontext.ControllerManagerC
return nil
}

- func clusterToInfrastructureMapFunc(controllerCtx *capvcontext.ControllerManagerContext) handler.MapFunc {
+ func clusterToInfrastructureMapFunc(ctx context.Context, controllerCtx *capvcontext.ControllerManagerContext) handler.MapFunc {
gvk := infrav1.GroupVersion.WithKind(reflect.TypeOf(&infrav1.VSphereCluster{}).Elem().Name())
- return clusterutilv1.ClusterToInfrastructureMapFunc(controllerCtx, gvk, controllerCtx.Client, &infrav1.VSphereCluster{})
+ return clusterutilv1.ClusterToInfrastructureMapFunc(ctx, gvk, controllerCtx.Client, &infrav1.VSphereCluster{})
}
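The map-function change above is easy to miss: Cluster API's ClusterToInfrastructureMapFunc takes the context up front and captures it in the closure it returns, so mapping work done later, once per watch event, still runs under the manager's lifecycle context. A minimal, self-contained sketch of that capture pattern (names are illustrative, not the Cluster API implementation):

```go
package main

import (
	"context"
	"fmt"
)

// request stands in for reconcile.Request; event stands in for the object
// a watch hands to a map function.
type request struct{ name string }
type event struct{ name string }

// newMapFunc captures ctx at construction time, mirroring
// clusterToInfrastructureMapFunc above: the returned closure is invoked
// much later, per event, yet can still honor cancellation.
func newMapFunc(ctx context.Context) func(event) []request {
	return func(e event) []request {
		if ctx.Err() != nil {
			return nil // manager is shutting down; stop enqueueing work
		}
		return []request{{name: e.name}}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	mapFn := newMapFunc(ctx)
	fmt.Println(mapFn(event{name: "cluster-a"})) // [{cluster-a}]
	cancel()
	fmt.Println(mapFn(event{name: "cluster-b"})) // [] after cancellation
}
```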
75 changes: 38 additions & 37 deletions controllers/vspherecluster_reconciler.go
@@ -61,7 +61,7 @@ type clusterReconciler struct {
}

// Reconcile ensures the back-end state reflects the Kubernetes resource state intent.
- func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
+ func (r clusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
// Get the VSphereCluster resource for this request.
vsphereCluster := &infrav1.VSphereCluster{}
if err := r.Client.Get(r, req.NamespacedName, vsphereCluster); err != nil {
@@ -118,13 +118,13 @@ func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctr
}
}()

- if err := setOwnerRefsOnVsphereMachines(clusterContext); err != nil {
+ if err := setOwnerRefsOnVsphereMachines(ctx, clusterContext); err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to set owner refs on VSphereMachine objects")
}

// Handle deleted clusters
if !vsphereCluster.DeletionTimestamp.IsZero() {
- return r.reconcileDelete(clusterContext)
+ return r.reconcileDelete(ctx, clusterContext)
}

// If the VSphereCluster doesn't have our finalizer, add it.
@@ -135,13 +135,13 @@ func (r clusterReconciler) Reconcile(_ context.Context, req ctrl.Request) (_ ctr
}

// Handle non-deleted clusters
- return r.reconcileNormal(clusterContext)
+ return r.reconcileNormal(ctx, clusterContext)
}

- func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
+ func (r clusterReconciler) reconcileDelete(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
clusterCtx.Logger.Info("Reconciling VSphereCluster delete")

- vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(clusterCtx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
+ vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(ctx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err,
"unable to list VSphereMachines part of VSphereCluster %s/%s", clusterCtx.VSphereCluster.Namespace, clusterCtx.VSphereCluster.Name)
@@ -158,10 +158,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
// Remove the finalizer since VM creation wouldn't proceed
r.Logger.Info("Removing finalizer from VSphereMachine", "namespace", vsphereMachine.Namespace, "name", vsphereMachine.Name)
ctrlutil.RemoveFinalizer(vsphereMachine, infrav1.MachineFinalizer)
- if err := r.Client.Update(clusterCtx, vsphereMachine); err != nil {
+ if err := r.Client.Update(ctx, vsphereMachine); err != nil {
return reconcile.Result{}, err
}
- if err := r.Client.Delete(clusterCtx, vsphereMachine); err != nil && !apierrors.IsNotFound(err) {
+ if err := r.Client.Delete(ctx, vsphereMachine); err != nil && !apierrors.IsNotFound(err) {
clusterCtx.Logger.Error(err, "Failed to delete for VSphereMachine", "namespace", vsphereMachine.Namespace, "name", vsphereMachine.Name)
deletionErrors = append(deletionErrors, err)
}
@@ -191,7 +191,7 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
Namespace: clusterCtx.VSphereCluster.Namespace,
Name: clusterCtx.VSphereCluster.Spec.IdentityRef.Name,
}
- err := clusterCtx.Client.Get(clusterCtx, secretKey, secret)
+ err := clusterCtx.Client.Get(ctx, secretKey, secret)
if err != nil {
if apierrors.IsNotFound(err) {
ctrlutil.RemoveFinalizer(clusterCtx.VSphereCluster, infrav1.ClusterFinalizer)
@@ -207,10 +207,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
if ctrlutil.ContainsFinalizer(secret, legacyIdentityFinalizer) {
ctrlutil.RemoveFinalizer(secret, legacyIdentityFinalizer)
}
- if err := clusterCtx.Client.Update(clusterCtx, secret); err != nil {
+ if err := clusterCtx.Client.Update(ctx, secret); err != nil {
return reconcile.Result{}, err
}
- if err := clusterCtx.Client.Delete(clusterCtx, secret); err != nil {
+ if err := clusterCtx.Client.Delete(ctx, secret); err != nil {
return reconcile.Result{}, err
}
}
@@ -221,10 +221,10 @@ func (r clusterReconciler) reconcileDelete(clusterCtx *capvcontext.ClusterContex
return reconcile.Result{}, nil
}

- func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
+ func (r clusterReconciler) reconcileNormal(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
clusterCtx.Logger.Info("Reconciling VSphereCluster")

- ok, err := r.reconcileDeploymentZones(clusterCtx)
+ ok, err := r.reconcileDeploymentZones(ctx, clusterCtx)
if err != nil {
return reconcile.Result{}, err
}
@@ -233,12 +233,12 @@ func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContex
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

- if err := r.reconcileIdentitySecret(clusterCtx); err != nil {
+ if err := r.reconcileIdentitySecret(ctx, clusterCtx); err != nil {
conditions.MarkFalse(clusterCtx.VSphereCluster, infrav1.VCenterAvailableCondition, infrav1.VCenterUnreachableReason, clusterv1.ConditionSeverityError, err.Error())
return reconcile.Result{}, err
}

- vcenterSession, err := r.reconcileVCenterConnectivity(clusterCtx)
+ vcenterSession, err := r.reconcileVCenterConnectivity(ctx, clusterCtx)
if err != nil {
conditions.MarkFalse(clusterCtx.VSphereCluster, infrav1.VCenterAvailableCondition, infrav1.VCenterUnreachableReason, clusterv1.ConditionSeverityError, err.Error())
return reconcile.Result{}, errors.Wrapf(err,
@@ -263,7 +263,7 @@ func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContex
// Ensure the VSphereCluster is reconciled when the API server first comes online.
// A reconcile event will only be triggered if the Cluster is not marked as
// ControlPlaneInitialized.
- r.reconcileVSphereClusterWhenAPIServerIsOnline(clusterCtx)
+ r.reconcileVSphereClusterWhenAPIServerIsOnline(ctx, clusterCtx)
if clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.IsZero() {
clusterCtx.Logger.Info("control plane endpoint is not reconciled")
return reconcile.Result{}, nil
@@ -275,22 +275,22 @@ func (r clusterReconciler) reconcileNormal(clusterCtx *capvcontext.ClusterContex
}

// Wait until the API server is online and accessible.
- if !r.isAPIServerOnline(clusterCtx) {
+ if !r.isAPIServerOnline(ctx, clusterCtx) {
return reconcile.Result{}, nil
}

return reconcile.Result{}, nil
}

- func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.ClusterContext) error {
+ func (r clusterReconciler) reconcileIdentitySecret(ctx context.Context, clusterCtx *capvcontext.ClusterContext) error {
vsphereCluster := clusterCtx.VSphereCluster
if identity.IsSecretIdentity(vsphereCluster) {
secret := &corev1.Secret{}
secretKey := client.ObjectKey{
Namespace: vsphereCluster.Namespace,
Name: vsphereCluster.Spec.IdentityRef.Name,
}
- err := clusterCtx.Client.Get(clusterCtx, secretKey, secret)
+ err := clusterCtx.Client.Get(ctx, secretKey, secret)
if err != nil {
return err
}
@@ -312,7 +312,7 @@ func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.Clust
if !ctrlutil.ContainsFinalizer(secret, infrav1.SecretIdentitySetFinalizer) {
ctrlutil.AddFinalizer(secret, infrav1.SecretIdentitySetFinalizer)
}
- err = r.Client.Update(clusterCtx, secret)
+ err = r.Client.Update(ctx, secret)
if err != nil {
return err
}
Expand All @@ -321,7 +321,7 @@ func (r clusterReconciler) reconcileIdentitySecret(clusterCtx *capvcontext.Clust
return nil
}

- func (r clusterReconciler) reconcileVCenterConnectivity(clusterCtx *capvcontext.ClusterContext) (*session.Session, error) {
+ func (r clusterReconciler) reconcileVCenterConnectivity(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (*session.Session, error) {
params := session.NewParams().
WithServer(clusterCtx.VSphereCluster.Spec.Server).
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint).
@@ -331,13 +331,13 @@ func (r clusterReconciler) reconcileVCenterConnectivity(clusterCtx *capvcontext.
})

if clusterCtx.VSphereCluster.Spec.IdentityRef != nil {
- creds, err := identity.GetCredentials(clusterCtx, r.Client, clusterCtx.VSphereCluster, r.Namespace)
+ creds, err := identity.GetCredentials(ctx, r.Client, clusterCtx.VSphereCluster, r.Namespace)
if err != nil {
return nil, err
}

params = params.WithUserInfo(creds.Username, creds.Password)
- return session.GetOrCreate(clusterCtx, params)
+ return session.GetOrCreate(ctx, params)
}

params = params.WithUserInfo(clusterCtx.Username, clusterCtx.Password)
@@ -353,7 +353,7 @@ func (r clusterReconciler) reconcileVCenterVersion(clusterCtx *capvcontext.Clust
return nil
}

- func (r clusterReconciler) reconcileDeploymentZones(clusterCtx *capvcontext.ClusterContext) (bool, error) {
+ func (r clusterReconciler) reconcileDeploymentZones(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (bool, error) {
// If there is no failure domain selector, we should simply skip it
if clusterCtx.VSphereCluster.Spec.FailureDomainSelector == nil {
return true, nil
@@ -367,7 +367,7 @@ func (r clusterReconciler) reconcileDeploymentZones(clusterCtx *capvcontext.Clus
}

var deploymentZoneList infrav1.VSphereDeploymentZoneList
- err = r.Client.List(clusterCtx, &deploymentZoneList, &opts)
+ err = r.Client.List(ctx, &deploymentZoneList, &opts)
if err != nil {
return false, errors.Wrap(err, "unable to list deployment zones")
}
@@ -422,7 +422,7 @@ var (
apiServerTriggersMu sync.Mutex
)

- func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(clusterCtx *capvcontext.ClusterContext) {
+ func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(ctx context.Context, clusterCtx *capvcontext.ClusterContext) {
if conditions.IsTrue(clusterCtx.Cluster, clusterv1.ControlPlaneInitializedCondition) {
clusterCtx.Logger.Info("skipping reconcile when API server is online",
"reason", "controlPlaneInitialized")
@@ -439,7 +439,7 @@ func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(clusterC
go func() {
// Block until the target API server is online.
clusterCtx.Logger.Info("start polling API server for online check")
- wait.PollUntilContextCancel(context.Background(), time.Second*1, true, func(context.Context) (bool, error) { return r.isAPIServerOnline(clusterCtx), nil }) //nolint:errcheck
+ wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isAPIServerOnline(ctx, clusterCtx), nil }) //nolint:errcheck
clusterCtx.Logger.Info("stop polling API server for online check")
clusterCtx.Logger.Info("triggering GenericEvent", "reason", "api-server-online")
eventChannel := clusterCtx.GetGenericEventChannelFor(clusterCtx.VSphereCluster.GetObjectKind().GroupVersionKind())
Expand All @@ -451,17 +451,17 @@ func (r clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(clusterC
// remove the key from the map that prevents multiple goroutines from
// polling the API server to see if it is online.
clusterCtx.Logger.Info("start polling for control plane initialized")
- wait.PollUntilContextCancel(context.Background(), time.Second*1, true, func(context.Context) (bool, error) { return r.isControlPlaneInitialized(clusterCtx), nil }) //nolint:errcheck
+ wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isControlPlaneInitialized(ctx, clusterCtx), nil }) //nolint:errcheck
clusterCtx.Logger.Info("stop polling for control plane initialized")
apiServerTriggersMu.Lock()
delete(apiServerTriggers, clusterCtx.Cluster.UID)
apiServerTriggersMu.Unlock()
}()
}
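The switch from context.Background() to the passed-in ctx in the two polling loops above is the behavioral heart of this file: previously these goroutines could outlive the manager, polling a workload cluster that will never come online; now cancellation propagates from manager shutdown into the polls. A self-contained sketch of the difference, assuming the semantics of apimachinery's wait package:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Simulates manager shutdown two seconds in.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// With ctx (the new code): the poll stops when the manager stops.
	err := wait.PollUntilContextCancel(ctx, time.Second, true,
		func(context.Context) (bool, error) {
			fmt.Println("polling API server...")
			return false, nil // never "online" in this sketch
		})
	fmt.Println("poll ended:", err) // context deadline exceeded

	// With context.Background() (the old code), the same loop would never
	// return here, leaking the goroutine past manager shutdown.
}
```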

- func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterContext) bool {
- if kubeClient, err := infrautilv1.NewKubeClient(clusterCtx, clusterCtx.Client, clusterCtx.Cluster); err == nil {
- if _, err := kubeClient.CoreV1().Nodes().List(clusterCtx, metav1.ListOptions{}); err == nil {
+ func (r clusterReconciler) isAPIServerOnline(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
+ if kubeClient, err := infrautilv1.NewKubeClient(ctx, clusterCtx.Client, clusterCtx.Cluster); err == nil {
+ if _, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}); err == nil {
// The target cluster is online. To make sure the correct control
// plane endpoint information is logged, it is necessary to fetch
// an up-to-date Cluster resource. If this fails, then set the
Expand All @@ -470,7 +470,7 @@ func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterCont
// if the API server is online.
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
- if err := clusterCtx.Client.Get(clusterCtx, clusterKey, cluster); err != nil {
+ if err := clusterCtx.Client.Get(ctx, clusterKey, cluster); err != nil {
cluster = clusterCtx.Cluster.DeepCopy()
cluster.Spec.ControlPlaneEndpoint.Host = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Host
cluster.Spec.ControlPlaneEndpoint.Port = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Port
Expand All @@ -485,10 +485,10 @@ func (r clusterReconciler) isAPIServerOnline(clusterCtx *capvcontext.ClusterCont
return false
}

- func (r clusterReconciler) isControlPlaneInitialized(clusterCtx *capvcontext.ClusterContext) bool {
+ func (r clusterReconciler) isControlPlaneInitialized(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
- if err := clusterCtx.Client.Get(clusterCtx, clusterKey, cluster); err != nil {
+ if err := clusterCtx.Client.Get(ctx, clusterKey, cluster); err != nil {
if !apierrors.IsNotFound(err) {
clusterCtx.Logger.Error(err, "failed to get updated cluster object while checking if control plane is initialized")
return false
@@ -499,8 +499,8 @@ func (r clusterReconciler) isControlPlaneInitialized(clusterCtx *capvcontext.Clu
return conditions.IsTrue(clusterCtx.Cluster, clusterv1.ControlPlaneInitializedCondition)
}

- func setOwnerRefsOnVsphereMachines(clusterCtx *capvcontext.ClusterContext) error {
- vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(clusterCtx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
+ func setOwnerRefsOnVsphereMachines(ctx context.Context, clusterCtx *capvcontext.ClusterContext) error {
+ vsphereMachines, err := infrautilv1.GetVSphereMachinesInCluster(ctx, clusterCtx.Client, clusterCtx.Cluster.Namespace, clusterCtx.Cluster.Name)
if err != nil {
return errors.Wrapf(err,
"unable to list VSphereMachines part of VSphereCluster %s/%s", clusterCtx.VSphereCluster.Namespace, clusterCtx.VSphereCluster.Name)
@@ -523,13 +523,14 @@ func setOwnerRefsOnVsphereMachines(clusterCtx *capvcontext.ClusterContext) error
UID: clusterCtx.VSphereCluster.UID,
}))

- if err := patchHelper.Patch(clusterCtx, vsphereMachine); err != nil {
+ if err := patchHelper.Patch(ctx, vsphereMachine); err != nil {
patchErrors = append(patchErrors, err)
}
}
return kerrors.NewAggregate(patchErrors)
}

+ // TODO: zhanggbj: Add golang context after refactoring ClusterModule controller and reconciler.
func (r clusterReconciler) reconcileClusterModules(clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
if feature.Gates.Enabled(feature.NodeAntiAffinity) {
return r.clusterModuleReconciler.Reconcile(clusterCtx)
[Diff truncated; the remaining 7 of the 11 changed files are not shown.]
