Skip to content

Commit

Permalink
Refactor, fix races, fix batcher commitment
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 4, 2023
1 parent 29604e1 commit f0a0633
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 33 deletions.
44 changes: 20 additions & 24 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Batch struct {
Events []*Event

eventsOffsets []Event
eventOffsets []*Event

// eventsSize contains total size of the Events in bytes
eventsSize int
Expand Down Expand Up @@ -73,7 +73,6 @@ type Batcher struct {
batch *Batch

// cycle of batches: freeBatches => fullBatches, fullBatches => freeBatches
// TODO get rid of freeBatches, fullBatches system, which prevents from graceful degradation.
freeBatches chan *Batch
fullBatches chan *Batch
mu *sync.Mutex
Expand Down Expand Up @@ -136,14 +135,13 @@ type WorkerData any

func (b *Batcher) work() {
t := time.Now()
events := make([]*Event, 0)
data := WorkerData(nil)
for batch := range b.fullBatches {
now := time.Now()
b.opts.OutFn(&data, batch)
b.batchOutFnSeconds.WithLabelValues().Observe(time.Since(now).Seconds())

events = b.commitBatch(events, batch)
b.commitBatch(batch)

shouldRunMaintenance := b.opts.MaintenanceFn != nil && b.opts.MaintenanceInterval != 0 && time.Since(t) > b.opts.MaintenanceInterval
if shouldRunMaintenance {
Expand All @@ -153,18 +151,13 @@ func (b *Batcher) work() {
}
}

func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
// we need to release batch first and then commit events
// so lets swap local slice with batch slice to avoid data copying
events, batch.Events = batch.Events, events

func (b *Batcher) commitBatch(batch *Batch) {
batchSeq := batch.seq

// we sent a batch, so we don’t need buffers and insaneJSON.Root,
// so we can only copy the information we need and release the events
eventOffsets := batch.copyEventOffsets(events)
b.opts.Controller.ReleaseEvents(events)
events = eventOffsets
events := batch.copyEventOffsets(batch.Events)
b.opts.Controller.ReleaseEvents(batch.Events)

now := time.Now()
// let's restore the sequence of batches to make sure input will commit offsets incrementally
Expand All @@ -179,12 +172,10 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {
b.opts.Controller.Commit(events[i], false)
}

b.cond.Broadcast()
b.seqMu.Unlock()

b.freeBatches <- batch
b.seqMu.Unlock()

return events
b.cond.Broadcast()
}

func (b *Batcher) heartbeat() {
Expand Down Expand Up @@ -236,23 +227,28 @@ func (b *Batcher) getBatch() *Batch {
func (b *Batcher) Stop() {
b.shouldStop.Store(true)

// todo add scenario without races.
b.seqMu.Lock()
defer b.seqMu.Unlock()

close(b.freeBatches)
close(b.fullBatches)
}

// copyEventOffsets copies events without Root and other reusable buffers.
func (b *Batch) copyEventOffsets(events []*Event) []*Event {
if len(b.eventsOffsets) < len(events) {
b.eventsOffsets = make([]Event, len(events))
if len(b.eventOffsets) < len(events) {
b.eventOffsets = make([]*Event, len(events))
prealloc := make([]Event, len(events)) // store the events nearly to be more cache friendly
for i := range b.eventOffsets {
b.eventOffsets[i] = &prealloc[i]
}
}

eventsInfo := make([]*Event, len(events))
for i := range events {
b.eventsOffsets[i].Offset = events[i].Offset
b.eventsOffsets[i].SourceID = events[i].SourceID
eventsInfo[i] = &b.eventsOffsets[i]
events[i].CopyTo(b.eventOffsets[i])
b.eventOffsets[i].Buf = nil
b.eventOffsets[i].Root = nil
}

return eventsInfo
return b.eventOffsets[:len(events)]
}
7 changes: 5 additions & 2 deletions pipeline/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"sync"
"time"

"github.com/ozontech/file.d/logger"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"

"github.com/ozontech/file.d/logger"
)

type Event struct {
Expand Down Expand Up @@ -188,6 +187,10 @@ func (e *Event) String() string {
return fmt.Sprintf("kind=%s, action=%d, source=%d/%s, stream=%s, stage=%s, json=%s", e.kindStr(), e.action.Load(), e.SourceID, e.SourceName, e.streamName, e.stageStr(), e.Root.EncodeToString())
}

func (e *Event) CopyTo(event *Event) {
*event = *e
}

// channels are slower than this implementation by ~20%
type eventPool struct {
capacity int
Expand Down
7 changes: 0 additions & 7 deletions pipeline/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ type InputPlugin interface {
PassEvent(event *Event) bool
}

// SourceIDCommiter describes an output plugin method that allows you to commit sourceID events,
// logically extensions OutputPluginController,
// but not required for implementation.
type SourceIDCommiter interface {
CommitSourceIDs([]SourceID)
}

type ActionPlugin interface {
Start(config AnyConfig, params *ActionPluginParams)
Stop()
Expand Down

0 comments on commit f0a0633

Please sign in to comment.