Skip to content

Commit

Permalink
refactor loadgen event handler for future OTLP support (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
endorama authored Feb 9, 2024
1 parent 9617962 commit ef2ed71
Show file tree
Hide file tree
Showing 16 changed files with 473 additions and 295 deletions.
8 changes: 4 additions & 4 deletions cmd/apmsoak/scenarios.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ scenarios:
event_rate: 100/s
apm-server:
- event_rate: 10000/1s
agent_name: go
agent_name: apm-go
agents_replicas: 3
- event_rate: 10000/1s
agent_name: nodejs
agent_name: apm-nodejs
agents_replicas: 3
- event_rate: 10000/1s
agent_name: python
agent_name: apm-python
agents_replicas: 3
- event_rate: 10000/1s
agent_name: ruby
agent_name: apm-ruby
agents_replicas: 3
39 changes: 34 additions & 5 deletions internal/loadgen/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ type EventHandlerParams struct {
// Headers contains HTTP headers shipped with all requests.
// NOTE: these headers are not sanitized in logs.
Headers map[string]string

// One of: apm/http, otlp/http
// NOTE: otlp/grpc is not supported
Protocol string
// One of: any, logs, metrics, traces
// NOTE: for Protocol apm/http there is no difference
// between each value. When using Protocol otlp/http
// each data type requires a separate EventHandler.
Datatype string
}

func (e EventHandlerParams) MarshalLogObject(enc zapcore.ObjectEncoder) error {
Expand Down Expand Up @@ -79,16 +88,34 @@ func NewEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) {
if p.Logger == nil {
return nil, fmt.Errorf("nil logger in params")
}
switch p.Protocol {
case "apm/http":
return newAPMEventHandler(p)
case "otlp/http":
// TODO: support OTLP event handling
// switch p.Datatype {
// case "logs":
// case "metrics":
// case "traces":
// }

return nil, fmt.Errorf("invalid datatype (%s) for protocol (%s)", p.Datatype, p.Protocol)
}

return nil, fmt.Errorf("invalid or unsupported protocol (%s)", p.Protocol)
}

func newAPMEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) {
// We call the HTTPTransport constructor to avoid copying all the config
// parsing that creates the `*http.Client`.
t, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{})
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot create HTTP transport: %w", err)
}
transp := eventhandler.NewTransport(p.Logger, t.Client, p.URL, p.Token, p.APIKey, p.Headers)
return eventhandler.New(p.Logger, eventhandler.Config{

c := eventhandler.Config{
Path: filepath.Join("events", p.Path),
Transport: transp,
Transport: eventhandler.NewAPMTransport(p.Logger, t.Client, p.URL, p.Token, p.APIKey, p.Headers),
Storage: events,
Limiter: p.Limiter,
Rand: p.Rand,
Expand All @@ -101,5 +128,7 @@ func NewEventHandler(p EventHandlerParams) (*eventhandler.Handler, error) {
RewriteTransactionNames: p.RewriteTransactionNames,
RewriteTransactionTypes: p.RewriteTransactionTypes,
RewriteTimestamps: p.RewriteTimestamps,
})
}

return eventhandler.NewAPM(p.Logger, c)
}
69 changes: 69 additions & 0 deletions internal/loadgen/eventhandler/apm-collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventhandler

import (
"bytes"
"fmt"
"time"

"github.com/tidwall/gjson"
)

var (
metaHeader = []byte(`{"metadata":`)
rumMetaHeader = []byte(`{"m":`)
)

// APMEventCollector extracts relevant metadata and event from
// single line scans.
type APMEventCollector struct{}

// Filter skips processing RUM related events.
func (a *APMEventCollector) Filter(line []byte) error {
if bytes.HasPrefix(line, rumMetaHeader) {
return fmt.Errorf("rum data support not implemented")
}

return nil
}

// IsMeta identifies metadata lines from APM protocol.
func (a *APMEventCollector) IsMeta(line []byte) bool {
return bytes.HasPrefix(line, metaHeader)
}

// Process processes single lines extracting APM events.
// It uniforms events timestamp.
func (a *APMEventCollector) Process(linecopy []byte) event {
event := event{payload: linecopy}
result := gjson.ParseBytes(linecopy)
result.ForEach(func(key, value gjson.Result) bool {
event.objectType = key.Str // lines look like {"span":{...}}
timestampResult := value.Get("timestamp")
if timestampResult.Exists() {
switch timestampResult.Type {
case gjson.Number:
us := timestampResult.Int()
if us >= 0 {
s := us / 1000000
ns := (us - (s * 1000000)) * 1000
event.timestamp = time.Unix(s, ns)
}
case gjson.String:
tstr := timestampResult.Str
for _, f := range supportedTSFormats {
if t, err := time.Parse(f, tstr); err == nil {
event.timestamp = t
break
}
}
}
}
return false
})

return event
}
140 changes: 140 additions & 0 deletions internal/loadgen/eventhandler/apm-writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventhandler

