Skip to content

Commit

Permalink
feat: transfer 支持多字段清洗能力 --story=119902812 (#567)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Oct 15, 2024
1 parent 860a927 commit daa7156
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/transfer/config/metadata_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ const (
// ResultTableOptLogSeparatorRegexp : 日志正则提取清洗专用,提取字段
ResultTableOptLogSeparatorRegexp = "separator_regexp"

ResultTableOptLogSeparatorConfigs = "separator_configs"

// 事件类
// 结果是否可以使用新的自定义维度
ResultTableOptEventAllowNewDimension = "allow_new_dimension"
Expand Down
48 changes: 42 additions & 6 deletions pkg/transfer/template/etl/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func SchemaByResultTablePlugin(table *config.MetaResultTableConfig) etl.Containe
}

// GetSeparatorFieldByOption
func GetSeparatorFieldByOption(table *config.MetaResultTableConfig) (etl.Field, error) {
helper := utils.NewMapHelper(table.Option)
func GetSeparatorFieldByOption(option map[string]interface{}, table *config.MetaResultTableConfig) (etl.Field, error) {
helper := utils.NewMapHelper(option)
action, ok := helper.GetString(config.ResultTableOptSeparatorAction)
if !ok {
return nil, nil
Expand Down Expand Up @@ -108,16 +108,52 @@ func GetSeparatorFieldByOption(table *config.MetaResultTableConfig) (etl.Field,
}
}

// PrepareByResultTablePlugin: 根据字段提取方法[json|regexp|delimiter],解析上报的日志数据内容
func GetSeparatorFieldByMultiOption(table *config.MetaResultTableConfig) ([]etl.Field, error) {
var fields []etl.Field
field, err := GetSeparatorFieldByOption(table.Option, table)
if err != nil {
return nil, err
}
fields = append(fields, field)

obj, ok := table.Option[config.ResultTableOptLogSeparatorConfigs]
if !ok {
return fields, nil
}

option, ok := obj.([]interface{})
if !ok {
return fields, nil
}

for _, item := range option {
opt, ok := item.(map[string]interface{})
if !ok {
continue
}
field, err := GetSeparatorFieldByOption(opt, table)
if err != nil {
continue
}
fields = append(fields, field)
}
return fields, nil
}

// PrepareByResultTablePlugin 根据字段提取方法[json|regexp|delimiter],解析上报的日志数据内容
func PrepareByResultTablePlugin(table *config.MetaResultTableConfig) etl.ContainerSchemaBuilderPlugin {
return func(builder *etl.ContainerSchemaBuilder) error {
fields := make([]etl.Field, 0)
field, err := GetSeparatorFieldByOption(table)
items, err := GetSeparatorFieldByMultiOption(table)
if err != nil {
return err
}
if field != nil {
fields = append(fields, field)

for i := 0; i < len(items); i++ {
field := items[i]
if field != nil {
fields = append(fields, field)
}
}

if len(fields) > 0 {
Expand Down

0 comments on commit daa7156

Please sign in to comment.