Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

Commit

Permalink
feat: collect and record Filecoin ID for all attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 15, 2023
1 parent 99de266 commit 5963abf
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 40 deletions.
87 changes: 52 additions & 35 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/filecoin-project/lassie-event-recorder/metrics"
Expand Down Expand Up @@ -208,49 +209,65 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
}
return nil
})

attempts := make(map[string]metrics.Attempt, len(event.RetrievalAttempts))
var wg sync.WaitGroup
var lk sync.Mutex
for storageProviderID, retrievalAttempt := range event.RetrievalAttempts {
var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
}
attempts[storageProviderID] = metrics.Attempt{
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
}
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
error,
protocol
)
VALUES ($1, $2, $3, $4, $5)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.Error,
retrievalAttempt.Protocol,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
wg.Add(1)
go func(storageProviderID string, retrievalAttempt *RetrievalAttempt) {
defer wg.Done()

var timeToFirstByte time.Duration
if retrievalAttempt.TimeToFirstByte != "" {
timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte)
}
filSPID := r.lassieSPIDToFilecoinSPID(ctx, storageProviderID) // call to Heyfil, may block if unknown SPID

lk.Lock()
defer lk.Unlock()
attempts[storageProviderID] = metrics.Attempt{
FilSPID: filSPID,
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
}
return nil
})
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
error,
protocol
)
VALUES ($1, $2, $3, $4, $5)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.Error,
retrievalAttempt.Protocol,
).Exec(func(ct pgconn.CommandTag) error {
rowsAffected := ct.RowsAffected()
switch rowsAffected {
case 0:
totalLogger.Warnw("Retrieval attempt insertion did not affect any rows", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
default:
totalLogger.Debugw("Inserted retrieval attempt successfully", "retrievalID", event.RetrievalID, "retrievalAttempt", retrievalAttempt, "storageProviderID", storageProviderID, "rowsAffected", rowsAffected)
}
return nil
})
}(storageProviderID, retrievalAttempt)
}

filSPID := r.lassieSPIDToFilecoinSPID(ctx, event.StorageProviderID)

wg.Wait()

if r.cfg.metrics != nil {
r.cfg.metrics.HandleAggregatedEvent(ctx,
r.cfg.metrics.HandleAggregatedEvent(
ctx,
timeToFirstIndexerResult,
timeToFirstByte,
event.Success,
Expand Down
11 changes: 6 additions & 5 deletions metrics/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID,
}

type Attempt struct {
FilSPID string
Error string
Protocol string
TimeToFirstByte time.Duration
Expand Down Expand Up @@ -182,20 +183,20 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
case ProtocolGraphsync:
if !recordedGraphSync {
recordedGraphSync = true
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID))
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
}
case ProtocolHttp:
if !recordedHttp {
recordedHttp = true
m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID))
m.requestWithHttpAttempt.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
}
}
if attempt.Error != "" {
switch protocolAttempted {
case ProtocolGraphsync:
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID))
m.graphsyncRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
case ProtocolHttp:
m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID))
m.httpRetrievalFailureCount.Add(ctx, 1, attribute.String("protocol", "http"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
default:
}
if metric, matched := m.getMatchingErrorMetric(ctx, attempt.Error); matched {
Expand Down Expand Up @@ -227,7 +228,7 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
if success {
protocol := protocolFromMulticodecString(protocolSucceeded)

m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("fil_sp_id", filSPID))
m.requestWithSuccessCount.Add(ctx, 1, attribute.String("protocol", protocol), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", filSPID))
switch protocol {
case ProtocolBitswap:
m.requestWithBitswapSuccessCount.Add(ctx, 1, attribute.String("protocol", "bitswap"))
Expand Down

0 comments on commit 5963abf

Please sign in to comment.