intake: Support non-blocking event processing (#8979)
Introduces a new boolean `async` query parameter for `intake/v2/events`.
This allows clients to ask the APM Server to process the contents of the
request asynchronously. This will only happen if the APM Server has
enough capacity to service the request; otherwise it will return the
infamous `publish.ErrFull` error.

Signed-off-by: Marc Lopez Rubio <[email protected]>
marclop authored Sep 8, 2022
1 parent 69816af commit bd0969d
Showing 10 changed files with 487 additions and 43 deletions.
4 changes: 4 additions & 0 deletions changelogs/head.asciidoc
@@ -18,6 +18,10 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits]

[float]
==== Intake API Changes
- Add a new `async` boolean query parameter (`intake/v2/events?async=<true|false>`).
When set to `true`, the batch is processed asynchronously if the APM Server can
service the incoming request immediately; otherwise, a "503 queue is full" error
is returned {pull}8979[8979]

[float]
==== Added
16 changes: 16 additions & 0 deletions docs/api-events.asciidoc
@@ -33,6 +33,22 @@ Send an `HTTP POST` request to the APM Server `intake/v2/events` endpoint:
http(s)://{hostname}:{port}/intake/v2/events
------------------------------------------------------------

From version `8.5.0` onwards, the APM Server supports asynchronous processing of batches.
To request asynchronous processing, set the `async` query parameter in the POST request
to the `intake/v2/events` endpoint:

[source,bash]
------------------------------------------------------------
http(s)://{hostname}:{port}/intake/v2/events?async=true
------------------------------------------------------------

NOTE: Since asynchronous processing defers some of the event processing to the
background and takes place after the client has closed the request, some errors
cannot be communicated back to the client and are logged by the APM Server instead.
Furthermore, asynchronous requests are only scheduled if the APM Server can
service the incoming request; requests that cannot be serviced receive a
`503` "queue is full" error.
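
For example, a request opting into asynchronous processing might look like the
following sketch, where the host, port, and `events.ndjson` payload are placeholders:

[source,bash]
------------------------------------------------------------
curl -X POST "http://localhost:8200/intake/v2/events?async=true" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @events.ndjson
------------------------------------------------------------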

For <<apm-rum,RUM>> send an `HTTP POST` request to the APM Server `intake/v2/rum/events` endpoint instead:

[source,bash]
34 changes: 33 additions & 1 deletion internal/beater/api/intake/handler.go
@@ -23,8 +23,11 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"

"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-server/internal/beater/auth"
@@ -56,6 +59,7 @@ var (
type StreamHandler interface {
HandleStream(
ctx context.Context,
async bool,
base model.APMEvent,
stream io.Reader,
batchSize int,
@@ -77,6 +81,25 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

// Clients can set the `async` query parameter to request non-blocking
// event processing; the request returns immediately with
// `publish.ErrFull` when it cannot be serviced.
// Async processing provides weaker guarantees for the client, since
// any errors encountered while processing the batch cannot be
// communicated back to the client.
// Instead, errors are logged by the APM Server.
async := asyncRequest(c.Request)

// When asynchronous processing is requested, create a new detached
// context, decoupling it from the request's deadline and cancelation,
// which fire as soon as the request is handled. The batch will likely
// be processed after the request has finished, and a done context
// would make that processing fail.
ctx := c.Request.Context()
if async {
ctx = apm.DetachedContext(ctx)
}

reader, err := decoder.CompressedRequestReader(c.Request)
if err != nil {
writeError(c, compressedRequestReaderError{err})
@@ -86,7 +109,8 @@
base := requestMetadataFunc(c)
var result stream.Result
if err := handler.HandleStream(
c.Request.Context(),
ctx,
async,
base,
reader,
batchSize,
@@ -218,3 +242,11 @@ type jsonError struct {
Message string `json:"message"`
Document string `json:"document,omitempty"`
}

// asyncRequest reports whether the client requested asynchronous
// processing via the `async` query parameter. Values that cannot be
// parsed as a boolean are treated as false.
func asyncRequest(req *http.Request) bool {
	var async bool
	if asyncStr := req.URL.Query().Get("async"); asyncStr != "" {
		async, _ = strconv.ParseBool(asyncStr)
	}
	return async
}
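
A minimal sketch of the detached-context semantics (not part of this commit;
it assumes only the standard library and `go.elastic.co/apm/v2`):
`apm.DetachedContext` keeps the parent context's values but drops its deadline
and cancelation, so the batch can outlive the HTTP request.

ctx, cancel := context.WithCancel(context.Background())
detached := apm.DetachedContext(ctx)
cancel()                    // the request finishes; its context is canceled
fmt.Println(ctx.Err())      // context.Canceled
fmt.Println(detached.Err()) // <nil>: the detached context remains usable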
6 changes: 3 additions & 3 deletions internal/beater/auth/context.go
@@ -32,18 +32,18 @@ func ContextWithAuthorizer(parent context.Context, auth Authorizer) context.Cont
return context.WithValue(parent, authorizationKey{}, auth)
}

// authorizationFromContext returns the Authorizer stored in ctx, if any, and a boolean
// authorizerFromContext returns the Authorizer stored in ctx, if any, and a boolean
// indicating whether one was found. The boolean is false if and only if the
// Authorizer is nil.
func authorizationFromContext(ctx context.Context) (Authorizer, bool) {
func authorizerFromContext(ctx context.Context) (Authorizer, bool) {
auth, ok := ctx.Value(authorizationKey{}).(Authorizer)
return auth, ok
}

// Authorize is a shortcut for obtaining an Authorizer from ctx and calling its Authorize
// method. Authorize returns ErrNoAuthorizer if ctx does not contain an Authorizer.
func Authorize(ctx context.Context, action Action, resource Resource) error {
auth, ok := authorizationFromContext(ctx)
auth, ok := authorizerFromContext(ctx)
if !ok {
return ErrNoAuthorizer
}
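
A hedged usage sketch of this API (the `allowAll` Authorizer below is
hypothetical, not part of the package): store an Authorizer in a context,
then authorize actions from request-handling code.

// allowAll is a hypothetical Authorizer that permits every action.
type allowAll struct{}

func (allowAll) Authorize(ctx context.Context, action auth.Action, resource auth.Resource) error {
	return nil
}

func authorize(action auth.Action, resource auth.Resource) error {
	// With a bare context.Background(), this would return auth.ErrNoAuthorizer.
	ctx := auth.ContextWithAuthorizer(context.Background(), allowAll{})
	return auth.Authorize(ctx, action, resource)
}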
1 change: 1 addition & 0 deletions internal/logs/selectors.go
@@ -38,4 +38,5 @@ const (
SpanMetrics = "spanmetrics"
Transform = "transform"
Sampling = "sampling"
Processor = "processor"
)
60 changes: 58 additions & 2 deletions internal/processor/stream/benchmark_test.go
@@ -18,6 +18,7 @@
package stream

import (
"bufio"
"bytes"
"context"
"fmt"
@@ -26,6 +27,8 @@ import (
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/internal/model"
)

@@ -65,7 +68,7 @@ func benchmarkStreamProcessor(b *testing.B, processor *Processor, files []string
r.Seek(0, io.SeekStart)

var result Result
processor.HandleStream(context.Background(), model.APMEvent{}, r, batchSize, batchProcessor, &result)
processor.HandleStream(context.Background(), false, model.APMEvent{}, r, batchSize, batchProcessor, &result)
}
})
}
@@ -98,14 +101,67 @@ func benchmarkStreamProcessorParallel(b *testing.B, processor *Processor, files
r := bytes.NewReader(data)
for p.Next() {
var result Result
processor.HandleStream(context.Background(), model.APMEvent{}, r, batchSize, batchProcessor, &result)
processor.HandleStream(context.Background(), false, model.APMEvent{}, r, batchSize, batchProcessor, &result)
r.Seek(0, io.SeekStart)
}
})
})
}
}

func BenchmarkBackendProcessorAsync(b *testing.B) {
processor := BackendProcessor(Config{
MaxEventSize: 300 * 1024, // 300 KiB
Semaphore: make(chan struct{}, 200),
})
files, _ := filepath.Glob(filepath.FromSlash("../../../testdata/intake-v2/heavy.ndjson"))
benchmarkStreamProcessorAsync(b, processor, files)
}

func benchmarkStreamProcessorAsync(b *testing.B, processor *Processor, files []string) {
const batchSize = 10

for _, f := range files {
data, err := os.ReadFile(f)
require.NoError(b, err)
r := bytes.NewReader(data)

events := -1 // Exclude metadata
scanner := bufio.NewScanner(r)
for scanner.Scan() {
events++
}
require.NoError(b, scanner.Err())

batches := events / batchSize
if events >= 10 && events%batchSize > 0 {
batches++
}
if batches == 0 {
batches++
}

r.Seek(0, io.SeekStart)
// buffer the channel so every batch can be accepted without blocking
batchProcessor := &accountProcessor{batch: make(chan *model.Batch, batches)}

b.Run(filepath.Base(f), func(b *testing.B) {
b.ReportAllocs()
b.SetBytes(int64(len(data)))
for i := 0; i < b.N; i++ {
r.Seek(0, io.SeekStart)

var result Result
processor.HandleStream(context.Background(), true, model.APMEvent{}, r, batchSize, batchProcessor, &result)
// drain the batches
for i := 0; i < batches; i++ {
<-batchProcessor.batch
}
}
})
}
}
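
For reference, a hedged sketch of the `accountProcessor` used above (its real
definition lives elsewhere in this package): a `model.BatchProcessor` that
forwards each batch onto a buffered channel so the benchmark can drain it.

// Sketch only: the committed accountProcessor may differ in detail.
type accountProcessor struct {
	batch chan *model.Batch
}

// ProcessBatch implements model.BatchProcessor by handing each batch to
// the benchmark through the channel.
func (p *accountProcessor) ProcessBatch(ctx context.Context, b *model.Batch) error {
	if p.batch != nil {
		p.batch <- b
	}
	return nil
}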

func BenchmarkReadBatch(b *testing.B) {
const batchSize = 10
processor := BackendProcessor(Config{