Skip to content
This repository has been archived by the owner on Mar 15, 2024. It is now read-only.

feat: add bytes transferred per event #71

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions eventrecorder/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func (e EventBatch) Validate() error {
}

type RetrievalAttempt struct {
Error string `json:"error,omitempty"`
TimeToFirstByte string `json:"timeToFirstByte,omitempty"`
Protocol string `json:"protocol,omitempty"`
Error string `json:"error,omitempty"`
TimeToFirstByte string `json:"timeToFirstByte,omitempty"`
BytesTransferred uint64 `json:"bytesTransferred,omitempty"`
Protocol string `json:"protocol,omitempty"`
}

type AggregateEvent struct {
Expand Down
13 changes: 8 additions & 5 deletions eventrecorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,28 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr
lk.Lock()
defer lk.Unlock()
attempts[storageProviderID] = metrics.Attempt{
FilSPID: filSPID,
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
FilSPID: filSPID,
Error: retrievalAttempt.Error,
Protocol: retrievalAttempt.Protocol,
TimeToFirstByte: timeToFirstByte,
BytesTransferred: retrievalAttempt.BytesTransferred,
}
query := `
INSERT INTO retrieval_attempts(
retrieval_id,
storage_provider_id,
time_to_first_byte,
bytes_transferred,
error,
protocol
)
VALUES ($1, $2, $3, $4, $5)
VALUES ($1, $2, $3, $4, $5, $6)
`
batchRetrievalAttempts.Queue(query,
event.RetrievalID,
storageProviderID,
timeToFirstByte,
retrievalAttempt.BytesTransferred,
retrievalAttempt.Error,
retrievalAttempt.Protocol,
).Exec(func(ct pgconn.CommandTag) error {
Expand Down
29 changes: 17 additions & 12 deletions eventrecorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ var expectedEvents = []ae{
protocolSucceeded: "transport-bitswap",
attempts: map[string]metrics.Attempt{
"12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": {
Error: "",
Protocol: "transport-graphsync-filecoinv1",
TimeToFirstByte: 50 * time.Millisecond,
Error: "",
Protocol: "transport-graphsync-filecoinv1",
TimeToFirstByte: 50 * time.Millisecond,
BytesTransferred: 10001,
},
"12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": {
Error: "failed to dial",
Protocol: "transport-graphsync-filecoinv1",
},
"Bitswap": {
Error: "",
Protocol: "transport-bitswap",
TimeToFirstByte: 20 * time.Millisecond,
Error: "",
Protocol: "transport-bitswap",
TimeToFirstByte: 20 * time.Millisecond,
BytesTransferred: 20002,
},
},
},
Expand Down Expand Up @@ -85,18 +87,20 @@ var expectedEvents = []ae{
protocolSucceeded: "transport-ipfs-gateway-http",
attempts: map[string]metrics.Attempt{
"12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": {
Error: "",
Protocol: "transport-ipfs-gateway-http",
TimeToFirstByte: 100 * time.Millisecond,
Error: "",
Protocol: "transport-ipfs-gateway-http",
TimeToFirstByte: 100 * time.Millisecond,
BytesTransferred: 10001,
},
"12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": {
Error: "failed to dial",
Protocol: "transport-graphsync-filecoinv1",
},
"Bitswap": {
Error: "",
Protocol: "transport-bitswap",
TimeToFirstByte: 200 * time.Millisecond,
Error: "",
Protocol: "transport-bitswap",
TimeToFirstByte: 200 * time.Millisecond,
BytesTransferred: 20002,
},
},
},
Expand Down Expand Up @@ -154,6 +158,7 @@ func TestRecorderMetrics(t *testing.T) {
req.Equal(aa.Error, mm.aggregatedEvents[ii].attempts[k].Error)
req.Equal(aa.Protocol, mm.aggregatedEvents[ii].attempts[k].Protocol)
req.Equal(aa.TimeToFirstByte, mm.aggregatedEvents[ii].attempts[k].TimeToFirstByte)
req.Equal(aa.BytesTransferred, mm.aggregatedEvents[ii].attempts[k].BytesTransferred)
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions metrics/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ func (m *Metrics) HandleSuccessEvent(ctx context.Context, id types.RetrievalID,
}

type Attempt struct {
FilSPID string
Error string
Protocol string
TimeToFirstByte time.Duration
FilSPID string
Error string
Protocol string
TimeToFirstByte time.Duration
BytesTransferred uint64
}

func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
Expand All @@ -175,7 +176,11 @@ func (m *Metrics) HandleAggregatedEvent(ctx context.Context,
protocolAttempted := protocolFromMulticodecString(attempt.Protocol)
switch protocolAttempted {
case ProtocolBitswap:
m.requestWithBitswapAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap"))
if storageProviderID != "Bitswap" {
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
} else {
m.requestWithBitswapAttempt.Add(ctx, 1, attribute.String("protocol", "bitswap"))
}
case ProtocolGraphsync:
m.requestWithGraphSyncAttempt.Add(ctx, 1, attribute.String("protocol", "graphsync"), attribute.String("sp_id", storageProviderID), attribute.String("fil_sp_id", attempt.FilSPID))
case ProtocolHttp:
Expand Down
1 change: 1 addition & 0 deletions schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ create table if not exists retrieval_attempts(
time_to_first_byte bigint,
error text,
protocol character varying(256),
bytes_transferred bigint,
FOREIGN KEY (retrieval_id) REFERENCES aggregate_retrieval_events (retrieval_id)
);
12 changes: 8 additions & 4 deletions testdata/aggregategood.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
"retrievalAttempts": {
"12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": {
"protocol": "transport-graphsync-filecoinv1",
"timeToFirstByte": "50ms"
"timeToFirstByte": "50ms",
"bytesTransferred": 10001
},
"12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": {
"error": "failed to dial",
"protocol": "transport-graphsync-filecoinv1"
},
"Bitswap": {
"protocol": "transport-bitswap",
"timeToFirstByte": "20ms"
"timeToFirstByte": "20ms",
"bytesTransferred": 20002
}
},
"retrievalId": "c8490080-b86f-4306-a657-a0b88ac43832",
Expand Down Expand Up @@ -71,15 +73,17 @@
"retrievalAttempts": {
"12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": {
"protocol": "transport-ipfs-gateway-http",
"timeToFirstByte": "100ms"
"timeToFirstByte": "100ms",
"bytesTransferred": 10001
},
"12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": {
"error": "failed to dial",
"protocol": "transport-graphsync-filecoinv1"
},
"Bitswap": {
"protocol": "transport-bitswap",
"timeToFirstByte": "200ms"
"timeToFirstByte": "200ms",
"bytesTransferred": 20002
}
},
"retrievalId": "c8490080-b86f-4306-a657-a0b88ac43834",
Expand Down