Skip to content

Commit

Permalink
Merge pull request #2551 from sbueringer/pr-improve-goroutine-handling
Browse files Browse the repository at this point in the history
🌱 Remove reconcile when APIserver is online in VSphereCluster controller
  • Loading branch information
k8s-ci-robot authored Dec 13, 2023
2 parents bf9483f + 5b67044 commit 142431c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 114 deletions.
114 changes: 0 additions & 114 deletions controllers/vspherecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

pkgerrors "github.com/pkg/errors"
Expand All @@ -30,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand All @@ -41,7 +39,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
Expand Down Expand Up @@ -223,25 +220,6 @@ func (r *clusterReconciler) reconcileNormal(ctx context.Context, clusterCtx *cap

clusterCtx.VSphereCluster.Status.Ready = true

// Ensure the VSphereCluster is reconciled when the API server first comes online.
// A reconcile event will only be triggered if the Cluster is not marked as
// ControlPlaneInitialized.
r.reconcileVSphereClusterWhenAPIServerIsOnline(ctx, clusterCtx)
if clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.IsZero() {
log.Info("Waiting for control plane endpoint to be set")
return reconcile.Result{}, nil
}

// If the cluster is deleted, that's mean that the workload cluster is being deleted and so the CCM/CSI instances
if !clusterCtx.Cluster.DeletionTimestamp.IsZero() {
return reconcile.Result{}, nil
}

// Wait until the API server is online and accessible.
if !r.isAPIServerOnline(ctx, clusterCtx) {
return reconcile.Result{}, nil
}

return reconcile.Result{}, nil
}

Expand Down Expand Up @@ -386,98 +364,6 @@ func (r *clusterReconciler) reconcileDeploymentZones(ctx context.Context, cluste
return true, nil
}

var (
// apiServerTriggers is used to prevent multiple goroutines for a single
// Cluster that poll to see if the target API server is online.
apiServerTriggers = map[types.UID]struct{}{}
apiServerTriggersMu sync.Mutex
)

func (r *clusterReconciler) reconcileVSphereClusterWhenAPIServerIsOnline(ctx context.Context, clusterCtx *capvcontext.ClusterContext) {
log := ctrl.LoggerFrom(ctx)

if conditions.IsTrue(clusterCtx.Cluster, clusterv1.ControlPlaneInitializedCondition) {
log.V(4).Info("Skipping reconcile as API server is already online", "reason", "controlPlaneInitialized")
return
}
apiServerTriggersMu.Lock()
defer apiServerTriggersMu.Unlock()
if _, ok := apiServerTriggers[clusterCtx.Cluster.UID]; ok {
log.V(4).Info("Skipping reconcile as there is already a Go routine waiting until the API server is online", "reason", "alreadyPolling")
return
}
apiServerTriggers[clusterCtx.Cluster.UID] = struct{}{}
go func() {
// Note: we have to use a new context here so the ctx in this go routine is not canceled
// when the reconcile returns.
ctx := ctrl.LoggerInto(context.Background(), log)

// Block until the target API server is online.
log.Info("Start polling API server to wait until it comes online")
// Ignore the error as the passed function never returns one.
_ = wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isAPIServerOnline(ctx, clusterCtx), nil })
log.Info("Stop polling API server")
log.Info("Triggering GenericEvent", "reason", "apiServerOnline")
eventChannel := r.ControllerManagerContext.GetGenericEventChannelFor(clusterCtx.VSphereCluster.GetObjectKind().GroupVersionKind())
eventChannel <- event.GenericEvent{
Object: clusterCtx.VSphereCluster,
}

// Once the control plane has been marked as initialized it is safe to
// remove the key from the map that prevents multiple goroutines from
// polling the API server to see if it is online.
log.Info("Start polling for control plane initialized")
// Ignore the error as the passed function never returns one.
_ = wait.PollUntilContextCancel(ctx, time.Second*1, true, func(context.Context) (bool, error) { return r.isControlPlaneInitialized(ctx, clusterCtx), nil })
log.Info("Stop polling for control plane initialized")
apiServerTriggersMu.Lock()
delete(apiServerTriggers, clusterCtx.Cluster.UID)
apiServerTriggersMu.Unlock()
}()
}

func (r *clusterReconciler) isAPIServerOnline(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
log := ctrl.LoggerFrom(ctx)

if kubeClient, err := infrautilv1.NewKubeClient(ctx, r.Client, clusterCtx.Cluster); err == nil {
if _, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}); err == nil {
// The target cluster is online. To make sure the correct control
// plane endpoint information is logged, it is necessary to fetch
// an up-to-date Cluster resource. If this fails, then set the
// control plane endpoint information to the values from the
// VSphereCluster resource, as it must have the correct information
// if the API server is online.
cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
if err := r.Client.Get(ctx, clusterKey, cluster); err != nil {
cluster = clusterCtx.Cluster.DeepCopy()
cluster.Spec.ControlPlaneEndpoint.Host = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Host
cluster.Spec.ControlPlaneEndpoint.Port = clusterCtx.VSphereCluster.Spec.ControlPlaneEndpoint.Port
log.Error(err, "Failed to get updated Cluster object while checking if API server is online")
}
log.Info("API server is online", "controlPlaneEndpoint", cluster.Spec.ControlPlaneEndpoint.String())
return true
}
}
return false
}

func (r *clusterReconciler) isControlPlaneInitialized(ctx context.Context, clusterCtx *capvcontext.ClusterContext) bool {
log := ctrl.LoggerFrom(ctx)

cluster := &clusterv1.Cluster{}
clusterKey := client.ObjectKey{Namespace: clusterCtx.Cluster.Namespace, Name: clusterCtx.Cluster.Name}
if err := r.Client.Get(ctx, clusterKey, cluster); err != nil {
if !apierrors.IsNotFound(err) {
log.Error(err, "Failed to get updated Cluster object while checking if control plane is initialized")
return false
}
log.Info("Exiting early because cluster no longer exists")
return true
}
return conditions.IsTrue(clusterCtx.Cluster, clusterv1.ControlPlaneInitializedCondition)
}

func (r *clusterReconciler) reconcileClusterModules(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (reconcile.Result, error) {
if feature.Gates.Enabled(feature.NodeAntiAffinity) {
return r.clusterModuleReconciler.Reconcile(ctx, clusterCtx)
Expand Down
3 changes: 3 additions & 0 deletions pkg/services/govmomi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ func waitForIPAddresses(
// network device specs. However, every time a new IP is discovered,
// a reconcile request will be triggered for the VSphereVM.
go func() {
// Note: We intentionally don't use the context from the Reconcile
// so this go routine continues independent of the current Reconcile.
ctx := context.Background()
if err := property.Wait(
ctx, propCollector, virtualMachineCtx.Obj.Reference(),
[]string{"guest.net"}, onPropertyChange); err != nil {
Expand Down

0 comments on commit 142431c

Please sign in to comment.