Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Analytics Adapter: PubxAi #3863

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions analytics/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prebid/prebid-server/v3/analytics/clients"
"github.com/prebid/prebid-server/v3/analytics/filesystem"
"github.com/prebid/prebid-server/v3/analytics/pubstack"
"github.com/prebid/prebid-server/v3/analytics/pubxai"
"github.com/prebid/prebid-server/v3/config"
"github.com/prebid/prebid-server/v3/openrtb_ext"
"github.com/prebid/prebid-server/v3/ortb"
Expand Down Expand Up @@ -43,6 +44,24 @@ func New(analytics *config.Analytics) analytics.Runner {
glog.Errorf("Could not initialize PubstackModule: %v", err)
}
}
if analytics.Pubxai.Enabled {
pubxaiModule, err := pubxai.InitializePubxAIModule(
clients.GetDefaultHttpInstance(),
analytics.Pubxai.Publisherid,
analytics.Pubxai.Endpoint,
analytics.Pubxai.BufferInterval,
analytics.Pubxai.BufferSize,
analytics.Pubxai.SamplingPercentage,
analytics.Pubxai.ConfigurationRefreshInterval,
clock.New(),
)
if err == nil {
modules["pubxai"] = pubxaiModule
} else {
glog.Errorf("Could not initialize Pubxai Module: %v", err)
}

}

if analytics.Agma.Enabled {
agmaModule, err := agma.NewModule(
Expand Down
29 changes: 29 additions & 0 deletions analytics/pubxai/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# PubxAI Analytics

In order to use the pubxai analytics module, it needs to be configured by the host.

You can configure the server using the following environment variables:

```bash
export PBS_ANALYTICS_PUBXAI_ENABLED="true"
export PBS_ANALYTICS_PUBXAI_ENDPOINT="https://analytics.pbxai.com"
export PBS_ANALYTICS_PUBXAI_PUBLISHERID=<your pubxid here> # should be an UUIDv4
export PBS_ANALYTICS_PUBXAI_BUFFER_INTERVAL="5m"
export PBS_ANALYTICS_PUBXAI_BUFFER_SIZE="10KB"
export PBS_ANALYTICS_PUBXAI_SAMPLING_PERCENTAGE="100"
export PBS_ANALYTICS_PUBXAI_CONFIGURATION_REFRESH_INTERVAL="5h"
```

Or using the pbs configuration file and by appending the following block:

```yaml
analytics:
pubxai:
enabled: true
publisherid: "your pubxid here" # should be an UUIDv4
endpoint: "https://analytics.pbxai.com"
buffer_interval: 5m
buffer_size: 10kb
sampling_percentage: 100
configuration_refresh_interval: 5h
```
95 changes: 95 additions & 0 deletions analytics/pubxai/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package config

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/golang/glog"
"github.com/prebid/prebid-server/v3/util/task"
)

type Configuration struct {
PublisherId string `json:"publisher_id"`
BufferInterval string `json:"buffer_interval"`
BufferSize string `json:"buffer_size"`
SamplingPercentage int `json:"sampling_percentage"`
}

type ConfigService interface {
Start(stop <-chan struct{}) <-chan *Configuration
IsSameAs(a *Configuration, b *Configuration) bool
}

type ConfigServiceImpl struct {
task *task.TickerTask
configChan chan *Configuration
}

func NewConfigService(httpClient *http.Client, pubxId, endpoint, refreshInterval string) (ConfigService, error) {
refreshDuration, err := time.ParseDuration(refreshInterval)
if err != nil {
return nil, fmt.Errorf("fail to parse the module args, arg=analytics.pubxai.configuration_refresh_delay: %v", err)
}
endpointUrl, err := url.Parse(endpoint + "/config")
if err != nil {
return nil, err
}

query := endpointUrl.Query()
query.Set("pubxId", pubxId)
endpointUrl.RawQuery = query.Encode()

configChan := make(chan *Configuration)

tr := task.NewTickerTaskFromFunc(refreshDuration, func() error {
config, err := fetchConfig(httpClient, endpointUrl)
if err != nil {
return fmt.Errorf("[pubxai] Fail to fetch remote configuration: %v", err)
}
configChan <- config
return nil
})

return &ConfigServiceImpl{
task: tr,
configChan: configChan,
}, nil
}

func fetchConfig(client *http.Client, endpoint *url.URL) (*Configuration, error) {
res, err := client.Get(endpoint.String())
if err != nil {
return nil, err
}
defer res.Body.Close()
c := Configuration{}
err = json.NewDecoder(res.Body).Decode(&c)
glog.Info("[pubxai] fetchConfig: %v at time %v", c, time.Now())
if err != nil {
return nil, err
}
return &c, nil
}

func (t *ConfigServiceImpl) Start(stop <-chan struct{}) <-chan *Configuration {
go t.task.Start()

go func() {
<-stop
t.task.Stop()
}()

return t.configChan
}

func (t *ConfigServiceImpl) IsSameAs(a *Configuration, b *Configuration) bool {

samePublisherId := a.PublisherId == b.PublisherId
sameBufferInterval := a.BufferInterval == b.BufferInterval
sameBufferSize := a.BufferSize == b.BufferSize
sameSamplingPercentage := a.SamplingPercentage == b.SamplingPercentage
return samePublisherId && sameBufferInterval && sameBufferSize && sameSamplingPercentage
}
203 changes: 203 additions & 0 deletions analytics/pubxai/config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package config

import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewConfigService(t *testing.T) {
tests := []struct {
name string
httpClient *http.Client
pubxId string
endpoint string
refreshInterval string
wantErr bool
}{
{
name: "valid configuration",
httpClient: &http.Client{},
pubxId: "testPublisher",
endpoint: "http://example.com",
refreshInterval: "1m",
wantErr: false,
},
{
name: "invalid duration",
httpClient: &http.Client{},
pubxId: "testPublisher",
endpoint: "http://example.com",
refreshInterval: "invalid",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
configService, err := NewConfigService(tt.httpClient, tt.pubxId, tt.endpoint, tt.refreshInterval)
if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, configService)
} else {
assert.NoError(t, err)
assert.NotNil(t, configService)
}
})
}
}

