Skip to content

Commit

Permalink
watch health probes in dnsrecord controller
Browse files Browse the repository at this point in the history
Signed-off-by: Maskym Vavilov <[email protected]>
  • Loading branch information
maksymvavilov committed Nov 15, 2024
1 parent 1f435c8 commit 8430c7c
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 38 deletions.
108 changes: 83 additions & 25 deletions internal/controller/dnsrecord_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reason := "DNSProviderError"
message := fmt.Sprintf("The dns provider could not be loaded: %v", err)
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse, reason, message)
return r.updateStatus(ctx, previous, dnsRecord, false, []string{}, err)
return r.updateStatus(ctx, previous, dnsRecord, nil, false, []string{}, err)
}

if probesEnabled {
Expand Down Expand Up @@ -174,7 +175,7 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
logger.Error(err, "Failed to validate record")
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse,
"ValidationError", fmt.Sprintf("validation of DNSRecord failed: %v", err))
return r.updateStatus(ctx, previous, dnsRecord, false, []string{}, err)
return r.updateStatus(ctx, previous, dnsRecord, nil, false, []string{}, err)
}

//Ensure an Owner ID has been assigned to the record (OwnerID set in the status)
Expand All @@ -197,14 +198,14 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse,
"DNSProviderError", fmt.Sprintf("The dns provider could not be loaded: %v", err))
return r.updateStatus(ctx, previous, dnsRecord, false, []string{}, err)
return r.updateStatus(ctx, previous, dnsRecord, nil, false, []string{}, err)
}

z, err := p.DNSZoneForHost(ctx, dnsRecord.Spec.RootHost)
if err != nil {
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse,
"DNSProviderError", fmt.Sprintf("Unable to find suitable zone in provider: %v", provider.SanitizeError(err)))
return r.updateStatus(ctx, previous, dnsRecord, false, []string{}, err)
return r.updateStatus(ctx, previous, dnsRecord, nil, false, []string{}, err)
}

//Add zone id/domainName to status
Expand All @@ -220,25 +221,34 @@ func (r *DNSRecordReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse,
"DNSProviderError", fmt.Sprintf("The dns provider could not be loaded: %v", err))
return r.updateStatus(ctx, previous, dnsRecord, false, []string{}, err)
return r.updateStatus(ctx, previous, dnsRecord, nil, false, []string{}, err)
}

probes := &v1alpha1.DNSHealthCheckProbeList{}
if probesEnabled {
if err = r.ReconcileHealthChecks(ctx, dnsRecord, allowInsecureCert); err != nil {
return ctrl.Result{}, err
}
// get all probes owned by this record
if err := r.List(ctx, probes, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
ProbeOwnerLabel: BuildOwnerLabelValue(dnsRecord),
}),
Namespace: dnsRecord.Namespace,
}); err != nil {
return ctrl.Result{}, err
}
}
// just check probes here and don't publish
// Publish the record
hadChanges, notHealthyProbes, err := r.publishRecord(ctx, dnsRecord, dnsProvider)
hadChanges, notHealthyProbes, err := r.publishRecord(ctx, dnsRecord, probes, dnsProvider)
if err != nil {
logger.Error(err, "Failed to publish record")
setDNSRecordCondition(dnsRecord, string(v1alpha1.ConditionTypeReady), metav1.ConditionFalse,
"ProviderError", fmt.Sprintf("The DNS provider failed to ensure the record: %v", provider.SanitizeError(err)))
return r.updateStatus(ctx, previous, dnsRecord, hadChanges, notHealthyProbes, err)
return r.updateStatus(ctx, previous, dnsRecord, probes, hadChanges, notHealthyProbes, err)
}

return r.updateStatus(ctx, previous, dnsRecord, hadChanges, notHealthyProbes, nil)
return r.updateStatus(ctx, previous, dnsRecord, probes, hadChanges, notHealthyProbes, nil)
}

