Skip to content

Commit

Permalink
Add method to health checker to stop the pingers
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Oct 25, 2023
1 parent 7b0561b commit 1536fac
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
50 changes: 36 additions & 14 deletions pkg/cableengine/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
type Interface interface {
Start(stopCh <-chan struct{}) error
GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo
Stop()
}

type Config struct {
Expand All @@ -59,25 +60,27 @@ type Config struct {
}

type controller struct {
sync.RWMutex
endpointWatcher watcher.Interface
pingers sync.Map
pingers map[string]PingerInterface
config *Config
}

var logger = log.Logger{Logger: logf.Log.WithName("HealthChecker")}

func New(config *Config) (Interface, error) {
controller := &controller{
config: config,
config: config,
pingers: map[string]PingerInterface{},
}

config.WatcherConfig.ResourceConfigs = []watcher.ResourceConfig{
{
Name: "HealthChecker Endpoint Controller",
ResourceType: &submarinerv1.Endpoint{},
Handler: watcher.EventHandlerFuncs{
OnCreateFunc: controller.endpointCreatedorUpdated,
OnUpdateFunc: controller.endpointCreatedorUpdated,
OnCreateFunc: controller.endpointCreatedOrUpdated,
OnUpdateFunc: controller.endpointCreatedOrUpdated,
OnDeleteFunc: controller.endpointDeleted,
},
SourceNamespace: config.EndpointNamespace,
Expand All @@ -95,8 +98,11 @@ func New(config *Config) (Interface, error) {
}

func (h *controller) GetLatencyInfo(endpoint *submarinerv1.EndpointSpec) *LatencyInfo {
if obj, found := h.pingers.Load(endpoint.CableName); found {
return obj.(PingerInterface).GetLatencyInfo()
h.RLock()
defer h.RUnlock()

if pinger, found := h.pingers[endpoint.CableName]; found {
return pinger.GetLatencyInfo()
}

return nil
Expand All @@ -113,7 +119,18 @@ func (h *controller) Start(stopCh <-chan struct{}) error {
return nil
}

func (h *controller) endpointCreatedorUpdated(obj runtime.Object, _ int) bool {
func (h *controller) Stop() {
h.Lock()
defer h.Unlock()

for _, p := range h.pingers {
p.Stop()
}

h.pingers = map[string]PingerInterface{}
}

func (h *controller) endpointCreatedOrUpdated(obj runtime.Object, _ int) bool {
logger.V(log.TRACE).Infof("Endpoint created: %#v", obj)

endpointCreated := obj.(*submarinerv1.Endpoint)
Expand All @@ -127,15 +144,17 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object, _ int) bool {
return false
}

if obj, found := h.pingers.Load(endpointCreated.Spec.CableName); found {
pinger := obj.(PingerInterface)
h.Lock()
defer h.Unlock()

if pinger, found := h.pingers[endpointCreated.Spec.CableName]; found {
if pinger.GetIP() == endpointCreated.Spec.HealthCheckIP {
return false
}

logger.V(log.DEBUG).Infof("HealthChecker is already running for %q - stopping", endpointCreated.Name)
pinger.Stop()
h.pingers.Delete(endpointCreated.Spec.CableName)
delete(h.pingers, endpointCreated.Spec.CableName)
}

pingerConfig := PingerConfig{
Expand All @@ -153,7 +172,7 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object, _ int) bool {
}

pinger := newPingerFunc(pingerConfig)
h.pingers.Store(endpointCreated.Spec.CableName, pinger)
h.pingers[endpointCreated.Spec.CableName] = pinger
pinger.Start()

logger.Infof("CableEngine HealthChecker started pinger for CableName: %q with HealthCheckIP %q",
Expand All @@ -164,10 +183,13 @@ func (h *controller) endpointCreatedorUpdated(obj runtime.Object, _ int) bool {

func (h *controller) endpointDeleted(obj runtime.Object, _ int) bool {
endpointDeleted := obj.(*submarinerv1.Endpoint)
if obj, found := h.pingers.Load(endpointDeleted.Spec.CableName); found {
pinger := obj.(PingerInterface)

h.Lock()
defer h.Unlock()

if pinger, found := h.pingers[endpointDeleted.Spec.CableName]; found {
pinger.Stop()
h.pingers.Delete(endpointDeleted.Spec.CableName)
delete(h.pingers, endpointDeleted.Spec.CableName)
}

return false
Expand Down
7 changes: 6 additions & 1 deletion pkg/cableengine/healthchecker/healthchecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ var _ = Describe("Controller", func() {
pingerMap[healthCheckIP2].SetLatencyInfo(latencyInfo2)
Eventually(func() *healthchecker.LatencyInfo { return healthChecker.GetLatencyInfo(&endpoint2.Spec) }).
Should(Equal(latencyInfo2))

healthChecker.Stop()

Expect(healthChecker.GetLatencyInfo(&endpoint1.Spec)).To(BeNil())
Expect(healthChecker.GetLatencyInfo(&endpoint2.Spec)).To(BeNil())
})
})

Expand Down Expand Up @@ -225,7 +230,7 @@ var _ = Describe("Controller", func() {
})
})

When("a remote Endpoint is has no HealthCheckIP", func() {
When("a remote Endpoint has no HealthCheckIP", func() {
It("should not start a Pinger", func() {
createEndpoint(remoteClusterID1, "")
pingerMap[healthCheckIP1].AwaitNoStart()
Expand Down

0 comments on commit 1536fac

Please sign in to comment.