diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 7aa040dfbde..ee1113e8c37 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -33,7 +33,7 @@ type httpClient struct {
     limiter *rateLimiter
 }
 
-func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
+func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
     var (
         n   int
         ids []string
@@ -55,7 +55,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
         // iterate over collected ids from last response
         if i == 0 {
             // perform and store regular call responses
-            httpResp, err = rf.collectResponse(stdCtx, trCtx, r)
+            httpResp, err = rf.collectResponse(ctx, trCtx, r)
             if err != nil {
                 return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
             }
@@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
             if len(r.requestFactories) == 1 {
                 finalResps = append(finalResps, httpResp)
                 p := newPublisher(trCtx, publisher, true, r.log)
-                r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
+                r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
                 n = p.eventCount()
                 continue
             }
@@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
                 }
                 // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
                 p := newPublisher(trCtx, publisher, false, r.log)
-                r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
+                r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
                 n = p.eventCount()
             } else {
                 if len(ids) == 0 {
@@ -163,7 +163,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
                     }
                 }
                 // collect data from new urls
-                httpResp, err = rf.collectResponse(stdCtx, chainTrCtx, r)
+                httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
                 if err != nil {
                     return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
                 }
@@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 
                 p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
                 if rf.isChain {
-                    rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
+                    rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
                 } else {
-                    r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
+                    r.responseProcessors[i].startProcessing(ctx, trCtx, resps, true, p)
                 }
                 n += p.eventCount()
             }
@@ -202,7 +202,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
     defer httpResp.Body.Close()
     // if pagination exists for the parent request along with chaining, then for each page response the chain is processed
     if isChainWithPageExpected {
-        n += r.processRemainingChainEvents(stdCtx, trCtx, publisher, initialResponse, chainIndex)
+        n += r.processRemainingChainEvents(ctx, trCtx, publisher, initialResponse, chainIndex)
     }
     r.log.Infof("request finished: %d events published", n)
 
@@ -210,22 +210,22 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 }
 
 // collectResponse returns response from provided request
-func (rf *requestFactory) collectResponse(stdCtx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) {
+func (rf *requestFactory) collectResponse(ctx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) {
     var err error
     var httpResp *http.Response
 
-    req, err := rf.newHTTPRequest(stdCtx, trCtx)
+    req, err := rf.newHTTPRequest(ctx, trCtx)
     if err != nil {
         return nil, fmt.Errorf("failed to create http request: %w", err)
     }
 
     if rf.isChain && rf.chainHTTPClient != nil {
-        httpResp, err = rf.chainHTTPClient.do(stdCtx, req)
+        httpResp, err = rf.chainHTTPClient.do(ctx, req)
         if err != nil {
             return nil, fmt.Errorf("failed to execute chain http client.Do: %w", err)
         }
     } else {
-        httpResp, err = r.client.do(stdCtx, req)
+        httpResp, err = r.client.do(ctx, req)
         if err != nil {
             return nil, fmt.Errorf("failed to execute http client.Do: %w", err)
         }
@@ -234,8 +234,8 @@ func (rf *requestFactory) collectResponse(stdCtx context.Context, trCtx *transfo
     return httpResp, nil
 }
 
-func (c *httpClient) do(stdCtx context.Context, req *http.Request) (*http.Response, error) {
-    resp, err := c.limiter.execute(stdCtx, func() (*http.Response, error) {
+func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, error) {
+    resp, err := c.limiter.execute(ctx, func() (*http.Response, error) {
         return c.client.Do(req)
     })
     if err != nil {
@@ -401,7 +401,7 @@ func tryAssignAuth(parentConfig *authConfig, childConfig *authConfig) *authConfi
     return childConfig
 }
-func (rf *requestFactory) newHTTPRequest(stdCtx context.Context, trCtx *transformContext) (*http.Request, error) {
+func (rf *requestFactory) newHTTPRequest(ctx context.Context, trCtx *transformContext) (*http.Request, error) {
     trReq, err := rf.newRequest(trCtx)
     if err != nil {
         return nil, err
     }
@@ -425,7 +425,7 @@ func (rf *requestFactory) newHTTPRequest(stdCtx context.Context, trCtx *transfor
         return nil, err
     }
 
-    req = req.WithContext(stdCtx)
+    req = req.WithContext(ctx)
 
     req.Header = trReq.header().Clone()
 
@@ -549,20 +549,20 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
 }
 
 type chainProcessor struct {
-    req  *requester
-    ctx  *transformContext
-    pub  inputcursor.Publisher
-    idx  int
-    tail bool
-    n    int
+    req   *requester
+    trCtx *transformContext
+    pub   inputcursor.Publisher
+    idx   int
+    tail  bool
+    n     int
 }
 
 func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
     return &chainProcessor{
-        req: req,
-        ctx: trCtx,
-        pub: pub,
-        idx: idx,
+        req:   req,
+        trCtx: trCtx,
+        pub:   pub,
+        idx:   idx,
     }
 }
 
@@ -585,11 +585,11 @@ func (p *chainProcessor) handleEvent(ctx context.Context, msg mapstr.M) {
     response.Body = io.NopCloser(body)
     // updates the cursor for pagination last_event & last_response when chaining is present
-    p.ctx.updateLastEvent(msg)
-    p.ctx.updateCursor()
+    p.trCtx.updateLastEvent(msg)
+    p.trCtx.updateCursor()
 
     // for each pagination response, we repeat all the chain steps / blocks
-    n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
+    n, err := p.req.processChainPaginationEvents(ctx, p.trCtx, p.pub, &response, p.idx, p.req.log)
     if err != nil {
         p.req.log.Errorf("error processing chain event: %w", err)
         return
     }
@@ -613,7 +613,7 @@ func (p *chainProcessor) eventCount() int {
 // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
 //
 //nolint:bodyclose // response body is closed through drainBody method
-func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) {
+func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) {
     var (
         n   int
         ids []string
@@ -674,7 +674,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
         }
 
         // collect data from new urls
-        httpResp, err = rf.collectResponse(stdCtx, chainTrCtx, r)
+        httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
         if err != nil {
             return -1, fmt.Errorf("failed to execute rf.collectResponse: %w", err)
         }
@@ -700,7 +700,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
             resps = intermediateResps
         }
         p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-        rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
+        rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
         n += p.eventCount()
     }
 
@@ -723,10 +723,10 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
 }
 
 type publisher struct {
-    ctx *transformContext
-    pub inputcursor.Publisher
-    n   int
-    log *logp.Logger
+    trCtx *transformContext
+    pub   inputcursor.Publisher
+    n     int
+    log   *logp.Logger
 }
 
 func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
@@ -734,9 +734,9 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
         pub = nil
     }
     return &publisher{
-        ctx: trCtx,
-        pub: pub,
-        log: log,
+        trCtx: trCtx,
+        pub:   pub,
+        log:   log,
     }
 }
 
@@ -748,16 +748,16 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
             return
         }
 
-        if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
+        if err := p.pub.Publish(event, p.trCtx.cursorMap()); err != nil {
             p.log.Errorf("error publishing event: %v", err)
             return
         }
     }
 
-    if len(*p.ctx.firstEventClone()) == 0 {
-        p.ctx.updateFirstEvent(msg)
+    if len(*p.trCtx.firstEventClone()) == 0 {
+        p.trCtx.updateFirstEvent(msg)
     }
-    p.ctx.updateLastEvent(msg)
-    p.ctx.updateCursor()
+    p.trCtx.updateLastEvent(msg)
+    p.trCtx.updateCursor()
     p.n++
 }
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index acd0deb6f61..045af1faf15 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -186,12 +186,12 @@ type handler interface {
     handleError(error)
 }
 
-func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, h handler) {
+func (rp *responseProcessor) startProcessing(ctx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, h handler) {
     trCtx.clearIntervalData()
     var npages int64
 
     for i, httpResp := range resps {
-        iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
+        iter := rp.pagination.newPageIterator(ctx, trCtx, httpResp, rp.xmlDetails)
         for {
             pageStartTime := time.Now()
             page, hasNext, err := iter.next()
@@ -229,12 +229,12 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
             }
 
             if rp.split == nil {
-                h.handleEvent(stdCtx, tr.body())
+                h.handleEvent(ctx, tr.body())
                 rp.log.Debug("no split found: continuing")
                 continue
             }
 
-            if err := rp.split.run(trCtx, tr, h); err != nil {
+            if err := rp.split.run(ctx, trCtx, tr, h); err != nil {
                 switch err { //nolint:errorlint // run never returns a wrapped error.
                 case errEmptyField:
                     // nothing else to send for this page
diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go
index 853df5f7578..f6b7be47d56 100644
--- a/x-pack/filebeat/input/httpjson/split.go
+++ b/x-pack/filebeat/input/httpjson/split.go
@@ -96,14 +96,13 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
 
 // run runs the split operation on the contents of resp, processing successive
 // split results on via h. ctx is passed to transforms that are called during
 // the split.
-func (s *split) run(ctx *transformContext, resp transformable, h handler) error {
+func (s *split) run(ctx context.Context, trCtx *transformContext, resp transformable, h handler) error {
     root := resp.body()
-    return s.split(ctx, root, h)
+    return s.split(ctx, trCtx, root, h)
 }
 
 // split recursively executes the split processor chain.
-func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
-    todo := context.TODO()
+func (s *split) split(ctx context.Context, trCtx *transformContext, root mapstr.M, h handler) error {
     v, err := root.GetValue(s.targetInfo.Name)
     if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
@@ -113,21 +112,21 @@ func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
     if v == nil {
         if s.ignoreEmptyValue {
             if s.child != nil {
-                return s.child.split(ctx, root, h)
+                return s.child.split(ctx, trCtx, root, h)
             }
             if s.keepParent {
-                h.handleEvent(todo, root)
+                h.handleEvent(ctx, root)
             }
             return nil
         }
         if s.isRoot {
             if s.keepParent {
-                h.handleEvent(todo, root)
+                h.handleEvent(ctx, root)
                 return errEmptyField
             }
             return errEmptyRootField
         }
-        h.handleEvent(todo, root)
+        h.handleEvent(ctx, root)
         return errEmptyField
     }
 
@@ -141,23 +140,23 @@ func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
         if len(varr) == 0 {
             if s.ignoreEmptyValue {
                 if s.child != nil {
-                    return s.child.split(ctx, root, h)
+                    return s.child.split(ctx, trCtx, root, h)
                 }
                 if s.keepParent {
-                    h.handleEvent(todo, root)
+                    h.handleEvent(ctx, root)
                 }
                 return nil
             }
             if s.isRoot {
-                h.handleEvent(todo, root)
+                h.handleEvent(ctx, root)
                 return errEmptyRootField
             }
-            h.handleEvent(todo, root)
+            h.handleEvent(ctx, root)
             return errEmptyField
         }
 
         for _, e := range varr {
-            err := s.processMessage(ctx, root, s.targetInfo.Name, e, h)
+            err := s.processMessage(ctx, trCtx, root, s.targetInfo.Name, e, h)
             if err != nil {
                 s.log.Debug(err)
             }
@@ -173,22 +172,22 @@ func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
         if len(vmap) == 0 {
             if s.ignoreEmptyValue {
                 if s.child != nil {
-                    return s.child.split(ctx, root, h)
+                    return s.child.split(ctx, trCtx, root, h)
                 }
                 if s.keepParent {
-                    h.handleEvent(todo, root)
+                    h.handleEvent(ctx, root)
                 }
                 return nil
             }
             if s.isRoot {
                 return errEmptyRootField
             }
-            h.handleEvent(todo, root)
+            h.handleEvent(ctx, root)
             return errEmptyField
         }
 
         for k, e := range vmap {
-            if err := s.processMessage(ctx, root, k, e, h); err != nil {
+            if err := s.processMessage(ctx, trCtx, root, k, e, h); err != nil {
                 s.log.Debug(err)
             }
         }
@@ -203,18 +202,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
         if len(vstr) == 0 {
             if s.ignoreEmptyValue {
                 if s.child != nil {
-                    return s.child.split(ctx, root, h)
+                    return s.child.split(ctx, trCtx, root, h)
                 }
                 return nil
             }
             if s.isRoot {
                 return errEmptyRootField
             }
-            h.handleEvent(todo, root)
+            h.handleEvent(ctx, root)
             return errEmptyField
         }
         for _, substr := range strings.Split(vstr, s.delimiter) {
-            if err := s.processMessageSplitString(ctx, root, substr, h); err != nil {
+            if err := s.processMessageSplitString(ctx, trCtx, root, substr, h); err != nil {
                 s.log.Debug(err)
             }
         }
@@ -227,7 +226,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, h handler) error {
 
 // processMessage processes an array or map split result value, v, via h after performing
 // any necessary transformations. If key is "", the value is an element of an array.
-func (s *split) processMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, h handler) error {
+func (s *split) processMessage(ctx context.Context, trCtx *transformContext, root mapstr.M, key string, v interface{}, h handler) error {
     obj, ok := toMapStr(v, s.targetInfo.Name)
     if !ok {
         return errExpectedSplitObj
@@ -248,17 +247,17 @@ func (s *split) processMessage(ctx *transformContext, root mapstr.M, key string,
 
     var err error
     for _, t := range s.transforms {
-        tr, err = t.run(ctx, tr)
+        tr, err = t.run(trCtx, tr)
         if err != nil {
             return err
         }
     }
 
     if s.child != nil {
-        return s.child.split(ctx, clone, h)
+        return s.child.split(ctx, trCtx, clone, h)
     }
 
-    h.handleEvent(context.TODO(), clone)
+    h.handleEvent(ctx, clone)
 
     return nil
 }
@@ -280,7 +279,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) {
 
 // sendMessage processes a string split result value, v, via h after performing any
 // necessary transformations. If key is "", the value is an element of an array.
-func (s *split) processMessageSplitString(ctx *transformContext, root mapstr.M, v string, h handler) error {
+func (s *split) processMessageSplitString(ctx context.Context, trCtx *transformContext, root mapstr.M, v string, h handler) error {
     clone := root.Clone()
     _, _ = clone.Put(s.targetInfo.Name, v)
 
@@ -289,17 +288,17 @@ func (s *split) processMessageSplitString(ctx *transformContext, root mapstr.M,
 
     var err error
     for _, t := range s.transforms {
-        tr, err = t.run(ctx, tr)
+        tr, err = t.run(trCtx, tr)
         if err != nil {
             return err
         }
     }
 
     if s.child != nil {
-        return s.child.split(ctx, clone, h)
+        return s.child.split(ctx, trCtx, clone, h)
     }
 
-    h.handleEvent(context.TODO(), clone)
+    h.handleEvent(ctx, clone)
 
     return nil
 }
diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go
index 4ef95499d84..cd7d5a88a9e 100644
--- a/x-pack/filebeat/input/httpjson/split_test.go
+++ b/x-pack/filebeat/input/httpjson/split_test.go
@@ -707,7 +707,7 @@ func TestSplit(t *testing.T) {
             events := &stream{t: t}
             split, err := newSplitResponse(tc.config, logp.NewLogger(""))
             assert.NoError(t, err)
-            err = split.run(tc.ctx, tc.resp, events)
+            err = split.run(context.Background(), tc.ctx, tc.resp, events)
             if tc.expectedErr == nil {
                 assert.NoError(t, err)
             } else {
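For readers following the refactor, here is a minimal, self-contained sketch (not part of the patch) of the pattern these hunks apply: the caller's `context.Context` is threaded through the recursive split chain alongside the per-run transform state, so the handler finally receives a real, cancellable context instead of a fresh `context.TODO()`. The `splitter`, `printHandler`, and `event` names below are illustrative stand-ins, not the httpjson types.

```go
// Illustrative sketch only: thread the caller's context.Context through a
// recursive split/processing chain so the event handler can observe
// cancellation, instead of materializing context.TODO() at the leaves.
// splitter, printHandler, and event are assumed, simplified types.
package main

import (
    "context"
    "fmt"
    "time"
)

type event map[string]interface{}

// handler mirrors the shape of the httpjson handler interface: the first
// argument is the request-scoped context, not a placeholder.
type handler interface {
    handleEvent(ctx context.Context, e event)
}

type printHandler struct{}

func (printHandler) handleEvent(ctx context.Context, e event) {
    // Because ctx comes from the caller, the handler can honor cancellation.
    select {
    case <-ctx.Done():
        fmt.Println("dropped event, context cancelled:", ctx.Err())
    default:
        fmt.Println("published:", e)
    }
}

// splitter fans out recursively, carrying ctx (cancellation) and state
// (per-run cursor/transform state) side by side, as the diff does with
// ctx and trCtx.
type splitter struct {
    child *splitter
}

func (s *splitter) run(ctx context.Context, state map[string]string, root event, h handler) error {
    return s.split(ctx, state, root, h)
}

func (s *splitter) split(ctx context.Context, state map[string]string, root event, h handler) error {
    if s.child != nil {
        return s.child.split(ctx, state, root, h)
    }
    h.handleEvent(ctx, root) // previously the equivalent of h.handleEvent(context.TODO(), root)
    return nil
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    s := &splitter{child: &splitter{}}
    _ = s.run(ctx, map[string]string{"cursor": "0"}, event{"message": "hello"}, printHandler{})
}
```

The key point mirrors the new `split.run` and `split.split` signatures above: the stdlib context and the transform context travel as two separate parameters, and only the stdlib context is handed to `handleEvent`.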