// setLogger Updates the given Logger with record/zone metadata from the given DNSRecord.
Expand All @@ -252,7 +262,7 @@ func (r *DNSRecordReconciler) setLogger(ctx context.Context, logger logr.Logger,
return log.IntoContext(ctx, logger), logger
}

func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, current *v1alpha1.DNSRecord, hadChanges bool, notHealthyProbes []string, specErr error) (reconcile.Result, error) {
func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, current *v1alpha1.DNSRecord, probes *v1alpha1.DNSHealthCheckProbeList, hadChanges bool, notHealthyProbes []string, specErr error) (reconcile.Result, error) {
var requeueTime time.Duration
logger := log.FromContext(ctx)

Expand All @@ -269,7 +279,7 @@ func (r *DNSRecordReconciler) updateStatus(ctx context.Context, previous, curren
}

// short loop. We don't publish anything so not changing status
if prematurely, requeueIn := recordReceivedPrematurely(current); prematurely {
if prematurely, requeueIn := recordReceivedPrematurely(current, probes); prematurely {
return reconcile.Result{RequeueAfter: requeueIn}, nil
}

Expand Down Expand Up @@ -358,14 +368,34 @@ func (r *DNSRecordReconciler) SetupWithManager(mgr ctrl.Manager, maxRequeue, val
}
return toReconcile
})).
Watches(&v1alpha1.DNSHealthCheckProbe{}, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
logger := log.FromContext(ctx)
probe, ok := o.(*v1alpha1.DNSHealthCheckProbe)
if !ok {
logger.V(1).Info("unexpected object type", "error", fmt.Sprintf("%T is not a *v1alpha1.DNSHealthCheckProbe", o))
}

records := &v1alpha1.DNSRecordList{}
if err := mgr.GetClient().List(ctx, records, &client.ListOptions{Namespace: o.GetNamespace()}); err != nil {
logger.Error(err, "failed to list dnsrecords ", "namespace", o.GetNamespace())
}

owner := GetOwnerFromLabel(probe)
for _, record := range records.Items {
if record.Name == owner || record.GetUIDHash() == owner {
return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(&record)}}
}
}
return nil
})).
Complete(r)
}

