diff --git a/pipeline/batch.go b/pipeline/batch.go index 4b377ba30..66f16a193 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -11,7 +11,8 @@ import ( ) type Batch struct { - Events []*Event + Events []*Event + eventsCopy []*Event // eventsSize contains total size of the Events in bytes eventsSize int seq int64 @@ -144,15 +145,21 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event { batchSeq := batch.seq - // lets restore the sequence of batches to make sure input will commit offsets incrementally + // we sent a packet, so we don’t need buffers and insaneJSON.Root, + // so we can only copy the information we need and release the events + eventsCopy := batch.copyHeadlessEvents() + b.opts.Controller.ReleaseEvents(events) + events = eventsCopy + + // let's restore the sequence of batches to make sure input will commit offsets incrementally b.seqMu.Lock() for b.commitSeq != batchSeq { b.cond.Wait() } b.commitSeq++ - for _, e := range events { - b.opts.Controller.Commit(e) + for i := range events { + b.opts.Controller.Commit(events[i], false) } b.cond.Broadcast() @@ -216,3 +223,23 @@ func (b *Batcher) Stop() { close(b.freeBatches) close(b.fullBatches) } + +// copyHeadlessEvents copies events without Root and other reusable buffers. +func (b *Batch) copyHeadlessEvents() []*Event { + if cap(b.eventsCopy) < len(b.Events) { + b.eventsCopy = make([]*Event, len(b.Events)) + } + + b.eventsCopy = b.eventsCopy[:len(b.Events)] + + for i := range b.Events { + cp := *b.Events[i] + cp.Buf = nil + cp.next = nil + cp.Root = nil + + b.eventsCopy[i] = &cp + } + + return b.eventsCopy +} diff --git a/pipeline/batch_test.go b/pipeline/batch_test.go index 809b0f379..d0244afb3 100644 --- a/pipeline/batch_test.go +++ b/pipeline/batch_test.go @@ -8,6 +8,7 @@ import ( "github.com/ozontech/file.d/logger" "github.com/stretchr/testify/assert" + insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/atomic" ) @@ -15,8 +16,17 @@ type batcherTail struct { commit func(event *Event) } -func (b *batcherTail) Commit(event *Event) { - b.commit(event) +func (b *batcherTail) ReleaseEvents(events []*Event) { + for _, event := range events { + insaneJSON.Release(event.Root) + } +} + +func (b *batcherTail) Commit(events *Event, backEvents bool) { + b.commit(events) + if backEvents { + b.ReleaseEvents([]*Event{events}) + } } func (b *batcherTail) Error(err string) { diff --git a/pipeline/event.go b/pipeline/event.go index 13c0b381b..6fe9c4ca5 100644 --- a/pipeline/event.go +++ b/pipeline/event.go @@ -39,11 +39,14 @@ const ( eventStageStream = 2 eventStageProcessor = 3 eventStageOutput = 4 +) + +type Kind byte - eventKindRegular int32 = 0 - eventKindIgnore int32 = 1 - eventKindTimeout int32 = 2 - eventKindUnlock int32 = 3 +const ( + EventKindRegular Kind = iota + EventKindTimeout + EventKindUnlock ) type eventStage int @@ -103,7 +106,7 @@ func (e *Event) reset(avgEventSize int) { e.next = nil e.action = atomic.Int64{} e.stream = nil - e.kind.Swap(eventKindRegular) + e.kind.Swap(int32(EventKindRegular)) } func (e *Event) StreamNameBytes() []byte { @@ -111,31 +114,27 @@ func (e *Event) StreamNameBytes() []byte { } func (e *Event) IsRegularKind() bool { - return e.kind.Load() == eventKindRegular -} - -func (e *Event) SetIgnoreKind() { - e.kind.Swap(eventKindIgnore) + return Kind(e.kind.Load()) == EventKindRegular } func (e *Event) IsUnlockKind() bool { - return e.kind.Load() == eventKindUnlock + return Kind(e.kind.Load()) == EventKindUnlock } func (e *Event) SetUnlockKind() { - e.kind.Swap(eventKindUnlock) + e.kind.Swap(int32(EventKindUnlock)) } func (e *Event) IsIgnoreKind() bool { - return e.kind.Load() == eventKindUnlock + return Kind(e.kind.Load()) == EventKindUnlock } func (e *Event) SetTimeoutKind() { - e.kind.Swap(eventKindTimeout) + e.kind.Swap(int32(EventKindTimeout)) } func (e *Event) IsTimeoutKind() bool { - return e.kind.Load() == eventKindTimeout + return Kind(e.kind.Load()) == EventKindTimeout } func (e *Event) parseJSON(json []byte) error { @@ -172,12 +171,10 @@ func (e *Event) stageStr() string { } func (e *Event) kindStr() string { - switch e.kind.Load() { - case eventKindRegular: + switch Kind(e.kind.Load()) { + case EventKindRegular: return "REGULAR" - case eventKindIgnore: - return "DEPRECATED" - case eventKindTimeout: + case EventKindTimeout: return "TIMEOUT" default: return "UNKNOWN" diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 4ffaa001f..7feecd7db 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -52,13 +52,13 @@ type InputPluginController interface { } type ActionPluginController interface { - Commit(event *Event) // commit offset of held event and skip further processing Propagate(event *Event) // throw held event back to pipeline } type OutputPluginController interface { - Commit(event *Event) // notify input plugin that event is successfully processed and save offsets + Commit(event *Event, backEvent bool) // notify input plugin that event is successfully processed and save offsets Error(err string) + ReleaseEvents(events []*Event) } type ( @@ -472,8 +472,14 @@ func (p *Pipeline) streamEvent(event *Event) uint64 { return p.streamer.putEvent(streamID, event.streamName, event) } -func (p *Pipeline) Commit(event *Event) { - p.finalize(event, true, true) +func (p *Pipeline) Commit(event *Event, backEvents bool) { + p.finalize(event, true, backEvents) +} + +func (p *Pipeline) ReleaseEvents(events []*Event) { + for i := range events { + p.eventPool.back(events[i]) + } } func (p *Pipeline) Error(err string) { @@ -494,7 +500,7 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) { p.outputEvents.Inc() p.outputSize.Add(int64(event.Size)) - if len(p.outSample) == 0 && rand.Int()&1 == 1 { + if event.Root != nil && len(p.outSample) == 0 && rand.Int()&1 == 1 { p.outSample = event.Root.Encode(p.outSample) } @@ -510,7 +516,7 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) { return } - if p.eventLogEnabled { + if p.eventLogEnabled && event.Root != nil { p.eventLogMu.Lock() p.eventLog = append(p.eventLog, event.Root.EncodeToString()) p.eventLogMu.Unlock() diff --git a/pipeline/pipeline_whitebox_test.go b/pipeline/pipeline_whitebox_test.go index 85a3fc0ec..f5958b63c 100644 --- a/pipeline/pipeline_whitebox_test.go +++ b/pipeline/pipeline_whitebox_test.go @@ -41,7 +41,7 @@ type TestInputPlugin struct{} func (p *TestInputPlugin) Start(_ AnyConfig, _ *InputPluginParams) {} func (p *TestInputPlugin) Stop() {} -func (p *TestInputPlugin) Commit(*Event) {} +func (p *TestInputPlugin) Commit(_ *Event) {} func (p *TestInputPlugin) PassEvent(_ *Event) bool { return true } diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 2283be7e9..940860411 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -24,6 +24,13 @@ type InputPlugin interface { PassEvent(event *Event) bool } +// SourceIDCommiter describes an output plugin method that allows you to commit sourceID events, +// logically extensions OutputPluginController, +// but not required for implementation. +type SourceIDCommiter interface { + CommitSourceIDs([]SourceID) +} + type ActionPlugin interface { Start(config AnyConfig, params *ActionPluginParams) Stop() diff --git a/pipeline/processor.go b/pipeline/processor.go index a7c71a1a8..347ce6fb6 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -13,14 +13,14 @@ const ( // ActionPass pass event to the next action in a pipeline ActionPass ActionResult = 0 // ActionCollapse skip further processing of event and request next event from the same stream and source as current - // plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream + // plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream ActionCollapse ActionResult = 2 // ActionDiscard skip further processing of event and request next event from any stream and source ActionDiscard ActionResult = 1 // ActionHold hold event in a plugin and request next event from the same stream and source as current. // same as ActionCollapse but held event should be manually committed or returned into pipeline. // check out Commit()/Propagate() functions in InputPluginController. - // plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream. + // plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream. ActionHold ActionResult = 3 ) @@ -335,10 +335,6 @@ func (p *processor) AddActionPlugin(info *ActionPluginInfo) { p.busyActions = append(p.busyActions, false) } -func (p *processor) Commit(event *Event) { - p.finalize(event, false, true) -} - // Propagate flushes an event after ActionHold. func (p *processor) Propagate(event *Event) { nextActionIdx := event.action.Inc() diff --git a/plugin/output/devnull/devnull.go b/plugin/output/devnull/devnull.go index 09522be46..eb648854e 100644 --- a/plugin/output/devnull/devnull.go +++ b/plugin/output/devnull/devnull.go @@ -50,5 +50,5 @@ func (p *Plugin) Out(event *pipeline.Event) { p.outFn(event) } - p.controller.Commit(event) + p.controller.Commit(event, true) } diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go index 672fc4655..34b329857 100644 --- a/plugin/output/stdout/stdout.go +++ b/plugin/output/stdout/stdout.go @@ -36,5 +36,5 @@ func (_ *Plugin) Stop() {} func (p *Plugin) Out(event *pipeline.Event) { fmt.Println(event.Root.EncodeToString()) - p.controller.Commit(event) + p.controller.Commit(event, true) }