Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: scattered cleanup
Browse files Browse the repository at this point in the history
* logs last
* single line signatures
  • Loading branch information
efd6 committed Sep 6, 2023
1 parent d178014 commit 01a4424
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 56 deletions.
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
)

type cursor struct {
log *logp.Logger

cfg cursorConfig

state mapstr.M

log *logp.Logger
}

func newCursor(cfg cursorConfig, log *logp.Logger) *cursor {
Expand Down
43 changes: 16 additions & 27 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,68 +108,57 @@ func test(url *url.URL) error {
return nil
}

func runWithMetrics(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
) error {
func runWithMetrics(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor) error {
reg, unreg := inputmon.NewInputRegistry("httpjson", ctx.ID, nil)
defer unreg()
return run(ctx, config, publisher, cursor, reg)
return run(ctx, cfg, pub, crsr, reg)
}

func run(
ctx v2.Context,
config config,
publisher inputcursor.Publisher,
cursor *inputcursor.Cursor,
reg *monitoring.Registry,
) error {
log := ctx.Logger.With("input_url", config.Request.URL)
func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcursor.Cursor, reg *monitoring.Registry) error {
log := ctx.Logger.With("input_url", cfg.Request.URL)

stdCtx := ctxtool.FromCanceller(ctx.Cancelation)

if config.Request.Tracer != nil {
if cfg.Request.Tracer != nil {
id := sanitizeFileName(ctx.ID)
config.Request.Tracer.Filename = strings.ReplaceAll(config.Request.Tracer.Filename, "*", id)
cfg.Request.Tracer.Filename = strings.ReplaceAll(cfg.Request.Tracer.Filename, "*", id)
}

metrics := newInputMetrics(reg)

httpClient, err := newHTTPClient(stdCtx, config, log, reg)
httpClient, err := newHTTPClient(stdCtx, cfg, log, reg)
if err != nil {
return err
}

requestFactory, err := newRequestFactory(stdCtx, config, log, metrics, reg)
requestFactory, err := newRequestFactory(stdCtx, cfg, log, metrics, reg)
if err != nil {
log.Errorf("Error while creating requestFactory: %v", err)
return err
}
var xmlDetails map[string]xml.Detail
if config.Response.XSD != "" {
xmlDetails, err = xml.Details([]byte(config.Response.XSD))
if cfg.Response.XSD != "" {
xmlDetails, err = xml.Details([]byte(cfg.Response.XSD))
if err != nil {
log.Errorf("error while collecting xml decoder type hints: %v", err)
return err
}
}
pagination := newPagination(config, httpClient, log)
responseProcessor := newResponseProcessor(config, pagination, xmlDetails, metrics, log)
pagination := newPagination(cfg, httpClient, log)
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
requester := newRequester(httpClient, requestFactory, responseProcessor, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)
trCtx.cursor.load(cursor)
trCtx.cursor = newCursor(cfg.Cursor, log)
trCtx.cursor.load(crsr)

doFunc := func() error {
log.Info("Process another repeated request.")

startTime := time.Now()

var err error
if err = requester.doRequest(stdCtx, trCtx, publisher); err != nil {
if err = requester.doRequest(stdCtx, trCtx, pub); err != nil {
log.Errorf("Error while processing http request: %v", err)
}

Expand All @@ -185,7 +174,7 @@ func run(
// we trigger the first call immediately,
// then we schedule it on the given interval using timed.Periodic
if err = doFunc(); err == nil {
err = timed.Periodic(stdCtx, config.Interval, doFunc)
err = timed.Periodic(stdCtx, cfg.Interval, doFunc)
}

log.Infof("Input stopped because context was cancelled with: %v", err)
Expand Down
9 changes: 2 additions & 7 deletions x-pack/filebeat/input/httpjson/input_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ func (in *cursorInput) Test(src inputcursor.Source, _ v2.TestContext) error {

// Run starts the input and blocks until it ends the execution.
// It will return on context cancellation, any other error will be retried.
func (in *cursorInput) Run(
ctx v2.Context,
src inputcursor.Source,
cursor inputcursor.Cursor,
publisher inputcursor.Publisher,
) error {
func (in *cursorInput) Run(ctx v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
s := src.(*source)
return runWithMetrics(ctx, s.config, publisher, &cursor)
return runWithMetrics(ctx, s.config, pub, &crsr)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
const paginationNamespace = "pagination"

type pagination struct {
log *logp.Logger
httpClient *httpClient
requestFactory *requestFactory
decoder decoderFunc
log *logp.Logger
}

func newPagination(config config, httpClient *httpClient, log *logp.Logger) *pagination {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

type rateLimiter struct {
log *logp.Logger

limit *valueTpl
reset *valueTpl
remaining *valueTpl
earlyLimit *float64

log *logp.Logger
}

func newRateLimiterFromConfig(config *rateLimitConfig, log *logp.Logger) *rateLimiter {
Expand Down
19 changes: 7 additions & 12 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,21 +258,21 @@ func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response,
}

type requestFactory struct {
chainHTTPClient *httpClient
url url.URL
method string
body *mapstr.M
transforms []basicTransform
user string
password string
log *logp.Logger
encoder encoderFunc
replace string
replaceWith string
isChain bool
until *valueTpl
chainHTTPClient *httpClient
chainResponseProcessor *responseProcessor
saveFirstResponse bool
log *logp.Logger
}

func newRequestFactory(ctx context.Context, config config, log *logp.Logger, metrics *inputMetrics, reg *monitoring.Registry) ([]*requestFactory, error) {
Expand Down Expand Up @@ -471,23 +471,18 @@ func (rf *requestFactory) newRequest(ctx *transformContext) (transformable, erro
}

type requester struct {
log *logp.Logger
client *httpClient
requestFactories []*requestFactory
responseProcessors []*responseProcessor
log *logp.Logger
}

func newRequester(
client *httpClient,
requestFactory []*requestFactory,
responseProcessor []*responseProcessor,
log *logp.Logger,
) *requester {
func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester {
return &requester{
log: log,
client: client,
requestFactories: requestFactory,
responseProcessors: responseProcessor,
requestFactories: reqs,
responseProcessors: resps,
log: log,
}
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ func (resp *response) templateValues() mapstr.M {

type responseProcessor struct {
metrics *inputMetrics
log *logp.Logger
transforms []basicTransform
split *split
pagination *pagination
xmlDetails map[string]xml.Detail
log *logp.Logger
}

func newResponseProcessor(config config, pagination *pagination, xmlDetails map[string]xml.Detail, metrics *inputMetrics, log *logp.Logger) []*responseProcessor {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var (
// by applying elements of the chain's linked list to an input until completed
// or an error state is encountered.
type split struct {
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
Expand All @@ -37,6 +36,7 @@ type split struct {
keyField string
isRoot bool
delimiter string
log *logp.Logger
}

// newSplitResponse returns a new split based on the provided config and
Expand Down Expand Up @@ -81,7 +81,6 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
}

return &split{
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
Expand All @@ -90,6 +89,7 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
delimiter: c.DelimiterString,
transforms: ts,
child: s,
log: log,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/httpjson/transform_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ type appendConfig struct {
}

type appendt struct {
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool
valueType valueType

runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error

log *logp.Logger
}

func (appendt) transformName() string { return appendName }
Expand Down
3 changes: 2 additions & 1 deletion x-pack/filebeat/input/httpjson/transform_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ type setConfig struct {
}

type set struct {
log *logp.Logger
targetInfo targetInfo
value *valueTpl
defaultValue *valueTpl
failOnTemplateError bool
valueType valueType

runFunc func(ctx *transformContext, transformable transformable, key string, val interface{}) error

log *logp.Logger
}

func (set) transformName() string { return setName }
Expand Down

0 comments on commit 01a4424

Please sign in to comment.