feat: remove async handling code #197

Merged
2 commits merged on Jan 2, 2024
81 changes: 11 additions & 70 deletions input/elasticapm/processor.go
@@ -41,10 +41,6 @@ var (

errEmptyBody = errors.New("empty body")

// ErrQueueFull may be returned by HandleStream when the internal
// queue is full.
ErrQueueFull = errors.New("queue is full")

batchPool sync.Pool
)

@@ -243,7 +239,6 @@ func (p *Processor) readBatch(
// Callers must not access result concurrently with HandleStream.
func (p *Processor) HandleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
reader io.Reader,
batchSize int,
@@ -256,25 +251,15 @@ func (p *Processor) HandleStream(
//
// The semaphore defaults to 200 (N), only allowing N requests to read
// an cache Y events (determined by batchSize) from the batch.
//
// Clients can set async to true which makes the processor process the
// events in the background. Returns with an error `ErrQueueFull`
// if the semaphore is full. When asynchronous processing is requested,
// the batches are decoded synchronously, but the batch is processed
// asynchronously.
if err := p.semAcquire(ctx, async); err != nil {
if err := p.semAcquire(ctx); err != nil {
return fmt.Errorf("cannot acquire semaphore: %w", err)
}
sr := p.getStreamReader(reader)

// Release the semaphore on early exit; this will be set to false
// for asynchronous requests once we may no longer exit early.
shouldReleaseSemaphore := true
// Release the semaphore on early exit
defer func() {
sr.release()
if shouldReleaseSemaphore {
p.sem.Release(1)
}
p.sem.Release(1)
}()

// The first item is the metadata object.
@@ -292,80 +277,42 @@
}
}

if async {
// The semaphore is released by handleStream
shouldReleaseSemaphore = false
}
first := true
for {
err := p.handleStream(ctx, async, baseEvent, batchSize, sr, processor, result, first)
err := p.handleStream(ctx, baseEvent, batchSize, sr, processor, result)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("cannot handle stream: %w", err)
}
if first {
first = false
}
}
}

func (p *Processor) handleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
batchSize int,
sr *streamReader,
processor modelpb.BatchProcessor,
result *Result,
first bool,
) (readErr error) {
// Async requests will re-aquire the semaphore if it has more events than
// `batchSize`. In that event, the semaphore will be acquired again. If
// the semaphore is full, `ErrQueueFull` is returned.
) error {
// The first iteration will not acquire the semaphore since it's already
// acquired in the caller function.
var n int
if async {
if !first {
if err := p.semAcquire(ctx, async); err != nil {
return fmt.Errorf("cannot re-acquire semaphore: %w", err)
}
}
defer func() {
// If no events have been read on an asynchronous request, release
// the semaphore since the processing goroutine isn't scheduled.
if n == 0 {
p.sem.Release(1)
}
}()
}
var batch modelpb.Batch
if b, ok := batchPool.Get().(*modelpb.Batch); ok {
batch = (*b)[:0]
}
n, readErr = p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
n, readErr := p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
if n == 0 {
// No events to process, return the batch to the pool.
batchPool.Put(&batch)
return readErr
}
// Async requests are processed in the background and once the batch has
// been processed, the semaphore is released.
if async {
go func() {
defer p.sem.Release(1)
if err := p.processBatch(ctx, processor, &batch); err != nil {
p.logger.Error("failed handling async request", zap.Error(err))
}
}()
} else {
if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n

if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n
return readErr
}

Expand All @@ -392,16 +339,10 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader {
}
}

func (p *Processor) semAcquire(ctx context.Context, async bool) error {
func (p *Processor) semAcquire(ctx context.Context) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()

if async {
if ok := p.sem.TryAcquire(1); !ok {
return ErrQueueFull
}
return nil
}
return p.sem.Acquire(ctx, 1)
}

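For context, after this change HandleStream has a single, fully synchronous path: block on a semaphore Acquire, read a batch, process it inline, and release the semaphore in a deferred call. The TryAcquire/ErrQueueFull fast-fail path and the background processing goroutine are gone. The following is a minimal, self-contained sketch of that retained pattern using golang.org/x/sync/semaphore; the processor type and the readBatch/processBatch helpers here are simplified stand-ins for illustration, not the actual types in input/elasticapm/processor.go, and the semaphore size of 200 echoes the default mentioned in the code comments.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"

	"golang.org/x/sync/semaphore"
)

// processor is a simplified stand-in for the real Processor: it bounds
// concurrent stream handlers with a weighted semaphore, mirroring the
// synchronous path that remains after this PR.
type processor struct {
	sem *semaphore.Weighted
}

// handleStream mirrors the post-change flow: block until a semaphore slot
// is free, release it on exit, then read and process batches until EOF.
func (p *processor) handleStream(ctx context.Context, r io.Reader) error {
	// Blocking acquire: with the async path removed there is no
	// TryAcquire/ErrQueueFull fast failure any more.
	if err := p.sem.Acquire(ctx, 1); err != nil {
		return fmt.Errorf("cannot acquire semaphore: %w", err)
	}
	// Release the semaphore on exit, early or not.
	defer p.sem.Release(1)

	for {
		batch, err := readBatch(r) // placeholder for Processor.readBatch
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return fmt.Errorf("cannot handle stream: %w", err)
		}
		// Processing happens inline; nothing is handed off to a goroutine.
		if err := processBatch(ctx, batch); err != nil {
			return fmt.Errorf("cannot process batch: %w", err)
		}
	}
}

// readBatch is a toy batch reader: it returns whatever bytes are available
// as one "batch" and io.EOF once the reader is drained.
func readBatch(r io.Reader) (string, error) {
	buf := make([]byte, 64)
	n, err := r.Read(buf)
	if n == 0 {
		return "", err
	}
	return strings.TrimSpace(string(buf[:n])), nil
}

// processBatch is a toy processor standing in for modelpb.BatchProcessor.
func processBatch(_ context.Context, batch string) error {
	fmt.Println("processed:", batch)
	return nil
}

func main() {
	// 200 mirrors the default semaphore size mentioned in the comments.
	p := &processor{sem: semaphore.NewWeighted(200)}
	if err := p.handleStream(context.Background(), strings.NewReader("event-1\nevent-2\n")); err != nil {
		fmt.Println("error:", err)
	}
}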