diff --git a/pkg/collector/VERSION b/pkg/collector/VERSION index 8c5e78289..3d941bf47 100644 --- a/pkg/collector/VERSION +++ b/pkg/collector/VERSION @@ -1 +1 @@ -0.43.x \ No newline at end of file +0.44.x \ No newline at end of file diff --git a/pkg/collector/example/subconfig.yml b/pkg/collector/example/subconfig.yml index 849e9fa63..4df629a11 100644 --- a/pkg/collector/example/subconfig.yml +++ b/pkg/collector/example/subconfig.yml @@ -22,7 +22,7 @@ skywalking_agent: exporter: queue: logs_batch_size: 2 - metrics_batch_size: 2 + metrics_batch_size: 1 traces_batch_size: 5 default: diff --git a/pkg/collector/exporter/converter/pushgateway.go b/pkg/collector/exporter/converter/pushgateway.go index 802696fa5..236e1d635 100644 --- a/pkg/collector/exporter/converter/pushgateway.go +++ b/pkg/collector/exporter/converter/pushgateway.go @@ -12,6 +12,7 @@ package converter import ( "math" "strconv" + "strings" "time" "github.com/elastic/beats/libbeat/common" @@ -127,7 +128,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, name := *pd.MetricFamilies.Name metrics := pd.MetricFamilies.Metric - events := make([]define.Event, 0) + pms := make([]*promMapper, 0) for _, metric := range metrics { lbs := map[string]string{} if len(metric.Label) != 0 { @@ -145,7 +146,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, DefaultMetricMonitor.IncConverterFailedCounter(define.RecordPushGateway, dataId) continue } - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name: counter.GetValue(), }, @@ -153,8 +154,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, pd.Labels), Exemplar: counter.Exemplar, - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } // 处理 Gauge 类型数据 @@ -165,15 +165,14 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, continue } - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name: gauge.GetValue(), }, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } // 处理 Summary 类型数据 @@ -184,7 +183,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, continue } - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name + "_sum": summary.GetSampleSum(), name + "_count": summary.GetSampleCount(), @@ -192,8 +191,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) for _, quantile := range summary.GetQuantile() { if !utils.IsValidFloat64(quantile.GetValue()) { @@ -204,15 +202,14 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, quantileLabels := utils.CloneMap(lbs) quantileLabels["quantile"] = strconv.FormatFloat(quantile.GetQuantile(), 'f', -1, 64) - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name: quantile.GetValue(), }, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, quantileLabels, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } } @@ -224,7 +221,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, continue } - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name + "_sum": histogram.GetSampleSum(), name + "_count": histogram.GetSampleCount(), @@ -232,8 +229,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) infSeen := false for _, bucket := range histogram.GetBucket() { @@ -248,7 +244,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, bucketLabels := utils.CloneMap(lbs) bucketLabels["le"] = strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64) - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name + "_bucket": bucket.GetCumulativeCount(), }, @@ -256,23 +252,21 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, bucketLabels, pd.Labels), Exemplar: bucket.Exemplar, - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } // 仅 expfmt.FmtText 格式支持 inf // 其他格式需要自行检查 if !infSeen { bucketLabels := utils.CloneMap(lbs) bucketLabels["le"] = strconv.FormatFloat(math.Inf(+1), 'f', -1, 64) - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name + "_bucket": histogram.GetSampleCount(), }, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, bucketLabels, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } } @@ -284,18 +278,87 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token, continue } - m := &promMapper{ + pms = append(pms, &promMapper{ Metrics: common.MapStr{ name: untyped.GetValue(), }, Target: target, Timestamp: getTimestamp(now, metric.TimestampMs), Dimensions: utils.MergeMaps(lbs, pd.Labels), - } - events = append(events, c.ToEvent(token, dataId, m.AsMapStr())) + }) } } - if len(events) > 0 { - f(events...) + + pms = c.compactTrpcOTFilter(pms) + if len(pms) <= 0 { + return + } + + events := make([]define.Event, 0, len(pms)) + for _, pm := range pms { + events = append(events, c.ToEvent(token, dataId, pm.AsMapStr())) + } + f(events...) +} + +// compactTrpcOTFilter 兼容 trpc 框架 OTfilter 指标格式 +// 当且仅当 `_type`/`_name` 两个维度存在且所有指标名称以 `trpc_` 开头的才进行转换 +func (c pushGatewayConverter) compactTrpcOTFilter(pms []*promMapper) []*promMapper { + const ( + labelType = "_type" + labelName = "_name" + ) + + var ret []*promMapper + for _, pm := range pms { + var seen bool + if len(pm.Dimensions[labelType]) == 0 || len(pm.Dimensions[labelName]) == 0 { + seen = true + } + for k := range pm.Metrics { + if !strings.HasPrefix(k, "trpc_") { + seen = true + break + } + } + + if seen { + ret = append(ret, pm) + continue + } + + dims := make(map[string]string) + for name, value := range pm.Dimensions { + if name == labelType || name == labelName { + continue + } + dims[name] = value + } + + metrics := make(common.MapStr) + for k, v := range pm.Metrics { + metric := utils.NormalizeName(pm.Dimensions[labelName]) + // trpc_ 框架无 summary 概念 因此只支持 histogram 即可 + if pm.Dimensions[labelType] == "histogram" { + switch { + case strings.HasSuffix(k, "_bucket"): + metric = metric + "_bucket" + case strings.HasSuffix(k, "_count"): + metric = metric + "_count" + case strings.HasSuffix(k, "_sum"): + metric = metric + "_sum" + } + } + metrics[metric] = v + } + + ret = append(ret, &promMapper{ + Metrics: metrics, + Target: pm.Target, + Timestamp: pm.Timestamp, + Dimensions: dims, + Exemplar: pm.Exemplar, + }) } + return ret } diff --git a/pkg/collector/exporter/converter/pushgateway_test.go b/pkg/collector/exporter/converter/pushgateway_test.go index bb2e3c580..89315e7c1 100644 --- a/pkg/collector/exporter/converter/pushgateway_test.go +++ b/pkg/collector/exporter/converter/pushgateway_test.go @@ -72,14 +72,11 @@ func TestConvertPushGatewayData(t *testing.T) { } func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { - t.Run("convertCounter1", func(t *testing.T) { + t.Run("convertCounter", func(t *testing.T) { c := &pushGatewayConverter{} - labels := map[string]string{ - "handler": "query", - } input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds"), + Name: proto.String("http_request_total"), Help: proto.String("foo"), Type: dto.MetricType_COUNTER.Enum(), Metric: []*dto.Metric{{ @@ -98,10 +95,12 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds": float64(10), + "http_request_total": float64(10), + }, + "target": "unknown", + "dimension": map[string]string{ + "handler": "query", }, - "target": "unknown", - "dimension": labels, "timestamp": fakeTs, }), }, @@ -113,21 +112,14 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { } c.publishEventsFromMetricFamily(define.Token{}, &define.PushGatewayData{MetricFamilies: input.Family}, 0, fakeTs, gather) assert.Equal(t, input.Event, events) - - id := c.ToDataID(&define.Record{ - Token: define.Token{MetricsDataId: 10011}, - }) - assert.Equal(t, int32(10011), id) + assert.Equal(t, int32(10011), c.ToDataID(&define.Record{Token: define.Token{MetricsDataId: 10011}})) }) - t.Run("convertCounter2", func(t *testing.T) { + t.Run("convertCounterExemplar", func(t *testing.T) { c := &pushGatewayConverter{} - labels := map[string]string{ - "handler": "query", - } input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds_with_exemplar"), + Name: proto.String("http_request_total"), Help: proto.String("foo"), Type: dto.MetricType_COUNTER.Enum(), Metric: []*dto.Metric{{ @@ -158,10 +150,12 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_with_exemplar": float64(10), + "http_request_total": float64(10), + }, + "target": "unknown", + "dimension": map[string]string{ + "handler": "query", }, - "target": "unknown", - "dimension": labels, "timestamp": fakeTs, "exemplar": common.MapStr{ "bk_span_id": "fake_string", @@ -181,14 +175,61 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { assert.Equal(t, input.Event, events) }) - t.Run("convertGauge1", func(t *testing.T) { + t.Run("convertCounterOTFilter", func(t *testing.T) { c := &pushGatewayConverter{} - labels := map[string]string{ - "handler": "query", + input := testCase{ + Family: &dto.MetricFamily{ + Name: proto.String("trpc_counter_total"), + Help: proto.String("foo"), + Type: dto.MetricType_COUNTER.Enum(), + Metric: []*dto.Metric{{ + Label: []*dto.LabelPair{ + { + Name: proto.String("handler"), + Value: proto.String("query"), + }, + { + Name: proto.String("_name"), + Value: proto.String("requests.foo.total"), + }, + { + Name: proto.String("_type"), + Value: proto.String("counter"), + }, + }, + Counter: &dto.Counter{ + Value: proto.Float64(10), + }, + TimestampMs: &fakeTs, + }}, + }, + Event: []define.Event{ + c.ToEvent(define.Token{}, 0, common.MapStr{ + "metrics": common.MapStr{ + "requests_foo_total": float64(10), + }, + "target": "unknown", + "dimension": map[string]string{ + "handler": "query", + }, + "timestamp": fakeTs, + }), + }, + } + + events := make([]define.Event, 0) + gather := func(evts ...define.Event) { + events = append(events, evts...) } + c.publishEventsFromMetricFamily(define.Token{}, &define.PushGatewayData{MetricFamilies: input.Family}, 0, fakeTs, gather) + assert.Equal(t, input.Event, events) + }) + + t.Run("convertGauge", func(t *testing.T) { + c := &pushGatewayConverter{} input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds"), + Name: proto.String("active_workers"), Help: proto.String("foo"), Type: dto.MetricType_GAUGE.Enum(), Metric: []*dto.Metric{{ @@ -207,11 +248,13 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds": float64(10), + "active_workers": float64(10), }, "target": "unknown", "timestamp": fakeTs, - "dimension": labels, + "dimension": map[string]string{ + "handler": "query", + }, }), }, } @@ -224,11 +267,122 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { assert.Equal(t, input.Event, events) }) - t.Run("convertHistogram1", func(t *testing.T) { + t.Run("convertGaugeOTFilter", func(t *testing.T) { c := &pushGatewayConverter{} input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds"), + Name: proto.String("trpc_gauge_total"), + Help: proto.String("foo"), + Type: dto.MetricType_GAUGE.Enum(), + Metric: []*dto.Metric{{ + Gauge: &dto.Gauge{ + Value: proto.Float64(10), + }, + TimestampMs: &fakeTs, + Label: []*dto.LabelPair{ + { + Name: proto.String("handler"), + Value: proto.String("query"), + }, + { + Name: proto.String("_name"), + Value: proto.String("active.workers"), + }, + { + Name: proto.String("_type"), + Value: proto.String("gauge"), + }, + }, + }}, + }, + Event: []define.Event{ + c.ToEvent(define.Token{}, 0, common.MapStr{ + "metrics": common.MapStr{ + "active_workers": float64(10), + }, + "target": "unknown", + "timestamp": fakeTs, + "dimension": map[string]string{ + "handler": "query", + }, + }), + }, + } + + events := make([]define.Event, 0) + gather := func(evts ...define.Event) { + events = append(events, evts...) + } + c.publishEventsFromMetricFamily(define.Token{}, &define.PushGatewayData{MetricFamilies: input.Family}, 0, fakeTs, gather) + assert.Equal(t, input.Event, events) + }) + + t.Run("convertHistogram", func(t *testing.T) { + c := &pushGatewayConverter{} + input := testCase{ + Family: &dto.MetricFamily{ + Name: proto.String("http_request_duration_seconds"), + Help: proto.String("foo"), + Type: dto.MetricType_HISTOGRAM.Enum(), + Metric: []*dto.Metric{{ + Histogram: &dto.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*dto.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + }}, + }, + Event: []define.Event{ + c.ToEvent(define.Token{}, 0, common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_seconds_count": uint64(10), + "http_request_duration_seconds_sum": float64(10), + }, + "timestamp": fakeTs, + "target": "unknown", + "dimension": map[string]string{}, + }), + c.ToEvent(define.Token{}, 0, common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_seconds_bucket": uint64(10), + }, + "dimension": map[string]string{ + "le": "0.99", + }, + "target": "unknown", + "timestamp": fakeTs, + }), + c.ToEvent(define.Token{}, 0, common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_seconds_bucket": uint64(10), + }, + "dimension": map[string]string{ + "le": "+Inf", + }, + "target": "unknown", + "timestamp": fakeTs, + }), + }, + } + + events := make([]define.Event, 0) + gather := func(evts ...define.Event) { + events = append(events, evts...) + } + c.publishEventsFromMetricFamily(define.Token{}, &define.PushGatewayData{MetricFamilies: input.Family}, 0, fakeTs, gather) + assert.Equal(t, input.Event, events) + }) + + t.Run("convertHistogramExemplar", func(t *testing.T) { + c := &pushGatewayConverter{} + input := testCase{ + Family: &dto.MetricFamily{ + Name: proto.String("http_request_duration_seconds"), Help: proto.String("foo"), Type: dto.MetricType_HISTOGRAM.Enum(), Metric: []*dto.Metric{{ @@ -259,8 +413,8 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_count": uint64(10), - "http_request_duration_microseconds_sum": float64(10), + "http_request_duration_seconds_count": uint64(10), + "http_request_duration_seconds_sum": float64(10), }, "timestamp": fakeTs, "target": "unknown", @@ -268,7 +422,7 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { }), c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_bucket": uint64(10), + "http_request_duration_seconds_bucket": uint64(10), }, "dimension": map[string]string{ "le": "0.99", @@ -284,7 +438,7 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { }), c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_bucket": uint64(10), + "http_request_duration_seconds_bucket": uint64(10), }, "dimension": map[string]string{ "le": "+Inf", @@ -303,11 +457,11 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { assert.Equal(t, input.Event, events) }) - t.Run("convertHistogram2", func(t *testing.T) { + t.Run("convertHistogramOTFilter", func(t *testing.T) { c := &pushGatewayConverter{} input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds_with_exemplar"), + Name: proto.String("trpc_histogram_duration_seconds"), Help: proto.String("foo"), Type: dto.MetricType_HISTOGRAM.Enum(), Metric: []*dto.Metric{{ @@ -321,34 +475,52 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { }, }, }, + Label: []*dto.LabelPair{ + { + Name: proto.String("handler"), + Value: proto.String("query"), + }, + { + Name: proto.String("_name"), + Value: proto.String("http.request.duration.seconds"), + }, + { + Name: proto.String("_type"), + Value: proto.String("histogram"), + }, + }, }}, }, Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_with_exemplar_count": uint64(10), - "http_request_duration_microseconds_with_exemplar_sum": float64(10), + "http_request_duration_seconds_count": uint64(10), + "http_request_duration_seconds_sum": float64(10), }, "timestamp": fakeTs, "target": "unknown", - "dimension": map[string]string{}, + "dimension": map[string]string{ + "handler": "query", + }, }), c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_with_exemplar_bucket": uint64(10), + "http_request_duration_seconds_bucket": uint64(10), }, "dimension": map[string]string{ - "le": "0.99", + "handler": "query", + "le": "0.99", }, "target": "unknown", "timestamp": fakeTs, }), c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_with_exemplar_bucket": uint64(10), + "http_request_duration_seconds_bucket": uint64(10), }, "dimension": map[string]string{ - "le": "+Inf", + "handler": "query", + "le": "+Inf", }, "target": "unknown", "timestamp": fakeTs, @@ -364,14 +536,11 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { assert.Equal(t, input.Event, events) }) - t.Run("convertUntyped1", func(t *testing.T) { - labels := map[string]string{ - "handler": "query", - } + t.Run("convertUntyped", func(t *testing.T) { c := &pushGatewayConverter{} input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds"), + Name: proto.String("go_info"), Help: proto.String("foo"), Type: dto.MetricType_UNTYPED.Enum(), Metric: []*dto.Metric{{ @@ -389,10 +558,12 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds": float64(10), + "go_info": float64(10), + }, + "target": "unknown", + "dimension": map[string]string{ + "handler": "query", }, - "target": "unknown", - "dimension": labels, "timestamp": fakeTs, }), }, @@ -406,14 +577,11 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { assert.Equal(t, input.Event, events) }) - t.Run("convertSummary1", func(t *testing.T) { - labels := map[string]string{ - "handler": "query", - } + t.Run("convertSummary", func(t *testing.T) { c := &pushGatewayConverter{} input := testCase{ Family: &dto.MetricFamily{ - Name: proto.String("http_request_duration_microseconds"), + Name: proto.String("http_request_duration_seconds"), Help: proto.String("foo"), Type: dto.MetricType_SUMMARY.Enum(), Metric: []*dto.Metric{{ @@ -438,16 +606,18 @@ func TestGetPushGatewayEventsFromMetricFamily(t *testing.T) { Event: []define.Event{ c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds_count": uint64(10), - "http_request_duration_microseconds_sum": float64(10), + "http_request_duration_seconds_count": uint64(10), + "http_request_duration_seconds_sum": float64(10), + }, + "target": "unknown", + "dimension": map[string]string{ + "handler": "query", }, - "target": "unknown", - "dimension": labels, "timestamp": fakeTs, }), c.ToEvent(define.Token{}, 0, common.MapStr{ "metrics": common.MapStr{ - "http_request_duration_microseconds": float64(10), + "http_request_duration_seconds": float64(10), }, "dimension": map[string]string{ "handler": "query",