Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubstack #715

Open
wants to merge 13 commits into
base: ci_backup_20240824
Choose a base branch
from
2 changes: 1 addition & 1 deletion analytics/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func New(analytics *config.Analytics) analytics.Runner {
}

if analytics.Pubstack.Enabled {
pubstackModule, err := pubstack.NewModule(
pubstackModule, err := pubstack.NewModulePubmatic(
clients.GetDefaultHttpInstance(),
analytics.Pubstack.ScopeId,
analytics.Pubstack.IntakeUrl,
Expand Down
8 changes: 7 additions & 1 deletion analytics/clients/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import (
"net/http"
)

var defaultHttpInstance = http.DefaultClient
var defaultHttpInstance = &http.Client{
Transport: &http.Transport{
DisableKeepAlives: false,
MaxIdleConnsPerHost: 1024,
},
Timeout: 15000,
}

func GetDefaultHttpInstance() *http.Client {
// TODO 2020-06-22 @see https://github.com/prebid/prebid-server/pull/1331#discussion_r436110097
Expand Down
67 changes: 27 additions & 40 deletions analytics/pubmatic/pubmatic.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package pubmatic

import (
"runtime/debug"
"sync"

"github.com/golang/glog"
"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/analytics/pubmatic/mhttp"

"github.com/prebid/prebid-server/v2/config"
"github.com/prebid/prebid-server/v2/modules/pubmatic/openwrap"
"github.com/prebid/prebid-server/v2/modules/pubmatic/openwrap/models"
)

type RequestType string
Expand All @@ -35,43 +32,33 @@ type HTTPLogger struct {

// LogAuctionObject prepares the owlogger url and send it to logger endpoint
func (ow HTTPLogger) LogAuctionObject(ao *analytics.AuctionObject) {
var rCtx *models.RequestCtx
defer func() {
if r := recover(); r != nil {
if rCtx != nil {
rCtx.MetricsEngine.RecordOpenWrapServerPanicStats(ow.hostName, "LogAuctionObject")
glog.Errorf("stacktrace:[%s], error:[%v], pubid:[%d], profid:[%d], ver:[%d]", string(debug.Stack()), r, rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
return
}
glog.Errorf("stacktrace:[%s], error:[%v]", string(debug.Stack()), r)
}
}()

rCtx = GetRequestCtx(ao.HookExecutionOutcome)
if rCtx == nil {
// glog.Errorf("Failed to get the request context for AuctionObject - [%v]", ao)
// add this log once complete header-bidding code is migrated to modules
return
}

if rCtx.LoggerDisabled {
// logger disabled explicitly for publisher,profile request
return
}

err := RestoreBidResponse(rCtx, *ao)
if err != nil {
glog.Error("Failed to restore bid response for pub:[%d], profile:[%d], version:[%d], err:[%s].", rCtx.PubID, rCtx.ProfileID, rCtx.VersionID, err.Error())
}

url, headers := GetLogAuctionObjectAsURL(*ao, rCtx, false, false)
if url == "" {
glog.Errorf("Failed to prepare the owlogger for pub:[%d], profile:[%d], version:[%d].",
rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
return
}

go send(rCtx, url, headers, mhttp.NewMultiHttpContext())
// var rCtx *models.RequestCtx
// defer func() {
// if r := recover(); r != nil {
// if rCtx != nil {
// rCtx.MetricsEngine.RecordOpenWrapServerPanicStats(ow.hostName, "LogAuctionObject")
// glog.Errorf("stacktrace:[%s], error:[%v], pubid:[%d], profid:[%d], ver:[%d]", string(debug.Stack()), r, rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
// return
// }
// glog.Errorf("stacktrace:[%s], error:[%v]", string(debug.Stack()), r)
// }
// }()

// rCtx = GetRequestCtx(ao.HookExecutionOutcome)
// if rCtx == nil {
// // glog.Errorf("Failed to get the request context for AuctionObject - [%v]", ao)
// // add this log once complete header-bidding code is migrated to modules
// return
// }

// url, headers := GetLogAuctionObjectAsURL(*ao, rCtx, false, false)
// if url == "" {
// glog.Errorf("Failed to prepare the owlogger for pub:[%d], profile:[%d], version:[%d].",
// rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
// return
// }

// go send(rCtx, url, headers, mhttp.NewMultiHttpContext())
}

// Writes VideoObject to file
Expand Down
2 changes: 0 additions & 2 deletions analytics/pubstack/eventchannel/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"net/url"
"path"

"github.com/golang/glog"
)
Expand Down Expand Up @@ -42,6 +41,5 @@ func BuildEndpointSender(client *http.Client, baseUrl string, module string) Sen
if err != nil {
glog.Error(err)
}
endpoint.Path = path.Join(endpoint.Path, "intake", module)
return NewHttpSender(client, endpoint.String())
}
58 changes: 58 additions & 0 deletions analytics/pubstack/pubmatic_module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package pubstack

import (
"fmt"
"net/http"
"os"
"sync"

"github.com/benbjohnson/clock"
"github.com/golang/glog"
"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/analytics/pubstack/eventchannel"
)

func NewModulePubmatic(client *http.Client, scope, endpoint, configRefreshDelay string, maxEventCount int, maxByteSize, maxTime string, clock clock.Clock) (analytics.Module, error) {

return NewModuleWithConfigTaskPubmatic(client, scope, endpoint, maxEventCount, maxByteSize, maxTime, clock)
}

func NewModuleWithConfigTaskPubmatic(client *http.Client, scope, endpoint string, maxEventCount int, maxByteSize, maxTime string, clock clock.Clock) (analytics.Module, error) {
glog.Infof("[pubstack] Initializing module scope=%s endpoint=%s\n", scope, endpoint)

// parse args
bufferCfg, err := newBufferConfig(maxEventCount, maxByteSize, maxTime)
if err != nil {
return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubstack.buffers, :%v", err)
}

defaultFeatures := map[string]bool{
auction: true,
video: true,
amp: true,
cookieSync: true,
setUID: true,
}

defaultConfig := &Configuration{
ScopeID: scope,
Endpoint: endpoint,
Features: defaultFeatures,
}

pb := PubstackModule{
scope: scope,
httpClient: client,
cfg: defaultConfig,
buffsCfg: bufferCfg,
sigTermCh: make(chan os.Signal),
stopCh: make(chan struct{}),
eventChannels: make(map[string]*eventchannel.EventChannel),
muxConfig: sync.RWMutex{},
clock: clock,
}

pb.registerChannel(auction)

return &pb, nil
}
36 changes: 32 additions & 4 deletions analytics/pubstack/pubstack_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ import (
"net/http"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"

"github.com/benbjohnson/clock"
"github.com/golang/glog"
"github.com/prebid/prebid-server/v2/analytics/pubmatic"
"github.com/prebid/prebid-server/v2/modules/pubmatic/openwrap/models"

"github.com/prebid/prebid-server/v2/analytics"
"github.com/prebid/prebid-server/v2/analytics/pubstack/eventchannel"
Expand Down Expand Up @@ -115,13 +119,37 @@ func (p *PubstackModule) LogAuctionObject(ao *analytics.AuctionObject) {
return
}

// serialize event
payload, err := helpers.JsonifyAuctionObject(ao, p.scope)
if err != nil {
glog.Warning("[pubstack] Cannot serialize auction")
var rCtx *models.RequestCtx
defer func() {
if r := recover(); r != nil {
if rCtx != nil {
glog.Errorf("stacktrace:[%s], error:[%v], pubid:[%d], profid:[%d], ver:[%d]", string(debug.Stack()), r, rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
return
}
glog.Errorf("stacktrace:[%s], error:[%v]", string(debug.Stack()), r)
}
}()

rCtx = pubmatic.GetRequestCtx(ao.HookExecutionOutcome)
if rCtx == nil {
// glog.Errorf("Failed to get the request context for AuctionObject - [%v]", ao)
// add this log once complete header-bidding code is migrated to modules
return
}

url, _ := pubmatic.GetLogAuctionObjectAsURL(*ao, rCtx, false, false)
if url == "" {
glog.Errorf("Failed to prepare the owlogger for pub:[%d], profile:[%d], version:[%d].",
rCtx.PubID, rCtx.ProfileID, rCtx.VersionID)
return
}

url = strings.TrimPrefix(url, "http://10.172.141.11/wl?")

payload := []byte(url)

payload = append(payload, byte('\n'))

p.eventChannels[auction].Push(payload)
}

Expand Down
10 changes: 5 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,13 +1064,13 @@ func SetupViper(v *viper.Viper, filename string, bidderInfos BidderInfos) {

v.SetDefault("max_request_size", 1024*256)
v.SetDefault("analytics.file.filename", "")
v.SetDefault("analytics.pubstack.endpoint", "https://s2s.pbstck.com/v1")
v.SetDefault("analytics.pubstack.scopeid", "change-me")
v.SetDefault("analytics.pubstack.enabled", false)
v.SetDefault("analytics.pubstack.endpoint", "http://10.172.141.11/wl")
v.SetDefault("analytics.pubstack.scopeid", "")
v.SetDefault("analytics.pubstack.enabled", true)
v.SetDefault("analytics.pubstack.configuration_refresh_delay", "2h")
v.SetDefault("analytics.pubstack.buffers.size", "2MB")
v.SetDefault("analytics.pubstack.buffers.size", "100KB")
v.SetDefault("analytics.pubstack.buffers.count", 100)
v.SetDefault("analytics.pubstack.buffers.timeout", "900s")
v.SetDefault("analytics.pubstack.buffers.timeout", "300s")
v.SetDefault("amp_timeout_adjustment_ms", 0)
v.BindEnv("gdpr.default_value")
v.SetDefault("gdpr.enabled", true)
Expand Down
Loading