From cf233a9aa9b6321b30b741d63da44aab6ecb0d4f Mon Sep 17 00:00:00 2001 From: dongdong Date: Mon, 18 Nov 2024 17:58:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20operator=20=E4=BC=98=E5=8C=96=20secrets?= =?UTF-8?q?=20list=20=E5=86=85=E5=AD=98=20--story=3D120731727=20(#624)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/operator/operator/discover/base.go | 17 +++- .../shareddiscovery/shared_discovery.go | 36 +++++++-- pkg/operator/operator/kubelet.go | 24 +++--- pkg/operator/operator/kubelet_test.go | 66 ++++++++-------- .../operator/objectsref/bklogconfig.go | 27 +++---- .../operator/objectsref/controller.go | 78 +++++++++++++++++++ pkg/operator/operator/objectsref/ingress.go | 2 +- pkg/operator/operator/objectsref/relation.go | 63 ++++++--------- pkg/operator/operator/secret.go | 42 ++++------ pkg/operator/operator/server.go | 4 +- 10 files changed, 222 insertions(+), 137 deletions(-) diff --git a/pkg/operator/operator/discover/base.go b/pkg/operator/operator/discover/base.go index 25d128439..48f5d875c 100644 --- a/pkg/operator/operator/discover/base.go +++ b/pkg/operator/operator/discover/base.go @@ -380,7 +380,8 @@ func (d *BaseDiscover) loopHandleTargetGroup() { case <-ticker.C: counter++ - tgList, updatedAt := shareddiscovery.FetchTargetGroups(d.UK()) + // 避免 skip 情况下多申请不必要的内存 + updatedAt := shareddiscovery.FetchTargetGroupsUpdatedAt(d.UK()) logger.Debugf("%s updated at: %v", d.Name(), time.Unix(updatedAt, 0)) if time.Now().Unix()-updatedAt > duration*2 && counter%resync != 0 && d.fetched { logger.Debugf("%s found nothing changed, skip targetgourps handled", d.Name()) @@ -388,6 +389,8 @@ func (d *BaseDiscover) loopHandleTargetGroup() { } d.fetched = true + // 真正需要变更时才 fetch targetgroups + tgList := shareddiscovery.FetchTargetGroups(d.UK()) for _, tg := range tgList { if tg == nil { continue @@ -571,6 +574,12 @@ func (d *BaseDiscover) notify(source string, childConfigs []*ChildConfig) { d.childConfigMut.Lock() defer d.childConfigMut.Unlock() + // 如果新的 source/childconfigs 为空且之前的缓存也为空 那就无需对比处理了 + if len(childConfigs) == 0 && len(d.childConfigGroups[source]) == 0 { + logger.Debugf("%s skip handle notify", d.Name()) + return + } + if _, ok := d.childConfigGroups[source]; !ok { d.childConfigGroups[source] = make(map[uint64]*ChildConfig) } @@ -609,6 +618,12 @@ func (d *BaseDiscover) notify(source string, childConfigs []*ChildConfig) { logger.Infof("%s found targetgroup.source changed", source) Publish() } + + // 删除事件 即后续 source 可能不会再有任何事件了 + if len(d.childConfigGroups[source]) == 0 { + delete(d.childConfigGroups, source) + logger.Infof("delete source (%s), cause no childconfigs", source) + } } // populateLabels builds a label set from the given label set and scrape configuration. 
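Note on the base.go hunks above: the hot loop now asks the shared discovery cache for its last-update timestamp first and only materializes the targetgroup slice when there is actually something to process, which is where the memory saving in this file comes from. Below is a minimal sketch of that usage pattern, assuming only the FetchTargetGroups / FetchTargetGroupsUpdatedAt split introduced by this patch; maybeFetch, maxAge and the example key are illustrative and not part of the operator code:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/discovery/targetgroup"

	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/operator/discover/shareddiscovery"
)

// maybeFetch probes the cheap timestamp API first and only builds the
// []*targetgroup.Group slice when the cache changed within maxAge.
func maybeFetch(uk string, maxAge time.Duration) ([]*targetgroup.Group, bool) {
	updatedAt := shareddiscovery.FetchTargetGroupsUpdatedAt(uk)
	if time.Since(time.Unix(updatedAt, 0)) > maxAge {
		return nil, false // nothing changed recently, skip the allocation entirely
	}
	return shareddiscovery.FetchTargetGroups(uk), true
}

func main() {
	if tgs, ok := maybeFetch("example-uk", 5*time.Minute); ok {
		fmt.Printf("got %d target groups\n", len(tgs))
	}
}
```

The timestamp probe allocates nothing, so ticks that see no change never pay for the slice that FetchTargetGroups builds.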
diff --git a/pkg/operator/operator/discover/shareddiscovery/shared_discovery.go b/pkg/operator/operator/discover/shareddiscovery/shared_discovery.go index fe874296d..fd08d309c 100644 --- a/pkg/operator/operator/discover/shareddiscovery/shared_discovery.go +++ b/pkg/operator/operator/discover/shareddiscovery/shared_discovery.go @@ -74,8 +74,8 @@ type tgWithTime struct { updatedAt int64 } -// FetchTargetGroups 获取缓存 targetgroups 以及最新更新时间 -func FetchTargetGroups(uk string) ([]*targetgroup.Group, int64) { +// FetchTargetGroups 获取缓存 targetgroups +func FetchTargetGroups(uk string) []*targetgroup.Group { sharedDiscoveryLock.Lock() defer sharedDiscoveryLock.Unlock() @@ -83,7 +83,19 @@ func FetchTargetGroups(uk string) ([]*targetgroup.Group, int64) { return d.fetch() } - return nil, 0 + return nil +} + +// FetchTargetGroupsUpdatedAt 获取缓存最新更新时间 +func FetchTargetGroupsUpdatedAt(uk string) int64 { + sharedDiscoveryLock.Lock() + defer sharedDiscoveryLock.Unlock() + + if d, ok := sharedDiscoveryMap[uk]; ok { + return d.fetchUpdatedAt() + } + + return 0 } // Register 注册 shared discovery @@ -161,6 +173,7 @@ func (sd *SharedDiscovery) start() { var total int for source, tg := range sd.store { // 超过 10 分钟未更新且已经没有目标的对象需要删除 + // 确保 basediscovery 已经处理了删除事件 if now-tg.updatedAt > 600 { if tg.tg == nil || len(tg.tg.Targets) == 0 { delete(sd.store, source) @@ -178,17 +191,26 @@ func (sd *SharedDiscovery) start() { } } -func (sd *SharedDiscovery) fetch() ([]*targetgroup.Group, int64) { +func (sd *SharedDiscovery) fetch() []*targetgroup.Group { + sd.mut.RLock() + defer sd.mut.RUnlock() + + ret := make([]*targetgroup.Group, 0, len(sd.store)) + for _, v := range sd.store { + ret = append(ret, v.tg) + } + return ret +} + +func (sd *SharedDiscovery) fetchUpdatedAt() int64 { sd.mut.RLock() defer sd.mut.RUnlock() var maxTs int64 = math.MinInt64 - ret := make([]*targetgroup.Group, 0, 2) for _, v := range sd.store { if maxTs < v.updatedAt { maxTs = v.updatedAt } - ret = append(ret, v.tg) } - return ret, maxTs + return maxTs } diff --git a/pkg/operator/operator/kubelet.go b/pkg/operator/operator/kubelet.go index 51d2ac99a..ea1bb01e2 100644 --- a/pkg/operator/operator/kubelet.go +++ b/pkg/operator/operator/kubelet.go @@ -134,11 +134,8 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error { }, } - nodes, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) - if err != nil { - return errors.Wrap(err, "listing nodes failed") - } - logger.Debugf("Nodes retrieved from the Kubernetes API, num_nodes:%d", len(nodes.Items)) + nodes := c.objectsController.NodeObjs() + logger.Debugf("nodes retrieved from the Kubernetes API, num_nodes: %d", len(nodes)) addresses, errs := getNodeAddresses(nodes) for _, err := range errs { @@ -171,7 +168,7 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error { }, } - err = k8sutils.CreateOrUpdateService(ctx, c.client.CoreV1().Services(cfg.Namespace), svc) + err := k8sutils.CreateOrUpdateService(ctx, c.client.CoreV1().Services(cfg.Namespace), svc) if err != nil { return errors.Wrap(err, "synchronizing kubelet service object failed") } @@ -186,23 +183,24 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error { return nil } -func getNodeAddresses(nodes *corev1.NodeList) ([]corev1.EndpointAddress, []error) { +func getNodeAddresses(nodes []*corev1.Node) ([]corev1.EndpointAddress, []error) { addresses := make([]corev1.EndpointAddress, 0) errs := make([]error, 0) - for _, n := range nodes.Items { - address, _, err := k8sutils.GetNodeAddress(n) + for i := 0; i < 
len(nodes); i++ { node := nodes[i] + address, _, err := k8sutils.GetNodeAddress(*node) if err != nil { - errs = append(errs, errors.Wrapf(err, "failed to determine hostname for node (%s)", n.Name)) + errs = append(errs, errors.Wrapf(err, "failed to determine hostname for node (%s)", node.Name)) continue } addresses = append(addresses, corev1.EndpointAddress{ IP: address, TargetRef: &corev1.ObjectReference{ Kind: "Node", - Name: n.Name, - UID: n.UID, - APIVersion: n.APIVersion, + Name: node.Name, + UID: node.UID, + APIVersion: node.APIVersion, }, }) } diff --git a/pkg/operator/operator/kubelet_test.go b/pkg/operator/operator/kubelet_test.go index 5e52d63ee..42c31527d 100644 --- a/pkg/operator/operator/kubelet_test.go +++ b/pkg/operator/operator/kubelet_test.go @@ -22,24 +22,22 @@ import ( func TestGetNodeAddresses(t *testing.T) { cases := []struct { name string - nodes *corev1.NodeList + nodes []*corev1.Node expectedAddresses []string expectedErrors int }{ { name: "simple", - nodes: &corev1.NodeList{ - Items: []corev1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node-0", - }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Address: "127.0.0.1", - Type: corev1.NodeInternalIP, - }, + nodes: []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Address: "127.0.0.1", + Type: corev1.NodeInternalIP, }, }, }, @@ -50,31 +48,29 @@ func TestGetNodeAddresses(t *testing.T) { }, { name: "missing ip on one node", - nodes: &corev1.NodeList{ - Items: []corev1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node-0", - }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Address: "node-0", - Type: corev1.NodeHostName, - }, + nodes: []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-0", + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Address: "node-0", + Type: corev1.NodeHostName, }, }, }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "node-1", - }, - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{ - { - Address: "127.0.0.1", - Type: corev1.NodeInternalIP, - }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: corev1.NodeStatus{ + Addresses: []corev1.NodeAddress{ + { + Address: "127.0.0.1", + Type: corev1.NodeInternalIP, }, }, }, diff --git a/pkg/operator/operator/objectsref/bklogconfig.go b/pkg/operator/operator/objectsref/bklogconfig.go index 78287272d..d39ea3067 100644 --- a/pkg/operator/operator/objectsref/bklogconfig.go +++ b/pkg/operator/operator/objectsref/bklogconfig.go @@ -70,13 +70,6 @@ func (e *bkLogConfigEntity) isVCluster(matcherLabel map[string]string) bool { return ok } -func (e *bkLogConfigEntity) getValues(matcherLabel map[string]string, key string, defaultValue string) string { - if v, ok := matcherLabel[key]; ok { - return v - } - return defaultValue -} - func (e *bkLogConfigEntity) getWorkloadName(name string, kind string) string { if stringx.LowerEq(kind, kindReplicaSet) { index := strings.LastIndex(name, "-") @@ -85,7 +78,11 @@ func (e *bkLogConfigEntity) getWorkloadName(name string, kind string) string { return name } -func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations map[string]string, ownerRefs []OwnerRef) bool { +func (e *bkLogConfigEntity) MatchWorkload(labels, annotations map[string]string, ownerRefs []OwnerRef) bool { + return e.matchWorkloadType(labels, annotations, ownerRefs) && e.matchWorkloadName(labels, annotations, 
ownerRefs) +} + +func (e *bkLogConfigEntity) matchWorkloadName(labels, annotations map[string]string, ownerRefs []OwnerRef) bool { if e.Obj.Spec.WorkloadName == "" { return true } @@ -96,9 +93,9 @@ func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations } var names []string - if e.isVCluster(matcherLabels) { - name := e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadNameAnnotationKey, "") - kind := e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadTypeAnnotationKey, "") + if e.isVCluster(labels) { + name := annotations[configs.G().VCluster.WorkloadNameAnnotationKey] + kind := annotations[configs.G().VCluster.WorkloadTypeAnnotationKey] names = append(names, e.getWorkloadName(name, kind)) } else { for _, ownerReference := range ownerRefs { @@ -117,14 +114,14 @@ func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations return false } -func (e *bkLogConfigEntity) MatchWorkloadType(matcherLabels, matcherAnnotations map[string]string, ownerRefs []OwnerRef) bool { +func (e *bkLogConfigEntity) matchWorkloadType(labels, annotations map[string]string, ownerRefs []OwnerRef) bool { if e.Obj.Spec.WorkloadType == "" { return true } var kinds []string - if e.isVCluster(matcherLabels) { - kinds = append(kinds, e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadTypeAnnotationKey, "")) + if e.isVCluster(labels) { + kinds = append(kinds, annotations[configs.G().VCluster.WorkloadTypeAnnotationKey]) } else { for _, ownerReference := range ownerRefs { kinds = append(kinds, ownerReference.Kind) @@ -258,7 +255,7 @@ func (m *BkLogConfigMap) Set(e *bkLogConfigEntity) { m.entitiesMap[e.UUID()] = e } -func (m *BkLogConfigMap) RangeBkLogConfig(visitFunc func(e *bkLogConfigEntity)) { +func (m *BkLogConfigMap) Range(visitFunc func(e *bkLogConfigEntity)) { m.lock.RLock() defer m.lock.RUnlock() diff --git a/pkg/operator/operator/objectsref/controller.go b/pkg/operator/operator/objectsref/controller.go index e3445f1ec..cfa81a4aa 100644 --- a/pkg/operator/operator/objectsref/controller.go +++ b/pkg/operator/operator/objectsref/controller.go @@ -30,6 +30,7 @@ import ( bkversioned "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/client/clientset/versioned" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/define" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/k8sutils" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/configs" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) @@ -164,6 +165,7 @@ const ( kindService = "Service" kindEndpoints = "Endpoints" kindIngress = "Ingress" + kindSecret = "Secret" kindDeployment = "Deployment" kindReplicaSet = "ReplicaSet" kindStatefulSet = "StatefulSet" @@ -181,6 +183,7 @@ const ( resourceServices = "services" resourceEndpoints = "endpoints" resourceIngresses = "ingresses" + resourceSecrets = "secrets" // builtin workload resourceReplicaSets = "replicasets" @@ -230,6 +233,7 @@ type ObjectsController struct { cronJobObjs *Objects gameStatefulSetObjs *Objects gameDeploymentsObjs *Objects + secretObjs *Objects nodeObjs *NodeMap serviceObjs *ServiceMap endpointsObjs *EndpointsMap @@ -307,6 +311,13 @@ func NewController(ctx context.Context, client kubernetes.Interface, mClient met return nil, err } + // configs.G().MonitorNamespace SharedInformer + monitorSharedInformer := metadatainformer.NewFilteredSharedInformerFactory(mClient, define.ReSyncPeriod, configs.G().MonitorNamespace, nil) + controller.secretObjs, err = newSecretObjects(ctx, 
monitorSharedInformer) + if err != nil { + return nil, err + } + // Extend/Workload tkexObjs, err := newTkexObjects(ctx, metaSharedInformer, resources) if err != nil { return nil, err } @@ -341,6 +352,14 @@ func (oc *ObjectsController) NodeNameExists(s string) (string, bool) { return oc.nodeObjs.NameExists(s) } +func (oc *ObjectsController) SecretObjs() []Object { + return oc.secretObjs.GetAll() +} + +func (oc *ObjectsController) NodeObjs() []*corev1.Node { + return oc.nodeObjs.GetAll() +} + func (oc *ObjectsController) Stop() { oc.cancel() } @@ -469,6 +488,65 @@ func newPodObjects(ctx context.Context, sharedInformer informers.SharedInformerF return objs, nil } +func newSecretObjects(ctx context.Context, sharedInformer metadatainformer.SharedInformerFactory) (*Objects, error) { + genericInformer := sharedInformer.ForResource(corev1.SchemeGroupVersion.WithResource(resourceSecrets)) + objs := NewObjects(kindSecret) + + informer := genericInformer.Informer() + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + secret, ok := obj.(*metav1.PartialObjectMetadata) + if !ok { + logger.Errorf("expected Secret/PartialObjectMetadata type, got %T", obj) + return + } + objs.Set(Object{ + ID: ObjectID{ + Name: secret.Name, + Namespace: secret.Namespace, + }, + Labels: secret.Labels, + }) + }, + UpdateFunc: func(_, newObj interface{}) { + secret, ok := newObj.(*metav1.PartialObjectMetadata) + if !ok { + logger.Errorf("expected Secret/PartialObjectMetadata type, got %T", newObj) + return + } + objs.Set(Object{ + ID: ObjectID{ + Name: secret.Name, + Namespace: secret.Namespace, + }, + Labels: secret.Labels, + }) + }, + DeleteFunc: func(obj interface{}) { + secret, ok := obj.(*metav1.PartialObjectMetadata) + if !ok { + logger.Errorf("expected Secret/PartialObjectMetadata type, got %T", obj) + return + } + objs.Del(ObjectID{ + Name: secret.Name, + Namespace: secret.Namespace, + }) + }, + }) + if err != nil { + return nil, err + } + + go informer.Run(ctx.Done()) + + synced := k8sutils.WaitForNamedCacheSync(ctx, kindSecret, informer) + if !synced { + return nil, errors.New("failed to sync Secret caches") + } + return objs, nil +} + func newReplicaSetObjects(ctx context.Context, sharedInformer metadatainformer.SharedInformerFactory) (*Objects, error) { genericInformer := sharedInformer.ForResource(appsv1.SchemeGroupVersion.WithResource(resourceReplicaSets)) objs := NewObjects(kindReplicaSet) diff --git a/pkg/operator/operator/objectsref/ingress.go b/pkg/operator/operator/objectsref/ingress.go index 8fd11f18f..fca8d45bf 100644 --- a/pkg/operator/operator/objectsref/ingress.go +++ b/pkg/operator/operator/objectsref/ingress.go @@ -70,7 +70,7 @@ func (m *IngressMap) Del(namespace, name string) { } } -func (m *IngressMap) rangeIngress(namespace string, visitFunc func(name string, ingress ingressEntity)) { +func (m *IngressMap) Range(namespace string, visitFunc func(name string, ingress ingressEntity)) { m.mut.Lock() defer m.mut.Unlock() diff --git a/pkg/operator/operator/objectsref/relation.go b/pkg/operator/operator/objectsref/relation.go index 9b473150e..2d9d1bd4a 100644 --- a/pkg/operator/operator/objectsref/relation.go +++ b/pkg/operator/operator/objectsref/relation.go @@ -92,7 +92,7 @@ func (oc *ObjectsController) GetServiceRelations(w io.Writer) { }) } - oc.ingressObjs.rangeIngress(namespace, func(name string, ingress ingressEntity) { + oc.ingressObjs.Range(namespace, func(name string, ingress ingressEntity) { for _, s := range ingress.services { if s != svc.name { continue 
@@ -176,24 +176,22 @@ func (oc *ObjectsController) GetReplicasetRelations(w io.Writer) { } func (oc *ObjectsController) GetDataSourceRelations(w io.Writer) { - oc.bkLogConfigObjs.RangeBkLogConfig(func(e *bkLogConfigEntity) { - labels := []relationLabel{ - {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, - {Name: "bklogconfig_namespace", Value: e.Obj.Namespace}, - {Name: "bklogconfig_name", Value: e.Obj.Name}, - } + pods := oc.podObjs.GetAll() + nodes := oc.nodeObjs.GetAll() + + oc.bkLogConfigObjs.Range(func(e *bkLogConfigEntity) { relationBytes(w, relationMetric{ - Name: relationBkLogConfigWithDataSource, - Labels: labels, + Name: relationBkLogConfigWithDataSource, + Labels: []relationLabel{ + {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, + {Name: "bklogconfig_namespace", Value: e.Obj.Namespace}, + {Name: "bklogconfig_name", Value: e.Obj.Name}, + }, }) switch e.Obj.Spec.LogConfigType { case logConfigTypeStd, logConfigTypeContainer: - if oc.podObjs == nil { - return - } - - for _, pod := range oc.podObjs.GetAll() { + for _, pod := range pods { if !e.MatchNamespace(pod.ID.Namespace) { continue } @@ -207,11 +205,7 @@ func (oc *ObjectsController) GetDataSourceRelations(w io.Writer) { continue } - if !e.MatchWorkloadType(pod.Labels, pod.Annotations, pod.OwnerRefs) { - continue - } - - if !e.MatchWorkloadName(pod.Labels, pod.Annotations, pod.OwnerRefs) { + if !e.MatchWorkload(pod.Labels, pod.Annotations, pod.OwnerRefs) { continue } } @@ -223,30 +217,24 @@ func (oc *ObjectsController) GetDataSourceRelations(w io.Writer) { continue } } - podRelationStatus = true } // 只需要上报到 pod 层级就够了 if podRelationStatus { - labels := []relationLabel{ - {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, - {Name: "namespace", Value: pod.ID.Namespace}, - {Name: "pod", Value: pod.ID.Name}, - } relationBytes(w, relationMetric{ - Name: relationDataSourceWithPod, - Labels: labels, + Name: relationDataSourceWithPod, + Labels: []relationLabel{ + {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, + {Name: "namespace", Value: pod.ID.Namespace}, + {Name: "pod", Value: pod.ID.Name}, + }, }) } } case logConfigTypeNode: - if oc.nodeObjs == nil { - return - } - - for _, node := range oc.nodeObjs.GetAll() { + for _, node := range nodes { if !e.MatchLabel(node.GetLabels()) { continue } @@ -255,13 +243,12 @@ func (oc *ObjectsController) GetDataSourceRelations(w io.Writer) { continue } - labels := []relationLabel{ - {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, - {Name: "node", Value: node.Name}, - } relationBytes(w, relationMetric{ - Name: relationDataSourceWithNode, - Labels: labels, + Name: relationDataSourceWithNode, + Labels: []relationLabel{ + {Name: "bk_data_id", Value: fmt.Sprintf("%d", e.Obj.Spec.DataId)}, + {Name: "node", Value: node.Name}, + }, }) } } diff --git a/pkg/operator/operator/secret.go b/pkg/operator/operator/secret.go index 60d00400e..f40dee0a5 100644 --- a/pkg/operator/operator/secret.go +++ b/pkg/operator/operator/secret.go @@ -268,19 +268,11 @@ func (c *Operator) cleanupDaemonSetChildSecret(childConfigs []*discover.ChildCon } } - onSuccess := true - secretClient := c.client.CoreV1().Secrets(configs.G().MonitorNamespace) - - secrets, err := secretClient.List(c.ctx, metav1.ListOptions{}) - if err != nil { - logger.Errorf("failed to list secret, error: %v", err) - onSuccess = false - } - // 记录已经存在的 secrets existSecrets := make(map[string]struct{}) - for _, secret := range secrets.Items { - existSecrets[secret.Name] = 
struct{}{} + secrets := c.objectsController.SecretObjs() + for _, secret := range secrets { + existSecrets[secret.ID.Name] = struct{}{} } logger.Infof("list %d secrets from %s namespace", len(existSecrets), configs.G().MonitorNamespace) @@ -289,7 +281,7 @@ func (c *Operator) cleanupDaemonSetChildSecret(childConfigs []*discover.ChildCon // 如果 node 已经没有采集配置了 则需要删除 for _, node := range noConfigNodes { secretName := tasks.GetDaemonSetTaskSecretName(node) - if _, ok := existSecrets[secretName]; !ok && onSuccess { + if _, ok := existSecrets[secretName]; !ok { continue } dropSecrets[secretName] = struct{}{} @@ -313,6 +305,7 @@ func (c *Operator) cleanupDaemonSetChildSecret(childConfigs []*discover.ChildCon } } + secretClient := c.client.CoreV1().Secrets(configs.G().MonitorNamespace) for secretName := range dropSecrets { Slowdown() logger.Infof("remove secret %s", secretName) @@ -543,24 +536,23 @@ func (c *Operator) collectChildConfigs() ([]*discover.ChildConfig, []*discover.C func (c *Operator) cleanupInvalidSecrets() { secretClient := c.client.CoreV1().Secrets(configs.G().MonitorNamespace) - secrets, err := secretClient.List(c.ctx, metav1.ListOptions{ - LabelSelector: "createdBy=bkmonitor-operator", - }) - if err != nil { - logger.Errorf("failed to list secrets, err: %v", err) - return - } + secrets := c.objectsController.SecretObjs() // 清理不合法的 secrets - for _, secret := range secrets.Items { + for _, secret := range secrets { + // 只处理 operator 创建的 secrets + if secret.Labels["createdBy"] != "bkmonitor-operator" { + continue + } + if _, ok := secret.Labels[tasks.LabelTaskType]; !ok { - if err := secretClient.Delete(c.ctx, secret.Name, metav1.DeleteOptions{}); err != nil { - c.mm.IncHandledSecretFailedCounter(secret.Name, action.Delete, err) - logger.Errorf("failed to delete secret %s, err: %v", secret.Name, err) + if err := secretClient.Delete(c.ctx, secret.ID.Name, metav1.DeleteOptions{}); err != nil { + c.mm.IncHandledSecretFailedCounter(secret.ID.Name, action.Delete, err) + logger.Errorf("failed to delete secret %s, err: %v", secret.ID.Name, err) continue } - c.mm.IncHandledSecretSuccessCounter(secret.Name, action.Delete) - logger.Infof("remove invalid secret %s", secret.Name) + c.mm.IncHandledSecretSuccessCounter(secret.ID.Name, action.Delete) + logger.Infof("remove invalid secret %s", secret.ID.Name) } } } diff --git a/pkg/operator/operator/server.go b/pkg/operator/operator/server.go index 4a89829ce..dfe2b7ec9 100644 --- a/pkg/operator/operator/server.go +++ b/pkg/operator/operator/server.go @@ -219,7 +219,7 @@ const ( ` formatMonitorEndpoint = ` [√] check endpoint -- Description: operator 监听 monitor endpoints 数量,共 %d 个 +- Description: operator 匹配 %d 个 monitor,共有 %d 个 endpoints %s ` formatScrapeStats = ` @@ -343,7 +343,7 @@ func (c *Operator) CheckRoute(w http.ResponseWriter, r *http.Request) { for _, v := range endpoints { total += v } - writef(formatMonitorEndpoint, total, string(b)) + writef(formatMonitorEndpoint, len(endpoints), total, string(b)) // 检查采集指标数据量 onScrape := r.URL.Query().Get("scrape")
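Note: the secret.go and server.go changes above only work because controller.go now keeps a metadata-only Secret cache; the operator no longer lists full Secret payloads just to learn their names and labels, which is the memory reduction this PR targets. A self-contained sketch of that informer pattern using standard client-go APIs follows; the namespace, resync period and handler body here are illustrative, not taken from this patch:

```go
package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	mClient, err := metadata.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	// Factory scoped to a single namespace, mirroring the MonitorNamespace scoping in NewController.
	factory := metadatainformer.NewFilteredSharedInformerFactory(mClient, 5*time.Minute, "monitoring", nil)
	informer := factory.ForResource(corev1.SchemeGroupVersion.WithResource("secrets")).Informer()

	_, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// A metadata informer delivers *metav1.PartialObjectMetadata, never *corev1.Secret,
			// so secret data is never held in the local cache.
			if m, ok := obj.(*metav1.PartialObjectMetadata); ok {
				fmt.Printf("secret added: %s/%s labels=%v\n", m.Namespace, m.Name, m.Labels)
			}
		},
	})
	if err != nil {
		panic(err)
	}

	go informer.Run(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)
	select {} // keep watching until killed
}
```

The same trade-off applies as in newSecretObjects: labels and names are enough for the cleanup paths in secret.go, so nothing heavier than object metadata needs to stay resident.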