Skip to content

Commit

Permalink
feat(backendconnection): watch and reconcile OTel collector resources
Browse files Browse the repository at this point in the history
Watch the K8s resources for the OpenTelemetry collector daemonset and
deployment which are managed by the operator, reconcile them back into
their desired state if they have been changed externally.
  • Loading branch information
basti1302 committed Sep 10, 2024
1 parent e8f5398 commit c2c59ba
Show file tree
Hide file tree
Showing 19 changed files with 816 additions and 280 deletions.
18 changes: 14 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,17 +430,27 @@ func startDash0Controllers(
Clientset: clientset,
OTelColResourceManager: oTelColResourceManager,
}
backendConnectionReconciler := &backendconnection.BackendConnectionReconciler{
Client: k8sClient,
BackendConnectionManager: backendConnectionManager,
Images: images,
OperatorNamespace: envVars.operatorNamespace,
OTelCollectorNamePrefix: envVars.oTelCollectorNamePrefix,
}
if err = backendConnectionReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the backend connection reconciler: %w", err)
}

operatorConfigurationReconciler := &controller.OperatorConfigurationReconciler{
Client: mgr.GetClient(),
Client: k8sClient,
Clientset: clientset,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("dash0-operator-configuration-controller"),
DeploymentSelfReference: deploymentSelfReference,
Images: images,
DevelopmentMode: developmentMode,
}
if err := operatorConfigurationReconciler.SetupWithManager(mgr); err != nil {
if err = operatorConfigurationReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the operator configuration reconciler: %w", err)
}
operatorConfigurationReconciler.InitializeSelfMonitoringMetrics(
Expand All @@ -457,7 +467,7 @@ func startDash0Controllers(
Images: images,
OperatorNamespace: envVars.operatorNamespace,
}
if err := monitoringReconciler.SetupWithManager(mgr); err != nil {
if err = monitoringReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to set up the monitoring reconciler: %w", err)
}
monitoringReconciler.InitializeSelfMonitoringMetrics(
Expand All @@ -467,7 +477,7 @@ func startDash0Controllers(
)

if os.Getenv("ENABLE_WEBHOOK") != "false" {
if err := (&webhook.Handler{
if err = (&webhook.Handler{
Client: k8sClient,
Recorder: mgr.GetEventRecorderFor("dash0-webhook"),
Images: images,
Expand Down
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ go 1.22.4
toolchain go1.22.6

require (
github.com/cisco-open/k8s-objectmatcher v1.10.0
github.com/dash0hq/dash0-operator/images/pkg/common v0.0.0-00010101000000-000000000000
github.com/go-logr/logr v1.4.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.20.2
github.com/onsi/gomega v1.34.2
go.opentelemetry.io/collector/pdata v1.14.1
go.opentelemetry.io/otel/metric v1.29.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.31.0
k8s.io/apimachinery v0.31.0
Expand All @@ -21,11 +22,13 @@ require (
)

require (
emperror.dev/errors v0.8.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
Expand All @@ -39,6 +42,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
Expand All @@ -59,7 +63,6 @@ require (
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
Expand All @@ -79,6 +82,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apiextensions-apiserver v0.31.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
emperror.dev/errors v0.8.1 h1:UavXZ5cSX/4u9iyvH6aDcuGkVjeexUGJ7Ij7G4VfQT0=
emperror.dev/errors v0.8.1/go.mod h1:YcRvLPh626Ubn2xqtoprejnA5nFha+TJ+2vew48kWuE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cisco-open/k8s-objectmatcher v1.10.0 h1:1TdhMPqVaU+NqECqytAkRF1SFU0QIMqrqbNTnTl933A=
github.com/cisco-open/k8s-objectmatcher v1.10.0/go.mod h1:O/TFG3vW12jbKNQejpc8SGgSfujlaWYIOCZHcXeK514=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU=
github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v5.9.0+incompatible h1:fBXyNpNMuTTDdquAq/uisOr2lShz4oaXpDTX2bLe7ls=
github.com/evanphx/json-patch v5.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg=
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
Expand Down Expand Up @@ -121,8 +125,10 @@ go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt3
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
Expand Down
160 changes: 160 additions & 0 deletions internal/backendconnection/backend_connection_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// SPDX-FileCopyrightText: Copyright 2024 Dash0 Inc.
// SPDX-License-Identifier: Apache-2.0

package backendconnection

import (
"context"
"slices"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/dash0monitoring/v1alpha1"
"github.com/dash0hq/dash0-operator/internal/backendconnection/otelcolresources"
"github.com/dash0hq/dash0-operator/internal/dash0/selfmonitoring"
"github.com/dash0hq/dash0-operator/internal/dash0/util"
)

type BackendConnectionReconciler struct {
client.Client
BackendConnectionManager *BackendConnectionManager
Images util.Images
OperatorNamespace string
OTelCollectorNamePrefix string
}

func (r *BackendConnectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Named("dash0backendconnectioncontroller").
Watches(
&corev1.ConfigMap{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.DaemonSetCollectorConfigConfigMapName(r.OTelCollectorNamePrefix),
otelcolresources.DeploymentCollectorConfigConfigMapName(r.OTelCollectorNamePrefix),
// Note: We are deliberately not watching the filelog receiver offsets ConfigMap, since it is updated
// frequently by the filelog offset synch container and does not require reconciliation.
})).
Watches(
&rbacv1.ClusterRole{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.DaemonSetClusterRoleName(r.OTelCollectorNamePrefix),
otelcolresources.DeploymentClusterRoleName(r.OTelCollectorNamePrefix),
})).
Watches(
&rbacv1.ClusterRoleBinding{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.DeploymentClusterRoleBindingName(r.OTelCollectorNamePrefix),
otelcolresources.DeploymentClusterRoleName(r.OTelCollectorNamePrefix),
})).
Watches(
&corev1.Service{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.ServiceName(r.OTelCollectorNamePrefix),
})).
Watches(
&appsv1.DaemonSet{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.DaemonSetName(r.OTelCollectorNamePrefix),
})).
Watches(
&appsv1.Deployment{},
&handler.EnqueueRequestForObject{},
r.withNamePredicate([]string{
otelcolresources.DeploymentName(r.OTelCollectorNamePrefix),
})).
Complete(r)
}

func (r *BackendConnectionReconciler) withNamePredicate(resourceNames []string) builder.Predicates {
return builder.WithPredicates(r.createFilterPredicate(resourceNames))
}

func (r *BackendConnectionReconciler) createFilterPredicate(resourceNames []string) predicate.Funcs {
resourceNamespace := r.OperatorNamespace
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return resourceMatches(e.Object, resourceNamespace, resourceNames)
},
UpdateFunc: func(e event.UpdateEvent) bool {
return resourceMatches(e.ObjectOld, resourceNamespace, resourceNames) ||
resourceMatches(e.ObjectNew, resourceNamespace, resourceNames)
},
DeleteFunc: func(e event.DeleteEvent) bool {
return resourceMatches(e.Object, resourceNamespace, resourceNames)
},
GenericFunc: func(e event.GenericEvent) bool {
return resourceMatches(e.Object, resourceNamespace, resourceNames)
},
}
}

func resourceMatches(object client.Object, resourceNamespace string, resourceNames []string) bool {
if object.GetNamespace() != resourceNamespace {
return false
}
return slices.Contains(resourceNames, object.GetName())
}

func (r *BackendConnectionReconciler) Reconcile(
ctx context.Context,
request reconcile.Request,
) (reconcile.Result, error) {
logger := log.FromContext(ctx)
logger.Info("reconciling backend connection resources", "request", request)

allDash0MonitoringResouresInCluster := &dash0v1alpha1.Dash0MonitoringList{}
if err := r.List(
ctx,
allDash0MonitoringResouresInCluster,
&client.ListOptions{},
); err != nil {
logger.Error(err, "Failed to list all Dash0 monitoring resources when reconciling backend connection resources.")
return reconcile.Result{}, err
}

if len(allDash0MonitoringResouresInCluster.Items) == 0 {
logger.Info("No Dash0 monitoring resources in cluster, aborting the backend connection resources reconciliation.")
return reconcile.Result{}, nil
}

// TODO this needs to be fixed when we start to support sending telemetry to different backends per namespace.
// Ultimately we need to derive one consistent configuration including multiple pipelines and routing across all
// monitored namespaces.
arbitraryMonitoringResource := allDash0MonitoringResouresInCluster.Items[0]

err := r.BackendConnectionManager.EnsureOpenTelemetryCollectorIsDeployedInOperatorNamespace(
ctx,
r.Images,
r.OperatorNamespace,
&arbitraryMonitoringResource,
selfmonitoring.ReadSelfMonitoringConfigurationFromOperatorConfigurationResource(ctx, r.Client, &logger),
TriggeredByWatchEvent,
)
if err != nil {
logger.Error(err, "Failed to create/update backend connection resources.")
return reconcile.Result{}, err
}

logger.Info(
"successfully reconciled backend connection resources",
"request",
request,
)

return reconcile.Result{}, nil
}
42 changes: 42 additions & 0 deletions internal/backendconnection/backendconnection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package backendconnection
import (
"context"
"fmt"
"sync/atomic"

"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -21,16 +22,51 @@ type BackendConnectionManager struct {
client.Client
Clientset *kubernetes.Clientset
*otelcolresources.OTelColResourceManager
updateInProgress atomic.Bool
resourcesHaveBeenDeletedByOperator atomic.Bool
}

type BackendConnectionReconcileTrigger string

const (
TriggeredByWatchEvent BackendConnectionReconcileTrigger = "watch"
TriggeredByDash0Resource BackendConnectionReconcileTrigger = "resource"
)

func (m *BackendConnectionManager) EnsureOpenTelemetryCollectorIsDeployedInOperatorNamespace(
ctx context.Context,
images util.Images,
operatorNamespace string,
monitoringResource *dash0v1alpha1.Dash0Monitoring,
selfMonitoringConfiguration selfmonitoring.SelfMonitoringConfiguration,
trigger BackendConnectionReconcileTrigger,
) error {
logger := log.FromContext(ctx)
if m.resourcesHaveBeenDeletedByOperator.Load() {
if trigger == TriggeredByWatchEvent {
if m.DevelopmentMode {
logger.Info("OpenTelemetry collector resources have already been deleted, ignoring reconciliation request.")
}
return nil
} else if trigger == TriggeredByDash0Resource {
if m.DevelopmentMode {
logger.Info("resetting resourcesHaveBeenDeletedByOperator")
}
m.resourcesHaveBeenDeletedByOperator.Store(false)
}
}
if m.updateInProgress.Load() {
if m.DevelopmentMode {
logger.Info("creation/update of the OpenTelemetry collector resources is already in progress, skipping " +
"additional reconciliation request.")
}
return nil
}

m.updateInProgress.Store(true)
defer func() {
m.updateInProgress.Store(false)
}()

resourcesHaveBeenCreated, resourcesHaveBeenUpdated, err :=
m.OTelColResourceManager.CreateOrUpdateOpenTelemetryCollectorResources(
Expand Down Expand Up @@ -66,6 +102,12 @@ func (m *BackendConnectionManager) RemoveOpenTelemetryCollectorIfNoMonitoringRes
dash0MonitoringResourceToBeDeleted *dash0v1alpha1.Dash0Monitoring,
selfMonitoringConfiguration selfmonitoring.SelfMonitoringConfiguration,
) error {
m.resourcesHaveBeenDeletedByOperator.Store(true)
m.updateInProgress.Store(true)
defer func() {
m.updateInProgress.Store(false)
}()

logger := log.FromContext(ctx)
list := &dash0v1alpha1.Dash0MonitoringList{}
err := m.Client.List(
Expand Down
Loading

0 comments on commit c2c59ba

Please sign in to comment.