diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index 1df12d0fb0eb..dca90b6bc3d9 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -9,10 +9,8 @@ import ( "go.opentelemetry.io/collector/component" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" ) @@ -39,32 +37,41 @@ func selectors() (labels.Selector, fields.Selector) { // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type func newFakeClient( _ component.TelemetrySettings, - _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, - _ kube.APIClientsetProvider, - _ kube.InformerProvider, - _ kube.InformerProviderNamespace, - _ kube.InformerProviderReplicaSet, - _ kube.InformerProviderNode, + _ *kube.InformerProviders, _ bool, _ time.Duration, ) (kube.Client, error) { - cs := fake.NewSimpleClientset() - ls, fs := selectors() closeCh := make(chan struct{}) + informer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh) + if err != nil { + return nil, err + } + nsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh) + if err != nil { + return nil, err + } + nodeInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh) + if err != nil { + return nil, err + } + rsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh) + if err != nil { + return nil, err + } return &fakeClient{ Pods: map[kube.PodIdentifier]*kube.Pod{}, Rules: rules, Filters: filters, Associations: associations, - Informer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh), - NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh), - NodeInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh), - ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh), + Informer: informer, + NamespaceInformer: nsInformer, + NodeInformer: nodeInformer, + ReplicaSetInformer: rsInformer, StopCh: make(chan struct{}), }, nil } diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index d6af3a0c81f3..54bcbcc9ec35 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -23,10 +23,8 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata" ) @@ -43,7 +41,6 @@ type WatchClient struct { m sync.RWMutex deleteMut sync.Mutex logger *zap.Logger - kc kubernetes.Interface informer cache.SharedInformer podHandlerRegistration cache.ResourceEventHandlerRegistration namespaceInformer cache.SharedInformer @@ -93,16 +90,11 @@ var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`) // New initializes a new k8s Client. func New( set component.TelemetrySettings, - apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, - newClientSet APIClientsetProvider, - newInformer InformerProvider, - newNamespaceInformer InformerProviderNamespace, - newReplicaSetInformer InformerProviderReplicaSet, - newNodeInformer InformerProviderNode, + informerProviders *InformerProviders, waitForMetadata bool, waitForMetadataTimeout time.Duration, ) (Client, error) { @@ -129,15 +121,6 @@ func New( c.Namespaces = map[string]*Namespace{} c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} - if newClientSet == nil { - newClientSet = k8sconfig.MakeClient - } - - kc, err := newClientSet(apiCfg) - if err != nil { - return nil, err - } - c.kc = kc labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters) if err != nil { @@ -148,24 +131,6 @@ func New( zap.String("labelSelector", labelSelector.String()), zap.String("fieldSelector", fieldSelector.String()), ) - if newInformer == nil { - newInformer = newSharedInformer - } - - if newNamespaceInformer == nil { - switch { - case c.extractNamespaceLabelsAnnotations(): - // if rules to extract metadata from namespace is configured use namespace shared informer containing - // all namespaces including kube-system which contains cluster uid information (kube-system-uid) - newNamespaceInformer = newNamespaceSharedInformer - case rules.ClusterUID: - // use kube-system shared informer to only watch kube-system namespace - // reducing overhead of watching all the namespaces - newNamespaceInformer = newKubeSystemSharedInformer - default: - newNamespaceInformer = NewNoOpInformer - } - } podTransformFunc := func(object any) (any, error) { originalPod, success := object.(*api_v1.Pod) @@ -175,14 +140,31 @@ func New( return removeUnnecessaryPodData(originalPod, c.Rules), nil } - c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector, podTransformFunc, c.stopCh) + c.informer, err = informerProviders.PodInformerProvider( + c.Filters.Namespace, + labelSelector, + fieldSelector, + podTransformFunc, + c.stopCh, + ) + if err != nil { + return nil, err + } - c.namespaceInformer = newNamespaceInformer(c.kc, c.stopCh) + // if rules to extract metadata from namespace is configured use namespace shared informer containing + // all namespaces including kube-system which contains cluster uid information (kube-system-uid) + if rules.extractNamespaceLabelsAnnotations() || rules.ClusterUID { + fs := fields.Everything() + if !rules.extractNamespaceLabelsAnnotations() { + fs = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace) + } + c.namespaceInformer, err = informerProviders.NamespaceInformerProvider(fs, c.stopCh) + if err != nil { + return nil, err + } + } if rules.DeploymentName || rules.DeploymentUID { - if newReplicaSetInformer == nil { - newReplicaSetInformer = newReplicaSetSharedInformer - } transformFunc := func(object any) (any, error) { originalReplicaset, success := object.(*apps_v1.ReplicaSet) if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing @@ -191,19 +173,21 @@ func New( return removeUnnecessaryReplicaSetData(originalReplicaset), nil } - c.replicasetInformer = newReplicaSetInformer( - c.kc, + c.replicasetInformer, err = informerProviders.ReplicaSetInformerProvider( c.Filters.Namespace, transformFunc, c.stopCh, ) + if err != nil { + return nil, err + } } if c.extractNodeLabelsAnnotations() || c.extractNodeUID() { - if newNodeInformer == nil { - newNodeInformer = newNodeSharedInformer + c.nodeInformer, err = informerProviders.NodeInformerProvider(c.Filters.Node, 5*time.Minute, c.stopCh) + if err != nil { + return nil, err } - c.nodeInformer = newNodeInformer(c.kc, c.Filters.Node, 5*time.Minute, c.stopCh) } return c, err @@ -225,15 +209,17 @@ func (c *WatchClient) Start() error { } synced = append(synced, c.podHandlerRegistration.HasSynced) - c.namespaceHandlerRegistration, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.handleNamespaceAdd, - UpdateFunc: c.handleNamespaceUpdate, - DeleteFunc: c.handleNamespaceDelete, - }) - if err != nil { - return err + if c.namespaceInformer != nil { + c.namespaceHandlerRegistration, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleNamespaceAdd, + UpdateFunc: c.handleNamespaceUpdate, + DeleteFunc: c.handleNamespaceDelete, + }) + if err != nil { + return err + } + synced = append(synced, c.namespaceHandlerRegistration.HasSynced) } - synced = append(synced, c.namespaceHandlerRegistration.HasSynced) if c.replicasetInformer != nil { c.replicasetHandlerRegistration, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -1022,22 +1008,6 @@ func (c *WatchClient) addOrUpdateNamespace(namespace *api_v1.Namespace) { c.m.Unlock() } -func (c *WatchClient) extractNamespaceLabelsAnnotations() bool { - for _, r := range c.Rules.Labels { - if r.From == MetadataFromNamespace { - return true - } - } - - for _, r := range c.Rules.Annotations { - if r.From == MetadataFromNamespace { - return true - } - } - - return false -} - func (c *WatchClient) extractNodeLabelsAnnotations() bool { for _, r := range c.Rules.Labels { if r.From == MetadataFromNode { diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index cc3cbe609bab..36fa36d467bb 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -4,7 +4,6 @@ package kube import ( - "fmt" "regexp" "testing" "time" @@ -21,17 +20,9 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) -func newFakeAPIClientset(_ k8sconfig.APIConfig) (kubernetes.Interface, error) { - return fake.NewSimpleClientset(), nil -} - func newPodIdentifier(from string, name string, value string) PodIdentifier { if from == "connection" { name = "" @@ -144,58 +135,14 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) { assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", got.NodeUID) } -func TestDefaultClientset(t *testing.T) { - c, err := New( - componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, - ExtractionRules{}, - Filters{}, - []Association{}, - Excludes{}, - nil, - nil, - nil, - nil, - nil, - false, - 10*time.Second, - ) - assert.Error(t, err) - assert.Equal(t, "invalid authType for kubernetes: ", err.Error()) - assert.Nil(t, c) - - c, err = New( - componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, - ExtractionRules{}, - Filters{}, - []Association{}, - Excludes{}, - newFakeAPIClientset, - nil, - nil, - nil, - nil, - false, - 10*time.Second, - ) - assert.NoError(t, err) - assert.NotNil(t, c) -} - func TestBadFilters(t *testing.T) { c, err := New( componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, Excludes{}, - newFakeAPIClientset, - NewFakeInformer, - NewFakeNamespaceInformer, - NewFakeReplicaSetInformer, - NewFakeNodeInformer, + NewFakeInformerProviders(), false, 10*time.Second, ) @@ -223,40 +170,6 @@ func TestClientStartStop(t *testing.T) { }, time.Second, time.Millisecond) } -func TestConstructorErrors(t *testing.T) { - er := ExtractionRules{} - ff := Filters{} - t.Run("client-provider-call", func(t *testing.T) { - var gotAPIConfig k8sconfig.APIConfig - apiCfg := k8sconfig.APIConfig{ - AuthType: "test-auth-type", - } - clientProvider := func(c k8sconfig.APIConfig) (kubernetes.Interface, error) { - gotAPIConfig = c - return nil, fmt.Errorf("error creating k8s client") - } - c, err := New( - componenttest.NewNopTelemetrySettings(), - apiCfg, - er, - ff, - []Association{}, - Excludes{}, - clientProvider, - NewFakeInformer, - NewFakeNamespaceInformer, - nil, - nil, - false, - 10*time.Second, - ) - assert.Nil(t, c) - assert.Error(t, err) - assert.Equal(t, "error creating k8s client", err.Error()) - assert.Equal(t, apiCfg, gotAPIConfig) - }) -} - func TestPodAdd(t *testing.T) { c, _ := newTestClient(t) podAddAndUpdateTest(t, c, c.handlePodAdd) @@ -1925,7 +1838,6 @@ func TestErrorSelectorsFromFilters(t *testing.T) { } func TestExtractNamespaceLabelsAnnotations(t *testing.T) { - c, _ := newTestClientWithRulesAndFilters(t, Filters{}) testCases := []struct { name string shouldExtractNamespace bool @@ -1982,8 +1894,7 @@ func TestExtractNamespaceLabelsAnnotations(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - c.Rules = tc.rules - assert.Equal(t, tc.shouldExtractNamespace, c.extractNamespaceLabelsAnnotations()) + assert.Equal(t, tc.shouldExtractNamespace, tc.rules.extractNamespaceLabelsAnnotations()) }) } } @@ -2017,16 +1928,11 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o } c, err := New( set, - k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, - newFakeAPIClientset, - NewFakeInformer, - NewFakeNamespaceInformer, - NewFakeReplicaSetInformer, - NewFakeNodeInformer, + NewFakeInformerProviders(), false, 10*time.Second, ) @@ -2070,14 +1976,17 @@ func TestWaitForMetadata(t *testing.T) { }, { name: "wait but never synced", informerProvider: func( - client kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector, transformFunc cache.TransformFunc, stopCh chan struct{}, - ) cache.SharedInformer { - return &neverSyncedFakeClient{NewFakeInformer(client, namespace, labelSelector, fieldSelector, transformFunc, stopCh)} + ) (cache.SharedInformer, error) { + informer, err := NewFakeInformer(namespace, labelSelector, fieldSelector, transformFunc, stopCh) + if err != nil { + return nil, err + } + return &neverSyncedFakeClient{informer}, nil }, err: true, }} @@ -2085,18 +1994,15 @@ func TestWaitForMetadata(t *testing.T) { for _, tc := range testCases { t.Run( tc.name, func(t *testing.T) { + informerProviders := NewFakeInformerProviders() + informerProviders.PodInformerProvider = tc.informerProvider c, err := New( componenttest.NewNopTelemetrySettings(), - k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, - newFakeAPIClientset, - tc.informerProvider, - nil, - nil, - nil, + informerProviders, true, 1*time.Second, ) diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go index af813e518ef1..9b304def941f 100644 --- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go +++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go @@ -21,14 +21,22 @@ type FakeInformer struct { fieldSelector fields.Selector } +func NewFakeInformerProviders() *InformerProviders { + return &InformerProviders{ + PodInformerProvider: NewFakeInformer, + NamespaceInformerProvider: NewFakeNamespaceInformer, + ReplicaSetInformerProvider: NewFakeReplicaSetInformer, + NodeInformerProvider: NewFakeNodeInformer, + } +} + func NewFakeInformer( - _ kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector, _ cache.TransformFunc, closeCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := &FakeInformer{ FakeController: &FakeController{}, namespace: namespace, @@ -36,7 +44,7 @@ func NewFakeInformer( fieldSelector: fieldSelector, } go informer.Run(closeCh) - return informer + return informer, nil } func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { @@ -72,12 +80,15 @@ type FakeNamespaceInformer struct { } func NewFakeNamespaceInformer( - _ kubernetes.Interface, - _ chan struct{}, -) cache.SharedInformer { - return &FakeInformer{ + fs fields.Selector, + closeCh chan struct{}, +) (cache.SharedInformer, error) { + informer := &FakeInformer{ FakeController: &FakeController{}, + fieldSelector: fs, } + go informer.Run(closeCh) + return informer, nil } func (f *FakeNamespaceInformer) AddEventHandler(_ cache.ResourceEventHandler) {} @@ -98,16 +109,15 @@ type FakeReplicaSetInformer struct { } func NewFakeReplicaSetInformer( - _ kubernetes.Interface, _ string, _ cache.TransformFunc, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := &FakeInformer{ FakeController: &FakeController{}, } go informer.Run(stopCh) - return informer + return informer, nil } func (f *FakeReplicaSetInformer) AddEventHandler(_ cache.ResourceEventHandler) {} @@ -128,16 +138,15 @@ func (f *FakeReplicaSetInformer) GetController() cache.Controller { } func NewFakeNodeInformer( - _ kubernetes.Interface, _ string, _ time.Duration, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := &FakeInformer{ FakeController: &FakeController{}, } go informer.Run(stopCh) - return informer + return informer, nil } type FakeController struct { diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index 5e7e228893a7..1c0afca51de1 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -22,6 +22,8 @@ import ( const kubeSystemNamespace = "kube-system" +// InformerProviders holds factory functions for informers needed by the kube client. The intent is to facilitate +// informer sharing by letting the processor factory pass in custom implementations of these factories. type InformerProviders struct { PodInformerProvider InformerProvider NamespaceInformerProvider InformerProviderNamespace @@ -29,41 +31,82 @@ type InformerProviders struct { NodeInformerProvider InformerProviderNode } +// NewDefaultInformerProviders returns informer providers using the given client. These are not shared, and exist +// to provide a simple default implementation. +func NewDefaultInformerProviders(client kubernetes.Interface) *InformerProviders { + podInformerProvider := func( + namespace string, + labelSelector labels.Selector, + fieldSelector fields.Selector, + transformFunc cache.TransformFunc, + stopCh chan struct{}, + ) (cache.SharedInformer, error) { + return newSharedInformer(client, namespace, labelSelector, fieldSelector, transformFunc, stopCh) + } + + namespaceInformerProvider := func( + fs fields.Selector, + stopCh chan struct{}, + ) (cache.SharedInformer, error) { + return newNamespaceSharedInformer(client, fs, stopCh) + } + + replicaSetInformerProvider := func( + namespace string, + transformFunc cache.TransformFunc, + stopCh chan struct{}, + ) (cache.SharedInformer, error) { + return newReplicaSetSharedInformer(client, namespace, transformFunc, stopCh) + } + + nodeInformerProvider := func( + nodeName string, + watchSyncPeriod time.Duration, + stopCh chan struct{}, + ) (cache.SharedInformer, error) { + return newNodeSharedInformer(client, nodeName, watchSyncPeriod, stopCh) + } + + return &InformerProviders{ + PodInformerProvider: podInformerProvider, + NamespaceInformerProvider: namespaceInformerProvider, + ReplicaSetInformerProvider: replicaSetInformerProvider, + NodeInformerProvider: nodeInformerProvider, + } +} + // InformerProvider defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client. type InformerProvider func( - client kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector, transformFunc cache.TransformFunc, stopCh chan struct{}, -) cache.SharedInformer +) (cache.SharedInformer, error) // InformerProviderNamespace defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client for fetching namespace objects. type InformerProviderNamespace func( - client kubernetes.Interface, + fieldSelector fields.Selector, stopCh chan struct{}, -) cache.SharedInformer +) (cache.SharedInformer, error) // InformerProviderNode defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client for fetching node objects. type InformerProviderNode func( - client kubernetes.Interface, nodeName string, watchSyncPeriod time.Duration, stopCh chan struct{}, -) cache.SharedInformer +) (cache.SharedInformer, error) // InformerProviderReplicaSet defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client. type InformerProviderReplicaSet func( - client kubernetes.Interface, namespace string, transformFunc cache.TransformFunc, stopCh chan struct{}, -) cache.SharedInformer +) (cache.SharedInformer, error) func newSharedInformer( client kubernetes.Interface, @@ -72,7 +115,7 @@ func newSharedInformer( fs fields.Selector, transformFunc cache.TransformFunc, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := cache.NewSharedInformer( &cache.ListWatch{ ListFunc: informerListFuncWithSelectors(client, namespace, ls, fs), @@ -82,10 +125,13 @@ func newSharedInformer( watchSyncPeriod, ) if transformFunc != nil { - _ = informer.SetTransform(transformFunc) // only errors if the informer was already started + err := informer.SetTransform(transformFunc) + if err != nil { + return nil, err + } } go informer.Run(stopCh) - return informer + return informer, nil } func informerListFuncWithSelectors(client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc { @@ -104,53 +150,37 @@ func informerWatchFuncWithSelectors(client kubernetes.Interface, namespace strin } } -// newKubeSystemSharedInformer watches only kube-system namespace -func newKubeSystemSharedInformer( - client kubernetes.Interface, - stopCh chan struct{}, -) cache.SharedInformer { - informer := cache.NewSharedInformer( - &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String() - return client.CoreV1().Namespaces().List(context.Background(), opts) - }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String() - return client.CoreV1().Namespaces().Watch(context.Background(), opts) - }, - }, - &api_v1.Namespace{}, - watchSyncPeriod, - ) - go informer.Run(stopCh) - return informer -} - func newNamespaceSharedInformer( client kubernetes.Interface, + fs fields.Selector, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := cache.NewSharedInformer( &cache.ListWatch{ - ListFunc: namespaceInformerListFunc(client), - WatchFunc: namespaceInformerWatchFunc(client), + ListFunc: namespaceInformerListFunc(client, fs), + WatchFunc: namespaceInformerWatchFunc(client, fs), }, &api_v1.Namespace{}, watchSyncPeriod, ) go informer.Run(stopCh) - return informer + return informer, nil } -func namespaceInformerListFunc(client kubernetes.Interface) cache.ListFunc { +func namespaceInformerListFunc(client kubernetes.Interface, fs fields.Selector) cache.ListFunc { return func(opts metav1.ListOptions) (runtime.Object, error) { + if fs != nil { + opts.FieldSelector = fs.String() + } return client.CoreV1().Namespaces().List(context.Background(), opts) } } -func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFunc { +func namespaceInformerWatchFunc(client kubernetes.Interface, fs fields.Selector) cache.WatchFunc { return func(opts metav1.ListOptions) (watch.Interface, error) { + if fs != nil { + opts.FieldSelector = fs.String() + } return client.CoreV1().Namespaces().Watch(context.Background(), opts) } } @@ -160,7 +190,7 @@ func newReplicaSetSharedInformer( namespace string, transformFunc cache.TransformFunc, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := cache.NewSharedInformer( &cache.ListWatch{ ListFunc: replicasetListFuncWithSelectors(client, namespace), @@ -170,10 +200,13 @@ func newReplicaSetSharedInformer( watchSyncPeriod, ) if transformFunc != nil { - _ = informer.SetTransform(transformFunc) // only errors if the informer was already started + err := informer.SetTransform(transformFunc) + if err != nil { + return nil, err + } } go informer.Run(stopCh) - return informer + return informer, nil } func replicasetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { @@ -193,8 +226,8 @@ func newNodeSharedInformer( nodeName string, watchSyncPeriod time.Duration, stopCh chan struct{}, -) cache.SharedInformer { +) (cache.SharedInformer, error) { informer := k8sconfig.NewNodeSharedInformer(client, nodeName, watchSyncPeriod) go informer.Run(stopCh) - return informer + return informer, nil } diff --git a/processor/k8sattributesprocessor/internal/kube/informer_test.go b/processor/k8sattributesprocessor/internal/kube/informer_test.go index ef6c4c659517..a7c3940258fb 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer_test.go +++ b/processor/k8sattributesprocessor/internal/kube/informer_test.go @@ -12,18 +12,17 @@ import ( api_v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) func Test_newSharedInformer(t *testing.T) { labelSelector, fieldSelector, err := selectorsFromFilters(Filters{}) require.NoError(t, err) - client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - require.NoError(t, err) + client := fake.NewClientset() stopCh := make(chan struct{}) - informer := newSharedInformer(client, "testns", labelSelector, fieldSelector, nil, stopCh) + informer, err := newSharedInformer(client, "testns", labelSelector, fieldSelector, nil, stopCh) + assert.NoError(t, err) assert.NotNil(t, informer) assert.False(t, informer.IsStopped()) close(stopCh) @@ -33,10 +32,10 @@ func Test_newSharedInformer(t *testing.T) { } func Test_newSharedNamespaceInformer(t *testing.T) { - client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - require.NoError(t, err) + client := fake.NewClientset() stopCh := make(chan struct{}) - informer := newNamespaceSharedInformer(client, stopCh) + informer, err := newNamespaceSharedInformer(client, nil, stopCh) + assert.NoError(t, err) assert.NotNil(t, informer) assert.False(t, informer.IsStopped()) close(stopCh) @@ -45,18 +44,6 @@ func Test_newSharedNamespaceInformer(t *testing.T) { }, time.Second*5, time.Millisecond) } -func Test_newKubeSystemSharedInformer(t *testing.T) { - client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - require.NoError(t, err) - stopCh := make(chan struct{}) - informer := newKubeSystemSharedInformer(client, stopCh) - assert.NotNil(t, informer) - close(stopCh) - assert.EventuallyWithT(t, func(collect *assert.CollectT) { - assert.True(collect, informer.IsStopped()) - }, time.Second*5, time.Millisecond) -} - func Test_informerListFuncWithSelectors(t *testing.T) { ls, fs, err := selectorsFromFilters(Filters{ Fields: []FieldFilter{ @@ -75,8 +62,7 @@ func Test_informerListFuncWithSelectors(t *testing.T) { }, }) assert.NoError(t, err) - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) + c := fake.NewSimpleClientset() listFunc := informerListFuncWithSelectors(c, "test-ns", ls, fs) opts := metav1.ListOptions{} obj, err := listFunc(opts) @@ -85,9 +71,8 @@ func Test_informerListFuncWithSelectors(t *testing.T) { } func Test_namespaceInformerListFunc(t *testing.T) { - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) - listFunc := namespaceInformerListFunc(c) + c := fake.NewClientset() + listFunc := namespaceInformerListFunc(c, nil) opts := metav1.ListOptions{} obj, err := listFunc(opts) assert.NoError(t, err) @@ -112,8 +97,7 @@ func Test_informerWatchFuncWithSelectors(t *testing.T) { }, }) assert.NoError(t, err) - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) + c := fake.NewClientset() watchFunc := informerWatchFuncWithSelectors(c, "test-ns", ls, fs) opts := metav1.ListOptions{} obj, err := watchFunc(opts) @@ -122,9 +106,8 @@ func Test_informerWatchFuncWithSelectors(t *testing.T) { } func Test_namespaceInformerWatchFunc(t *testing.T) { - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) - watchFunc := namespaceInformerWatchFunc(c) + c := fake.NewClientset() + watchFunc := namespaceInformerWatchFunc(c, nil) opts := metav1.ListOptions{} obj, err := watchFunc(opts) assert.NoError(t, err) @@ -133,10 +116,9 @@ func Test_namespaceInformerWatchFunc(t *testing.T) { func Test_fakeInformer(t *testing.T) { // nothing real to test here. just to make coverage happy - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) stopCh := make(chan struct{}) - i := NewFakeInformer(c, "ns", nil, nil, nil, stopCh) + i, err := NewFakeInformer("ns", nil, nil, nil, stopCh) + assert.NoError(t, err) _, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second) assert.NoError(t, err) i.HasSynced() @@ -146,11 +128,9 @@ func Test_fakeInformer(t *testing.T) { } func Test_fakeNamespaceInformer(t *testing.T) { - // nothing real to test here. just to make coverage happy - c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) - assert.NoError(t, err) stopCh := make(chan struct{}) - i := NewFakeNamespaceInformer(c, stopCh) + i, err := NewFakeNamespaceInformer(nil, stopCh) + assert.NoError(t, err) _, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second) assert.NoError(t, err) i.HasSynced() diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 8b91615cbc85..362b0c8159c8 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -98,16 +98,11 @@ type Client interface { // ClientProvider defines a func type that returns a new Client. type ClientProvider func( component.TelemetrySettings, - k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, - APIClientsetProvider, - InformerProvider, - InformerProviderNamespace, - InformerProviderReplicaSet, - InformerProviderNode, + *InformerProviders, bool, time.Duration, ) (Client, error) @@ -261,6 +256,22 @@ func (rules *ExtractionRules) IncludesOwnerMetadata() bool { return false } +func (rules *ExtractionRules) extractNamespaceLabelsAnnotations() bool { + for _, r := range rules.Labels { + if r.From == MetadataFromNamespace { + return true + } + } + + for _, r := range rules.Annotations { + if r.From == MetadataFromNamespace { + return true + } + } + + return false +} + // FieldExtractionRule is used to specify which fields to extract from pod fields // and inject into spans as attributes. type FieldExtractionRule struct { diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 63e325c88f70..9e1b8965fc2a 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -33,6 +33,7 @@ type kubernetesprocessor struct { telemetrySettings component.TelemetrySettings logger *zap.Logger apiConfig k8sconfig.APIConfig + k8sClientProvider kube.APIClientsetProvider kcProvider kube.ClientProvider informerProviders *kube.InformerProviders kc kube.Client @@ -47,21 +48,20 @@ type kubernetesprocessor struct { func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error { if kp.informerProviders == nil { - kp.informerProviders = &kube.InformerProviders{} + k8sClient, err := kp.k8sClientProvider(kp.apiConfig) + if err != nil { + return err + } + kp.informerProviders = kube.NewDefaultInformerProviders(k8sClient) } if !kp.passthroughMode { kc, err := kubeClient( set, - kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, - nil, - kp.informerProviders.PodInformerProvider, - kp.informerProviders.NamespaceInformerProvider, - kp.informerProviders.ReplicaSetInformerProvider, - kp.informerProviders.NodeInformerProvider, + kp.informerProviders, kp.waitForMetadata, kp.waitForMetadataTimeout, ) @@ -74,6 +74,10 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k } func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) error { + if kp.k8sClientProvider == nil { + kp.k8sClientProvider = k8sconfig.MakeClient + } + if kp.kcProvider == nil { kp.kcProvider = kube.New } diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index eaba45a448a8..37816c7ac3ab 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -28,6 +28,8 @@ import ( "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" conventions "go.opentelemetry.io/collector/semconv/v1.8.0" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" @@ -47,7 +49,7 @@ func newPodIdentifier(from string, name string, value string) kube.PodIdentifier func newTracesProcessor(cfg component.Config, next consumer.Traces, options ...option) (processor.Traces, error) { opts := options - opts = append(opts, withKubeClientProvider(newFakeClient)) + opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset)) set := processortest.NewNopSettings() return createTracesProcessorWithOptions( context.Background(), @@ -60,7 +62,7 @@ func newTracesProcessor(cfg component.Config, next consumer.Traces, options ...o func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metrics, options ...option) (processor.Metrics, error) { opts := options - opts = append(opts, withKubeClientProvider(newFakeClient)) + opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset)) set := processortest.NewNopSettings() return createMetricsProcessorWithOptions( context.Background(), @@ -73,7 +75,7 @@ func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metr func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, options ...option) (processor.Logs, error) { opts := options - opts = append(opts, withKubeClientProvider(newFakeClient)) + opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset)) set := processortest.NewNopSettings() return createLogsProcessorWithOptions( context.Background(), @@ -86,7 +88,7 @@ func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, opti func newProfilesProcessor(cfg component.Config, nextProfilesConsumer consumerprofiles.Profiles, options ...option) (processorprofiles.Profiles, error) { opts := options - opts = append(opts, withKubeClientProvider(newFakeClient)) + opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset)) set := processortest.NewNopSettings() return createProfilesProcessorWithOptions( context.Background(), @@ -97,6 +99,18 @@ func newProfilesProcessor(cfg component.Config, nextProfilesConsumer consumerpro ) } +func newFakeClientset(_ k8sconfig.APIConfig) (kubernetes.Interface, error) { + return fake.NewClientset(), nil +} + +// withAPIClientsetProvider provides a factory method for creating the K8s clientset. +func withAPIClientsetProvider(provider kube.APIClientsetProvider) option { + return func(p *kubernetesprocessor) error { + p.k8sClientProvider = provider + return nil + } +} + // withKubeClientProvider sets the specific implementation for getting K8s Client instances func withKubeClientProvider(kcp kube.ClientProvider) option { return func(p *kubernetesprocessor) error { @@ -138,6 +152,11 @@ func newMultiTest( errFunc func(err error), options ...option, ) *multiTest { + if errFunc == nil { + errFunc = func(err error) { + require.NoError(t, err) + } + } m := &multiTest{ t: t, nextTrace: new(consumertest.TracesSink), @@ -263,22 +282,17 @@ func (m *multiTest) assertResource(batchNum int, resourceFunc func(res pcommon.R func TestNewProcessor(t *testing.T) { cfg := NewFactory().CreateDefaultConfig() - newMultiTest(t, cfg, nil, withKubeClientProvider(newFakeClient)) + newMultiTest(t, cfg, nil, withAPIClientsetProvider(newFakeClientset)) } func TestProcessorBadClientProvider(t *testing.T) { clientProvider := func( _ component.TelemetrySettings, - _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, - _ kube.APIClientsetProvider, - _ kube.InformerProvider, - _ kube.InformerProviderNamespace, - _ kube.InformerProviderReplicaSet, - _ kube.InformerProviderNode, + _ *kube.InformerProviders, _ bool, _ time.Duration, ) (kube.Client, error) { return nil, fmt.Errorf("bad client error")