
WIP: Implement controller paused condition for core CAPI resources #10364

12 changes: 12 additions & 0 deletions api/v1beta1/condition_consts.go
@@ -22,6 +22,9 @@ package v1beta1
const (
// ReadyCondition defines the Ready condition type that summarizes the operational state of a Cluster API object.
ReadyCondition ConditionType = "Ready"

// PausedCondition defines the Paused condition type that reports whether reconciliation of a Cluster API object is currently paused.
PausedCondition ConditionType = "Paused"
)

// Common ConditionReason used by Cluster API objects.
@@ -38,6 +41,15 @@ const (

// IncorrectExternalRefReason (Severity=Error) documents a CAPI object with an incorrect external object reference.
IncorrectExternalRefReason = "IncorrectExternalRef"

// ClusterPausedReason (Severity=Info) documents a CAPI object that is paused due to the cluster being paused (.Spec.Paused).
ClusterPausedReason = "ClusterPaused"

// AnnotationPausedReason (Severity=Info) documents a CAPI object that is paused due to the paused annotation being present.
AnnotationPausedReason = "PausedAnnotationSet"

// MachineDeploymentPausedReason (Severity=Info) documents a CAPI object that is paused due to the machine deployment being paused.
MachineDeploymentPausedReason = "MachineDeploymentPaused"
)

const (
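Both controllers changed below set the new Paused condition with the same Cluster-vs-annotation check. A minimal sketch of how that logic could be factored into a shared helper; the function name and package placement are hypothetical and not part of this PR:

```go
package paused // hypothetical placement, illustrative only

import (
	corev1 "k8s.io/api/core/v1"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"
)

// SetPausedCondition mirrors the inline logic added to the KubeadmControlPlane and
// Machine controllers in this diff: it marks the Paused condition True with a reason
// explaining why reconciliation is paused for the given object.
func SetPausedCondition(cluster *clusterv1.Cluster, obj conditions.Setter) {
	newPausedCondition := &clusterv1.Condition{
		Type:     clusterv1.PausedCondition,
		Status:   corev1.ConditionTrue,
		Severity: clusterv1.ConditionSeverityInfo,
	}

	// Cluster-level pause (.spec.paused) takes precedence over the paused annotation.
	if cluster.Spec.Paused {
		newPausedCondition.Reason = clusterv1.ClusterPausedReason
	} else {
		newPausedCondition.Reason = clusterv1.AnnotationPausedReason
	}

	conditions.Set(obj, newPausedCondition)
}
```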
77 changes: 53 additions & 24 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -161,11 +161,6 @@
log = log.WithValues("Cluster", klog.KObj(cluster))
ctx = ctrl.LoggerInto(ctx, log)

if annotations.IsPaused(cluster, kcp) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}

// Initialize the patch helper.
patchHelper, err := patch.NewHelper(kcp, r.Client)
if err != nil {
@@ -181,27 +176,28 @@
// patch and return right away instead of reusing the main defer,
// because the main defer may take too much time to get cluster status
// Patch ObservedGeneration only if the reconciliation completed successfully

// TODO theobarberbany: Is this ordering correct, do we want finalizer to
// take priority over the paused condition?
patchOpts := []patch.Option{patch.WithStatusObservedGeneration{}}
if err := patchHelper.Patch(ctx, kcp, patchOpts...); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to add finalizer")
}

log.Info("Returning early to add finalizer")
return ctrl.Result{}, nil
}

// Initialize the control plane scope; this includes also checking for orphan machines and
// adopt them if necessary.
controlPlane, adoptableMachineFound, err := r.initControlPlaneScope(ctx, cluster, kcp)
if err != nil {
log.Error(err, "Failed to initControlPlaneScope")
return ctrl.Result{}, err
}
if adoptableMachineFound {
// if there are no errors but at least one CP machine has been adopted, then requeue and
// wait for the update event for the ownership to be set.
return ctrl.Result{}, nil
}
log.Info("initControlPlaneScope")

defer func() {
log.Info("start of deferred update status")
// Always attempt to update status.
if err := r.updateStatus(ctx, controlPlane); err != nil {
var connFailure *internal.RemoteClusterConnectionError
@@ -222,6 +218,7 @@
log.Error(err, "Failed to patch KubeadmControlPlane")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
log.Info("patched KubeadmControlPlane")

// Only requeue if there is no error, Requeue or RequeueAfter and the object does not have a deletion timestamp.
if reterr == nil && res.IsZero() && kcp.ObjectMeta.DeletionTimestamp.IsZero() {
@@ -243,6 +240,36 @@
}
}()

// Return early and set the paused condition to True if the object or Cluster
// is paused.
if annotations.IsPaused(cluster, kcp) {
log.Info("Reconciliation is paused for this object")

newPausedCondition := &clusterv1.Condition{
Type: clusterv1.PausedCondition,
Status: corev1.ConditionTrue,
Severity: clusterv1.ConditionSeverityInfo,
}

if cluster.Spec.Paused {
newPausedCondition.Reason = clusterv1.ClusterPausedReason
} else {
newPausedCondition.Reason = clusterv1.AnnotationPausedReason
}

conditions.Set(kcp, newPausedCondition)
return ctrl.Result{}, nil
}
log.Info("Object not paused")
conditions.MarkFalseWithNegativePolarity(kcp, clusterv1.PausedCondition)

if adoptableMachineFound {
// if there are no errors but at least one CP machine has been adopted, then requeue and
// wait for the update event for the ownership to be set.
log.Info("Returning early, adoptableMachineFound")
return ctrl.Result{}, nil
}

if !kcp.ObjectMeta.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
res, err = r.reconcileDelete(ctx, controlPlane)
@@ -311,7 +338,7 @@
return controlPlane, false, nil
}

func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane, options ...patch.Option) error {

Check failure on line 341 in controlplane/kubeadm/internal/controllers/controller.go (GitHub Actions / lint): unused-parameter: parameter 'options' seems to be unused, consider removing or renaming it as _ (revive)
// Always update the readyCondition by summarizing the state of other conditions.
conditions.SetSummary(kcp,
conditions.WithConditions(
@@ -325,19 +352,21 @@
)

// Patch the object, ignoring conflicts on the conditions owned by this controller.
// Also, if requested, we are adding additional options like e.g. Patch ObservedGeneration when issuing the
// patch at the end of the reconcile loop.
options = append(options, patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
controlplanev1.MachinesCreatedCondition,
clusterv1.ReadyCondition,
controlplanev1.MachinesSpecUpToDateCondition,
controlplanev1.ResizedCondition,
controlplanev1.MachinesReadyCondition,
controlplanev1.AvailableCondition,
controlplanev1.CertificatesAvailableCondition,
}})

return patchHelper.Patch(ctx, kcp, options...)
return patchHelper.Patch(
ctx,
kcp,
patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
controlplanev1.MachinesCreatedCondition,
clusterv1.ReadyCondition,
clusterv1.PausedCondition,
controlplanev1.MachinesSpecUpToDateCondition,
controlplanev1.ResizedCondition,
controlplanev1.MachinesReadyCondition,
controlplanev1.AvailableCondition,
controlplanev1.CertificatesAvailableCondition,
}},
patch.WithStatusObservedGeneration{},
)
}

// reconcile handles KubeadmControlPlane reconciliation.
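The lint annotation above flags that `options` is unused once the patch options are built inside the function. A sketch of one possible follow-up, assuming no caller still needs to inject extra patch options (not part of this diff; the SetSummary call and the full owned-condition list are elided and would stay exactly as in the hunk above):

```go
// Sketch only: the variadic parameter is removed because the owned conditions and
// WithStatusObservedGeneration are now constructed inside the function. Call sites
// that previously passed patch.WithStatusObservedGeneration{} would drop that argument.
func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
	// conditions.SetSummary(kcp, ...) exactly as in the hunk above (elided here).

	return patchHelper.Patch(
		ctx,
		kcp,
		patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
			clusterv1.ReadyCondition,
			clusterv1.PausedCondition,
			// ...remaining owned KubeadmControlPlane conditions as listed above...
		}},
		patch.WithStatusObservedGeneration{},
	)
}
```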
100 changes: 100 additions & 0 deletions controlplane/kubeadm/internal/controllers/controller_test.go
@@ -64,6 +64,10 @@ import (
"sigs.k8s.io/cluster-api/util/secret"
)

const (
timeout = time.Second * 30
)

func TestClusterToKubeadmControlPlane(t *testing.T) {
g := NewWithT(t)
fakeClient := newFakeClient()
@@ -391,6 +395,15 @@ func TestReconcilePaused(t *testing.T) {
Client: fakeClient,
SecretCachingClient: fakeClient,
recorder: record.NewFakeRecorder(32),
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: fakeWorkloadCluster{
Workload: &internal.Workload{
Client: env,
},
Status: internal.ClusterStatus{},
},
},
}

_, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})
@@ -2280,6 +2293,93 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
})
}

func TestReconcilePausedCondition(t *testing.T) {
g := NewWithT(t)

ns, err := env.CreateNamespace(ctx, "test-reconcile-pause-condition")
g.Expect(err).ToNot(HaveOccurred())

cluster, kcp, _ := createClusterWithControlPlane(ns.Name)
g.Expect(env.Create(ctx, cluster)).To(Succeed())
g.Expect(env.Create(ctx, kcp)).To(Succeed())
defer func(do ...client.Object) {
g.Expect(env.Cleanup(ctx, do...)).To(Succeed())
}(cluster, kcp, ns)

// Set cluster.status.InfrastructureReady so we actually enter the reconcile loop.
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf("{\"status\":{\"infrastructureReady\":%t}}", true)))
g.Expect(env.Status().Patch(ctx, cluster, patch)).To(Succeed())

genericInfrastructureMachineTemplate := &unstructured.Unstructured{
Object: map[string]interface{}{
"kind": "GenericInfrastructureMachineTemplate",
"apiVersion": "infrastructure.cluster.x-k8s.io/v1beta1",
"metadata": map[string]interface{}{
"name": "infra-foo",
"namespace": cluster.Namespace,
},
"spec": map[string]interface{}{
"template": map[string]interface{}{
"spec": map[string]interface{}{
"hello": "world",
},
},
},
},
}
g.Expect(env.Create(ctx, genericInfrastructureMachineTemplate)).To(Succeed())

r := &KubeadmControlPlaneReconciler{
Client: env,
SecretCachingClient: secretCachingClient,
recorder: record.NewFakeRecorder(32),
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: env},
Workload: fakeWorkloadCluster{
Workload: &internal.Workload{
Client: env,
},
Status: internal.ClusterStatus{},
},
},
}

// We start unpaused
g.Eventually(func() bool {
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})
g.Expect(err).ToNot(HaveOccurred())

key := util.ObjectKey(kcp)
if err := env.Get(ctx, key, kcp); err != nil {
return false
}

// Check that the condition is set
if !conditions.Has(kcp, clusterv1.PausedCondition) {
return false
}
// The condition is set to false
return conditions.IsFalse(kcp, clusterv1.PausedCondition)
}, timeout).Should(BeTrue())

// Pause the cluster
cluster.Spec.Paused = true
g.Expect(env.Update(ctx, cluster)).To(Succeed())

g.Eventually(func() bool {
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})
g.Expect(err).ToNot(HaveOccurred())

key := util.ObjectKey(kcp)
if err := env.Get(ctx, key, kcp); err != nil {
return false
}

// The condition is set to true
return conditions.IsTrue(kcp, clusterv1.PausedCondition)
}, timeout).Should(BeTrue())
}
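The test above exercises the unpaused-to-paused transition. A sketch of an additional assertion that could verify the condition flips back once the cluster is unpaused, reusing the same helpers and `timeout` as the test (hypothetical extension, not part of this diff):

```go
	// Unpause the cluster again and expect the Paused condition to return to False.
	cluster.Spec.Paused = false
	g.Expect(env.Update(ctx, cluster)).To(Succeed())

	g.Eventually(func() bool {
		_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: util.ObjectKey(kcp)})
		g.Expect(err).ToNot(HaveOccurred())

		if err := env.Get(ctx, util.ObjectKey(kcp), kcp); err != nil {
			return false
		}

		// The condition should still be present and report False once unpaused.
		return conditions.IsFalse(kcp, clusterv1.PausedCondition)
	}, timeout).Should(BeTrue())
```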

// test utils.

func newFakeClient(initObjs ...client.Object) client.Client {
34 changes: 26 additions & 8 deletions internal/controllers/machine/machine_controller.go
@@ -114,15 +114,15 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
c, err := ctrl.NewControllerManagedBy(mgr).
For(&clusterv1.Machine{}).
WithOptions(options).
WithEventFilter(predicates.ResourceNotPausedAndHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
WithEventFilter(predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue)).
Watches(
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(clusterToMachines),
builder.WithPredicates(
// TODO: should this wait for Cluster.Status.InfrastructureReady similar to Infra Machine resources?
predicates.All(ctrl.LoggerFrom(ctx),
predicates.Any(ctrl.LoggerFrom(ctx),
predicates.ClusterUnpaused(ctrl.LoggerFrom(ctx)),
predicates.ClusterCreateUpdateEvent(ctrl.LoggerFrom(ctx)),
predicates.ClusterControlPlaneInitialized(ctrl.LoggerFrom(ctx)),
),
predicates.ResourceHasFilterLabel(ctrl.LoggerFrom(ctx), r.WatchFilterValue),
Expand Down Expand Up @@ -181,12 +181,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
m.Spec.ClusterName, m.Name, m.Namespace)
}

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, m) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}

// Initialize the patch helper
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
@@ -207,6 +201,29 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
}
}()

// Return early and set the paused condition to True if the object or Cluster
// is paused.
if annotations.IsPaused(cluster, m) {
log.Info("Reconciliation is paused for this object")

newPausedCondition := &clusterv1.Condition{
Type: clusterv1.PausedCondition,
Status: corev1.ConditionTrue,
Severity: clusterv1.ConditionSeverityInfo,
}

if cluster.Spec.Paused {
newPausedCondition.Reason = clusterv1.ClusterPausedReason
} else {
newPausedCondition.Reason = clusterv1.AnnotationPausedReason
}

conditions.Set(m, newPausedCondition)
return ctrl.Result{}, nil
}

conditions.MarkFalseWithNegativePolarity(m, clusterv1.PausedCondition)

// Reconcile labels.
if m.Labels == nil {
m.Labels = make(map[string]string)
Expand Down Expand Up @@ -273,6 +290,7 @@ func patchMachine(ctx context.Context, patchHelper *patch.Helper, machine *clust
clusterv1.ReadyCondition,
clusterv1.BootstrapReadyCondition,
clusterv1.InfrastructureReadyCondition,
clusterv1.PausedCondition,
clusterv1.DrainingSucceededCondition,
clusterv1.MachineHealthCheckSucceededCondition,
clusterv1.MachineOwnerRemediatedCondition,
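With the event filter relaxed to `ResourceHasFilterLabel`, paused Machines still reach Reconcile, so the controller can record the Paused condition instead of being filtered out. A sketch of how a client could consume that condition to confirm the controller has acknowledged a pause; the function and its polling intervals are illustrative, not part of this PR:

```go
package example // illustrative only, not part of this PR

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForMachinePaused blocks until the Machine reports the Paused condition as True,
// i.e. until the Machine controller has observed the pause and stopped reconciling.
func WaitForMachinePaused(ctx context.Context, c client.Client, key client.ObjectKey) error {
	return wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
		m := &clusterv1.Machine{}
		if err := c.Get(ctx, key, m); err != nil {
			return false, err
		}
		return conditions.IsTrue(m, clusterv1.PausedCondition), nil
	})
}
```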