From 6ffc494ba288436d6963e5415cd9fb91d30ec93a Mon Sep 17 00:00:00 2001 From: "dhruv.sonone" Date: Mon, 23 Dec 2024 12:33:13 +0530 Subject: [PATCH] ExitPoint Stage implementation --- endpoints/openrtb2/auction.go | 28 ++++++++++++++++++++++----- endpoints/openrtb2/test_utils.go | 5 +++++ hooks/empty_plan.go | 4 ++++ hooks/hookexecution/executor.go | 33 ++++++++++++++++++++++++++++++++ hooks/hookstage/exitpoint.go | 22 +++++++++++++++++++++ hooks/plan.go | 12 ++++++++++++ hooks/repo.go | 13 +++++++++++++ 7 files changed, 112 insertions(+), 5 deletions(-) create mode 100644 hooks/hookstage/exitpoint.go diff --git a/endpoints/openrtb2/auction.go b/endpoints/openrtb2/auction.go index e1c8326091b..d65ab590ec9 100644 --- a/endpoints/openrtb2/auction.go +++ b/endpoints/openrtb2/auction.go @@ -418,16 +418,34 @@ func sendAuctionResponse( enc := json.NewEncoder(w) enc.SetEscapeHTML(false) - w.Header().Set("Content-Type", "application/json") + rawResponse, err := json.Marshal(response) + if err != nil { - // If an error happens when encoding the response, there isn't much we can do. - // If we've sent _any_ bytes, then Go would have sent the 200 status code first. - // That status code can't be un-sent... so the best we can do is log the error. - if err := enc.Encode(response); err != nil { + } + + rawResponse, header, err := hookExecutor.ExecuteExitPointStage(rawResponse, w.Header()) + if err != nil { + } + + for key, values := range header { + for _, value := range values { + w.Header().Add(key, value) + } + } + + if _, err := w.Write(rawResponse); err != nil { labels.RequestStatus = metrics.RequestStatusNetworkErr ao.Errors = append(ao.Errors, fmt.Errorf("/openrtb2/auction Failed to send response: %v", err)) } + // If an error happens when encoding the response, there isn't much we can do. + // If we've sent _any_ bytes, then Go would have sent the 200 status code first. + // That status code can't be un-sent... so the best we can do is log the error. + // if err := enc.Encode(response); err != nil { + // labels.RequestStatus = metrics.RequestStatusNetworkErr + // ao.Errors = append(ao.Errors, fmt.Errorf("/openrtb2/auction Failed to send response: %v", err)) + // } + return labels, ao } diff --git a/endpoints/openrtb2/test_utils.go b/endpoints/openrtb2/test_utils.go index e8a976f9645..589d14f02e3 100644 --- a/endpoints/openrtb2/test_utils.go +++ b/endpoints/openrtb2/test_utils.go @@ -1449,6 +1449,7 @@ type mockPlanBuilder struct { rawBidderResponsePlan hooks.Plan[hookstage.RawBidderResponse] allProcessedBidResponsesPlan hooks.Plan[hookstage.AllProcessedBidResponses] auctionResponsePlan hooks.Plan[hookstage.AuctionResponse] + exitPointPlan hooks.Plan[hookstage.ExitPoint] } func (m mockPlanBuilder) PlanForEntrypointStage(_ string) hooks.Plan[hookstage.Entrypoint] { @@ -1483,6 +1484,10 @@ func (m mockPlanBuilder) PlanForAuctionResponseStage(_ string, _ *config.Account return m.auctionResponsePlan } +func (m mockPlanBuilder) PlanForExitPointStage(_ string, _ *config.Account) hooks.Plan[hookstage.ExitPoint] { + return m.exitPointPlan +} + func makePlan[H any](hook H) hooks.Plan[H] { return hooks.Plan[H]{ { diff --git a/hooks/empty_plan.go b/hooks/empty_plan.go index e8ae505878a..51133629070 100644 --- a/hooks/empty_plan.go +++ b/hooks/empty_plan.go @@ -40,3 +40,7 @@ func (e EmptyPlanBuilder) PlanForAllProcessedBidResponsesStage(endpoint string, func (e EmptyPlanBuilder) PlanForAuctionResponseStage(endpoint string, account *config.Account) Plan[hookstage.AuctionResponse] { return nil } + +func (e EmptyPlanBuilder) PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint] { + return nil +} diff --git a/hooks/hookexecution/executor.go b/hooks/hookexecution/executor.go index 2249fd29d42..2950787ad0f 100644 --- a/hooks/hookexecution/executor.go +++ b/hooks/hookexecution/executor.go @@ -41,6 +41,7 @@ type StageExecutor interface { ExecuteAllProcessedBidResponsesStage(adapterBids map[openrtb_ext.BidderName]*entities.PbsOrtbSeatBid) ExecuteAuctionResponseStage(response *openrtb2.BidResponse) ExecuteBeforeRequestValidationStage(req *openrtb2.BidRequest) *RejectError + ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError) } type HookStageExecutor interface { @@ -326,6 +327,34 @@ func (e *hookExecutor) ExecuteAuctionResponseStage(response *openrtb2.BidRespons e.pushStageOutcome(outcome) } +func (e *hookExecutor) ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError) { + plan := e.planBuilder.PlanForExitPointStage(e.endpoint, e.account) + if len(plan) == 0 { + return response, headers, nil + } + + handler := func( + ctx context.Context, + moduleCtx hookstage.ModuleInvocationContext, + hook hookstage.ExitPoint, + payload hookstage.ExitPointPayload, + ) (hookstage.HookResult[hookstage.ExitPointPayload], error) { + return hook.HandleExitPointHook(ctx, moduleCtx, payload) + } + + stageName := hooks.StageAuctionResponse.String() + executionCtx := e.newContext(stageName) + payload := hookstage.ExitPointPayload{RawResponse: response} + + outcome, _, contexts, _ := executeStage(executionCtx, plan, payload, handler, e.metricEngine) + outcome.Entity = entityAuctionResponse + outcome.Stage = stageName + + e.saveModuleContexts(contexts) + e.pushStageOutcome(outcome) + return payload.RawResponse, payload.Headers, nil +} + func (e *hookExecutor) newContext(stage string) executionContext { return executionContext{ account: e.account, @@ -389,3 +418,7 @@ func (executor EmptyHookExecutor) ExecuteAuctionResponseStage(_ *openrtb2.BidRes func (executor EmptyHookExecutor) ExecuteBeforeRequestValidationStage(_ *openrtb2.BidRequest) *RejectError { return nil } + +func (executor EmptyHookExecutor) ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError) { + return nil, nil, nil +} diff --git a/hooks/hookstage/exitpoint.go b/hooks/hookstage/exitpoint.go new file mode 100644 index 00000000000..7fa9ff378b0 --- /dev/null +++ b/hooks/hookstage/exitpoint.go @@ -0,0 +1,22 @@ +package hookstage + +import ( + "context" + "net/http" +) + +type ExitPoint interface { + HandleExitPointHook( + context.Context, + ModuleInvocationContext, + ExitPointPayload, + ) (HookResult[ExitPointPayload], error) +} + +// RawBidderResponsePayload consists of a list of adapters.TypedBid +// objects representing bids returned by a particular bidder. +// Hooks are allowed to modify bids using mutations. +type ExitPointPayload struct { + RawResponse []byte + Headers http.Header +} diff --git a/hooks/plan.go b/hooks/plan.go index 0d119b91b23..34544b1c410 100644 --- a/hooks/plan.go +++ b/hooks/plan.go @@ -19,6 +19,7 @@ const ( StageRawBidderResponse Stage = "raw_bidder_response" StageAllProcessedBidResponses Stage = "all_processed_bid_responses" StageAuctionResponse Stage = "auction_response" + StageExitPoint Stage = "exitpoint" ) func (s Stage) String() string { @@ -42,6 +43,7 @@ type ExecutionPlanBuilder interface { PlanForRawBidderResponseStage(endpoint string, account *config.Account) Plan[hookstage.RawBidderResponse] PlanForAllProcessedBidResponsesStage(endpoint string, account *config.Account) Plan[hookstage.AllProcessedBidResponses] PlanForAuctionResponseStage(endpoint string, account *config.Account) Plan[hookstage.AuctionResponse] + PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint] } // Plan represents a slice of groups of hooks of a specific type grouped in the established order. @@ -167,6 +169,16 @@ func (p PlanBuilder) PlanForAuctionResponseStage(endpoint string, account *confi ) } +func (p PlanBuilder) PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint] { + return getMergedPlan( + p.hooks, + account, + endpoint, + StageExitPoint, + p.repo.GetExitPointHook, + ) +} + type hookFn[T any] func(moduleName string) (T, bool) func getMergedPlan[T any]( diff --git a/hooks/repo.go b/hooks/repo.go index 52f838739f0..9c950cf64d5 100644 --- a/hooks/repo.go +++ b/hooks/repo.go @@ -22,6 +22,7 @@ type HookRepository interface { GetRawBidderResponseHook(id string) (hookstage.RawBidderResponse, bool) GetAllProcessedBidResponsesHook(id string) (hookstage.AllProcessedBidResponses, bool) GetAuctionResponseHook(id string) (hookstage.AuctionResponse, bool) + GetExitPointHook(id string) (hookstage.ExitPoint, bool) } // NewHookRepository returns a new instance of the HookRepository interface. @@ -51,6 +52,7 @@ type hookRepository struct { rawBidderResponseHooks map[string]hookstage.RawBidderResponse allProcessedBidResponseHooks map[string]hookstage.AllProcessedBidResponses auctionResponseHooks map[string]hookstage.AuctionResponse + exitPointHooks map[string]hookstage.ExitPoint } func (r *hookRepository) GetEntrypointHook(id string) (hookstage.Entrypoint, bool) { @@ -85,6 +87,10 @@ func (r *hookRepository) GetAuctionResponseHook(id string) (hookstage.AuctionRes return getHook(r.auctionResponseHooks, id) } +func (r *hookRepository) GetExitPointHook(id string) (hookstage.ExitPoint, bool) { + return getHook(r.exitPointHooks, id) +} + func (r *hookRepository) add(id string, hook interface{}) error { var hasAnyHooks bool var err error @@ -145,6 +151,13 @@ func (r *hookRepository) add(id string, hook interface{}) error { } } + if h, ok := hook.(hookstage.ExitPoint); ok { + hasAnyHooks = true + if r.exitPointHooks, err = addHook(r.exitPointHooks, h, id); err != nil { + return err + } + } + if !hasAnyHooks { return fmt.Errorf(`hook "%s" does not implement any supported hook interface`, id) }