diff --git a/pkg/transfer/define/interface.go b/pkg/transfer/define/interface.go index ec9de5c6d..941d18faf 100644 --- a/pkg/transfer/define/interface.go +++ b/pkg/transfer/define/interface.go @@ -48,6 +48,9 @@ type Payload interface { // Meta info Meta() PayloadMeta + SetETLRecord(*ETLRecord) + GetETLRecord() *ETLRecord + // SetTime sets the time received SetTime(t time.Time) // GetTime gets the time received diff --git a/pkg/transfer/define/payload.go b/pkg/transfer/define/payload.go index 4a8faa697..67bb4fe4b 100644 --- a/pkg/transfer/define/payload.go +++ b/pkg/transfer/define/payload.go @@ -32,6 +32,8 @@ type BasePayload struct { Data []byte t time.Time flag PayloadFlag + + r *ETLRecord } func (p BasePayload) copy() *BasePayload { @@ -109,6 +111,14 @@ func (p *BasePayload) Flag() PayloadFlag { return p.flag } +func (p *BasePayload) SetETLRecord(r *ETLRecord) { + p.r = r +} + +func (p *BasePayload) GetETLRecord() *ETLRecord { + return p.r +} + // NewBasePayloadFrom : func NewBasePayloadFrom(data []byte, sn int) *BasePayload { return &BasePayload{ diff --git a/pkg/transfer/elasticsearch/backend.go b/pkg/transfer/elasticsearch/backend.go index fb300b6df..2afaef9b9 100644 --- a/pkg/transfer/elasticsearch/backend.go +++ b/pkg/transfer/elasticsearch/backend.go @@ -88,10 +88,15 @@ func (b *BulkHandler) asRecord(etlRecord *define.ETLRecord) (*Record, error) { // Product func (b *BulkHandler) Handle(ctx context.Context, payload define.Payload, killChan chan<- error) (result interface{}, at time.Time, ok bool) { var etlRecord define.ETLRecord - err := payload.To(&etlRecord) - if err != nil { - logging.Warnf("%v error %v dropped payload %+v", b, err, payload) - return nil, time.Time{}, false + r := payload.GetETLRecord() + if r != nil { + etlRecord = *r + } else { + err := payload.To(&etlRecord) + if err != nil { + logging.Warnf("%v error %v dropped payload %+v", b, err, payload) + return nil, time.Time{}, false + } } return &etlRecord, utils.ParseTimeStamp(*etlRecord.Time), true diff --git a/pkg/transfer/template/etl/formatter/processor.go b/pkg/transfer/template/etl/formatter/processor.go index 6f11ea6ba..3635a3fba 100644 --- a/pkg/transfer/template/etl/formatter/processor.go +++ b/pkg/transfer/template/etl/formatter/processor.go @@ -42,6 +42,7 @@ type Processor struct { *define.ProcessorMonitor pipelineConfig *config.PipelineConfig handlers RecordHandlers + etlConfig string } // Process : process json data @@ -55,12 +56,18 @@ func (p *Processor) Process(d define.Payload, outputChan chan<- define.Payload, } err = p.handlers.Handle(record, func(record *define.ETLRecord) error { + // 只对日志数据进行处理 + if p.etlConfig == "bk_flat_batch" { + d.SetETLRecord(record) + outputChan <- d + return nil + } + payload, err := define.DerivePayload(d, record) if err != nil { logging.Warnf("%v handle payload %#v failed: %v", p, d, err) return err } - outputChan <- payload return nil }) @@ -81,6 +88,7 @@ func NewProcessor(ctx context.Context, name string, handlers RecordHandlers) *Pr BaseDataProcessor: define.NewBaseDataProcessor(pipeConf.FormatName(name)), ProcessorMonitor: pipeline.NewDataProcessorMonitor(name, config.PipelineConfigFromContext(ctx)), handlers: handlers, + etlConfig: pipeConf.ETLConfig, } return p