Skip to content

Commit

Permalink
Refactor k8s client management
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Nov 29, 2024
1 parent 1f5b8d8 commit 9a41d69
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 309 deletions.
35 changes: 21 additions & 14 deletions processor/k8sattributesprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"go.opentelemetry.io/collector/component"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
)

Expand All @@ -39,32 +37,41 @@ func selectors() (labels.Selector, fields.Selector) {
// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(
_ component.TelemetrySettings,
_ k8sconfig.APIConfig,
rules kube.ExtractionRules,
filters kube.Filters,
associations []kube.Association,
_ kube.Excludes,
_ kube.APIClientsetProvider,
_ kube.InformerProvider,
_ kube.InformerProviderNamespace,
_ kube.InformerProviderReplicaSet,
_ kube.InformerProviderNode,
_ *kube.InformerProviders,
_ bool,
_ time.Duration,
) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
closeCh := make(chan struct{})
informer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
if err != nil {
return nil, err
}
nsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
if err != nil {
return nil, err
}
nodeInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
if err != nil {
return nil, err
}
rsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
if err != nil {
return nil, err
}
return &fakeClient{
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh),
NodeInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh),
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs, nil, closeCh),
Informer: informer,
NamespaceInformer: nsInformer,
NodeInformer: nodeInformer,
ReplicaSetInformer: rsInformer,
StopCh: make(chan struct{}),
}, nil
}
Expand Down
110 changes: 40 additions & 70 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
)

Expand All @@ -43,7 +41,6 @@ type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
podHandlerRegistration cache.ResourceEventHandlerRegistration
namespaceInformer cache.SharedInformer
Expand Down Expand Up @@ -93,16 +90,11 @@ var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)
// New initializes a new k8s Client.
func New(
set component.TelemetrySettings,
apiCfg k8sconfig.APIConfig,
rules ExtractionRules,
filters Filters,
associations []Association,
exclude Excludes,
newClientSet APIClientsetProvider,
newInformer InformerProvider,
newNamespaceInformer InformerProviderNamespace,
newReplicaSetInformer InformerProviderReplicaSet,
newNodeInformer InformerProviderNode,
informerProviders *InformerProviders,
waitForMetadata bool,
waitForMetadataTimeout time.Duration,
) (Client, error) {
Expand All @@ -129,15 +121,6 @@ func New(
c.Namespaces = map[string]*Namespace{}
c.Nodes = map[string]*Node{}
c.ReplicaSets = map[string]*ReplicaSet{}
if newClientSet == nil {
newClientSet = k8sconfig.MakeClient
}

kc, err := newClientSet(apiCfg)
if err != nil {
return nil, err
}
c.kc = kc

labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters)
if err != nil {
Expand All @@ -148,24 +131,6 @@ func New(
zap.String("labelSelector", labelSelector.String()),
zap.String("fieldSelector", fieldSelector.String()),
)
if newInformer == nil {
newInformer = newSharedInformer
}

if newNamespaceInformer == nil {
switch {
case c.extractNamespaceLabelsAnnotations():
// if rules to extract metadata from namespace is configured use namespace shared informer containing
// all namespaces including kube-system which contains cluster uid information (kube-system-uid)
newNamespaceInformer = newNamespaceSharedInformer
case rules.ClusterUID:
// use kube-system shared informer to only watch kube-system namespace
// reducing overhead of watching all the namespaces
newNamespaceInformer = newKubeSystemSharedInformer
default:
newNamespaceInformer = NewNoOpInformer
}
}

podTransformFunc := func(object any) (any, error) {
originalPod, success := object.(*api_v1.Pod)
Expand All @@ -175,14 +140,31 @@ func New(

return removeUnnecessaryPodData(originalPod, c.Rules), nil
}
c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector, podTransformFunc, c.stopCh)
c.informer, err = informerProviders.PodInformerProvider(
c.Filters.Namespace,
labelSelector,
fieldSelector,
podTransformFunc,
c.stopCh,
)
if err != nil {
return nil, err
}

c.namespaceInformer = newNamespaceInformer(c.kc, c.stopCh)
// if rules to extract metadata from namespace is configured use namespace shared informer containing
// all namespaces including kube-system which contains cluster uid information (kube-system-uid)
if rules.extractNamespaceLabelsAnnotations() || rules.ClusterUID {
fs := fields.Everything()
if !rules.extractNamespaceLabelsAnnotations() {
fs = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace)
}
c.namespaceInformer, err = informerProviders.NamespaceInformerProvider(fs, c.stopCh)
if err != nil {
return nil, err
}
}

if rules.DeploymentName || rules.DeploymentUID {
if newReplicaSetInformer == nil {
newReplicaSetInformer = newReplicaSetSharedInformer
}
transformFunc := func(object any) (any, error) {
originalReplicaset, success := object.(*apps_v1.ReplicaSet)
if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
Expand All @@ -191,19 +173,21 @@ func New(

return removeUnnecessaryReplicaSetData(originalReplicaset), nil
}
c.replicasetInformer = newReplicaSetInformer(
c.kc,
c.replicasetInformer, err = informerProviders.ReplicaSetInformerProvider(
c.Filters.Namespace,
transformFunc,
c.stopCh,
)
if err != nil {
return nil, err
}
}

if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
if newNodeInformer == nil {
newNodeInformer = newNodeSharedInformer
c.nodeInformer, err = informerProviders.NodeInformerProvider(c.Filters.Node, 5*time.Minute, c.stopCh)
if err != nil {
return nil, err
}
c.nodeInformer = newNodeInformer(c.kc, c.Filters.Node, 5*time.Minute, c.stopCh)
}

return c, err
Expand All @@ -225,15 +209,17 @@ func (c *WatchClient) Start() error {
}
synced = append(synced, c.podHandlerRegistration.HasSynced)

c.namespaceHandlerRegistration, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
})
if err != nil {
return err
if c.namespaceInformer != nil {
c.namespaceHandlerRegistration, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
})
if err != nil {
return err
}
synced = append(synced, c.namespaceHandlerRegistration.HasSynced)
}
synced = append(synced, c.namespaceHandlerRegistration.HasSynced)

if c.replicasetInformer != nil {
c.replicasetHandlerRegistration, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -1022,22 +1008,6 @@ func (c *WatchClient) addOrUpdateNamespace(namespace *api_v1.Namespace) {
c.m.Unlock()
}

func (c *WatchClient) extractNamespaceLabelsAnnotations() bool {
for _, r := range c.Rules.Labels {
if r.From == MetadataFromNamespace {
return true
}
}

for _, r := range c.Rules.Annotations {
if r.From == MetadataFromNamespace {
return true
}
}

return false
}

func (c *WatchClient) extractNodeLabelsAnnotations() bool {
for _, r := range c.Rules.Labels {
if r.From == MetadataFromNode {
Expand Down
Loading

0 comments on commit 9a41d69

Please sign in to comment.