diff --git a/controlplane/kubeadm/internal/controllers/controller.go b/controlplane/kubeadm/internal/controllers/controller.go index 0ebace482bfc..9fb1e96031ff 100644 --- a/controlplane/kubeadm/internal/controllers/controller.go +++ b/controlplane/kubeadm/internal/controllers/controller.go @@ -184,15 +184,28 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl. if err != nil { return ctrl.Result{}, err } + + externalEtcdReady, err := external.IsReady(externalEtcd) + if err != nil { + return ctrl.Result{}, err + } + + if !externalEtcdReady { + log.Info("Managed external etcd is not ready yet, requeueing", "managedExternalEtcd", klog.KObj(externalEtcd)) + return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil + } + endpoints, found, err := external.GetExternalEtcdEndpoints(externalEtcd) if err != nil { return ctrl.Result{}, errors.Wrapf(err, "failed to get endpoint field from %v", externalEtcd.GetName()) } - if !found { - log.Info("Etcd endpoints not available") - return ctrl.Result{Requeue: true}, nil - } currentEtcdEndpoints := strings.Split(endpoints, ",") + + if !found || areEndpointsEmpty(currentEtcdEndpoints) { + log.Info("Managed external etcd endpoints not available, requeueing") + return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil + } + sort.Strings(currentEtcdEndpoints) currentKCPEndpoints := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints if !reflect.DeepEqual(currentEtcdEndpoints, currentKCPEndpoints) { @@ -294,6 +307,10 @@ func (r *KubeadmControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl. return res, err } +func areEndpointsEmpty(endpoints []string) bool { + return len(endpoints) == 0 || len(endpoints) == 1 && endpoints[0] == "" +} + func patchKubeadmControlPlane(ctx context.Context, patchHelper *patch.Helper, kcp *controlplanev1.KubeadmControlPlane) error { // Always update the readyCondition by summarizing the state of other conditions. conditions.SetSummary(kcp, diff --git a/controlplane/kubeadm/internal/controllers/controller_test.go b/controlplane/kubeadm/internal/controllers/controller_test.go index 05218bb48cbb..4ba54854639d 100644 --- a/controlplane/kubeadm/internal/controllers/controller_test.go +++ b/controlplane/kubeadm/internal/controllers/controller_test.go @@ -24,6 +24,7 @@ import ( "crypto/x509/pkix" "fmt" "math/big" + "strings" "sync" "testing" "time" @@ -56,6 +57,7 @@ import ( "sigs.k8s.io/cluster-api/internal/test/builder" "sigs.k8s.io/cluster-api/internal/util/ssa" "sigs.k8s.io/cluster-api/util" + "sigs.k8s.io/cluster-api/util/annotations" "sigs.k8s.io/cluster-api/util/certs" "sigs.k8s.io/cluster-api/util/collections" "sigs.k8s.io/cluster-api/util/conditions" @@ -82,7 +84,8 @@ func TestClusterToKubeadmControlPlane(t *testing.T) { { NamespacedName: client.ObjectKey{ Namespace: cluster.Spec.ControlPlaneRef.Namespace, - Name: cluster.Spec.ControlPlaneRef.Name}, + Name: cluster.Spec.ControlPlaneRef.Name, + }, }, } @@ -2160,6 +2163,214 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) { }) } +func TestKubeadmControlPlaneReconciler_updateManagedExternalEtcdEndpoints(t *testing.T) { + setup := func() (*clusterv1.Cluster, *controlplanev1.KubeadmControlPlane, *unstructured.Unstructured) { + ns := "my-ns" + endpoints := []string{"1.1.1.1", "2.2.2.2", "0.0.0.0"} + managedEtcd := builder.Etcd(ns, "test-7-my-etcd").Build() + if err := unstructured.SetNestedField(managedEtcd.Object, true, "status", "ready"); err != nil { + t.Fatal(err) + } + if err := unstructured.SetNestedField(managedEtcd.Object, strings.Join(endpoints, ","), "status", "endpoints"); err != nil { + t.Fatal(err) + } + cluster, kcp, _ := createClusterWithControlPlane(ns) + cluster.Spec.ManagedExternalEtcdRef = external.GetObjectReference(managedEtcd) + kcp.Spec.KubeadmConfigSpec.ClusterConfiguration = &bootstrapv1.ClusterConfiguration{ + Etcd: bootstrapv1.Etcd{External: &bootstrapv1.ExternalEtcd{}}, + } + + return cluster, kcp, managedEtcd + } + t.Run("should update the endpoints in the kcp", func(t *testing.T) { + g := NewWithT(t) + cluster, kcp, managedEtcd := setup() + conditions.MarkFalse(kcp, controlplanev1.ExternalEtcdEndpointsAvailable, "", "", "") + + fClient := newFakeClient( + builder.GenericEtcdCRD.DeepCopy(), + managedEtcd.DeepCopy(), + cluster.DeepCopy(), + kcp.DeepCopy(), + ) + + r := &KubeadmControlPlaneReconciler{ + Client: fClient, + managementCluster: &fakeManagementCluster{ + Management: &internal.Management{Client: fClient}, + Workload: fakeWorkloadCluster{}, + }, + recorder: record.NewFakeRecorder(32), + } + + result, err := r.Reconcile( + ctx, + ctrl.Request{client.ObjectKeyFromObject(kcp)}, + ) + g.Expect(result).To(Equal(ctrl.Result{})) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func(g Gomega) { + cp := &controlplanev1.KubeadmControlPlane{} + g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed()) + g.Expect( + cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints, + ).To(Equal([]string{"0.0.0.0", "1.1.1.1", "2.2.2.2"})) + conditions.IsTrue(kcp, controlplanev1.ExternalEtcdEndpointsAvailable) + }, 5*time.Second).Should(Succeed()) + }) + + t.Run("should requeue and not update kcp when endpoints in external etcd are not set", func(t *testing.T) { + g := NewWithT(t) + cluster, kcp, managedEtcd := setup() + unstructured.RemoveNestedField(managedEtcd.Object, "status", "endpoints") + kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"} + + fClient := newFakeClient( + builder.GenericEtcdCRD.DeepCopy(), + managedEtcd.DeepCopy(), + cluster.DeepCopy(), + kcp.DeepCopy(), + ) + + r := &KubeadmControlPlaneReconciler{ + Client: fClient, + managementCluster: &fakeManagementCluster{ + Management: &internal.Management{Client: fClient}, + Workload: fakeWorkloadCluster{}, + }, + recorder: record.NewFakeRecorder(32), + } + + result, err := r.Reconcile( + ctx, + ctrl.Request{client.ObjectKeyFromObject(kcp)}, + ) + g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute})) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func(g Gomega) { + cp := &controlplanev1.KubeadmControlPlane{} + g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed()) + g.Expect( + cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints, + ).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"})) + }, 5*time.Second).Should(Succeed()) + }) + + t.Run("should requeue and not update kcp when endpoints in external etcd are empty", func(t *testing.T) { + g := NewWithT(t) + cluster, kcp, managedEtcd := setup() + g.Expect(unstructured.SetNestedField(managedEtcd.Object, "", "status", "endpoints")).To(Succeed()) + kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"} + + fClient := newFakeClient( + builder.GenericEtcdCRD.DeepCopy(), + managedEtcd.DeepCopy(), + cluster.DeepCopy(), + kcp.DeepCopy(), + ) + + r := &KubeadmControlPlaneReconciler{ + Client: fClient, + managementCluster: &fakeManagementCluster{ + Management: &internal.Management{Client: fClient}, + Workload: fakeWorkloadCluster{}, + }, + recorder: record.NewFakeRecorder(32), + } + + result, err := r.Reconcile( + ctx, + ctrl.Request{client.ObjectKeyFromObject(kcp)}, + ) + g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute})) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func(g Gomega) { + cp := &controlplanev1.KubeadmControlPlane{} + g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed()) + g.Expect( + cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints, + ).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"})) + }, 5*time.Second).Should(Succeed()) + }) + + t.Run("should requeue and not update kcp when endpoints in external etcd is not ready", func(t *testing.T) { + g := NewWithT(t) + cluster, kcp, managedEtcd := setup() + g.Expect(unstructured.SetNestedField(managedEtcd.Object, "0.0.0.0", "status", "endpoints")).To(Succeed()) + g.Expect(unstructured.SetNestedField(managedEtcd.Object, false, "status", "ready")).To(Succeed()) + kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"} + + fClient := newFakeClient( + builder.GenericEtcdCRD.DeepCopy(), + managedEtcd.DeepCopy(), + cluster.DeepCopy(), + kcp.DeepCopy(), + ) + + r := &KubeadmControlPlaneReconciler{ + Client: fClient, + managementCluster: &fakeManagementCluster{ + Management: &internal.Management{Client: fClient}, + Workload: fakeWorkloadCluster{}, + }, + recorder: record.NewFakeRecorder(32), + } + + result, err := r.Reconcile( + ctx, + ctrl.Request{client.ObjectKeyFromObject(kcp)}, + ) + g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute})) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func(g Gomega) { + cp := &controlplanev1.KubeadmControlPlane{} + g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed()) + g.Expect( + cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints, + ).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"})) + }, 5*time.Second).Should(Succeed()) + }) + + t.Run("should requeue and not update kcp when etcd is ongoing an upgrade in external etcd is going through an upgrade", func(t *testing.T) { + g := NewWithT(t) + cluster, kcp, managedEtcd := setup() + g.Expect(unstructured.SetNestedField(managedEtcd.Object, "0.0.0.0", "status", "endpoints")).To(Succeed()) + annotations.AddAnnotations(managedEtcd, map[string]string{"etcdcluster.cluster.x-k8s.io/upgrading": "true"}) + kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints = []string{"0.0.0.0", "1.1.1.1", "3.3.3.3"} + + fClient := newFakeClient( + builder.GenericEtcdCRD.DeepCopy(), + managedEtcd.DeepCopy(), + cluster.DeepCopy(), + kcp.DeepCopy(), + ) + + r := &KubeadmControlPlaneReconciler{ + Client: fClient, + managementCluster: &fakeManagementCluster{ + Management: &internal.Management{Client: fClient}, + Workload: fakeWorkloadCluster{}, + }, + recorder: record.NewFakeRecorder(32), + } + + result, err := r.Reconcile( + ctx, + ctrl.Request{client.ObjectKeyFromObject(kcp)}, + ) + g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: 1 * time.Minute})) + g.Expect(err).NotTo(HaveOccurred()) + g.Eventually(func(g Gomega) { + cp := &controlplanev1.KubeadmControlPlane{} + g.Expect(fClient.Get(ctx, client.ObjectKeyFromObject(kcp), cp)).To(Succeed()) + g.Expect( + cp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.External.Endpoints, + ).To(Equal([]string{"0.0.0.0", "1.1.1.1", "3.3.3.3"})) + conditions.IsFalse(cp, controlplanev1.ExternalEtcdEndpointsAvailable) + }, 5*time.Second).Should(Succeed()) + }) +} + // test utils. func newFakeClient(initObjs ...client.Object) client.Client { diff --git a/internal/test/builder/etcd.go b/internal/test/builder/etcd.go new file mode 100644 index 000000000000..096be828bbf8 --- /dev/null +++ b/internal/test/builder/etcd.go @@ -0,0 +1,80 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + // EtcdGroupVersion is group version used for control plane objects. + EtcdGroupVersion = schema.GroupVersion{Group: "etcd.cluster.x-k8s.io", Version: "v1beta1"} + + // GenericEtcdKind is the Kind for the GenericEtcd. + GenericEtcdKind = "GenericEtcd" + // GenericEtcdCRD is a generic control plane CRD. + GenericEtcdCRD = testEtcdCRD(EtcdGroupVersion.WithKind(GenericEtcdKind)) +) + +func testEtcdCRD(gvk schema.GroupVersionKind) *apiextensionsv1.CustomResourceDefinition { + return generateCRD(gvk, map[string]apiextensionsv1.JSONSchemaProps{ + "metadata": { + // NOTE: in CRD there is only a partial definition of metadata schema. + // Ref https://github.com/kubernetes-sigs/controller-tools/blob/59485af1c1f6a664655dad49543c474bb4a0d2a2/pkg/crd/gen.go#L185 + Type: "object", + }, + "spec": etcdSpecSchema, + "status": { + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + // mandatory fields from the Cluster API contract + "ready": {Type: "boolean"}, + "initialized": {Type: "boolean"}, + "endpoints": {Type: "string"}, + }, + }, + }) +} + +var etcdSpecSchema = apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{}, +} + +// EtcdPlaneBuilder holds the variables and objects needed to build a generic object for cluster.spec.ManagedExternalEtcdRef. +type EtcdPlaneBuilder struct { + obj *unstructured.Unstructured +} + +// Etcd returns a EtcdBuilder with the given name and Namespace. +func Etcd(namespace, name string) *EtcdPlaneBuilder { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(EtcdGroupVersion.String()) + obj.SetKind(GenericEtcdKind) + obj.SetNamespace(namespace) + obj.SetName(name) + return &EtcdPlaneBuilder{ + obj: obj, + } +} + +// Build generates an Unstructured object from the information passed to the EtcdPlaneBuilder. +func (c *EtcdPlaneBuilder) Build() *unstructured.Unstructured { + return c.obj +} diff --git a/util/annotations/helpers.go b/util/annotations/helpers.go index 0ec9ef9388ac..8118362beb2a 100644 --- a/util/annotations/helpers.go +++ b/util/annotations/helpers.go @@ -70,8 +70,7 @@ func AddAnnotations(o metav1.Object, desired map[string]string) bool { } annotations := o.GetAnnotations() if annotations == nil { - annotations = make(map[string]string) - o.SetAnnotations(annotations) + annotations = make(map[string]string) } hasChanged := false for k, v := range desired { @@ -80,6 +79,7 @@ func AddAnnotations(o metav1.Object, desired map[string]string) bool { hasChanged = true } } + o.SetAnnotations(annotations) return hasChanged }