Skip to content

Commit

Permalink
sync Patroni resources on update event and extended unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FxKu committed Aug 12, 2024
1 parent 1d76627 commit b63263c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 16 deletions.
6 changes: 6 additions & 0 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}

// Patroni service and endpoints / config maps
if err := c.syncPatroniResources(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
func() {
// check if users need to be synced during update
Expand Down
126 changes: 110 additions & 16 deletions pkg/cluster/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
"github.com/zalando/postgres-operator/mocks"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake"
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/config"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/patroni"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
k8sFake "k8s.io/client-go/kubernetes/fake"
)

Expand Down Expand Up @@ -49,6 +51,7 @@ func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset
PersistentVolumeClaimsGetter: clientSet.CoreV1(),
PersistentVolumesGetter: clientSet.CoreV1(),
EndpointsGetter: clientSet.CoreV1(),
ConfigMapsGetter: clientSet.CoreV1(),
PodsGetter: clientSet.CoreV1(),
DeploymentsGetter: clientSet.AppsV1(),
CronJobsGetter: clientSet.BatchV1(),
Expand All @@ -66,11 +69,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[
clusterOptions := clusterLabelsOptions(cluster)
// helper functions
containsAnnotations := func(expected map[string]string, actual map[string]string, objName string, objType string) error {
if expected == nil {
if len(actual) != 0 {
return fmt.Errorf("%s %v expected not to have any annotations, got: %#v", objType, objName, actual)
}
} else if !(reflect.DeepEqual(expected, actual)) {
if !util.MapContains(actual, expected) {
return fmt.Errorf("%s %v expected annotations: %#v, got: %#v", objType, objName, expected, actual)
}
return nil
Expand Down Expand Up @@ -183,7 +182,7 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[
return err
}
for _, cronJob := range cronJobList.Items {
if err := containsAnnotations(updateAnnotations(annotations), cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil {
if err := containsAnnotations(annotations, cronJob.Annotations, cronJob.ObjectMeta.Name, "Logical backup cron job"); err != nil {
return err
}
if err := containsAnnotations(updateAnnotations(annotations), cronJob.Spec.JobTemplate.Spec.Template.Annotations, cronJob.Name, "Logical backup cron job pod template"); err != nil {
Expand Down Expand Up @@ -219,8 +218,21 @@ func checkResourcesInheritedAnnotations(cluster *Cluster, resultAnnotations map[
return nil
}

checkConfigMaps := func(annotations map[string]string) error {
cmList, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, cm := range cmList.Items {
if err := containsAnnotations(annotations, cm.Annotations, cm.ObjectMeta.Name, "ConfigMap"); err != nil {
return err
}
}
return nil
}

checkFuncs := []func(map[string]string) error{
checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints,
checkSts, checkPods, checkSvc, checkPdb, checkPooler, checkCronJob, checkPvc, checkSecrets, checkEndpoints, checkConfigMaps,
}
for _, f := range checkFuncs {
if err := f(resultAnnotations); err != nil {
Expand Down Expand Up @@ -281,6 +293,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
KubernetesUseConfigMaps: true,
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
Expand Down Expand Up @@ -343,19 +356,65 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster,
}
}

// resources which Patroni creates
if err = createPatroniResources(cluster); err != nil {
return nil, err
}

return cluster, nil
}

func createPatroniResources(cluster *Cluster) error {
patroniService := cluster.generateService(Replica, &pg.Spec)
patroniService.ObjectMeta.Name = cluster.serviceName(Patroni)
_, err := cluster.KubeClient.Services(namespace).Create(context.TODO(), patroniService, metav1.CreateOptions{})
if err != nil {
return err
}

for _, suffix := range patroniObjectSuffixes {
metadata := metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", clusterName, suffix),
Namespace: namespace,
Annotations: map[string]string{
"initialize": "123456789",
},
Labels: cluster.labelsSet(false),
}

if cluster.OpConfig.KubernetesUseConfigMaps {
configMap := v1.ConfigMap{
ObjectMeta: metadata,
}
_, err := cluster.KubeClient.ConfigMaps(namespace).Create(context.TODO(), &configMap, metav1.CreateOptions{})
if err != nil {
return err
}
} else {
endpoints := v1.Endpoints{
ObjectMeta: metadata,
}
_, err := cluster.KubeClient.Endpoints(namespace).Create(context.TODO(), &endpoints, metav1.CreateOptions{})
if err != nil {
return err
}
}
}

return nil
}

func annotateResources(cluster *Cluster) error {
clusterOptions := clusterLabelsOptions(cluster)
patchData, err := metaAnnotationsPatch(externalAnnotations)

stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, sts := range stsList.Items {
sts.Annotations = externalAnnotations
if _, err = cluster.KubeClient.StatefulSets(namespace).Update(context.TODO(), &sts, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.StatefulSets(namespace).Patch(context.TODO(), sts.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -366,7 +425,7 @@ func annotateResources(cluster *Cluster) error {
}
for _, pod := range podList.Items {
pod.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Pods(namespace).Update(context.TODO(), &pod, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.Pods(namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -377,7 +436,7 @@ func annotateResources(cluster *Cluster) error {
}
for _, svc := range svcList.Items {
svc.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Services(namespace).Update(context.TODO(), &svc, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.Services(namespace).Patch(context.TODO(), svc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -388,7 +447,19 @@ func annotateResources(cluster *Cluster) error {
}
for _, pdb := range pdbList.Items {
pdb.Annotations = externalAnnotations
_, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Update(context.TODO(), &pdb, metav1.UpdateOptions{})
_, err = cluster.KubeClient.PodDisruptionBudgets(namespace).Patch(context.TODO(), pdb.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return err
}
}

cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, cronJob := range cronJobList.Items {
cronJob.Annotations = externalAnnotations
_, err = cluster.KubeClient.CronJobs(namespace).Patch(context.TODO(), cronJob.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return err
}
Expand All @@ -400,7 +471,7 @@ func annotateResources(cluster *Cluster) error {
}
for _, pvc := range pvcList.Items {
pvc.Annotations = externalAnnotations
if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Update(context.TODO(), &pvc, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.PersistentVolumeClaims(namespace).Patch(context.TODO(), pvc.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -411,7 +482,7 @@ func annotateResources(cluster *Cluster) error {
return err
}
deploy.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Deployments(namespace).Update(context.TODO(), deploy, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.Deployments(namespace).Patch(context.TODO(), deploy.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -422,7 +493,7 @@ func annotateResources(cluster *Cluster) error {
}
for _, secret := range secrets.Items {
secret.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Secrets(namespace).Update(context.TODO(), &secret, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.Secrets(namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}
Expand All @@ -433,10 +504,22 @@ func annotateResources(cluster *Cluster) error {
}
for _, ep := range endpoints.Items {
ep.Annotations = externalAnnotations
if _, err = cluster.KubeClient.Endpoints(namespace).Update(context.TODO(), &ep, metav1.UpdateOptions{}); err != nil {
if _, err = cluster.KubeClient.Endpoints(namespace).Patch(context.TODO(), ep.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}

configMaps, err := cluster.KubeClient.ConfigMaps(namespace).List(context.TODO(), clusterOptions)
if err != nil {
return err
}
for _, cm := range configMaps.Items {
cm.Annotations = externalAnnotations
if _, err = cluster.KubeClient.ConfigMaps(namespace).Patch(context.TODO(), cm.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{}); err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -503,7 +586,18 @@ func TestInheritedAnnotations(t *testing.T) {
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)

// 3. Existing annotations (should not be removed)
// 3. Change from ConfigMaps to Endpoints
err = cluster.deletePatroniResources()
assert.NoError(t, err)
cluster.OpConfig.KubernetesUseConfigMaps = false
err = createPatroniResources(cluster)
assert.NoError(t, err)
err = cluster.Sync(newSpec.DeepCopy())
assert.NoError(t, err)
err = checkResourcesInheritedAnnotations(cluster, result)
assert.NoError(t, err)

// 4. Existing annotations (should not be removed)
err = annotateResources(cluster)
assert.NoError(t, err)
maps.Copy(result, externalAnnotations)
Expand Down

0 comments on commit b63263c

Please sign in to comment.