Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: post-refactor clean-up (elastic#36520)
Browse files Browse the repository at this point in the history
* fix valid lint complaints
* adjust labels to new semantics
* avoid context allocations during splits
* standardise context parameter naming
* avoid fruitless map allocation
* document handlers
* scattered cleanup
* logs last
* single line signatures
* reduce name stutter
  • Loading branch information
efd6 authored and Scholar-Li committed Feb 5, 2024
1 parent 08acf97 commit ecb5430
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 180 deletions.
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

type cursor struct {
log *logp.Logger

cfg cursorConfig

state mapstr.M

log *logp.Logger
}

func newCursor(cfg cursorConfig, log *logp.Logger) *cursor {
Expand Down
45 changes: 17 additions & 28 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,68 +108,57 @@ func test(url *url.URL) error {
return nil
}

func runWithMetrics(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
) error {
func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor) error {
reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil)
defer unreg()
return run(ctx, config, publisher, cursor, reg)
return run(ctx, cfg, pub, crsr, reg)
}

func run(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
reg *monitoring.Registry,
) error {
log := ctx.Logger.With("input_url", config.Request.URL)
func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error {
log := ctx.Logger.With("input_url", cfg.Request.URL)

stdCtx := ctxtool.FromCanceller(ctx.Cancelation)

if config.Request.Tracer != nil {
if cfg.Request.Tracer != nil {
id := sanitizeFileName(ctx.ID)
config.Request.Tracer.Filename = strings.ReplaceAll(config.Request.Tracer.Filename, "*", id)
cfg.Request.Tracer.Filename = strings.ReplaceAll(cfg.Request.Tracer.Filename, "*", id)
}

metrics := newInputMetrics(reg)

httpClient, err := newHTTPClient(stdCtx, config, log, reg)
client, err := newHTTPClient(stdCtx, cfg, log, reg)
if err != nil {
return err
}

requestFactory, err := newRequestFactory(stdCtx, config, log, metrics, reg)
requestFactory, err := newRequestFactory(stdCtx, cfg, log, metrics, reg)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
}
var xmlDetails map[string]xml.Detail
if config.Response.XSD != "" {
xmlDetails, err = xml.Details([]byte(config.Response.XSD))
if cfg.Response.XSD != "" {
xmlDetails, err = xml.Details([]byte(cfg.Response.XSD))
if err != nil {
log.Errorf("error while collecting xml decoder type hints: %v", err)
return err
}
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, metrics, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)
pagination := newPagination(cfg, client, log)
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
requester := newRequester(client, requestFactory, responseProcessor, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)
trCtx.cursor.load(cursor)
trCtx.cursor = newCursor(cfg.Cursor, log)
trCtx.cursor.load(crsr)

doFunc := func() error {
log.Info("Process another repeated request.")

startTime := time.Now()

var err error
if err = requester.doRequest(stdCtx, trCtx, publisher); err != nil {
if err = requester.doRequest(stdCtx, trCtx, pub); err != nil {
log.Errorf("Error while processing http request: %v", err)
}

Expand All @@ -185,7 +174,7 @@ func run(
// we trigger the first call immediately,
// then we schedule it on the given interval using timed.Periodic
if err = doFunc(); err == nil {
err = timed.Periodic(stdCtx, config.Interval, doFunc)
err = timed.Periodic(stdCtx, cfg.Interval, doFunc)
}

log.Infof("Input stopped because context was cancelled with: %v", err)
Expand Down
9 changes: 2 additions & 7 deletions x-pack/filebeat/input/httpjson/input_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ func (in *cursorInput) Test(src inputcursor.Source, _ v2.TestContext) error {

// Run starts the input and blocks until it ends the execution.
// It will return on context cancellation, any other error will be retried.
func (in *cursorInput) Run(
ctx v2.Context,
src inputcursor.Source,
cursor inputcursor.Cursor,
publisher inputcursor.Publisher,
) error {
func (in *cursorInput) Run(ctx v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
s := src.(*source)
return runWithMetrics(ctx, s.config, publisher, &cursor)
return runWithMetrics(ctx, s.config, pub, &crsr)
}
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
const paginationNamespace = "pagination"

type pagination struct {
log *logp.Logger
httpClient *httpClient
client *httpClient
requestFactory *requestFactory
decoder decoderFunc
log *logp.Logger
}

func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pagination {
pagination := &pagination{httpClient: httpClient, log: log}
func newPagination(config config, client *httpClient, log *logp.Logger) *pagination {
pagination := &pagination{client: client, log: log}
if config.Response == nil {
return pagination
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
}

//nolint:bodyclose // response body is closed through drainBody method
resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq)
resp, err := iter.pagination.client.do(iter.stdCtx, httpReq)
if err != nil {
return nil, false, err
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

type rateLimiter struct {
log *logp.Logger

limit *valueTpl
reset *valueTpl
remaining *valueTpl
earlyLimit *float64

log *logp.Logger
}

func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter {
Expand Down
Loading

0 comments on commit ecb5430

Please sign in to comment.