diff --git a/pkg/operator/common/feature/feature.go b/pkg/operator/common/feature/feature.go index 4f4f140e9..d175fc8fd 100644 --- a/pkg/operator/common/feature/feature.go +++ b/pkg/operator/common/feature/feature.go @@ -9,7 +9,9 @@ package feature -import "strings" +import ( + "strings" +) const ( // labels features @@ -26,6 +28,8 @@ const ( keyRelabelIndex = "relabelIndex" keyMonitorMatchSelector = "monitorMatchSelector" keyMonitorDropSelector = "monitorDropSelector" + keyLabelJoinMatcher = "labelJoinMatcher" + keySliMonitor = "sliMonitor" ) func isMapKeyExists(m map[string]string, key string) bool { @@ -50,6 +54,47 @@ func parseSelector(s string) map[string]string { return selector } +type LabelJoinMatcherSpec struct { + Kind string + Annotations []string + Labels []string +} + +// parseLabelJoinMatcher 解析 labeljoin 规则 +// Kind://[label:custom_label|annotation:custom_annotation,...] +func parseLabelJoinMatcher(s string) *LabelJoinMatcherSpec { + const ( + annotationPrefix = "annotation:" + labelPrefix = "label:" + ) + + switch { + case strings.HasPrefix(s, "Pod://"): // TODO(mando): 目前仅支持 Pod + s = s[len("Pod://"):] + default: + return nil + } + + var annotations []string + var labels []string + parts := strings.Split(s, ",") + for _, part := range parts { + k := strings.TrimSpace(part) + switch { + case strings.HasPrefix(k, annotationPrefix): + annotations = append(annotations, strings.TrimSpace(k[len(annotationPrefix):])) + case strings.HasPrefix(k, labelPrefix): + labels = append(labels, strings.TrimSpace(k[len(labelPrefix):])) + } + } + + return &LabelJoinMatcherSpec{ + Kind: "Pod", + Annotations: annotations, + Labels: labels, + } +} + // IfCommonResource 检查 DataID 是否为 common 类型 func IfCommonResource(m map[string]string) bool { return isMapKeyExists(m, keyCommonResource) @@ -98,3 +143,11 @@ func MonitorMatchSelector(m map[string]string) map[string]string { func MonitorDropSelector(m map[string]string) map[string]string { return parseSelector(m[keyMonitorDropSelector]) } + +func LabelJoinMatcher(m map[string]string) *LabelJoinMatcherSpec { + return parseLabelJoinMatcher(m[keyLabelJoinMatcher]) +} + +func SliMonitor(m map[string]string) string { + return m[keySliMonitor] +} diff --git a/pkg/operator/common/feature/feature_test.go b/pkg/operator/common/feature/feature_test.go index 78156b251..6002b7b9d 100644 --- a/pkg/operator/common/feature/feature_test.go +++ b/pkg/operator/common/feature/feature_test.go @@ -46,3 +46,28 @@ func TestParseSelector(t *testing.T) { assert.Equal(t, c.output, parseSelector(c.input)) } } + +func TestParseLabelJoinMatcher(t *testing.T) { + cases := []struct { + input string + annotations []string + labels []string + }{ + { + input: "Pod://annotation:biz.service,annotation:biz.set,label:zone.key1,label:zone.key2", + annotations: []string{"biz.service", "biz.set"}, + labels: []string{"zone.key1", "zone.key2"}, + }, + { + input: "Pod:// annotation: biz.service, annotation: biz.set, label: zone.key1, label: zone.key2", + annotations: []string{"biz.service", "biz.set"}, + labels: []string{"zone.key1", "zone.key2"}, + }, + } + + for _, c := range cases { + matcher := parseLabelJoinMatcher(c.input) + assert.Equal(t, c.annotations, matcher.Annotations) + assert.Equal(t, c.labels, matcher.Labels) + } +} diff --git a/pkg/operator/common/utils/utils.go b/pkg/operator/common/utils/utils.go new file mode 100644 index 000000000..647be8a26 --- /dev/null +++ b/pkg/operator/common/utils/utils.go @@ -0,0 +1,24 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package utils + +import "strings" + +func SplitTrim(s, sep string) []string { + if s == "" { + return nil + } + + var ret []string + for _, part := range strings.Split(s, sep) { + ret = append(ret, strings.TrimSpace(part)) + } + return ret +} diff --git a/pkg/operator/operator/discover/discover.go b/pkg/operator/operator/discover/discover.go index 5d183eac4..453fd7980 100644 --- a/pkg/operator/operator/discover/discover.go +++ b/pkg/operator/operator/discover/discover.go @@ -37,6 +37,7 @@ import ( bkv1beta1 "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/apis/crd/v1beta1" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/define" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/feature" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/k8sutils" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/labelspool" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/notifier" @@ -150,6 +151,7 @@ type BaseParams struct { MetricRelabelConfigs []yaml.MapSlice MatchSelector map[string]string DropSelector map[string]string + LabelJoinMatcher *feature.LabelJoinMatcherSpec } type BaseDiscover struct { @@ -413,6 +415,7 @@ func (d *BaseDiscover) makeMetricTarget(lbls, origLabels labels.Labels, namespac metricTarget.RelabelRule = d.RelabelRule metricTarget.RelabelIndex = d.RelabelIndex metricTarget.NormalizeMetricName = d.NormalizeMetricName + metricTarget.LabelJoinMatcher = d.LabelJoinMatcher return metricTarget, nil } @@ -654,7 +657,6 @@ func (d *BaseDiscover) handleTargetGroup(targetGroup *targetgroup.Group) { continue } if childConfig == nil { - d.mm.IncCreatedChildConfigSkippedCounter() d.cache.Set(namespace, tlset, targetGroup.Labels) continue } diff --git a/pkg/operator/operator/discover/metrics.go b/pkg/operator/operator/discover/metrics.go index ab9d03156..09bf404ae 100644 --- a/pkg/operator/operator/discover/metrics.go +++ b/pkg/operator/operator/discover/metrics.go @@ -53,15 +53,6 @@ var ( []string{"name"}, ) - discoverCreatedChildConfigSkippedTotal = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: define.MonitorNamespace, - Name: "discover_created_config_skipped_total", - Help: "discover created child config skipped total", - }, - []string{"name"}, - ) - discoverCreatedChildConfigCachedTotal = promauto.NewCounterVec( prometheus.CounterOpts{ Namespace: define.MonitorNamespace, @@ -88,15 +79,6 @@ var ( }, []string{"name"}, ) - - discoverSkippedTgSourceTotal = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: define.MonitorNamespace, - Name: "discover_skipped_tg_source_total", - Help: "discover skipped tg source total", - }, - []string{"name"}, - ) ) func newMetricMonitor(name string) *metricMonitor { @@ -123,10 +105,6 @@ func (m *metricMonitor) IncCreatedChildConfigFailedCounter() { discoverCreatedChildConfigFailedTotal.WithLabelValues(m.name).Inc() } -func (m *metricMonitor) IncCreatedChildConfigSkippedCounter() { - discoverCreatedChildConfigSkippedTotal.WithLabelValues(m.name).Inc() -} - func (m *metricMonitor) IncCreatedChildConfigCachedCounter() { discoverCreatedChildConfigCachedTotal.WithLabelValues(m.name).Inc() } @@ -138,7 +116,3 @@ func (m *metricMonitor) IncHandledTgCounter() { func (m *metricMonitor) IncDeletedTgSourceCounter() { discoverDeletedTgSourceTotal.WithLabelValues(m.name).Inc() } - -func (m *metricMonitor) IncSkippedTgSourceCounter() { - discoverSkippedTgSourceTotal.WithLabelValues(m.name).Inc() -} diff --git a/pkg/operator/operator/discover/shared_discovery.go b/pkg/operator/operator/discover/shared_discovery.go index 800a3b65f..c0b4e885a 100644 --- a/pkg/operator/operator/discover/shared_discovery.go +++ b/pkg/operator/operator/discover/shared_discovery.go @@ -95,7 +95,6 @@ func (sd *sharedDiscovery) start() { if !ok { // 第一次记录且没有 targets 则跳过 if tg == nil || len(tg.Targets) == 0 { - sd.mm.IncSkippedTgSourceCounter() logger.Infof("sharedDiscovery %s skip tg source '%s'", sd.id, tg.Source) continue } diff --git a/pkg/operator/operator/objectsref/controller.go b/pkg/operator/operator/objectsref/controller.go index a7ec261c2..41ce5a648 100644 --- a/pkg/operator/operator/objectsref/controller.go +++ b/pkg/operator/operator/objectsref/controller.go @@ -45,9 +45,10 @@ type Object struct { OwnerRefs []OwnerRef // Pod 属性 - NodeName string - Labels map[string]string - PodIP string + NodeName string + PodIP string + Labels map[string]string + Annotations map[string]string // Containers Containers []string @@ -359,11 +360,12 @@ func newPodObjects(ctx context.Context, sharedInformer informers.SharedInformerF Name: pod.Name, Namespace: pod.Namespace, }, - OwnerRefs: toRefs(pod.OwnerReferences), - NodeName: pod.Spec.NodeName, - Labels: pod.Labels, - PodIP: pod.Status.PodIP, - Containers: toContainers(pod.Spec.Containers), + OwnerRefs: toRefs(pod.OwnerReferences), + NodeName: pod.Spec.NodeName, + Labels: pod.Labels, + Annotations: pod.Annotations, + PodIP: pod.Status.PodIP, + Containers: toContainers(pod.Spec.Containers), }) }, UpdateFunc: func(_, newObj interface{}) { @@ -377,11 +379,12 @@ func newPodObjects(ctx context.Context, sharedInformer informers.SharedInformerF Name: pod.Name, Namespace: pod.Namespace, }, - OwnerRefs: toRefs(pod.OwnerReferences), - NodeName: pod.Spec.NodeName, - Labels: pod.Labels, - PodIP: pod.Status.PodIP, - Containers: toContainers(pod.Spec.Containers), + OwnerRefs: toRefs(pod.OwnerReferences), + NodeName: pod.Spec.NodeName, + Labels: pod.Labels, + Annotations: pod.Annotations, + PodIP: pod.Status.PodIP, + Containers: toContainers(pod.Spec.Containers), }) }, DeleteFunc: func(obj interface{}) { diff --git a/pkg/operator/operator/objectsref/workload.go b/pkg/operator/operator/objectsref/workload.go index d9a78e66f..a7226998a 100644 --- a/pkg/operator/operator/objectsref/workload.go +++ b/pkg/operator/operator/objectsref/workload.go @@ -9,6 +9,11 @@ package objectsref +import ( + "strings" + "unicode" +) + // RelabelConfig relabel 配置 遵循 prometheus 规则 type RelabelConfig struct { SourceLabels []string `json:"sourceLabels"` @@ -20,63 +25,144 @@ type RelabelConfig struct { NodeName string `json:"nodeName"` } +// WorkloadRef 是 Pod 与 Workload 的关联关系 +type WorkloadRef struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + Ref OwnerRef `json:"ownerRef"` + NodeName string `json:"nodeName"` +} + +type WorkloadRefs []WorkloadRef + +func (wr WorkloadRefs) AsRelabelConfigs() []RelabelConfig { + configs := make([]RelabelConfig, 0, len(wr)*2) + + for _, ref := range wr { + configs = append(configs, RelabelConfig{ + SourceLabels: []string{"namespace", "pod_name"}, + Separator: ";", + Regex: ref.Namespace + ";" + ref.Name, + TargetLabel: "workload_kind", + Replacement: ref.Ref.Kind, + Action: "replace", + NodeName: ref.NodeName, + }) + configs = append(configs, RelabelConfig{ + SourceLabels: []string{"namespace", "pod_name"}, + Separator: ";", + Regex: ref.Namespace + ";" + ref.Name, + TargetLabel: "workload_name", + Replacement: ref.Ref.Name, + Action: "replace", + NodeName: ref.NodeName, + }) + } + return configs +} + +// PodInfoRef 是 Pod 额外补充维度 +type PodInfoRef struct { + Name string + Namespace string + Dimensions map[string]string +} + +type PodInfoRefs []PodInfoRef + +func (pr PodInfoRefs) AsRelabelConfigs() []RelabelConfig { + configs := make([]RelabelConfig, 0) + + for _, ref := range pr { + for name, value := range ref.Dimensions { + configs = append(configs, RelabelConfig{ + SourceLabels: []string{"namespace", "pod_name"}, + Separator: ";", + Regex: ref.Namespace + ";" + ref.Name, + TargetLabel: normalizeName(name), + Replacement: value, + Action: "replace", + }) + } + } + return configs +} + // WorkloadsRelabelConfigs 返回所有 workload relabel 配置 func (oc *ObjectsController) WorkloadsRelabelConfigs() []RelabelConfig { pods := oc.podObjs.GetAll() - return getWorkloadRelabelConfigs(oc.getWorkloadRefs(pods)) -} - -// WorkloadsRelabelConfigsByNodeName 根据节点名称获取 workload relabel 配置 -func (oc *ObjectsController) WorkloadsRelabelConfigsByNodeName(nodeName string) []RelabelConfig { - pods := oc.podObjs.GetByNodeName(nodeName) - return getWorkloadRelabelConfigs(oc.getWorkloadRefs(pods)) + return oc.getWorkloadRelabelConfigs(pods, "") } // WorkloadsRelabelConfigsByPodName 根据节点名称和 pod 名称获取 workload relabel 配置 -func (oc *ObjectsController) WorkloadsRelabelConfigsByPodName(nodeName, podName string) []RelabelConfig { +func (oc *ObjectsController) WorkloadsRelabelConfigsByPodName(nodeName, podName string, annotations, labels []string) []RelabelConfig { pods := oc.podObjs.GetByNodeName(nodeName) - return getWorkloadRelabelConfigs(oc.getWorkloadRefs(pods, podName)) + + var configs []RelabelConfig + configs = append(configs, oc.getWorkloadRelabelConfigs(pods, podName)...) + configs = append(configs, oc.getPodRelabelConfigs(pods, podName, annotations, labels)...) + return configs } -type WorkloadRef struct { - Name string `json:"name"` - Namespace string `json:"namespace"` - Ref OwnerRef `json:"ownerRef"` - NodeName string `json:"nodeName"` +// PodsRelabelConfigs 获取 Pods Relabels 规则 +func (oc *ObjectsController) PodsRelabelConfigs(annotations, labels []string) []RelabelConfig { + pods := oc.podObjs.GetAll() + // TODO(mando): 暂不支持指定 podname + return oc.getPodRelabelConfigs(pods, "", annotations, labels) } -func (oc *ObjectsController) getWorkloadRefs(pods []Object, podNames ...string) []WorkloadRef { - refs := make([]WorkloadRef, 0, len(pods)) +func (oc *ObjectsController) getWorkloadRelabelConfigs(pods []Object, podName string) []RelabelConfig { + workloadRefs := make(WorkloadRefs, 0, len(pods)) + for _, pod := range pods { ownerRef := Lookup(pod.ID, oc.podObjs, oc.objsMap()) if ownerRef == nil { continue } - // 没有 podname 则命中所有 - if len(podNames) == 0 { - refs = append(refs, WorkloadRef{ + // 1) 没有 podname 则命中所有 + // 2) 存在则需要精准匹配 + if podName == "" || podName == pod.ID.Name { + workloadRefs = append(workloadRefs, WorkloadRef{ Name: pod.ID.Name, Namespace: pod.ID.Namespace, Ref: *ownerRef, NodeName: pod.NodeName, }) - continue } + } + return workloadRefs.AsRelabelConfigs() +} - // 否则只处理匹配到的 podname - for _, podName := range podNames { - if podName == pod.ID.Name { - refs = append(refs, WorkloadRef{ - Name: pod.ID.Name, - Namespace: pod.ID.Namespace, - Ref: *ownerRef, - NodeName: pod.NodeName, +func (oc *ObjectsController) getPodRelabelConfigs(pods []Object, podName string, annotations, labels []string) []RelabelConfig { + podInfoRefs := make(PodInfoRefs, 0) + + for _, pod := range pods { + // 1) 没有 podname 则命中所有 + // 2) 存在则需要精准匹配 + if podName == "" || podName == pod.ID.Name { + extra := make(map[string]string) + for _, name := range annotations { + if v, ok := pod.Annotations[name]; ok { + extra["annotation_"+name] = v + } + } + for _, name := range labels { + if v, ok := pod.Labels[name]; ok { + extra["label_"+name] = v + } + } + // 按需补充维度 + if len(extra) > 0 { + podInfoRefs = append(podInfoRefs, PodInfoRef{ + Name: pod.ID.Name, + Namespace: pod.ID.Namespace, + Dimensions: extra, }) } } } - return refs + return podInfoRefs.AsRelabelConfigs() } func (oc *ObjectsController) objsMap() map[string]*Objects { @@ -98,28 +184,6 @@ func (oc *ObjectsController) objsMap() map[string]*Objects { return om } -func getWorkloadRelabelConfigs(refs []WorkloadRef) []RelabelConfig { - configs := make([]RelabelConfig, 0, len(refs)*2) - - for _, ref := range refs { - configs = append(configs, RelabelConfig{ - SourceLabels: []string{"namespace", "pod_name"}, - Separator: ";", - Regex: ref.Namespace + ";" + ref.Name, - TargetLabel: "workload_kind", - Replacement: ref.Ref.Kind, - Action: "replace", - NodeName: ref.NodeName, - }) - configs = append(configs, RelabelConfig{ - SourceLabels: []string{"namespace", "pod_name"}, - Separator: ";", - Regex: ref.Namespace + ";" + ref.Name, - TargetLabel: "workload_name", - Replacement: ref.Ref.Name, - Action: "replace", - NodeName: ref.NodeName, - }) - } - return configs +func normalizeName(s string) string { + return strings.Join(strings.FieldsFunc(s, func(r rune) bool { return !unicode.IsLetter(r) && !unicode.IsDigit(r) && r != '_' }), "_") } diff --git a/pkg/operator/operator/operator.go b/pkg/operator/operator/operator.go index 5295771ef..72d07056f 100644 --- a/pkg/operator/operator/operator.go +++ b/pkg/operator/operator/operator.go @@ -566,6 +566,7 @@ func (c *Operator) createServiceMonitorDiscovers(serviceMonitor *promv1.ServiceM AntiAffinity: feature.IfAntiAffinity(serviceMonitor.Annotations), MatchSelector: feature.MonitorMatchSelector(serviceMonitor.Annotations), DropSelector: feature.MonitorDropSelector(serviceMonitor.Annotations), + LabelJoinMatcher: feature.LabelJoinMatcher(serviceMonitor.Annotations), Name: monitorMeta.ID(), DataID: dataID, KubeConfig: ConfKubeConfig, @@ -756,6 +757,7 @@ func (c *Operator) createPodMonitorDiscovers(podMonitor *promv1.PodMonitor) []di if tlsConfig != nil { safeTlsConfig = tlsConfig.SafeTLSConfig } + podDiscover := discover.NewPodDiscover(c.ctx, monitorMeta, c.objectsController.NodeNameExists, &discover.PodParams{ BaseParams: &discover.BaseParams{ Client: c.client, @@ -765,6 +767,7 @@ func (c *Operator) createPodMonitorDiscovers(podMonitor *promv1.PodMonitor) []di AntiAffinity: feature.IfAntiAffinity(podMonitor.Annotations), MatchSelector: feature.MonitorMatchSelector(podMonitor.Annotations), DropSelector: feature.MonitorDropSelector(podMonitor.Annotations), + LabelJoinMatcher: feature.LabelJoinMatcher(podMonitor.Annotations), Name: monitorMeta.ID(), DataID: dataID, KubeConfig: ConfKubeConfig, diff --git a/pkg/operator/operator/promsli/promsli.go b/pkg/operator/operator/promsli/promsli.go index 7a08a407e..faa12af7a 100644 --- a/pkg/operator/operator/promsli/promsli.go +++ b/pkg/operator/operator/promsli/promsli.go @@ -32,13 +32,13 @@ import ( "k8s.io/client-go/kubernetes" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/compressor" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/feature" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/k8sutils" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/notifier" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) const ( - sliAnnotationKey = "sliMonitor" sliAnnotationBuiltin = "ServiceMonitor/sli" rulesMetric = "alert_rules" @@ -106,8 +106,8 @@ func (c *Controller) handle() { } func verifyServiceMonitor(pr *promv1.PrometheusRule) (string, bool) { - v, ok := pr.Annotations[sliAnnotationKey] - if !ok { + v := feature.SliMonitor(pr.Annotations) + if v == "" { logger.Infof("skip none sli-annotations PrometheusRule: %s/%s", pr.Namespace, pr.Name) return "", false } @@ -331,7 +331,7 @@ func (c *Controller) generateServiceMonitorScrapeConfigs() []yaml.MapSlice { var cfg []yaml.MapSlice for _, sm := range c.serviceMonitors { // 内置白名单 - if sm.Annotations[sliAnnotationKey] == sliAnnotationBuiltin { + if feature.SliMonitor(sm.Annotations) == sliAnnotationBuiltin { for i, ep := range sm.Spec.Endpoints { cfg = append(cfg, generateServiceMonitorScrapeConfig(sm, ep, i, nil)) } diff --git a/pkg/operator/operator/promsli/promsli_test.go b/pkg/operator/operator/promsli/promsli_test.go index 132242162..1d602a380 100644 --- a/pkg/operator/operator/promsli/promsli_test.go +++ b/pkg/operator/operator/promsli/promsli_test.go @@ -16,12 +16,10 @@ import ( ) func TestParsePromQLMetrics(t *testing.T) { - type Case struct { + cases := []struct { q string metrics []string - } - - cases := []Case{ + }{ { q: ` ( @@ -68,12 +66,10 @@ func TestParsePromQLMetrics(t *testing.T) { } func TestToPromFormat(t *testing.T) { - type Case struct { + cases := []struct { Input map[string]string Output string - } - - cases := []Case{ + }{ { Input: map[string]string{"foo": "bar", "key1": "value1"}, Output: `alert_rules{foo="bar",key1="value1"} 1`, diff --git a/pkg/operator/operator/secret.go b/pkg/operator/operator/secret.go index a5df32491..69ab27c56 100644 --- a/pkg/operator/operator/secret.go +++ b/pkg/operator/operator/secret.go @@ -39,7 +39,7 @@ var ( ) func Slowdown() { - time.Sleep(time.Millisecond * 20) + time.Sleep(time.Millisecond * 25) // 避免高频操作 } func EqualMap(a, b map[string]struct{}) bool { @@ -301,7 +301,7 @@ func (c *Operator) cleanupDaemonSetChildSecret(childConfigs []*discover.ChildCon } for secretName := range dropSecrets { - Slowdown() // 避免高频操作 + Slowdown() logger.Infof("remove secret %s", secretName) if err := secretClient.Delete(c.ctx, secretName, metav1.DeleteOptions{}); err != nil { if !errors.IsNotFound(err) { diff --git a/pkg/operator/operator/server.go b/pkg/operator/operator/server.go index 039736272..03b404edc 100644 --- a/pkg/operator/operator/server.go +++ b/pkg/operator/operator/server.go @@ -23,6 +23,7 @@ import ( "github.com/valyala/bytebufferpool" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/utils" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/operator/discover" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/operator/objectsref" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" @@ -459,13 +460,39 @@ func (c *Operator) WorkloadNodeRoute(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) nodeName := vars["node"] - podName := r.URL.Query().Get("podName") - if len(podName) > 0 { - writeResponse(w, c.objectsController.WorkloadsRelabelConfigsByPodName(nodeName, podName)) - return + query := r.URL.Query() + podName := query.Get("podName") + annotations := utils.SplitTrim(query.Get("annotations"), ",") + labels := utils.SplitTrim(query.Get("labels"), ",") + + var configs []objectsref.RelabelConfig + configs = append(configs, c.objectsController.WorkloadsRelabelConfigsByPodName(nodeName, podName, annotations, labels)...) + + // kind/rules 是为了让 workload 同时能够支持其他 labeljoin 等其他规则 + kind := query.Get("kind") + rules := query.Get("rules") + if rules == "labeljoin" { + switch kind { + case "Pod": + configs = append(configs, c.objectsController.PodsRelabelConfigs(annotations, labels)...) + } } - writeResponse(w, c.objectsController.WorkloadsRelabelConfigsByNodeName(nodeName)) + writeResponse(w, configs) +} + +func (c *Operator) LabelJoinRoute(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + kind := query.Get("kind") + annotations := utils.SplitTrim(query.Get("annotations"), ",") + labels := utils.SplitTrim(query.Get("labels"), ",") + + switch kind { + case "Pod": + writeResponse(w, c.objectsController.PodsRelabelConfigs(annotations, labels)) + default: + writeResponse(w, nil) + } } func (c *Operator) RelationMetricsRoute(w http.ResponseWriter, _ *http.Request) { @@ -546,6 +573,7 @@ func (c *Operator) ListenAndServe() error { router.HandleFunc("/cluster_info", c.ClusterInfoRoute) router.HandleFunc("/workload", c.WorkloadRoute) router.HandleFunc("/workload/node/{node}", c.WorkloadNodeRoute) + router.HandleFunc("/labeljoin", c.LabelJoinRoute) router.HandleFunc("/relation/metrics", c.RelationMetricsRoute) router.HandleFunc("/rule/metrics", c.RuleMetricsRoute) diff --git a/pkg/operator/operator/target/metric.go b/pkg/operator/operator/target/metric.go index d9de5f07e..5512c8e3e 100644 --- a/pkg/operator/operator/target/metric.go +++ b/pkg/operator/operator/target/metric.go @@ -10,6 +10,7 @@ package target import ( + "bytes" "fmt" "hash/fnv" "math" @@ -24,12 +25,15 @@ import ( "gopkg.in/yaml.v2" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/define" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/feature" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/utils" ) const ( - relabelV1RuleWorkload = "v1/workload" - relabelV2RuleWorkload = "v2/workload" - relabelV1RuleNode = "v1/node" + relabelV1RuleWorkload = "v1/workload" + relabelV2RuleWorkload = "v2/workload" + relabelV1RuleNode = "v1/node" + relabelV1RuleLabelJoin = "v1/labeljoin" ) func IsBuiltinLabels(k string) bool { @@ -83,6 +87,7 @@ type MetricTarget struct { Mask string TaskType string DisableCustomTimestamp bool + LabelJoinMatcher *feature.LabelJoinMatcherSpec hash uint64 // 缓存 hash 避免重复计算 } @@ -97,35 +102,93 @@ func (t *MetricTarget) FileName() string { // RemoteRelabelConfig 返回采集器 workload 工作负载信息 func (t *MetricTarget) RemoteRelabelConfig() *yaml.MapItem { - switch t.RelabelRule { - case relabelV1RuleWorkload: - // index >= 0 表示 annotations 中指定了 index label - if idx := toMonitorIndex(t.RelabelIndex); idx >= 0 && idx != t.Meta.Index { - return nil - } - return &yaml.MapItem{ - Key: "metric_relabel_remote", - Value: fmt.Sprintf("http://%s:%d/workload/node/%s", ConfServiceName, ConfServicePort, t.NodeName), - } - case relabelV2RuleWorkload: - if idx := toMonitorIndex(t.RelabelIndex); idx >= 0 && idx != t.Meta.Index { - return nil - } - var podName string - for _, label := range t.Labels { - if label.Name == "pod_name" { - podName = label.Value - break + var annotationsRule, labelsRule []string + var kind string + if t.LabelJoinMatcher != nil { + annotationsRule = t.LabelJoinMatcher.Annotations + labelsRule = t.LabelJoinMatcher.Labels + kind = t.LabelJoinMatcher.Kind + } + + var path string + host := fmt.Sprintf("http://%s:%d", ConfServiceName, ConfServicePort) + params := map[string]string{} + + rules := utils.SplitTrim(t.RelabelRule, ",") + for _, rule := range rules { + switch rule { + case relabelV1RuleWorkload: + // index >= 0 表示 annotations 中指定了 index label + if idx := toMonitorIndex(t.RelabelIndex); idx >= 0 && idx != t.Meta.Index { + continue } - } - if len(podName) > 0 { - return &yaml.MapItem{ - Key: "metric_relabel_remote", - Value: fmt.Sprintf("http://%s:%d/workload/node/%s?podName=%s", ConfServiceName, ConfServicePort, t.NodeName, podName), + if len(path) == 0 { + path = fmt.Sprintf("/workload/node/%s", t.NodeName) + } + + case relabelV2RuleWorkload: + if idx := toMonitorIndex(t.RelabelIndex); idx >= 0 && idx != t.Meta.Index { + continue + } + var podName string + for _, label := range t.Labels { + if label.Name == "pod_name" { + podName = label.Value + break + } + } + // v2 需要保证有 podname 才下发 + if len(podName) > 0 { + if len(path) == 0 { + path = fmt.Sprintf("/workload/node/%s", t.NodeName) + } + params["podName"] = podName + } + + case relabelV1RuleLabelJoin: + if idx := toMonitorIndex(t.RelabelIndex); idx >= 0 && idx != t.Meta.Index { + continue } + if len(path) == 0 { + path = "/labeljoin" + } else { + params["rules"] = "labeljoin" // 兼容混合 workload+labeljoin 混合场景 + } + params["kind"] = kind + params["annotations"] = strings.Join(annotationsRule, ",") + params["labels"] = strings.Join(labelsRule, ",") + } + } + + if len(path) == 0 { + return nil + } + + u := host + path + p := makeParams(params) + if len(p) > 0 { + u = u + "?" + p + } + return &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: u, + } +} + +func makeParams(params map[string]string) string { + buf := &bytes.Buffer{} + keys := make([]string, 0, len(params)) + for k := range params { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := params[k] + if v != "" { + buf.WriteString(fmt.Sprintf("%s=%s&", k, v)) } } - return nil + return strings.TrimRight(buf.String(), "&") } func fnvHash(b []byte) uint64 { diff --git a/pkg/operator/operator/target/metric_test.go b/pkg/operator/operator/target/metric_test.go index 3d0b21a6d..6b10da1ae 100644 --- a/pkg/operator/operator/target/metric_test.go +++ b/pkg/operator/operator/target/metric_test.go @@ -14,8 +14,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/define" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/operator/common/feature" ) func TestMetricsTarget(t *testing.T) { @@ -88,3 +90,150 @@ tasks: ` assert.Equal(t, expected, string(b)) } + +func TestRemoteRelabelConfig(t *testing.T) { + cases := []struct { + Name string + Input *MetricTarget + Output *yaml.MapItem + }{ + { + Name: "NoRules", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "", + }, + Output: nil, + }, + { + Name: "v1/workload", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v1/workload", + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/workload/node/worker1", + }, + }, + { + Name: "v2/workload", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v2/workload", + Labels: labels.Labels{{Name: "pod_name", Value: "pod1"}}, + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/workload/node/worker1?podName=pod1", + }, + }, + { + Name: "v2/workload,labeljoin", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v2/workload,v1/labeljoin", + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/labeljoin", + }, + }, + { + Name: "v1/workload,v1/labeljoin", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v1/workload,v1/labeljoin", + LabelJoinMatcher: &feature.LabelJoinMatcherSpec{ + Kind: "Pod", + Annotations: []string{"annotations1"}, + Labels: []string{"label1"}, + }, + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/workload/node/worker1?annotations=annotations1&kind=Pod&labels=label1&rules=labeljoin", + }, + }, + { + Name: "v2/workload,v1/labeljoin", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v2/workload,v1/labeljoin", + LabelJoinMatcher: &feature.LabelJoinMatcherSpec{ + Kind: "Pod", + Annotations: []string{"annotations1"}, + Labels: []string{"label1"}, + }, + Labels: labels.Labels{ + {Name: "pod_name", Value: "pod1"}, + }, + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/workload/node/worker1?annotations=annotations1&kind=Pod&labels=label1&podName=pod1&rules=labeljoin", + }, + }, + { + Name: "v1/workload,v1/labeljoin", + Input: &MetricTarget{ + NodeName: "worker1", + RelabelIndex: "0", + RelabelRule: "v1/workload,v1/labeljoin", + LabelJoinMatcher: &feature.LabelJoinMatcherSpec{ + Kind: "Pod", + Annotations: []string{"annotations1"}, + Labels: []string{"label1"}, + }, + }, + Output: &yaml.MapItem{ + Key: "metric_relabel_remote", + Value: "http://:0/workload/node/worker1?annotations=annotations1&kind=Pod&labels=label1&rules=labeljoin", + }, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + assert.Equal(t, c.Output, c.Input.RemoteRelabelConfig()) + }) + } +} + +func TestMakeParams(t *testing.T) { + cases := []struct { + Input map[string]string + Output string + }{ + { + Input: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + Output: "key1=value1&key2=value2", + }, + { + Input: map[string]string{ + "key1": "value1", + }, + Output: "key1=value1", + }, + { + Input: map[string]string{ + "key1": "", + "key2": "value2", + }, + Output: "key2=value2", + }, + } + + for _, c := range cases { + assert.Equal(t, c.Output, makeParams(c.Input)) + } +}