From df0a081948bae42764b3e8be10e09c0b77b20ea9 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 9 Oct 2023 20:27:51 +1100 Subject: [PATCH 1/3] feat: --log-file, defaulting to stdout, for standardised http log output --- cmd/booster-http/http_test.go | 121 +++++++++++++++++++++++++++------- cmd/booster-http/run.go | 17 +++++ cmd/booster-http/server.go | 111 ++++++++++++++----------------- go.mod | 2 + go.sum | 2 - 5 files changed, 167 insertions(+), 86 deletions(-) diff --git a/cmd/booster-http/http_test.go b/cmd/booster-http/http_test.go index 3ac13cffc..828d65759 100644 --- a/cmd/booster-http/http_test.go +++ b/cmd/booster-http/http_test.go @@ -8,7 +8,10 @@ import ( "io" "math/rand" "net/http" + "net/url" "os" + "strconv" + "strings" "testing" "time" @@ -32,49 +35,98 @@ const ( ) func TestNewHttpServer(t *testing.T) { + req := require.New(t) + // Create a new mock Http server port, err := testutil.FreePort() - require.NoError(t, err) + req.NoError(err) ctrl := gomock.NewController(t) - httpServer := NewHttpServer("", "0.0.0.0", port, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) + + var requestCount int + serverUpCh := make(chan struct{}) + logHandler := func(ts time.Time, remoteAddr, method string, url url.URL, status int, duration time.Duration, bytes int, compressionRatio, userAgent, msg string) { + select { + case <-serverUpCh: + default: + return + } + + t.Logf("%s %s %s %s %d %s %d %s %s %s", ts.Format(time.RFC3339), remoteAddr, method, url.String(), status, duration, bytes, compressionRatio, userAgent, msg) + requestCount++ + req.Equal("GET", method) + req.Equal("-", compressionRatio) + + switch requestCount { + case 1, 2: + req.Equal("/", url.Path) + req.Equal(http.StatusOK, status) + case 3: + req.Equal("/piece/bafynotacid!", url.Path) + req.Equal(http.StatusBadRequest, status) + req.Contains(msg, "invalid cid") + default: + req.Failf("unexpected request count", "count: %d", requestCount) + } + } + + httpServer := NewHttpServer( + "", + "0.0.0.0", 
+ port, + mocks_booster_http.NewMockHttpServerApi(ctrl), + &HttpServerOptions{ServePieces: true, LogHandler: logHandler}, + ) err = httpServer.Start(context.Background()) - require.NoError(t, err) + req.NoError(err) waitServerUp(t, port) + close(serverUpCh) // Check that server is responding with 200 status code resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", port)) - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) + req.NoError(err) + req.Equal(200, resp.StatusCode) // Create a request with Cors header - req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/", port), nil) - require.NoError(t, err) - req.Header.Add("Origin", "test") + request, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/", port), nil) + req.NoError(err) + request.Header.Add("Origin", "test") client := new(http.Client) - response, err := client.Do(req) - require.NoError(t, err) + response, err := client.Do(request) + req.NoError(err) // Check for Cors header - require.Equal(t, "*", response.Header.Get("Access-Control-Allow-Origin")) + req.Equal("*", response.Header.Get("Access-Control-Allow-Origin")) + + // Test an error condition + request, err = http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/piece/bafynotacid!", port), nil) + req.NoError(err) + response, err = client.Do(request) + req.NoError(err) + if response.StatusCode != http.StatusBadRequest { + body, _ := io.ReadAll(response.Body) + req.Failf("wrong response code not received", "expected %d, got %d; body: [%s]", http.StatusBadRequest, response.StatusCode, string(body)) + } // Stop the server err = httpServer.Stop() - require.NoError(t, err) + req.NoError(err) } func TestHttpGzipResponse(t *testing.T) { + req := require.New(t) + // Create a new mock Http server with custom functions port, err := testutil.FreePort() - require.NoError(t, err) + req.NoError(err) ctrl := gomock.NewController(t) mockHttpServer := mocks_booster_http.NewMockHttpServerApi(ctrl) // Create mock unsealed file 
for piece/car f, _ := os.Open(testFile) testFileBytes, err := io.ReadAll(f) - require.NoError(t, err) + req.NoError(err) _, err = f.Seek(0, io.SeekStart) - require.NoError(t, err) + req.NoError(err) defer f.Close() // Crate pieceInfo @@ -102,10 +154,10 @@ func TestHttpGzipResponse(t *testing.T) { lsys.TrustedStorage = true entity, err := unixfsgen.Parse("file:1MiB{zero}") - require.NoError(t, err) + req.NoError(err) t.Logf("Generating: %s", entity.Describe("")) rootEnt, err := entity.Generate(lsys, rndReader) - require.NoError(t, err) + req.NoError(err) // mirror of a similar set of tests in frisbii, but includes piece retrieval too testCases := []struct { @@ -165,19 +217,44 @@ func TestHttpGzipResponse(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() + logHandler := func(ts time.Time, remoteAddr, method string, url url.URL, status int, duration time.Duration, bytes int, compressionRatio, userAgent, msg string) { + t.Logf("%s %s %s %s %d %s %d %s %s %s", ts.Format(time.RFC3339), remoteAddr, method, url.String(), status, duration, bytes, compressionRatio, userAgent, msg) + req.Equal("GET", method) + if url.Path == "/" { // waitServerUp + return + } + if strings.HasPrefix(url.Path, "/piece") { + req.Equal("/piece/"+testPieceCid, url.Path) + } else if strings.HasPrefix(url.Path, "/ipfs") { + req.Equal("/ipfs/"+rootEnt.Root.String(), url.Path) + } else { + req.Failf("unexpected url path", "path: %s", url.Path) + } + req.Equal(http.StatusOK, status) + if tc.expectGzip { + req.NotEqual("-", compressionRatio, "compression ratio should be set for %s", url.Path) + // convert compressionRatio string to a float64 + compressionRatio, err := strconv.ParseFloat(compressionRatio, 64) + req.NoError(err) + req.True(compressionRatio > 10, "compression ratio (%v) should be > 10 for %s", compressionRatio, url.Path) // it's all zeros + } + } + httpServer := NewHttpServer("", "0.0.0.0", port, mockHttpServer, &HttpServerOptions{ 
ServePieces: true, ServeTrustless: true, CompressionLevel: tc.serverCompressionLevel, Blockstore: testutil.NewLinkSystemBlockstore(lsys), + LogWriter: os.Stderr, + LogHandler: logHandler, }) err = httpServer.Start(ctx) - require.NoError(t, err) + req.NoError(err) waitServerUp(t, port) defer func() { // Stop the server err = httpServer.Stop() - require.NoError(t, err) + req.NoError(err) }() { // test /piece retrieval @@ -190,7 +267,7 @@ func TestHttpGzipResponse(t *testing.T) { request = request.WithContext(ctx) client := &http.Client{Transport: &http.Transport{DisableCompression: tc.noClientCompression}} response, err := client.Do(request) - require.NoError(t, err) + req.NoError(err) defer response.Body.Close() if response.StatusCode != http.StatusOK { @@ -217,10 +294,10 @@ func TestHttpGzipResponse(t *testing.T) { // Get the uncompressed bytes out, err := io.ReadAll(rdr) - require.NoError(t, err) + req.NoError(err) // Compare bytes from original file to uncompressed http response - require.Equal(t, testFileBytes, out) + req.Equal(testFileBytes, out) } { // test /ipfs CAR retrieval diff --git a/cmd/booster-http/run.go b/cmd/booster-http/run.go index 45ed1075d..4c488d828 100644 --- a/cmd/booster-http/run.go +++ b/cmd/booster-http/run.go @@ -115,6 +115,11 @@ var runCmd = &cli.Command{ "a reverse proxy with compression", Value: gzip.BestSpeed, }, + &cli.StringFlag{ + Name: "log-file", + Usage: "path to file to append HTTP request and error logs to, defaults to stdout (-)", + Value: "-", + }, &cli.BoolFlag{ Name: "tracing", Usage: "enables tracing of booster-http calls", @@ -243,6 +248,18 @@ var runCmd = &cli.Command{ filtered := filters.NewFilteredBlockstore(rbs, multiFilter) opts.Blockstore = filtered } + + switch cctx.String("log-file") { + case "": + case "-": + opts.LogWriter = cctx.App.Writer + default: + opts.LogWriter, err = os.OpenFile(cctx.String("log-file"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + } + sapi := 
serverApi{ctx: ctx, piecedirectory: pd, sa: sa} server := NewHttpServer( cctx.String("base-path"), diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go index 230856847..33dc7a50d 100644 --- a/cmd/booster-http/server.go +++ b/cmd/booster-http/server.go @@ -13,7 +13,6 @@ import ( "time" "github.com/NYTimes/gziphandler" - "github.com/fatih/color" "github.com/filecoin-project/boost-gfm/retrievalmarket" "github.com/filecoin-project/boost-graphsync/storeutil" "github.com/filecoin-project/boost/metrics" @@ -68,6 +67,8 @@ type HttpServerOptions struct { ServePieces bool ServeTrustless bool CompressionLevel int + LogWriter io.Writer // for a standardised log write format + LogHandler frisbii.LogHandler // for more granular control over log output } func NewHttpServer(path string, listenAddr string, port int, api HttpServerApi, opts *HttpServerOptions) *HttpServer { @@ -128,8 +129,10 @@ func (s *HttpServer) Start(ctx context.Context) error { handler.HandleFunc("/info", s.handleInfo) handler.Handle("/metrics", metrics.Exporter("booster_http")) // metrics s.server = &http.Server{ - Addr: fmt.Sprintf("%s:%d", s.listenAddr, s.port), - Handler: c.Handler(handler), + Addr: fmt.Sprintf("%s:%d", s.listenAddr, s.port), + Handler: c.Handler( + frisbii.NewLogMiddleware(handler, frisbii.WithLogWriter(s.opts.LogWriter), frisbii.WithLogHandler(s.opts.LogHandler)), + ), // This context will be the parent of the context associated with all // incoming requests BaseContext: func(listener net.Listener) context.Context { @@ -174,8 +177,7 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { // Remove the path up to the piece cid prefixLen := len(s.pieceBasePath()) if len(r.URL.Path) <= prefixLen { - msg := fmt.Sprintf("path '%s' is missing piece CID", r.URL.Path) - writeError(w, r, http.StatusBadRequest, msg) + writeError(w, r, http.StatusBadRequest, fmt.Errorf("path '%s' is missing piece CID", r.URL.Path)) stats.Record(ctx, 
metrics.HttpPieceByCid400ResponseCount.M(1)) return } @@ -183,8 +185,7 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { pieceCidStr := r.URL.Path[prefixLen:] pieceCid, err := cid.Parse(pieceCidStr) if err != nil { - msg := fmt.Sprintf("parsing piece CID '%s': %s", pieceCidStr, err.Error()) - writeError(w, r, http.StatusBadRequest, msg) + writeError(w, r, http.StatusBadRequest, fmt.Errorf("parsing piece CID '%s': %s", pieceCidStr, err.Error())) stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) return } @@ -193,13 +194,11 @@ func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { content, err := s.getPieceContent(ctx, pieceCid) if err != nil { if isNotFoundError(err) { - writeError(w, r, http.StatusNotFound, err.Error()) + writeError(w, r, http.StatusNotFound, err) stats.Record(ctx, metrics.HttpPieceByCid404ResponseCount.M(1)) return } - log.Errorf("getting content for piece %s: %s", pieceCid, err) - msg := fmt.Sprintf("server error getting content for piece CID %s", pieceCid) - writeError(w, r, http.StatusInternalServerError, msg) + writeError(w, r, http.StatusInternalServerError, fmt.Errorf("server error getting content for piece CID %s: %s", pieceCid, err.Error())) stats.Record(ctx, metrics.HttpPieceByCid500ResponseCount.M(1)) return } @@ -234,45 +233,22 @@ func setHeaders(w http.ResponseWriter, pieceCid cid.Cid, isGzipped bool) { w.Header().Set("Cache-Control", "public, max-age=29030400, immutable") } -func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, isGzipped bool) { - var writer http.ResponseWriter - +func serveContent(res http.ResponseWriter, req *http.Request, content io.ReadSeeker, isGzipped bool) { // http.ServeContent ignores errors when writing to the stream, so we // replace the writer with a class that watches for errors - var err error - writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) { - err = e - }} - - writer = 
writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas() + res = newPieceAccountingWriter(res, toLoggingResponseWriter(res)) // Note that the last modified time is a constant value because the data // in a piece identified by a cid will never change. - start := time.Now() - if r.Method == "HEAD" { + if req.Method == "HEAD" { // For an HTTP HEAD request ServeContent doesn't send any data (just headers) - http.ServeContent(writer, r, "", time.Time{}, content) - alog("%s\tHEAD %s", color.New(color.FgGreen).Sprintf("%d", http.StatusOK), r.URL) + http.ServeContent(res, req, "", time.Time{}, content) return } // Send the content - http.ServeContent(writer, r, "", lastModified, content) - - // Write a line to the log - end := time.Now() - completeMsg := fmt.Sprintf("GET %s\n%s - %s: %s / %s bytes transferred", - r.URL, end.Format(timeFmt), start.Format(timeFmt), time.Since(start), addCommas(writeErrWatcher.count)) - if isGzipped { - completeMsg += " (gzipped)" - } - if err == nil { - alogAt(end, "%s\t%s", color.New(color.FgGreen).Sprint("DONE"), completeMsg) - } else { - alogAt(end, "%s\t%s\n%s", - color.New(color.FgRed).Sprint("FAIL"), completeMsg, err) - } + http.ServeContent(res, req, "", lastModified, content) } // isNotFoundError falls back to checking the error string for "not found". 
@@ -291,11 +267,14 @@ func isNotFoundError(err error) bool { return strings.Contains(strings.ToLower(err.Error()), "not found") } -func writeError(w http.ResponseWriter, r *http.Request, status int, msg string) { - w.WriteHeader(status) - w.Write([]byte("Error: " + msg)) //nolint:errcheck - alog("%s\tGET %s\n%s", - color.New(color.FgRed).Sprintf("%d", status), r.URL, msg) +func writeError(w http.ResponseWriter, r *http.Request, status int, msg error) { + log.Warnf("error handling request [%s]: %s", r.URL.String(), msg.Error()) + if lrw := toLoggingResponseWriter(w); lrw != nil { + lrw.LogError(status, msg) // will log the lowest wrapped error, so %w errors are isolated + } else { + log.Error("no logging response writer to report to") + http.Error(w, msg.Error(), status) + } } func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.ReadSeeker, error) { @@ -378,28 +357,36 @@ func (s *HttpServer) unsealedDeal(ctx context.Context, pieceCid cid.Cid, pieceDe len(pieceDeals), pieceCid, sealedCount, len(pieceDeals)-sealedCount, dealSectors, allErr) } -// writeErrorWatcher calls onError if there is an error writing to the writer -type writeErrorWatcher struct { - http.ResponseWriter - count uint64 - onError func(err error) -} - -func (w *writeErrorWatcher) Write(bz []byte) (int, error) { - count, err := w.ResponseWriter.Write(bz) - if err != nil { - w.onError(err) +func toLoggingResponseWriter(res http.ResponseWriter) *frisbii.LoggingResponseWriter { + switch lrw := res.(type) { + case *frisbii.LoggingResponseWriter: + return lrw + case *gziphandler.GzipResponseWriter: + if lrw, ok := lrw.ResponseWriter.(*frisbii.LoggingResponseWriter); ok { + return lrw + } } - w.count += uint64(count) - return count, err + return nil } -const timeFmt = "2006-01-02T15:04:05.000Z0700" +// pieceAccountingWriter reports the number of bytes written to a +// LoggingResponseWriter so the compression ratio can be calculated. 
+type pieceAccountingWriter struct { + http.ResponseWriter + lrw *frisbii.LoggingResponseWriter +} -func alog(l string, args ...interface{}) { - alogAt(time.Now(), l, args...) +func newPieceAccountingWriter( + w http.ResponseWriter, + lrw *frisbii.LoggingResponseWriter, +) *pieceAccountingWriter { + return &pieceAccountingWriter{ResponseWriter: w, lrw: lrw} } -func alogAt(at time.Time, l string, args ...interface{}) { - fmt.Printf(at.Format(timeFmt)+"\t"+l+"\n", args...) +func (w *pieceAccountingWriter) Write(bz []byte) (int, error) { + count, err := w.ResponseWriter.Write(bz) + if w.lrw != nil { + w.lrw.WroteBytes(count) + } + return count, err } diff --git a/go.mod b/go.mod index 46bc41966..3657e7fff 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,8 @@ replace github.com/filecoin-project/boostd-data => ./extern/boostd-data // replace github.com/filecoin-project/boost-gfm => ../boost-gfm +replace github.com/ipld/frisbii => ../../ipld/frisbii + require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/BurntSushi/toml v1.3.2 diff --git a/go.sum b/go.sum index b68a4d0de..f9d13dd6c 100644 --- a/go.sum +++ b/go.sum @@ -898,8 +898,6 @@ github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvT github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipfs/kubo v0.22.0 h1:HxYkvtFqlF+qQMTxHW+xBhrIWykWm+WbEuQpw1d67mM= github.com/ipfs/kubo v0.22.0/go.mod h1:Sn3hp55POjH9Ni0lEd/+smXpkZ0J1gKlm0Fx+E1LE60= -github.com/ipld/frisbii v0.4.0 h1:lZAD47T+MhmqYNF1/fENxo+VaM0JXjS73/UUO2cuFuU= -github.com/ipld/frisbii v0.4.0/go.mod h1:Gwj+LVuIwgHfeRMlar2luIy+PTK1SIy1QMA0hKiQ/J0= github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g= github.com/ipld/go-car v0.6.1 h1:blWbEHf1j62JMWFIqWE//YR0m7k5ZMw0AuUOU5hjrH8= github.com/ipld/go-car v0.6.1/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= From 39882eb293f124d05d45a82bcf159beaae0f2694 Mon Sep 17 00:00:00 2001 From: Rod Vagg 
Date: Mon, 9 Oct 2023 20:55:30 +1100 Subject: [PATCH 2/3] chore: split out piece handler code --- cmd/booster-http/piecehandler.go | 255 +++++++++++++++++++++++++++++++ cmd/booster-http/server.go | 238 +---------------------------- 2 files changed, 256 insertions(+), 237 deletions(-) create mode 100644 cmd/booster-http/piecehandler.go diff --git a/cmd/booster-http/piecehandler.go b/cmd/booster-http/piecehandler.go new file mode 100644 index 000000000..c34f9cb27 --- /dev/null +++ b/cmd/booster-http/piecehandler.go @@ -0,0 +1,255 @@ +package main + +import ( + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/NYTimes/gziphandler" + "github.com/filecoin-project/boost-gfm/retrievalmarket" + "github.com/filecoin-project/boost/metrics" + "github.com/filecoin-project/boostd-data/model" + "github.com/filecoin-project/boostd-data/shared/tracing" + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipld/frisbii" + "go.opencensus.io/stats" +) + +func (s *HttpServer) pieceHandler() http.HandlerFunc { + var pieceHandler http.Handler = http.HandlerFunc(s.handleByPieceCid) + if s.opts.CompressionLevel != gzip.NoCompression { + gzipWrapper := gziphandler.MustNewGzipLevelHandler(s.opts.CompressionLevel) + pieceHandler = gzipWrapper(pieceHandler) + log.Debugf("enabling compression with a level of %d", s.opts.CompressionLevel) + } + return pieceHandler.ServeHTTP +} + +func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + ctx, span := tracing.Tracer.Start(r.Context(), "http.piece_cid") + defer span.End() + stats.Record(ctx, metrics.HttpPieceByCidRequestCount.M(1)) + + // Remove the path up to the piece cid + prefixLen := len(s.pieceBasePath()) + if len(r.URL.Path) <= prefixLen { + writeError(w, r, http.StatusBadRequest, fmt.Errorf("path '%s' is missing piece CID", r.URL.Path)) + stats.Record(ctx, 
metrics.HttpPieceByCid400ResponseCount.M(1)) + return + } + + pieceCidStr := r.URL.Path[prefixLen:] + pieceCid, err := cid.Parse(pieceCidStr) + if err != nil { + writeError(w, r, http.StatusBadRequest, fmt.Errorf("parsing piece CID '%s': %s", pieceCidStr, err.Error())) + stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) + return + } + + // Get a reader over the piece + content, err := s.getPieceContent(ctx, pieceCid) + if err != nil { + if isNotFoundError(err) { + writeError(w, r, http.StatusNotFound, err) + stats.Record(ctx, metrics.HttpPieceByCid404ResponseCount.M(1)) + return + } + writeError(w, r, http.StatusInternalServerError, fmt.Errorf("server error getting content for piece CID %s: %s", pieceCid, err.Error())) + stats.Record(ctx, metrics.HttpPieceByCid500ResponseCount.M(1)) + return + } + + setHeaders(w, pieceCid) + serveContent(w, r, content) + + stats.Record(ctx, metrics.HttpPieceByCid200ResponseCount.M(1)) + stats.Record(ctx, metrics.HttpPieceByCidRequestDuration.M(float64(time.Since(startTime).Milliseconds()))) +} + +func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.ReadSeeker, error) { + // Get the deals for the piece + pieceDeals, err := s.api.GetPieceDeals(ctx, pieceCid) + if err != nil { + return nil, fmt.Errorf("getting sector info for piece %s: %w", pieceCid, err) + } + + // Get the first unsealed deal + di, err := s.unsealedDeal(ctx, pieceCid, pieceDeals) + if err != nil { + return nil, fmt.Errorf("getting unsealed CAR file: %w", err) + } + + // Get the raw piece data from the sector + pieceReader, err := s.api.UnsealSectorAt(ctx, di.MinerAddr, di.SectorID, di.PieceOffset.Unpadded(), di.PieceLength.Unpadded()) + if err != nil { + return nil, fmt.Errorf("getting raw data from sector %d: %w", di.SectorID, err) + } + + return pieceReader, nil +} + +func isGzipped(res http.ResponseWriter) bool { + switch res.(type) { + case *gziphandler.GzipResponseWriter, gziphandler.GzipResponseWriterWithCloseNotify: + // 
there are conditions where we may have a GzipResponseWriter but the + // response will not be compressed, but they are related to very small + // response sizes so this shouldn't matter (much) + return true + } + return false +} + +func setHeaders(w http.ResponseWriter, pieceCid cid.Cid) { + w.Header().Set("Vary", "Accept-Encoding") + etag := `"` + pieceCid.String() + `"` // must be quoted + if isGzipped(w) { + etag = etag[:len(etag)-1] + ".gz\"" + } + w.Header().Set("Etag", etag) + w.Header().Set("Content-Type", "application/piece") + w.Header().Set("Cache-Control", "public, max-age=29030400, immutable") +} + +func serveContent(res http.ResponseWriter, req *http.Request, content io.ReadSeeker) { + // http.ServeContent ignores errors when writing to the stream, so we + // replace the writer with a class that watches for errors + res = newPieceAccountingWriter(res, toLoggingResponseWriter(res)) + + // Note that the last modified time is a constant value because the data + // in a piece identified by a cid will never change. + + if req.Method == "HEAD" { + // For an HTTP HEAD request ServeContent doesn't send any data (just headers) + http.ServeContent(res, req, "", time.Time{}, content) + return + } + + // Send the content + http.ServeContent(res, req, "", lastModified, content) +} + +// isNotFoundError falls back to checking the error string for "not found". +// Unfortunately we can't always use errors.Is() because the error might +// have crossed an RPC boundary. 
+func isNotFoundError(err error) bool { + if errors.Is(err, ErrNotFound) { + return true + } + if errors.Is(err, datastore.ErrNotFound) { + return true + } + if errors.Is(err, retrievalmarket.ErrNotFound) { + return true + } + return strings.Contains(strings.ToLower(err.Error()), "not found") +} + +func writeError(w http.ResponseWriter, r *http.Request, status int, msg error) { + log.Warnf("error handling request [%s]: %s", r.URL.String(), msg.Error()) + if lrw := toLoggingResponseWriter(w); lrw != nil { + lrw.LogError(status, msg) // will log the lowest wrapped error, so %w errors are isolated + } else { + log.Error("no logging response writer to report to") + http.Error(w, msg.Error(), status) + } +} + +func (s *HttpServer) unsealedDeal(ctx context.Context, pieceCid cid.Cid, pieceDeals []model.DealInfo) (*model.DealInfo, error) { + // There should always be deals in the PieceInfo, but check just in case + if len(pieceDeals) == 0 { + return nil, fmt.Errorf("there are no deals containing piece %s: %w", pieceCid, ErrNotFound) + } + + // The same piece can be in many deals. Find the first unsealed deal. + sealedCount := 0 + var allErr error + for _, di := range pieceDeals { + isUnsealed, err := s.api.IsUnsealed(ctx, di.MinerAddr, di.SectorID, di.PieceOffset.Unpadded(), di.PieceLength.Unpadded()) + if err != nil { + allErr = multierror.Append(allErr, err) + continue + } + if isUnsealed { + // Found a deal with an unsealed piece, so return the deal info + return &di, nil + } + sealedCount++ + } + + // It wasn't possible to find a deal with the piece cid that is unsealed. 
+ // Try to return an error message with as much useful information as possible + dealSectors := make([]string, 0, len(pieceDeals)) + for _, di := range pieceDeals { + if di.IsDirectDeal { + dealSectors = append(dealSectors, fmt.Sprintf("Allocation %d: Sector %d", di.ChainDealID, di.SectorID)) + } else { + dealSectors = append(dealSectors, fmt.Sprintf("Deal %d: Sector %d", di.ChainDealID, di.SectorID)) + } + + } + + if allErr == nil { + dealSectorsErr := fmt.Errorf("%s: %w", strings.Join(dealSectors, ", "), ErrNotFound) + return nil, fmt.Errorf("checked unsealed status of %d deals containing piece %s: none are unsealed: %w", + len(pieceDeals), pieceCid, dealSectorsErr) + } + + if len(pieceDeals) == 1 { + if pieceDeals[0].IsDirectDeal { + return nil, fmt.Errorf("checking unsealed status of allocation %d (sector %d) containing piece %s: %w", + pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr) + } + return nil, fmt.Errorf("checking unsealed status of deal %d (sector %d) containing piece %s: %w", + pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr) + } + + if sealedCount == 0 { + return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s: %s: %w", + len(pieceDeals), pieceCid, dealSectors, allErr) + } + + return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s - %d are sealed, %d had errors: %s: %w", + len(pieceDeals), pieceCid, sealedCount, len(pieceDeals)-sealedCount, dealSectors, allErr) +} + +func toLoggingResponseWriter(res http.ResponseWriter) *frisbii.LoggingResponseWriter { + switch lrw := res.(type) { + case *frisbii.LoggingResponseWriter: + return lrw + case *gziphandler.GzipResponseWriter: + if lrw, ok := lrw.ResponseWriter.(*frisbii.LoggingResponseWriter); ok { + return lrw + } + } + return nil +} + +// pieceAccountingWriter reports the number of bytes written to a +// LoggingResponseWriter so the compression ratio can be calculated. 
+type pieceAccountingWriter struct { + http.ResponseWriter + lrw *frisbii.LoggingResponseWriter +} + +func newPieceAccountingWriter( + w http.ResponseWriter, + lrw *frisbii.LoggingResponseWriter, +) *pieceAccountingWriter { + return &pieceAccountingWriter{ResponseWriter: w, lrw: lrw} +} + +func (w *pieceAccountingWriter) Write(bz []byte) (int, error) { + count, err := w.ResponseWriter.Write(bz) + if w.lrw != nil { + w.lrw.WroteBytes(count) + } + return count, err +} diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go index 33dc7a50d..9193859b2 100644 --- a/cmd/booster-http/server.go +++ b/cmd/booster-http/server.go @@ -9,25 +9,18 @@ import ( "io" "net" "net/http" - "strings" "time" - "github.com/NYTimes/gziphandler" - "github.com/filecoin-project/boost-gfm/retrievalmarket" "github.com/filecoin-project/boost-graphsync/storeutil" "github.com/filecoin-project/boost/metrics" "github.com/filecoin-project/boostd-data/model" - "github.com/filecoin-project/boostd-data/shared/tracing" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - "github.com/hashicorp/go-multierror" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" "github.com/ipld/frisbii" "github.com/rs/cors" - "go.opencensus.io/stats" ) //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_booster_http.go -package=mocks_booster_http -source=server.go HttpServerApi,serverApi @@ -104,13 +97,7 @@ func (s *HttpServer) Start(ctx context.Context) error { handler := http.NewServeMux() if s.opts.ServePieces { - var pieceHandler http.Handler = http.HandlerFunc(s.handleByPieceCid) - if s.opts.CompressionLevel != gzip.NoCompression { - gzipWrapper := gziphandler.MustNewGzipLevelHandler(s.opts.CompressionLevel) - pieceHandler = gzipWrapper(pieceHandler) - log.Debugf("enabling compression with a level of %d", s.opts.CompressionLevel) - } - 
handler.HandleFunc(s.pieceBasePath(), pieceHandler.ServeHTTP) + handler.HandleFunc(s.pieceBasePath(), s.pieceHandler()) } if s.opts.ServeTrustless { @@ -167,226 +154,3 @@ func (s *HttpServer) handleInfo(w http.ResponseWriter, r *http.Request) { } json.NewEncoder(w).Encode(v) //nolint:errcheck } - -func (s *HttpServer) handleByPieceCid(w http.ResponseWriter, r *http.Request) { - startTime := time.Now() - ctx, span := tracing.Tracer.Start(r.Context(), "http.piece_cid") - defer span.End() - stats.Record(ctx, metrics.HttpPieceByCidRequestCount.M(1)) - - // Remove the path up to the piece cid - prefixLen := len(s.pieceBasePath()) - if len(r.URL.Path) <= prefixLen { - writeError(w, r, http.StatusBadRequest, fmt.Errorf("path '%s' is missing piece CID", r.URL.Path)) - stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) - return - } - - pieceCidStr := r.URL.Path[prefixLen:] - pieceCid, err := cid.Parse(pieceCidStr) - if err != nil { - writeError(w, r, http.StatusBadRequest, fmt.Errorf("parsing piece CID '%s': %s", pieceCidStr, err.Error())) - stats.Record(ctx, metrics.HttpPieceByCid400ResponseCount.M(1)) - return - } - - // Get a reader over the piece - content, err := s.getPieceContent(ctx, pieceCid) - if err != nil { - if isNotFoundError(err) { - writeError(w, r, http.StatusNotFound, err) - stats.Record(ctx, metrics.HttpPieceByCid404ResponseCount.M(1)) - return - } - writeError(w, r, http.StatusInternalServerError, fmt.Errorf("server error getting content for piece CID %s: %s", pieceCid, err.Error())) - stats.Record(ctx, metrics.HttpPieceByCid500ResponseCount.M(1)) - return - } - - isGzipped := isGzipped(w) - setHeaders(w, pieceCid, isGzipped) - serveContent(w, r, content, isGzipped) - - stats.Record(ctx, metrics.HttpPieceByCid200ResponseCount.M(1)) - stats.Record(ctx, metrics.HttpPieceByCidRequestDuration.M(float64(time.Since(startTime).Milliseconds()))) -} - -func isGzipped(res http.ResponseWriter) bool { - switch res.(type) { - case 
*gziphandler.GzipResponseWriter, gziphandler.GzipResponseWriterWithCloseNotify: - // there are conditions where we may have a GzipResponseWriter but the - // response will not be compressed, but they are related to very small - // response sizes so this shouldn't matter (much) - return true - } - return false -} - -func setHeaders(w http.ResponseWriter, pieceCid cid.Cid, isGzipped bool) { - w.Header().Set("Vary", "Accept-Encoding") - etag := `"` + pieceCid.String() + `"` // must be quoted - if isGzipped { - etag = etag[:len(etag)-1] + ".gz\"" - } - w.Header().Set("Etag", etag) - w.Header().Set("Content-Type", "application/piece") - w.Header().Set("Cache-Control", "public, max-age=29030400, immutable") -} - -func serveContent(res http.ResponseWriter, req *http.Request, content io.ReadSeeker, isGzipped bool) { - // http.ServeContent ignores errors when writing to the stream, so we - // replace the writer with a class that watches for errors - res = newPieceAccountingWriter(res, toLoggingResponseWriter(res)) - - // Note that the last modified time is a constant value because the data - // in a piece identified by a cid will never change. - - if req.Method == "HEAD" { - // For an HTTP HEAD request ServeContent doesn't send any data (just headers) - http.ServeContent(res, req, "", time.Time{}, content) - return - } - - // Send the content - http.ServeContent(res, req, "", lastModified, content) -} - -// isNotFoundError falls back to checking the error string for "not found". -// Unfortunately we can't always use errors.Is() because the error might -// have crossed an RPC boundary. 
-func isNotFoundError(err error) bool { - if errors.Is(err, ErrNotFound) { - return true - } - if errors.Is(err, datastore.ErrNotFound) { - return true - } - if errors.Is(err, retrievalmarket.ErrNotFound) { - return true - } - return strings.Contains(strings.ToLower(err.Error()), "not found") -} - -func writeError(w http.ResponseWriter, r *http.Request, status int, msg error) { - log.Warnf("error handling request [%s]: %s", r.URL.String(), msg.Error()) - if lrw := toLoggingResponseWriter(w); lrw != nil { - lrw.LogError(status, msg) // will log the lowest wrapped error, so %w errors are isolated - } else { - log.Error("no logging response writer to report to") - http.Error(w, msg.Error(), status) - } -} - -func (s *HttpServer) getPieceContent(ctx context.Context, pieceCid cid.Cid) (io.ReadSeeker, error) { - // Get the deals for the piece - pieceDeals, err := s.api.GetPieceDeals(ctx, pieceCid) - if err != nil { - return nil, fmt.Errorf("getting sector info for piece %s: %w", pieceCid, err) - } - - // Get the first unsealed deal - di, err := s.unsealedDeal(ctx, pieceCid, pieceDeals) - if err != nil { - return nil, fmt.Errorf("getting unsealed CAR file: %w", err) - } - - // Get the raw piece data from the sector - pieceReader, err := s.api.UnsealSectorAt(ctx, di.MinerAddr, di.SectorID, di.PieceOffset.Unpadded(), di.PieceLength.Unpadded()) - if err != nil { - return nil, fmt.Errorf("getting raw data from sector %d: %w", di.SectorID, err) - } - - return pieceReader, nil -} - -func (s *HttpServer) unsealedDeal(ctx context.Context, pieceCid cid.Cid, pieceDeals []model.DealInfo) (*model.DealInfo, error) { - // There should always be deals in the PieceInfo, but check just in case - if len(pieceDeals) == 0 { - return nil, fmt.Errorf("there are no deals containing piece %s: %w", pieceCid, ErrNotFound) - } - - // The same piece can be in many deals. Find the first unsealed deal. 
- sealedCount := 0 - var allErr error - for _, di := range pieceDeals { - isUnsealed, err := s.api.IsUnsealed(ctx, di.MinerAddr, di.SectorID, di.PieceOffset.Unpadded(), di.PieceLength.Unpadded()) - if err != nil { - allErr = multierror.Append(allErr, err) - continue - } - if isUnsealed { - // Found a deal with an unsealed piece, so return the deal info - return &di, nil - } - sealedCount++ - } - - // It wasn't possible to find a deal with the piece cid that is unsealed. - // Try to return an error message with as much useful information as possible - dealSectors := make([]string, 0, len(pieceDeals)) - for _, di := range pieceDeals { - if di.IsDirectDeal { - dealSectors = append(dealSectors, fmt.Sprintf("Allocation %d: Sector %d", di.ChainDealID, di.SectorID)) - } else { - dealSectors = append(dealSectors, fmt.Sprintf("Deal %d: Sector %d", di.ChainDealID, di.SectorID)) - } - - } - - if allErr == nil { - dealSectorsErr := fmt.Errorf("%s: %w", strings.Join(dealSectors, ", "), ErrNotFound) - return nil, fmt.Errorf("checked unsealed status of %d deals containing piece %s: none are unsealed: %w", - len(pieceDeals), pieceCid, dealSectorsErr) - } - - if len(pieceDeals) == 1 { - if pieceDeals[0].IsDirectDeal { - return nil, fmt.Errorf("checking unsealed status of allocation %d (sector %d) containing piece %s: %w", - pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr) - } - return nil, fmt.Errorf("checking unsealed status of deal %d (sector %d) containing piece %s: %w", - pieceDeals[0].ChainDealID, pieceDeals[0].SectorID, pieceCid, allErr) - } - - if sealedCount == 0 { - return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s: %s: %w", - len(pieceDeals), pieceCid, dealSectors, allErr) - } - - return nil, fmt.Errorf("checking unsealed status of %d deals containing piece %s - %d are sealed, %d had errors: %s: %w", - len(pieceDeals), pieceCid, sealedCount, len(pieceDeals)-sealedCount, dealSectors, allErr) -} - -func 
toLoggingResponseWriter(res http.ResponseWriter) *frisbii.LoggingResponseWriter { - switch lrw := res.(type) { - case *frisbii.LoggingResponseWriter: - return lrw - case *gziphandler.GzipResponseWriter: - if lrw, ok := lrw.ResponseWriter.(*frisbii.LoggingResponseWriter); ok { - return lrw - } - } - return nil -} - -// pieceAccountingWriter reports the number of bytes written to a -// LoggingResponseWriter so the compression ratio can be calculated. -type pieceAccountingWriter struct { - http.ResponseWriter - lrw *frisbii.LoggingResponseWriter -} - -func newPieceAccountingWriter( - w http.ResponseWriter, - lrw *frisbii.LoggingResponseWriter, -) *pieceAccountingWriter { - return &pieceAccountingWriter{ResponseWriter: w, lrw: lrw} -} - -func (w *pieceAccountingWriter) Write(bz []byte) (int, error) { - count, err := w.ResponseWriter.Write(bz) - if w.lrw != nil { - w.lrw.WroteBytes(count) - } - return count, err -} From a5534752daf9fa4d1284ad49c7b28ffcdc0caa74 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 9 Oct 2023 20:57:53 +1100 Subject: [PATCH 3/3] chore: unreplace frisbii --- go.mod | 2 -- go.sum | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 3657e7fff..46bc41966 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,6 @@ replace github.com/filecoin-project/boostd-data => ./extern/boostd-data // replace github.com/filecoin-project/boost-gfm => ../boost-gfm -replace github.com/ipld/frisbii => ../../ipld/frisbii - require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/BurntSushi/toml v1.3.2 diff --git a/go.sum b/go.sum index f9d13dd6c..b68a4d0de 100644 --- a/go.sum +++ b/go.sum @@ -898,6 +898,8 @@ github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvT github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipfs/kubo v0.22.0 h1:HxYkvtFqlF+qQMTxHW+xBhrIWykWm+WbEuQpw1d67mM= github.com/ipfs/kubo v0.22.0/go.mod 
h1:Sn3hp55POjH9Ni0lEd/+smXpkZ0J1gKlm0Fx+E1LE60= +github.com/ipld/frisbii v0.4.0 h1:lZAD47T+MhmqYNF1/fENxo+VaM0JXjS73/UUO2cuFuU= +github.com/ipld/frisbii v0.4.0/go.mod h1:Gwj+LVuIwgHfeRMlar2luIy+PTK1SIy1QMA0hKiQ/J0= github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g= github.com/ipld/go-car v0.6.1 h1:blWbEHf1j62JMWFIqWE//YR0m7k5ZMw0AuUOU5hjrH8= github.com/ipld/go-car v0.6.1/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8=