import (
"fmt"
"time"

"github.com/tidwall/gjson"
)

// writeAPMEvents writes to buffers in pooledWriter JSON formatted events that can be replayed.
// Implements EventWriter interface.
func writeAPMEvents(config Config, minTimestamp time.Time, w *pooledWriter, b batch, baseTimestamp time.Time, randomBits uint64) error {
rewriteAny := config.RewriteTimestamps ||
config.RewriteIDs ||
config.RewriteServiceNames ||
config.RewriteServiceNodeNames ||
config.RewriteServiceTargetNames ||
config.RewriteSpanNames ||
config.RewriteTransactionNames ||
config.RewriteTransactionTypes

var err error
metadata := b.metadata
if config.RewriteServiceNames {
metadata, err = randomizeASCIIField(metadata, "metadata.service.name", randomBits, &w.idBuf)
if err != nil {
return fmt.Errorf("failed to rewrite `service.name`: %w", err)
}
}
if config.RewriteServiceNodeNames {
// The intakev2 field name is `service.node.configured_name`,
// this is translated to `service.node.name` in the ES documents.
metadata, err = randomizeASCIIField(metadata, "metadata.service.node.configured_name", randomBits, &w.idBuf)
if err != nil {
return fmt.Errorf("failed to rewrite `service.node.name`: %w", err)
}
}
w.Write(metadata)
w.Write(newlineBytes)

for _, event := range b.events {
if !rewriteAny {
w.Write(event.payload)
w.Write(newlineBytes)
continue
}
w.rewriteBuf.RawByte('{')
w.rewriteBuf.String(event.objectType)
w.rewriteBuf.RawString(":")
rewriteJSONObject(w, gjson.GetBytes(event.payload, event.objectType), func(key, value gjson.Result) bool {
switch key.Str {
case "timestamp":
if config.RewriteTimestamps && !event.timestamp.IsZero() {
// We always encode rewritten timestamps as strings,
// so we don't lose any precision when offsetting by
// either the base timestamp, or the minimum timestamp
// across all the batches; string-formatted timestamps
// may have nanosecond precision.
offset := event.timestamp.Sub(minTimestamp)
timestamp := baseTimestamp.Add(offset)
w.rewriteBuf.RawByte('"')
w.rewriteBuf.Time(timestamp, time.RFC3339Nano)
w.rewriteBuf.RawByte('"')
} else {
w.rewriteBuf.RawString(value.Raw)
}
case "id", "parent_id", "trace_id", "transaction_id":
if config.RewriteIDs && randomizeTraceID(&w.idBuf, value.Str, randomBits) {
w.rewriteBuf.RawByte('"')
w.rewriteBuf.RawBytes(w.idBuf.Bytes())
w.rewriteBuf.RawByte('"')
w.idBuf.Reset()
} else {
w.rewriteBuf.RawString(value.Raw)
}
case "name":
randomizeASCII(&w.idBuf, value.Str, randomBits)
switch {
case config.RewriteSpanNames && event.objectType == "span":
w.rewriteBuf.String(w.idBuf.String())
case config.RewriteTransactionNames && event.objectType == "transaction":
w.rewriteBuf.String(w.idBuf.String())
default:
w.rewriteBuf.RawString(value.Raw)
}
w.idBuf.Reset()
case "type":
switch {
case config.RewriteTransactionTypes && event.objectType == "transaction":
randomizeASCII(&w.idBuf, value.Str, randomBits)
w.rewriteBuf.String(w.idBuf.String())
w.idBuf.Reset()
default:
w.rewriteBuf.RawString(value.Raw)
}
case "context":
if !config.RewriteServiceTargetNames {
w.rewriteBuf.RawString(value.Raw)
break
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "service" {
w.rewriteBuf.RawString(value.Raw)
return true
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "target" {
w.rewriteBuf.RawString(value.Raw)
return true
}
rewriteJSONObject(w, value, func(key, value gjson.Result) bool {
if key.Str != "name" {
w.rewriteBuf.RawString(value.Raw)
return true
}
randomizeASCII(&w.idBuf, value.Str, randomBits)
w.rewriteBuf.String(w.idBuf.String())
w.idBuf.Reset()
return true
})
return true
})
return true
})
default:
w.rewriteBuf.RawString(value.Raw)
}
return true
})
w.rewriteBuf.RawString("}")
w.Write(w.rewriteBuf.Bytes())
w.Write(newlineBytes)
w.rewriteBuf.Reset()
}
return nil
}
Loading

0 comments on commit ef2ed71

Please sign in to comment.