sync all resources to cluster fields (#2713)
* sync all resources to cluster fields (CronJob, Streams, Patroni resources)
* separate sync and delete logic for Patroni resources
* align delete streams and secrets logic with other resources
* rename gatherApplicationIds to getDistinctApplicationIds
* improve slot check before syncing streams CRD
* add ownerReferences and annotations diff to Patroni objects
* add extra sync code for config service so it does not get too ugly
* some bugfixes when comparing annotations and returning errors
* sync Patroni resources on update events and extend unit tests
* add config service/endpoint owner references check to e2e tests
FxKu authored Aug 13, 2024
1 parent 31f92a1 commit 25ccc87
Showing 11 changed files with 666 additions and 302 deletions.
5 changes: 2 additions & 3 deletions docs/administrator.md
@@ -252,17 +252,16 @@ will differ and trigger a rolling update of the pods.
## Owner References and Finalizers

The Postgres Operator can set [owner references](https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/) to most of a cluster's child resources to improve
monitoring with GitOps tools and enable cascading deletes. There are three
monitoring with GitOps tools and enable cascading deletes. There are two
exceptions:

* Persistent Volume Claims, because they are handled by the [PV Reclaim Policy](https://kubernetes.io/docs/tasks/administer-cluster/change-pv-reclaim-policy/) of the Stateful Set
* The config endpoint + headless service resource because it is managed by Patroni
* Cross-namespace secrets, because owner references are not allowed across namespaces by design

The operator would clean these resources up with its regular delete loop
unless they got synced correctly. If for some reason the initial cluster sync
fails, e.g. after a cluster creation or operator restart, a deletion of the
cluster manifest would leave orphaned resources behind which the user has to
cluster manifest might leave orphaned resources behind which the user has to
clean up manually.

Another option is to enable finalizers which first ensures the deletion of all
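The doc change above drops the Patroni config service and endpoint from the list of exceptions because this commit starts attaching owner references to those objects as well. As a rough, self-contained illustration (not code from this commit), this is the shape of the reference the operator points back at the postgresql custom resource; the helper name, cluster name, and UID below are made up:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// ownerReferenceFor builds an owner reference of the kind the operator attaches
// to a cluster's child resources when owner references are enabled. The
// APIVersion/Kind match the postgresql CRD; name and UID are placeholders.
func ownerReferenceFor(clusterName string, uid types.UID) metav1.OwnerReference {
	controller := true
	return metav1.OwnerReference{
		APIVersion: "acid.zalan.do/v1",
		Kind:       "postgresql",
		Name:       clusterName,
		UID:        uid,
		Controller: &controller,
	}
}

func main() {
	ref := ownerReferenceFor("acid-minimal-cluster", types.UID("00000000-0000-0000-0000-000000000000"))
	fmt.Printf("%+v\n", ref)
}
```

With such a reference in place, deleting the postgresql manifest lets Kubernetes garbage-collect the child object, which is what the e2e test below now also verifies for the config service and endpoint.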
29 changes: 16 additions & 13 deletions e2e/tests/test_e2e.py
@@ -402,8 +402,8 @@ def test_config_update(self):
"max_connections": new_max_connections_value,
"wal_level": "logical"
}
},
"patroni": {
},
"patroni": {
"slots": {
"first_slot": {
"type": "physical"
@@ -414,7 +414,7 @@ def test_config_update(self):
"retry_timeout": 9,
"synchronous_mode": True,
"failsafe_mode": True,
}
}
}
}

@@ -517,7 +517,7 @@ def compare_config():
pg_add_new_slots_patch = {
"spec": {
"patroni": {
"slots": {
"slots": {
"test_slot": {
"type": "logical",
"database": "foo",
@@ -1667,19 +1667,18 @@ def test_owner_references(self):
k8s.api.custom_objects_api.delete_namespaced_custom_object(
"acid.zalan.do", "v1", self.test_namespace, "postgresqls", cluster_name)

# statefulset, pod disruption budget and secrets should be deleted via owner reference
# child resources with owner references should be deleted via owner references
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted")
self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted")
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Services not deleted")
self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 0, "Secrets were not deleted")

time.sleep(5) # wait for the operator to also delete the leftovers
time.sleep(5) # wait for the operator to also delete the PVCs

# pvcs and Patroni config service/endpoint should not be affected by owner reference
# but deleted by the operator almost immediately
# pvcs do not have an owner reference but will be deleted by the operator almost immediately
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 0, "PVCs not deleted")
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Patroni config service not deleted")
self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Patroni config endpoint not deleted")

# disable owner references in config
disable_owner_refs = {
@@ -2143,13 +2142,13 @@ def test_stream_resources(self):
# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
},
"streams": [
{
"applicationId": "test-app",
@@ -2481,11 +2480,15 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n
self.assertTrue(self.has_postgresql_owner_reference(svc.metadata.owner_references, inverse), "primary service owner reference check failed")
replica_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-repl", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(replica_svc.metadata.owner_references, inverse), "replica service owner reference check failed")
config_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-config", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(config_svc.metadata.owner_references, inverse), "config service owner reference check failed")

ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name, cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(ep.metadata.owner_references, inverse), "primary endpoint owner reference check failed")
replica_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-repl", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica owner reference check failed")
self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica endpoint owner reference check failed")
config_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-config", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")

pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed")
137 changes: 30 additions & 107 deletions pkg/cluster/cluster.go
@@ -3,7 +3,6 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo

import (
"context"
"database/sql"
"encoding/json"
"fmt"
@@ -15,6 +14,7 @@ import (

"github.com/sirupsen/logrus"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"

"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
@@ -30,7 +30,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -62,9 +61,13 @@ type Config struct {
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
//PVCs are treated separately
}
@@ -132,9 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
kubeResources: kubeResources{
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -357,6 +363,11 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync resources created by Patroni
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
}

// create database objects unless we are running without pods or disabled
// that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
@@ -382,10 +393,6 @@ func (c *Cluster) Create() (err error) {
c.logger.Info("a k8s cron job for logical backup has been successfully created")
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

// Create connection pooler deployment and services if necessary. Since we
// need to perform some operations with the database itself (e.g. install
// lookup function), do it as the last step, when everything is available.
@@ -410,6 +417,10 @@ func (c *Cluster) Create() (err error) {
}
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

return nil
}

@@ -856,7 +867,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one"
@@ -977,6 +988,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}

// Patroni service and endpoints / config maps
if err := c.syncPatroniResources(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
func() {
// check if users need to be synced during update
@@ -1191,7 +1208,6 @@ func (c *Cluster) Delete() error {
}

for _, role := range []PostgresRole{Master, Replica} {

if !c.patroniKubernetesUseConfigMaps() {
if err := c.deleteEndpoint(role); err != nil {
anyErrors = true
@@ -1207,10 +1223,10 @@ }
}
}

if err := c.deletePatroniClusterObjects(); err != nil {
if err := c.deletePatroniResources(); err != nil {
anyErrors = true
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove leftover patroni objects; %v", err)
c.logger.Warningf("could not delete all Patroni resources: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
}

// Delete connection pooler objects anyway, even if it's not mentioned in the
@@ -1742,96 +1758,3 @@ func (c *Cluster) Lock() {
func (c *Cluster) Unlock() {
c.mu.Unlock()
}

type simpleActionWithResult func()

type clusterObjectGet func(name string) (spec.NamespacedName, error)

type clusterObjectDelete func(name string) error

func (c *Cluster) deletePatroniClusterObjects() error {
// TODO: figure out how to remove leftover patroni objects in other cases
var actionsList []simpleActionWithResult

if !c.patroniUsesKubernetes() {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
}

actionsList = append(actionsList, c.deletePatroniClusterServices)
if c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterConfigMaps)
} else {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}

c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
deleter()
}
return nil
}

func deleteClusterObject(
get clusterObjectGet,
del clusterObjectDelete,
objType string,
clusterName string,
logger *logrus.Entry) {
for _, suffix := range patroniObjectSuffixes {
name := fmt.Sprintf("%s-%s", clusterName, suffix)

namespacedName, err := get(name)
if err == nil {
logger.Debugf("deleting %s %q",
objType, namespacedName)

if err = del(name); err != nil {
logger.Warningf("could not delete %s %q: %v",
objType, namespacedName, err)
}

} else if !k8sutil.ResourceNotFound(err) {
logger.Warningf("could not fetch %s %q: %v",
objType, namespacedName, err)
}
}
}

func (c *Cluster) deletePatroniClusterServices() {
get := func(name string) (spec.NamespacedName, error) {
svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(svc.ObjectMeta), err
}

deleteServiceFn := func(name string) error {
return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteServiceFn, "service", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterEndpoints() {
get := func(name string) (spec.NamespacedName, error) {
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(ep.ObjectMeta), err
}

deleteEndpointFn := func(name string) error {
return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteEndpointFn, "endpoint", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterConfigMaps() {
get := func(name string) (spec.NamespacedName, error) {
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(cm.ObjectMeta), err
}

deleteConfigMapFn := func(name string) error {
return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteConfigMapFn, "configmap", c.Name, c.logger)
}
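The block removed above located leftover Patroni objects by probing the API server for every known name suffix. Its replacement, `deletePatroniResources`, is not part of this excerpt; per the commit message it deletes the Patroni services, endpoints, and config maps that the new `kubeResources` maps record during sync. The snippet below is only a self-contained sketch of that pattern under those assumptions; `deleteTrackedPatroniObjects` and its parameters are hypothetical, not operator APIs:

```go
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteTrackedPatroniObjects deletes the Patroni endpoints and config maps
// whose names were recorded while syncing, instead of guessing names from a
// list of suffixes and probing the API server for each one.
func deleteTrackedPatroniObjects(ctx context.Context, client kubernetes.Interface, namespace string, endpoints, configMaps []string) error {
	var lastErr error
	for _, name := range endpoints {
		if err := client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
			lastErr = fmt.Errorf("could not delete endpoint %q: %v", name, err)
		}
	}
	for _, name := range configMaps {
		if err := client.CoreV1().ConfigMaps(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
			lastErr = fmt.Errorf("could not delete config map %q: %v", name, err)
		}
	}
	return lastErr
}
```

The visible change in `Delete()` above then only needs to call the new method and report a single aggregated error.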
2 changes: 1 addition & 1 deletion pkg/cluster/connection_pooler.go
@@ -655,7 +655,7 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
if err != nil {
c.logger.Debugf("could not get connection pooler secret %s: %v", secretName, err)
} else {
if err = c.deleteSecret(secret.UID, *secret); err != nil {
if err = c.deleteSecret(secret.UID); err != nil {
return fmt.Errorf("could not delete pooler secret: %v", err)
}
}
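The only change in this file drops the second argument from `deleteSecret`: the cluster already tracks every secret it created in a map keyed by UID (`Secrets map[types.UID]*v1.Secret` in the diff above), so the UID alone identifies what to delete. Below is a small self-contained sketch of that lookup-then-delete pattern; `deleteTrackedSecret` is hypothetical and not the operator's helper:

```go
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// deleteTrackedSecret resolves a secret from a UID-keyed map and deletes it,
// then drops it from the map so the tracked state stays consistent.
func deleteTrackedSecret(ctx context.Context, client kubernetes.Interface, tracked map[types.UID]*v1.Secret, uid types.UID) error {
	secret, ok := tracked[uid]
	if !ok {
		return fmt.Errorf("secret with UID %q is not tracked for this cluster", uid)
	}
	if err := client.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("could not delete secret %q: %v", secret.Name, err)
	}
	delete(tracked, uid)
	return nil
}
```

This matches the call-site change above: the caller no longer has to pass along the full object it just fetched.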
16 changes: 5 additions & 11 deletions pkg/cluster/k8sres.go
@@ -79,19 +79,13 @@ func (c *Cluster) statefulSetName() string {
return c.Name
}

func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
name = fmt.Sprintf("%s-%s", name, "repl")
}

return name
}

func (c *Cluster) serviceName(role PostgresRole) string {
name := c.Name
if role == Replica {
switch role {
case Replica:
name = fmt.Sprintf("%s-%s", name, "repl")
case Patroni:
name = fmt.Sprintf("%s-%s", name, "config")
}

return name
@@ -2072,7 +2066,7 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.endpointName(role),
Name: c.serviceName(role),
Namespace: c.Namespace,
Annotations: c.annotationsSet(nil),
Labels: c.roleLabelsSet(true, role),
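With `endpointName` removed, endpoints take their names from `serviceName`, so the primary, replica, and Patroni config service/endpoint pairs all follow one rule. A tiny standalone sketch of the resulting names follows; the string values assigned to the role constants are assumptions, and only the `-repl` and `-config` suffixes come from the diff:

```go
package main

import "fmt"

type PostgresRole string

const (
	Master  PostgresRole = "master"
	Replica PostgresRole = "replica"
	Patroni PostgresRole = "config"
)

// serviceName mirrors the switch in the diff above: replicas get a "-repl"
// suffix, the Patroni config objects a "-config" suffix, and the primary none.
func serviceName(clusterName string, role PostgresRole) string {
	switch role {
	case Replica:
		return fmt.Sprintf("%s-repl", clusterName)
	case Patroni:
		return fmt.Sprintf("%s-config", clusterName)
	}
	return clusterName
}

func main() {
	for _, role := range []PostgresRole{Master, Replica, Patroni} {
		fmt.Printf("%s service/endpoint: %s\n", role, serviceName("acid-minimal-cluster", role))
	}
}
```

`generateEndpoint` above now calls `serviceName` for the same reason.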