Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: avoid context allocations during splits
Browse files Browse the repository at this point in the history
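Also standardise context naming to approximate Go convention:
context.Context parameters are now named ctx (previously stdCtx), and
*transformContext struct fields are now named trCtx (previously ctx).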
  • Loading branch information
efd6 committed Sep 6, 2023
1 parent 6d93d9f commit 13c58d3
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 77 deletions.
88 changes: 44 additions & 44 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type httpClient struct {
limiter *rateLimiter
}

func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error {
var (
n int
ids []string
Expand All @@ -55,7 +55,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
// iterate over collected ids from last response
if i == 0 {
// perform and store regular call responses
httpResp, err = rf.collectResponse(stdCtx, trCtx, r)
httpResp, err = rf.collectResponse(ctx, trCtx, r)
if err != nil {
return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
if len(ids) == 0 {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
}
}
// collect data from new urls
httpResp, err = rf.collectResponse(stdCtx, chainTrCtx, r)
httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
if err != nil {
return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
Expand Down Expand Up @@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
} else {
r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
r.responseProcessors[i].startProcessing(ctx, trCtx, resps, true, p)
}
n += p.eventCount()
}
Expand All @@ -202,30 +202,30 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
defer httpResp.Body.Close()
// if pagination exists for the parent request along with chaining, then for each page response the chain is processed
if isChainWithPageExpected {
n += r.processRemainingChainEvents(stdCtx, trCtx, publisher, initialResponse, chainIndex)
n += r.processRemainingChainEvents(ctx, trCtx, publisher, initialResponse, chainIndex)
}
r.log.Infof("request finished: %d events published", n)

return nil
}

// collectResponse returns response from provided request
func (rf *requestFactory) collectResponse(stdCtx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) {
func (rf *requestFactory) collectResponse(ctx context.Context, trCtx *transformContext, r *requester) (*http.Response, error) {
var err error
var httpResp *http.Response

req, err := rf.newHTTPRequest(stdCtx, trCtx)
req, err := rf.newHTTPRequest(ctx, trCtx)
if err != nil {
return nil, fmt.Errorf("failed to create http request: %w", err)
}

if rf.isChain && rf.chainHTTPClient != nil {
httpResp, err = rf.chainHTTPClient.do(stdCtx, req)
httpResp, err = rf.chainHTTPClient.do(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to execute chain http client.Do: %w", err)
}
} else {
httpResp, err = r.client.do(stdCtx, req)
httpResp, err = r.client.do(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to execute http client.Do: %w", err)
}
Expand All @@ -234,8 +234,8 @@ func (rf *requestFactory) collectResponse(stdCtx context.Context, trCtx *transfo
return httpResp, nil
}

func (c *httpClient) do(stdCtx context.Context, req *http.Request) (*http.Response, error) {
resp, err := c.limiter.execute(stdCtx, func() (*http.Response, error) {
func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, error) {
resp, err := c.limiter.execute(ctx, func() (*http.Response, error) {
return c.client.Do(req)
})
if err != nil {
Expand Down Expand Up @@ -401,7 +401,7 @@ func tryAssignAuth(parentConfig *authConfig, childConfig *authConfig) *authConfi
return childConfig
}

func (rf *requestFactory) newHTTPRequest(stdCtx context.Context, trCtx *transformContext) (*http.Request, error) {
func (rf *requestFactory) newHTTPRequest(ctx context.Context, trCtx *transformContext) (*http.Request, error) {
trReq, err := rf.newRequest(trCtx)
if err != nil {
return nil, err
Expand All @@ -425,7 +425,7 @@ func (rf *requestFactory) newHTTPRequest(stdCtx context.Context, trCtx *transfor
return nil, err
}

req = req.WithContext(stdCtx)
req = req.WithContext(ctx)

req.Header = trReq.header().Clone()

Expand Down Expand Up @@ -549,20 +549,20 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
}

type chainProcessor struct {
req *requester
ctx *transformContext
pub inputcursor.Publisher
idx int
tail bool
n int
req *requester
trCtx *transformContext
pub inputcursor.Publisher
idx int
tail bool
n int
}

func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
return &chainProcessor{
req: req,
ctx: trCtx,
pub: pub,
idx: idx,
req: req,
trCtx: trCtx,
pub: pub,
idx: idx,
}
}

Expand All @@ -585,11 +585,11 @@ func (p *chainProcessor) handleEvent(ctx context.Context, msg mapstr.M) {
response.Body = io.NopCloser(body)

// updates the cursor for pagination last_event & last_response when chaining is present
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()
p.trCtx.updateLastEvent(msg)
p.trCtx.updateCursor()

// for each pagination response, we repeat all the chain steps / blocks
n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
n, err := p.req.processChainPaginationEvents(ctx, p.trCtx, p.pub, &response, p.idx, p.req.log)
if err != nil {
p.req.log.Errorf("error processing chain event: %w", err)
return
Expand All @@ -613,7 +613,7 @@ func (p *chainProcessor) eventCount() int {
// processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
//
//nolint:bodyclose // response body is closed through drainBody method
func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) {
func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) {
var (
n int
ids []string
Expand Down Expand Up @@ -674,7 +674,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}

// collect data from new urls
httpResp, err = rf.collectResponse(stdCtx, chainTrCtx, r)
httpResp, err = rf.collectResponse(ctx, chainTrCtx, r)
if err != nil {
return -1, fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
Expand All @@ -700,7 +700,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
resps = intermediateResps
}
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
n += p.eventCount()
}

Expand All @@ -723,20 +723,20 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
}

type publisher struct {
ctx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
}

func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
return &publisher{
ctx: trCtx,
pub: pub,
log: log,
trCtx: trCtx,
pub: pub,
log: log,
}
}

Expand All @@ -748,16 +748,16 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
return
}

if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
if err := p.pub.Publish(event, p.trCtx.cursorMap()); err != nil {
p.log.Errorf("error publishing event: %v", err)
return
}
}
if len(*p.ctx.firstEventClone()) == 0 {
p.ctx.updateFirstEvent(msg)
if len(*p.trCtx.firstEventClone()) == 0 {
p.trCtx.updateFirstEvent(msg)
}
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()
p.trCtx.updateLastEvent(msg)
p.trCtx.updateCursor()

p.n++
}
Expand Down
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ type handler interface {
handleError(error)
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, h handler) {
func (rp *responseProcessor) startProcessing(ctx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, h handler) {
trCtx.clearIntervalData()

var npages int64
for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
iter := rp.pagination.newPageIterator(ctx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
Expand Down Expand Up @@ -229,12 +229,12 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
}

if rp.split == nil {
h.handleEvent(stdCtx, tr.body())
h.handleEvent(ctx, tr.body())
rp.log.Debug("no split found: continuing")
continue
}

if err := rp.split.run(trCtx, tr, h); err != nil {
if err := rp.split.run(ctx, trCtx, tr, h); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
Expand Down
Loading

0 comments on commit 13c58d3

Please sign in to comment.