From 01a4424627aa0785049a10a54e6d5ffb94001e33 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 6 Sep 2023 13:25:35 +0930 Subject: [PATCH] x-pack/filebeat/input/httpjson: scattered cleanup * logs last * single line signatures --- x-pack/filebeat/input/httpjson/cursor.go | 4 +- x-pack/filebeat/input/httpjson/input.go | 43 +++++++------------ .../filebeat/input/httpjson/input_cursor.go | 9 +--- x-pack/filebeat/input/httpjson/pagination.go | 2 +- .../filebeat/input/httpjson/rate_limiter.go | 4 +- x-pack/filebeat/input/httpjson/request.go | 19 +++----- x-pack/filebeat/input/httpjson/response.go | 2 +- x-pack/filebeat/input/httpjson/split.go | 4 +- .../input/httpjson/transform_append.go | 3 +- .../filebeat/input/httpjson/transform_set.go | 3 +- 10 files changed, 37 insertions(+), 56 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/cursor.go b/x-pack/filebeat/input/httpjson/cursor.go index a431eb3f486a..92cd53a52a29 100644 --- a/x-pack/filebeat/input/httpjson/cursor.go +++ b/x-pack/filebeat/input/httpjson/cursor.go @@ -11,11 +11,11 @@ import ( ) type cursor struct { - log *logp.Logger - cfg cursorConfig state mapstr.M + + log *logp.Logger } func newCursor(cfg cursorConfig, log *logp.Logger) *cursor { diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index eea7bd39e281..b2b8d96c6562 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -108,60 +108,49 @@ func test(url *url.URL) error { return nil } -func runWithMetrics( - ctx v2.Context, - config config, - publisher inputcursor.Publisher, - cursor *inputcursor.Cursor, -) error { +func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor) error { reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil) defer unreg() - return run(ctx, config, publisher, cursor, reg) + return run(ctx, cfg, pub, crsr, reg) } -func run( - ctx v2.Context, - config config, - publisher inputcursor.Publisher, - cursor *inputcursor.Cursor, - reg *monitoring.Registry, -) error { - log := ctx.Logger.With("input_url", config.Request.URL) +func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error { + log := ctx.Logger.With("input_url", cfg.Request.URL) stdCtx := ctxtool.FromCanceller(ctx.Cancelation) - if config.Request.Tracer != nil { + if cfg.Request.Tracer != nil { id := sanitizeFileName(ctx.ID) - config.Request.Tracer.Filename = strings.ReplaceAll(config.Request.Tracer.Filename, "*", id) + cfg.Request.Tracer.Filename = strings.ReplaceAll(cfg.Request.Tracer.Filename, "*", id) } metrics := newInputMetrics(reg) - httpClient, err := newHTTPClient(stdCtx, config, log, reg) + httpClient, err := newHTTPClient(stdCtx, cfg, log, reg) if err != nil { return err } - requestFactory, err := newRequestFactory(stdCtx, config, log, metrics, reg) + requestFactory, err := newRequestFactory(stdCtx, cfg, log, metrics, reg) if err != nil { log.Errorf("Error while creating requestFactory: %v", err) return err } var xmlDetails map[string]xml.Detail - if config.Response.XSD != "" { - xmlDetails, err = xml.Details([]byte(config.Response.XSD)) + if cfg.Response.XSD != "" { + xmlDetails, err = xml.Details([]byte(cfg.Response.XSD)) if err != nil { log.Errorf("error while collecting xml decoder type hints: %v", err) return err } } - pagination := newPagination(config, httpClient, log) - responseProcessor := newResponseProcessor(config, pagination, xmlDetails, metrics, log) + pagination := newPagination(cfg, httpClient, log) + responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log) requester := newRequester(httpClient, requestFactory, responseProcessor, log) trCtx := emptyTransformContext() - trCtx.cursor = newCursor(config.Cursor, log) - trCtx.cursor.load(cursor) + trCtx.cursor = newCursor(cfg.Cursor, log) + trCtx.cursor.load(crsr) doFunc := func() error { log.Info("Process another repeated request.") @@ -169,7 +158,7 @@ func run( startTime := time.Now() var err error - if err = requester.doRequest(stdCtx, trCtx, publisher); err != nil { + if err = requester.doRequest(stdCtx, trCtx, pub); err != nil { log.Errorf("Error while processing http request: %v", err) } @@ -185,7 +174,7 @@ func run( // we trigger the first call immediately, // then we schedule it on the given interval using timed.Periodic if err = doFunc(); err == nil { - err = timed.Periodic(stdCtx, config.Interval, doFunc) + err = timed.Periodic(stdCtx, cfg.Interval, doFunc) } log.Infof("Input stopped because context was cancelled with: %v", err) diff --git a/x-pack/filebeat/input/httpjson/input_cursor.go b/x-pack/filebeat/input/httpjson/input_cursor.go index 99923bb98f43..139910bd905a 100644 --- a/x-pack/filebeat/input/httpjson/input_cursor.go +++ b/x-pack/filebeat/input/httpjson/input_cursor.go @@ -45,12 +45,7 @@ func (in *cursorInput) Test(src inputcursor.Source, _ v2.TestContext) error { // Run starts the input and blocks until it ends the execution. // It will return on context cancellation, any other error will be retried. -func (in *cursorInput) Run( - ctx v2.Context, - src inputcursor.Source, - cursor inputcursor.Cursor, - publisher inputcursor.Publisher, -) error { +func (in *cursorInput) Run(ctx v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error { s := src.(*source) - return runWithMetrics(ctx, s.config, publisher, &cursor) + return runWithMetrics(ctx, s.config, pub, &crsr) } diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 1b8cfc6d240f..459a15287519 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -20,10 +20,10 @@ import ( const paginationNamespace = "pagination" type pagination struct { - log *logp.Logger httpClient *httpClient requestFactory *requestFactory decoder decoderFunc + log *logp.Logger } func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pagination { diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index c1af39f77588..dcbed7c1ac91 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -16,12 +16,12 @@ import ( ) type rateLimiter struct { - log *logp.Logger - limit *valueTpl reset *valueTpl remaining *valueTpl earlyLimit *float64 + + log *logp.Logger } func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter { diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 2ac6023d061a..cc2b39127f9e 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -258,21 +258,21 @@ func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, } type requestFactory struct { + chainHTTPClient *httpClient url url.URL method string body *mapstr.M transforms []basicTransform user string password string - log *logp.Logger encoder encoderFunc replace string replaceWith string isChain bool until *valueTpl - chainHTTPClient *httpClient chainResponseProcessor *responseProcessor saveFirstResponse bool + log *logp.Logger } func newRequestFactory(ctx context.Context, config config, log *logp.Logger, metrics *inputMetrics, reg *monitoring.Registry) ([]*requestFactory, error) { @@ -471,23 +471,18 @@ func (rf *requestFactory) newRequest(ctx *transformContext) (transformable, erro } type requester struct { - log *logp.Logger client *httpClient requestFactories []*requestFactory responseProcessors []*responseProcessor + log *logp.Logger } -func newRequester( - client *httpClient, - requestFactory []*requestFactory, - responseProcessor []*responseProcessor, - log *logp.Logger, -) *requester { +func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester { return &requester{ - log: log, client: client, - requestFactories: requestFactory, - responseProcessors: responseProcessor, + requestFactories: reqs, + responseProcessors: resps, + log: log, } } diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 045af1faf15f..63ba34d9d9ab 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -94,11 +94,11 @@ func (resp *response) templateValues() mapstr.M { type responseProcessor struct { metrics *inputMetrics - log *logp.Logger transforms []basicTransform split *split pagination *pagination xmlDetails map[string]xml.Detail + log *logp.Logger } func newResponseProcessor(config config, pagination *pagination, xmlDetails map[string]xml.Detail, metrics *inputMetrics, log *logp.Logger) []*responseProcessor { diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go index f6b7be47d56f..feec75f77594 100644 --- a/x-pack/filebeat/input/httpjson/split.go +++ b/x-pack/filebeat/input/httpjson/split.go @@ -27,7 +27,6 @@ var ( // by applying elements of the chain's linked list to an input until completed // or an error state is encountered. type split struct { - log *logp.Logger targetInfo targetInfo kind string transforms []basicTransform @@ -37,6 +36,7 @@ type split struct { keyField string isRoot bool delimiter string + log *logp.Logger } // newSplitResponse returns a new split based on the provided config and @@ -81,7 +81,6 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { } return &split{ - log: log, targetInfo: ti, kind: c.Type, keepParent: c.KeepParent, @@ -90,6 +89,7 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { delimiter: c.DelimiterString, transforms: ts, child: s, + log: log, }, nil } diff --git a/x-pack/filebeat/input/httpjson/transform_append.go b/x-pack/filebeat/input/httpjson/transform_append.go index f402e8bb2e36..e7068974f784 100644 --- a/x-pack/filebeat/input/httpjson/transform_append.go +++ b/x-pack/filebeat/input/httpjson/transform_append.go @@ -23,7 +23,6 @@ type appendConfig struct { } type appendt struct { - log *logp.Logger targetInfo targetInfo value *valueTpl defaultValue *valueTpl @@ -31,6 +30,8 @@ type appendt struct { valueType valueType runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error + + log *logp.Logger } func (appendt) transformName() string { return appendName } diff --git a/x-pack/filebeat/input/httpjson/transform_set.go b/x-pack/filebeat/input/httpjson/transform_set.go index 185ec30b0a4f..06ba590f4792 100644 --- a/x-pack/filebeat/input/httpjson/transform_set.go +++ b/x-pack/filebeat/input/httpjson/transform_set.go @@ -27,7 +27,6 @@ type setConfig struct { } type set struct { - log *logp.Logger targetInfo targetInfo value *valueTpl defaultValue *valueTpl @@ -35,6 +34,8 @@ type set struct { valueType valueType runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error + + log *logp.Logger } func (set) transformName() string { return setName }