Skip to content

Commit

Permalink
Release events in the batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Apr 12, 2023
1 parent f6d5feb commit 39fb0e5
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 41 deletions.
35 changes: 31 additions & 4 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

type Batch struct {
Events []*Event
Events []*Event
eventsCopy []*Event
// eventsSize contains total size of the Events in bytes
eventsSize int
seq int64
Expand Down Expand Up @@ -144,15 +145,21 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {

batchSeq := batch.seq

// lets restore the sequence of batches to make sure input will commit offsets incrementally
// we sent a packet, so we don’t need buffers and insaneJSON.Root,
// so we can only copy the information we need and release the events
eventsCopy := batch.copyHeadlessEvents()
b.opts.Controller.ReleaseEvents(events)
events = eventsCopy

// let's restore the sequence of batches to make sure input will commit offsets incrementally
b.seqMu.Lock()
for b.commitSeq != batchSeq {
b.cond.Wait()
}
b.commitSeq++

for _, e := range events {
b.opts.Controller.Commit(e)
for i := range events {
b.opts.Controller.Commit(events[i], false)
}

b.cond.Broadcast()
Expand Down Expand Up @@ -216,3 +223,23 @@ func (b *Batcher) Stop() {
close(b.freeBatches)
close(b.fullBatches)
}

// copyHeadlessEvents copies events without Root and other reusable buffers.
func (b *Batch) copyHeadlessEvents() []*Event {
if cap(b.eventsCopy) < len(b.Events) {
b.eventsCopy = make([]*Event, len(b.Events))
}

b.eventsCopy = b.eventsCopy[:len(b.Events)]

for i := range b.Events {
cp := *b.Events[i]
cp.Buf = nil
cp.next = nil
cp.Root = nil

b.eventsCopy[i] = &cp
}

return b.eventsCopy
}
14 changes: 12 additions & 2 deletions pipeline/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,25 @@ import (

"github.com/ozontech/file.d/logger"
"github.com/stretchr/testify/assert"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
)

type batcherTail struct {
commit func(event *Event)
}

func (b *batcherTail) Commit(event *Event) {
b.commit(event)
func (b *batcherTail) ReleaseEvents(events []*Event) {
for _, event := range events {
insaneJSON.Release(event.Root)
}
}

func (b *batcherTail) Commit(events *Event, backEvents bool) {
b.commit(events)
if backEvents {
b.ReleaseEvents([]*Event{events})
}
}

func (b *batcherTail) Error(err string) {
Expand Down
37 changes: 17 additions & 20 deletions pipeline/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ const (
eventStageStream = 2
eventStageProcessor = 3
eventStageOutput = 4
)

type Kind byte

eventKindRegular int32 = 0
eventKindIgnore int32 = 1
eventKindTimeout int32 = 2
eventKindUnlock int32 = 3
const (
EventKindRegular Kind = iota
EventKindTimeout
EventKindUnlock
)

type eventStage int
Expand Down Expand Up @@ -103,39 +106,35 @@ func (e *Event) reset(avgEventSize int) {
e.next = nil
e.action = atomic.Int64{}
e.stream = nil
e.kind.Swap(eventKindRegular)
e.kind.Swap(int32(EventKindRegular))
}

func (e *Event) StreamNameBytes() []byte {
return StringToByteUnsafe(string(e.streamName))
}

func (e *Event) IsRegularKind() bool {
return e.kind.Load() == eventKindRegular
}

func (e *Event) SetIgnoreKind() {
e.kind.Swap(eventKindIgnore)
return Kind(e.kind.Load()) == EventKindRegular
}

func (e *Event) IsUnlockKind() bool {
return e.kind.Load() == eventKindUnlock
return Kind(e.kind.Load()) == EventKindUnlock
}

func (e *Event) SetUnlockKind() {
e.kind.Swap(eventKindUnlock)
e.kind.Swap(int32(EventKindUnlock))
}

func (e *Event) IsIgnoreKind() bool {
return e.kind.Load() == eventKindUnlock
return Kind(e.kind.Load()) == EventKindUnlock
}

func (e *Event) SetTimeoutKind() {
e.kind.Swap(eventKindTimeout)
e.kind.Swap(int32(EventKindTimeout))
}

func (e *Event) IsTimeoutKind() bool {
return e.kind.Load() == eventKindTimeout
return Kind(e.kind.Load()) == EventKindTimeout
}

func (e *Event) parseJSON(json []byte) error {
Expand Down Expand Up @@ -172,12 +171,10 @@ func (e *Event) stageStr() string {
}

func (e *Event) kindStr() string {
switch e.kind.Load() {
case eventKindRegular:
switch Kind(e.kind.Load()) {
case EventKindRegular:
return "REGULAR"
case eventKindIgnore:
return "DEPRECATED"
case eventKindTimeout:
case EventKindTimeout:
return "TIMEOUT"
default:
return "UNKNOWN"
Expand Down
18 changes: 12 additions & 6 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type InputPluginController interface {
}

type ActionPluginController interface {
Commit(event *Event) // commit offset of held event and skip further processing
Propagate(event *Event) // throw held event back to pipeline
}

type OutputPluginController interface {
Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
Commit(event *Event, backEvent bool) // notify input plugin that event is successfully processed and save offsets
Error(err string)
ReleaseEvents(events []*Event)
}

type (
Expand Down Expand Up @@ -472,8 +472,14 @@ func (p *Pipeline) streamEvent(event *Event) uint64 {
return p.streamer.putEvent(streamID, event.streamName, event)
}

func (p *Pipeline) Commit(event *Event) {
p.finalize(event, true, true)
func (p *Pipeline) Commit(event *Event, backEvents bool) {
p.finalize(event, true, backEvents)
}

func (p *Pipeline) ReleaseEvents(events []*Event) {
for i := range events {
p.eventPool.back(events[i])
}
}

func (p *Pipeline) Error(err string) {
Expand All @@ -494,7 +500,7 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
p.outputEvents.Inc()
p.outputSize.Add(int64(event.Size))

if len(p.outSample) == 0 && rand.Int()&1 == 1 {
if event.Root != nil && len(p.outSample) == 0 && rand.Int()&1 == 1 {
p.outSample = event.Root.Encode(p.outSample)
}

Expand All @@ -510,7 +516,7 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
return
}

if p.eventLogEnabled {
if p.eventLogEnabled && event.Root != nil {
p.eventLogMu.Lock()
p.eventLog = append(p.eventLog, event.Root.EncodeToString())
p.eventLogMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type TestInputPlugin struct{}

func (p *TestInputPlugin) Start(_ AnyConfig, _ *InputPluginParams) {}
func (p *TestInputPlugin) Stop() {}
func (p *TestInputPlugin) Commit(*Event) {}
func (p *TestInputPlugin) Commit(_ *Event) {}
func (p *TestInputPlugin) PassEvent(_ *Event) bool {
return true
}
7 changes: 7 additions & 0 deletions pipeline/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type InputPlugin interface {
PassEvent(event *Event) bool
}

// SourceIDCommiter describes an output plugin method that allows you to commit sourceID events,
// logically extensions OutputPluginController,
// but not required for implementation.
type SourceIDCommiter interface {
CommitSourceIDs([]SourceID)
}

type ActionPlugin interface {
Start(config AnyConfig, params *ActionPluginParams)
Stop()
Expand Down
8 changes: 2 additions & 6 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ const (
// ActionPass pass event to the next action in a pipeline
ActionPass ActionResult = 0
// ActionCollapse skip further processing of event and request next event from the same stream and source as current
// plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream
// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream
ActionCollapse ActionResult = 2
// ActionDiscard skip further processing of event and request next event from any stream and source
ActionDiscard ActionResult = 1
// ActionHold hold event in a plugin and request next event from the same stream and source as current.
// same as ActionCollapse but held event should be manually committed or returned into pipeline.
// check out Commit()/Propagate() functions in InputPluginController.
// plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream.
// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream.
ActionHold ActionResult = 3
)

Expand Down Expand Up @@ -335,10 +335,6 @@ func (p *processor) AddActionPlugin(info *ActionPluginInfo) {
p.busyActions = append(p.busyActions, false)
}

func (p *processor) Commit(event *Event) {
p.finalize(event, false, true)
}

// Propagate flushes an event after ActionHold.
func (p *processor) Propagate(event *Event) {
nextActionIdx := event.action.Inc()
Expand Down
2 changes: 1 addition & 1 deletion plugin/output/devnull/devnull.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func (p *Plugin) Out(event *pipeline.Event) {
p.outFn(event)
}

p.controller.Commit(event)
p.controller.Commit(event, true)
}
2 changes: 1 addition & 1 deletion plugin/output/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ func (_ *Plugin) Stop() {}

func (p *Plugin) Out(event *pipeline.Event) {
fmt.Println(event.Root.EncodeToString())
p.controller.Commit(event)
p.controller.Commit(event, true)
}

0 comments on commit 39fb0e5

Please sign in to comment.