From 6f56419d0a80045211bbcea03c51f35734d75b20 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 16 Jan 2025 12:10:33 +0100 Subject: [PATCH 1/5] refactor: improve route reconciler object handling Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/collector_types.go | 5 +- api/telemetry/v1alpha1/common.go | 7 - api/telemetry/v1alpha1/output_types.go | 30 ++ api/telemetry/v1alpha1/subscription_types.go | 28 +- api/telemetry/v1alpha1/tenant_types.go | 5 +- .../telemetry.kube-logging.dev_outputs.yaml | 14 +- .../telemetry.kube-logging.dev_tenants.yaml | 4 +- .../telemetry.kube-logging.dev_outputs.yaml | 14 +- .../telemetry.kube-logging.dev_tenants.yaml | 4 +- go.mod | 2 +- .../telemetry/collector_controller.go | 15 +- .../telemetry/manager/bridge_manager.go | 103 +++++ .../controller/telemetry/manager/manager.go | 46 ++ .../manager/tenant_resource_manager.go | 241 ++++++++++ .../telemetry/resources/state/state.go | 22 + .../controller/telemetry/resources/types.go | 42 ++ .../controller/telemetry/route_controller.go | 427 +++++------------- internal/controller/telemetry/utils/utils.go | 52 +++ 18 files changed, 733 insertions(+), 328 deletions(-) create mode 100644 internal/controller/telemetry/manager/bridge_manager.go create mode 100644 internal/controller/telemetry/manager/manager.go create mode 100644 internal/controller/telemetry/manager/tenant_resource_manager.go create mode 100644 internal/controller/telemetry/resources/state/state.go create mode 100644 internal/controller/telemetry/resources/types.go diff --git a/api/telemetry/v1alpha1/collector_types.go b/api/telemetry/v1alpha1/collector_types.go index 76ac78d1..e04e6e2c 100644 --- a/api/telemetry/v1alpha1/collector_types.go +++ b/api/telemetry/v1alpha1/collector_types.go @@ -17,6 +17,7 @@ package v1alpha1 import ( "time" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -93,8 +94,8 @@ func (c CollectorSpec) GetMemoryLimit() *resource.Quantity { // CollectorStatus defines the observed state of Collector type CollectorStatus struct { - Tenants []string `json:"tenants,omitempty"` - State State `json:"state,omitempty"` + Tenants []string `json:"tenants,omitempty"` + State state.State `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/telemetry/v1alpha1/common.go b/api/telemetry/v1alpha1/common.go index 2f902624..24b27c65 100644 --- a/api/telemetry/v1alpha1/common.go +++ b/api/telemetry/v1alpha1/common.go @@ -14,13 +14,6 @@ package v1alpha1 -type State string - -const ( - StateReady State = "ready" - StateFailed State = "failed" -) - type NamespacedName struct { Namespace string `json:"namespace"` Name string `json:"name"` diff --git a/api/telemetry/v1alpha1/output_types.go b/api/telemetry/v1alpha1/output_types.go index ccbf0520..e209054e 100644 --- a/api/telemetry/v1alpha1/output_types.go +++ b/api/telemetry/v1alpha1/output_types.go @@ -15,6 +15,8 @@ package v1alpha1 import ( + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -155,10 +157,30 @@ type Fluentforward struct { // OutputStatus defines the observed state of Output type OutputStatus struct { + Tenant string `json:"tenant,omitempty"` + State state.State `json:"state,omitempty"` +} + +func (o *Output) GetTenant() string { + return o.Status.Tenant +} + +func (o *Output) SetTenant(tenant string) { + o.Status.Tenant = tenant +} + +func (o *Output) GetState() state.State { + return o.Status.State +} + +func (o *Output) SetState(state state.State) { + o.Status.State = state } // +kubebuilder:object:root=true // +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Tenant",type=string,JSONPath=`.status.tenant` +// +kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.state` // +kubebuilder:resource:categories=telemetry-all // Output is the Schema for the outputs API @@ -179,6 +201,14 @@ type OutputList struct { Items []Output `json:"items"` } +func (l *OutputList) GetItems() []resources.ResourceOwnedByTenant { + items := make([]resources.ResourceOwnedByTenant, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items +} + func init() { SchemeBuilder.Register(&Output{}, &OutputList{}) } diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index 7a2d1144..ea017a50 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -15,6 +15,8 @@ package v1alpha1 import ( + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -35,7 +37,23 @@ type SubscriptionSpec struct { type SubscriptionStatus struct { Tenant string `json:"tenant,omitempty"` Outputs []NamespacedName `json:"outputs,omitempty"` - State State `json:"state,omitempty"` + State state.State `json:"state,omitempty"` +} + +func (s *Subscription) GetTenant() string { + return s.Status.Tenant +} + +func (s *Subscription) SetTenant(tenant string) { + s.Status.Tenant = tenant +} + +func (s *Subscription) GetState() state.State { + return s.Status.State +} + +func (s *Subscription) SetState(state state.State) { + s.Status.State = state } // +kubebuilder:object:root=true @@ -63,6 +81,14 @@ type SubscriptionList struct { Items []Subscription `json:"items"` } +func (l *SubscriptionList) GetItems() []resources.ResourceOwnedByTenant { + items := make([]resources.ResourceOwnedByTenant, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items +} + func init() { SchemeBuilder.Register(&Subscription{}, &SubscriptionList{}) } diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index fe37e22e..7dac319b 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -15,6 +15,7 @@ package v1alpha1 import ( + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -86,7 +87,7 @@ type PersistenceConfig struct { // TenantSpec defines the desired state of Tenant type TenantSpec struct { - // Determines the namespaces from which subscriptions are collected by this tenant. + // Determines the namespaces from which subscriptions and outputs are collected by this tenant. SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"` // Determines the namespaces from which logs are collected by this tenant. @@ -106,7 +107,7 @@ type TenantStatus struct { Subscriptions []NamespacedName `json:"subscriptions,omitempty"` LogSourceNamespaces []string `json:"logSourceNamespaces,omitempty"` ConnectedBridges []string `json:"connectedBridges,omitempty"` - State State `json:"state,omitempty"` + State state.State `json:"state,omitempty"` } //+kubebuilder:object:root=true diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml index bf3c85e6..451b698d 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml @@ -16,7 +16,14 @@ spec: singular: output scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - jsonPath: .status.tenant + name: Tenant + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 schema: openAPIV3Schema: description: Output is the Schema for the outputs API @@ -698,6 +705,11 @@ spec: type: object status: description: OutputStatus defines the observed state of Output + properties: + state: + type: string + tenant: + type: string type: object type: object served: true diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index e41be948..6c7fbc9c 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -159,8 +159,8 @@ spec: Cannot be used together with LogSourceNamespaceSelectors. type: boolean subscriptionNamespaceSelectors: - description: Determines the namespaces from which subscriptions are - collected by this tenant. + description: Determines the namespaces from which subscriptions and + outputs are collected by this tenant. items: description: |- A label selector is a label query over a set of resources. The result of matchLabels and diff --git a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml index bf3c85e6..451b698d 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml @@ -16,7 +16,14 @@ spec: singular: output scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - jsonPath: .status.tenant + name: Tenant + type: string + - jsonPath: .status.state + name: State + type: string + name: v1alpha1 schema: openAPIV3Schema: description: Output is the Schema for the outputs API @@ -698,6 +705,11 @@ spec: type: object status: description: OutputStatus defines the observed state of Output + properties: + state: + type: string + tenant: + type: string type: object type: object served: true diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index e41be948..6c7fbc9c 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -159,8 +159,8 @@ spec: Cannot be used together with LogSourceNamespaceSelectors. type: boolean subscriptionNamespaceSelectors: - description: Determines the namespaces from which subscriptions are - collected by this tenant. + description: Determines the namespaces from which subscriptions and + outputs are collected by this tenant. items: description: |- A label selector is a label query over a set of resources. The result of matchLabels and diff --git a/go.mod b/go.mod index 0a402f57..f57fc590 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.3 require ( emperror.dev/errors v0.8.1 github.com/cisco-open/operator-tools v0.37.0 + github.com/go-logr/logr v1.4.2 github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-multierror v1.1.1 github.com/mitchellh/mapstructure v1.5.0 @@ -50,7 +51,6 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index 36585a6c..c2dd2985 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -43,6 +43,7 @@ import ( "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/otel_conf_gen/validator" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components/extension/storage" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" ) @@ -83,7 +84,7 @@ func (r *CollectorReconciler) buildConfigInputForCollector(ctx context.Context, } for _, tenant := range tenants { - if tenant.Status.State == v1alpha1.StateFailed { + if tenant.Status.State == state.StateFailed { logger.Info(fmt.Sprintf("tenant %q is in failed state, retrying later", tenant.Name)) return otelcolconfgen.OtelColConfigInput{}, ErrTenantFailed } @@ -208,7 +209,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } logger.Error(errors.WithStack(err), "invalid otel config input") - collector.Status.State = v1alpha1.StateFailed + collector.Status.State = state.StateFailed if updateErr := r.updateStatus(ctx, collector); updateErr != nil { logger.Error(errors.WithStack(updateErr), "failed updating collector status") return ctrl.Result{}, errors.Append(err, updateErr) @@ -221,7 +222,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if err := validator.ValidateAssembledConfig(otelConfig); err != nil { logger.Error(errors.WithStack(err), "invalid otel config") - collector.Status.State = v1alpha1.StateFailed + collector.Status.State = state.StateFailed if updateErr := r.updateStatus(ctx, collector); updateErr != nil { logger.Error(errors.WithStack(updateErr), "failed updating collector status") return ctrl.Result{}, errors.Append(err, updateErr) @@ -235,18 +236,18 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return ctrl.Result{}, fmt.Errorf("%+v", err) } - otelCollector, state := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name) + otelCollector, collectorState := r.otelCollector(collector, otelConfig, additionalArgs, otelConfigInput.Tenants, saName.Name) if err := ctrl.SetControllerReference(collector, otelCollector, r.Scheme); err != nil { return ctrl.Result{}, err } resourceReconciler := reconciler.NewReconcilerWith(r.Client, reconciler.WithLog(logger)) - _, err = resourceReconciler.ReconcileResource(otelCollector, state) + _, err = resourceReconciler.ReconcileResource(otelCollector, collectorState) if err != nil { logger.Error(errors.WithStack(err), "failed reconciling collector") - collector.Status.State = v1alpha1.StateFailed + collector.Status.State = state.StateFailed if updateErr := r.updateStatus(ctx, collector); updateErr != nil { logger.Error(errors.WithStack(updateErr), "failed updating collector status") return ctrl.Result{}, errors.Append(err, updateErr) @@ -261,7 +262,7 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } collector.Status.Tenants = normalizeStringSlice(tenantNames) - collector.Status.State = v1alpha1.StateReady + collector.Status.State = state.StateReady if !reflect.DeepEqual(originalCollectorStatus, collector.Status) { logger.Info("collector status changed") diff --git a/internal/controller/telemetry/manager/bridge_manager.go b/internal/controller/telemetry/manager/bridge_manager.go new file mode 100644 index 00000000..b4d074b1 --- /dev/null +++ b/internal/controller/telemetry/manager/bridge_manager.go @@ -0,0 +1,103 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + + "emperror.dev/errors" + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" + "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// BridgeManager manages bridge resources +type BridgeManager struct { + BaseManager +} + +func (b *BridgeManager) GetBridgesForTenant(ctx context.Context, tenantName string) (bridgesOwned []v1alpha1.Bridge, err error) { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(resources.BridgeSourceTenantReferenceField, tenantName), + } + sourceBridge, err := b.getBridges(ctx, listOpts) + if err != nil { + return nil, err + } + + listOpts = &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(resources.BridgeTargetTenantReferenceField, tenantName), + } + targetBridge, err := b.getBridges(ctx, listOpts) + if err != nil { + return nil, err + } + + bridges := append(sourceBridge, targetBridge...) + for _, bridge := range bridges { + if bridge.Spec.SourceTenant == tenantName || bridge.Spec.TargetTenant == tenantName { + bridgesOwned = append(bridgesOwned, bridge) + } + } + + return +} + +func (b *BridgeManager) getBridges(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Bridge, error) { + var bridges v1alpha1.BridgeList + if err := b.List(ctx, &bridges, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + return bridges.Items, nil +} + +func (b *BridgeManager) getTenants(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Tenant, error) { + var tenants v1alpha1.TenantList + if err := b.List(ctx, &tenants, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + return tenants.Items, nil +} + +func (b *BridgeManager) CheckBridgeConnection(ctx context.Context, tenantName string, bridge *v1alpha1.Bridge) error { + for _, tenantReference := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} { + if tenantReference != tenantName { + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(resources.TenantNameField, tenantReference), + } + tenant, err := b.getTenants(ctx, listOpts) + if err != nil { + return err + } + if len(tenant) == 0 { + return errors.Errorf("bridge %s has a dangling tenant reference %s", bridge.Name, tenantReference) + } + } + } + + return nil +} + +func GetBridgeNamesFromBridges(bridges []v1alpha1.Bridge) []string { + bridgeNames := make([]string, len(bridges)) + for i, bridge := range bridges { + bridgeNames[i] = bridge.Name + } + + return bridgeNames +} diff --git a/internal/controller/telemetry/manager/manager.go b/internal/controller/telemetry/manager/manager.go new file mode 100644 index 00000000..26dd8b8f --- /dev/null +++ b/internal/controller/telemetry/manager/manager.go @@ -0,0 +1,46 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// BaseManager provides common functionality for all managers +type BaseManager struct { + client.Client + logr.Logger +} + +// NewBaseManager creates a new base manager +func NewBaseManager(client client.Client, logger logr.Logger) BaseManager { + return BaseManager{ + client, + logger, + } +} + +func (b *BaseManager) GetBridgeManager() *BridgeManager { + return &BridgeManager{ + BaseManager: *b, + } +} + +func (b *BaseManager) GetTenantResourceManager() *TenantResourceManager { + return &TenantResourceManager{ + BaseManager: *b, + } +} diff --git a/internal/controller/telemetry/manager/tenant_resource_manager.go b/internal/controller/telemetry/manager/tenant_resource_manager.go new file mode 100644 index 00000000..a62580fb --- /dev/null +++ b/internal/controller/telemetry/manager/tenant_resource_manager.go @@ -0,0 +1,241 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "context" + "fmt" + "reflect" + "slices" + "strings" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" + apiv1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// TenantResourceManager is a manager for resources owned by tenants: Subscriptions and Outputs +type TenantResourceManager struct { + BaseManager +} + +// GetResourceOwnedByTenant returns a list of resources owned by the tenant and a list of resources that need to be updated +func (t *TenantResourceManager) GetResourceOwnedByTenant(ctx context.Context, resource resources.ResourceOwnedByTenant, tenant *v1alpha1.Tenant) (ownedList []resources.ResourceOwnedByTenant, updateList []resources.ResourceOwnedByTenant, err error) { + if resource == nil { + return nil, nil, fmt.Errorf("resource cannot be nil") + } + + namespaces, err := t.getNamespacesForSelectorSlice(ctx, tenant.Spec.SubscriptionNamespaceSelectors) + if err != nil { + return nil, nil, fmt.Errorf("failed to get namespaces: %w", err) + } + + // Create the appropriate list type based on the resource type + var resourceList resources.ResourceList + switch resource.(type) { + case *v1alpha1.Subscription: + resourceList = &v1alpha1.SubscriptionList{} + case *v1alpha1.Output: + resourceList = &v1alpha1.OutputList{} + default: + return nil, nil, fmt.Errorf("unsupported resource type: %T", resource) + } + + // Collect resources from all namespaces + var allResources []resources.ResourceOwnedByTenant + for _, ns := range namespaces { + listOpts := &client.ListOptions{ + Namespace: ns.Name, + } + + if err := t.List(ctx, resourceList, listOpts); err != nil { + if !apierrors.IsNotFound(err) { + return nil, nil, fmt.Errorf("failed to list resources in namespace %s: %w", ns.Name, err) + } + continue + } + + allResources = append(allResources, resourceList.GetItems()...) + } + + // Categorize resources + for _, res := range allResources { + currentTenant := res.GetTenant() + if currentTenant != "" && currentTenant != tenant.Name { + t.Logger.Error( + fmt.Errorf("resource is owned by another tenant"), + "skipping reconciliation", + "current_tenant", currentTenant, + "desired_tenant", tenant.Name, + "action_required", "remove resource from previous tenant before adopting to new tenant", + ) + continue + } + + if currentTenant == "" { + updateList = append(updateList, res) + } else { + ownedList = append(ownedList, res) + } + } + + return ownedList, updateList, nil +} + +// getNamespacesForSelectorSlice returns a list of namespaces that match the given label selectors +func (r *TenantResourceManager) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { + var namespaces []apiv1.Namespace + for _, ls := range labelSelectors { + selector, err := metav1.LabelSelectorAsSelector(&ls) + if err != nil { + return nil, err + } + + var namespacesForSelector apiv1.NamespaceList + listOpts := &client.ListOptions{ + LabelSelector: selector, + } + if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + namespaces = append(namespaces, namespacesForSelector.Items...) + } + + namespaces = normalizeNamespaceSlice(namespaces) + + return namespaces, nil +} + +// updateResourcesForTenant fails internally and logs failures individually +// this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single resource +func (t *TenantResourceManager) UpdateResourcesForTenant(ctx context.Context, tenantName string, resources []resources.ResourceOwnedByTenant) (updatedResources []resources.ResourceOwnedByTenant) { + for _, res := range resources { + res.SetTenant(tenantName) + t.Logger.Info(fmt.Sprintf("updating resource (%s/%s) -> tenant (%s) reference", res.GetNamespace(), res.GetName(), tenantName)) + + if updateErr := t.Status().Update(ctx, res); updateErr != nil { + res.SetState(state.StateFailed) + t.Logger.Error(updateErr, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", res.GetNamespace(), res.GetName(), tenantName)) + } else { + updatedResources = append(updatedResources, res) + } + + res.SetState(state.StateReady) + } + + return +} + +// GetResourcesReferencingTenantButNotSelected returns a list of resources that are +// referencing the tenant in their status but are not selected by the tenant directly +func (t *TenantResourceManager) GetResourcesReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, resource resources.ResourceOwnedByTenant, selectedResources []resources.ResourceOwnedByTenant) ([]resources.ResourceOwnedByTenant, error) { + // Create the appropriate list type based on the resource type + var resourceList resources.ResourceList + switch resource.(type) { + case *v1alpha1.Subscription: + resourceList = &v1alpha1.SubscriptionList{} + case *v1alpha1.Output: + resourceList = &v1alpha1.OutputList{} + default: + return nil, fmt.Errorf("unsupported resource type: %T", resource) + } + + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(resources.StatusTenantReferenceField, tenant.Name), + } + if err := t.List(ctx, resourceList, listOpts); client.IgnoreNotFound(err) != nil { + t.Logger.Error(err, "failed to list resources that need to be detached from tenant") + return nil, err + } + + var resourcesToDisown []resources.ResourceOwnedByTenant + for _, resourceReferencing := range resourceList.GetItems() { + idx := slices.IndexFunc(selectedResources, func(selected resources.ResourceOwnedByTenant) bool { + return reflect.DeepEqual(resourceReferencing.GetName(), selected.GetName()) && reflect.DeepEqual(resourceReferencing.GetNamespace(), selected.GetNamespace()) + }) + if idx == -1 { + resourcesToDisown = append(resourcesToDisown, resourceReferencing) + } + } + + return resourcesToDisown, nil +} + +// disownResources fails internally by logging errors individually +// this is by design so that we don't fail the whole reconciliation when a single resource cannot be disowned +func (t *TenantResourceManager) DisownResources(ctx context.Context, resourceToDisown []resources.ResourceOwnedByTenant) { + for _, res := range resourceToDisown { + tenantName := res.GetTenant() + res.SetTenant("") + if updateErr := t.Status().Update(ctx, res); updateErr != nil { + res.SetState(state.StateFailed) + t.Logger.Error(updateErr, fmt.Sprintf("failed to detach subscription %s/%s from tenant: %s", res.GetNamespace(), res.GetName(), tenantName)) + } else { + t.Logger.Info(fmt.Sprintf("disowning resource (%s/%s)", res.GetNamespace(), res.GetName())) + } + } +} + +// GetLogsourceNamespaceNamesForTenant returns the namespaces that match the log source namespace selectors of the tenant +func (t *TenantResourceManager) GetLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { + namespaces, err := t.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) + if err != nil { + return nil, err + } + + namespaceNames := make([]string, len(namespaces)) + for i, namespace := range namespaces { + namespaceNames[i] = namespace.Name + } + + return namespaceNames, nil +} + +func GetResourceNamesFromResource(resources []resources.ResourceOwnedByTenant) []v1alpha1.NamespacedName { + resourceNames := make([]v1alpha1.NamespacedName, len(resources)) + for i, resource := range resources { + resourceNames[i] = v1alpha1.NamespacedName{ + Name: resource.GetName(), + Namespace: resource.GetNamespace(), + } + } + + return resourceNames +} + +// normalizeNamespaceSlice removes duplicates from the input list and sorts it +func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { + allKeys := make(map[string]bool) + uniqueList := []apiv1.Namespace{} + for _, item := range inputList { + if _, value := allKeys[item.Name]; !value { + allKeys[item.Name] = true + uniqueList = append(uniqueList, item) + } + } + + cmp := func(a, b apiv1.Namespace) int { + return strings.Compare(a.Name, b.Name) + } + slices.SortFunc(uniqueList, cmp) + + return uniqueList +} diff --git a/internal/controller/telemetry/resources/state/state.go b/internal/controller/telemetry/resources/state/state.go new file mode 100644 index 00000000..fc693297 --- /dev/null +++ b/internal/controller/telemetry/resources/state/state.go @@ -0,0 +1,22 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package state + +type State string + +const ( + StateReady State = "ready" + StateFailed State = "failed" +) diff --git a/internal/controller/telemetry/resources/types.go b/internal/controller/telemetry/resources/types.go new file mode 100644 index 00000000..ba51e25b --- /dev/null +++ b/internal/controller/telemetry/resources/types.go @@ -0,0 +1,42 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resources + +import ( + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + StatusTenantReferenceField = ".status.tenant" + BridgeSourceTenantReferenceField = ".spec.sourceTenant" + BridgeTargetTenantReferenceField = ".spec.targetTenant" + TenantNameField = ".metadata.name" +) + +// ResourceOwnedByTenant is an interface that must be implemented by resources that can be owned by a tenant +type ResourceOwnedByTenant interface { + client.Object + GetTenant() string + SetTenant(tenant string) + GetState() state.State + SetState(state state.State) +} + +// ResourceList is an interface for Kubernetes list types +type ResourceList interface { + client.ObjectList + GetItems() []ResourceOwnedByTenant +} diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index 18bbe179..299815c0 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -20,12 +20,9 @@ import ( "reflect" "slices" "sort" - "strings" "emperror.dev/errors" apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -35,14 +32,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/manager" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" -) - -const ( - subscriptionTenantReferenceField = ".status.tenant" - bridgeSourceTenantReferenceField = ".spec.sourceTenant" - bridgeTargetTenantReferenceField = ".spec.targetTenant" - tenantNameField = ".metadata.name" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" + "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" ) // RouteReconciler is responsible for reconciling Tenant resources @@ -63,128 +57,54 @@ type RouteReconciler struct { // +kubebuilder:rbac:groups=opentelemetry.io,resources=opentelemetrycollectors,verbs=get;list;watch;create;update;patch;delete func (r *RouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + baseManager := manager.NewBaseManager(r.Client, log.FromContext(ctx)) tenant := &v1alpha1.Tenant{} - logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name)) + baseManager.Logger.Info(fmt.Sprintf("getting tenant: %q", req.NamespacedName.Name)) if err := r.Get(ctx, req.NamespacedName, tenant); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } originalTenantStatus := tenant.Status - logger.Info(fmt.Sprintf("reconciling tenant: %q", tenant.Name)) - - subscriptionsForTenant, updateList, err := r.getSubscriptionsForTenant(ctx, tenant) - if err != nil { - logger.Error(errors.WithStack(err), "failed to get subscriptions for tenant", "tenant", tenant.Name) - - tenant.Status.State = v1alpha1.StateFailed - if updateErr := r.updateStatus(ctx, tenant); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) - return ctrl.Result{}, errors.Append(err, updateErr) - } - - return ctrl.Result{}, err - } - - // add all newly updated subscriptions here - subscriptionsForTenant = append(subscriptionsForTenant, r.updateSubscriptionsForTenant(ctx, tenant.Name, updateList)...) - subscriptionsToDisown := r.getSubscriptionsReferencingTenantButNotSelected(ctx, tenant, subscriptionsForTenant) - r.disownSubscriptions(ctx, subscriptionsToDisown) - - subscriptionNames := getSubscriptionNamesFromSubscription(subscriptionsForTenant) - components.SortNamespacedNames(subscriptionNames) - tenant.Status.Subscriptions = subscriptionNames - - for _, subscription := range subscriptionsForTenant { - originalSubscriptionStatus := subscription.Status.DeepCopy() - validOutputs := []v1alpha1.NamespacedName{} - for _, outputRef := range subscription.Spec.Outputs { - checkedOutput := &v1alpha1.Output{} - if err := r.Client.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil { - logger.Error(err, "referred output invalid", "output", outputRef.String()) - } else { - validOutputs = append(validOutputs, outputRef) - } - - } - if len(validOutputs) == 0 { - subscription.Status.State = v1alpha1.StateFailed - logger.Error(errors.WithStack(errors.New("no valid outputs for subscription")), "no valid outputs for subscription", "subscription", subscription.NamespacedName().String()) - } else { - subscription.Status.State = v1alpha1.StateReady - } - subscription.Status.Outputs = validOutputs + baseManager.Logger.Info(fmt.Sprintf("reconciling tenant: %q", tenant.Name)) - if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { - if updateErr := r.updateStatus(ctx, &subscription); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating subscription status", "subscription", subscription.NamespacedName().String()) - return ctrl.Result{}, errors.Append(err, updateErr) - } - } - } - - bridgesForTenant, err := r.getBridgesForTenant(ctx, tenant.Name) - if err != nil { - tenant.Status.State = v1alpha1.StateFailed - logger.Error(errors.WithStack(err), "failed to get bridges for tenant", "tenant", tenant.Name) + if err := handleOwnedResources(ctx, baseManager.GetTenantResourceManager(), tenant); err != nil { + tenant.Status.State = state.StateFailed + baseManager.Logger.Error(errors.WithStack(err), "failed to handle resources owned by tenant", "tenant", tenant.Name) if updateErr := r.updateStatus(ctx, tenant); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + baseManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) return ctrl.Result{}, errors.Append(err, updateErr) } - - return ctrl.Result{}, err - } - bridgesForTenantNames := getBridgeNamesFromBridges(bridgesForTenant) - sort.Strings(bridgesForTenantNames) - tenant.Status.ConnectedBridges = bridgesForTenantNames - - for _, bridge := range bridgesForTenant { - if err := r.checkBridgeConnection(ctx, tenant.Name, &bridge); err != nil { - tenant.Status.State = v1alpha1.StateFailed - logger.Error(errors.WithStack(err), "failed to check bridge connection", "bridge", bridge.Name) - if updateErr := r.updateStatus(ctx, tenant); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) - return ctrl.Result{}, errors.Append(err, updateErr) - } - - return ctrl.Result{}, err - } } - logsourceNamespacesForTenant, err := r.getLogsourceNamespaceNamesForTenant(ctx, tenant) - if err != nil { - tenant.Status.State = v1alpha1.StateFailed - logger.Error(errors.WithStack(err), "failed to get logsource namespaces for tenant", "tenant", tenant.Name) + if err := handleBridgeResources(ctx, baseManager.GetBridgeManager(), tenant); err != nil { + tenant.Status.State = state.StateFailed + baseManager.Logger.Error(errors.WithStack(err), "failed to handle bridge resources", "tenant", tenant.Name) if updateErr := r.updateStatus(ctx, tenant); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + baseManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) return ctrl.Result{}, errors.Append(err, updateErr) } - - return ctrl.Result{}, err } - slices.Sort(logsourceNamespacesForTenant) - tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant - tenant.Status.State = v1alpha1.StateReady + tenant.Status.State = state.StateReady if !reflect.DeepEqual(originalTenantStatus, tenant.Status) { - logger.Info("tenant status changed") + baseManager.Logger.Info("tenant status changed") if updateErr := r.updateStatus(ctx, tenant); updateErr != nil { - logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) - return ctrl.Result{}, errors.Append(err, updateErr) + baseManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return ctrl.Result{}, updateErr } return ctrl.Result{}, nil } - logger.Info("tenant reconciliation complete", "tenant", tenant.Name) + baseManager.Logger.Info("tenant reconciliation complete", "tenant", tenant.Name) return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, subscriptionTenantReferenceField, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Subscription{}, resources.StatusTenantReferenceField, func(rawObj client.Object) []string { subscription := rawObj.(*v1alpha1.Subscription) if subscription.Status.Tenant == "" { return nil @@ -194,7 +114,17 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { }); err != nil { return err } - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, bridgeSourceTenantReferenceField, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Output{}, resources.StatusTenantReferenceField, func(rawObj client.Object) []string { + output := rawObj.(*v1alpha1.Output) + if output.Status.Tenant == "" { + return nil + } + + return []string{output.Status.Tenant} + }); err != nil { + return err + } + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, resources.BridgeSourceTenantReferenceField, func(rawObj client.Object) []string { bridge := rawObj.(*v1alpha1.Bridge) if bridge.Spec.SourceTenant == "" { return nil @@ -204,7 +134,7 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { }); err != nil { return err } - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, bridgeTargetTenantReferenceField, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Bridge{}, resources.BridgeTargetTenantReferenceField, func(rawObj client.Object) []string { bridge := rawObj.(*v1alpha1.Bridge) if bridge.Spec.TargetTenant == "" { return nil @@ -214,7 +144,7 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { }); err != nil { return err } - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Tenant{}, tenantNameField, func(rawObj client.Object) []string { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1alpha1.Tenant{}, resources.TenantNameField, func(rawObj client.Object) []string { tenant := rawObj.(*v1alpha1.Tenant) return []string{tenant.Name} }); err != nil { @@ -296,200 +226,100 @@ func (r *RouteReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *RouteReconciler) getSubscriptionsForTenant(ctx context.Context, tenant *v1alpha1.Tenant) (ownedList []v1alpha1.Subscription, updateList []v1alpha1.Subscription, err error) { - logger := log.FromContext(ctx) +func (r *RouteReconciler) updateStatus(ctx context.Context, obj client.Object) error { + return r.Status().Update(ctx, obj) +} - namespaces, err := r.getNamespacesForSelectorSlice(ctx, tenant.Spec.SubscriptionNamespaceSelectors) +func handleOwnedResources(ctx context.Context, tenantResManager *manager.TenantResourceManager, tenant *v1alpha1.Tenant) error { + logsourceNamespacesForTenant, err := tenantResManager.GetLogsourceNamespaceNamesForTenant(ctx, tenant) if err != nil { - return nil, nil, err - } - - var selectedSubscriptions []v1alpha1.Subscription - for _, ns := range namespaces { - var subscriptionsForNS v1alpha1.SubscriptionList - listOpts := &client.ListOptions{ - Namespace: ns.Name, - } - if err := r.List(ctx, &subscriptionsForNS, listOpts); client.IgnoreNotFound(err) != nil { - return nil, nil, err - } - - selectedSubscriptions = append(selectedSubscriptions, subscriptionsForNS.Items...) - } - - for _, subscription := range selectedSubscriptions { - if subscription.Status.Tenant != "" && subscription.Status.Tenant != tenant.Name { - logger.Error(errors.Errorf("subscription (%s) is owned by another tenant (%s), skipping reconciliation for this tenant (%s)", subscription.Name, subscription.Status.Tenant, tenant.Name), - "make sure to remove subscription from the previous tenant before adopting to new tenant") - continue + tenant.Status.State = state.StateFailed + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get logsource namespaces for tenant", "tenant", tenant.Name) + if updateErr := tenantResManager.Status().Update(ctx, tenant); updateErr != nil { + tenantResManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return errors.Append(err, updateErr) } - if subscription.Status.Tenant == "" { - updateList = append(updateList, subscription) - } else { - ownedList = append(ownedList, subscription) - } + return err } + slices.Sort(logsourceNamespacesForTenant) + tenant.Status.LogSourceNamespaces = logsourceNamespacesForTenant - return -} - -func (r *RouteReconciler) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { - var namespaces []apiv1.Namespace - for _, ls := range labelSelectors { - selector, err := metav1.LabelSelectorAsSelector(&ls) - if err != nil { - return nil, err - } + subscriptionsForTenant, subscriptionUpdateList, err := tenantResManager.GetResourceOwnedByTenant(ctx, &v1alpha1.Subscription{}, tenant) + if err != nil { + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get subscriptions for tenant", "tenant", tenant.Name) - var namespacesForSelector apiv1.NamespaceList - listOpts := &client.ListOptions{ - LabelSelector: selector, - } - if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err + tenant.Status.State = state.StateFailed + if updateErr := tenantResManager.Status().Update(ctx, tenant); updateErr != nil { + tenantResManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return errors.Append(err, updateErr) } - namespaces = append(namespaces, namespacesForSelector.Items...) + return err } - namespaces = normalizeNamespaceSlice(namespaces) - - return namespaces, nil -} - -// disownSubscriptions fails internally by logging errors individually -// this is by design so that we don't fail the whole reconciliation when a single subscription update fails -func (r *RouteReconciler) disownSubscriptions(ctx context.Context, subscriptionsToDisown []v1alpha1.Subscription) { - logger := log.FromContext(ctx) - - for _, subscription := range subscriptionsToDisown { - subscription.Status.Tenant = "" - if updateErr := r.Status().Update(ctx, &subscription); updateErr != nil { - subscription.Status.State = v1alpha1.StateFailed - logger.Error(updateErr, fmt.Sprintf("failed to detach subscription %s/%s from collector", subscription.Namespace, subscription.Name)) - } else { - logger.Info("disowning subscription", "subscription", fmt.Sprintf("%s/%s", subscription.Namespace, subscription.Name)) - } + // add all newly updated subscriptions here + subscriptionsForTenant = append(subscriptionsForTenant, tenantResManager.UpdateResourcesForTenant(ctx, tenant.Name, subscriptionUpdateList)...) + subscriptionsToDisown, err := tenantResManager.GetResourcesReferencingTenantButNotSelected(ctx, tenant, &v1alpha1.Subscription{}, subscriptionsForTenant) + if err != nil { + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get subscriptions to disown", "tenant", tenant.Name) } -} + tenantResManager.DisownResources(ctx, subscriptionsToDisown) -// updateSubscriptionsForTenant fails internally and logs failures individually -// this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single subscription -func (r *RouteReconciler) updateSubscriptionsForTenant(ctx context.Context, tenantName string, subscriptions []v1alpha1.Subscription) (updatedSubscriptions []v1alpha1.Subscription) { - logger := log.FromContext(ctx, "tenant", tenantName) + subscriptionNames := manager.GetResourceNamesFromResource(subscriptionsForTenant) + components.SortNamespacedNames(subscriptionNames) + tenant.Status.Subscriptions = subscriptionNames - for _, subscription := range subscriptions { - subscription.Status.Tenant = tenantName - logger.Info("updating subscription status for tenant ownership") + // Check outputs for tenant + outputsForTenant, outputUpdateList, err := tenantResManager.GetResourceOwnedByTenant(ctx, &v1alpha1.Output{}, tenant) + if err != nil { + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get outputs for tenant", "tenant", tenant.Name) - if updateErr := r.Status().Update(ctx, &subscription); updateErr != nil { - subscription.Status.State = v1alpha1.StateFailed - logger.Error(updateErr, fmt.Sprintf("failed to set subscription (%s/%s) -> tenant (%s) reference", subscription.Namespace, subscription.Name, tenantName)) - } else { - updatedSubscriptions = append(updatedSubscriptions, subscription) + tenant.Status.State = state.StateFailed + if updateErr := tenantResManager.Status().Update(ctx, tenant); updateErr != nil { + tenantResManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return errors.Append(err, updateErr) } - } - - return -} - -func (r *RouteReconciler) getSubscriptionsReferencingTenantButNotSelected(ctx context.Context, tenant *v1alpha1.Tenant, selectedSubscriptions []v1alpha1.Subscription) []v1alpha1.Subscription { - logger := log.FromContext(ctx) - - var subscriptionsReferencing v1alpha1.SubscriptionList - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(subscriptionTenantReferenceField, tenant.Name), - } - if err := r.Client.List(ctx, &subscriptionsReferencing, listOpts); client.IgnoreNotFound(err) != nil { - logger.Error(err, "failed to list subscriptions that need to be detached from tenant") - return nil - } - var subscriptionsToDisown []v1alpha1.Subscription - for _, subscriptionReferencing := range subscriptionsReferencing.Items { - idx := slices.IndexFunc(selectedSubscriptions, func(selected v1alpha1.Subscription) bool { - return reflect.DeepEqual(subscriptionReferencing.NamespacedName(), selected.NamespacedName()) - }) - if idx == -1 { - subscriptionsToDisown = append(subscriptionsToDisown, subscriptionReferencing) - } + return err } - return subscriptionsToDisown -} - -func (r *RouteReconciler) getLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { - namespaces, err := r.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) + // add all newly updated outputs here + outputsForTenant = append(outputsForTenant, tenantResManager.UpdateResourcesForTenant(ctx, tenant.Name, outputUpdateList)...) + outputsToDisown, err := tenantResManager.GetResourcesReferencingTenantButNotSelected(ctx, tenant, &v1alpha1.Output{}, outputsForTenant) if err != nil { - return nil, err - } - - namespaceNames := make([]string, len(namespaces)) - for i, namespace := range namespaces { - namespaceNames[i] = namespace.Name + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get outputs to disown", "tenant", tenant.Name) } + tenantResManager.DisownResources(ctx, outputsToDisown) - return namespaceNames, nil -} - -func (r *RouteReconciler) getBridges(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Bridge, error) { - var bridges v1alpha1.BridgeList - if err := r.Client.List(ctx, &bridges, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - return bridges.Items, nil -} - -func (r *RouteReconciler) getBridgesForTenant(ctx context.Context, tenantName string) (bridgesOwned []v1alpha1.Bridge, err error) { - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(bridgeSourceTenantReferenceField, tenantName), - } - sourceBridge, err := r.getBridges(ctx, listOpts) - if err != nil { - return nil, err - } + // Check outputs for subscriptions + realSubscriptionsForTenant, err := utils.GetConcreteTypeFromList[*v1alpha1.Subscription](utils.ToObject(subscriptionsForTenant)) + for _, subscription := range realSubscriptionsForTenant { + originalSubscriptionStatus := subscription.Status.DeepCopy() + validOutputs := []v1alpha1.NamespacedName{} + for _, outputRef := range subscription.Spec.Outputs { + checkedOutput := &v1alpha1.Output{} + if err := tenantResManager.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil { + tenantResManager.Logger.Error(err, "referred output invalid", "output", outputRef.String()) + } else { + validOutputs = append(validOutputs, outputRef) + } - listOpts = &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(bridgeTargetTenantReferenceField, tenantName), - } - targetBridge, err := r.getBridges(ctx, listOpts) - if err != nil { - return nil, err - } + // FIXME: put a check here - bridges := append(sourceBridge, targetBridge...) - for _, bridge := range bridges { - if bridge.Spec.SourceTenant == tenantName || bridge.Spec.TargetTenant == tenantName { - bridgesOwned = append(bridgesOwned, bridge) } - } - - return -} - -func (r *RouteReconciler) getTenants(ctx context.Context, listOpts *client.ListOptions) ([]v1alpha1.Tenant, error) { - var tenants v1alpha1.TenantList - if err := r.Client.List(ctx, &tenants, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - return tenants.Items, nil -} + if len(validOutputs) == 0 { + subscription.Status.State = state.StateFailed + tenantResManager.Logger.Error(errors.WithStack(errors.New("no valid outputs for subscription")), "no valid outputs for subscription", "subscription", subscription.NamespacedName().String()) + } else { + subscription.Status.State = state.StateReady + } + subscription.Status.Outputs = validOutputs -func (r *RouteReconciler) checkBridgeConnection(ctx context.Context, tenantName string, bridge *v1alpha1.Bridge) error { - for _, tenantReference := range []string{bridge.Spec.SourceTenant, bridge.Spec.TargetTenant} { - if tenantReference != tenantName { - listOpts := &client.ListOptions{ - FieldSelector: fields.OneTermEqualSelector(tenantNameField, tenantReference), - } - tenant, err := r.getTenants(ctx, listOpts) - if err != nil { - return err - } - if len(tenant) == 0 { - return errors.Errorf("bridge %s has a dangling tenant reference %s", bridge.Name, tenantReference) + if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { + if updateErr := tenantResManager.Status().Update(ctx, subscription); updateErr != nil { + tenantResManager.Logger.Error(errors.WithStack(updateErr), "failed updating subscription status", "subscription", subscription.NamespacedName().String()) + return errors.Append(err, updateErr) } } } @@ -497,42 +327,35 @@ func (r *RouteReconciler) checkBridgeConnection(ctx context.Context, tenantName return nil } -func (r *RouteReconciler) updateStatus(ctx context.Context, obj client.Object) error { - return r.Client.Status().Update(ctx, obj) -} - -func normalizeNamespaceSlice(inputList []apiv1.Namespace) []apiv1.Namespace { - allKeys := make(map[string]bool) - uniqueList := []apiv1.Namespace{} - for _, item := range inputList { - if _, value := allKeys[item.Name]; !value { - allKeys[item.Name] = true - uniqueList = append(uniqueList, item) +func handleBridgeResources(ctx context.Context, bridgeManager *manager.BridgeManager, tenant *v1alpha1.Tenant) error { + bridgesForTenant, err := bridgeManager.GetBridgesForTenant(ctx, tenant.Name) + if err != nil { + tenant.Status.State = state.StateFailed + bridgeManager.Logger.Error(errors.WithStack(err), "failed to get bridges for tenant", "tenant", tenant.Name) + if updateErr := bridgeManager.Status().Update(ctx, tenant); updateErr != nil { + bridgeManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return errors.Append(err, updateErr) } - } - cmp := func(a, b apiv1.Namespace) int { - return strings.Compare(a.Name, b.Name) + return err } - slices.SortFunc(uniqueList, cmp) - return uniqueList -} - -func getSubscriptionNamesFromSubscription(subscriptions []v1alpha1.Subscription) []v1alpha1.NamespacedName { - subscriptionNames := make([]v1alpha1.NamespacedName, len(subscriptions)) - for i, subscription := range subscriptions { - subscriptionNames[i] = subscription.NamespacedName() - } + bridgesForTenantNames := manager.GetBridgeNamesFromBridges(bridgesForTenant) + sort.Strings(bridgesForTenantNames) + tenant.Status.ConnectedBridges = bridgesForTenantNames - return subscriptionNames -} + for _, bridge := range bridgesForTenant { + if err := bridgeManager.CheckBridgeConnection(ctx, tenant.Name, &bridge); err != nil { + tenant.Status.State = state.StateFailed + bridgeManager.Logger.Error(errors.WithStack(err), "failed to check bridge connection", "bridge", bridge.Name) + if updateErr := bridgeManager.Status().Update(ctx, tenant); updateErr != nil { + bridgeManager.Logger.Error(errors.WithStack(updateErr), "failed updating tenant status", "tenant", tenant.Name) + return errors.Append(err, updateErr) + } -func getBridgeNamesFromBridges(bridges []v1alpha1.Bridge) []string { - bridgeNames := make([]string, len(bridges)) - for i, bridge := range bridges { - bridgeNames[i] = bridge.Name + return err + } } - return bridgeNames + return nil } diff --git a/internal/controller/telemetry/utils/utils.go b/internal/controller/telemetry/utils/utils.go index 634af24e..ca6d2603 100644 --- a/internal/controller/telemetry/utils/utils.go +++ b/internal/controller/telemetry/utils/utils.go @@ -14,6 +14,12 @@ package utils +import ( + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + func ToPtr[T any](v T) *T { return &v } @@ -21,3 +27,49 @@ func ToPtr[T any](v T) *T { func ToValue[T any](v *T) T { return *v } + +// GetConcreteTypeFromList converts a slice of client.Object to a slice of concrete types. +// Returns an error if any conversion fails. +// Example usage: +// +// subscriptions, err := GetConcreteTypeFromList[*v1alpha1.Subscription](objectList) +// outputs, err := GetConcreteTypeFromList[*v1alpha1.Output](objectList) +func GetConcreteTypeFromList[T client.Object](objects []client.Object) ([]T, error) { + result := make([]T, 0, len(objects)) + for i, obj := range objects { + if concrete, ok := GetConcreteType[T](obj); !ok { + return nil, fmt.Errorf("failed to convert object at index %d to type %T", i, *new(T)) + } else { + result = append(result, concrete) + } + } + return result, nil +} + +// GetConcreteType returns the concrete type T from a client.Object if possible. +// Returns the concrete type and true if successful, zero value and false if not. +// Example usage: +// +// if sub, ok := GetConcreteType[*v1alpha1.Subscription](obj); ok { +// // Use sub.Status, sub.Spec etc +// } +// if out, ok := GetConcreteType[*v1alpha1.Output](obj); ok { +// // Use out.Status, out.Spec etc +// } +func GetConcreteType[T client.Object](obj client.Object) (T, bool) { + var zero T + concrete, ok := obj.(T) + if !ok { + return zero, false + } + return concrete, true +} + +// ToObject converts a slice of concrete types to a slice of client.Object. +func ToObject[T client.Object](items []T) []client.Object { + objects := make([]client.Object, len(items)) + for i, item := range items { + objects[i] = item + } + return objects +} From 47a34e984d984b9045dbc36b4f26dfe82b20cfb8 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 16 Jan 2025 15:58:13 +0100 Subject: [PATCH 2/5] feat: restrict output visibility to subs namespace Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/subscription_types.go | 1 + ...emetry.kube-logging.dev_subscriptions.yaml | 5 +- ...emetry.kube-logging.dev_subscriptions.yaml | 5 +- .../manager/tenant_resource_manager.go | 106 ++++++++++++------ .../controller/telemetry/route_controller.go | 27 +---- 5 files changed, 85 insertions(+), 59 deletions(-) diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index ea017a50..809e5472 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -30,6 +30,7 @@ type SubscriptionSpec struct { // +kubebuilder:validation:Required // The outputs to which the logs will be routed if the condition evaluates to true. + // Outputs must be in the same namespace as the subscription to be valid. Outputs []NamespacedName `json:"outputs"` } diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml index be8f04e7..0eae775c 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml @@ -56,8 +56,9 @@ spec: telemetry to the outputs. type: string outputs: - description: The outputs to which the logs will be routed if the condition - evaluates to true. + description: |- + The outputs to which the logs will be routed if the condition evaluates to true. + Outputs must be in the same namespace as the subscription to be valid. items: properties: name: diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index be8f04e7..0eae775c 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -56,8 +56,9 @@ spec: telemetry to the outputs. type: string outputs: - description: The outputs to which the logs will be routed if the condition - evaluates to true. + description: |- + The outputs to which the logs will be routed if the condition evaluates to true. + Outputs must be in the same namespace as the subscription to be valid. items: properties: name: diff --git a/internal/controller/telemetry/manager/tenant_resource_manager.go b/internal/controller/telemetry/manager/tenant_resource_manager.go index a62580fb..a25b05e3 100644 --- a/internal/controller/telemetry/manager/tenant_resource_manager.go +++ b/internal/controller/telemetry/manager/tenant_resource_manager.go @@ -21,6 +21,7 @@ import ( "slices" "strings" + "emperror.dev/errors" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/resources/state" @@ -28,6 +29,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -36,6 +38,21 @@ type TenantResourceManager struct { BaseManager } +// GetLogsourceNamespaceNamesForTenant returns the namespaces that match the log source namespace selectors of the tenant +func (t *TenantResourceManager) GetLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { + namespaces, err := t.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) + if err != nil { + return nil, err + } + + namespaceNames := make([]string, len(namespaces)) + for i, namespace := range namespaces { + namespaceNames[i] = namespace.Name + } + + return namespaceNames, nil +} + // GetResourceOwnedByTenant returns a list of resources owned by the tenant and a list of resources that need to be updated func (t *TenantResourceManager) GetResourceOwnedByTenant(ctx context.Context, resource resources.ResourceOwnedByTenant, tenant *v1alpha1.Tenant) (ownedList []resources.ResourceOwnedByTenant, updateList []resources.ResourceOwnedByTenant, err error) { if resource == nil { @@ -99,31 +116,6 @@ func (t *TenantResourceManager) GetResourceOwnedByTenant(ctx context.Context, re return ownedList, updateList, nil } -// getNamespacesForSelectorSlice returns a list of namespaces that match the given label selectors -func (r *TenantResourceManager) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { - var namespaces []apiv1.Namespace - for _, ls := range labelSelectors { - selector, err := metav1.LabelSelectorAsSelector(&ls) - if err != nil { - return nil, err - } - - var namespacesForSelector apiv1.NamespaceList - listOpts := &client.ListOptions{ - LabelSelector: selector, - } - if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { - return nil, err - } - - namespaces = append(namespaces, namespacesForSelector.Items...) - } - - namespaces = normalizeNamespaceSlice(namespaces) - - return namespaces, nil -} - // updateResourcesForTenant fails internally and logs failures individually // this is by design in order to avoid blocking the whole reconciliation in case we cannot update a single resource func (t *TenantResourceManager) UpdateResourcesForTenant(ctx context.Context, tenantName string, resources []resources.ResourceOwnedByTenant) (updatedResources []resources.ResourceOwnedByTenant) { @@ -194,19 +186,65 @@ func (t *TenantResourceManager) DisownResources(ctx context.Context, resourceToD } } -// GetLogsourceNamespaceNamesForTenant returns the namespaces that match the log source namespace selectors of the tenant -func (t *TenantResourceManager) GetLogsourceNamespaceNamesForTenant(ctx context.Context, tentant *v1alpha1.Tenant) ([]string, error) { - namespaces, err := t.getNamespacesForSelectorSlice(ctx, tentant.Spec.LogSourceNamespaceSelectors) - if err != nil { - return nil, err +// ValidateSubscriptionOutputs validates the output references of a subscription +func (t *TenantResourceManager) ValidateSubscriptionOutputs(ctx context.Context, subscription *v1alpha1.Subscription) []v1alpha1.NamespacedName { + validOutputs := []v1alpha1.NamespacedName{} + invalidOutputs := []v1alpha1.NamespacedName{} + + for _, outputRef := range subscription.Spec.Outputs { + checkedOutput := &v1alpha1.Output{} + if err := t.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil { + t.Logger.Error(err, "referred output invalid", "output", outputRef.String()) + + invalidOutputs = append(invalidOutputs, outputRef) + continue + } + + // Ensure the output belongs to the same tenant + if checkedOutput.Status.Tenant != subscription.Status.Tenant { + t.Logger.Error(errors.New("output and subscription tenants mismatch"), + "output", checkedOutput.NamespacedName().String(), + "output's tenant", checkedOutput.Status.Tenant, + "subscription", subscription.NamespacedName().String(), + "subscription's tenant", subscription.Status.Tenant) + + invalidOutputs = append(invalidOutputs, outputRef) + continue + } + + validOutputs = append(validOutputs, outputRef) } - namespaceNames := make([]string, len(namespaces)) - for i, namespace := range namespaces { - namespaceNames[i] = namespace.Name + if len(invalidOutputs) > 0 { + t.Logger.Error(errors.New("some outputs are invalid"), "invalidOutputs", invalidOutputs, "subscription", subscription.NamespacedName().String()) } - return namespaceNames, nil + return validOutputs +} + +// getNamespacesForSelectorSlice returns a list of namespaces that match the given label selectors +func (r *TenantResourceManager) getNamespacesForSelectorSlice(ctx context.Context, labelSelectors []metav1.LabelSelector) ([]apiv1.Namespace, error) { + var namespaces []apiv1.Namespace + for _, ls := range labelSelectors { + selector, err := metav1.LabelSelectorAsSelector(&ls) + if err != nil { + return nil, err + } + + var namespacesForSelector apiv1.NamespaceList + listOpts := &client.ListOptions{ + LabelSelector: selector, + } + if err := r.List(ctx, &namespacesForSelector, listOpts); client.IgnoreNotFound(err) != nil { + return nil, err + } + + namespaces = append(namespaces, namespacesForSelector.Items...) + } + + namespaces = normalizeNamespaceSlice(namespaces) + + return namespaces, nil } func GetResourceNamesFromResource(resources []resources.ResourceOwnedByTenant) []v1alpha1.NamespacedName { diff --git a/internal/controller/telemetry/route_controller.go b/internal/controller/telemetry/route_controller.go index 299815c0..a14ddc84 100644 --- a/internal/controller/telemetry/route_controller.go +++ b/internal/controller/telemetry/route_controller.go @@ -294,32 +294,17 @@ func handleOwnedResources(ctx context.Context, tenantResManager *manager.TenantR // Check outputs for subscriptions realSubscriptionsForTenant, err := utils.GetConcreteTypeFromList[*v1alpha1.Subscription](utils.ToObject(subscriptionsForTenant)) + if err != nil { + tenantResManager.Logger.Error(errors.WithStack(err), "failed to get concrete type from list", "tenant", tenant.Name) + } + for _, subscription := range realSubscriptionsForTenant { originalSubscriptionStatus := subscription.Status.DeepCopy() - validOutputs := []v1alpha1.NamespacedName{} - for _, outputRef := range subscription.Spec.Outputs { - checkedOutput := &v1alpha1.Output{} - if err := tenantResManager.Get(ctx, types.NamespacedName(outputRef), checkedOutput); err != nil { - tenantResManager.Logger.Error(err, "referred output invalid", "output", outputRef.String()) - } else { - validOutputs = append(validOutputs, outputRef) - } - - // FIXME: put a check here - - } - if len(validOutputs) == 0 { - subscription.Status.State = state.StateFailed - tenantResManager.Logger.Error(errors.WithStack(errors.New("no valid outputs for subscription")), "no valid outputs for subscription", "subscription", subscription.NamespacedName().String()) - } else { - subscription.Status.State = state.StateReady - } - subscription.Status.Outputs = validOutputs - + subscription.Status.Outputs = tenantResManager.ValidateSubscriptionOutputs(ctx, subscription) if !reflect.DeepEqual(originalSubscriptionStatus, subscription.Status) { if updateErr := tenantResManager.Status().Update(ctx, subscription); updateErr != nil { tenantResManager.Logger.Error(errors.WithStack(updateErr), "failed updating subscription status", "subscription", subscription.NamespacedName().String()) - return errors.Append(err, updateErr) + return updateErr } } } From 6141b884f4716c83deda0a9c6fb25e8624aaaaad Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 16 Jan 2025 15:58:27 +0100 Subject: [PATCH 3/5] chore: adjust tests Signed-off-by: Bence Csati --- e2e/testdata/filestorage/one_tenant_one_subscription.yaml | 4 ++-- .../one_tenant_two_subscriptions.yaml | 8 ++++---- .../tenants_with_bridges/tenants_with_bridges.yaml | 8 ++++---- .../controller/telemetry/controller_integration_test.go | 8 ++++---- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/e2e/testdata/filestorage/one_tenant_one_subscription.yaml b/e2e/testdata/filestorage/one_tenant_one_subscription.yaml index 7b4cfe0a..12ef5462 100644 --- a/e2e/testdata/filestorage/one_tenant_one_subscription.yaml +++ b/e2e/testdata/filestorage/one_tenant_one_subscription.yaml @@ -45,13 +45,13 @@ spec: condition: "true" outputs: - name: otlp-test-output - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 diff --git a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml index de3546a5..47012bf2 100644 --- a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml +++ b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml @@ -43,7 +43,7 @@ spec: condition: "true" outputs: - name: otlp-test-output - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Subscription @@ -54,13 +54,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 @@ -71,7 +71,7 @@ apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml index ee74c920..b1a0125b 100644 --- a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -82,13 +82,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-database - namespace: collector + namespace: database --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-database - namespace: collector + namespace: database spec: batch: send_batch_size: 8192 @@ -128,13 +128,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-web - namespace: collector + namespace: web --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-web - namespace: collector + namespace: web spec: batch: send_batch_size: 8192 diff --git a/internal/controller/telemetry/controller_integration_test.go b/internal/controller/telemetry/controller_integration_test.go index 18dc03e9..c52755d8 100644 --- a/internal/controller/telemetry/controller_integration_test.go +++ b/internal/controller/telemetry/controller_integration_test.go @@ -81,7 +81,7 @@ var _ = Describe("Telemetry controller integration test", func() { Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-1", - Namespace: "collector", + Namespace: "tenant-1-ctrl", }, }, }, @@ -96,7 +96,7 @@ var _ = Describe("Telemetry controller integration test", func() { Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", - Namespace: "collector", + Namespace: "tenant-2-all", }, }, }, @@ -173,7 +173,7 @@ var _ = Describe("Telemetry controller integration test", func() { { ObjectMeta: metav1.ObjectMeta{ Name: "otlp-test-output-1", - Namespace: "collector", + Namespace: "tenant-1-ctrl", }, Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ @@ -189,7 +189,7 @@ var _ = Describe("Telemetry controller integration test", func() { { ObjectMeta: metav1.ObjectMeta{ Name: "otlp-test-output-2", - Namespace: "collector", + Namespace: "tenant-2-all", }, Spec: v1alpha1.OutputSpec{ OTLPGRPC: &v1alpha1.OTLPGRPC{ From 34b805057246a976adf920d2670b20cdd892ba0f Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 16 Jan 2025 16:10:53 +0100 Subject: [PATCH 4/5] chore: adjust docs Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/output_types.go | 2 +- .../telemetry.kube-logging.dev_outputs.yaml | 1 - .../telemetry.kube-logging.dev_outputs.yaml | 1 - docs/demos/loki/manifests.yaml | 25 ++++++++++++------- docs/demos/openobserve/demo.yaml | 15 ++--------- .../fluent-forward/telemetry-controller.yaml | 3 ++- .../simple-demo-with-secretref/pipeline.yaml | 8 +++--- .../one_tenant_two_subscriptions.yaml | 8 +++--- .../two_tenants_one_subscription_each.yaml | 8 +++--- 9 files changed, 33 insertions(+), 38 deletions(-) diff --git a/api/telemetry/v1alpha1/output_types.go b/api/telemetry/v1alpha1/output_types.go index e209054e..e6445864 100644 --- a/api/telemetry/v1alpha1/output_types.go +++ b/api/telemetry/v1alpha1/output_types.go @@ -107,7 +107,7 @@ type Endpoint struct { // Controls whether to validate the tcp address. // Turning this ON may result in the collector failing to start if it came up faster then the endpoint. // default is false. - ValidateTCPResolution bool `json:"validate_tcp_resolution"` + ValidateTCPResolution bool `json:"validate_tcp_resolution,omitempty"` } type KubernetesMetadata struct { diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml index 451b698d..7abea515 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_outputs.yaml @@ -171,7 +171,6 @@ spec: type: boolean required: - tcp_addr - - validate_tcp_resolution type: object kubernetes_metadata: properties: diff --git a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml index 451b698d..7abea515 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_outputs.yaml @@ -171,7 +171,6 @@ spec: type: boolean required: - tcp_addr - - validate_tcp_resolution type: object kubernetes_metadata: properties: diff --git a/docs/demos/loki/manifests.yaml b/docs/demos/loki/manifests.yaml index 0d66edff..d9196121 100644 --- a/docs/demos/loki/manifests.yaml +++ b/docs/demos/loki/manifests.yaml @@ -31,8 +31,19 @@ spec: apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: - name: loki - namespace: collector + name: loki-1 + namespace: tenant-demo-1 +spec: + otlphttp: + endpoint: http://loki.loki.svc.cluster.local:3100/otlp/ + tls: + insecure: true +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: loki-2 + namespace: tenant-demo-2 spec: otlphttp: endpoint: http://loki.loki.svc.cluster.local:3100/otlp/ @@ -62,9 +73,7 @@ spec: condition: "true" outputs: - name: loki - namespace: collector - - name: openobserve-1 - namespace: collector + namespace: tenant-demo-1 --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Tenant @@ -88,7 +97,5 @@ metadata: spec: condition: "true" outputs: - - name: loki - namespace: collector - - name: openobserve-2 - namespace: collector + - name: loki-2 + namespace: tenant-demo-2 diff --git a/docs/demos/openobserve/demo.yaml b/docs/demos/openobserve/demo.yaml index 4c283562..481ccc04 100644 --- a/docs/demos/openobserve/demo.yaml +++ b/docs/demos/openobserve/demo.yaml @@ -43,24 +43,13 @@ spec: condition: "true" outputs: - name: otlp-openobserve - namespace: collector ---- -apiVersion: telemetry.kube-logging.dev/v1alpha1 -kind: Subscription -metadata: - name: subscription-sample-2 - namespace: example-tenant-ns -spec: - condition: "true" - outputs: - - name: otlp-openobserve - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-openobserve - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 diff --git a/docs/examples/fluent-forward/telemetry-controller.yaml b/docs/examples/fluent-forward/telemetry-controller.yaml index 06a7f2b1..a68a469c 100644 --- a/docs/examples/fluent-forward/telemetry-controller.yaml +++ b/docs/examples/fluent-forward/telemetry-controller.yaml @@ -41,7 +41,8 @@ metadata: namespace: default spec: fluentforward: - endpoint: all-to-file-fluentd.default:24240 + endpoint: + tcp_addr: all-to-file-fluentd.default:24240 tag: otelcol kubernetes_metadata: key: kubernetes diff --git a/docs/examples/simple-demo-with-secretref/pipeline.yaml b/docs/examples/simple-demo-with-secretref/pipeline.yaml index 9dc457d4..ace6eb8b 100644 --- a/docs/examples/simple-demo-with-secretref/pipeline.yaml +++ b/docs/examples/simple-demo-with-secretref/pipeline.yaml @@ -44,13 +44,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns --- apiVersion: v1 kind: Secret metadata: name: otlp-test-output-1-basicauth-token-secret - namespace: collector + namespace: example-tenant-ns type: Opaque data: username: dXNlcg== # user @@ -60,13 +60,13 @@ apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns spec: authentication: basicauth: secretRef: name: otlp-test-output-1-basicauth-token-secret - namespace: collector + namespace: example-tenant-ns otlp: endpoint: receiver-otelcol-collector.receiver.svc.cluster.local:4317 tls: diff --git a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml index c248017a..275540a3 100644 --- a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml +++ b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml @@ -43,7 +43,7 @@ spec: condition: "true" outputs: - name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Subscription @@ -54,13 +54,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 @@ -75,7 +75,7 @@ apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns spec: otlp: endpoint: openobserve-otlp-grpc.openobserve.svc.cluster.local:5081 diff --git a/docs/examples/two_tenants_one_subscription_each.yaml b/docs/examples/two_tenants_one_subscription_each.yaml index a0ec03c7..b7f9e6d5 100644 --- a/docs/examples/two_tenants_one_subscription_each.yaml +++ b/docs/examples/two_tenants_one_subscription_each.yaml @@ -74,7 +74,7 @@ spec: condition: "true" outputs: - name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns-1 --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Subscription @@ -85,13 +85,13 @@ spec: condition: "true" outputs: - name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns-2 --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-1 - namespace: collector + namespace: example-tenant-ns-1 spec: otlp: endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317 @@ -102,7 +102,7 @@ apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Output metadata: name: otlp-test-output-2 - namespace: collector + namespace: example-tenant-ns-2 spec: otlp: endpoint: receiver-collector.example-tenant-ns.svc.cluster.local:4317 From 01f01d9f5d66327023ef1f933b29158f47b11054 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Thu, 16 Jan 2025 16:10:53 +0100 Subject: [PATCH 5/5] chore: adjust docs Signed-off-by: Bence Csati --- .../otel_col_conf_test_fixtures/complex.yaml | 15 +++++++-------- .../exporter/fluent_forward_exporter_test.go | 6 ++---- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index 20f396a6..8e2c4897 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -59,7 +59,6 @@ exporters: fluentforwardexporter/collector_fluentforward-test-output: endpoint: tcp_addr: fluentd.example-tenant-b-ns.svc.cluster.local:24224 - validate_tcp_resolution: false retry_on_failure: enabled: true max_elapsed_time: 0 @@ -94,14 +93,14 @@ exporters: tls: insecure: true otlp/collector_otlp-test-output-3: - endpoint: "receiver-collector.example-tenant-b-ns.svc.cluster.local:4317" + endpoint: receiver-collector.example-tenant-b-ns.svc.cluster.local:4317 retry_on_failure: enabled: true - max_elapsed_time: 0.0 + max_elapsed_time: 0 sending_queue: enabled: true - queue_size: 100.0 - storage: "file_storage/example-tenant-b" + queue_size: 100 + storage: file_storage/example-tenant-b tls: insecure: true otlphttp/collector_loki-test-output: @@ -149,9 +148,9 @@ processors: value: otlp/collector_otlp-test-output-2 attributes/exporter_name_otlp-test-output-3: actions: - - action: "insert" - key: "exporter" - value: "otlp/collector_otlp-test-output-3" + - action: insert + key: exporter + value: otlp/collector_otlp-test-output-3 attributes/metricattributes: actions: - action: insert diff --git a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go index d1a385f6..13a7045e 100644 --- a/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go +++ b/internal/controller/telemetry/pipeline/components/exporter/fluent_forward_exporter_test.go @@ -111,8 +111,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) { expectedResult: map[string]any{ "fluentforwardexporter/default_output1": map[string]any{ "endpoint": map[string]any{ - "tcp_addr": "http://example.com", - "validate_tcp_resolution": false, + "tcp_addr": "http://example.com", }, "sending_queue": map[string]any{ "enabled": true, @@ -211,8 +210,7 @@ func TestGenerateFluentforwardExporters(t *testing.T) { expectedResult: map[string]any{ "fluentforwardexporter/default_output2": map[string]any{ "endpoint": map[string]any{ - "tcp_addr": "http://example.com", - "validate_tcp_resolution": false, + "tcp_addr": "http://example.com", }, "connection_timeout": "30s", "shared_key": "shared-key",