Restart leader election on lease renewal failure in gateway

...rather than exiting.

Related to #2747

Signed-off-by: Tom Pantelis <[email protected]>
tpantelis committed Oct 30, 2023
1 parent c02d4e5 commit 825ed8c
Showing 4 changed files with 298 additions and 81 deletions.
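For context: the gateway elects a leader via client-go over a Lease lock. When the lease can no longer be renewed, the elector invokes OnStoppedLeading and its Run loop returns; this commit uses that callback to stop the leader-only components and re-enter the election, rather than treating the loss as fatal. A minimal, self-contained sketch of the pattern (the lock name, timings, and controller stubs are illustrative, not Submariner's actual wiring):

package sketch

import (
	"context"
	"log"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// startLeaderElection enters (or re-enters) the election without blocking.
func startLeaderElection(client kubernetes.Interface, namespace, identity string) error {
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock, namespace, "example-lock",
		client.CoreV1(), client.CoordinationV1(), resourcelock.ResourceLockConfig{Identity: identity})
	if err != nil {
		return err
	}

	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 10 * time.Second,
		RenewDeadline: 5 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Start leader-only controllers bound to ctx; ctx is
				// cancelled when leadership is lost.
			},
			OnStoppedLeading: func() {
				// Stop leader-only controllers, then re-enter the
				// election instead of exiting the process.
				log.Print("lease lost; restarting leader election")
				if err := startLeaderElection(client, namespace, identity); err != nil {
					log.Printf("error restarting leader election: %v", err)
				}
			},
		},
	})
	if err != nil {
		return err
	}

	go le.Run(context.Background())

	return nil
}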
136 changes: 91 additions & 45 deletions pkg/gateway/gateway.go
@@ -23,6 +23,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
@@ -44,6 +45,7 @@ import (
"github.com/submariner-io/submariner/pkg/types"
"github.com/submariner-io/submariner/pkg/versions"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
@@ -56,6 +58,8 @@ const (
defaultLeaseDuration = 10 * time.Second
defaultRenewDeadline = 5 * time.Second
defaultRetryPeriod = 2 * time.Second

LeaderElectionLockName = "submariner-gateway-lock"
)

type Interface interface {
@@ -82,20 +86,20 @@ type Config struct {

type gatewayType struct {
Config
airGapped bool
cableHealthChecker healthchecker.Interface
cableEngineSyncer *syncer.GatewaySyncer
cableEngine cableengine.Engine
publicIPWatcher *endpoint.PublicIPWatcher
datastoreSyncer *datastoresyncer.DatastoreSyncer
natDiscovery natdiscovery.Interface
gatewayPod *pod.GatewayPod
hostName string
localEndpoint *types.SubmarinerEndpoint
stopCh <-chan struct{}
fatalError chan error
waitGroup sync.WaitGroup
recorder record.EventRecorder
airGapped bool
cableHealthChecker healthchecker.Interface
cableEngineSyncer *syncer.GatewaySyncer
cableEngine cableengine.Engine
publicIPWatcher *endpoint.PublicIPWatcher
datastoreSyncer *datastoresyncer.DatastoreSyncer
natDiscovery natdiscovery.Interface
gatewayPod *pod.GatewayPod
hostName string
localEndpoint *types.SubmarinerEndpoint
stopCh <-chan struct{}
fatalError chan error
leaderComponentsStarted *sync.WaitGroup
recorder record.EventRecorder
}

var logger = log.Logger{Logger: logf.Log.WithName("Gateway")}
@@ -195,7 +199,11 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error {
return errors.Wrap(err, "error creating a handler to update the gateway pod")
}

g.runAsync(g.cableEngineSyncer.Run)
var waitGroup sync.WaitGroup

g.runAsync(&waitGroup, func() {
g.cableEngineSyncer.Run(g.stopCh)
})

if !g.airGapped {
g.initPublicIPWatcher()
@@ -211,31 +219,35 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error {
case fatalErr := <-g.fatalError:
g.cableEngineSyncer.SetGatewayStatusError(fatalErr)

if err := g.gatewayPod.SetHALabels(subv1.HAStatusPassive); err != nil {
if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil {
logger.Warningf("Error updating pod label: %s", err)
}

return fatalErr
}

g.waitGroup.Wait()
waitGroup.Wait()

logger.Info("Gateway engine stopped")

return nil
}

func (g *gatewayType) runAsync(run func(<-chan struct{})) {
g.waitGroup.Add(1)
func (g *gatewayType) runAsync(waitGroup *sync.WaitGroup, run func()) {
waitGroup.Add(1)

go func() {
defer g.waitGroup.Done()
run(g.stopCh)
defer waitGroup.Done()
run()
}()
}

func (g *gatewayType) startLeaderElection() error {
rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, "submariner-gateway-lock",
logger.Info("Starting leader election")

g.leaderComponentsStarted = &sync.WaitGroup{}

rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, LeaderElectionLockName,
g.LeaderElectionClient.CoreV1(), g.LeaderElectionClient.CoordinationV1(), resourcelock.ResourceLockConfig{
Identity: g.hostName + "-submariner-gateway",
EventRecorder: g.recorder,
@@ -258,50 +270,78 @@ func (g *gatewayType) startLeaderElection() error {
return nil
}

func (g *gatewayType) onStartedLeading(_ context.Context) {
func (g *gatewayType) onStartedLeading(ctx context.Context) {
logger.Info("Leadership acquired - starting controllers")

if err := g.cableEngine.StartEngine(); err != nil {
g.fatalError <- errors.Wrap(err, "error starting the cable engine")
return
}

go func() {
if err := g.gatewayPod.SetHALabels(subv1.HAStatusActive); err != nil {
g.fatalError <- errors.Wrap(err, "error updating pod label")
g.runAsync(g.leaderComponentsStarted, func() {
msgLogged := atomic.Bool{}

_ = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
if err := g.gatewayPod.SetHALabels(ctx, subv1.HAStatusActive); err != nil {
if msgLogged.CompareAndSwap(false, true) {
logger.Warningf("Error updating pod label to active: %s", err)
}

return false, nil
}

return true, nil
})
})

g.runAsync(g.leaderComponentsStarted, func() {
if err := g.datastoreSyncer.Start(ctx.Done()); err != nil {
g.fatalError <- errors.Wrap(err, "error running the datastore syncer")
}
}()
})

go func() {
g.runAsync(g.leaderComponentsStarted, func() {
watcherConfig := g.WatcherConfig
if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, g.stopCh); err != nil {
if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, ctx.Done()); err != nil {
g.fatalError <- errors.Wrap(err, "error running the tunnel controller")
}
}()

go func() {
if err := g.datastoreSyncer.Start(g.stopCh); err != nil {
g.fatalError <- errors.Wrap(err, "error running the datastore syncer")
}
}()
})

if g.cableHealthChecker != nil {
go func() {
if err := g.cableHealthChecker.Start(g.stopCh); err != nil {
g.runAsync(g.leaderComponentsStarted, func() {
if err := g.cableHealthChecker.Start(ctx.Done()); err != nil {
logger.Errorf(err, "Error starting healthChecker")
}
}()
})
}

if g.publicIPWatcher != nil {
go func() {
g.publicIPWatcher.Run(g.stopCh)
}()
go g.publicIPWatcher.Run(ctx.Done())
}
}

func (g *gatewayType) onStoppedLeading() {
g.cableEngineSyncer.CleanupGatewayEntry()
logger.Info("Leadership lost")

// Make sure all the components were at least started before we try to restart.
g.leaderComponentsStarted.Wait()

g.fatalError <- errors.New("leader election lost, shutting down")
if g.cableHealthChecker != nil {
g.cableHealthChecker.Stop()
}

g.cableEngine.Stop()

logger.Info("Controllers stopped")

if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil {
logger.Warningf("Error updating pod label to passive: %s", err)
}

err := g.startLeaderElection()
if err != nil {
g.fatalError <- errors.Wrap(err, "error restarting leader election")
}
}

func (g *gatewayType) initPublicIPWatcher() {
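A note on the retry loop in onStartedLeading above: wait.PollUntilContextCancel runs the condition immediately (the third argument) and then every interval until the condition returns true or a non-nil error, or until ctx is cancelled — and the ctx passed to onStartedLeading is cancelled on loss of leadership, so the label retries stop on their own. A stripped-down version of that log-once retry pattern (updateLabel stands in for gatewayPod.SetHALabels):

package sketch

import (
	"context"
	"log"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func retryUntilLeadershipLost(ctx context.Context, updateLabel func(context.Context) error) {
	var msgLogged atomic.Bool

	// Returning (false, nil) retries after the interval; returning a
	// non-nil error would abort the poll early.
	_ = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true,
		func(ctx context.Context) (bool, error) {
			if err := updateLabel(ctx); err != nil {
				// Log the failure only once to avoid flooding the log
				// while the apiserver is unreachable.
				if msgLogged.CompareAndSwap(false, true) {
					log.Printf("error updating pod label (retrying): %v", err)
				}
				return false, nil
			}
			return true, nil
		})
}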
@@ -356,14 +396,20 @@ func (g *gatewayType) uninstall() error {
}

func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.SubmarinerCluster {
// The Cluster resource requires a value for the GlobalCIDR.
globalCIDR := submSpec.GlobalCidr
if globalCIDR == nil {
globalCIDR = []string{}
}

return &types.SubmarinerCluster{
ID: submSpec.ClusterID,
Spec: subv1.ClusterSpec{
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr),
ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr),
GlobalCIDR: submSpec.GlobalCidr,
GlobalCIDR: globalCIDR,
},
}
}
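On the submarinerClusterFrom change: a nil Go slice and an empty one serialize differently — encoding/json renders nil as null and an empty slice as [] — which is presumably why the comment notes that the Cluster resource requires a value; the defaulting guarantees GlobalCIDR always carries a concrete (if empty) list. A quick standalone illustration, not from the repository:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var unset []string      // nil, as when GlobalCidr is not configured
	defaulted := []string{} // the value substituted by submarinerClusterFrom

	a, _ := json.Marshal(unset)
	b, _ := json.Marshal(defaulted)
	fmt.Println(string(a), string(b)) // prints: null []
}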
13 changes: 13 additions & 0 deletions pkg/gateway/gateway_suite_test.go
@@ -20,21 +20,34 @@ package gateway_test

import (
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/log/kzerolog"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
"github.com/submariner-io/submariner/pkg/cable/fake"
"github.com/submariner-io/submariner/pkg/cableengine/syncer"
"github.com/submariner-io/submariner/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
)

func init() {
kzerolog.AddFlags(nil)
}

var fakeDriver *fake.Driver

var _ = BeforeSuite(func() {
kzerolog.InitK8sLogging()
Expect(submarinerv1.AddToScheme(scheme.Scheme)).To(Succeed())

cable.AddDriver(fake.DriverName, func(_ *types.SubmarinerEndpoint, _ *types.SubmarinerCluster) (cable.Driver, error) {
return fakeDriver, nil
})

syncer.GatewayUpdateInterval = 50 * time.Millisecond
})

func TestGateway(t *testing.T) {
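The suite swaps the real cable driver for a fake by registering a factory under fake.DriverName; the engine later instantiates whatever driver is registered under the configured name. The registry shape, heavily simplified (the real cable.AddDriver factory also receives the endpoint and cluster, as the registration above shows):

package sketch

import "fmt"

// Driver stands in for Submariner's cable.Driver interface.
type Driver interface {
	Init() error
}

type driverFunc func() (Driver, error)

var drivers = map[string]driverFunc{}

// addDriver registers a named factory; tests register fakes here.
func addDriver(name string, fn driverFunc) {
	drivers[name] = fn
}

// newDriver instantiates the driver registered under name.
func newDriver(name string) (Driver, error) {
	fn, ok := drivers[name]
	if !ok {
		return nil, fmt.Errorf("unknown cable driver %q", name)
	}

	return fn()
}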