diff --git a/pkg/bkmonitorbeat/VERSION b/pkg/bkmonitorbeat/VERSION index f8113db05..1b4940759 100644 --- a/pkg/bkmonitorbeat/VERSION +++ b/pkg/bkmonitorbeat/VERSION @@ -1 +1 @@ -v3.45.x \ No newline at end of file +v3.46.x \ No newline at end of file diff --git a/pkg/bkmonitorbeat/configs/kubeevent.go b/pkg/bkmonitorbeat/configs/kubeevent.go index 0e3af3842..8b3bb69a2 100644 --- a/pkg/bkmonitorbeat/configs/kubeevent.go +++ b/pkg/bkmonitorbeat/configs/kubeevent.go @@ -22,9 +22,10 @@ const ( type KubeEventConfig struct { BaseTaskParam `config:"_,inline"` - EventSpan time.Duration `config:"event_span"` - Interval time.Duration `config:"interval"` - TailFiles []string `config:"tail_files"` + EventSpan time.Duration `config:"event_span"` + Interval time.Duration `config:"interval"` + TailFiles []string `config:"tail_files"` + UpMetricsDataID int32 `config:"upmetrics_dataid"` // 自监控 dataid } func (c *KubeEventConfig) GetTaskConfigList() []define.TaskConfig { diff --git a/pkg/bkmonitorbeat/define/code.go b/pkg/bkmonitorbeat/define/code.go index 02718ffd9..cdf2db88f 100644 --- a/pkg/bkmonitorbeat/define/code.go +++ b/pkg/bkmonitorbeat/define/code.go @@ -9,76 +9,25 @@ package define -import ( - "bytes" - "fmt" - "io" -) - const ( NameGatherUp = "bkm_gather_up" LabelUpCode = "bkm_up_code" LabelUpCodeName = "bkm_up_code_name" + // MetricBeat 任务指标 + NameMetricBeatUp = "bkm_metricbeat_endpoint_up" NameMetricBeatScrapeDuration = "bkm_metricbeat_scrape_duration_seconds" NameMetricBeatScrapeSize = "bkm_metricbeat_scrape_size_bytes" NameMetricBeatScrapeLine = "bkm_metricbeat_scrape_line" NameMetricBeatHandleDuration = "bkm_metricbeat_handle_duration_seconds" -) - -var innerMetrics = map[string]struct{}{ - NameMetricBeatUp: {}, - NameMetricBeatScrapeDuration: {}, - NameMetricBeatScrapeSize: {}, - NameMetricBeatScrapeLine: {}, - NameMetricBeatHandleDuration: {}, -} -func IsInnerMetric(s string) bool { - _, ok := innerMetrics[s] - return ok -} + // KubeEvent 任务指标 -func NewMetricBeatCodeReader(code NamedCode, kvs []LogKV) io.ReadCloser { - r := bytes.NewReader([]byte(MetricBeatUp(code, kvs))) - return io.NopCloser(r) -} - -const ( - prefixMetricbeat = "[metricbeat] " + NameKubeEventReceiveEvents = "bkm_kubeevent_receive_events" + NameKubeEventReportEvents = "bkm_kubeevent_report_events" ) -func MetricBeatUp(code NamedCode, kvs []LogKV) string { - s := fmt.Sprintf(`%s{code="%d",code_name="%s"} 1`, NameMetricBeatUp, code.Code(), code.Name()) - RecordLog(prefixMetricbeat+s, kvs) - return s -} - -func MetricBeatScrapeDuration(seconds float64, kvs []LogKV) string { - s := fmt.Sprintf(`%s{} %f`, NameMetricBeatScrapeDuration, seconds) - RecordLog(prefixMetricbeat+s, kvs) - return s -} - -func MetricBeatScrapeSize(size int, kvs []LogKV) string { - s := fmt.Sprintf(`%s{} %d`, NameMetricBeatScrapeSize, size) - RecordLog(prefixMetricbeat+s, kvs) - return s -} - -func MetricBeatScrapeLine(n int, kvs []LogKV) string { - s := fmt.Sprintf(`%s{} %d`, NameMetricBeatScrapeLine, n) - RecordLog(prefixMetricbeat+s, kvs) - return s -} - -func MetricBeatHandleDuration(seconds float64, kvs []LogKV) string { - s := fmt.Sprintf(`%s{} %f`, NameMetricBeatHandleDuration, seconds) - RecordLog(prefixMetricbeat+s, kvs) - return s -} - type NamedCode struct { code int name string diff --git a/pkg/bkmonitorbeat/define/recordlogs.go b/pkg/bkmonitorbeat/define/recordlogs.go index b994155e3..2d229ad2f 100644 --- a/pkg/bkmonitorbeat/define/recordlogs.go +++ b/pkg/bkmonitorbeat/define/recordlogs.go @@ -41,10 +41,14 @@ func newRecordLogs(c LogConfig) *recordLogs { } func RecordLog(template string, kvs []LogKV) { - template = fmt.Sprintf("%s; fields=%+v", template, kvs) + template = fmt.Sprintf("%s; kvs=%+v", template, kvs) rl.l.Info(template) } +func RecordLogf(template string, args ...interface{}) { + rl.l.Infof(template, args...) +} + type LogKV struct { K string V interface{} diff --git a/pkg/bkmonitorbeat/tasks/kubeevent/event.go b/pkg/bkmonitorbeat/tasks/kubeevent/event.go index 33e69382a..40d0abc2f 100644 --- a/pkg/bkmonitorbeat/tasks/kubeevent/event.go +++ b/pkg/bkmonitorbeat/tasks/kubeevent/event.go @@ -103,13 +103,23 @@ func (e *k8sEvent) GetFirstTime() int64 { } func (e *k8sEvent) GetLastTime() int64 { + var t0, t1 int64 if e.LastTs != "" { - return parseTimeLayout(e.LastTs) + t0 = parseTimeLayout(e.LastTs) } if e.Series != nil { - return parseTimeLayout(e.Series.LastObservedTime) + t1 = parseTimeLayout(e.Series.LastObservedTime) } - return time.Now().Unix() + + if t0 == 0 && t1 == 0 { + return time.Now().Unix() // 兜底 + } + + // 取最新时间点 + if t0 > t1 { + return t0 + } + return t1 } func (e *k8sEvent) GetCount() int { @@ -127,20 +137,33 @@ func (e *k8sEvent) IsZeroTime() bool { } type wrapEvent struct { - k8sEvent - dataID int32 - externalLabels []map[string]string + dataID int32 + data []common.MapStr } -func newWrapEvent(dataID int32, externalLabels []map[string]string, e k8sEvent) *wrapEvent { +func newWrapEvent(dataID int32, data []common.MapStr) *wrapEvent { return &wrapEvent{ - k8sEvent: e, - dataID: dataID, - externalLabels: externalLabels, + dataID: dataID, + data: data, } } func (e *wrapEvent) AsMapStr() common.MapStr { + return common.MapStr{ + "dataid": e.dataID, + "data": e.data, + } +} + +func (e *wrapEvent) IgnoreCMDBLevel() bool { + return true +} + +func (e *wrapEvent) GetType() string { + return define.ModuleKubeevent +} + +func toEventMapStr(e k8sEvent, externalLabels []map[string]string) common.MapStr { dimensions := common.MapStr{ "kind": e.InvolvedObject.Kind, "namespace": e.InvolvedObject.Namespace, @@ -150,13 +173,13 @@ func (e *wrapEvent) AsMapStr() common.MapStr { "host": e.Source.Host, "type": e.Type, } - for i := 0; i < len(e.externalLabels); i++ { - for k, v := range e.externalLabels[i] { + for i := 0; i < len(externalLabels); i++ { + for k, v := range externalLabels[i] { dimensions[k] = v } } - data := common.MapStr{ + return common.MapStr{ "event_name": e.Reason, "target": e.GetTarget(), "event": common.MapStr{ @@ -166,17 +189,4 @@ func (e *wrapEvent) AsMapStr() common.MapStr { "dimension": dimensions, "timestamp": e.GetLastTime() * 1000, // ms } - - return common.MapStr{ - "dataid": e.dataID, - "data": []common.MapStr{data}, - } -} - -func (e *wrapEvent) IgnoreCMDBLevel() bool { - return true -} - -func (e *wrapEvent) GetType() string { - return define.ModuleKubeevent } diff --git a/pkg/bkmonitorbeat/tasks/kubeevent/gather.go b/pkg/bkmonitorbeat/tasks/kubeevent/gather.go index ef96888b4..91e68b062 100644 --- a/pkg/bkmonitorbeat/tasks/kubeevent/gather.go +++ b/pkg/bkmonitorbeat/tasks/kubeevent/gather.go @@ -13,8 +13,10 @@ import ( "context" "encoding/json" "sync" + "sync/atomic" "time" + "github.com/elastic/beats/libbeat/common" "github.com/hpcloud/tail" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/configs" @@ -24,15 +26,19 @@ import ( ) type recorder struct { - ctx context.Context - set map[string]*k8sEvent - mut sync.Mutex - interval time.Duration - eventSpan time.Duration - started int64 - dataID int32 - externalLabels []map[string]string - out chan define.Event + ctx context.Context + set map[string]*k8sEvent + mut sync.Mutex + interval time.Duration + eventSpan time.Duration + started int64 + dataID int32 + upMetricsDataID int32 + externalLabels []map[string]string + out chan common.MapStr + + received atomic.Int64 + sent atomic.Int64 } func newRecorder(ctx context.Context, conf *configs.KubeEventConfig) *recorder { @@ -45,14 +51,15 @@ func newRecorder(ctx context.Context, conf *configs.KubeEventConfig) *recorder { eventSpan = conf.EventSpan } r := &recorder{ - ctx: ctx, - interval: interval, - eventSpan: eventSpan, - dataID: conf.DataID, - externalLabels: conf.GetLabels(), - set: map[string]*k8sEvent{}, - started: time.Now().Unix(), - out: make(chan define.Event, 1), + ctx: ctx, + interval: interval, + eventSpan: eventSpan, + dataID: conf.DataID, + upMetricsDataID: conf.UpMetricsDataID, + externalLabels: conf.GetLabels(), + set: map[string]*k8sEvent{}, + started: time.Now().Unix(), + out: make(chan common.MapStr, 1), } go r.loopSent() @@ -104,7 +111,7 @@ func (r *recorder) loopSent() { continue } cloned.Count = cnt - r.out <- newWrapEvent(r.dataID, r.externalLabels, *cloned) + r.out <- toEventMapStr(*cloned, r.externalLabels) } r.mut.Unlock() @@ -119,6 +126,8 @@ func (r *recorder) Recv(event k8sEvent) { r.mut.Lock() defer r.mut.Unlock() + r.received.Add(1) + // 异常时间处理 if event.IsZeroTime() { return @@ -176,11 +185,38 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) { go g.watchEvents(f) } + const batch = 100 + events := make([]common.MapStr, 0, batch) + + ticker := time.NewTicker(time.Second * 3) + defer ticker.Stop() + + reportTicker := time.NewTicker(time.Minute) // 自监控上报周期 + defer reportTicker.Stop() + + sentOut := func() { + e <- newWrapEvent(g.store.dataID, events) + g.store.sent.Add(int64(len(events))) + events = make([]common.MapStr, 0, batch) + } + for { select { case out := <-g.store.out: - logger.Infof("send k8s event: %+v", out.AsMapStr()) - e <- out + logger.Infof("send k8s event: %+v", out) + events = append(events, out) + if len(events) >= batch { + sentOut() + } + + case <-ticker.C: + if len(events) > 0 { + sentOut() + } + + case <-reportTicker.C: + e <- CodeMetrics(g.store.upMetricsDataID, g.TaskConfig, g.store.received.Load(), g.store.sent.Load()) + case <-g.ctx.Done(): return } @@ -215,7 +251,6 @@ func (g *Gather) watchEvents(filename string) { } } -// New : func New(globalConfig define.Config, taskConfig define.TaskConfig) define.Task { gather := &Gather{} gather.GlobalConfig = globalConfig diff --git a/pkg/bkmonitorbeat/tasks/kubeevent/upmetrics.go b/pkg/bkmonitorbeat/tasks/kubeevent/upmetrics.go new file mode 100644 index 000000000..c5deac543 --- /dev/null +++ b/pkg/bkmonitorbeat/tasks/kubeevent/upmetrics.go @@ -0,0 +1,55 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package kubeevent + +import ( + "fmt" + "strconv" + "time" + + "github.com/elastic/beats/libbeat/common" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/define" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/tasks" +) + +func CodeMetrics(dataID int32, taskConfig define.TaskConfig, receive, report int64) *tasks.GatherUpEvent { + dims := common.MapStr{ + "task_id": strconv.Itoa(int(taskConfig.GetTaskID())), + "bk_collect_type": taskConfig.GetType(), + "bk_biz_id": strconv.Itoa(int(taskConfig.GetBizID())), + } + + // 从配置文件中获取维度字段 + for _, labels := range taskConfig.GetLabels() { + for k, v := range labels { + dims[k] = v + } + } + + ev := &tasks.GatherUpEvent{ + DataID: dataID, + Time: time.Now(), + Dimensions: dims, + Metrics: common.MapStr{ + define.NameKubeEventReceiveEvents: float64(receive), + define.NameKubeEventReportEvents: float64(report), + }, + } + + var kvs []define.LogKV + for k, v := range dims { + kvs = append(kvs, define.LogKV{K: k, V: v}) + } + for k, v := range ev.Metrics { + define.RecordLog(fmt.Sprintf("[%s] %s{} %f", taskConfig.GetType(), k, v), kvs) + } + return ev +} diff --git a/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/collector.go b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/collector.go index 7b6302e2e..bd8c452c9 100644 --- a/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/collector.go +++ b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/collector.go @@ -277,13 +277,13 @@ func (m *MetricSet) getEventsFromReader(metricsReader io.ReadCloser, cleanup fun // 补充 up 指标文本 markUp := func(failed bool, t0 time.Time) { // 需要减去自监控指标 - events := m.asEvents(define.MetricBeatScrapeLine(int(total.Load()-2), m.logkvs()), milliTs) + events := m.asEvents(CodeScrapeLine(int(total.Load()-2), m.logkvs()), milliTs) if failed { - events = append(events, m.asEvents(define.MetricBeatUp(define.CodeInvalidPromFormat, m.logkvs()), milliTs)...) + events = append(events, m.asEvents(CodeUp(define.CodeInvalidPromFormat, m.logkvs()), milliTs)...) } else { - events = append(events, m.asEvents(define.MetricBeatUp(define.CodeOK, m.logkvs()), milliTs)...) + events = append(events, m.asEvents(CodeUp(define.CodeOK, m.logkvs()), milliTs)...) } - events = append(events, m.asEvents(define.MetricBeatHandleDuration(time.Since(t0).Seconds(), m.logkvs()), milliTs)...) + events = append(events, m.asEvents(CodeHandleDuration(time.Since(t0).Seconds(), m.logkvs()), milliTs)...) for i := 0; i < len(events); i++ { eventChan <- events[i] } @@ -391,7 +391,7 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { rsp, err := m.httpClient.FetchResponse() if err != nil { - m.fillMetrics(summary, define.NewMetricBeatCodeReader(define.CodeConnRefused, m.logkvs()), false) + m.fillMetrics(summary, NewCodeReader(define.CodeConnRefused, m.logkvs()), false) err = errors.Wrap(err, "request failed") logger.Error(err) return summary, err @@ -404,14 +404,14 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { if m.useTempFile { metricsFile, err = utils.CreateTempFile(m.tempFilePattern) if err != nil { - m.fillMetrics(summary, define.NewMetricBeatCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) + m.fillMetrics(summary, NewCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) err = errors.Wrap(err, "create metricsFile failed") logger.Error(err) return summary, err } if _, err = io.Copy(metricsFile, rsp.Body); err != nil { - m.fillMetrics(summary, define.NewMetricBeatCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) + m.fillMetrics(summary, NewCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) _ = metricsFile.Close() _ = os.Remove(metricsFile.Name()) err = errors.Wrap(err, "write metricsFile failed") @@ -421,7 +421,7 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { info, err := metricsFile.Stat() if err != nil { - m.fillMetrics(summary, define.NewMetricBeatCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) + m.fillMetrics(summary, NewCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) _ = metricsFile.Close() _ = os.Remove(metricsFile.Name()) err = errors.Wrap(err, "stats metricsFile failed") @@ -429,11 +429,11 @@ func (m *MetricSet) Fetch() (common.MapStr, error) { return summary, err } - metricsFile.WriteString("\n" + define.MetricBeatScrapeSize(int(info.Size()), m.logkvs())) - metricsFile.WriteString("\n" + define.MetricBeatScrapeDuration(time.Since(startTime).Seconds(), m.logkvs())) + metricsFile.WriteString("\n" + CodeScrapeSize(int(info.Size()), m.logkvs())) + metricsFile.WriteString("\n" + CodeScrapeDuration(time.Since(startTime).Seconds(), m.logkvs())) if err = metricsFile.Close(); err != nil { - m.fillMetrics(summary, define.NewMetricBeatCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) + m.fillMetrics(summary, NewCodeReader(define.CodeWriteTempFileFailed, m.logkvs()), false) _ = os.Remove(metricsFile.Name()) err = errors.Wrap(err, "close metricsFile failed") logger.Error(err) diff --git a/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/relabel.go b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/relabel.go index c4bc83167..bddd4d710 100644 --- a/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/relabel.go +++ b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/relabel.go @@ -19,7 +19,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" - "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/define" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/tasks" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) @@ -92,7 +91,7 @@ func (m *MetricSet) metricRelabel(promEvent *tasks.PromEvent) bool { }) // up metric 不做 relabels 调整 - if define.IsInnerMetric(promEvent.Key) { + if IsInnerMetric(promEvent.Key) { return true } diff --git a/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/upmetrics.go b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/upmetrics.go new file mode 100644 index 000000000..e4cae45af --- /dev/null +++ b/pkg/bkmonitorbeat/tasks/metricbeat/module/prometheus/collector/upmetrics.go @@ -0,0 +1,70 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package collector + +import ( + "bytes" + "fmt" + "io" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/define" +) + +var innerMetrics = map[string]struct{}{ + define.NameMetricBeatUp: {}, + define.NameMetricBeatScrapeDuration: {}, + define.NameMetricBeatScrapeSize: {}, + define.NameMetricBeatScrapeLine: {}, + define.NameMetricBeatHandleDuration: {}, +} + +func IsInnerMetric(s string) bool { + _, ok := innerMetrics[s] + return ok +} + +func NewCodeReader(code define.NamedCode, kvs []define.LogKV) io.ReadCloser { + r := bytes.NewReader([]byte(CodeUp(code, kvs))) + return io.NopCloser(r) +} + +const ( + prefixMetricbeat = "[metricbeat] " +) + +func CodeUp(code define.NamedCode, kvs []define.LogKV) string { + s := fmt.Sprintf(`%s{code="%d",code_name="%s"} 1`, define.NameMetricBeatUp, code.Code(), code.Name()) + define.RecordLog(prefixMetricbeat+s, kvs) + return s +} + +func CodeScrapeDuration(seconds float64, kvs []define.LogKV) string { + s := fmt.Sprintf(`%s{} %f`, define.NameMetricBeatScrapeDuration, seconds) + define.RecordLog(prefixMetricbeat+s, kvs) + return s +} + +func CodeScrapeSize(size int, kvs []define.LogKV) string { + s := fmt.Sprintf(`%s{} %d`, define.NameMetricBeatScrapeSize, size) + define.RecordLog(prefixMetricbeat+s, kvs) + return s +} + +func CodeScrapeLine(n int, kvs []define.LogKV) string { + s := fmt.Sprintf(`%s{} %d`, define.NameMetricBeatScrapeLine, n) + define.RecordLog(prefixMetricbeat+s, kvs) + return s +} + +func CodeHandleDuration(seconds float64, kvs []define.LogKV) string { + s := fmt.Sprintf(`%s{} %f`, define.NameMetricBeatHandleDuration, seconds) + define.RecordLog(prefixMetricbeat+s, kvs) + return s +} diff --git a/pkg/bkmonitorbeat/tasks/script/formatdata_windows_test.go b/pkg/bkmonitorbeat/tasks/script/formatdata_windows_test.go index bff54eaf3..40838ed85 100644 --- a/pkg/bkmonitorbeat/tasks/script/formatdata_windows_test.go +++ b/pkg/bkmonitorbeat/tasks/script/formatdata_windows_test.go @@ -17,7 +17,6 @@ import ( "github.com/stretchr/testify/suite" ) -// FormatSuite : type FormatSuite struct { suite.Suite } @@ -39,7 +38,6 @@ func (s *FormatSuite) TestGatherRun() { } } -// TestFormat : func TestFormat(t *testing.T) { suite.Run(t, &FormatSuite{}) } diff --git a/pkg/bkmonitorbeat/tasks/script/gather.go b/pkg/bkmonitorbeat/tasks/script/gather.go index 048e082a6..3d7f35d3d 100644 --- a/pkg/bkmonitorbeat/tasks/script/gather.go +++ b/pkg/bkmonitorbeat/tasks/script/gather.go @@ -26,7 +26,7 @@ import ( ) // ExecCmdLine is so tests can mock out exec.Command usage. -var ExecCmdLine = utils.RunStringWithoutErr +var ExecCmdLine = utils.RunString type Gather struct { tasks.BaseTask @@ -53,16 +53,14 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) { timeHandler, _ = tasks.GetTimestampHandler("ms") // 兜底 } - logger.Infof("task command (%s) timeout config %v", taskConf.Command, taskConf.Timeout) cmdCtx, cmdCancel := context.WithTimeout(ctx, taskConf.Timeout) defer cmdCancel() - - fmtCommand := ShellWordPreProcess(taskConf.Command) + command := ShellWordPreProcess(taskConf.Command) t0 := time.Now() - out, err := ExecCmdLine(cmdCtx, fmtCommand, taskConf.UserEnvs) + out, err := ExecCmdLine(cmdCtx, command, taskConf.UserEnvs) if err != nil { - logger.Errorf("execCmd [%s] failed: %s, content: [%s]", fmtCommand, err, out) + logger.Errorf("execute command (%s) failed, out=(%s), err: %v", command, out, err) if errors.Is(err, utils.ErrScriptTimeout) { e <- tasks.NewGatherUpEvent(g, define.CodeScriptTimeout) } else { @@ -70,7 +68,10 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) { } return } - logger.Infof("task command(%s) take: %v", fmtCommand, time.Since(t0)) + + since := time.Since(t0) + logger.Debugf("task command(%s) take %v, out=(%s)", command, since, out) + define.RecordLogf("[script] execute command(%s) take %v, len(out)=%d", command, since, len(out)) aggRst, formatErr := FormatOutput([]byte(out), milliTimestamp, taskConf.TimeOffset, timeHandler) if errors.Is(formatErr, define.ErrNoScriptOutput) { @@ -134,9 +135,9 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) { if formatErr != nil { e <- tasks.NewGatherUpEvent(g, define.CodeInvalidPromFormat) if len(aggRst) == 0 { - logger.Errorf("format output failed totally: %s", formatErr) + logger.Errorf("command(%s) format output failed totally: %s", command, formatErr) } else { - logger.Errorf("format output failed partly: %s", formatErr) + logger.Errorf("command(%s) format output failed partly: %s", command, formatErr) } } else { total++ @@ -145,15 +146,15 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) { } // KeepOneDimension 只在测试模式需要这么处理 -// 指标名+维度字段名 作为唯一的key -// 不同维度值只保留一个,但是如果有多的维度名,那么需要保留,详细可以看test里的案例 +// 指标名+维度字段名 作为唯一的 key +// 不同维度值只保留一个,但是如果有多的维度名,那么需要保留,详细可以看 test 里的案例 func (g *Gather) KeepOneDimension(data map[int64]map[string]tasks.PromEvent) { for timestamp, subResult := range data { keySet := common.StringSet{} newSubResult := make(map[string]tasks.PromEvent) for dimensionKey, pe := range subResult { // 清理部分指标,当前面的维度已经包含了某个指标后,那么接下来的维度里,则删除这个指标 - lenOfdimensionNames := len(pe.Labels) + dimensionNamesLen := len(pe.Labels) dimFieldNames := make([]string, 0) for dimK := range pe.Labels { dimFieldNames = append(dimFieldNames, dimK) @@ -163,7 +164,7 @@ func (g *Gather) KeepOneDimension(data map[int64]map[string]tasks.PromEvent) { newAggValue := make(common.MapStr) for aggKey, aggValue := range pe.AggreValue { - dimFieldNames[lenOfdimensionNames] = aggKey + dimFieldNames[dimensionNamesLen] = aggKey hashKey := utils.GeneratorHashKey(dimFieldNames) if !keySet.Has(hashKey) { keySet.Add(hashKey) diff --git a/pkg/bkmonitorbeat/tasks/script/scriptevent.go b/pkg/bkmonitorbeat/tasks/script/scriptevent.go index 66abb3367..6cf51cd2e 100644 --- a/pkg/bkmonitorbeat/tasks/script/scriptevent.go +++ b/pkg/bkmonitorbeat/tasks/script/scriptevent.go @@ -18,7 +18,6 @@ import ( bkcommon "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/common" ) -// Event script event type Event struct { DataID int32 TaskID int32 diff --git a/pkg/operator/operator/secret.go b/pkg/operator/operator/secret.go index 69ab27c56..ee2775079 100644 --- a/pkg/operator/operator/secret.go +++ b/pkg/operator/operator/secret.go @@ -125,11 +125,20 @@ func (c *Operator) createOrUpdateEventTaskSecrets() { logger.Errorf("no event dataid found, err: %s", err) return } + + // kubeevent 任务的自监控使用 custommetrics dataid + upMetricsDataID, err := c.dw.MatchMetricDataID(define.MonitorMeta{}, false) + if err != nil { + logger.Errorf("no upmetrics dataid found, err: %s", err) + return + } + secretClient := c.client.CoreV1().Secrets(ConfMonitorNamespace) eventTarget := &target.EventTarget{ - DataID: dataID.Spec.DataID, - Labels: dataID.Spec.Labels, + DataID: dataID.Spec.DataID, + Labels: dataID.Spec.Labels, + UpMetricsDataID: upMetricsDataID.Spec.DataID, } b, err := eventTarget.YamlBytes() diff --git a/pkg/operator/operator/target/event.go b/pkg/operator/operator/target/event.go index 87f3f255a..27b168cde 100644 --- a/pkg/operator/operator/target/event.go +++ b/pkg/operator/operator/target/event.go @@ -15,8 +15,9 @@ import ( // EventTarget 事件采集配置 type EventTarget struct { - DataID int - Labels map[string]string + DataID int + UpMetricsDataID int + Labels map[string]string } func (t *EventTarget) FileName() string { @@ -31,6 +32,7 @@ func (t *EventTarget) YamlBytes() ([]byte, error) { cfg = append(cfg, yaml.MapItem{Key: "version", Value: "1"}) cfg = append(cfg, yaml.MapItem{Key: "task_id", Value: 1}) cfg = append(cfg, yaml.MapItem{Key: "dataid", Value: t.DataID}) + cfg = append(cfg, yaml.MapItem{Key: "upmetrics_dataid", Value: t.UpMetricsDataID}) cfg = append(cfg, yaml.MapItem{Key: "interval", Value: ConfEventScrapeInterval}) cfg = append(cfg, yaml.MapItem{Key: "event_span", Value: ConfEventMaxSpan}) cfg = append(cfg, yaml.MapItem{Key: "tail_files", Value: ConfEventScrapeFiles})