Restart leader election on lease renewal failure in gateway
...rather than exiting.

Related to submariner-io#2747

Signed-off-by: Tom Pantelis <[email protected]>
tpantelis committed Oct 25, 2023
1 parent bf2dd9e commit 6304991
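
The change, in short: instead of treating a failed Lease renewal as fatal, the gateway now tears down its leader-only components in `onStoppedLeading` and re-enters leader election. Below is a minimal, self-contained sketch of that restart pattern using client-go; the namespace, lock name, identity, and the in-cluster wiring in `main` are illustrative assumptions, not the actual Submariner code.

```go
package main

import (
	"context"
	"log"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func startLeaderElection(ctx context.Context, client kubernetes.Interface) error {
	// Hypothetical namespace, lock name, and identity - for illustration only.
	lock, err := resourcelock.New(resourcelock.LeasesResourceLock, "submariner-operator", "example-gateway-lock",
		client.CoreV1(), client.CoordinationV1(), resourcelock.ResourceLockConfig{Identity: "example-gateway"})
	if err != nil {
		return err
	}

	elector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 10 * time.Second,
		RenewDeadline: 5 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(leaderCtx context.Context) {
				// Leader-only controllers are started here, tied to leaderCtx so
				// they shut down automatically when the lease can't be renewed.
				log.Println("leadership acquired - starting controllers")
			},
			OnStoppedLeading: func() {
				// Lease renewal failed: clean up, then re-enter the election
				// instead of exiting the process.
				log.Println("leadership lost - restarting leader election")

				if err := startLeaderElection(ctx, client); err != nil {
					log.Printf("error restarting leader election: %v", err)
				}
			},
		},
	})
	if err != nil {
		return err
	}

	go elector.Run(ctx)

	return nil
}

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}

	if err := startLeaderElection(context.Background(), kubernetes.NewForConfigOrDie(cfg)); err != nil {
		log.Fatal(err)
	}

	select {} // block forever; the real gateway waits on a stop channel instead
}
```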
Showing 4 changed files with 298 additions and 81 deletions.
136 changes: 91 additions & 45 deletions pkg/gateway/gateway.go
@@ -23,6 +23,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
@@ -44,6 +45,7 @@ import (
"github.com/submariner-io/submariner/pkg/types"
"github.com/submariner-io/submariner/pkg/versions"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection"
@@ -56,6 +58,8 @@ const (
defaultLeaseDuration = 10 * time.Second
defaultRenewDeadline = 5 * time.Second
defaultRetryPeriod = 2 * time.Second

LeaderElectionLockName = "submariner-gateway-lock"
)

type Interface interface {
@@ -82,20 +86,20 @@ type Config struct {

type gatewayType struct {
Config
airGapped bool
cableHealthChecker healthchecker.Interface
cableEngineSyncer *syncer.GatewaySyncer
cableEngine cableengine.Engine
publicIPWatcher *endpoint.PublicIPWatcher
datastoreSyncer *datastoresyncer.DatastoreSyncer
natDiscovery natdiscovery.Interface
gatewayPod *pod.GatewayPod
hostName string
localEndpoint *types.SubmarinerEndpoint
stopCh <-chan struct{}
fatalError chan error
waitGroup sync.WaitGroup
recorder record.EventRecorder
airGapped bool
cableHealthChecker healthchecker.Interface
cableEngineSyncer *syncer.GatewaySyncer
cableEngine cableengine.Engine
publicIPWatcher *endpoint.PublicIPWatcher
datastoreSyncer *datastoresyncer.DatastoreSyncer
natDiscovery natdiscovery.Interface
gatewayPod *pod.GatewayPod
hostName string
localEndpoint *types.SubmarinerEndpoint
stopCh <-chan struct{}
fatalError chan error
leaderComponentsStarted *sync.WaitGroup
recorder record.EventRecorder
}

var logger = log.Logger{Logger: logf.Log.WithName("Gateway")}
@@ -195,7 +199,11 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error {
return errors.Wrap(err, "error creating a handler to update the gateway pod")
}

g.runAsync(g.cableEngineSyncer.Run)
var waitGroup sync.WaitGroup

g.runAsync(&waitGroup, func() {
g.cableEngineSyncer.Run(g.stopCh)
})

if !g.airGapped {
g.initPublicIPWatcher()
@@ -211,31 +219,35 @@ func (g *gatewayType) Run(stopCh <-chan struct{}) error {
case fatalErr := <-g.fatalError:
g.cableEngineSyncer.SetGatewayStatusError(fatalErr)

if err := g.gatewayPod.SetHALabels(subv1.HAStatusPassive); err != nil {
if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil {
logger.Warningf("Error updating pod label: %s", err)
}

return fatalErr
}

g.waitGroup.Wait()
waitGroup.Wait()

logger.Info("Gateway engine stopped")

return nil
}

func (g *gatewayType) runAsync(run func(<-chan struct{})) {
g.waitGroup.Add(1)
func (g *gatewayType) runAsync(waitGroup *sync.WaitGroup, run func()) {
waitGroup.Add(1)

go func() {
defer g.waitGroup.Done()
run(g.stopCh)
defer waitGroup.Done()
run()
}()
}

func (g *gatewayType) startLeaderElection() error {
rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, "submariner-gateway-lock",
logger.Info("Starting leader election")

g.leaderComponentsStarted = &sync.WaitGroup{}

rl, err := resourcelock.New(resourcelock.LeasesResourceLock, g.Spec.Namespace, LeaderElectionLockName,
g.LeaderElectionClient.CoreV1(), g.LeaderElectionClient.CoordinationV1(), resourcelock.ResourceLockConfig{
Identity: g.hostName + "-submariner-gateway",
EventRecorder: g.recorder,
@@ -258,50 +270,78 @@ func (g *gatewayType) startLeaderElection() error {
return nil
}

func (g *gatewayType) onStartedLeading(_ context.Context) {
func (g *gatewayType) onStartedLeading(ctx context.Context) {
logger.Info("Leadership acquired - starting controllers")

if err := g.cableEngine.StartEngine(); err != nil {
g.fatalError <- errors.Wrap(err, "error starting the cable engine")
return
}

go func() {
if err := g.gatewayPod.SetHALabels(subv1.HAStatusActive); err != nil {
g.fatalError <- errors.Wrap(err, "error updating pod label")
g.runAsync(g.leaderComponentsStarted, func() {
msgLogged := atomic.Bool{}

_ = wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
if err := g.gatewayPod.SetHALabels(ctx, subv1.HAStatusActive); err != nil {
if msgLogged.CompareAndSwap(false, true) {
logger.Warningf("Error updating pod label to active: %s", err)
}

return false, nil
}

return true, nil
})
})

g.runAsync(g.leaderComponentsStarted, func() {
if err := g.datastoreSyncer.Start(ctx.Done()); err != nil {
g.fatalError <- errors.Wrap(err, "error running the datastore syncer")
}
}()
})

go func() {
g.runAsync(g.leaderComponentsStarted, func() {
watcherConfig := g.WatcherConfig
if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, g.stopCh); err != nil {
if err := tunnel.StartController(g.cableEngine, g.Spec.Namespace, &watcherConfig, ctx.Done()); err != nil {
g.fatalError <- errors.Wrap(err, "error running the tunnel controller")
}
}()

go func() {
if err := g.datastoreSyncer.Start(g.stopCh); err != nil {
g.fatalError <- errors.Wrap(err, "error running the datastore syncer")
}
}()
})

if g.cableHealthChecker != nil {
go func() {
if err := g.cableHealthChecker.Start(g.stopCh); err != nil {
g.runAsync(g.leaderComponentsStarted, func() {
if err := g.cableHealthChecker.Start(ctx.Done()); err != nil {
logger.Errorf(err, "Error starting healthChecker")
}
}()
})
}

if g.publicIPWatcher != nil {
go func() {
g.publicIPWatcher.Run(g.stopCh)
}()
go g.publicIPWatcher.Run(ctx.Done())
}
}

func (g *gatewayType) onStoppedLeading() {
g.cableEngineSyncer.CleanupGatewayEntry()
logger.Info("Leadership lost")

// Make sure all the components were at least started before we try to restart.
g.leaderComponentsStarted.Wait()

g.fatalError <- errors.New("leader election lost, shutting down")
if g.cableHealthChecker != nil {
g.cableHealthChecker.Stop()
}

g.cableEngine.Stop()

logger.Info("Controllers stopped")

if err := g.gatewayPod.SetHALabels(context.Background(), subv1.HAStatusPassive); err != nil {
logger.Warningf("Error updating pod label to passive: %s", err)
}

err := g.startLeaderElection()
if err != nil {
g.fatalError <- errors.Wrap(err, "error restarting leader election")
}
}

func (g *gatewayType) initPublicIPWatcher() {
@@ -356,14 +396,20 @@ func (g *gatewayType) uninstall() error {
}

func submarinerClusterFrom(submSpec *types.SubmarinerSpecification) *types.SubmarinerCluster {
// The Cluster resource requires a value for the GlobalCIDR.
globalCIDR := submSpec.GlobalCidr
if globalCIDR == nil {
globalCIDR = []string{}
}

return &types.SubmarinerCluster{
ID: submSpec.ClusterID,
Spec: subv1.ClusterSpec{
ClusterID: submSpec.ClusterID,
ColorCodes: []string{"blue"}, // This is a fake value, used only for upgrade purposes
ServiceCIDR: cidr.ExtractIPv4Subnets(submSpec.ServiceCidr),
ClusterCIDR: cidr.ExtractIPv4Subnets(submSpec.ClusterCidr),
GlobalCIDR: submSpec.GlobalCidr,
GlobalCIDR: globalCIDR,
},
}
}
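
One design detail worth noting from the diff above: every leader-only component is launched through `runAsync`, which adds it to the `leaderComponentsStarted` wait group before its goroutine runs, so `onStoppedLeading` can wait for those components to wind down before restarting the election. A stripped-down sketch of that pattern follows; the component names and timings are illustrative, not the Submariner API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runAsync mirrors the gateway helper: the goroutine is registered with the
// wait group before it starts, so a later Wait() cannot miss it.
func runAsync(wg *sync.WaitGroup, run func()) {
	wg.Add(1)

	go func() {
		defer wg.Done()
		run()
	}()
}

func main() {
	// ctx plays the role of the leadership context passed to onStartedLeading.
	ctx, cancel := context.WithCancel(context.Background())
	leaderComponentsStarted := &sync.WaitGroup{}

	// Leader-only components run until the leadership context is cancelled.
	runAsync(leaderComponentsStarted, func() {
		<-ctx.Done()
		fmt.Println("tunnel controller stopped")
	})
	runAsync(leaderComponentsStarted, func() {
		<-ctx.Done()
		fmt.Println("datastore syncer stopped")
	})

	// Simulate losing the lease: cancel the leadership context, wait for the
	// components to finish, and only then restart leader election.
	time.Sleep(100 * time.Millisecond)
	cancel()
	leaderComponentsStarted.Wait()
	fmt.Println("leader components finished - safe to restart election")
}
```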
13 changes: 13 additions & 0 deletions pkg/gateway/gateway_suite_test.go
@@ -20,21 +20,34 @@ package gateway_test

import (
"testing"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/log/kzerolog"
submarinerv1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
"github.com/submariner-io/submariner/pkg/cable/fake"
"github.com/submariner-io/submariner/pkg/cableengine/syncer"
"github.com/submariner-io/submariner/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
)

func init() {
kzerolog.AddFlags(nil)
}

var fakeDriver *fake.Driver

var _ = BeforeSuite(func() {
kzerolog.InitK8sLogging()
Expect(submarinerv1.AddToScheme(scheme.Scheme)).To(Succeed())

cable.AddDriver(fake.DriverName, func(_ *types.SubmarinerEndpoint, _ *types.SubmarinerCluster) (cable.Driver, error) {
return fakeDriver, nil
})

syncer.GatewayUpdateInterval = 50 * time.Millisecond
})

func TestGateway(t *testing.T) {