Skip to content

Commit

Permalink
Save offsets only
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 4, 2023
1 parent 4376bcd commit d680078
Showing 1 changed file with 17 additions and 19 deletions.
36 changes: 17 additions & 19 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
)

type Batch struct {
Events []*Event
eventsCopy []*Event
Events []*Event

eventsOffsets []Event

// eventsSize contains total size of the Events in bytes
eventsSize int
seq int64
Expand All @@ -25,7 +27,7 @@ type Batch struct {
maxSizeBytes int
}

func newBatch(maxSizeCount int, maxSizeBytes int, timeout time.Duration) *Batch {
func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
if maxSizeCount < 0 {
logger.Fatalf("why batch max count less than 0?")
}
Expand Down Expand Up @@ -145,11 +147,11 @@ func (b *Batcher) commitBatch(events []*Event, batch *Batch) []*Event {

batchSeq := batch.seq

// we sent a packet, so we don’t need buffers and insaneJSON.Root,
// we sent a batch, so we don’t need buffers and insaneJSON.Root,
// so we can only copy the information we need and release the events
eventsCopy := batch.copyHeadlessEvents(events)
eventOffsets := batch.copyEventOffsets(events)
b.opts.Controller.ReleaseEvents(events)
events = eventsCopy
events = eventOffsets

// let's restore the sequence of batches to make sure input will commit offsets incrementally
b.seqMu.Lock()
Expand Down Expand Up @@ -224,22 +226,18 @@ func (b *Batcher) Stop() {
close(b.fullBatches)
}

// copyHeadlessEvents copies events without Root and other reusable buffers.
func (b *Batch) copyHeadlessEvents(events []*Event) []*Event {
if cap(b.eventsCopy) < len(events) {
b.eventsCopy = make([]*Event, len(events))
// copyEventOffsets copies events without Root and other reusable buffers.
func (b *Batch) copyEventOffsets(events []*Event) []*Event {
if len(b.eventsOffsets) < len(events) {
b.eventsOffsets = make([]Event, len(events))
}

b.eventsCopy = b.eventsCopy[:len(events)]

eventsInfo := make([]*Event, len(events))
for i := range events {
cp := *events[i]
cp.Buf = nil
cp.next = nil
cp.Root = nil

b.eventsCopy[i] = &cp
b.eventsOffsets[i].Offset = events[i].Offset
b.eventsOffsets[i].SourceID = events[i].SourceID
eventsInfo[i] = &b.eventsOffsets[i]
}

return b.eventsCopy
return eventsInfo
}

0 comments on commit d680078

Please sign in to comment.