From 825ed8c290be470b5aa3021c80be1e45b453f162 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 20 Oct 2023 13:43:03 -0400 Subject: [PATCH] Restart leader election on lease renewal failure in gateway ...rather than exiting. Related to https://github.com/submariner-io/submariner/issues/2747 Signed-off-by: Tom Pantelis --- pkg/gateway/gateway.go | 136 ++++++++++++------ pkg/gateway/gateway_suite_test.go | 13 ++ pkg/gateway/gateway_test.go | 224 +++++++++++++++++++++++++----- pkg/pod/pod.go | 6 +- 4 files changed, 298 insertions(+), 81 deletions(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index f9979ad05..e60008f2e 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -23,6 +23,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -44,6 +45,7 @@ import ( "github.com/submariner-io/submariner/pkg/types" "github.com/submariner-io/submariner/pkg/versions" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/leaderelection" @@ -56,6 +58,8 @@ const ( defaultLeaseDuration = 10 * time.Second defaultRenewDeadline = 5 * time.Second defaultRetryPeriod = 2 * time.Second + + LeaderElectionLockName = "submariner-gateway-lock" ) type Interface interface { @@ -82,20 +86,20 @@ type Config struct { type gatewayType struct { Config - airGapped bool - cableHealthChecker healthchecker.Interface - cableEngineSyncer *syncer.GatewaySyncer - cableEngine cableengine.Engine - publicIPWatcher *endpoint.PublicIPWatcher - datastoreSyncer *datastoresyncer.DatastoreSyncer - natDiscovery natdiscovery.Interface - gatewayPod *pod.GatewayPod - hostName string - localEndpoint *types.SubmarinerEndpoint - stopCh <-chan struct{} - fatalError chan error - waitGroup sync.WaitGroup - recorder record.EventRecorder + airGapped bool + cableHealthChecker healthchecker.Interface + cableEngineSyncer *syncer.GatewaySyncer + cableEngine cableengine.Engine + publicIPWatcher *endpoint.PublicIPWatcher + datastoreSyncer *datastoresyncer.DatastoreSyncer + natDiscovery natdiscovery.Interface + gatewayPod *pod.GatewayPod + hostName string + localEndpoint *types.SubmarinerEndpoint + stopCh <-chan struct{} + fatalError chan error + leaderComponentsStarted *sync.WaitGroup + recorder record.EventRecorder } var logger = log.Logger{Logger: logf.Log.WithName("Gateway")} @@ -195,7 +199,11 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error { return errors.Wrap(err, "error creating a handler to update the gateway pod") } - g.runAsync(g.cableEngineSyncer.Run) + var waitGroup sync.WaitGroup + + g.runAsync(&waitGroup, func() { + g.cableEngineSyncer.Run(g.stopCh) + }) if !g.airGapped { g.initPublicIPWatcher() @@ -211,31 +219,35 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error { case fatalErr := <-g.fatalError: g.cableEngineSyncer.SetGatewayStatusError(fatalErr) - if err := g.gatewayPod.SetHALabels(subv1.HAStatusPassive); err != nil { + if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil { logger.Warningf("Error updating pod label: %s", err) } return fatalErr } - g.waitGroup.Wait() + waitGroup.Wait() logger.Info("Gateway engine stopped") return nil } -func (g *gatewayType) runAsync(run func(<-chan struct{})) { - g.waitGroup.Add(1) +func (g *gatewayType) runAsync(waitGroup *sync.WaitGroup, run func()) { + waitGroup.Add(1) go func() { - defer g.waitGroup.Done() - run(g.stopCh) + defer waitGroup.Done() + run() }() } func (g *gatewayType) 
startLeaderElection() error { - rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, "submariner-gateway-lock", + logger.Info("Starting leader election") + + g.leaderComponentsStarted = &sync.WaitGroup{} + + rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, LeaderElectionLockName, g.LeaderElectionClient.CoreV1(), g.LeaderElectionClient.CoordinationV1(), resourcelock.ResourceLockConfig{ Identity: g.hostName + "-submariner-gateway", EventRecorder: g.recorder, @@ -258,50 +270,78 @@ func (g *gatewayType) startLeaderElection() error { return nil } -func (g *gatewayType) onStartedLeading(_ context.Context) { +func (g *gatewayType) onStartedLeading(ctx context.Context) { + logger.Info("Leadership acquired - starting controllers") + if err := g.cableEngine.StartEngine(); err != nil { g.fatalError <- errors.Wrap(err, "error starting the cable engine") return } - go func() { - if err := g.gatewayPod.SetHALabels(subv1.HAStatusActive); err != nil { - g.fatalError <- errors.Wrap(err, "error updating pod label") + g.runAsync(g.leaderComponentsStarted, func() { + msgLogged := atomic.Bool{} + + _ = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { + if err := g.gatewayPod.SetHALabels(ctx, subv1.HAStatusActive); err != nil { + if msgLogged.CompareAndSwap(false, true) { + logger.Warningf("Error updating pod label to active: %s", err) + } + + return false, nil + } + + return true, nil + }) + }) + + g.runAsync(g.leaderComponentsStarted, func() { + if err := g.datastoreSyncer.Start(ctx.Done()); err != nil { + g.fatalError <- errors.Wrap(err, "error running the datastore syncer") } - }() + }) - go func() { + g.runAsync(g.leaderComponentsStarted, func() { watcherConfig := g.WatcherConfig - if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, g.stopCh); err != nil { + if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, ctx.Done()); err != nil { g.fatalError <- errors.Wrap(err, "error running the tunnel controller") } - }() - - go func() { - if err := g.datastoreSyncer.Start(g.stopCh); err != nil { - g.fatalError <- errors.Wrap(err, "error running the datastore syncer") - } - }() + }) if g.cableHealthChecker != nil { - go func() { - if err := g.cableHealthChecker.Start(g.stopCh); err != nil { + g.runAsync(g.leaderComponentsStarted, func() { + if err := g.cableHealthChecker.Start(ctx.Done()); err != nil { logger.Errorf(err, "Error starting healthChecker") } - }() + }) } if g.publicIPWatcher != nil { - go func() { - g.publicIPWatcher.Run(g.stopCh) - }() + go g.publicIPWatcher.Run(ctx.Done()) } } func (g *gatewayType) onStoppedLeading() { - g.cableEngineSyncer.CleanupGatewayEntry() + logger.Info("Leadership lost") + + // Make sure all the components were at least started before we try to restart. 
+ g.leaderComponentsStarted.Wait() - g.fatalError <- errors.New("leader election lost, shutting down") + if g.cableHealthChecker != nil { + g.cableHealthChecker.Stop() + } + + g.cableEngine.Stop() + + logger.Info("Controllers stopped") + + if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil { + logger.Warningf("Error updating pod label to passive: %s", err) + } + + err := g.startLeaderElection() + if err != nil { + g.fatalError <- errors.Wrap(err, "error restarting leader election") + } } func (g *gatewayType) initPublicIPWatcher() { @@ -356,6 +396,12 @@ func (g *gatewayType) uninstall() error { } func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.SubmarinerCluster { + // The Cluster resource requires a value for the GlobalCIDR. + globalCIDR := submSpec.GlobalCidr + if globalCIDR == nil { + globalCIDR = []string{} + } + return &types.SubmarinerCluster{ ID: submSpec.ClusterID, Spec: subv1.ClusterSpec{ @@ -363,7 +409,7 @@ func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.Subma ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr), ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr), - GlobalCIDR: submSpec.GlobalCidr, + GlobalCIDR: globalCIDR, }, } } diff --git a/pkg/gateway/gateway_suite_test.go b/pkg/gateway/gateway_suite_test.go index 56ee6e549..88fa90052 100644 --- a/pkg/gateway/gateway_suite_test.go +++ b/pkg/gateway/gateway_suite_test.go @@ -20,11 +20,16 @@ package gateway_test import ( "testing" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/submariner-io/admiral/pkg/log/kzerolog" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/cable" + "github.com/submariner-io/submariner/pkg/cable/fake" + "github.com/submariner-io/submariner/pkg/cableengine/syncer" + "github.com/submariner-io/submariner/pkg/types" "k8s.io/client-go/kubernetes/scheme" ) @@ -32,9 +37,17 @@ func init() { kzerolog.AddFlags(nil) } +var fakeDriver *fake.Driver + var _ = BeforeSuite(func() { kzerolog.InitK8sLogging() Expect(submarinerv1.AddToScheme(scheme.Scheme)).To(Succeed()) + + cable.AddDriver(fake.DriverName, func(_ *types.SubmarinerEndpoint, _ *types.SubmarinerCluster) (cable.Driver, error) { + return fakeDriver, nil + }) + + syncer.GatewayUpdateInterval = 50 * time.Millisecond }) func TestGateway(t *testing.T) { diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index e5d2ee607..10fd91a11 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -22,17 +22,22 @@ import ( "context" "fmt" "os" + "reflect" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/pkg/errors" "github.com/submariner-io/admiral/pkg/fake" + "github.com/submariner-io/admiral/pkg/federate" . 
"github.com/submariner-io/admiral/pkg/gomega" + "github.com/submariner-io/admiral/pkg/resource" "github.com/submariner-io/admiral/pkg/syncer/broker" "github.com/submariner-io/admiral/pkg/syncer/test" + testutil "github.com/submariner-io/admiral/pkg/test" "github.com/submariner-io/admiral/pkg/watcher" submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + fakecable "github.com/submariner-io/submariner/pkg/cable/fake" "github.com/submariner-io/submariner/pkg/cableengine" enginefake "github.com/submariner-io/submariner/pkg/cableengine/fake" submfake "github.com/submariner-io/submariner/pkg/client/clientset/versioned/fake" @@ -41,19 +46,35 @@ import ( "github.com/submariner-io/submariner/pkg/types" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/dynamic" dynamicfake "k8s.io/client-go/dynamic/fake" k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" ) +const publicIP = "1.2.3.4" + var _ = Describe("Run", func() { t := newTestDriver() It("should start the controllers", func() { - t.awaitEndpoint() + t.leaderElection.AwaitLeaseAcquired() + t.awaitLocalEndpoint() t.awaitHAStatus(submarinerv1.HAStatusActive) - t.awaitGateway() + + t.cableEngine.Lock() + t.cableEngine.HAStatus = submarinerv1.HAStatusActive + t.cableEngine.Unlock() + + t.awaitGateway(func(gw *submarinerv1.Gateway) bool { + return gw.Status.HAStatus == submarinerv1.HAStatusActive + }) + + endpoint := t.awaitRemoteEndpointSyncedLocal(t.createRemoteEndpointOnBroker()) + t.cableEngine.VerifyInstallCable(&endpoint.Spec) }) When("starting the Cable Engine fails", func() { @@ -66,24 +87,88 @@ var _ = Describe("Run", func() { }) }) - When("renewal of the leader lock fails", func() { - var leasesReactor *fake.FailOnActionReactor - + When("renewal of the leader lease fails", func() { BeforeEach(func() { + fakeDriver = fakecable.New() + fakeDriver.Connections = []submarinerv1.Connection{ + { + Status: submarinerv1.Connected, + Endpoint: submarinerv1.EndpointSpec{ + CableName: "submariner-cable-north-5-5-5-5", + }, + UsingIP: "5.6.7.8", + UsingNAT: true, + }, + } + + t.config.NewCableEngine = cableengine.NewEngine t.config.RenewDeadline = time.Millisecond * 200 t.config.RetryPeriod = time.Millisecond * 20 - - t.expectedRunErr = errors.New("leader election lost") - leasesReactor = fake.FailOnAction(&t.kubeClient.Fake, "leases", "update", nil, false) - leasesReactor.Fail(false) }) - It("should return an error", func() { + It("should re-acquire the leader lease after the failure is cleared", func() { + t.leaderElection.AwaitLeaseAcquired() t.awaitHAStatus(submarinerv1.HAStatusActive) + t.awaitGateway(func(gw *submarinerv1.Gateway) bool { + return gw.Status.HAStatus == submarinerv1.HAStatusActive && reflect.DeepEqual(gw.Status.Connections, fakeDriver.Connections) + }) + + endpoint := t.awaitRemoteEndpointSyncedLocal(t.createRemoteEndpointOnBroker()) + fakeDriver.AwaitConnectToEndpoint(&natdiscovery.NATEndpointInfo{ + Endpoint: *endpoint, + }) + By("Setting leases resource updates to fail") - leasesReactor.Fail(true) + t.leaderElection.FailLease(t.config.RenewDeadline) + + By("Ensuring controllers are stopped") + + t.awaitHAStatus(submarinerv1.HAStatusPassive) + + // The Gateway status should reflect that the CableEngine is stopped. 
+ t.awaitGateway(func(gw *submarinerv1.Gateway) bool { + return gw.Status.HAStatus == submarinerv1.HAStatusPassive && len(gw.Status.Connections) == 0 + }) + + // Ensure the datastore syncer is stopped. + brokerEndpoint := t.createRemoteEndpointOnBroker() + t.ensureNoRemoteEndpointSyncedLocal(brokerEndpoint) + + // Ensure the tunnel controller is stopped. + Expect(t.endpoints.Namespace(t.config.Spec.Namespace).Delete(context.Background(), endpoint.Name, metav1.DeleteOptions{})). + To(Succeed()) + fakeDriver.AwaitNoDisconnectFromEndpoint() + + // Delete the endpoint from the broker and recreate locally to simulate a stale remote endpoint. + Expect(t.endpoints.Namespace(t.config.SyncerConfig.BrokerNamespace).Delete(context.Background(), endpoint.Name, + metav1.DeleteOptions{})).To(Succeed()) + t.createEndpoint(t.config.Spec.Namespace, endpoint) + + By("Setting leases resource updates to succeed") + + t.leaderElection.SucceedLease() + + By("Ensuring lease was renewed") + + t.leaderElection.AwaitLeaseRenewed() + + By("Ensuring controllers are restarted") + + t.awaitHAStatus(submarinerv1.HAStatusActive) + + // The Gateway status should reflect that the CableEngine is re-started. + t.awaitGateway(func(gw *submarinerv1.Gateway) bool { + return gw.Status.HAStatus == submarinerv1.HAStatusActive && reflect.DeepEqual(gw.Status.Connections, fakeDriver.Connections) + }) + + endpoint2 := t.awaitRemoteEndpointSyncedLocal(brokerEndpoint) + fakeDriver.AwaitConnectToEndpoint(&natdiscovery.NATEndpointInfo{ + Endpoint: *endpoint2, + }) + + fakeDriver.AwaitDisconnectFromEndpoint(&endpoint.Spec) }) }) @@ -110,7 +195,7 @@ var _ = Describe("Run", func() { }) It("should perform cleanup", func() { - t.awaitNoEndpoint() + t.awaitNoEndpoints() t.cableEngine.AwaitCleanup() }) @@ -129,14 +214,16 @@ var _ = Describe("Run", func() { }) type testDriver struct { - config gateway.Config - localPodName string - nodeName string - endpoints dynamic.NamespaceableResourceInterface - expectedRunErr error - cableEngine *enginefake.Engine - kubeClient *k8sfake.Clientset - dynClient *dynamicfake.FakeDynamicClient + config gateway.Config + localPodName string + nodeName string + endpoints dynamic.NamespaceableResourceInterface + expectedRunErr error + cableEngine *enginefake.Engine + kubeClient *k8sfake.Clientset + dynClient *dynamicfake.FakeDynamicClient + leaderElection *testutil.LeaderElectionSupport + remoteIPCounter int } func newTestDriver() *testDriver { @@ -144,6 +231,7 @@ func newTestDriver() *testDriver { BeforeEach(func() { t.expectedRunErr = nil + t.remoteIPCounter = 1 restMapper := test.GetRESTMapperFor(&submarinerv1.Endpoint{}, &submarinerv1.Cluster{}, &submarinerv1.Gateway{}, &corev1.Node{}) @@ -159,8 +247,9 @@ func newTestDriver() *testDriver { ServiceCidr: []string{"169.254.2.0/24"}, ClusterID: "east", Namespace: "submariner", - PublicIP: "ipv4:1.2.3.4", + PublicIP: "ipv4:" + publicIP, HealthCheckEnabled: true, + CableDriver: fakecable.DriverName, }, SyncerConfig: broker.SyncerConfig{ LocalClient: t.dynClient, @@ -184,6 +273,8 @@ func newTestDriver() *testDriver { }, } + t.leaderElection = testutil.NewLeaderElectionSupport(t.kubeClient, t.config.Spec.Namespace, gateway.LeaderElectionLockName) + t.endpoints = t.config.SyncerConfig.LocalClient.Resource(*test.GetGroupVersionResourceFor(restMapper, &submarinerv1.Endpoint{})) t.localPodName = "local-pod" @@ -251,16 +342,35 @@ func newTestDriver() *testDriver { return t } -func (t *testDriver) awaitEndpoint() { - Eventually(func() int { +func toEndpoint(from 
*unstructured.Unstructured) *submarinerv1.Endpoint { + endpoint := &submarinerv1.Endpoint{} + Expect(scheme.Scheme.Convert(from, endpoint, nil)).To(Succeed()) + + return endpoint +} + +func (t *testDriver) awaitLocalEndpoint() { + Eventually(func() bool { l, err := t.endpoints.Namespace(t.config.Spec.Namespace).List(context.Background(), metav1.ListOptions{}) Expect(err).To(Succeed()) - return len(l.Items) - }, 3).Should(Equal(1)) + for i := range l.Items { + endpoint := toEndpoint(&l.Items[i]) + + if endpoint.Spec.ClusterID == t.config.Spec.ClusterID { + Expect(endpoint.Spec.PublicIP).To(Equal(publicIP)) + Expect(endpoint.Spec.Backend).To(Equal(fakecable.DriverName)) + Expect(endpoint.Spec.Subnets).To(Equal(t.config.Spec.GlobalCidr)) + + return true + } + } + + return false + }, 3).Should(BeTrue()) } -func (t *testDriver) awaitNoEndpoint() { +func (t *testDriver) awaitNoEndpoints() { Eventually(func() int { l, err := t.endpoints.Namespace(t.config.Spec.Namespace).List(context.Background(), metav1.ListOptions{}) Expect(err).To(Succeed()) @@ -269,6 +379,44 @@ func (t *testDriver) awaitNoEndpoint() { }, 3).Should(BeZero()) } +func (t *testDriver) newRemoteEndpoint() *submarinerv1.Endpoint { + ep := &submarinerv1.Endpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(uuid.NewUUID()), + Labels: map[string]string{federate.ClusterIDLabelKey: "west"}, + }, + Spec: submarinerv1.EndpointSpec{ + ClusterID: "west", + CableName: fmt.Sprintf("submariner-cable-west-192-168-40-%d", t.remoteIPCounter), + Hostname: "redsox", + Subnets: []string{"169.254.3.0/24"}, + PrivateIP: "11.1.2.3", + PublicIP: "ipv4:12.1.2.3", + Backend: "libreswan", + }, + } + + t.remoteIPCounter++ + + return ep +} + +func (t *testDriver) createEndpoint(ns string, endpoint *submarinerv1.Endpoint) *submarinerv1.Endpoint { + return toEndpoint(test.CreateResource(t.endpoints.Namespace(ns), endpoint)) +} + +func (t *testDriver) createRemoteEndpointOnBroker() *submarinerv1.Endpoint { + return t.createEndpoint(t.config.SyncerConfig.BrokerNamespace, t.newRemoteEndpoint()) +} + +func (t *testDriver) awaitRemoteEndpointSyncedLocal(endpoint *submarinerv1.Endpoint) *submarinerv1.Endpoint { + return toEndpoint(test.AwaitResource(t.endpoints.Namespace(t.config.Spec.Namespace), endpoint.Name)) +} + +func (t *testDriver) ensureNoRemoteEndpointSyncedLocal(endpoint *submarinerv1.Endpoint) { + testutil.EnsureNoResource[runtime.Object](resource.ForDynamic(t.endpoints.Namespace(t.config.Spec.Namespace)), endpoint.Name) +} + func (t *testDriver) awaitHAStatus(status submarinerv1.HAStatus) { Eventually(func() string { pod, err := t.config.KubeClient.CoreV1().Pods(t.config.Spec.Namespace).Get(context.Background(), t.localPodName, metav1.GetOptions{}) @@ -278,13 +426,17 @@ func (t *testDriver) awaitHAStatus(status submarinerv1.HAStatus) { }, 3).Should(Equal(string(status))) } -func (t *testDriver) awaitGateway() { - Eventually(func() int { +func (t *testDriver) awaitGateway(verify func(*submarinerv1.Gateway) bool) { + Eventually(func() []submarinerv1.Gateway { l, err := t.config.SubmarinerClient.SubmarinerV1().Gateways(t.config.Spec.Namespace).List(context.Background(), metav1.ListOptions{}) Expect(err).To(Succeed()) - return len(l.Items) - }, 3).Should(Equal(1)) + if len(l.Items) == 1 && (verify == nil || verify(&l.Items[0])) { + return nil + } + + return l.Items + }, 3).Should(BeEmpty()) } func (t *testDriver) awaitNoGateway() { @@ -306,18 +458,24 @@ func (t *testDriver) awaitGatewayStatusError(s string) { }, 3).Should(ContainSubstring(s)) } -type 
fakeNATDiscovery struct{} +type fakeNATDiscovery struct { + readyChannel chan *natdiscovery.NATEndpointInfo +} func (n *fakeNATDiscovery) Run(_ <-chan struct{}) error { return nil } -func (n *fakeNATDiscovery) AddEndpoint(_ *submarinerv1.Endpoint) { +func (n *fakeNATDiscovery) AddEndpoint(ep *submarinerv1.Endpoint) { + n.readyChannel <- &natdiscovery.NATEndpointInfo{ + Endpoint: *ep, + } } func (n *fakeNATDiscovery) RemoveEndpoint(_ string) { } func (n *fakeNATDiscovery) GetReadyChannel() chan *natdiscovery.NATEndpointInfo { - return make(chan *natdiscovery.NATEndpointInfo, 100) + n.readyChannel = make(chan *natdiscovery.NATEndpointInfo, 100) + return n.readyChannel } diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go index 2eb50b69e..08b6d6dce 100644 --- a/pkg/pod/pod.go +++ b/pkg/pod/pod.go @@ -65,7 +65,7 @@ func NewGatewayPod(k8sClient kubernetes.Interface) (*GatewayPod, error) { return nil, errors.New("POD_NAME environment variable missing") } - if err := gp.SetHALabels(submV1.HAStatusPassive); err != nil { + if err := gp.SetHALabels(context.Background(), submV1.HAStatusPassive); err != nil { logger.Warningf("Error updating pod label: %s", err) } @@ -74,11 +74,11 @@ func NewGatewayPod(k8sClient kubernetes.Interface) (*GatewayPod, error) { const patchFormat = `{"metadata": {"labels": {"gateway.submariner.io/node": "%s", "gateway.submariner.io/status": "%s"}}}` -func (gp *GatewayPod) SetHALabels(status submV1.HAStatus) error { +func (gp *GatewayPod) SetHALabels(ctx context.Context, status submV1.HAStatus) error { podsInterface := gp.clientset.CoreV1().Pods(gp.namespace) patch := fmt.Sprintf(patchFormat, gp.node, status) - _, err := podsInterface.Patch(context.TODO(), gp.name, types.MergePatchType, []byte(patch), v1.PatchOptions{}) + _, err := podsInterface.Patch(ctx, gp.name, types.MergePatchType, []byte(patch), v1.PatchOptions{}) if err != nil { return errors.Wrapf(err, "Error patching own pod %q in namespace %q with %s", gp.name, gp.namespace, patch) }
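
For context on the mechanism this patch implements, the following is a minimal, standalone sketch (not part of the change) of the restart-on-loss pattern using client-go's leaderelection package directly: RunOrDie returns once the lease can no longer be renewed, so wrapping it in a loop re-enters the election after OnStoppedLeading, which is the same effect the patch achieves by calling startLeaderElection again. The helper names startControllers/stopControllers and the kubeconfig handling are illustrative assumptions, not Submariner code.

// Illustrative sketch only -- not part of this patch.
package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// startControllers is a placeholder for the leader-only components (cable
// engine, datastore syncer, tunnel controller, ...); they should honor ctx.
func startControllers(ctx context.Context) {}

// stopControllers is a placeholder for stopping those components and marking
// the pod passive before the election is re-entered.
func stopControllers() {}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}

	client := kubernetes.NewForConfigOrDie(cfg)
	hostName, _ := os.Hostname()

	for {
		lock := &resourcelock.LeaseLock{
			LeaseMeta:  metav1.ObjectMeta{Name: "submariner-gateway-lock", Namespace: "submariner"},
			Client:     client.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{Identity: hostName + "-submariner-gateway"},
		}

		// RunOrDie blocks until leadership is lost (or the context is done),
		// so the surrounding loop restarts the election instead of exiting.
		leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
			Lock:          lock,
			LeaseDuration: 10 * time.Second,
			RenewDeadline: 5 * time.Second,
			RetryPeriod:   2 * time.Second,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: startControllers,
				OnStoppedLeading: stopControllers,
			},
		})
	}
}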
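
Similarly, the retry added in onStartedLeading for setting the active HA label relies on wait.PollUntilContextCancel, whose shape may not be obvious from the diff alone: the condition returns (false, nil) to keep polling, (true, nil) to stop, and polling also ends when the context (tied to leadership) is cancelled. Below is a minimal standalone sketch under those assumptions, with updateLabel as a hypothetical stand-in for SetHALabels and the one-time warning guarded by an atomic.Bool as in the patch.

// Illustrative sketch only -- not part of this patch.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// updateLabel is a hypothetical stand-in for patching the pod's
// gateway.submariner.io/status label; here it always fails to show the retry.
func updateLabel(ctx context.Context) error {
	return errors.New("transient API error")
}

func main() {
	// In the patch the context comes from leader election; a timeout stands in here.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	msgLogged := atomic.Bool{}

	// Poll every 100ms, starting immediately (the 'true' argument), until the
	// update succeeds or the context is cancelled; log the failure only once.
	_ = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
		if err := updateLabel(ctx); err != nil {
			if msgLogged.CompareAndSwap(false, true) {
				fmt.Printf("error updating pod label to active (will retry): %v\n", err)
			}

			return false, nil
		}

		return true, nil
	})
}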