Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync all resources to cluster fields #2713

Merged
merged 27 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
52a1c9c
sync all resources to cluster fields
FxKu Aug 2, 2024
e4c3725
initialize ConfigMaps field
FxKu Aug 2, 2024
968f0e4
resolve conflict
FxKu Aug 2, 2024
cda48d2
unify config map log messages
FxKu Aug 2, 2024
bf02b64
fix config service name
FxKu Aug 2, 2024
ae1fab1
what's up with the endpoints
FxKu Aug 2, 2024
c9032ac
swap services and endpoints deletion
FxKu Aug 6, 2024
1759091
debug the leftover endpoints
FxKu Aug 6, 2024
ec243cb
comment lines
FxKu Aug 6, 2024
f2bcaee
undo changes - delete seems to work now
FxKu Aug 6, 2024
0482af4
the endpoint is near
FxKu Aug 6, 2024
74bc89b
separated sync and delete logic for Patroni resources
FxKu Aug 6, 2024
aed22e2
undo e2e test changes
FxKu Aug 6, 2024
3834091
aligh delete streams and secrets logic with other resources
FxKu Aug 7, 2024
db20458
rename gatherApplicationIds to getDistinct
FxKu Aug 7, 2024
ea3a0f8
improve slot check before syncing streams CRD
FxKu Aug 8, 2024
7d50d56
patch annotation for Patroni config service
FxKu Aug 9, 2024
5156420
Merge branch 'master' into fix-orphaned-service-delete
FxKu Aug 9, 2024
409924a
add ownerReferences and annotations diff to Patroni objects
FxKu Aug 9, 2024
461ddc1
add extra sync code for config service so it does not get too ugly
FxKu Aug 9, 2024
dd2deef
update docs
FxKu Aug 12, 2024
c2a52f6
Merge branch 'master' into fix-orphaned-service-delete
FxKu Aug 12, 2024
1d76627
some bugfixes when comparing annotations and return err on found
FxKu Aug 12, 2024
b63263c
sync Patroni resources on update event and extended unit tests
FxKu Aug 12, 2024
275acd3
add one more errror return
FxKu Aug 12, 2024
bd9c0de
add config service/endpoint owner references check to e2e test
FxKu Aug 12, 2024
c28abde
reflect another code review
FxKu Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ def test_config_update(self):
"max_connections": new_max_connections_value,
"wal_level": "logical"
}
},
"patroni": {
},
"patroni": {
"slots": {
"first_slot": {
"type": "physical"
Expand All @@ -412,7 +412,7 @@ def test_config_update(self):
"retry_timeout": 9,
"synchronous_mode": True,
"failsafe_mode": True,
}
}
}
}

