Merge pull request #288 from Kuadrant/update-prob-logic
update probe logic not to share memory
maleck13 authored Oct 31, 2024
2 parents 92b6587 + 4af3023 commit 7fea5b8
Showing 4 changed files with 116 additions and 100 deletions.
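In short, the commit replaces shared mutable probe state with message passing: the worker goroutine now deep-copies its DNSHealthCheckProbe config, streams ProbeResult values over a channel, and the consumer re-reads the object from the API server before writing status. Below is a minimal, runnable sketch of that pattern; ProbeConfig, ProbeResult and executeProbe are stand-in types and names, not the operator's actual API.

package main

import (
	"context"
	"fmt"
	"time"
)

// ProbeConfig and ProbeResult stand in for the CRD and result types.
type ProbeConfig struct {
	Name     string
	Interval time.Duration
}

type ProbeResult struct {
	Healthy   bool
	CheckedAt time.Time
}

// executeProbe owns a private copy of the config and publishes immutable
// results; it shares no memory with the caller beyond the channel.
func executeProbe(ctx context.Context, cfg ProbeConfig) <-chan ProbeResult {
	local := cfg // value copy; a Kubernetes object would need DeepCopy()
	out := make(chan ProbeResult)
	go func() {
		defer close(out)
		ticker := time.NewTicker(local.Interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				out <- ProbeResult{Healthy: true, CheckedAt: time.Now()}
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	// the consumer (the reconciler in the real code) acts on each result
	// and re-reads the latest object before writing status
	for res := range executeProbe(ctx, ProbeConfig{Name: "example", Interval: 100 * time.Millisecond}) {
		fmt.Printf("result: %+v\n", res)
	}
}

With this shape only one goroutine ever mutates a given probe's status, which is what removes the shared memory the PR title refers to.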
internal/controller/dnshealthcheckprobe_reconciler.go (12 changes: 6 additions & 6 deletions)
@@ -76,17 +76,17 @@ func (r *DNSProbeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

if !controllerutil.ContainsFinalizer(dnsProbe, DNSHealthCheckFinalizer) {
controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer)
if err := r.Client.Update(ctx, dnsProbe); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
if controllerutil.AddFinalizer(dnsProbe, DNSHealthCheckFinalizer) {
if err := r.Client.Update(ctx, dnsProbe); err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
internal/controller/dnsrecord_controller.go (12 changes: 6 additions & 6 deletions)
@@ -223,6 +223,12 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.updateStatus(ctx, previous, dnsRecord, false, err)
}

if probesEnabled {
if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil {
return ctrl.Result{}, err
}
}
// just check probes here and don't publish
// Publish the record
hadChanges, err := r.publishRecord(ctx, dnsRecord, dnsProvider)
if err != nil {
@@ -232,12 +238,6 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.updateStatus(ctx, previous, dnsRecord, hadChanges, err)
}

if probesEnabled {
if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil {
return ctrl.Result{}, err
}
}

return r.updateStatus(ctx, previous, dnsRecord, hadChanges, nil)
}

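The hunk above moves the ReconcileHealthChecks call ahead of publishRecord, so probes are reconciled before the record is published rather than after. A condensed sketch of the reordered flow; the stub functions here are illustrative stand-ins for the controller's methods, not its real signatures.

package main

import (
	"context"
	"fmt"
)

var probesEnabled = true

// stand-ins for r.ReconcileHealthChecks and r.publishRecord
func reconcileHealthChecks(ctx context.Context) error {
	fmt.Println("health checks reconciled")
	return nil
}

func publishRecord(ctx context.Context) error {
	fmt.Println("record published")
	return nil
}

// reconcile mirrors the new ordering: probes first, then publish
func reconcile(ctx context.Context) error {
	if probesEnabled {
		if err := reconcileHealthChecks(ctx); err != nil {
			return err
		}
	}
	return publishRecord(ctx)
}

func main() {
	if err := reconcile(context.Background()); err != nil {
		fmt.Println("error:", err)
	}
}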
internal/probes/worker.go (151 changes: 81 additions & 70 deletions)
@@ -39,74 +39,44 @@ func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
}

type Probe struct {
probeConfig *v1alpha1.DNSHealthCheckProbe
Transport RoundTripperFunc
probeHeaders v1alpha1.AdditionalHeaders
}

func NewProbe(probeConfig *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) *Probe {
func NewProbe(headers v1alpha1.AdditionalHeaders) *Probe {
return &Probe{
probeConfig: probeConfig,
probeHeaders: headers,
}
}

// Start a worker in a separate goroutine. Returns a cancel func to kill the worker.
// If the worker is nil, no routines will start and the cancel func can be ignored (it is still returned to prevent a panic)
func (w *Probe) Start(clientctx context.Context, k8sClient client.Client) context.CancelFunc {
ctx, cancel := context.WithCancel(clientctx)
logger := log.FromContext(ctx)
logger.Info("health: starting new worker for", "probe ", keyForProbe(w.probeConfig))
metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Inc()
go func() {
for range w.ExecuteProbe(ctx, w.probeConfig) {
// each time this is done it will send a signal
logger.V(1).Info("health: probe finished ", "updating status for probe", keyForProbe(w.probeConfig), "status", w.probeConfig.Status)
err := k8sClient.Status().Update(clientctx, w.probeConfig)
if err != nil {
logger.Error(err, "health: probe finished. error updating probe status", "probe", keyForProbe(w.probeConfig))
}
}
logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(w.probeConfig))
metrics.ProbeCounter.WithLabelValues(w.probeConfig.Name, w.probeConfig.Namespace, w.probeConfig.Spec.Hostname).Dec()
}()
return cancel
}

func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan struct{} {
sig := make(chan struct{})

// ExecuteProbe executes the health check request on a background routine at the correct interval. It returns a channel on which it will send each probe result
func (w *Probe) ExecuteProbe(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) <-chan ProbeResult {
sig := make(chan ProbeResult)
localProbe := probe.DeepCopy()
go func() {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(localProbe))
for {
if sig == nil || probe == nil {
if sig == nil || localProbe == nil {
logger.Error(fmt.Errorf("channel or probe nil "), "exiting probe")
return
}
timer := time.NewTimer(executeAt(probe))
timer := time.NewTimer(executeAt(localProbe))
select {
case <-ctx.Done():
logger.V(1).Info("health: context shutdown. Stopping", "probe", keyForProbe(probe))
logger.V(2).Info("health probe worker: context shutdown. Stopping")
if sig != nil {
timer.Stop()
close(sig)
sig = nil
logger.V(1).Info("health: context shutdown. time stopped and channel closed", "probe", keyForProbe(probe))
logger.V(2).Info("health probe worker: context shutdown. time stopped and channel closed")
}
return
case <-timer.C:
result := w.execute(ctx, probe)
probe.Status.ObservedGeneration = probe.Generation
if !result.Healthy {
probe.Status.ConsecutiveFailures++
} else {
probe.Status.ConsecutiveFailures = 0
}
probe.Status.Healthy = &result.Healthy
probe.Status.LastCheckedAt = result.CheckedAt
probe.Status.Reason = result.Reason
logger.V(1).Info("health: execution complete ", "probe", keyForProbe(probe), "result", result)
sig <- struct{}{}
logger.V(2).Info("health probe worker: executing")
result := w.execute(ctx, localProbe)
// as this routine is just executing the local config it only cares about when it should execute again
localProbe.Status.LastCheckedAt = result.CheckedAt
sig <- result
}
}
}()
@@ -125,42 +95,39 @@ func executeAt(probe *v1alpha1.DNSHealthCheckProbe) time.Duration {
}

func (w *Probe) execute(ctx context.Context, probe *v1alpha1.DNSHealthCheckProbe) ProbeResult {
logger := log.FromContext(ctx)
logger.V(1).Info("kinperforming health check")
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probe))
logger.V(2).Info("performing health check")
var ips []net.IP

//if the address is a CNAME, check all IP Addresses that it resolves to
logger.V(1).Info("looking up address ", "address", probe.Spec.Address)
logger.V(2).Info("looking up address ", "address", probe.Spec.Address)
ip := net.ParseIP(probe.Spec.Address)

if ip == nil {
IPAddr, err := net.LookupIP(probe.Spec.Address)
if err != nil {
logger.V(1).Error(err, "error looking up address", "address", probe.Spec.Address)
logger.Error(err, "error looking up address", "address", probe.Spec.Address)
return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: err.Error()}
}
ips = append(ips, IPAddr...)
} else {
ips = append(ips, ip)
}

var result ProbeResult
for _, ip = range ips {
result := w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders)
// if any IP in a CNAME is healthy, it is a healthy CNAME
result = w.performRequest(ctx, string(probe.Spec.Protocol), probe.Spec.Hostname, probe.Spec.Path, ip.String(), probe.Spec.Port, probe.Spec.AllowInsecureCertificate, w.probeHeaders)
// return as any healthy IP is a good result (multiple can only really happen with a CNAME)
if result.Healthy {
return result
}
}

// all IPs returned an unhealthy result
return ProbeResult{
CheckedAt: metav1.Now(),
Healthy: false,
}
//TODO deal with multiple results for a CNAME better and don't just rely on the last result
// all IPs returned an unhealthy result. result holds the last ProbeResult, so return it to have some status set
return result
}

func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip string, port int, allowInsecure bool, headers v1alpha1.AdditionalHeaders) ProbeResult {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", "performing request")
probeClient := metrics.NewInstrumentedClient("probe", &http.Client{
Transport: TransportWithDNSResponse(map[string]string{host: ip}, allowInsecure),
})
@@ -182,7 +149,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
httpReq.Header.Add(h.Name, h.Value)
}

logger.V(1).Info("health: probe executing against ", "url", httpReq.URL)
logger.V(2).Info("health: probe executing against ", "url", httpReq.URL)

// Send the request
res, err := probeClient.Do(httpReq)
@@ -191,7 +158,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
} else if err != nil {
return ProbeResult{CheckedAt: metav1.Now(), Healthy: false, Reason: fmt.Sprintf("error: %s, response: %+v", err.Error(), res)}
}
logger.V(1).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode)
logger.V(2).Info("health: probe execution complete against ", "url", httpReq.URL, "status code", res.StatusCode)
if !slice.Contains[int](ExpectedResponses, func(i int) bool { return i == res.StatusCode }) {
return ProbeResult{
CheckedAt: metav1.Now(),
@@ -204,6 +171,7 @@ func (w *Probe) performRequest(ctx context.Context, protocol, host, path, ip str
return ProbeResult{
CheckedAt: metav1.Now(),
Healthy: true,
Status: res.StatusCode,
}
}

@@ -245,36 +213,79 @@ func NewProbeManager() *ProbeManager {

// StopProbeWorker stops the worker and removes it from the WorkerManager
func (m *ProbeManager) StopProbeWorker(ctx context.Context, probeCR *v1alpha1.DNSHealthCheckProbe) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR))
if stop, ok := m.probes[keyForProbe(probeCR)]; ok {
logger.V(1).Info("health: Stopping existing worker", "probe", keyForProbe(probeCR))
logger.V(2).Info("Stopping existing worker", "probe", keyForProbe(probeCR))
stop()
delete(m.probes, keyForProbe(probeCR))
}
}

// Start a worker in a separate goroutine. Returns a cancel func to kill the worker.
// If the worker is nil, no routines will start and the cancel func can be ignored (it is still returned to prevent a panic)
func (w *Probe) Start(clientctx context.Context, k8sClient client.Client, probe *v1alpha1.DNSHealthCheckProbe) context.CancelFunc {

ctx, cancel := context.WithCancel(clientctx)
logger := log.FromContext(ctx).WithValues("probe", keyForProbe(probe))
logger.V(1).Info("health: starting new worker for probe")

go func() {
metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Inc()
// each time the probe executes it will send a result on the channel returned by ExecuteProbe until the probe is cancelled. The probe can be cancelled by a new spec being created for the health check or on shutdown
for probeResult := range w.ExecuteProbe(ctx, probe) {
freshProbe := &v1alpha1.DNSHealthCheckProbe{}
if err := k8sClient.Get(clientctx, client.ObjectKeyFromObject(probe), freshProbe); err != nil {
// if we hit an error here we cancel and return as it is an unusual state
logger.Error(err, "health: probe finished. error getting upto date probe. Cancelling")
cancel()
return
}
freshProbe.Status.ObservedGeneration = freshProbe.Generation
if !probeResult.Healthy {
freshProbe.Status.ConsecutiveFailures++
} else {
freshProbe.Status.ConsecutiveFailures = 0
}
freshProbe.Status.Healthy = &probeResult.Healthy
freshProbe.Status.LastCheckedAt = probeResult.CheckedAt
freshProbe.Status.Reason = probeResult.Reason
freshProbe.Status.Status = probeResult.Status
logger.V(1).Info("health: execution complete ", "result", probeResult)

logger.V(2).Info("health: probe finished updating status for probe", "status", freshProbe)
err := k8sClient.Status().Update(clientctx, freshProbe)
if err != nil {
logger.Error(err, "health: probe finished. error updating probe status")
}
}

logger.V(1).Info("health: stopped executing probe", "probe", keyForProbe(probe))
metrics.ProbeCounter.WithLabelValues(probe.Name, probe.Namespace, probe.Spec.Hostname).Dec()
}()
return cancel
}

// EnsureProbeWorker ensures a new worker per generation of the probe.
// New generation of probe - new worker.
// If the generation has not changed but no worker exists yet (e.g. after a restart), it will create one. If the context is done (we are deleting), that worker will die immediately.
func (m *ProbeManager) EnsureProbeWorker(ctx context.Context, k8sClient client.Client, probeCR *v1alpha1.DNSHealthCheckProbe, headers v1alpha1.AdditionalHeaders) {
logger := log.FromContext(ctx)
logger := log.FromContext(ctx).WithValues("health probe worker:", keyForProbe(probeCR))
logger.Info("ensure probe")

// if worker exists
if stop, ok := m.probes[keyForProbe(probeCR)]; ok {
// gen has not changed (spec has not changed) - nothing to do,
// or first reconcile of the probe but worker already in place
if probeCR.Status.ObservedGeneration == probeCR.Generation || probeCR.Status.ObservedGeneration == 0 {
logger.V(1).Info("health: probe worker exists for generation. Continuing", "probe", keyForProbe(probeCR))
logger.V(2).Info("already exists for generation and status. Continuing", "probe", keyForProbe(probeCR))
return
}
logger.V(1).Info("health: worker already exists. New generation of the probe: stopping existing worker", "probe", keyForProbe(probeCR))
logger.V(2).Info("old worker exists. New generation of the probe found: stopping existing worker", "probe", keyForProbe(probeCR))
stop()
}
// Either worker does not exist, or gen changed and old worker got killed. Creating a new one.
logger.V(1).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR))
probe := NewProbe(probeCR, headers)
m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient)
logger.V(2).Info("health: starting fresh worker for", "generation", probeCR.Generation, "probe", keyForProbe(probeCR))
probe := NewProbe(headers)
m.probes[keyForProbe(probeCR)] = probe.Start(ctx, k8sClient, probeCR)
}

func keyForProbe(probe *v1alpha1.DNSHealthCheckProbe) string {
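The relocated Start above no longer persists status through the worker's private copy: for each ProbeResult it fetches a fresh object with Get, folds the result into that object's status, and calls Status().Update on it. A standalone sketch of this refresh-before-write pattern; the in-memory store here is a hypothetical stand-in for the Kubernetes API client.

package main

import "fmt"

type ProbeStatus struct {
	Healthy             bool
	ConsecutiveFailures int
}

type Probe struct {
	Name   string
	Status ProbeStatus
}

// store stands in for the API server
type store map[string]*Probe

// get returns a fresh copy, like client.Get into a new object
func (s store) get(name string) (Probe, bool) {
	p, ok := s[name]
	if !ok {
		return Probe{}, false
	}
	return *p, true
}

// updateStatus writes the copy's status back, like Status().Update
func (s store) updateStatus(p Probe) {
	s[p.Name].Status = p.Status
}

// applyResult mirrors how Start folds a result into the fresh object
func applyResult(fresh *Probe, healthy bool) {
	if healthy {
		fresh.Status.ConsecutiveFailures = 0
	} else {
		fresh.Status.ConsecutiveFailures++
	}
	fresh.Status.Healthy = healthy
}

func main() {
	s := store{"probe-a": {Name: "probe-a"}}
	for _, healthy := range []bool{false, false, true} {
		fresh, ok := s.get("probe-a") // fetch up-to-date state before every write
		if !ok {
			return
		}
		applyResult(&fresh, healthy)
		s.updateStatus(fresh)
		fmt.Printf("after result(healthy=%v): %+v\n", healthy, s["probe-a"].Status)
	}
}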
internal/probes/worker_test.go (41 changes: 23 additions & 18 deletions)
@@ -47,7 +47,7 @@ func TestWorker_ProbeSuccess(t *testing.T) {
Transport func() probes.RoundTripperFunc
ProbeConfig func() *v1alpha1.DNSHealthCheckProbe
ProbeHeaders v1alpha1.AdditionalHeaders
Validate func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int)
Validate func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int)
Ctx context.Context
ExpectedProbeCalls int
}{
@@ -64,15 +64,17 @@
ProbeConfig: func() *v1alpha1.DNSHealthCheckProbe {
return testProbe.DeepCopy()
},
Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) {
if probe.Status.ConsecutiveFailures != 0 {
t.Fatalf("expected no failures but got %v", probe.Status.ConsecutiveFailures)
Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) {
if len(results) != expectedCalls {
t.Fatalf("expected %v results got %v", expectedCalls, len(results))
}
if probe.Status.Healthy == nil || *probe.Status.Healthy != true {
t.Fatalf("expected the probe to be healthy")
lastResult := results[expectedCalls-1]
// get the last probe result
if lastResult.Healthy == false {
t.Fatalf("expected the result of the probe to be healthy but it was not")
}
if tt.calls != expectedCalls {
t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls)
if lastResult.Status != 200 {
t.Fatalf("expected the result status to be 200 but got %v", lastResult.Status)
}
},
},
@@ -89,15 +91,16 @@
return testProbe.DeepCopy()
},
ExpectedProbeCalls: 3,
Validate: func(t *testing.T, probe *v1alpha1.DNSHealthCheckProbe, tt *testTransport, expectedCalls int) {
if probe.Status.ConsecutiveFailures != expectedCalls {
t.Fatalf("expected %v failures but got %v", expectedCalls, probe.Status.ConsecutiveFailures)
Validate: func(t *testing.T, results []probes.ProbeResult, tt *testTransport, expectedCalls int) {
if len(results) != expectedCalls {
t.Fatalf("expected %v results got %v", expectedCalls, len(results))
}
if probe.Status.Healthy == nil || *probe.Status.Healthy != false {
t.Fatalf("expected the probe to be unhealthy")
lastResult := results[expectedCalls-1]
if lastResult.Healthy != false {
t.Fatalf("expected the result of the probe to be un-healthy but it was not")
}
if tt.calls != expectedCalls {
t.Fatalf("expected the number of health probe http calls to be %v but got %v", expectedCalls, tt.calls)
if lastResult.Status != 503 {
t.Fatalf("expected result status to be 503 but got %v", lastResult.Status)
}
},
},
@@ -106,7 +109,7 @@
for _, test := range testCases {
t.Run(test.Name, func(t *testing.T) {
probeConfig := test.ProbeConfig()
w := probes.NewProbe(probeConfig, test.ProbeHeaders)
w := probes.NewProbe(test.ProbeHeaders)
tTransport := &testTransport{}
tTransport.f = test.Transport()
w.Transport = tTransport.countedHTTP
@@ -116,11 +119,13 @@
wait := sync.WaitGroup{}
wait.Add(test.ExpectedProbeCalls)
// read from our channel
executedProbes := []probes.ProbeResult{}
go func() {
calls := 0
for range exChan {
for result := range exChan {
calls++
t.Logf("channel read %v", calls)
executedProbes = append(executedProbes, result)
wait.Done()
}
}()
@@ -135,7 +140,7 @@
// cancel our context and wait for the probe to exit
cancel()
wait.Wait()
test.Validate(t, probeConfig, tTransport, test.ExpectedProbeCalls)
test.Validate(t, executedProbes, tTransport, test.ExpectedProbeCalls)
})
}
}
