sync all resources to cluster fields #2713

Merged · 27 commits · Aug 13, 2024
Commits
52a1c9c
sync all resources to cluster fields
FxKu Aug 2, 2024
e4c3725
initialize ConfigMaps field
FxKu Aug 2, 2024
968f0e4
resolve conflict
FxKu Aug 2, 2024
cda48d2
unify config map log messages
FxKu Aug 2, 2024
bf02b64
fix config service name
FxKu Aug 2, 2024
ae1fab1
what's up with the endpoints
FxKu Aug 2, 2024
c9032ac
swap services and endpoints deletion
FxKu Aug 6, 2024
1759091
debug the leftover endpoints
FxKu Aug 6, 2024
ec243cb
comment lines
FxKu Aug 6, 2024
f2bcaee
undo changes - delete seems to work now
FxKu Aug 6, 2024
0482af4
the endpoint is near
FxKu Aug 6, 2024
74bc89b
separated sync and delete logic for Patroni resources
FxKu Aug 6, 2024
aed22e2
undo e2e test changes
FxKu Aug 6, 2024
3834091
align delete streams and secrets logic with other resources
FxKu Aug 7, 2024
db20458
rename gatherApplicationIds to getDistinct
FxKu Aug 7, 2024
ea3a0f8
improve slot check before syncing streams CRD
FxKu Aug 8, 2024
7d50d56
patch annotation for Patroni config service
FxKu Aug 9, 2024
5156420
Merge branch 'master' into fix-orphaned-service-delete
FxKu Aug 9, 2024
409924a
add ownerReferences and annotations diff to Patroni objects
FxKu Aug 9, 2024
461ddc1
add extra sync code for config service so it does not get too ugly
FxKu Aug 9, 2024
dd2deef
update docs
FxKu Aug 12, 2024
c2a52f6
Merge branch 'master' into fix-orphaned-service-delete
FxKu Aug 12, 2024
1d76627
some bugfixes when comparing annotations and return err on found
FxKu Aug 12, 2024
b63263c
sync Patroni resources on update event and extended unit tests
FxKu Aug 12, 2024
275acd3
add one more error return
FxKu Aug 12, 2024
bd9c0de
add config service/endpoint owner references check to e2e test
FxKu Aug 12, 2024
c28abde
reflect another code review
FxKu Aug 12, 2024
5 changes: 2 additions & 3 deletions docs/administrator.md
@@ -252,17 +252,16 @@ will differ and trigger a rolling update of the pods.
## Owner References and Finalizers

The Postgres Operator can set [owner references](https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/) to most of a cluster's child resources to improve
monitoring with GitOps tools and enable cascading deletes. There are three
monitoring with GitOps tools and enable cascading deletes. There are two
exceptions:

* Persistent Volume Claims, because they are handled by the [PV Reclaim Policy](https://kubernetes.io/docs/tasks/administer-cluster/change-pv-reclaim-policy/) of the Stateful Set
* The config endpoint + headless service resource because it is managed by Patroni
* Cross-namespace secrets, because owner references are not allowed across namespaces by design

The operator would clean these resources up with its regular delete loop
unless they got synced correctly. If for some reason the initial cluster sync
fails, e.g. after a cluster creation or operator restart, a deletion of the
cluster manifest would leave orphaned resources behind which the user has to
cluster manifest might leave orphaned resources behind which the user has to
clean up manually.

Another option is to enable finalizers which first ensures the deletion of all
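For context (not part of this diff): an owner reference on a child resource points back at the postgresql custom resource, which is what lets Kubernetes cascade deletes and lets GitOps tools group the objects. A minimal Go sketch of what such a reference could look like — the names and the UID below are purely illustrative placeholders, not values taken from the operator:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	controller := true
	// Illustrative child Service owned by a postgresql custom resource;
	// in practice the UID would come from the custom resource itself.
	svc := v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "acid-minimal-cluster",
			Namespace: "default",
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "acid.zalan.do/v1",
					Kind:       "postgresql",
					Name:       "acid-minimal-cluster",
					UID:        "00000000-0000-0000-0000-000000000000",
					Controller: &controller,
				},
			},
		},
	}
	fmt.Printf("%s is owned by %s/%s\n", svc.Name, svc.OwnerReferences[0].Kind, svc.OwnerReferences[0].Name)
}
```

Once the referenced postgresql object is deleted, Kubernetes garbage-collects such dependents on its own; only the exceptions listed above still rely on the operator's delete loop.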
29 changes: 16 additions & 13 deletions e2e/tests/test_e2e.py
@@ -402,8 +402,8 @@ def test_config_update(self):
"max_connections": new_max_connections_value,
"wal_level": "logical"
}
},
"patroni": {
},
"patroni": {
"slots": {
"first_slot": {
"type": "physical"
@@ -414,7 +414,7 @@ def test_config_update(self):
"retry_timeout": 9,
"synchronous_mode": True,
"failsafe_mode": True,
}
}
}
}

@@ -517,7 +517,7 @@ def compare_config():
pg_add_new_slots_patch = {
"spec": {
"patroni": {
"slots": {
"slots": {
"test_slot": {
"type": "logical",
"database": "foo",
@@ -1667,19 +1667,18 @@ def test_owner_references(self):
k8s.api.custom_objects_api.delete_namespaced_custom_object(
"acid.zalan.do", "v1", self.test_namespace, "postgresqls", cluster_name)

# statefulset, pod disruption budget and secrets should be deleted via owner reference
# child resources with owner references should be deleted via owner references
self.eventuallyEqual(lambda: k8s.count_pods_with_label(cluster_label), 0, "Pods not deleted")
self.eventuallyEqual(lambda: k8s.count_statefulsets_with_label(cluster_label), 0, "Statefulset not deleted")
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Services not deleted")
self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Endpoints not deleted")
self.eventuallyEqual(lambda: k8s.count_pdbs_with_label(cluster_label), 0, "Pod disruption budget not deleted")
self.eventuallyEqual(lambda: k8s.count_secrets_with_label(cluster_label), 0, "Secrets were not deleted")

time.sleep(5) # wait for the operator to also delete the leftovers
time.sleep(5) # wait for the operator to also delete the PVCs

# pvcs and Patroni config service/endpoint should not be affected by owner reference
# but deleted by the operator almost immediately
# pvcs do not have an owner reference but will be deleted by the operator almost immediately
self.eventuallyEqual(lambda: k8s.count_pvcs_with_label(cluster_label), 0, "PVCs not deleted")
self.eventuallyEqual(lambda: k8s.count_services_with_label(cluster_label), 0, "Patroni config service not deleted")
self.eventuallyEqual(lambda: k8s.count_endpoints_with_label(cluster_label), 0, "Patroni config endpoint not deleted")

# disable owner references in config
disable_owner_refs = {
@@ -2143,13 +2142,13 @@ def test_stream_resources(self):
# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
},
"streams": [
{
"applicationId": "test-app",
@@ -2481,11 +2480,15 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n
self.assertTrue(self.has_postgresql_owner_reference(svc.metadata.owner_references, inverse), "primary service owner reference check failed")
replica_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-repl", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(replica_svc.metadata.owner_references, inverse), "replica service owner reference check failed")
config_svc = k8s.api.core_v1.read_namespaced_service(cluster_name + "-config", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(config_svc.metadata.owner_references, inverse), "config service owner reference check failed")

ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name, cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(ep.metadata.owner_references, inverse), "primary endpoint owner reference check failed")
replica_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-repl", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica owner reference check failed")
self.assertTrue(self.has_postgresql_owner_reference(replica_ep.metadata.owner_references, inverse), "replica endpoint owner reference check failed")
config_ep = k8s.api.core_v1.read_namespaced_endpoints(cluster_name + "-config", cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed")

pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace)
self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed")
137 changes: 30 additions & 107 deletions pkg/cluster/cluster.go
@@ -3,7 +3,6 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo

import (
"context"
"database/sql"
"encoding/json"
"fmt"
@@ -15,6 +14,7 @@ import (

"github.com/sirupsen/logrus"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"

"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
@@ -30,7 +30,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -62,9 +61,13 @@ type Config struct {
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
//PVCs are treated separately
}
@@ -132,9 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
kubeResources: kubeResources{
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
@@ -357,6 +363,11 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync resources created by Patroni
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
}

// create database objects unless we are running without pods or disabled
// that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
@@ -382,10 +393,6 @@ func (c *Cluster) Create() (err error) {
c.logger.Info("a k8s cron job for logical backup has been successfully created")
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

// Create connection pooler deployment and services if necessary. Since we
// need to perform some operations with the database itself (e.g. install
// lookup function), do it as the last step, when everything is available.
@@ -410,6 +417,10 @@ func (c *Cluster) Create() (err error) {
}
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

return nil
}

@@ -856,7 +867,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if !reflect.DeepEqual(new.Spec, cur.Spec) {
return false, "new PDB's spec does not match the current one"
@@ -977,6 +988,12 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
updateFailed = true
}

// Patroni service and endpoints / config maps
if err := c.syncPatroniResources(); err != nil {
c.logger.Errorf("could not sync services: %v", err)
updateFailed = true
}

// Users
func() {
// check if users need to be synced during update
@@ -1191,7 +1208,6 @@ func (c *Cluster) Delete() error {
}

for _, role := range []PostgresRole{Master, Replica} {

if !c.patroniKubernetesUseConfigMaps() {
if err := c.deleteEndpoint(role); err != nil {
anyErrors = true
@@ -1207,10 +1223,10 @@
}
}

if err := c.deletePatroniClusterObjects(); err != nil {
if err := c.deletePatroniResources(); err != nil {
anyErrors = true
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove leftover patroni objects; %v", err)
c.logger.Warningf("could not delete all Patroni resources: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
}

// Delete connection pooler objects anyway, even if it's not mentioned in the
Expand Down Expand Up @@ -1742,96 +1758,3 @@ func (c *Cluster) Lock() {
func (c *Cluster) Unlock() {
c.mu.Unlock()
}

type simpleActionWithResult func()

type clusterObjectGet func(name string) (spec.NamespacedName, error)

type clusterObjectDelete func(name string) error

func (c *Cluster) deletePatroniClusterObjects() error {
// TODO: figure out how to remove leftover patroni objects in other cases
var actionsList []simpleActionWithResult

if !c.patroniUsesKubernetes() {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
}

actionsList = append(actionsList, c.deletePatroniClusterServices)
if c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterConfigMaps)
} else {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}

c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
deleter()
}
return nil
}

func deleteClusterObject(
get clusterObjectGet,
del clusterObjectDelete,
objType string,
clusterName string,
logger *logrus.Entry) {
for _, suffix := range patroniObjectSuffixes {
name := fmt.Sprintf("%s-%s", clusterName, suffix)

namespacedName, err := get(name)
if err == nil {
logger.Debugf("deleting %s %q",
objType, namespacedName)

if err = del(name); err != nil {
logger.Warningf("could not delete %s %q: %v",
objType, namespacedName, err)
}

} else if !k8sutil.ResourceNotFound(err) {
logger.Warningf("could not fetch %s %q: %v",
objType, namespacedName, err)
}
}
}

func (c *Cluster) deletePatroniClusterServices() {
get := func(name string) (spec.NamespacedName, error) {
svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(svc.ObjectMeta), err
}

deleteServiceFn := func(name string) error {
return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteServiceFn, "service", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterEndpoints() {
get := func(name string) (spec.NamespacedName, error) {
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(ep.ObjectMeta), err
}

deleteEndpointFn := func(name string) error {
return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteEndpointFn, "endpoint", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterConfigMaps() {
get := func(name string) (spec.NamespacedName, error) {
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(cm.ObjectMeta), err
}

deleteConfigMapFn := func(name string) error {
return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteConfigMapFn, "configmap", c.Name, c.logger)
}
2 changes: 1 addition & 1 deletion pkg/cluster/connection_pooler.go
@@ -655,7 +655,7 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
if err != nil {
c.logger.Debugf("could not get connection pooler secret %s: %v", secretName, err)
} else {
if err = c.deleteSecret(secret.UID, *secret); err != nil {
if err = c.deleteSecret(secret.UID); err != nil {
return fmt.Errorf("could not delete pooler secret: %v", err)
}
}
16 changes: 5 additions & 11 deletions pkg/cluster/k8sres.go
@@ -79,19 +79,13 @@ func (c *Cluster) statefulSetName() string {
return c.Name
}

func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
name = fmt.Sprintf("%s-%s", name, "repl")
}

return name
}

func (c *Cluster) serviceName(role PostgresRole) string {
name := c.Name
if role == Replica {
switch role {
case Replica:
name = fmt.Sprintf("%s-%s", name, "repl")
case Patroni:
name = fmt.Sprintf("%s-%s", name, "config")
}

return name
@@ -2072,7 +2066,7 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.endpointName(role),
Name: c.serviceName(role),
Namespace: c.Namespace,
Annotations: c.annotationsSet(nil),
Labels: c.roleLabelsSet(true, role),