Skip to content

Commit

Permalink
feat: transfer 调整丢弃空指标配置项 --story=116139750 (#195)
Browse files Browse the repository at this point in the history
chenjiandongx authored Feb 20, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent f00afd4 commit e355197
Showing 2 changed files with 7 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pkg/transfer/config/metadata_options.go
Original file line number Diff line number Diff line change
@@ -74,8 +74,8 @@ const (
PipelineConfigOptAllowDynamicMetricsAsFloat = "dynamic_metrics_as_float"
// PipelineConfigOptMaxQps 允许后端写入的最大的 QPS
PipelineConfigOptMaxQps = "max_qps"
// PipelineConfigDropMetricsETLConfigs 允许丢弃空 metrics 数据的 ELTConfig 列表
PipelineConfigDropMetricsETLConfigs = "drop_metrics_etl_configs"
// PipelineConfigDropEmptyMetrics 是否丢弃空 metrics
PipelineConfigDropEmptyMetrics = "drop_empty_metrics"

// 日志类
// PipelineConfigOptSeparatorNode : "字段提取节点路径"
15 changes: 5 additions & 10 deletions pkg/transfer/kafka/backend.go
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ type Backend struct {
payloadChan chan define.Payload
wg sync.WaitGroup
producer Producer
dropEtlConfigs []string
dropEmptyMetrics bool

Topic string
Key string
@@ -144,9 +144,8 @@ func (b *Backend) init() error {

pipelineConfig := config.PipelineConfigFromContext(b.ctx)
if pipelineConfig != nil {
b.ETLConfig = pipelineConfig.ETLConfig
opts := utils.NewMapHelper(pipelineConfig.Option)
b.dropEtlConfigs, _ = opts.GetStringArray(config.PipelineConfigDropMetricsETLConfigs)
b.dropEmptyMetrics, _ = opts.GetBool(config.PipelineConfigDropEmptyMetrics)
}

shipper := config.ShipperConfigFromContext(b.ctx)
@@ -248,20 +247,16 @@ func (b *Backend) SendMsg(payload define.Payload) {
}
}

// 部分 ETLConfig 需要丢弃空 metrics
for _, etl := range b.dropEtlConfigs {
if etl != b.ETLConfig {
continue
}

// 丢弃空 metrics
if b.dropEmptyMetrics {
for k, v := range etlRecord.Metrics {
if v == nil {
delete(etlRecord.Metrics, k)
}
}
if len(etlRecord.Metrics) <= 0 {
b.skipStats.Inc()
logging.Warnf("skip %s useless record: %+v", etl, etlRecord)
logging.Warnf("skip empty record: %+v", etlRecord)
return
}
}

0 comments on commit e355197

Please sign in to comment.