Destination record batching (#80)
* implement batcher

* try background goroutine and channels

* Revert "try background goroutine and channels" (worse performance)

This reverts commit 2e46a0b.

* improve batcher performance

* implement batch write strategy

* add log message

---------

Co-authored-by: Haris Osmanagić <[email protected]>
lovromazgon and hariso authored Jul 26, 2023
1 parent 10047a2 commit 4374355
Showing 6 changed files with 463 additions and 21 deletions.
141 changes: 122 additions & 19 deletions destination.go
@@ -19,7 +19,6 @@ package sdk
 import (
 	"bytes"
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"strconv"
@@ -160,13 +159,7 @@ func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context, c
 	}
 
 	if batchSize > 0 || batchDelay > 0 {
-		a.writeStrategy = &writeStrategyBatch{
-			impl:       a.impl,
-			batchSize:  batchSize,
-			batchDelay: batchDelay,
-		}
-		// TODO remove this once batching is implemented
-		return errors.New("batching not implemented")
+		a.writeStrategy = newWriteStrategyBatch(a.impl, batchSize, batchDelay)
 	}
 
 	return nil
@@ -248,8 +241,11 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
 		return bytes.Equal(val, req.LastPosition)
 	})
 
-	// flush cached records
-	flushErr := a.writeStrategy.Flush(ctx)
+	// flush cached records, allow it to take at most 1 minute
+	flushCtx, cancel := context.WithTimeout(ctx, time.Minute) // TODO make the timeout configurable
+	defer cancel()
+
+	flushErr := a.writeStrategy.Flush(flushCtx)
 	if flushErr != nil && err == nil {
 		err = flushErr
 	} else if flushErr != nil {
@@ -348,22 +344,129 @@ func (w *writeStrategySingle) Flush(context.Context) error {
 // grouped into batches that get written when they reach the size batchSize or
 // when the time since adding the first record to the current batch reaches
 // batchDelay.
-// TODO needs to be implemented
 type writeStrategyBatch struct {
-	impl Destination
+	impl    Destination
+	batcher *internal.Batcher[writeBatchItem]
+}
+
+type writeBatchItem struct {
+	ctx    context.Context
+	record Record
+	ack    func(error) error
+}
 
-	batchSize  int
-	batchDelay time.Duration
+func newWriteStrategyBatch(impl Destination, batchSize int, batchDelay time.Duration) *writeStrategyBatch {
+	strategy := &writeStrategyBatch{impl: impl}
+	strategy.batcher = internal.NewBatcher(
+		batchSize,
+		batchDelay,
+		strategy.writeBatch,
+	)
+	return strategy
 }
 
+func (w *writeStrategyBatch) writeBatch(batch []writeBatchItem) error {
+	records := make([]Record, len(batch))
+	for i, item := range batch {
+		records[i] = item.record
+	}
+	// use the last record's context as the write context
+	ctx := batch[len(batch)-1].ctx
+
+	n, err := w.impl.Write(ctx, records)
+	if n == len(batch) && err != nil {
+		err = fmt.Errorf("connector reported a successful write of all records in the batch and simultaneously returned an error, this is probably a bug in the connector. Original error: %w", err)
+		n = 0 // nack all messages in the batch
+	} else if n < len(batch) && err == nil {
+		err = fmt.Errorf("batch contained %d messages, connector has only written %d without reporting the error, this is probably a bug in the connector", len(batch), n)
+	}
+
+	var (
+		firstErr error
+		errOnce  bool
+	)
+	for i, item := range batch {
+		var ackErr error
+		if i >= n {
+			// records from index n onwards were not written, nack them with the batch error
+			ackErr = err
+		}
+		if err := item.ack(ackErr); err != nil && !errOnce {
+			firstErr = err
+			errOnce = true
+		}
+	}
+	return firstErr
+}
+
-func (w *writeStrategyBatch) Write(context.Context, Record, func(error) error) error {
-	panic("batching not implemented yet")
+func (w *writeStrategyBatch) Write(ctx context.Context, r Record, ack func(error) error) error {
+	select {
+	case result := <-w.batcher.Results():
+		Logger(ctx).Debug().
+			Int("batchSize", result.Size).
+			Time("at", result.At).Err(result.Err).
+			Msg("last batch was flushed asynchronously")
+		if result.Err != nil {
+			return fmt.Errorf("last batch write failed: %w", result.Err)
+			// TODO should we stop writing new records altogether and just nack? probably
+		}
+	default:
+		// last batch was not flushed yet
+	}
+
+	result := w.batcher.Enqueue(writeBatchItem{
+		ctx:    ctx,
+		record: r,
+		ack:    ack,
+	})
+
+	switch result {
+	case internal.Scheduled:
+		// This message was scheduled for the next batch.
+		// We need to check the results channel of the previous batch, in case
+		// the flush happened just before enqueuing this record.
+		select {
+		case result := <-w.batcher.Results():
+			Logger(ctx).Debug().
+				Int("batchSize", result.Size).
+				Time("at", result.At).Err(result.Err).
+				Msg("last batch was flushed asynchronously")
+			if result.Err != nil {
+				return fmt.Errorf("last batch write failed: %w", result.Err)
+			}
+		default:
+			// last batch was not flushed yet
+		}
+		return nil
+	case internal.Flushed:
+		// This record caused a flush, get the result.
+		result := <-w.batcher.Results()
+		Logger(ctx).Debug().
+			Int("batchSize", result.Size).
+			Time("at", result.At).Err(result.Err).
+			Msg("batch was flushed synchronously")
+		return result.Err
+	default:
+		return fmt.Errorf("unknown batcher enqueue status type: %T", result)
+	}
+}
 }
-func (w *writeStrategyBatch) Flush(context.Context) error {
-	panic("batching not implemented yet")
+func (w *writeStrategyBatch) Flush(ctx context.Context) error {
+	w.batcher.Flush()
+	select {
+	case result := <-w.batcher.Results():
+		Logger(ctx).Debug().
+			Int("batchSize", result.Size).
+			Time("at", result.At).Err(result.Err).
+			Msg("batch was flushed synchronously")
+		if result.Err != nil {
+			return fmt.Errorf("last batch write failed: %w", result.Err)
+		}
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	return nil
 }
 
-// DestinationUtil provides utility methods for implementing a destination.
+// DestinationUtil provides utility methods for implementing a destination. Use
+// it by calling Util.Destination.*.
 type DestinationUtil struct{}
 
 // Route makes it easier to implement a destination that mutates entities in
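
For context: the batch write path above relies on the connector's Write returning how many records it actually wrote, so the SDK can ack the first n records and nack the rest. A minimal connector-side sketch of that contract (illustrative only, not part of this commit; the embedded store client and the example package are hypothetical, the UnimplementedDestination embed is assumed from the SDK):

package example

import (
	"context"
	"fmt"

	sdk "github.com/conduitio/conduit-connector-sdk"
)

type Destination struct {
	sdk.UnimplementedDestination
	client interface {
		Insert(ctx context.Context, r sdk.Record) error // hypothetical data store client
	}
}

// Write persists records in order and reports how many succeeded, so the
// batch write strategy can ack the first i records and nack the rest.
func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
	for i, r := range records {
		if err := d.client.Insert(ctx, r); err != nil {
			return i, fmt.Errorf("failed to write record %d: %w", i, err)
		}
	}
	return len(records), nil
}
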
2 changes: 1 addition & 1 deletion destination_middleware.go
@@ -35,7 +35,7 @@ func DefaultDestinationMiddleware() []DestinationMiddleware {
 	return []DestinationMiddleware{
 		DestinationWithRateLimit{},
 		DestinationWithRecordFormat{},
-		// DestinationWithBatch{}, // TODO enable batch middleware once batching is implemented
+		DestinationWithBatch{},
 	}
 }
 
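With DestinationWithBatch now part of the default middleware, a connector that wraps its destination with the defaults picks up the batching options automatically. A minimal sketch, assuming the SDK's DestinationWithMiddleware helper and the hypothetical Destination type from the sketch above:

package example

import sdk "github.com/conduitio/conduit-connector-sdk"

// NewDestination wraps a hypothetical destination with the default middleware,
// which after this change includes DestinationWithBatch.
func NewDestination() sdk.Destination {
	return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}
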
106 changes: 106 additions & 0 deletions internal/batcher.go
@@ -0,0 +1,106 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"sync"
+	"time"
+)
+
+type Batcher[T any] struct {
+	sizeThreshold  int
+	delayThreshold time.Duration
+	fn             BatchFn[T]
+	results        chan BatchResult
+
+	batch      []T
+	flushTimer *time.Timer
+	m          sync.Mutex
+}
+
+type BatchFn[T any] func([]T) error
+
+type BatchResult struct {
+	At   time.Time
+	Size int
+	Err  error
+}
+
+type EnqueueStatus int
+
+const (
+	Scheduled EnqueueStatus = iota + 1
+	Flushed
+)
+
+func NewBatcher[T any](sizeThreshold int, delayThreshold time.Duration, fn BatchFn[T]) *Batcher[T] {
+	return &Batcher[T]{
+		sizeThreshold:  sizeThreshold,
+		delayThreshold: delayThreshold,
+		fn:             fn,
+		results:        make(chan BatchResult, 1),
+	}
+}
+
+func (b *Batcher[T]) Results() <-chan BatchResult {
+	return b.results
+}
+
+func (b *Batcher[T]) Enqueue(item T) EnqueueStatus {
+	b.m.Lock()
+	defer b.m.Unlock()
+
+	b.batch = append(b.batch, item)
+
+	if len(b.batch) == b.sizeThreshold {
+		// trigger flush synchronously
+		_ = b.flushNow()
+		return Flushed
+	}
+	if b.flushTimer == nil {
+		b.flushTimer = time.AfterFunc(b.delayThreshold, func() { b.Flush() })
+	}
+	return Scheduled
+}
+
+func (b *Batcher[T]) Flush() bool {
+	b.m.Lock()
+	defer b.m.Unlock()
+	return b.flushNow()
+}
+
+func (b *Batcher[T]) flushNow() bool {
+	if b.flushTimer != nil {
+		b.flushTimer.Stop()
+		b.flushTimer = nil
+	}
+	if len(b.batch) == 0 {
+		// nothing to flush
+		return false
+	}
+	batchCopy := make([]T, len(b.batch))
+	copy(batchCopy, b.batch)
+
+	at := time.Now()
+	err := b.fn(batchCopy)
+	r := BatchResult{
+		At:   at,
+		Size: len(batchCopy),
+		Err:  err,
+	}
+	b.results <- r
+	b.batch = b.batch[:0]
+	return true
+}
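
A short usage sketch of the Batcher introduced here (illustrative, not part of the commit; the import path assumes this SDK module, and since the package is internal it only compiles from within the module): items are enqueued until the size threshold flushes the batch synchronously or the delay timer flushes it in the background, and the outcome is read from Results().

package main

import (
	"fmt"
	"time"

	"github.com/conduitio/conduit-connector-sdk/internal"
)

func main() {
	// flush after 3 items or 100ms, whichever comes first
	b := internal.NewBatcher(3, 100*time.Millisecond, func(batch []string) error {
		fmt.Println("writing batch:", batch)
		return nil
	})

	for _, item := range []string{"a", "b", "c"} {
		if b.Enqueue(item) == internal.Flushed {
			// the third item hit the size threshold, so the batch was written synchronously
			result := <-b.Results()
			fmt.Printf("flushed %d items at %s, err: %v\n", result.Size, result.At, result.Err)
		}
	}
}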