Improve external managed etcd to allow manual orchestration
g-gaston committed Aug 15, 2023
1 parent dec7dfc commit b2dee50
Showing 8 changed files with 701 additions and 96 deletions.
5 changes: 5 additions & 0 deletions api/v1beta1/common_types.go
@@ -154,6 +154,11 @@ const (
// VariableDefinitionFromInline indicates a patch or variable was defined in the `.spec` of a ClusterClass
// rather than from an external patch extension.
VariableDefinitionFromInline = "inline"


// SkipControlPlanePauseManagedEtcdAnnotation indicates that the cluster controller should not pause or unpause
// the control plane after the managed etcd cluster becomes not-ready/ready.
SkipControlPlanePauseManagedEtcdAnnotation = "cluster.x-k8s.io/skip-pause-cp-managed-etcd"
)

// NodeUninitializedTaint can be added to Nodes at creation by the bootstrap provider, e.g. the
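A consumer-side sketch of how the new annotation might be honored (hypothetical code, not part of this commit; assumes `cluster` and `log` are in scope of the cluster controller's reconcile loop):

// Hypothetical: users set the annotation on the Cluster to take over etcd orchestration manually.
if _, skip := cluster.GetAnnotations()[clusterv1.SkipControlPlanePauseManagedEtcdAnnotation]; skip {
	log.Info("skip-pause annotation set; not pausing control plane for managed etcd")
} else {
	// Default behavior: pause the control plane while the managed etcd cluster is not ready.
}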
120 changes: 78 additions & 42 deletions controlplane/kubeadm/internal/controllers/controller.go
@@ -25,6 +25,7 @@ import (
"time"

"github.com/blang/semver"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -179,51 +180,12 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
}

if cluster.Spec.ManagedExternalEtcdRef != nil {
etcdRef := cluster.Spec.ManagedExternalEtcdRef
externalEtcd, err := external.Get(ctx, r.Client, etcdRef, cluster.Namespace)
managedEtcdResult, err := r.updateManagedExternalEtcdEndpoints(ctx, log, patchHelper, cluster, kcp)
if err != nil {
return ctrl.Result{}, err
}
endpoints, found, err := external.GetExternalEtcdEndpoints(externalEtcd)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to get endpoint field from %v", externalEtcd.GetName())
}
if !found {
log.Info("Etcd endpoints not available")
return ctrl.Result{Requeue: true}, nil
}
currentEtcdEndpoints := strings.Split(endpoints, ",")
sort.Strings(currentEtcdEndpoints)
currentKCPEndpoints := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints
if !reflect.DeepEqual(currentEtcdEndpoints, currentKCPEndpoints) {
/* During an upgrade, the KCP spec's endpoints will again be an empty list, and will be populated by the cluster controller once the
external etcd controller has set them. If the KCP controller proceeded without checking whether the etcd cluster is undergoing an upgrade,
it could read the current, not-yet-updated endpoints from the etcd cluster object, and those would end up being the endpoints of the
etcd members that get deleted during the upgrade. Hence the controller checks whether the etcd cluster is upgrading and stalls, proceeding
only after the etcd upgrade is completed, as that guarantees that the KCP has the latest set of endpoints.
*/
etcdUpgradeInProgress, err := external.IsExternalEtcdUpgrading(externalEtcd)
if err != nil {
return ctrl.Result{}, err
}
if etcdUpgradeInProgress {
log.Info("Etcd undergoing upgrade, marking etcd endpoints available condition as false, since new endpoints will be available only after etcd upgrade")
if conditions.IsTrue(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) || conditions.IsUnknown(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) {
conditions.MarkFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable, controlplanev1.ExternalEtcdUndergoingUpgrade, clusterv1.ConditionSeverityInfo, "")
if err := patchKubeadmControlPlane(ctx, patchHelper, kcp); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = currentEtcdEndpoints
if err := patchHelper.Patch(ctx, kcp); err != nil {
log.Error(err, "Failed to patch KubeadmControlPlane to update external etcd endpoints")
return ctrl.Result{}, err
}
}
if conditions.IsFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) {
conditions.MarkTrue(kcp, controlplanev1.ExternalEtcdEndpointsAvailable)
if !managedEtcdResult.IsZero() {
return managedEtcdResult, nil
}
}

@@ -294,6 +256,80 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.
return res, err
}

func (r *KubeadmControlPlaneReconciler) updateManagedExternalEtcdEndpoints(
ctx context.Context, log logr.Logger, patchHelper *patch.Helper, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane,
) (ctrl.Result, error) {
if kcp.Spec.KubeadmConfigSpec.ClusterConfiguration == nil || kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External == nil {
return ctrl.Result{}, errors.New("invalid kcp, external etcd not configured for cluster with managed external etcd")
}

etcdRef := cluster.Spec.ManagedExternalEtcdRef
externalEtcd, err := external.Get(ctx, r.Client, etcdRef, cluster.Namespace)
if err != nil {
return ctrl.Result{}, err
}

externalEtcdReady, err := external.IsReady(externalEtcd)
if err != nil {
return ctrl.Result{}, err
}

if !externalEtcdReady {
log.Info("Managed external etcd is not ready yet, requeueing")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

endpoints, found, err := external.GetExternalEtcdEndpoints(externalEtcd)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to get endpoint field from %v", externalEtcd.GetName())
}
currentEtcdEndpoints := strings.Split(endpoints, ",")

if !found || areEndpointsEmpty(currentEtcdEndpoints) {
log.Info("Managed external etcd endpoints not available, requeueing")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

sort.Strings(currentEtcdEndpoints)
currentKCPEndpoints := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints
if !reflect.DeepEqual(currentEtcdEndpoints, currentKCPEndpoints) {
/* During an upgrade, the KCP spec's endpoints will again be an empty list, and will be populated by the cluster controller once the
external etcd controller has set them. If the KCP controller proceeded without checking whether the etcd cluster is undergoing an upgrade,
it could read the current, not-yet-updated endpoints from the etcd cluster object, and those would end up being the endpoints of the
etcd members that get deleted during the upgrade. Hence the controller checks whether the etcd cluster is upgrading and stalls, proceeding
only after the etcd upgrade is completed, as that guarantees that the KCP has the latest set of endpoints.
*/
etcdUpgradeInProgress, err := external.IsExternalEtcdUpgrading(externalEtcd)
if err != nil {
return ctrl.Result{}, err
}
if etcdUpgradeInProgress {
log.Info("Etcd undergoing upgrade, marking etcd endpoints available condition as false, since new endpoints will be available only after etcd upgrade")
if conditions.IsTrue(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) || conditions.IsUnknown(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) {
conditions.MarkFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable, controlplanev1.ExternalEtcdUndergoingUpgrade, clusterv1.ConditionSeverityInfo, "")
if err := patchKubeadmControlPlane(ctx, patchHelper, kcp); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = currentEtcdEndpoints
if err := patchHelper.Patch(ctx, kcp); err != nil {
log.Error(err, "Failed to patch KubeadmControlPlane to update external etcd endpoints")
return ctrl.Result{}, err
}
}
if conditions.IsFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) {
conditions.MarkTrue(kcp, controlplanev1.ExternalEtcdEndpointsAvailable)
}

return ctrl.Result{}, nil
}
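// For reference (illustrative, not part of the commit): the external.* helpers above read the
// managed etcd object as unstructured data. The shape they rely on, as exercised by the tests
// in this commit, is roughly:
//
//	status.ready: true                                  // read by external.IsReady
//	status.endpoints: "1.1.1.1,2.2.2.2"                 // comma-separated string, read by external.GetExternalEtcdEndpoints
//	metadata.annotations:
//	  etcdcluster.cluster.x-k8s.io/upgrading: "true"    // read by external.IsExternalEtcdUpgrading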

func areEndpointsEmpty(endpoints []string) bool {
return len(endpoints) == 0 || len(endpoints) == 1 && endpoints[0] == ""
}
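// Worked example (illustrative, not part of the commit): when status.endpoints is set but empty,
// strings.Split("", ",") yields []string{""}, which is why the helper's second clause is needed:
//
//	areEndpointsEmpty(nil)                        // true: no endpoints at all
//	areEndpointsEmpty(strings.Split("", ","))     // true: []string{""}
//	areEndpointsEmpty([]string{"1.1.1.1"})        // false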

func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error {
// Always update the readyCondition by summarizing the state of other conditions.
conditions.SetSummary(kcp,
209 changes: 208 additions & 1 deletion controlplane/kubeadm/internal/controllers/controller_test.go
@@ -24,6 +24,7 @@ import (
"crypto/x509/pkix"
"fmt"
"math/big"
"strings"
"sync"
"testing"
"time"
@@ -56,6 +57,7 @@ import (
"sigs.k8s.io/cluster-api/internal/test/builder"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/certs"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -82,7 +84,8 @@ func TestClusterToKubeadmControlPlane(t *testing.T) {
{
NamespacedName: client.ObjectKey{
Namespace: cluster.Spec.ControlPlaneRef.Namespace,
Name: cluster.Spec.ControlPlaneRef.Name},
Name: cluster.Spec.ControlPlaneRef.Name,
},
},
}

@@ -2160,6 +2163,210 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
})
}

func TestKubeadmControlPlaneReconciler_updateManagedExternalEtcdEndpoints(t *testing.T) {
setup := func() (*clusterv1.Cluster, *controlplanev1.KubeadmControlPlane, *unstructured.Unstructured) {
ns := "my-ns"
endpoints := []string{"1.1.1.1", "2.2.2.2", "0.0.0.0"}
managedEtcd := builder.Etcd(ns, "test-7-my-etcd").Build()
// Errors ignored: best-effort status seeding on an unstructured test object (silences errcheck).
_ = unstructured.SetNestedField(managedEtcd.Object, true, "status", "ready")
_ = unstructured.SetNestedField(managedEtcd.Object, strings.Join(endpoints, ","), "status", "endpoints")
cluster, kcp, _ := createClusterWithControlPlane(ns)
cluster.Spec.ManagedExternalEtcdRef = external.GetObjectReference(managedEtcd)
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration = &bootstrapv1.ClusterConfiguration{
Etcd: bootstrapv1.Etcd{External: &bootstrapv1.ExternalEtcd{}},
}

return cluster, kcp, managedEtcd
}
t.Run("should update the endpoints in the kcp", func(t *testing.T) {
g := NewWithT(t)
cluster, kcp, managedEtcd := setup()
conditions.MarkFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable, "", "", "")

fClient := newFakeClient(
builder.GenericEtcdCRD.DeepCopy(),
managedEtcd.DeepCopy(),
cluster.DeepCopy(),
kcp.DeepCopy(),
)

r := &KubeadmControlPlaneReconciler{
Client: fClient,
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: fClient},
Workload: fakeWorkloadCluster{},
},
recorder: record.NewFakeRecorder(32),
}

result, err := r.Reconcile(
ctx,
ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kcp)},
)
g.Expect(result).To(Equal(ctrl.Result{}))
g.Expect(err).NotTo(HaveOccurred())
g.Eventually(func(g Gomega) {
cp := &controlplanev1.KubeadmControlPlane{}
g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed())
g.Expect(
cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints,
).To(Equal([]string{"0.0.0.0", "1.1.1.1", "2.2.2.2"}))
g.Expect(conditions.IsTrue(cp, controlplanev1.ExternalEtcdEndpointsAvailable)).To(BeTrue())
}, 5*time.Second).Should(Succeed())
})

t.Run("should requeue and not update kcp when endpoints in external etcd are not set", func(t *testing.T) {
g := NewWithT(t)
cluster, kcp, managedEtcd := setup()
unstructured.RemoveNestedField(managedEtcd.Object, "status", "endpoints")
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}

fClient := newFakeClient(
builder.GenericEtcdCRD.DeepCopy(),
managedEtcd.DeepCopy(),
cluster.DeepCopy(),
kcp.DeepCopy(),
)

r := &KubeadmControlPlaneReconciler{
Client: fClient,
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: fClient},
Workload: fakeWorkloadCluster{},
},
recorder: record.NewFakeRecorder(32),
}

result, err := r.Reconcile(
ctx,
ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kcp)},
)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute}))
g.Expect(err).NotTo(HaveOccurred())
g.Eventually(func(g Gomega) {
cp := &controlplanev1.KubeadmControlPlane{}
g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed())
g.Expect(
cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints,
).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}))
}, 5*time.Second).Should(Succeed())
})

t.Run("should requeue and not update kcp when endpoints in external etcd are empty", func(t *testing.T) {
g := NewWithT(t)
cluster, kcp, managedEtcd := setup()
unstructured.SetNestedField(managedEtcd.Object, "", "status", "endpoints")

Check failure on line 2258 in controlplane/kubeadm/internal/controllers/controller_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `unstructured.SetNestedField` is not checked (errcheck)
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}

fClient := newFakeClient(
builder.GenericEtcdCRD.DeepCopy(),
managedEtcd.DeepCopy(),
cluster.DeepCopy(),
kcp.DeepCopy(),
)

r := &KubeadmControlPlaneReconciler{
Client: fClient,
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: fClient},
Workload: fakeWorkloadCluster{},
},
recorder: record.NewFakeRecorder(32),
}

result, err := r.Reconcile(
ctx,
ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kcp)},
)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute}))
g.Expect(err).NotTo(HaveOccurred())
g.Eventually(func(g Gomega) {
cp := &controlplanev1.KubeadmControlPlane{}
g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed())
g.Expect(
cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints,
).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}))
}, 5*time.Second).Should(Succeed())
})

t.Run("should requeue and not update kcp when endpoints in external etcd is not ready", func(t *testing.T) {
g := NewWithT(t)
cluster, kcp, managedEtcd := setup()
unstructured.SetNestedField(managedEtcd.Object, "0.0.0.0", "status", "endpoints")

Check failure on line 2295 in controlplane/kubeadm/internal/controllers/controller_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `unstructured.SetNestedField` is not checked (errcheck)
unstructured.SetNestedField(managedEtcd.Object, false, "status", "ready")

Check failure on line 2296 in controlplane/kubeadm/internal/controllers/controller_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `unstructured.SetNestedField` is not checked (errcheck)
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}

fClient := newFakeClient(
builder.GenericEtcdCRD.DeepCopy(),
managedEtcd.DeepCopy(),
cluster.DeepCopy(),
kcp.DeepCopy(),
)

r := &KubeadmControlPlaneReconciler{
Client: fClient,
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: fClient},
Workload: fakeWorkloadCluster{},
},
recorder: record.NewFakeRecorder(32),
}

result, err := r.Reconcile(
ctx,
ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kcp)},
)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute}))
g.Expect(err).NotTo(HaveOccurred())
g.Eventually(func(g Gomega) {
cp := &controlplanev1.KubeadmControlPlane{}
g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed())
g.Expect(
cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints,
).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}))
}, 5*time.Second).Should(Succeed())
})

t.Run("should requeue and not update kcp when etcd is ongoing an upgrade in external etcd is going through an upgrade", func(t *testing.T) {
g := NewWithT(t)
cluster, kcp, managedEtcd := setup()
unstructured.SetNestedField(managedEtcd.Object, "0.0.0.0", "status", "endpoints")
annotations.AddAnnotations(managedEtcd, map[string]string{"etcdcluster.cluster.x-k8s.io/upgrading": "true"})
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}

fClient := newFakeClient(
builder.GenericEtcdCRD.DeepCopy(),
managedEtcd.DeepCopy(),
cluster.DeepCopy(),
kcp.DeepCopy(),
)

r := &KubeadmControlPlaneReconciler{
Client: fClient,
managementCluster: &fakeManagementCluster{
Management: &internal.Management{Client: fClient},
Workload: fakeWorkloadCluster{},
},
recorder: record.NewFakeRecorder(32),
}

result, err := r.Reconcile(
ctx,
ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kcp)},
)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute}))
g.Expect(err).NotTo(HaveOccurred())
g.Eventually(func(g Gomega) {
cp := &controlplanev1.KubeadmControlPlane{}
g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed())
g.Expect(
cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints,
).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"}))
g.Expect(conditions.IsFalse(cp, controlplanev1.ExternalEtcdEndpointsAvailable)).To(BeTrue())
}, 5*time.Second).Should(Succeed())
})
}

// test utils.

func newFakeClient(initObjs ...client.Object) client.Client {