Skip to content

Commit

Permalink
feat: add support for more resource types
Browse files Browse the repository at this point in the history
* CronJob
* DaemonSet
* Job
* StatefulSet
  • Loading branch information
basti1302 committed May 24, 2024
1 parent 16bd693 commit 626784f
Show file tree
Hide file tree
Showing 15 changed files with 1,169 additions and 292 deletions.
281 changes: 263 additions & 18 deletions internal/controller/dash0_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package controller

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,6 +34,25 @@ type Dash0Reconciler struct {
Versions k8sresources.Versions
}

type ImmutableResourceError struct {
resourceType string
resource string
}

func (e ImmutableResourceError) Error() string {
return fmt.Sprintf(
"Dash0 cannot instrument the existing %s %s, since the this type of resource is immutable.",
e.resourceType,
e.resource,
)
}

var (
labelNotSetFilter = metav1.ListOptions{
LabelSelector: fmt.Sprintf("!%s", util.Dash0AutoInstrumentationLabel),
}
)

func (r *Dash0Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&operatorv1alpha1.Dash0{}).
Expand Down Expand Up @@ -168,52 +189,276 @@ func (r *Dash0Reconciler) refreshStatus(ctx context.Context, dash0CustomResource
func (r *Dash0Reconciler) modifyExistingResources(ctx context.Context, dash0CustomResource *operatorv1alpha1.Dash0) error {
namespace := dash0CustomResource.Namespace

listOptions := metav1.ListOptions{
LabelSelector: fmt.Sprintf("!%s", util.Dash0AutoInstrumentationLabel),
errCronJobs := r.findAndModifyCronJobs(ctx, namespace)
errDaemonSets := r.findAndModifyDaemonSets(ctx, namespace)
errDeployments := r.findAndModifyDeployments(ctx, namespace)
errJobs := r.findAndHandleJobs(ctx, namespace)
errStatefulSets := r.findAndModifyStatefulSets(ctx, namespace)
combinedErrors := errors.Join(
errCronJobs,
errDaemonSets,
errDeployments,
errJobs,
errStatefulSets,
)
if combinedErrors != nil {
return combinedErrors
}
return nil
}

deploymentsInNamespace, err := r.ClientSet.AppsV1().Deployments(namespace).List(ctx, listOptions)
func (r *Dash0Reconciler) findAndModifyCronJobs(ctx context.Context, namespace string) error {
matchingResourcesInNamespace, err := r.ClientSet.BatchV1().CronJobs(namespace).List(ctx, labelNotSetFilter)
if err != nil {
return fmt.Errorf("error when querying deployments: %w", err)
return fmt.Errorf("error when querying cron jobs: %w", err)
}
for _, resource := range matchingResourcesInNamespace.Items {
r.modifyCronJob(ctx, resource)
}
return nil
}

func (r *Dash0Reconciler) modifyCronJob(ctx context.Context, cronJob batchv1.CronJob) {
if cronJob.DeletionTimestamp != nil {
// do not modify resources that are being deleted
return
}
logger := log.FromContext(ctx).WithValues(
"resource type",
"CronJob",
"resource namespace",
cronJob.GetNamespace(),
"resource name",
cronJob.GetName(),
)
hasBeenModified := false
retryErr := util.Retry("modifying cron job", func() error {
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: cronJob.GetNamespace(),
Name: cronJob.GetName(),
}, &cronJob); err != nil {
return fmt.Errorf("error when fetching cron job %s/%s: %w", cronJob.GetNamespace(), cronJob.GetName(), err)
}
hasBeenModified = k8sresources.
NewResourceModifier(r.Versions, logger).
ModifyCronJob(&cronJob, cronJob.GetNamespace())
if hasBeenModified {
return r.Client.Update(ctx, &cronJob)
} else {
return nil
}
}, &logger)

r.postProcess(&cronJob, hasBeenModified, logger, retryErr)
}

