diff --git a/eventrecorder/config.go b/eventrecorder/config.go index 6c0afe8..e893e94 100644 --- a/eventrecorder/config.go +++ b/eventrecorder/config.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" - "github.com/filecoin-project/lassie-event-recorder/metrics" "github.com/filecoin-project/lassie-event-recorder/spmap" "github.com/jackc/pgx/v5/pgxpool" ) @@ -22,7 +21,7 @@ type ( mapcfg []spmap.Option - metrics *metrics.Metrics + metrics Metrics } Option func(*config) error ) @@ -34,13 +33,14 @@ func newConfig(opts []Option) (*config, error) { return nil, err } } - - if cfg.dbDSN == "" { - return nil, errors.New("db URL must be specified") + if cfg.dbDSN != "" { + var err error + if cfg.pgxPoolConfig, err = pgxpool.ParseConfig(cfg.dbDSN); err != nil { + return nil, fmt.Errorf("unable to parse db URL: %w", err) + } } - var err error - if cfg.pgxPoolConfig, err = pgxpool.ParseConfig(cfg.dbDSN); err != nil { - return nil, fmt.Errorf("unable to parse db URL: %w", err) + if cfg.pgxPoolConfig == nil && cfg.metrics == nil && cfg.mongoEndpoint == "" { + return nil, errors.New("must set up at least one of: postgres, mongo, metrics") } return cfg, nil } @@ -62,9 +62,16 @@ func WithMongoSubmissions(endpoint, db, collection string, percentage float32) O } } -func WithMetrics(metrics *metrics.Metrics) Option { +func WithMetrics(metrics Metrics) Option { return func(cfg *config) error { cfg.metrics = metrics return nil } } + +func WithSPMapOptions(opts ...spmap.Option) Option { + return func(cfg *config) error { + cfg.mapcfg = opts + return nil + } +} diff --git a/eventrecorder/recorder.go b/eventrecorder/recorder.go index 44a3846..e41cea7 100644 --- a/eventrecorder/recorder.go +++ b/eventrecorder/recorder.go @@ -20,6 +20,32 @@ import ( var logger = log.Logger("lassie/eventrecorder") +type Metrics interface { + HandleStartedEvent(context.Context, types.RetrievalID, types.Phase, time.Time, string) + HandleCandidatesFoundEvent(context.Context, types.RetrievalID, time.Time, any) + 
HandleCandidatesFilteredEvent(context.Context, types.RetrievalID, any) + HandleFailureEvent(context.Context, types.RetrievalID, types.Phase, string, any) + HandleTimeToFirstByteEvent(context.Context, types.RetrievalID, string, time.Time) + HandleSuccessEvent(context.Context, types.RetrievalID, time.Time, string, any) + + HandleAggregatedEvent( + ctx context.Context, + timeToFirstIndexerResult time.Duration, + timeToFirstByte time.Duration, + success bool, + storageProviderID string, // Lassie Peer ID + filSPID string, // Heyfil Filecoin SP ID + startTime time.Time, + endTime time.Time, + bandwidth int64, + bytesTransferred int64, + indexerCandidates int64, + indexerFiltered int64, + attempts map[string]metrics.Attempt, + protocolSucceeded string, + ) +} + type EventRecorder struct { cfg *config db *pgxpool.Pool @@ -43,6 +69,10 @@ func New(opts ...Option) (*EventRecorder, error) { } func (r *EventRecorder) RecordEvents(ctx context.Context, events []Event) error { + if r.db == nil { + return nil + } + totalLogger := logger.With("total", len(events)) var batchQuery pgx.Batch @@ -180,7 +210,6 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr }) attempts := make(map[string]metrics.Attempt, len(event.RetrievalAttempts)) for storageProviderID, retrievalAttempt := range event.RetrievalAttempts { - var timeToFirstByte time.Duration if retrievalAttempt.TimeToFirstByte != "" { timeToFirstByte, _ = time.ParseDuration(retrievalAttempt.TimeToFirstByte) @@ -242,7 +271,8 @@ func (r *EventRecorder) RecordAggregateEvents(ctx context.Context, events []Aggr report := RetrievalReport{ RetrievalID: event.RetrievalID, InstanceID: event.InstanceID, - StorageProviderID: event.StorageProviderID, + StorageProviderID: event.StorageProviderID, // Lassie Peer ID + SPID: filSPID, // Heyfil Filecoin SP ID TTFB: timeToFirstByte.Milliseconds(), Bandwidth: int64(event.Bandwidth), Success: event.Success, @@ -252,27 +282,29 @@ func (r *EventRecorder) 
RecordAggregateEvents(ctx context.Context, events []Aggr go func(reportData RetrievalReport) { mongoReportCtx, cncl := context.WithTimeout(context.Background(), 30*time.Second) defer cncl() - reportData.SPID = filSPID - _, err := r.mc.InsertOne(mongoReportCtx, reportData) - if err != nil { + if _, err := r.mc.InsertOne(mongoReportCtx, reportData); err != nil { logger.Infof("failed to report to mongo: %v", err) } }(report) } } - batchResult := r.db.SendBatch(ctx, &batchQuery) - err := batchResult.Close() - if err != nil { - totalLogger.Errorw("At least one aggregated event insertion failed", "err", err) - return err - } - batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts) - err = batchResult.Close() - if err != nil { - totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err) - return err + + if r.db != nil { + batchResult := r.db.SendBatch(ctx, &batchQuery) + err := batchResult.Close() + if err != nil { + totalLogger.Errorw("At least one aggregated event insertion failed", "err", err) + return err + } + batchResult = r.db.SendBatch(ctx, &batchRetrievalAttempts) + err = batchResult.Close() + if err != nil { + totalLogger.Errorw("At least one retrieval attempt insertion failed", "err", err) + return err + } + totalLogger.Info("Successfully submitted batch event insertion") } - totalLogger.Info("Successfully submitted batch event insertion") + return nil } @@ -302,9 +334,11 @@ type RetrievalReport struct { func (r *EventRecorder) Start(ctx context.Context) error { var err error - r.db, err = pgxpool.NewWithConfig(ctx, r.cfg.pgxPoolConfig) - if err != nil { - return fmt.Errorf("failed to instantiate database connection: %w", err) + if r.cfg.pgxPoolConfig != nil { + r.db, err = pgxpool.NewWithConfig(ctx, r.cfg.pgxPoolConfig) + if err != nil { + return fmt.Errorf("failed to instantiate database connection: %w", err) + } } if r.cfg.mongoEndpoint != "" { @@ -324,8 +358,10 @@ func (r *EventRecorder) Start(ctx context.Context) error { } func 
(r *EventRecorder) Shutdown() { - logger.Info("Closing database connection...") - r.db.Close() + if r.db != nil { + logger.Info("Closing database connection...") + r.db.Close() + } logger.Info("Database connection closed successfully.") if r.mongo != nil { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/eventrecorder/recorder_test.go b/eventrecorder/recorder_test.go new file mode 100644 index 0000000..223d216 --- /dev/null +++ b/eventrecorder/recorder_test.go @@ -0,0 +1,276 @@ +package eventrecorder_test + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/filecoin-project/lassie-event-recorder/eventrecorder" + "github.com/filecoin-project/lassie-event-recorder/httpserver" + "github.com/filecoin-project/lassie-event-recorder/metrics" + "github.com/filecoin-project/lassie-event-recorder/spmap" + spmaptestutil "github.com/filecoin-project/lassie-event-recorder/spmap/testutil" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/stretchr/testify/require" +) + +var expectedEvents = []ae{ + // bitswap success + { + timeToFirstIndexerResult: 10 * time.Millisecond, + timeToFirstByte: 40 * time.Millisecond, + success: true, + storageProviderID: "Bitswap", + filSPID: "", + startTime: time.Unix(0, 0), + endTime: time.Unix(0, 0).Add(90 * time.Millisecond), + bandwidth: 200000, + bytesTransferred: 10000, + indexerCandidates: 5, + indexerFiltered: 3, + protocolSucceeded: "transport-bitswap", + attempts: map[string]metrics.Attempt{ + "12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": { + Error: "", + Protocol: "transport-graphsync-filecoinv1", + TimeToFirstByte: 50 * time.Millisecond, + }, + "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { + Error: "failed to dial", + Protocol: "transport-graphsync-filecoinv1", + }, + "Bitswap": { + Error: "", + Protocol: "transport-bitswap", + TimeToFirstByte: 20 * time.Millisecond, + }, + }, + 
}, + // failure + { + timeToFirstIndexerResult: time.Duration(0), + timeToFirstByte: time.Duration(0), + success: false, + storageProviderID: "", + filSPID: "", + startTime: time.Unix(0, 0), + endTime: time.Unix(0, 0), + bandwidth: 0, + bytesTransferred: 0, + indexerCandidates: 0, + indexerFiltered: 0, + protocolSucceeded: "", + attempts: map[string]metrics.Attempt{}, + }, + // http success + { + timeToFirstIndexerResult: 20 * time.Millisecond, + timeToFirstByte: 80 * time.Millisecond, + success: true, + storageProviderID: "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE", // same as smpap/testutil/TestPeerID + filSPID: "f01228000", // from Heyfil, same as smpap/testutil/TestSPID + startTime: time.Unix(0, 0).Add(20 * time.Millisecond), + endTime: time.Unix(0, 0).Add(180 * time.Millisecond), + bandwidth: 300000, + bytesTransferred: 20000, + indexerCandidates: 10, + indexerFiltered: 6, + protocolSucceeded: "transport-ipfs-gateway-http", + attempts: map[string]metrics.Attempt{ + "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": { + Error: "", + Protocol: "transport-ipfs-gateway-http", + TimeToFirstByte: 100 * time.Millisecond, + }, + "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { + Error: "failed to dial", + Protocol: "transport-graphsync-filecoinv1", + }, + "Bitswap": { + Error: "", + Protocol: "transport-bitswap", + TimeToFirstByte: 200 * time.Millisecond, + }, + }, + }, +} + +func TestRecorderMetrics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + req := require.New(t) + + spmapts := httptest.NewServer(spmaptestutil.MockHeyfilHandler) + defer spmapts.Close() + + mm := &mockMetrics{t: t} + recorder, err := eventrecorder.New(eventrecorder.WithMetrics(mm), eventrecorder.WithSPMapOptions(spmap.WithHeyFil(spmapts.URL))) + req.NoError(err) + + handler := httpserver.NewHttpHandler(recorder) + evtts := httptest.NewServer(handler.Handler()) + defer evtts.Close() + + 
req.NoError(handler.Start(ctx)) + + encEventBatch, err := os.ReadFile("../testdata/aggregategood.json") + req.NoError(err) + + resp, err := http.Post( + evtts.URL+"/v2/retrieval-events", + "application/json", + bytes.NewReader(encEventBatch)) + req.NoError(err) + body, err := io.ReadAll(resp.Body) + req.NoError(err) + req.Equal(http.StatusOK, resp.StatusCode, string(body)) + req.Len(body, 0) + + req.Len(mm.aggregatedEvents, len(expectedEvents)) + for ii, ee := range expectedEvents { + req.Equal(ee.timeToFirstIndexerResult, mm.aggregatedEvents[ii].timeToFirstIndexerResult) + req.Equal(ee.timeToFirstByte, mm.aggregatedEvents[ii].timeToFirstByte) + req.Equal(ee.success, mm.aggregatedEvents[ii].success) + req.Equal(ee.storageProviderID, mm.aggregatedEvents[ii].storageProviderID) + req.Equal(ee.filSPID, mm.aggregatedEvents[ii].filSPID) + req.Equal(ee.startTime, mm.aggregatedEvents[ii].startTime) + req.Equal(ee.endTime, mm.aggregatedEvents[ii].endTime) + req.Equal(ee.bandwidth, mm.aggregatedEvents[ii].bandwidth) + req.Equal(ee.bytesTransferred, mm.aggregatedEvents[ii].bytesTransferred) + req.Equal(ee.indexerCandidates, mm.aggregatedEvents[ii].indexerCandidates) + req.Equal(ee.indexerFiltered, mm.aggregatedEvents[ii].indexerFiltered) + req.Equal(ee.protocolSucceeded, mm.aggregatedEvents[ii].protocolSucceeded) + req.Len(mm.aggregatedEvents[ii].attempts, len(ee.attempts)) + for k, aa := range ee.attempts { + req.Contains(mm.aggregatedEvents[ii].attempts, k) + req.Equal(aa.Error, mm.aggregatedEvents[ii].attempts[k].Error) + req.Equal(aa.Protocol, mm.aggregatedEvents[ii].attempts[k].Protocol) + req.Equal(aa.TimeToFirstByte, mm.aggregatedEvents[ii].attempts[k].TimeToFirstByte) + } + } +} + +type mockMetrics struct { + t *testing.T + aggregatedEvents []ae +} + +func (mm *mockMetrics) HandleStartedEvent(context.Context, types.RetrievalID, types.Phase, time.Time, string) { + require.Fail(mm.t, "unexpected HandleStartedEvent call") +} + +func (mm *mockMetrics) 
HandleCandidatesFoundEvent(context.Context, types.RetrievalID, time.Time, any) { + require.Fail(mm.t, "unexpected HandleCandidatesFoundEvent call") +} + +func (mm *mockMetrics) HandleCandidatesFilteredEvent(context.Context, types.RetrievalID, any) { + require.Fail(mm.t, "unexpected HandleCandidatesFilteredEvent call") +} + +func (mm *mockMetrics) HandleFailureEvent(context.Context, types.RetrievalID, types.Phase, string, any) { + require.Fail(mm.t, "unexpected HandleFailureEvent call") +} + +func (mm *mockMetrics) HandleTimeToFirstByteEvent(context.Context, types.RetrievalID, string, time.Time) { + require.Fail(mm.t, "unexpected HandleTimeToFirstByteEvent call") +} + +func (mm *mockMetrics) HandleSuccessEvent(context.Context, types.RetrievalID, time.Time, string, any) { + require.Fail(mm.t, "unexpected HandleSuccessEvent call") +} + +func (mm *mockMetrics) HandleAggregatedEvent( + ctx context.Context, + timeToFirstIndexerResult time.Duration, + timeToFirstByte time.Duration, + success bool, + storageProviderID string, // Lassie Peer ID + filSPID string, // Heyfil Filecoin SP ID + startTime time.Time, + endTime time.Time, + bandwidth int64, + bytesTransferred int64, + indexerCandidates int64, + indexerFiltered int64, + attempts map[string]metrics.Attempt, + protocolSucceeded string, +) { + if mm.aggregatedEvents == nil { + mm.aggregatedEvents = make([]ae, 0) + } + mm.aggregatedEvents = append(mm.aggregatedEvents, ae{ + timeToFirstIndexerResult, + timeToFirstByte, + success, + storageProviderID, + filSPID, + startTime, + endTime, + bandwidth, + bytesTransferred, + indexerCandidates, + indexerFiltered, + attempts, + protocolSucceeded, + }) +} + +type ae struct { + timeToFirstIndexerResult time.Duration + timeToFirstByte time.Duration + success bool + storageProviderID string // Lassie Peer ID + filSPID string // Heyfil Filecoin SP ID + startTime time.Time + endTime time.Time + bandwidth int64 + bytesTransferred int64 + indexerCandidates int64 + indexerFiltered int64 + 
attempts map[string]metrics.Attempt + protocolSucceeded string +} + +func (a ae) String() string { + attempts := strings.Builder{} + for k, v := range a.attempts { + attempts.WriteString(fmt.Sprintf("\t%s: error=%v, protocol=%v, ttfb=%v\n", k, v.Error, v.Protocol, v.TimeToFirstByte)) + } + + return fmt.Sprintf( + "timeToFirstIndexerResult: %v\n"+ + "timeToFirstByte: %v\n"+ + "success: %v\n"+ + "storageProviderID: %s\n"+ + "filSPID: %s\n"+ + "startTime: %v\n"+ + "endTime: %v\n"+ + "bandwidth: %d\n"+ + "bytesTransferred: %d\n"+ + "indexerCandidates: %d\n"+ + "indexerFiltered: %d\n"+ + "attempts:\n%s"+ + "protocolSucceeded: %s", + a.timeToFirstIndexerResult, + a.timeToFirstByte, + a.success, + a.storageProviderID, + a.filSPID, + a.startTime, + a.endTime, + a.bandwidth, + a.bytesTransferred, + a.indexerCandidates, + a.indexerFiltered, + attempts.String(), + a.protocolSucceeded, + ) +} diff --git a/httpserver/httpserver.go b/httpserver/httpserver.go index 9aba531..7a3b957 100644 --- a/httpserver/httpserver.go +++ b/httpserver/httpserver.go @@ -15,9 +15,9 @@ import ( var logger = log.Logger("lassie/httpserver") type HttpServer struct { - cfg *config - recorder *eventrecorder.EventRecorder - server *http.Server + cfg *config + server *http.Server + handler *HttpHandler } func NewHttpServer(recorder *eventrecorder.EventRecorder, opts ...option) (*HttpServer, error) { @@ -28,9 +28,10 @@ func NewHttpServer(recorder *eventrecorder.EventRecorder, opts ...option) (*Http var httpServer HttpServer httpServer.cfg = cfg + httpServer.handler = NewHttpHandler(recorder) httpServer.server = &http.Server{ Addr: httpServer.cfg.httpServerListenAddr, - Handler: httpServer.httpServerMux(), + Handler: httpServer.handler.Handler(), ReadTimeout: httpServer.cfg.httpServerReadTimeout, ReadHeaderTimeout: httpServer.cfg.httpServerReadHeaderTimeout, WriteTimeout: httpServer.cfg.httpServerWriteTimeout, @@ -38,43 +39,59 @@ func NewHttpServer(recorder *eventrecorder.EventRecorder, opts ...option) 
(*Http MaxHeaderBytes: httpServer.cfg.httpServerMaxHeaderBytes, } - httpServer.recorder = recorder - return &httpServer, nil } -func (h HttpServer) Start(ctx context.Context) error { - ln, err := net.Listen("tcp", h.server.Addr) +func (hs HttpServer) Start(ctx context.Context) error { + ln, err := net.Listen("tcp", hs.server.Addr) if err != nil { return err } // Start the event recorder - h.recorder.Start(ctx) + if err := hs.handler.Start(ctx); err != nil { + return err + } // Shutdown the event recorder when the server shutdowns - h.server.RegisterOnShutdown(func() { - h.recorder.Shutdown() + hs.server.RegisterOnShutdown(func() { + hs.handler.Shutdown() }) - go func() { _ = h.server.Serve(ln) }() + go func() { _ = hs.server.Serve(ln) }() logger.Infow("Server started", "addr", ln.Addr()) return nil } -func (h HttpServer) Shutdown(ctx context.Context) error { - return h.server.Shutdown(ctx) +func (hs HttpServer) Shutdown(ctx context.Context) error { + return hs.server.Shutdown(ctx) +} + +type HttpHandler struct { + recorder *eventrecorder.EventRecorder +} + +func NewHttpHandler(recorder *eventrecorder.EventRecorder) *HttpHandler { + return &HttpHandler{recorder} +} + +func (hh HttpHandler) Start(ctx context.Context) error { + return hh.recorder.Start(ctx) +} + +func (hh HttpHandler) Shutdown() { + hh.recorder.Shutdown() } -func (r *HttpServer) httpServerMux() *http.ServeMux { +func (hh *HttpHandler) Handler() http.Handler { mux := http.NewServeMux() - mux.HandleFunc("/v1/retrieval-events", r.handleRetrievalEvents) - mux.HandleFunc("/v2/retrieval-events", r.handleRetrievalEventsV2) - mux.HandleFunc("/ready", r.handleReady) + mux.HandleFunc("/v1/retrieval-events", hh.handleRetrievalEvents) + mux.HandleFunc("/v2/retrieval-events", hh.handleRetrievalEventsV2) + mux.HandleFunc("/ready", hh.handleReady) return mux } -func (h *HttpServer) handleRetrievalEvents(res http.ResponseWriter, req *http.Request) { +func (hh *HttpHandler) handleRetrievalEvents(res 
http.ResponseWriter, req *http.Request) { logger := logger.With("method", req.Method, "path", req.URL.Path) if req.Method != http.MethodPost { res.Header().Add("Allow", http.MethodPost) @@ -106,14 +123,14 @@ func (h *HttpServer) handleRetrievalEvents(res http.ResponseWriter, req *http.Re return } - err := h.recorder.RecordEvents(req.Context(), batch.Events) + err := hh.recorder.RecordEvents(req.Context(), batch.Events) if err != nil { http.Error(res, "", http.StatusInternalServerError) return } } -func (h *HttpServer) handleRetrievalEventsV2(res http.ResponseWriter, req *http.Request) { +func (hh *HttpHandler) handleRetrievalEventsV2(res http.ResponseWriter, req *http.Request) { logger := logger.With("method", req.Method, "path", req.URL.Path) if req.Method != http.MethodPost { res.Header().Add("Allow", http.MethodPost) @@ -145,14 +162,14 @@ func (h *HttpServer) handleRetrievalEventsV2(res http.ResponseWriter, req *http. return } - err := h.recorder.RecordAggregateEvents(req.Context(), batch.Events) + err := hh.recorder.RecordAggregateEvents(req.Context(), batch.Events) if err != nil { http.Error(res, "", http.StatusInternalServerError) return } } -func (r *HttpServer) handleReady(res http.ResponseWriter, req *http.Request) { +func (hh *HttpHandler) handleReady(res http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodGet: // TODO: ping DB as part of readiness check? 
diff --git a/spmap/spmap_test.go b/spmap/spmap_test.go index 1bf60e7..782a4a9 100644 --- a/spmap/spmap_test.go +++ b/spmap/spmap_test.go @@ -2,41 +2,23 @@ package spmap_test import ( "context" - "fmt" - "net/http" "net/http/httptest" "testing" "github.com/filecoin-project/lassie-event-recorder/spmap" + "github.com/filecoin-project/lassie-event-recorder/spmap/testutil" "github.com/libp2p/go-libp2p/core/peer" ) -var ( - TestPeerID = "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE" - TestSPID = "f01228000" -) - -var MockHeyfilHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/sp" { - http.Error(w, "nope, bad path", http.StatusBadRequest) - return - } - if r.URL.RawQuery != "peerid="+TestPeerID { - http.Error(w, "nope, bad query", http.StatusBadRequest) - return - } - fmt.Fprintf(w, `["%s"]\n`, TestSPID) -}) - func TestSPMap(t *testing.T) { - ts := httptest.NewServer(MockHeyfilHandler) + ts := httptest.NewServer(testutil.MockHeyfilHandler) defer ts.Close() spm := spmap.NewSPMap(spmap.WithHeyFil(ts.URL)) var pid peer.ID - pid.UnmarshalText([]byte(TestPeerID)) + pid.UnmarshalText([]byte(testutil.TestPeerID)) ch := spm.Get(context.Background(), pid) sid, ok := <-ch - if sid != TestSPID { + if sid != testutil.TestSPID { t.Fatalf("unexpected response; got: %v / %t", sid, ok) } } diff --git a/spmap/testutil/mockhandler.go b/spmap/testutil/mockhandler.go new file mode 100644 index 0000000..8393be5 --- /dev/null +++ b/spmap/testutil/mockhandler.go @@ -0,0 +1,23 @@ +package testutil + +import ( + "fmt" + "net/http" +) + +var ( + TestPeerID = "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE" + TestSPID = "f01228000" +) + +var MockHeyfilHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/sp" { + http.Error(w, "nope, bad path", http.StatusBadRequest) + return + } + if r.URL.RawQuery != "peerid="+TestPeerID { + http.Error(w, "nope, bad query", http.StatusBadRequest) + return + } + 
fmt.Fprintf(w, `["%s"]\n`, TestSPID) +}) diff --git a/testdata/aggregategood.json b/testdata/aggregategood.json new file mode 100644 index 0000000..26f1eb3 --- /dev/null +++ b/testdata/aggregategood.json @@ -0,0 +1,95 @@ + { + "events": [ + { + "bandwidth": 200000, + "bytesTransferred": 10000, + "endTime": "1970-01-01T10:00:00.09+10:00", + "indexerCandidatesFiltered": 3, + "indexerCandidatesReceived": 5, + "instanceId": "test-instance", + "protocolSucceeded": "transport-bitswap", + "protocolsAllowed": [ + "transport-graphsync-filecoinv1", + "transport-bitswap" + ], + "protocolsAttempted": [ + "transport-graphsync-filecoinv1", + "transport-bitswap" + ], + "retrievalAttempts": { + "12D3KooWEqwTBN3GE4vT6DWZiKpq24UtSBmhhwM73vg7SfTjYWaF": { + "protocol": "transport-graphsync-filecoinv1", + "timeToFirstByte": "50ms" + }, + "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { + "error": "failed to dial", + "protocol": "transport-graphsync-filecoinv1" + }, + "Bitswap": { + "protocol": "transport-bitswap", + "timeToFirstByte": "20ms" + } + }, + "retrievalId": "c8490080-b86f-4306-a657-a0b88ac43832", + "rootCid": "QmTTA2daxGqo5denp6SwLzzkLJm3fuisYEi9CoWsuHpzfb", + "startTime": "1970-01-01T10:00:00+10:00", + "storageProviderId": "Bitswap", + "success": true, + "timeToFirstByte": "40ms", + "timeToFirstIndexerResult": "10ms", + "urlPath": "/applesauce" + }, + { + "endTime": "1970-01-01T10:00:00+10:00", + "indexerCandidatesFiltered": 0, + "indexerCandidatesReceived": 0, + "instanceId": "test-instance", + "retrievalId": "5f06fd27-36db-47f8-a0f5-ef20bd0ae4b5", + "rootCid": "QmTTA2daxGqo5denp6SwLzzkLJm3fuisYEi9CoWsuHpzfb", + "startTime": "1970-01-01T10:00:00+10:00", + "success": false, + "urlPath": "/applesauce" + }, + { + "bandwidth": 300000, + "bytesTransferred": 20000, + "endTime": "1970-01-01T10:00:00.18+10:00", + "indexerCandidatesFiltered": 6, + "indexerCandidatesReceived": 10, + "instanceId": "test-instance", + "protocolSucceeded": "transport-ipfs-gateway-http", + 
"protocolsAllowed": [ + "transport-ipfs-gateway-http", + "transport-graphsync-filecoinv1", + "transport-bitswap" + ], + "protocolsAttempted": [ + "transport-ipfs-gateway-http", + "transport-graphsync-filecoinv1", + "transport-bitswap" + ], + "retrievalAttempts": { + "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE": { + "protocol": "transport-ipfs-gateway-http", + "timeToFirstByte": "100ms" + }, + "12D3KooWHHzSeKaY8xuZVzkLbKFfvNgPPeKhFBGrMbNzbm5akpqu": { + "error": "failed to dial", + "protocol": "transport-graphsync-filecoinv1" + }, + "Bitswap": { + "protocol": "transport-bitswap", + "timeToFirstByte": "200ms" + } + }, + "retrievalId": "c8490080-b86f-4306-a657-a0b88ac43834", + "rootCid": "QmTTA2daxGqo5denp6SwLzzkLJm3fuisYEi9CoWsuHpzfb", + "startTime": "1970-01-01T10:00:00.02+10:00", + "storageProviderId": "12D3KooWDGBkHBZye7rN6Pz9ihEZrHnggoVRQh6eEtKP4z1K4KeE", + "success": true, + "timeToFirstByte": "80ms", + "timeToFirstIndexerResult": "20ms", + "urlPath": "/applesauce" + } + ] + } \ No newline at end of file