// deleteRecord deletes record(s) in the DNSPRovider(i.e. route53) zone (dnsRecord.Status.ZoneID).
func (r *DNSRecordReconciler) deleteRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, dnsProvider provider.Provider) (bool, error) {
logger := log.FromContext(ctx)

hadChanges, _, err := r.applyChanges(ctx, dnsRecord, dnsProvider, true)
hadChanges, _, err := r.applyChanges(ctx, dnsRecord, nil, dnsProvider, true)
if err != nil {
if strings.Contains(err.Error(), "was not found") || strings.Contains(err.Error(), "notFound") {
logger.Info("Record not found in zone, continuing")
Expand All @@ -383,15 +413,15 @@ func (r *DNSRecordReconciler) deleteRecord(ctx context.Context, dnsRecord *v1alp

// publishRecord publishes record(s) to the DNSPRovider(i.e. route53) zone (dnsRecord.Status.ZoneID).
// returns if it had changes, if record is healthy and an error. If had no changes - the healthy bool can be ignored
func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, dnsProvider provider.Provider) (bool, []string, error) {
func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, probes *v1alpha1.DNSHealthCheckProbeList, dnsProvider provider.Provider) (bool, []string, error) {
logger := log.FromContext(ctx)

if prematurely, _ := recordReceivedPrematurely(dnsRecord); prematurely {
if prematurely, _ := recordReceivedPrematurely(dnsRecord, probes); prematurely {
logger.V(1).Info("Skipping DNSRecord - is still valid")
return false, []string{}, nil
}

hadChanges, notHealthyProbes, err := r.applyChanges(ctx, dnsRecord, dnsProvider, false)
hadChanges, notHealthyProbes, err := r.applyChanges(ctx, dnsRecord, probes, dnsProvider, false)
if err != nil {
return hadChanges, notHealthyProbes, err
}
Expand All @@ -400,17 +430,46 @@ func (r *DNSRecordReconciler) publishRecord(ctx context.Context, dnsRecord *v1al
return hadChanges, notHealthyProbes, nil
}

// recordReceivedPrematurely returns true if current reconciliation loop started before
// last loop plus validFor duration.
// recordReceivedPrematurely returns true if the current reconciliation loop started before
// the last loop plus validFor duration.
// It also returns a duration for which the record should have been requeued. Meaning that if the record was valid
// for 30 minutes and was received in 29 minutes the function will return (true, 30 min).
func recordReceivedPrematurely(record *v1alpha1.DNSRecord) (bool, time.Duration) {
// for 30 minutes and was received in 29 minutes, the function will return (true, 30 min).
// It will make an exception and will let through premature records if healthcheck probes change their health status
func recordReceivedPrematurely(record *v1alpha1.DNSRecord, probes *v1alpha1.DNSHealthCheckProbeList) (bool, time.Duration) {
var prematurely bool

requeueIn := validFor
if record.Status.ValidFor != "" {
requeueIn, _ = time.ParseDuration(record.Status.ValidFor)
}
expiryTime := metav1.NewTime(record.Status.QueuedAt.Add(requeueIn))
return !generationChanged(record) && reconcileStart.Before(&expiryTime), requeueIn
prematurely = !generationChanged(record) && reconcileStart.Before(&expiryTime)

// check for the exception if we are received prematurely.
// this cuts off all the cases when we are creating. If this evaluates to true, we must have created probes
// and must have healthy condition
if prematurely && probesEnabled && record.Spec.HealthCheck != nil {
healthyCond := meta.FindStatusCondition(record.Status.Conditions, string(v1alpha1.ConditionTypeHealthy))
// this is caused only by an error during reconciliation
if healthyCond == nil {
return false, requeueIn
}
// healthy is true only if we have probes and they are all healthy
isHealthy := healthyCond.Status == metav1.ConditionTrue

// if at least one not healthy - this will lock in false
allProbesHealthy := true
for _, probe := range probes.Items {
if probe.Status.Healthy != nil {
allProbesHealthy = allProbesHealthy && *probe.Status.Healthy
}
}

// prematurely is true here. return false in case we need full reconcile
return isHealthy == allProbesHealthy, requeueIn
}

return prematurely, requeueIn
}

func generationChanged(record *v1alpha1.DNSRecord) bool {
Expand Down Expand Up @@ -446,7 +505,7 @@ func setStatusConditions(record *v1alpha1.DNSRecord, hadChanges bool, notHealthy
setDNSRecordCondition(record, string(v1alpha1.ConditionTypeReady), metav1.ConditionTrue, string(v1alpha1.ConditionReasonProviderSuccess), "Provider ensured the dns record")

// probes are disabled or not defined, or this is a wildcard record
if record.Spec.HealthCheck == nil || strings.HasPrefix(record.Spec.RootHost, v1alpha1.WildcardPrefix) {
if record.Spec.HealthCheck == nil || strings.HasPrefix(record.Spec.RootHost, v1alpha1.WildcardPrefix) || !probesEnabled {
meta.RemoveStatusCondition(&record.Status.Conditions, string(v1alpha1.ConditionTypeHealthy))
return
}
Expand All @@ -471,10 +530,9 @@ func setStatusConditions(record *v1alpha1.DNSRecord, hadChanges bool, notHealthy
// at least one of the probes is healthy
setDNSRecordCondition(record, string(v1alpha1.ConditionTypeHealthy), metav1.ConditionFalse, string(v1alpha1.ConditionReasonPartiallyHealthy), fmt.Sprintf("Not healthy addresses: %s", notHealthyProbes))
}
// in any case - hostname will resolve, so the record is ready (don't change ready condition)
return
}
// none of the probes is healthy (change ready condition to false)
// none of the probes is healthy
setDNSRecordCondition(record, string(v1alpha1.ConditionTypeHealthy), metav1.ConditionFalse, string(v1alpha1.ConditionReasonUnhealthy), fmt.Sprintf("Not healthy addresses: %s", notHealthyProbes))

}
Expand Down Expand Up @@ -516,7 +574,7 @@ func (r *DNSRecordReconciler) getDNSProvider(ctx context.Context, dnsRecord *v1a

// applyChanges creates the Plan and applies it to the registry. Returns true only if the Plan had no errors and there were changes to apply.
// The error is nil only if the changes were successfully applied or there were no changes to be made.
func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, dnsProvider provider.Provider, isDelete bool) (bool, []string, error) {
func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alpha1.DNSRecord, probes *v1alpha1.DNSHealthCheckProbeList, dnsProvider provider.Provider, isDelete bool) (bool, []string, error) {
logger := log.FromContext(ctx)
rootDomainName := dnsRecord.Spec.RootHost
zoneDomainFilter := externaldnsendpoint.NewDomainFilter([]string{dnsRecord.Status.ZoneDomainName})
Expand Down Expand Up @@ -554,7 +612,7 @@ func (r *DNSRecordReconciler) applyChanges(ctx context.Context, dnsRecord *v1alp
}

// healthySpecEndpoints = Records that this DNSRecord expects to exist, that do not have matching unhealthy probes
healthySpecEndpoints, notHealthyProbes, err := r.removeUnhealthyEndpoints(ctx, specEndpoints, dnsRecord)
healthySpecEndpoints, notHealthyProbes, err := removeUnhealthyEndpoints(specEndpoints, dnsRecord, probes)
if err != nil {
return false, []string{}, fmt.Errorf("removing unhealthy specEndpoints: %w", err)
}
Expand Down
21 changes: 8 additions & 13 deletions internal/controller/dnsrecord_healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ func (r *DNSRecordReconciler) ensureProbe(ctx context.Context, generated *v1alph
// - Returns empty array if nothing is published (prevent from publishing unhealthy EPs)
//
// it returns the list of healthy endpoints, an array of unhealthy addresses and an error
func (r *DNSRecordReconciler) removeUnhealthyEndpoints(ctx context.Context, specEndpoints []*endpoint.Endpoint, dnsRecord *v1alpha1.DNSRecord) ([]*endpoint.Endpoint, []string, error) {
probes := &v1alpha1.DNSHealthCheckProbeList{}
func removeUnhealthyEndpoints(specEndpoints []*endpoint.Endpoint, dnsRecord *v1alpha1.DNSRecord, probes *v1alpha1.DNSHealthCheckProbeList) ([]*endpoint.Endpoint, []string, error) {

// we are deleting or don't have health checks - don't bother
if (dnsRecord.DeletionTimestamp != nil && !dnsRecord.DeletionTimestamp.IsZero()) || dnsRecord.Spec.HealthCheck == nil {
if (dnsRecord.DeletionTimestamp != nil && !dnsRecord.DeletionTimestamp.IsZero()) || dnsRecord.Spec.HealthCheck == nil || !probesEnabled {
return specEndpoints, []string{}, nil
}

Expand All @@ -125,16 +124,6 @@ func (r *DNSRecordReconciler) removeUnhealthyEndpoints(ctx context.Context, spec
return specEndpoints, []string{}, nil
}

// get all probes owned by this record
err := r.List(ctx, probes, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
ProbeOwnerLabel: BuildOwnerLabelValue(dnsRecord),
}),
Namespace: dnsRecord.Namespace,
})
if err != nil {
return nil, []string{}, err
}
unhealthyAddresses := make([]string, 0, len(probes.Items))

// use adjusted endpoints instead of spec ones
Expand Down Expand Up @@ -212,3 +201,9 @@ func BuildOwnerLabelValue(record *v1alpha1.DNSRecord) string {
}
return value
}

// GetOwnerFromLabel returns a name or UID of probe owner
// A reverse to BuildOwnerLabelValue
func GetOwnerFromLabel(probe *v1alpha1.DNSHealthCheckProbe) string {
return probe.GetLabels()[ProbeOwnerLabel]
}
81 changes: 81 additions & 0 deletions internal/controller/dnsrecord_healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,87 @@ var _ = Describe("DNSRecordReconciler_HealthChecks", func() {
}, TestTimeoutMedium, time.Second).Should(Succeed())
})

It("Should reconcile prematurely on healthcheck change", func() {
// Create a default record
Expect(k8sClient.Create(ctx, dnsRecord)).To(Succeed())

By("Making probes healthy")
Eventually(func(g Gomega) {
probes := &v1alpha1.DNSHealthCheckProbeList{}

g.Expect(k8sClient.List(ctx, probes, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
ProbeOwnerLabel: BuildOwnerLabelValue(dnsRecord),
}),
Namespace: dnsRecord.Namespace,
})).To(Succeed())
g.Expect(len(probes.Items)).To(Equal(2))

// make probes healthy
for _, probe := range probes.Items {
probe.Status.Healthy = ptr.To(true)
probe.Status.LastCheckedAt = metav1.Now()
probe.Status.ConsecutiveFailures = 0
g.Expect(k8sClient.Status().Update(ctx, &probe)).To(Succeed())
}
}, TestTimeoutMedium, time.Second).Should(Succeed())

// Override valid for to 1 hour to make sure we are always premature
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(dnsRecord), dnsRecord)).To(Succeed())
// update only after we are ready
g.Expect(dnsRecord.Status.Conditions).To(
ContainElement(MatchFields(IgnoreExtras, Fields{
"Type": Equal(string(v1alpha1.ConditionTypeReady)),
"Status": Equal(metav1.ConditionTrue),
})),
)
dnsRecord.Status.ValidFor = "1h"
g.Expect(k8sClient.Status().Update(ctx, dnsRecord)).To(Succeed())
}, TestTimeoutMedium, time.Second).Should(Succeed())

// validate that it is still 1 hour - we hit the premature loop
// we purposefully are not updating anything on premature loops
// so can't be sure if we have had a short loop or not yet
By("Verify premature loop did not occur")
Eventually(func(g Gomega) {
newRecord := &v1alpha1.DNSRecord{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(dnsRecord), newRecord)).To(Succeed())
g.Expect(newRecord.Status.ValidFor).To(Equal("1h"))
}, TestTimeoutMedium, time.Second).Should(Succeed())