for _, deployment := range deploymentsInNamespace.Items {
r.modifySingleResource(ctx, deployment)
func (r *Dash0Reconciler) findAndModifyDaemonSets(ctx context.Context, namespace string) error {
matchingResourcesInNamespace, err := r.ClientSet.AppsV1().DaemonSets(namespace).List(ctx, labelNotSetFilter)
if err != nil {
return fmt.Errorf("error when querying daemon sets: %w", err)
}
for _, resource := range matchingResourcesInNamespace.Items {
r.modifyDaemonSet(ctx, resource)
}
return nil
}

func (r *Dash0Reconciler) modifySingleResource(ctx context.Context, deployment appsv1.Deployment) {
logger := log.FromContext(ctx).WithValues("resource type", "deployment", "resource namespace", deployment.GetNamespace(), "resource name", deployment.GetName())
func (r *Dash0Reconciler) modifyDaemonSet(ctx context.Context, daemonSet appsv1.DaemonSet) {
if daemonSet.DeletionTimestamp != nil {
// do not modify resources that are being deleted
return
}
logger := log.FromContext(ctx).WithValues(
"resource type",
"DaemonSet",
"resource namespace",
daemonSet.GetNamespace(),
"resource name",
daemonSet.GetName(),
)
hasBeenModified := false
retryErr := util.Retry("Modifying deployment", func() error {
retryErr := util.Retry("modifying daemon set", func() error {
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: daemonSet.GetNamespace(),
Name: daemonSet.GetName(),
}, &daemonSet); err != nil {
return fmt.Errorf("error when fetching daemon set %s/%s: %w", daemonSet.GetNamespace(), daemonSet.GetName(), err)
}
hasBeenModified = k8sresources.
NewResourceModifier(r.Versions, logger).
ModifyDaemonSet(&daemonSet, daemonSet.GetNamespace())
if hasBeenModified {
return r.Client.Update(ctx, &daemonSet)
} else {
return nil
}
}, &logger)

r.postProcess(&daemonSet, hasBeenModified, logger, retryErr)
}

func (r *Dash0Reconciler) findAndModifyDeployments(ctx context.Context, namespace string) error {
matchingResourcesInNamespace, err := r.ClientSet.AppsV1().Deployments(namespace).List(ctx, labelNotSetFilter)
if err != nil {
return fmt.Errorf("error when querying deployments: %w", err)
}
for _, resource := range matchingResourcesInNamespace.Items {
r.modifyDeployment(ctx, resource)
}
return nil
}

func (r *Dash0Reconciler) modifyDeployment(ctx context.Context, deployment appsv1.Deployment) {
if deployment.DeletionTimestamp != nil {
// do not modify resources that are being deleted
return
}
logger := log.FromContext(ctx).WithValues(
"resource type",
"Deployment",
"resource namespace",
deployment.GetNamespace(),
"resource name",
deployment.GetName(),
)
hasBeenModified := false
retryErr := util.Retry("modifying deployment", func() error {
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: deployment.GetNamespace(),
Name: deployment.GetName(),
}, &deployment); err != nil {
return fmt.Errorf("error when fetching deployment %s/%s: %w", deployment.GetNamespace(), deployment.GetName(), err)
}
hasBeenModified = k8sresources.ModifyDeployment(
&deployment,
deployment.GetNamespace(),
r.Versions,
hasBeenModified = k8sresources.
NewResourceModifier(r.Versions, logger).
ModifyDeployment(&deployment, deployment.GetNamespace())
if hasBeenModified {
return r.Client.Update(ctx, &deployment)
} else {
return nil
}
}, &logger)

r.postProcess(&deployment, hasBeenModified, logger, retryErr)
}

func (r *Dash0Reconciler) findAndHandleJobs(ctx context.Context, namespace string) error {
matchingResourcesInNamespace, err := r.ClientSet.BatchV1().Jobs(namespace).List(ctx, labelNotSetFilter)
if err != nil {
return fmt.Errorf("error when querying cron jobs: %w", err)
}

for _, job := range matchingResourcesInNamespace.Items {
r.handleJob(ctx, job)
}
return nil
}

func (r *Dash0Reconciler) handleJob(ctx context.Context, job batchv1.Job) {
if job.DeletionTimestamp != nil {
// do not modify resources that are being deleted
return
}
logger := log.FromContext(ctx).WithValues(
"resource type",
"Job",
"resource namespace",
job.GetNamespace(),
"resource name",
job.GetName(),
)
retryErr := util.Retry("labelling immutable job", func() error {
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: job.GetNamespace(),
Name: job.GetName(),
}, &job); err != nil {
return fmt.Errorf("error when fetching job %s/%s: %w", job.GetNamespace(), job.GetName(), err)
}
k8sresources.
NewResourceModifier(r.Versions, logger).
AddLabelsToImmutableJob(&job)
return r.Client.Update(ctx, &job)
}, &logger)