func TestFetchConfig(t *testing.T) {
tests := []struct {
name string
serverResp http.HandlerFunc // Changed type to http.HandlerFunc
wantConfig *Configuration
wantErr bool
}{
{
name: "successful fetch",
serverResp: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Explicit cast
config := &Configuration{
PublisherId: "testPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(config)
}),
wantConfig: &Configuration{
PublisherId: "testPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
},
wantErr: false,
},
{
name: "server error",
serverResp: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Explicit cast
w.WriteHeader(http.StatusInternalServerError)
}),
wantConfig: nil,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := httptest.NewServer(tt.serverResp) // No need for HandlerFunc here
defer server.Close()

endpointUrl, _ := url.Parse(server.URL)
config, err := fetchConfig(server.Client(), endpointUrl)

if tt.wantErr {
assert.Error(t, err)
assert.Nil(t, config)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.wantConfig, config)
}
})
}
}

func TestConfigServiceImpl_Start(t *testing.T) {
tests := []struct {
name string
refreshInterval string
}{
{
name: "start and stop service",
refreshInterval: "1m",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
httpClient := &http.Client{}
pubxId := "testPublisher"
endpoint := "http://example.com"

configService, err := NewConfigService(httpClient, pubxId, endpoint, tt.refreshInterval)
assert.NoError(t, err)

configImpl := configService.(*ConfigServiceImpl)
stop := make(chan struct{})
done := make(chan struct{})

configChan := configImpl.Start(stop)
assert.NotNil(t, configChan)

// Signal completion after a brief interval
go func() {
time.Sleep(10 * time.Millisecond)
close(stop)
done <- struct{}{}
}()

select {
case <-done:
// Success: service started and stopped
case <-time.After(time.Second):
t.Fatal("timeout waiting for service to stop")
}
})
}
}

func TestConfigServiceImpl_IsSameAs(t *testing.T) {
tests := []struct {
name string
config1 *Configuration
config2 *Configuration
expected bool
}{
{
name: "identical configs",
config1: &Configuration{
PublisherId: "testPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
},
config2: &Configuration{
PublisherId: "testPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
},
expected: true,
},
{
name: "different configs",
config1: &Configuration{
PublisherId: "testPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
},
config2: &Configuration{
PublisherId: "differentPublisher",
BufferInterval: "30s",
BufferSize: "100",
SamplingPercentage: 50,
},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
configService := &ConfigServiceImpl{}
result := configService.IsSameAs(tt.config1, tt.config2)
assert.Equal(t, tt.expected, result)
})
}
}
28 changes: 28 additions & 0 deletions analytics/pubxai/config/mocks/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package mocks

import (
"github.com/prebid/prebid-server/v3/analytics/pubxai/config"
"github.com/stretchr/testify/mock"
)

// MockConfigService is a mock of ConfigService interface using testify
type MockConfigService struct {
mock.Mock
}

// NewMockConfigService creates a new mock instance
func NewMockConfigService() *MockConfigService {
return &MockConfigService{}
}

// IsSameAs provides a mock function
func (m *MockConfigService) IsSameAs(a, b *config.Configuration) bool {
args := m.Called(a, b)
return args.Bool(0)
}

// Start provides a mock function
func (m *MockConfigService) Start(stop <-chan struct{}) <-chan *config.Configuration {
args := m.Called(stop)
return args.Get(0).(<-chan *config.Configuration)
}
Loading
Loading