Skip to content

Commit

Permalink
feat: collector 兼容 trpc-go opentelemetry 指标数据格式转换 --story=116217831 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Feb 26, 2024
1 parent 4cd08b6 commit 5ff6d2a
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/collector/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.43.x
0.44.x
2 changes: 1 addition & 1 deletion pkg/collector/example/subconfig.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ skywalking_agent:
exporter:
queue:
logs_batch_size: 2
metrics_batch_size: 2
metrics_batch_size: 1
traces_batch_size: 5

default:
Expand Down
117 changes: 90 additions & 27 deletions pkg/collector/exporter/converter/pushgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package converter
import (
"math"
"strconv"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,

name := *pd.MetricFamilies.Name
metrics := pd.MetricFamilies.Metric
events := make([]define.Event, 0)
pms := make([]*promMapper, 0)
for _, metric := range metrics {
lbs := map[string]string{}
if len(metric.Label) != 0 {
Expand All @@ -145,16 +146,15 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
DefaultMetricMonitor.IncConverterFailedCounter(define.RecordPushGateway, dataId)
continue
}
m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name: counter.GetValue(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, pd.Labels),
Exemplar: counter.Exemplar,
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}

// 处理 Gauge 类型数据
Expand All @@ -165,15 +165,14 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
continue
}

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name: gauge.GetValue(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}

// 处理 Summary 类型数据
Expand All @@ -184,16 +183,15 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
continue
}

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name + "_sum": summary.GetSampleSum(),
name + "_count": summary.GetSampleCount(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})

for _, quantile := range summary.GetQuantile() {
if !utils.IsValidFloat64(quantile.GetValue()) {
Expand All @@ -204,15 +202,14 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
quantileLabels := utils.CloneMap(lbs)
quantileLabels["quantile"] = strconv.FormatFloat(quantile.GetQuantile(), 'f', -1, 64)

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name: quantile.GetValue(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, quantileLabels, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}
}

Expand All @@ -224,16 +221,15 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
continue
}

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name + "_sum": histogram.GetSampleSum(),
name + "_count": histogram.GetSampleCount(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})

infSeen := false
for _, bucket := range histogram.GetBucket() {
Expand All @@ -248,31 +244,29 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
bucketLabels := utils.CloneMap(lbs)
bucketLabels["le"] = strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64)

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name + "_bucket": bucket.GetCumulativeCount(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, bucketLabels, pd.Labels),
Exemplar: bucket.Exemplar,
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}
// 仅 expfmt.FmtText 格式支持 inf
// 其他格式需要自行检查
if !infSeen {
bucketLabels := utils.CloneMap(lbs)
bucketLabels["le"] = strconv.FormatFloat(math.Inf(+1), 'f', -1, 64)
m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name + "_bucket": histogram.GetSampleCount(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, bucketLabels, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}
}

Expand All @@ -284,18 +278,87 @@ func (c pushGatewayConverter) publishEventsFromMetricFamily(token define.Token,
continue
}

m := &promMapper{
pms = append(pms, &promMapper{
Metrics: common.MapStr{
name: untyped.GetValue(),
},
Target: target,
Timestamp: getTimestamp(now, metric.TimestampMs),
Dimensions: utils.MergeMaps(lbs, pd.Labels),
}
events = append(events, c.ToEvent(token, dataId, m.AsMapStr()))
})
}
}
if len(events) > 0 {
f(events...)

pms = c.compactTrpcOTFilter(pms)
if len(pms) <= 0 {
return
}

events := make([]define.Event, 0, len(pms))
for _, pm := range pms {
events = append(events, c.ToEvent(token, dataId, pm.AsMapStr()))
}
f(events...)
}

// compactTrpcOTFilter 兼容 trpc 框架 OTfilter 指标格式
// 当且仅当 `_type`/`_name` 两个维度存在且所有指标名称以 `trpc_` 开头的才进行转换
func (c pushGatewayConverter) compactTrpcOTFilter(pms []*promMapper) []*promMapper {
const (
labelType = "_type"
labelName = "_name"
)

var ret []*promMapper
for _, pm := range pms {
var seen bool
if len(pm.Dimensions[labelType]) == 0 || len(pm.Dimensions[labelName]) == 0 {
seen = true
}
for k := range pm.Metrics {
if !strings.HasPrefix(k, "trpc_") {
seen = true
break
}
}

if seen {
ret = append(ret, pm)
continue
}

dims := make(map[string]string)
for name, value := range pm.Dimensions {
if name == labelType || name == labelName {
continue
}
dims[name] = value
}

metrics := make(common.MapStr)
for k, v := range pm.Metrics {
metric := utils.NormalizeName(pm.Dimensions[labelName])
// trpc_ 框架无 summary 概念 因此只支持 histogram 即可
if pm.Dimensions[labelType] == "histogram" {
switch {
case strings.HasSuffix(k, "_bucket"):
metric = metric + "_bucket"
case strings.HasSuffix(k, "_count"):
metric = metric + "_count"
case strings.HasSuffix(k, "_sum"):
metric = metric + "_sum"
}
}
metrics[metric] = v
}

ret = append(ret, &promMapper{
Metrics: metrics,
Target: pm.Target,
Timestamp: pm.Timestamp,
Dimensions: dims,
Exemplar: pm.Exemplar,
})
}
return ret
}
Loading

0 comments on commit 5ff6d2a

Please sign in to comment.