Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Commit

Permalink
feat: add testability and tests for eventrecorder
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 8, 2023
1 parent c82f8b2 commit 0d2f32e
Show file tree
Hide file tree
Showing 7 changed files with 512 additions and 76 deletions.
25 changes: 16 additions & 9 deletions eventrecorder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"

"github.com/filecoin-project/lassie-event-recorder/metrics"
"github.com/filecoin-project/lassie-event-recorder/spmap"
"github.com/jackc/pgx/v5/pgxpool"
)
Expand All @@ -22,7 +21,7 @@ type (

mapcfg []spmap.Option

metrics *metrics.Metrics
metrics Metrics
}
Option func(*config) error
)
Expand All @@ -34,13 +33,14 @@ func newConfig(opts []Option) (*config, error) {
return nil, err
}
}

if cfg.dbDSN == "" {
return nil, errors.New("db URL must be specified")
if cfg.dbDSN != "" {
var err error
if cfg.pgxPoolConfig, err = pgxpool.ParseConfig(cfg.dbDSN); err != nil {
return nil, fmt.Errorf("unable to parse db URL: %w", err)
}
}
var err error
if cfg.pgxPoolConfig, err = pgxpool.ParseConfig(cfg.dbDSN); err != nil {
return nil, fmt.Errorf("unable to parse db URL: %w", err)
if cfg.pgxPoolConfig == nil && cfg.metrics == nil && cfg.mongoEndpoint == "" {
return nil, errors.New("must set up at least one of: postgres, mongo, metrics")
}
return cfg, nil
}
Expand All @@ -62,9 +62,16 @@ func WithMongoSubmissions(endpoint, db, collection string, percentage float32) O
}
}

func WithMetrics(metrics *metrics.Metrics) Option {
func WithMetrics(metrics Metrics) Option {
return func(cfg *config) error {
cfg.metrics = metrics
return nil
}
}

func WithSPMapOptions(opts ...spmap.Option) Option {
return func(cfg *config) error {
cfg.mapcfg = opts
return nil
}
}
80 changes: 58 additions & 22 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,32 @@ import (

var logger = log.Logger("lassie/eventrecorder")

type Metrics interface {
HandleStartedEvent(context.Context, types.RetrievalID, types.Phase, time.Time, string)
HandleCandidatesFoundEvent(context.Context, types.RetrievalID, time.Time, any)
HandleCandidatesFilteredEvent(context.Context, types.RetrievalID, any)
HandleFailureEvent(context.Context, types.RetrievalID, types.Phase, string, any)
HandleTimeToFirstByteEvent(context.Context, types.RetrievalID, string, time.Time)
HandleSuccessEvent(context.Context, types.RetrievalID, time.Time, string, any)

HandleAggregatedEvent(
ctx context.Context,
timeToFirstIndexerResult time.Duration,
timeToFirstByte time.Duration,
success bool,
storageProviderID string, // Lassie Peer ID
filSPID string, // Heyfil Filecoin SP ID
startTime time.Time,
endTime time.Time,
bandwidth int64,
bytesTransferred int64,
indexerCandidates int64,
indexerFiltered int64,
attempts map[string]metrics.Attempt,
protocolSucceeded string,
)
}

type EventRecorder struct {
cfg *config
db *pgxpool.Pool
Expand All @@ -43,6 +69,10 @@ func New(opts ...Option) (*EventRecorder, error) {
}

func (r *EventRecorder) RecordEvents(ctx context.Context, events []Event) error {
if r.db == nil {
return nil
}

totalLogger := logger.With("total", len(events))

var batchQuery pgx.Batch
Expand Down Expand Up @@ -180,7 +210,6 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
})
attempts := make(map[string]metrics.Attempt, len(event.RetrievalAttempts))
for storageProviderID, retrievalAttempt := range event.RetrievalAttempts {

var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
Expand Down Expand Up @@ -242,7 +271,8 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
report := RetrievalReport{
RetrievalID: event.RetrievalID,
InstanceID: event.InstanceID,
StorageProviderID: event.StorageProviderID,
StorageProviderID: event.StorageProviderID, // Lassie Peer ID
SPID: filSPID, // Heyfil Filecoin SP ID
TTFB: timeToFirstByte.Milliseconds(),
Bandwidth: int64(event.Bandwidth),
Success: event.Success,
Expand All @@ -252,27 +282,29 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
go func(reportData RetrievalReport) {
mongoReportCtx, cncl := context.WithTimeout(context.Background(), 30*time.Second)
defer cncl()
reportData.SPID = filSPID
_, err := r.mc.InsertOne(mongoReportCtx, reportData)
if err != nil {
if _, err := r.mc.InsertOne(mongoReportCtx, reportData); err != nil {
logger.Infof("failed to report to mongo: %w", err)
}
}(report)
}
}
batchResult := r.db.SendBatch(ctx, &batchQuery)
err := batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one aggregated event insertion failed", "err", err)
return err
}
batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts)
err = batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err)
return err

if r.db != nil {
batchResult := r.db.SendBatch(ctx, &batchQuery)
err := batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one aggregated event insertion failed", "err", err)
return err
}
batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts)
err = batchResult.Close()
if err != nil {
totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err)
return err
}
totalLogger.Info("Successfully submitted batch event insertion")
}
totalLogger.Info("Successfully submitted batch event insertion")

return nil
}

Expand Down Expand Up @@ -302,9 +334,11 @@ type RetrievalReport struct {

func (r *EventRecorder) Start(ctx context.Context) error {
var err error
r.db, err = pgxpool.NewWithConfig(ctx, r.cfg.pgxPoolConfig)
if err != nil {
return fmt.Errorf("failed to instantiate database connection: %w", err)
if r.cfg.pgxPoolConfig != nil {
r.db, err = pgxpool.NewWithConfig(ctx, r.cfg.pgxPoolConfig)
if err != nil {
return fmt.Errorf("failed to instantiate database connection: %w", err)
}
}

if r.cfg.mongoEndpoint != "" {
Expand All @@ -324,8 +358,10 @@ func (r *EventRecorder) Start(ctx context.Context) error {
}

func (r *EventRecorder) Shutdown() {
logger.Info("Closing database connection...")
r.db.Close()
if r.db != nil {
logger.Info("Closing database connection...")
r.db.Close()
}
logger.Info("Database connection closed successfully.")
if r.mongo != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Loading

0 comments on commit 0d2f32e

Please sign in to comment.