diff --git a/eventrecorder/event.go b/eventrecorder/event.go index 9e9d899..47344d5 100644 --- a/eventrecorder/event.go +++ b/eventrecorder/event.go @@ -121,9 +121,10 @@ func (e EventBatch) Validate() error { } type RetrievalAttempt struct { - Error string `json:"error,omitempty"` - TimeToFirstByte string `json:"timeToFirstByte,omitempty"` - Protocol string `json:"protocol,omitempty"` + Error string `json:"error,omitempty"` + TimeToFirstByte string `json:"timeToFirstByte,omitempty"` + BytesTransferred uint64 `json:"bytesTransferred,omitempty"` + Protocol string `json:"protocol,omitempty"` } type AggregateEvent struct { diff --git a/eventrecorder/recorder.go b/eventrecorder/recorder.go index ce15406..98d1c5f 100644 --- a/eventrecorder/recorder.go +++ b/eventrecorder/recorder.go @@ -159,6 +159,8 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr if event.TimeToFirstIndexerResult != "" { timeToFirstIndexerResult, _ = time.ParseDuration(event.TimeToFirstIndexerResult) } + filSPID := r.lassieSPIDToFilecoinSPID(ctx, event.StorageProviderID) + query := ` INSERT INTO aggregate_retrieval_events( instance_id, @@ -166,6 +168,7 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr root_cid, url_path, storage_provider_id, + filecoin_storage_provider_id, time_to_first_byte, bandwidth_bytes_sec, bytes_transferred, @@ -179,7 +182,7 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr protocols_attempted, protocol_succeeded ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18) ` batchQuery.Queue(query, event.InstanceID, @@ -187,6 +190,7 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr event.RootCid, event.URLPath, event.StorageProviderID, + filSPID, timeToFirstByte, event.Bandwidth, event.BytesTransferred, @@ -227,25 +231,30 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr lk.Lock() defer lk.Unlock() attempts[storageProviderID] = metrics.Attempt{ - FilSPID: filSPID, - Error: retrievalAttempt.Error, - Protocol: retrievalAttempt.Protocol, - TimeToFirstByte: timeToFirstByte, + FilSPID: filSPID, + Error: retrievalAttempt.Error, + Protocol: retrievalAttempt.Protocol, + TimeToFirstByte: timeToFirstByte, + BytesTransferred: retrievalAttempt.BytesTransferred, } query := ` INSERT INTO retrieval_attempts( retrieval_id, storage_provider_id, + filecoin_storage_provider_id, time_to_first_byte, + bytes_transferred, error, protocol ) - VALUES ($1, $2, $3, $4, $5) + VALUES ($1, $2, $3, $4, $5, $6, $7) ` batchRetrievalAttempts.Queue(query, event.RetrievalID, storageProviderID, + filSPID, timeToFirstByte, + retrievalAttempt.BytesTransferred, retrievalAttempt.Error, retrievalAttempt.Protocol, ).Exec(func(ct pgconn.CommandTag) error { @@ -260,9 +269,6 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr }) }(storageProviderID, retrievalAttempt) } - - filSPID := r.lassieSPIDToFilecoinSPID(ctx, event.StorageProviderID) - wg.Wait() if r.cfg.metrics != nil { diff --git a/eventrecorder/recorder_test.go b/eventrecorder/recorder_test.go index ec40f0b..04c5c5e 100644 --- a/eventrecorder/recorder_test.go +++ b/eventrecorder/recorder_test.go @@ -38,18 +38,20 @@ var expectedEvents = []ae{ protocolSucceeded: "transport-bitswap", attempts: map[string]metrics.Attempt{ "12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": { - Error: "", - Protocol: "transport-graphsync-filecoinv1", - TimeToFirstByte: 50 * time.Millisecond, + Error: "", + Protocol: "transport-graphsync-filecoinv1", + TimeToFirstByte: 50 * time.Millisecond, + BytesTransferred: 10001, }, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { Error: "failed to dial", Protocol: "transport-graphsync-filecoinv1", }, "Bitswap": { - Error: "", - Protocol: "transport-bitswap", - TimeToFirstByte: 20 * time.Millisecond, + Error: "", + Protocol: "transport-bitswap", + TimeToFirstByte: 20 * time.Millisecond, + BytesTransferred: 20002, }, }, }, @@ -85,18 +87,20 @@ var expectedEvents = []ae{ protocolSucceeded: "transport-ipfs-gateway-http", attempts: map[string]metrics.Attempt{ "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": { - Error: "", - Protocol: "transport-ipfs-gateway-http", - TimeToFirstByte: 100 * time.Millisecond, + Error: "", + Protocol: "transport-ipfs-gateway-http", + TimeToFirstByte: 100 * time.Millisecond, + BytesTransferred: 10001, }, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { Error: "failed to dial", Protocol: "transport-graphsync-filecoinv1", }, "Bitswap": { - Error: "", - Protocol: "transport-bitswap", - TimeToFirstByte: 200 * time.Millisecond, + Error: "", + Protocol: "transport-bitswap", + TimeToFirstByte: 200 * time.Millisecond, + BytesTransferred: 20002, }, }, }, @@ -154,6 +158,7 @@ func TestRecorderMetrics(t *testing.T) { req.Equal(aa.Error, mm.aggregatedEvents[ii].attempts[k].Error) req.Equal(aa.Protocol, mm.aggregatedEvents[ii].attempts[k].Protocol) req.Equal(aa.TimeToFirstByte, mm.aggregatedEvents[ii].attempts[k].TimeToFirstByte) + req.Equal(aa.BytesTransferred, mm.aggregatedEvents[ii].attempts[k].BytesTransferred) } } } diff --git a/metrics/events.go b/metrics/events.go index e55a8a3..72b1b91 100644 --- a/metrics/events.go +++ b/metrics/events.go @@ -147,10 +147,11 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID, } type Attempt struct { - FilSPID string - Error string - Protocol string - TimeToFirstByte time.Duration + FilSPID string + Error string + Protocol string + TimeToFirstByte time.Duration + BytesTransferred uint64 } func (m *Metrics) HandleAggregatedEvent(ctx context.Context, @@ -175,7 +176,11 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context, protocolAttempted := protocolFromMulticodecString(attempt.Protocol) switch protocolAttempted { case ProtocolBitswap: - m.requestWithBitswapAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap")) + if storageProviderID != "Bitswap" { + m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) + } else { + m.requestWithBitswapAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap")) + } case ProtocolGraphsync: m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID)) case ProtocolHttp: diff --git a/schema.sql b/schema.sql index b96bc82..4fce427 100644 --- a/schema.sql +++ b/schema.sql @@ -16,6 +16,7 @@ create table if not exists aggregate_retrieval_events( url_path text, instance_id character varying(64) not null, storage_provider_id character varying(256), + filecoin_storage_provider_id character varying(16), time_to_first_byte bigint, bandwidth_bytes_sec bigint, bytes_transferred bigint, @@ -33,8 +34,10 @@ create table if not exists aggregate_retrieval_events( create table if not exists retrieval_attempts( retrieval_id uuid not null, storage_provider_id character varying(256), + filecoin_storage_provider_id character varying(16), time_to_first_byte bigint, error text, protocol character varying(256), + bytes_transferred bigint, FOREIGN KEY (retrieval_id) REFERENCES aggregate_retrieval_events (retrieval_id) ); diff --git a/testdata/aggregategood.json b/testdata/aggregategood.json index 26f1eb3..d0547f3 100644 --- a/testdata/aggregategood.json +++ b/testdata/aggregategood.json @@ -19,7 +19,8 @@ "retrievalAttempts": { "12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": { "protocol": "transport-graphsync-filecoinv1", - "timeToFirstByte": "50ms" + "timeToFirstByte": "50ms", + "bytesTransferred": 10001 }, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { "error": "failed to dial", @@ -27,7 +28,8 @@ }, "Bitswap": { "protocol": "transport-bitswap", - "timeToFirstByte": "20ms" + "timeToFirstByte": "20ms", + "bytesTransferred": 20002 } }, "retrievalId": "c8490080-b86f-4306-a657-a0b88ac43832", @@ -71,7 +73,8 @@ "retrievalAttempts": { "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": { "protocol": "transport-ipfs-gateway-http", - "timeToFirstByte": "100ms" + "timeToFirstByte": "100ms", + "bytesTransferred": 10001 }, "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { "error": "failed to dial", @@ -79,7 +82,8 @@ }, "Bitswap": { "protocol": "transport-bitswap", - "timeToFirstByte": "200ms" + "timeToFirstByte": "200ms", + "bytesTransferred": 20002 } }, "retrievalId": "c8490080-b86f-4306-a657-a0b88ac43834",