By("Making probes unhealthy")
Eventually(func(g Gomega) {
probes := &v1alpha1.DNSHealthCheckProbeList{}

g.Expect(k8sClient.List(ctx, probes, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
ProbeOwnerLabel: BuildOwnerLabelValue(dnsRecord),
}),
Namespace: dnsRecord.Namespace,
})).To(Succeed())
g.Expect(len(probes.Items)).To(Equal(2))

// make probes healthy
for _, probe := range probes.Items {
probe.Status.Healthy = ptr.To(false)
probe.Status.LastCheckedAt = metav1.Now()
probe.Status.ConsecutiveFailures = 5
g.Expect(k8sClient.Status().Update(ctx, &probe)).To(Succeed())
}
}, TestTimeoutMedium, time.Second).Should(Succeed())

// check that record has valid for not an hour - we did the full reconcile and updated valid for
By("Validate premature loop occurred")
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(dnsRecord), dnsRecord)).To(Succeed())
// on the full loop we will always update ValidFor unless it is max duration.
// since we had 1 hour the logic should set it to the max.
// meaning that changing probes triggered dnsrecornd reconciler
// and we had a fool loop before we were supposed to
g.Expect(dnsRecord.Status.ValidFor).To(Equal(RequeueDuration.String()))
}, TestTimeoutMedium, time.Second).Should(Succeed())
})
It("Should not publish unhealthy enpoints", func() {
//Create default test dnsrecord
Expect(k8sClient.Create(ctx, dnsRecord)).To(Succeed())
Expand Down

0 comments on commit 8430c7c

Please sign in to comment.