From b63263cd3092ecac6ab18f60ceac1cad639ba1e2 Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Mon, 12 Aug 2024 18:00:58 +0200 Subject: [PATCH] sync Patroni resources on update event and extended unit tests --- pkg/cluster/cluster.go | 6 ++ pkg/cluster/util_test.go | 126 ++++++++++++++++++++++++++++++++++----- 2 files changed, 116 insertions(+), 16 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ffc0ef987..f0f432753 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -988,6 +988,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } + // Patroni service and endpoints / config maps + if err := c.syncPatroniResources(); err != nil { + c.logger.Errorf("could not sync services: %v", err) + updateFailed = true + } + // Users func() { // check if users need to be synced during update diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 58380b49a..d303e8bcc 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -16,12 +16,14 @@ import ( "github.com/zalando/postgres-operator/mocks" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/patroni" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" k8sFake "k8s.io/client-go/kubernetes/fake" ) @@ -49,6 +51,7 @@ func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset PersistentVolumeClaimsGetter: clientSet.CoreV1(), PersistentVolumesGetter: clientSet.CoreV1(), EndpointsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), PodsGetter: clientSet.CoreV1(), DeploymentsGetter: clientSet.AppsV1(), CronJobsGetter: clientSet.BatchV1(), @@ -66,11 +69,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ clusterOptions := clusterLabelsOptions(cluster) // helper functions containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error { - if expected == nil { - if len(actual) != 0 { - return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual) - } - } else if !(reflect.DeepEqual(expected, actual)) { + if !util.MapContains(actual, expected) { return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual) } return nil @@ -183,7 +182,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ return err } for _, cronJob := range cronJobList.Items { - if err := containsAnnotations(updateAnnotations(annotations), cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil { + if err := containsAnnotations(annotations, cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil { return err } if err := containsAnnotations(updateAnnotations(annotations), cronJob.Spec.JobTemplate.Spec.Template.Annotations, cronJob.Name, "Logical backup cron job pod template"); err != nil { @@ -219,8 +218,21 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[ return nil } + checkConfigMaps := func(annotations map[string]string) error { + cmList, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cm := range cmList.Items { + if err := containsAnnotations(annotations, cm.Annotations, cm.ObjectMeta.Name, "ConfigMap"); err != nil { + return err + } + } + return nil + } + checkFuncs := []func(map[string]string) error{ - checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints, + checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints, checkConfigMaps, } for _, f := range checkFuncs { if err := f(resultAnnotations); err != nil { @@ -281,6 +293,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, OpConfig: config.Config{ PatroniAPICheckInterval: time.Duration(1), PatroniAPICheckTimeout: time.Duration(5), + KubernetesUseConfigMaps: true, ConnectionPooler: config.ConnectionPooler{ ConnectionPoolerDefaultCPURequest: "100m", ConnectionPoolerDefaultCPULimit: "100m", @@ -343,11 +356,57 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, } } + // resources which Patroni creates + if err = createPatroniResources(cluster); err != nil { + return nil, err + } + return cluster, nil } +func createPatroniResources(cluster *Cluster) error { + patroniService := cluster.generateService(Replica, &pg.Spec) + patroniService.ObjectMeta.Name = cluster.serviceName(Patroni) + _, err := cluster.KubeClient.Services(namespace).Create(context.TODO(), patroniService, metav1.CreateOptions{}) + if err != nil { + return err + } + + for _, suffix := range patroniObjectSuffixes { + metadata := metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", clusterName, suffix), + Namespace: namespace, + Annotations: map[string]string{ + "initialize": "123456789", + }, + Labels: cluster.labelsSet(false), + } + + if cluster.OpConfig.KubernetesUseConfigMaps { + configMap := v1.ConfigMap{ + ObjectMeta: metadata, + } + _, err := cluster.KubeClient.ConfigMaps(namespace).Create(context.TODO(), &configMap, metav1.CreateOptions{}) + if err != nil { + return err + } + } else { + endpoints := v1.Endpoints{ + ObjectMeta: metadata, + } + _, err := cluster.KubeClient.Endpoints(namespace).Create(context.TODO(), &endpoints, metav1.CreateOptions{}) + if err != nil { + return err + } + } + } + + return nil +} + func annotateResources(cluster *Cluster) error { clusterOptions := clusterLabelsOptions(cluster) + patchData, err := metaAnnotationsPatch(externalAnnotations) stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions) if err != nil { @@ -355,7 +414,7 @@ func annotateResources(cluster *Cluster) error { } for _, sts := range stsList.Items { sts.Annotations = externalAnnotations - if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.StatefulSets(namespace).Patch(context.TODO(), sts.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -366,7 +425,7 @@ func annotateResources(cluster *Cluster) error { } for _, pod := range podList.Items { pod.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Pods(namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -377,7 +436,7 @@ func annotateResources(cluster *Cluster) error { } for _, svc := range svcList.Items { svc.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Services(namespace).Patch(context.TODO(), svc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -388,7 +447,19 @@ func annotateResources(cluster *Cluster) error { } for _, pdb := range pdbList.Items { pdb.Annotations = externalAnnotations - _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{}) + _, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Patch(context.TODO(), pdb.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return err + } + } + + cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cronJob := range cronJobList.Items { + cronJob.Annotations = externalAnnotations + _, err = cluster.KubeClient.CronJobs(namespace).Patch(context.TODO(), cronJob.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) if err != nil { return err } @@ -400,7 +471,7 @@ func annotateResources(cluster *Cluster) error { } for _, pvc := range pvcList.Items { pvc.Annotations = externalAnnotations - if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -411,7 +482,7 @@ func annotateResources(cluster *Cluster) error { return err } deploy.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Deployments(namespace).Patch(context.TODO(), deploy.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -422,7 +493,7 @@ func annotateResources(cluster *Cluster) error { } for _, secret := range secrets.Items { secret.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Secrets(namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } @@ -433,10 +504,22 @@ func annotateResources(cluster *Cluster) error { } for _, ep := range endpoints.Items { ep.Annotations = externalAnnotations - if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil { + if _, err = cluster.KubeClient.Endpoints(namespace).Patch(context.TODO(), ep.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { return err } } + + configMaps, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions) + if err != nil { + return err + } + for _, cm := range configMaps.Items { + cm.Annotations = externalAnnotations + if _, err = cluster.KubeClient.ConfigMaps(namespace).Patch(context.TODO(), cm.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil { + return err + } + } + return nil } @@ -503,7 +586,18 @@ func TestInheritedAnnotations(t *testing.T) { err = checkResourcesInheritedAnnotations(cluster, result) assert.NoError(t, err) - // 3. Existing annotations (should not be removed) + // 3. Change from ConfigMaps to Endpoints + err = cluster.deletePatroniResources() + assert.NoError(t, err) + cluster.OpConfig.KubernetesUseConfigMaps = false + err = createPatroniResources(cluster) + assert.NoError(t, err) + err = cluster.Sync(newSpec.DeepCopy()) + assert.NoError(t, err) + err = checkResourcesInheritedAnnotations(cluster, result) + assert.NoError(t, err) + + // 4. Existing annotations (should not be removed) err = annotateResources(cluster) assert.NoError(t, err) maps.Copy(result, externalAnnotations)