Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

feat: collect and record Filecoin ID for all attempts #67

Merged: 1 commit merged on Aug 15, 2023.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/filecoin-project/lassie-event-recorder/metrics"
Expand Down Expand Up @@ -208,49 +209,65 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
}
return nil
})

attempts := make(map[string]metrics.Attempt, len(event.RetrievalAttempts))
var wg sync.WaitGroup
var lk sync.Mutex
for storageProviderID, retrievalAttempt := range event.RetrievalAttempts {
var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
}
attempts[storageProviderID] = metrics.Attempt{
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
}
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
error,
protocol
)
VALUES ($1, $2, $3, $4, $5)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.Error,
retrievalAttempt.Protocol,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
wg.Add(1)
go func(storageProviderID string, retrievalAttempt *RetrievalAttempt) {
defer wg.Done()

var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
}
filSPID := r.lassieSPIDToFilecoinSPID(ctx, storageProviderID) // call to Heyfil, may block if unknown SPID

lk.Lock()
defer lk.Unlock()
attempts[storageProviderID] = metrics.Attempt{
FilSPID: filSPID,
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
}
return nil
})
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
error,
protocol
)
VALUES ($1, $2, $3, $4, $5)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.Error,
retrievalAttempt.Protocol,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
}
return nil
})
}(storageProviderID, retrievalAttempt)
}

filSPID := r.lassieSPIDToFilecoinSPID(ctx, event.StorageProviderID)

wg.Wait()

if r.cfg.metrics != nil {
r.cfg.metrics.HandleAggregatedEvent(ctx,
r.cfg.metrics.HandleAggregatedEvent(
ctx,
timeToFirstIndexerResult,
timeToFirstByte,
event.Success,
Expand Down
11 changes: 6 additions & 5 deletions metrics/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID,
}

type Attempt struct {
FilSPID string
Error string
Protocol string
TimeToFirstByte time.Duration
Expand Down Expand Up @@ -182,20 +183,20 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
case ProtocolGraphsync:
if !recordedGraphSync {
recordedGraphSync = true
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID))
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
}
case ProtocolHttp:
if !recordedHttp {
recordedHttp = true
m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID))
m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
}
}
if attempt.Error != "" {
switch protocolAttempted {
case ProtocolGraphsync:
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID))
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
case ProtocolHttp:
m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID))
m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
default:
}
if metric, matched := m.getMatchingErrorMetric(ctx, attempt.Error); matched {
Expand Down Expand Up @@ -227,7 +228,7 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
if success {
protocol := protocolFromMulticodecString(protocolSucceeded)

m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("fil_sp_id", filSPID))
m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", filSPID))
switch protocol {
case ProtocolBitswap:
m.requestWithBitswapSuccessCount.Add(ctx, 1, attribute.String("protocol", "bitswap"))
Expand Down