Skip to content

Commit

Permalink
ExitPoint Stage implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pubmatic-Dhruv-Sonone committed Dec 23, 2024
1 parent 15dd2c5 commit 6ffc494
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 5 deletions.
28 changes: 23 additions & 5 deletions endpoints/openrtb2/auction.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,16 +418,34 @@ func sendAuctionResponse(
enc := json.NewEncoder(w)
enc.SetEscapeHTML(false)

w.Header().Set("Content-Type", "application/json")
rawResponse, err := json.Marshal(response)
if err != nil {

// If an error happens when encoding the response, there isn't much we can do.
// If we've sent _any_ bytes, then Go would have sent the 200 status code first.
// That status code can't be un-sent... so the best we can do is log the error.
if err := enc.Encode(response); err != nil {
}

rawResponse, header, err := hookExecutor.ExecuteExitPointStage(rawResponse, w.Header())
if err != nil {
}

for key, values := range header {
for _, value := range values {
w.Header().Add(key, value)
}
}

if _, err := w.Write(rawResponse); err != nil {
labels.RequestStatus = metrics.RequestStatusNetworkErr
ao.Errors = append(ao.Errors, fmt.Errorf("/openrtb2/auction Failed to send response: %v", err))
}

// If an error happens when encoding the response, there isn't much we can do.
// If we've sent _any_ bytes, then Go would have sent the 200 status code first.
// That status code can't be un-sent... so the best we can do is log the error.
// if err := enc.Encode(response); err != nil {
// labels.RequestStatus = metrics.RequestStatusNetworkErr
// ao.Errors = append(ao.Errors, fmt.Errorf("/openrtb2/auction Failed to send response: %v", err))
// }

return labels, ao
}

Expand Down
5 changes: 5 additions & 0 deletions endpoints/openrtb2/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ type mockPlanBuilder struct {
rawBidderResponsePlan hooks.Plan[hookstage.RawBidderResponse]
allProcessedBidResponsesPlan hooks.Plan[hookstage.AllProcessedBidResponses]
auctionResponsePlan hooks.Plan[hookstage.AuctionResponse]
exitPointPlan hooks.Plan[hookstage.ExitPoint]
}

func (m mockPlanBuilder) PlanForEntrypointStage(_ string) hooks.Plan[hookstage.Entrypoint] {
Expand Down Expand Up @@ -1483,6 +1484,10 @@ func (m mockPlanBuilder) PlanForAuctionResponseStage(_ string, _ *config.Account
return m.auctionResponsePlan
}

func (m mockPlanBuilder) PlanForExitPointStage(_ string, _ *config.Account) hooks.Plan[hookstage.ExitPoint] {
return m.exitPointPlan
}

func makePlan[H any](hook H) hooks.Plan[H] {
return hooks.Plan[H]{
{
Expand Down
4 changes: 4 additions & 0 deletions hooks/empty_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ func (e EmptyPlanBuilder) PlanForAllProcessedBidResponsesStage(endpoint string,
func (e EmptyPlanBuilder) PlanForAuctionResponseStage(endpoint string, account *config.Account) Plan[hookstage.AuctionResponse] {
return nil
}

func (e EmptyPlanBuilder) PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint] {
return nil
}
33 changes: 33 additions & 0 deletions hooks/hookexecution/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type StageExecutor interface {
ExecuteAllProcessedBidResponsesStage(adapterBids map[openrtb_ext.BidderName]*entities.PbsOrtbSeatBid)
ExecuteAuctionResponseStage(response *openrtb2.BidResponse)
ExecuteBeforeRequestValidationStage(req *openrtb2.BidRequest) *RejectError
ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError)
}

type HookStageExecutor interface {
Expand Down Expand Up @@ -326,6 +327,34 @@ func (e *hookExecutor) ExecuteAuctionResponseStage(response *openrtb2.BidRespons
e.pushStageOutcome(outcome)
}

func (e *hookExecutor) ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError) {
plan := e.planBuilder.PlanForExitPointStage(e.endpoint, e.account)
if len(plan) == 0 {
return response, headers, nil
}

handler := func(
ctx context.Context,
moduleCtx hookstage.ModuleInvocationContext,
hook hookstage.ExitPoint,
payload hookstage.ExitPointPayload,
) (hookstage.HookResult[hookstage.ExitPointPayload], error) {
return hook.HandleExitPointHook(ctx, moduleCtx, payload)
}

stageName := hooks.StageAuctionResponse.String()
executionCtx := e.newContext(stageName)
payload := hookstage.ExitPointPayload{RawResponse: response}

outcome, _, contexts, _ := executeStage(executionCtx, plan, payload, handler, e.metricEngine)
outcome.Entity = entityAuctionResponse
outcome.Stage = stageName

e.saveModuleContexts(contexts)
e.pushStageOutcome(outcome)
return payload.RawResponse, payload.Headers, nil
}

func (e *hookExecutor) newContext(stage string) executionContext {
return executionContext{
account: e.account,
Expand Down Expand Up @@ -389,3 +418,7 @@ func (executor EmptyHookExecutor) ExecuteAuctionResponseStage(_ *openrtb2.BidRes
func (executor EmptyHookExecutor) ExecuteBeforeRequestValidationStage(_ *openrtb2.BidRequest) *RejectError {
return nil
}

func (executor EmptyHookExecutor) ExecuteExitPointStage(response []byte, headers http.Header) ([]byte, http.Header, *RejectError) {
return nil, nil, nil
}
22 changes: 22 additions & 0 deletions hooks/hookstage/exitpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package hookstage

import (
"context"
"net/http"
)

type ExitPoint interface {
HandleExitPointHook(
context.Context,
ModuleInvocationContext,
ExitPointPayload,
) (HookResult[ExitPointPayload], error)
}

// RawBidderResponsePayload consists of a list of adapters.TypedBid
// objects representing bids returned by a particular bidder.
// Hooks are allowed to modify bids using mutations.
type ExitPointPayload struct {
RawResponse []byte
Headers http.Header
}
12 changes: 12 additions & 0 deletions hooks/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
StageRawBidderResponse Stage = "raw_bidder_response"
StageAllProcessedBidResponses Stage = "all_processed_bid_responses"
StageAuctionResponse Stage = "auction_response"
StageExitPoint Stage = "exitpoint"
)

func (s Stage) String() string {
Expand All @@ -42,6 +43,7 @@ type ExecutionPlanBuilder interface {
PlanForRawBidderResponseStage(endpoint string, account *config.Account) Plan[hookstage.RawBidderResponse]
PlanForAllProcessedBidResponsesStage(endpoint string, account *config.Account) Plan[hookstage.AllProcessedBidResponses]
PlanForAuctionResponseStage(endpoint string, account *config.Account) Plan[hookstage.AuctionResponse]
PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint]
}

// Plan represents a slice of groups of hooks of a specific type grouped in the established order.
Expand Down Expand Up @@ -167,6 +169,16 @@ func (p PlanBuilder) PlanForAuctionResponseStage(endpoint string, account *confi
)
}

func (p PlanBuilder) PlanForExitPointStage(endpoint string, account *config.Account) Plan[hookstage.ExitPoint] {
return getMergedPlan(
p.hooks,
account,
endpoint,
StageExitPoint,
p.repo.GetExitPointHook,
)
}

type hookFn[T any] func(moduleName string) (T, bool)

func getMergedPlan[T any](
Expand Down
13 changes: 13 additions & 0 deletions hooks/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type HookRepository interface {
GetRawBidderResponseHook(id string) (hookstage.RawBidderResponse, bool)
GetAllProcessedBidResponsesHook(id string) (hookstage.AllProcessedBidResponses, bool)
GetAuctionResponseHook(id string) (hookstage.AuctionResponse, bool)
GetExitPointHook(id string) (hookstage.ExitPoint, bool)
}

// NewHookRepository returns a new instance of the HookRepository interface.
Expand Down Expand Up @@ -51,6 +52,7 @@ type hookRepository struct {
rawBidderResponseHooks map[string]hookstage.RawBidderResponse
allProcessedBidResponseHooks map[string]hookstage.AllProcessedBidResponses
auctionResponseHooks map[string]hookstage.AuctionResponse
exitPointHooks map[string]hookstage.ExitPoint
}

func (r *hookRepository) GetEntrypointHook(id string) (hookstage.Entrypoint, bool) {
Expand Down Expand Up @@ -85,6 +87,10 @@ func (r *hookRepository) GetAuctionResponseHook(id string) (hookstage.AuctionRes
return getHook(r.auctionResponseHooks, id)
}

func (r *hookRepository) GetExitPointHook(id string) (hookstage.ExitPoint, bool) {
return getHook(r.exitPointHooks, id)
}

func (r *hookRepository) add(id string, hook interface{}) error {
var hasAnyHooks bool
var err error
Expand Down Expand Up @@ -145,6 +151,13 @@ func (r *hookRepository) add(id string, hook interface{}) error {
}
}

if h, ok := hook.(hookstage.ExitPoint); ok {
hasAnyHooks = true
if r.exitPointHooks, err = addHook(r.exitPointHooks, h, id); err != nil {
return err
}
}

if !hasAnyHooks {
return fmt.Errorf(`hook "%s" does not implement any supported hook interface`, id)
}
Expand Down

0 comments on commit 6ffc494

Please sign in to comment.