if retryErr != nil {
r.postProcess(&job, false, logger, retryErr)
} else {
r.postProcess(
&job,
false,
logger,
ImmutableResourceError{
resourceType: "job",
resource: fmt.Sprintf("%s/%s", job.GetNamespace(), job.GetName()),
},
)
}
}

func (r *Dash0Reconciler) findAndModifyStatefulSets(ctx context.Context, namespace string) error {
matchingResourcesInNamespace, err := r.ClientSet.AppsV1().StatefulSets(namespace).List(ctx, labelNotSetFilter)
if err != nil {
return fmt.Errorf("error when querying stateful sets: %w", err)
}
for _, resource := range matchingResourcesInNamespace.Items {
r.modifyStatefulSet(ctx, resource)
}
return nil
}

func (r *Dash0Reconciler) modifyStatefulSet(ctx context.Context, statefulSet appsv1.StatefulSet) {
if statefulSet.DeletionTimestamp != nil {
// do not modify resources that are being deleted
return
}
logger := log.FromContext(ctx).WithValues(
"resource type",
"StatefulSet",
"resource namespace",
statefulSet.GetNamespace(),
"resource name",
statefulSet.GetName(),
)
hasBeenModified := false
retryErr := util.Retry("modifying stateful set", func() error {
if err := r.Client.Get(ctx, client.ObjectKey{
Namespace: statefulSet.GetNamespace(),
Name: statefulSet.GetName(),
}, &statefulSet); err != nil {
return fmt.Errorf("error when fetching stateful set %s/%s: %w", statefulSet.GetNamespace(), statefulSet.GetName(), err)
}
hasBeenModified = k8sresources.
NewResourceModifier(r.Versions, logger).
ModifyStatefulSet(&statefulSet, statefulSet.GetNamespace())
if hasBeenModified {
return r.Client.Update(ctx, &deployment)
return r.Client.Update(ctx, &statefulSet)
} else {
return nil
}
}, &logger)

r.postProcess(&statefulSet, hasBeenModified, logger, retryErr)
}

func (r *Dash0Reconciler) postProcess(
resource runtime.Object,
hasBeenModified bool,
logger logr.Logger,
retryErr error,
) {
if retryErr != nil {
logger.Error(retryErr, "Dash0 instrumentation by controller has not been successful.")
util.QueueFailedInstrumentationEvent(r.Recorder, &deployment, "controller", retryErr)
e := &ImmutableResourceError{}
if errors.As(retryErr, e) {
logger.Info(e.Error())
} else {
logger.Error(retryErr, "Dash0 instrumentation by controller has not been successful.")
}
util.QueueFailedInstrumentationEvent(r.Recorder, resource, "controller", retryErr)
} else if !hasBeenModified {
logger.Info("Dash0 instrumentation already present, no modification by controller is necessary.")
util.QueueAlreadyInstrumentedEvent(r.Recorder, &deployment, "controller")
util.QueueAlreadyInstrumentedEvent(r.Recorder, resource, "controller")
} else {
logger.Info("The controller has added Dash0 instrumentation to the resource.")
util.QueueSuccessfulInstrumentationEvent(r.Recorder, &deployment, "controller")
util.QueueSuccessfulInstrumentationEvent(r.Recorder, resource, "controller")
}
}
Loading

0 comments on commit 626784f

Please sign in to comment.