Skip to content

Commit

Permalink
feat: transfer 优化日志处理效率 --story=117618448 (#351)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored May 27, 2024
1 parent da9a18c commit 8a5df6a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pkg/transfer/define/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Payload interface {
// Meta info
Meta() PayloadMeta

SetETLRecord(*ETLRecord)
GetETLRecord() *ETLRecord

// SetTime sets the time received
SetTime(t time.Time)
// GetTime gets the time received
Expand Down
10 changes: 10 additions & 0 deletions pkg/transfer/define/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type BasePayload struct {
Data []byte
t time.Time
flag PayloadFlag

r *ETLRecord
}

func (p BasePayload) copy() *BasePayload {
Expand Down Expand Up @@ -109,6 +111,14 @@ func (p *BasePayload) Flag() PayloadFlag {
return p.flag
}

func (p *BasePayload) SetETLRecord(r *ETLRecord) {
p.r = r
}

func (p *BasePayload) GetETLRecord() *ETLRecord {
return p.r
}

// NewBasePayloadFrom :
func NewBasePayloadFrom(data []byte, sn int) *BasePayload {
return &BasePayload{
Expand Down
13 changes: 9 additions & 4 deletions pkg/transfer/elasticsearch/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,15 @@ func (b *BulkHandler) asRecord(etlRecord *define.ETLRecord) (*Record, error) {
// Product
func (b *BulkHandler) Handle(ctx context.Context, payload define.Payload, killChan chan<- error) (result interface{}, at time.Time, ok bool) {
var etlRecord define.ETLRecord
err := payload.To(&etlRecord)
if err != nil {
logging.Warnf("%v error %v dropped payload %+v", b, err, payload)
return nil, time.Time{}, false
r := payload.GetETLRecord()
if r != nil {
etlRecord = *r
} else {
err := payload.To(&etlRecord)
if err != nil {
logging.Warnf("%v error %v dropped payload %+v", b, err, payload)
return nil, time.Time{}, false
}
}

return &etlRecord, utils.ParseTimeStamp(*etlRecord.Time), true
Expand Down
10 changes: 9 additions & 1 deletion pkg/transfer/template/etl/formatter/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Processor struct {
*define.ProcessorMonitor
pipelineConfig *config.PipelineConfig
handlers RecordHandlers
etlConfig string
}

// Process : process json data
Expand All @@ -55,12 +56,18 @@ func (p *Processor) Process(d define.Payload, outputChan chan<- define.Payload,
}

err = p.handlers.Handle(record, func(record *define.ETLRecord) error {
// 只对日志数据进行处理
if p.etlConfig == "bk_flat_batch" {
d.SetETLRecord(record)
outputChan <- d
return nil
}

payload, err := define.DerivePayload(d, record)
if err != nil {
logging.Warnf("%v handle payload %#v failed: %v", p, d, err)
return err
}

outputChan <- payload
return nil
})
Expand All @@ -81,6 +88,7 @@ func NewProcessor(ctx context.Context, name string, handlers RecordHandlers) *Pr
BaseDataProcessor: define.NewBaseDataProcessor(pipeConf.FormatName(name)),
ProcessorMonitor: pipeline.NewDataProcessorMonitor(name, config.PipelineConfigFromContext(ctx)),
handlers: handlers,
etlConfig: pipeConf.ETLConfig,
}

return p
Expand Down

0 comments on commit 8a5df6a

Please sign in to comment.