Expand Down Expand Up @@ -515,7 +515,7 @@ def compare_config():
pg_add_new_slots_patch = {
"spec": {
"patroni": {
"slots": {
"slots": {
"test_slot": {
"type": "logical",
"database": "foo",
Expand Down Expand Up @@ -2020,13 +2020,13 @@ def test_stream_resources(self):
# update the manifest with the streams section
patch_streaming_config = {
"spec": {
"patroni": {
"patroni": {
"slots": {
"manual_slot": {
"type": "physical"
}
}
},
},
"streams": [
{
"applicationId": "test-app",
Expand Down
131 changes: 24 additions & 107 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cluster
// Postgres CustomResourceDefinition object i.e. Spilo

import (
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -15,6 +14,7 @@ import (

"github.com/sirupsen/logrus"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1"

"github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme"
"github.com/zalando/postgres-operator/pkg/spec"
Expand All @@ -30,7 +30,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,9 +61,13 @@ type Config struct {
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
LogicalBackupJob *batchv1.CronJob
Streams map[string]*zalandov1.FabricEventStream
//Pods are treated separately
//PVCs are treated separately
}
Expand Down Expand Up @@ -132,9 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
kubeResources: kubeResources{
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
Expand Down Expand Up @@ -357,6 +363,11 @@ func (c *Cluster) Create() (err error) {
c.logger.Infof("pods are ready")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync resources created by Patroni
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
}

// create database objects unless we are running without pods or disabled
// that feature explicitly
if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) {
Expand All @@ -382,10 +393,6 @@ func (c *Cluster) Create() (err error) {
c.logger.Info("a k8s cron job for logical backup has been successfully created")
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

// Create connection pooler deployment and services if necessary. Since we
// need to perform some operations with the database itself (e.g. install
// lookup function), do it as the last step, when everything is available.
Expand All @@ -410,6 +417,10 @@ func (c *Cluster) Create() (err error) {
}
}

if err := c.listResources(); err != nil {
c.logger.Errorf("could not list resources: %v", err)
}

return nil
}

Expand Down Expand Up @@ -847,7 +858,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
Expand Down Expand Up @@ -1183,7 +1194,6 @@ func (c *Cluster) Delete() error {
}

for _, role := range []PostgresRole{Master, Replica} {

if !c.patroniKubernetesUseConfigMaps() {
if err := c.deleteEndpoint(role); err != nil {
anyErrors = true
Expand All @@ -1199,10 +1209,10 @@ func (c *Cluster) Delete() error {
}
}

if err := c.deletePatroniClusterObjects(); err != nil {
if err := c.deletePatroniResources(); err != nil {
FxKu marked this conversation as resolved.
Show resolved Hide resolved
anyErrors = true
c.logger.Warningf("could not remove leftover patroni objects; %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not remove leftover patroni objects; %v", err)
c.logger.Warningf("could not delete all Patroni resources: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
}

// Delete connection pooler objects anyway, even if it's not mentioned in the
Expand Down Expand Up @@ -1734,96 +1744,3 @@ func (c *Cluster) Lock() {
func (c *Cluster) Unlock() {
c.mu.Unlock()
}

type simpleActionWithResult func()

type clusterObjectGet func(name string) (spec.NamespacedName, error)

type clusterObjectDelete func(name string) error

func (c *Cluster) deletePatroniClusterObjects() error {
// TODO: figure out how to remove leftover patroni objects in other cases
var actionsList []simpleActionWithResult

if !c.patroniUsesKubernetes() {
c.logger.Infof("not cleaning up Etcd Patroni objects on cluster delete")
}

actionsList = append(actionsList, c.deletePatroniClusterServices)
if c.patroniKubernetesUseConfigMaps() {
actionsList = append(actionsList, c.deletePatroniClusterConfigMaps)
} else {
actionsList = append(actionsList, c.deletePatroniClusterEndpoints)
}

c.logger.Debugf("removing leftover Patroni objects (endpoints / services and configmaps)")
for _, deleter := range actionsList {
deleter()
}
return nil
}

func deleteClusterObject(
get clusterObjectGet,
del clusterObjectDelete,
objType string,
clusterName string,
logger *logrus.Entry) {
for _, suffix := range patroniObjectSuffixes {
name := fmt.Sprintf("%s-%s", clusterName, suffix)

namespacedName, err := get(name)
if err == nil {
logger.Debugf("deleting %s %q",
objType, namespacedName)

if err = del(name); err != nil {
logger.Warningf("could not delete %s %q: %v",
objType, namespacedName, err)
}

} else if !k8sutil.ResourceNotFound(err) {
logger.Warningf("could not fetch %s %q: %v",
objType, namespacedName, err)
}
}
}

func (c *Cluster) deletePatroniClusterServices() {
get := func(name string) (spec.NamespacedName, error) {
svc, err := c.KubeClient.Services(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(svc.ObjectMeta), err
}

deleteServiceFn := func(name string) error {
return c.KubeClient.Services(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteServiceFn, "service", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterEndpoints() {
get := func(name string) (spec.NamespacedName, error) {
ep, err := c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(ep.ObjectMeta), err
}

deleteEndpointFn := func(name string) error {
return c.KubeClient.Endpoints(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteEndpointFn, "endpoint", c.Name, c.logger)
}

func (c *Cluster) deletePatroniClusterConfigMaps() {
get := func(name string) (spec.NamespacedName, error) {
cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
return util.NameFromMeta(cm.ObjectMeta), err
}

deleteConfigMapFn := func(name string) error {
return c.KubeClient.ConfigMaps(c.Namespace).Delete(context.TODO(), name, c.deleteOptions)
}

deleteClusterObject(get, deleteConfigMapFn, "configmap", c.Name, c.logger)
}
2 changes: 1 addition & 1 deletion pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (c *Cluster) deleteConnectionPoolerSecret() (err error) {
if err != nil {
c.logger.Debugf("could not get connection pooler secret %s: %v", secretName, err)
} else {
if err = c.deleteSecret(secret.UID, *secret); err != nil {
if err = c.deleteSecret(secret.UID); err != nil {
return fmt.Errorf("could not delete pooler secret: %v", err)
}
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ func (c *Cluster) statefulSetName() string {
return c.Name
}

func (c *Cluster) endpointName(role PostgresRole) string {
name := c.Name
if role == Replica {
name = fmt.Sprintf("%s-%s", name, "repl")
}

return name
}

func (c *Cluster) serviceName(role PostgresRole) string {
name := c.Name
if role == Replica {
switch role {
case Replica:
name = fmt.Sprintf("%s-%s", name, "repl")
case Patroni:
name = fmt.Sprintf("%s-%s", name, "config")
}

return name
Expand Down Expand Up @@ -1968,6 +1962,9 @@ func (c *Cluster) shouldCreateLoadBalancerForService(role PostgresRole, spec *ac

return c.OpConfig.EnableMasterLoadBalancer

case Patroni:
return false

default:
panic(fmt.Sprintf("Unknown role %v", role))
}
Expand Down Expand Up @@ -2061,7 +2058,7 @@ func (c *Cluster) getCustomServiceAnnotations(role PostgresRole, spec *acidv1.Po
func (c *Cluster) generateEndpoint(role PostgresRole, subsets []v1.EndpointSubset) *v1.Endpoints {
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: c.endpointName(role),
Name: c.serviceName(role),
Namespace: c.Namespace,
Annotations: c.annotationsSet(nil),
Labels: c.roleLabelsSet(true, role),
Expand Down
Loading
Loading