Skip to content

Commit

Permalink
remove stream resources after drop from Postgres manifest (#2563)
Browse files Browse the repository at this point in the history
* remove stream resources after drop from Postgres manifest
  • Loading branch information
FxKu authored Jun 27, 2024
1 parent 7cdc23f commit 37d6993
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
}

// streams
if len(newSpec.Spec.Streams) > 0 {
if len(newSpec.Spec.Streams) > 0 || len(oldSpec.Spec.Streams) != len(newSpec.Spec.Streams) {
if err := c.syncStreams(); err != nil {
c.logger.Errorf("could not sync streams: %v", err)
updateFailed = true
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ func (c *Cluster) syncStreams() error {
if len(slotsToSync) > 0 {
requiredPatroniConfig.Slots = slotsToSync
} else {
return nil
// try to delete existing stream resources
return c.deleteStreams()
}

c.logger.Debug("syncing logical replication slots")
Expand Down
17 changes: 17 additions & 0 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,21 @@ func TestUpdateFabricEventStream(t *testing.T) {
if match, _ := sameStreams(streams.Items[0].Spec.EventStreams, result.Spec.EventStreams); !match {
t.Errorf("Malformed FabricEventStream after disabling event recovery, expected %#v, got %#v", streams.Items[0], result)
}

mockClient := k8sutil.NewMockKubernetesClient()
cluster.KubeClient.CustomResourceDefinitionsGetter = mockClient.CustomResourceDefinitionsGetter

// remove streams from manifest
pgPatched.Spec.Streams = nil
pgUpdated, err := cluster.KubeClient.Postgresqls(namespace).Update(
context.TODO(), pgPatched, metav1.UpdateOptions{})
assert.NoError(t, err)

cluster.Postgresql.Spec = pgUpdated.Spec
cluster.syncStreams()

streamList, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions)
if len(streamList.Items) > 0 || err != nil {
t.Errorf("stream resource has not been removed or unexpected error %v", err)
}
}
26 changes: 24 additions & 2 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
"github.com/zalando/postgres-operator/pkg/spec"
apiappsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apiextv1client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -59,7 +60,7 @@ type KubernetesClient struct {
appsv1.DeploymentsGetter
rbacv1.RoleBindingsGetter
policyv1.PodDisruptionBudgetsGetter
apiextv1.CustomResourceDefinitionsGetter
apiextv1client.CustomResourceDefinitionsGetter
clientbatchv1.CronJobsGetter
acidv1.OperatorConfigurationsGetter
acidv1.PostgresTeamsGetter
Expand All @@ -71,6 +72,13 @@ type KubernetesClient struct {
Zalandov1ClientSet *zalandoclient.Clientset
}

type mockCustomResourceDefinition struct {
apiextv1client.CustomResourceDefinitionInterface
}

type MockCustomResourceDefinitionsGetter struct {
}

type mockSecret struct {
corev1.SecretInterface
}
Expand Down Expand Up @@ -240,6 +248,18 @@ func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg
return updatedPg, nil
}

func (c *mockCustomResourceDefinition) Get(ctx context.Context, name string, options metav1.GetOptions) (*apiextv1.CustomResourceDefinition, error) {
return &apiextv1.CustomResourceDefinition{}, nil
}

func (c *mockCustomResourceDefinition) Create(ctx context.Context, crd *apiextv1.CustomResourceDefinition, options metav1.CreateOptions) (*apiextv1.CustomResourceDefinition, error) {
return &apiextv1.CustomResourceDefinition{}, nil
}

func (mock *MockCustomResourceDefinitionsGetter) CustomResourceDefinitions() apiextv1client.CustomResourceDefinitionInterface {
return &mockCustomResourceDefinition{}
}

func (c *mockSecret) Get(ctx context.Context, name string, options metav1.GetOptions) (*v1.Secret, error) {
oldFormatSecret := &v1.Secret{}
oldFormatSecret.Name = "testcluster"
Expand Down Expand Up @@ -444,6 +464,8 @@ func NewMockKubernetesClient() KubernetesClient {
ConfigMapsGetter: &MockConfigMapsGetter{},
DeploymentsGetter: &MockDeploymentGetter{},
ServicesGetter: &MockServiceGetter{},

CustomResourceDefinitionsGetter: &MockCustomResourceDefinitionsGetter{},
}
}

Expand Down

0 comments on commit 37d6993

Please sign in to comment.