From f0cad107d02593da3412b14d92614e75113ae3b4 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 15 Aug 2023 15:09:00 +1000 Subject: [PATCH] feat: collect and record Filecoin ID for all attempts --- eventrecorder/recorder.go | 87 +++++++++++++++++++++++---------------- metrics/events.go | 11 ++--- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/eventrecorder/recorder.go b/eventrecorder/recorder.go index e41cea7..ce15406 100644 --- a/eventrecorder/recorder.go +++ b/eventrecorder/recorder.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "sync" "time" "github.com/filecoin-project/lassie-event-recorder/metrics" @@ -208,49 +209,65 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr } return nil }) + attempts := make(map[string]metrics.Attempt, len(event.RetrievalAttempts)) + var wg sync.WaitGroup + var lk sync.Mutex for storageProviderID, retrievalAttempt := range event.RetrievalAttempts { - var timeToFirstByte time.Duration - if retrievalAttempt.TimeToFirstByte != "" { - timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte) - } - attempts[storageProviderID] = metrics.Attempt{ - Error: retrievalAttempt.Error, - Protocol: retrievalAttempt.Protocol, - TimeToFirstByte: timeToFirstByte, - } - query := ` - INSERT INTO retrieval_attempts( - retrieval_id, - storage_provider_id, - time_to_first_byte, - error, - protocol - ) - VALUES ($1, $2, $3, $4, $5) - ` - batchRetrievalAttempts.Queue(query, - event.RetrievalID, - storageProviderID, - timeToFirstByte, - retrievalAttempt.Error, - retrievalAttempt.Protocol, - ).Exec(func(ct pgconn.CommandTag) error { - rowsAffected := ct.RowsAffected() - switch rowsAffected { - case 0: - totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected) - default: - totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected) + wg.Add(1) + go func(storageProviderID string, retrievalAttempt *RetrievalAttempt) { + defer wg.Done() + + var timeToFirstByte time.Duration + if retrievalAttempt.TimeToFirstByte != "" { + timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte) + } + filSPID := r.lassieSPIDToFilecoinSPID(ctx, storageProviderID) // call to Heyfil, may block if unknown SPID + + lk.Lock() + defer lk.Unlock() + attempts[storageProviderID] = metrics.Attempt{ + FilSPID: filSPID, + Error: retrievalAttempt.Error, + Protocol: retrievalAttempt.Protocol, + TimeToFirstByte: timeToFirstByte, } - return nil - }) + query := ` + INSERT INTO retrieval_attempts( + retrieval_id, + storage_provider_id, + time_to_first_byte, + error, + protocol + ) + VALUES ($1, $2, $3, $4, $5) + ` + batchRetrievalAttempts.Queue(query, + event.RetrievalID, + storageProviderID, + timeToFirstByte, + retrievalAttempt.Error, + retrievalAttempt.Protocol, + ).Exec(func(ct pgconn.CommandTag) error { + rowsAffected := ct.RowsAffected() + switch rowsAffected { + case 0: + totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected) + default: + totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected) + } + return nil + }) + }(storageProviderID, retrievalAttempt) } filSPID := r.lassieSPIDToFilecoinSPID(ctx, event.StorageProviderID) + wg.Wait() + if r.cfg.metrics != nil { - r.cfg.metrics.HandleAggregatedEvent(ctx, + r.cfg.metrics.HandleAggregatedEvent( + ctx, timeToFirstIndexerResult, timeToFirstByte, event.Success, diff --git a/metrics/events.go b/metrics/events.go index b7f788d..8e88e57 100644 --- a/metrics/events.go +++ b/metrics/events.go @@ -147,6 +147,7 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID, } type Attempt struct { + FilSPID string Error string Protocol string TimeToFirstByte time.Duration @@ -182,20 +183,20 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context, case ProtocolGraphsync: if !recordedGraphSync { recordedGraphSync = true - m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID)) + m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) } case ProtocolHttp: if !recordedHttp { recordedHttp = true - m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID)) + m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) } } if attempt.Error != "" { switch protocolAttempted { case ProtocolGraphsync: - m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID)) + m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) case ProtocolHttp: - m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID)) + m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) default: } if metric, matched := m.getMatchingErrorMetric(ctx, attempt.Error); matched { @@ -227,7 +228,7 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context, if success { protocol := protocolFromMulticodecString(protocolSucceeded) - m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("fil_sp_id", filSPID)) + m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", filSPID)) switch protocol { case ProtocolBitswap: m.requestWithBitswapSuccessCount.Add(ctx, 1, attribute.String("protocol", "bitswap"))