Skip to content

Commit

Permalink
feat: 采集器 kubeevent 自监控补齐 --story=119258708 (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Aug 20, 2024
1 parent 7c3c428 commit 81d9c02
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 141 deletions.
2 changes: 1 addition & 1 deletion pkg/bkmonitorbeat/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.45.x
v3.46.x
7 changes: 4 additions & 3 deletions pkg/bkmonitorbeat/configs/kubeevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const (
type KubeEventConfig struct {
BaseTaskParam `config:"_,inline"`

EventSpan time.Duration `config:"event_span"`
Interval time.Duration `config:"interval"`
TailFiles []string `config:"tail_files"`
EventSpan time.Duration `config:"event_span"`
Interval time.Duration `config:"interval"`
TailFiles []string `config:"tail_files"`
UpMetricsDataID int32 `config:"upmetrics_dataid"` // 自监控 dataid
}

func (c *KubeEventConfig) GetTaskConfigList() []define.TaskConfig {
Expand Down
61 changes: 5 additions & 56 deletions pkg/bkmonitorbeat/define/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,76 +9,25 @@

package define

import (
"bytes"
"fmt"
"io"
)

const (
NameGatherUp = "bkm_gather_up"
LabelUpCode = "bkm_up_code"
LabelUpCodeName = "bkm_up_code_name"

// MetricBeat 任务指标

NameMetricBeatUp = "bkm_metricbeat_endpoint_up"
NameMetricBeatScrapeDuration = "bkm_metricbeat_scrape_duration_seconds"
NameMetricBeatScrapeSize = "bkm_metricbeat_scrape_size_bytes"
NameMetricBeatScrapeLine = "bkm_metricbeat_scrape_line"
NameMetricBeatHandleDuration = "bkm_metricbeat_handle_duration_seconds"
)

var innerMetrics = map[string]struct{}{
NameMetricBeatUp: {},
NameMetricBeatScrapeDuration: {},
NameMetricBeatScrapeSize: {},
NameMetricBeatScrapeLine: {},
NameMetricBeatHandleDuration: {},
}

func IsInnerMetric(s string) bool {
_, ok := innerMetrics[s]
return ok
}
// KubeEvent 任务指标

func NewMetricBeatCodeReader(code NamedCode, kvs []LogKV) io.ReadCloser {
r := bytes.NewReader([]byte(MetricBeatUp(code, kvs)))
return io.NopCloser(r)
}

const (
prefixMetricbeat = "[metricbeat] "
NameKubeEventReceiveEvents = "bkm_kubeevent_receive_events"
NameKubeEventReportEvents = "bkm_kubeevent_report_events"
)

func MetricBeatUp(code NamedCode, kvs []LogKV) string {
s := fmt.Sprintf(`%s{code="%d",code_name="%s"} 1`, NameMetricBeatUp, code.Code(), code.Name())
RecordLog(prefixMetricbeat+s, kvs)
return s
}

func MetricBeatScrapeDuration(seconds float64, kvs []LogKV) string {
s := fmt.Sprintf(`%s{} %f`, NameMetricBeatScrapeDuration, seconds)
RecordLog(prefixMetricbeat+s, kvs)
return s
}

func MetricBeatScrapeSize(size int, kvs []LogKV) string {
s := fmt.Sprintf(`%s{} %d`, NameMetricBeatScrapeSize, size)
RecordLog(prefixMetricbeat+s, kvs)
return s
}

func MetricBeatScrapeLine(n int, kvs []LogKV) string {
s := fmt.Sprintf(`%s{} %d`, NameMetricBeatScrapeLine, n)
RecordLog(prefixMetricbeat+s, kvs)
return s
}

func MetricBeatHandleDuration(seconds float64, kvs []LogKV) string {
s := fmt.Sprintf(`%s{} %f`, NameMetricBeatHandleDuration, seconds)
RecordLog(prefixMetricbeat+s, kvs)
return s
}

type NamedCode struct {
code int
name string
Expand Down
6 changes: 5 additions & 1 deletion pkg/bkmonitorbeat/define/recordlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ func newRecordLogs(c LogConfig) *recordLogs {
}

func RecordLog(template string, kvs []LogKV) {
template = fmt.Sprintf("%s; fields=%+v", template, kvs)
template = fmt.Sprintf("%s; kvs=%+v", template, kvs)
rl.l.Info(template)
}

func RecordLogf(template string, args ...interface{}) {
rl.l.Infof(template, args...)
}

type LogKV struct {
K string
V interface{}
Expand Down
62 changes: 36 additions & 26 deletions pkg/bkmonitorbeat/tasks/kubeevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,23 @@ func (e *k8sEvent) GetFirstTime() int64 {
}

func (e *k8sEvent) GetLastTime() int64 {
var t0, t1 int64
if e.LastTs != "" {
return parseTimeLayout(e.LastTs)
t0 = parseTimeLayout(e.LastTs)
}
if e.Series != nil {
return parseTimeLayout(e.Series.LastObservedTime)
t1 = parseTimeLayout(e.Series.LastObservedTime)
}
return time.Now().Unix()

if t0 == 0 && t1 == 0 {
return time.Now().Unix() // 兜底
}

// 取最新时间点
if t0 > t1 {
return t0
}
return t1
}

func (e *k8sEvent) GetCount() int {
Expand All @@ -127,20 +137,33 @@ func (e *k8sEvent) IsZeroTime() bool {
}

type wrapEvent struct {
k8sEvent
dataID int32
externalLabels []map[string]string
dataID int32
data []common.MapStr
}

func newWrapEvent(dataID int32, externalLabels []map[string]string, e k8sEvent) *wrapEvent {
func newWrapEvent(dataID int32, data []common.MapStr) *wrapEvent {
return &wrapEvent{
k8sEvent: e,
dataID: dataID,
externalLabels: externalLabels,
dataID: dataID,
data: data,
}
}

func (e *wrapEvent) AsMapStr() common.MapStr {
return common.MapStr{
"dataid": e.dataID,
"data": e.data,
}
}

func (e *wrapEvent) IgnoreCMDBLevel() bool {
return true
}

func (e *wrapEvent) GetType() string {
return define.ModuleKubeevent
}

func toEventMapStr(e k8sEvent, externalLabels []map[string]string) common.MapStr {
dimensions := common.MapStr{
"kind": e.InvolvedObject.Kind,
"namespace": e.InvolvedObject.Namespace,
Expand All @@ -150,13 +173,13 @@ func (e *wrapEvent) AsMapStr() common.MapStr {
"host": e.Source.Host,
"type": e.Type,
}
for i := 0; i < len(e.externalLabels); i++ {
for k, v := range e.externalLabels[i] {
for i := 0; i < len(externalLabels); i++ {
for k, v := range externalLabels[i] {
dimensions[k] = v
}
}

data := common.MapStr{
return common.MapStr{
"event_name": e.Reason,
"target": e.GetTarget(),
"event": common.MapStr{
Expand All @@ -166,17 +189,4 @@ func (e *wrapEvent) AsMapStr() common.MapStr {
"dimension": dimensions,
"timestamp": e.GetLastTime() * 1000, // ms
}

return common.MapStr{
"dataid": e.dataID,
"data": []common.MapStr{data},
}
}

func (e *wrapEvent) IgnoreCMDBLevel() bool {
return true
}

func (e *wrapEvent) GetType() string {
return define.ModuleKubeevent
}
77 changes: 56 additions & 21 deletions pkg/bkmonitorbeat/tasks/kubeevent/gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/hpcloud/tail"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bkmonitorbeat/configs"
Expand All @@ -24,15 +26,19 @@ import (
)

type recorder struct {
ctx context.Context
set map[string]*k8sEvent
mut sync.Mutex
interval time.Duration
eventSpan time.Duration
started int64
dataID int32
externalLabels []map[string]string
out chan define.Event
ctx context.Context
set map[string]*k8sEvent
mut sync.Mutex
interval time.Duration
eventSpan time.Duration
started int64
dataID int32
upMetricsDataID int32
externalLabels []map[string]string
out chan common.MapStr

received atomic.Int64
sent atomic.Int64
}

func newRecorder(ctx context.Context, conf *configs.KubeEventConfig) *recorder {
Expand All @@ -45,14 +51,15 @@ func newRecorder(ctx context.Context, conf *configs.KubeEventConfig) *recorder {
eventSpan = conf.EventSpan
}
r := &recorder{
ctx: ctx,
interval: interval,
eventSpan: eventSpan,
dataID: conf.DataID,
externalLabels: conf.GetLabels(),
set: map[string]*k8sEvent{},
started: time.Now().Unix(),
out: make(chan define.Event, 1),
ctx: ctx,
interval: interval,
eventSpan: eventSpan,
dataID: conf.DataID,
upMetricsDataID: conf.UpMetricsDataID,
externalLabels: conf.GetLabels(),
set: map[string]*k8sEvent{},
started: time.Now().Unix(),
out: make(chan common.MapStr, 1),
}

go r.loopSent()
Expand Down Expand Up @@ -104,7 +111,7 @@ func (r *recorder) loopSent() {
continue
}
cloned.Count = cnt
r.out <- newWrapEvent(r.dataID, r.externalLabels, *cloned)
r.out <- toEventMapStr(*cloned, r.externalLabels)
}
r.mut.Unlock()

Expand All @@ -119,6 +126,8 @@ func (r *recorder) Recv(event k8sEvent) {
r.mut.Lock()
defer r.mut.Unlock()

r.received.Add(1)

// 异常时间处理
if event.IsZeroTime() {
return
Expand Down Expand Up @@ -176,11 +185,38 @@ func (g *Gather) Run(ctx context.Context, e chan<- define.Event) {
go g.watchEvents(f)
}

const batch = 100
events := make([]common.MapStr, 0, batch)

ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()

reportTicker := time.NewTicker(time.Minute) // 自监控上报周期
defer reportTicker.Stop()

sentOut := func() {
e <- newWrapEvent(g.store.dataID, events)
g.store.sent.Add(int64(len(events)))
events = make([]common.MapStr, 0, batch)
}

for {
select {
case out := <-g.store.out:
logger.Infof("send k8s event: %+v", out.AsMapStr())
e <- out
logger.Infof("send k8s event: %+v", out)
events = append(events, out)
if len(events) >= batch {
sentOut()
}

case <-ticker.C:
if len(events) > 0 {
sentOut()
}

case <-reportTicker.C:
e <- CodeMetrics(g.store.upMetricsDataID, g.TaskConfig, g.store.received.Load(), g.store.sent.Load())

case <-g.ctx.Done():
return
}
Expand Down Expand Up @@ -215,7 +251,6 @@ func (g *Gather) watchEvents(filename string) {
}
}

// New :
func New(globalConfig define.Config, taskConfig define.TaskConfig) define.Task {
gather := &Gather{}
gather.GlobalConfig = globalConfig
Expand Down
Loading

0 comments on commit 81d9c02

Please sign in to comment.