From 45ead22c272c16d9c283351b9bfa0f10d46d49d4 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 27 Oct 2023 09:31:03 -0400 Subject: [PATCH] Only update local Endpoint when setting HealthCheckIP When setting the HealthCheckIP from the Node global IP, the datastore syncer was doing a create-or-update operation, but we really want to just update. The problem was observed in a leader election fail-over scenario where a local Endpoint was recreated on Node update after the Endpoint had been deleted, which caused issues (see https://issues.redhat.com/browse/ACM-8274). Signed-off-by: Tom Pantelis --- main.go | 9 ++++++ .../datastore_endpoint_sync_test.go | 19 ++++++++++++- .../datastoresyncer/datastoresyncer.go | 28 ++++++++++--------- .../datastoresyncer/node_handler.go | 2 +- 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/main.go b/main.go index 533ab6c68..5d1a3a648 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,7 @@ import ( "github.com/submariner-io/submariner/pkg/natdiscovery" "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/versions" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -112,6 +113,12 @@ func main() { leClient, err := kubernetes.NewForConfig(rest.AddUserAgent(restConfig, "leader-election")) logger.FatalOnError(err, "Error creating leader election kubernetes clientset") + dynClient, err := dynamic.NewForConfig(restConfig) + logger.FatalOnError(err, "Error creating dynamic client") + + restMapper, err := util.BuildRestMapper(restConfig) + logger.FatalOnError(err, "Error building the REST mapper") + logger.FatalOnError(subv1.AddToScheme(scheme.Scheme), "Error adding submariner types to the scheme") gw, err := gateway.New(&gateway.Config{ @@ -123,6 +130,8 @@ func main() { Spec: submSpec, SyncerConfig: broker.SyncerConfig{ LocalRestConfig: restConfig, + LocalClient: dynClient, + RestMapper: restMapper, }, WatcherConfig: watcher.Config{ RestConfig: 
restConfig, diff --git a/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go b/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go index 2b1e6ad84..456465810 100644 --- a/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go +++ b/pkg/controllers/datastoresyncer/datastore_endpoint_sync_test.go @@ -26,12 +26,15 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/test" + testutil "github.com/submariner-io/admiral/pkg/test" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/globalnet/constants" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -133,16 +136,30 @@ func testEndpointSyncing() { test.CreateResource(t.localNodes, node) }) - It("should update the local Endpoint's HealthCheckIP", func() { + JustBeforeEach(func() { t.localEndpoint.Spec.HealthCheckIP = node.Annotations[constants.SmGlobalIP] awaitEndpoint(t.localEndpoints, &t.localEndpoint.Spec) + }) + It("should update the local Endpoint's HealthCheckIP", func() { node.Annotations[constants.SmGlobalIP] = "200.0.0.100" t.localEndpoint.Spec.HealthCheckIP = node.Annotations[constants.SmGlobalIP] test.UpdateResource(t.localNodes, node) awaitEndpoint(t.localEndpoints, &t.localEndpoint.Spec) }) + + Context("but the local Endpoint no longer exists", func() { + It("should not recreate the local Endpoint", func() { + Expect(t.localEndpoints.Delete(context.Background(), getEndpointName(&t.localEndpoint.Spec), metav1.DeleteOptions{})). 
+ To(Succeed()) + + node.Annotations[constants.SmGlobalIP] = "200.0.0.100" + test.UpdateResource(t.localNodes, node) + + testutil.EnsureNoResource[runtime.Object](resource.ForDynamic(t.localEndpoints), getEndpointName(&t.localEndpoint.Spec)) + }) + }) }) } diff --git a/pkg/controllers/datastoresyncer/datastoresyncer.go b/pkg/controllers/datastoresyncer/datastoresyncer.go index 42e92d4ba..757285318 100644 --- a/pkg/controllers/datastoresyncer/datastoresyncer.go +++ b/pkg/controllers/datastoresyncer/datastoresyncer.go @@ -28,6 +28,7 @@ import ( "github.com/submariner-io/admiral/pkg/resource" resourceSyncer "github.com/submariner-io/admiral/pkg/syncer" "github.com/submariner-io/admiral/pkg/syncer/broker" + "github.com/submariner-io/admiral/pkg/util" "github.com/submariner-io/admiral/pkg/watcher" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" "github.com/submariner-io/submariner/pkg/cidr" @@ -45,11 +46,11 @@ import ( ) type DatastoreSyncer struct { - localCluster types.SubmarinerCluster - localEndpoint types.SubmarinerEndpoint - localNodeName string - syncerConfig broker.SyncerConfig - localFederator federate.Federator + localCluster types.SubmarinerCluster + localEndpoint types.SubmarinerEndpoint + localNodeName string + syncerConfig broker.SyncerConfig + updateFederator federate.Federator } var logger = log.Logger{Logger: logf.Log.WithName("DSSyncer")} @@ -72,6 +73,9 @@ func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error { logger.Info("Starting the datastore syncer") + d.updateFederator = federate.NewUpdateFederator(d.syncerConfig.LocalClient, d.syncerConfig.RestMapper, d.syncerConfig.LocalNamespace, + util.CopyImmutableMetadata) + syncer, err := d.createSyncer() if err != nil { return err @@ -82,17 +86,15 @@ func (d *DatastoreSyncer) Start(stopCh <-chan struct{}) error { return errors.WithMessage(err, "error starting the syncer") } - d.localFederator = syncer.GetLocalFederator() - if err := d.ensureExclusiveEndpoint(syncer); 
err != nil { return errors.WithMessage(err, "could not ensure exclusive submariner Endpoint") } - if err := d.createLocalCluster(); err != nil { + if err := d.createLocalCluster(syncer.GetLocalFederator()); err != nil { return errors.WithMessage(err, "error creating the local submariner Cluster") } - if err := d.createOrUpdateLocalEndpoint(); err != nil { + if err := d.createOrUpdateLocalEndpoint(syncer.GetLocalFederator()); err != nil { return errors.WithMessage(err, "error creating the local submariner Endpoint") } @@ -312,7 +314,7 @@ func (d *DatastoreSyncer) createNodeWatcher(stopCh <-chan struct{}) error { return nil } -func (d *DatastoreSyncer) createLocalCluster() error { +func (d *DatastoreSyncer) createLocalCluster(federator federate.Federator) error { logger.Infof("Creating local submariner Cluster: %#v ", d.localCluster) cluster := &submarinerv1.Cluster{ @@ -322,10 +324,10 @@ func (d *DatastoreSyncer) createLocalCluster() error { Spec: d.localCluster.Spec, } - return d.localFederator.Distribute(cluster) //nolint:wrapcheck // Let the caller wrap it + return federator.Distribute(cluster) //nolint:wrapcheck // Let the caller wrap it } -func (d *DatastoreSyncer) createOrUpdateLocalEndpoint() error { +func (d *DatastoreSyncer) createOrUpdateLocalEndpoint(federator federate.Federator) error { logger.Infof("Creating local submariner Endpoint: %#v ", d.localEndpoint) endpointName, err := d.localEndpoint.Spec.GenerateName() @@ -340,5 +342,5 @@ func (d *DatastoreSyncer) createOrUpdateLocalEndpoint() error { Spec: d.localEndpoint.Spec, } - return d.localFederator.Distribute(endpoint) //nolint:wrapcheck // Let the caller wrap it + return federator.Distribute(endpoint) //nolint:wrapcheck // Let the caller wrap it } diff --git a/pkg/controllers/datastoresyncer/node_handler.go b/pkg/controllers/datastoresyncer/node_handler.go index f68ac4100..ba1fb79bc 100644 --- a/pkg/controllers/datastoresyncer/node_handler.go +++ b/pkg/controllers/datastoresyncer/node_handler.go @@ 
-75,7 +75,7 @@ func (d *DatastoreSyncer) updateLocalEndpointIfNecessary(globalIPOfNode string) prevHealthCheckIP := d.localEndpoint.Spec.HealthCheckIP d.localEndpoint.Spec.HealthCheckIP = globalIPOfNode - if err := d.createOrUpdateLocalEndpoint(); err != nil { + if err := d.createOrUpdateLocalEndpoint(d.updateFederator); err != nil { logger.Warningf("Error updating the local submariner Endpoint with HealthcheckIP: %v", err) d.localEndpoint.Spec.HealthCheckIP = prevHealthCheckIP