diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e67d0be75..75ef3b1244 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes: ### Added +- add `WithMMapReader` option to `FileManager` + ### Changed - updated to go-libp2p to [v0.37.0](https://github.com/libp2p/go-libp2p/releases/tag/v0.37.0) diff --git a/filestore/filereader.go b/filestore/filereader.go new file mode 100644 index 0000000000..fba3cc942e --- /dev/null +++ b/filestore/filereader.go @@ -0,0 +1,61 @@ +package filestore + +import ( + "io" + "os" + + "golang.org/x/exp/mmap" +) + +type FileReader interface { + io.ReaderAt + io.Closer +} + +var _ FileReader = (*stdReader)(nil) + +type stdReader struct { + f *os.File +} + +// ReadAt implements the FileReader interface. +func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) { + return r.f.ReadAt(p, off) +} + +// Close implements the FileReader interface. +func (r *stdReader) Close() error { + return r.f.Close() +} + +func newStdReader(path string) (FileReader, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + return &stdReader{f: f}, nil +} + +var _ FileReader = (*mmapReader)(nil) + +type mmapReader struct { + m *mmap.ReaderAt +} + +// ReadAt implements the FileReader interface. +func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) { + return r.m.ReadAt(p, off) +} + +// Close implements the FileReader interface. +func (r *mmapReader) Close() error { + return r.m.Close() +} + +func newMmapReader(path string) (FileReader, error) { + m, err := mmap.Open(path) + if err != nil { + return nil, err + } + return &mmapReader{m: m}, nil +} diff --git a/filestore/filestore_test.go b/filestore/filestore_test.go index 9d455193a2..4d17adbe7a 100644 --- a/filestore/filestore_test.go +++ b/filestore/filestore_test.go @@ -18,14 +18,14 @@ import ( var bg = context.Background() -func newTestFilestore(t *testing.T) (string, *Filestore) { +func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) { mds := ds.NewMapDatastore() testdir, err := os.MkdirTemp("", "filestore-test") if err != nil { t.Fatal(err) } - fm := NewFileManager(mds, testdir) + fm := NewFileManager(mds, testdir, option...) fm.AllowFiles = true bs := blockstore.NewBlockstore(mds) @@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) { } func TestBasicFilestore(t *testing.T) { - dir, fs := newTestFilestore(t) - - buf := make([]byte, 1000) - rand.Read(buf) - - fname, err := makeFile(dir, buf) - if err != nil { - t.Fatal(err) - } - - var cids []cid.Cid - for i := 0; i < 100; i++ { - n := &posinfo.FilestoreNode{ - PosInfo: &posinfo.PosInfo{ - FullPath: fname, - Offset: uint64(i * 10), - }, - Node: dag.NewRawNode(buf[i*10 : (i+1)*10]), - } - - err := fs.Put(bg, n) - if err != nil { - t.Fatal(err) - } - cids = append(cids, n.Node.Cid()) - } - - for i, c := range cids { - blk, err := fs.Get(bg, c) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) { - t.Fatal("data didnt match on the way out") - } - } - - kch, err := fs.AllKeysChan(context.Background()) - if err != nil { - t.Fatal(err) - } - - out := make(map[string]struct{}) - for c := range kch { - out[c.KeyString()] = struct{}{} - } - - if len(out) != len(cids) { - t.Fatal("mismatch in number of entries") - } - - for _, c := range cids { - if _, ok := out[c.KeyString()]; !ok { - t.Fatal("missing cid: ", c) - } + cases := []struct { + name string + options []Option + }{ + {"default", nil}, + {"mmap", []Option{WithMMapReader()}}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + dir, fs := newTestFilestore(t, c.options...) + + buf := make([]byte, 1000) + rand.Read(buf) + + fname, err := makeFile(dir, buf) + if err != nil { + t.Fatal(err) + } + + var cids []cid.Cid + for i := 0; i < 100; i++ { + n := &posinfo.FilestoreNode{ + PosInfo: &posinfo.PosInfo{ + FullPath: fname, + Offset: uint64(i * 10), + }, + Node: dag.NewRawNode(buf[i*10 : (i+1)*10]), + } + + err := fs.Put(bg, n) + if err != nil { + t.Fatal(err) + } + cids = append(cids, n.Node.Cid()) + } + + for i, c := range cids { + blk, err := fs.Get(bg, c) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) { + t.Fatal("data didnt match on the way out") + } + } + + kch, err := fs.AllKeysChan(context.Background()) + if err != nil { + t.Fatal(err) + } + + out := make(map[string]struct{}) + for c := range kch { + out[c.KeyString()] = struct{}{} + } + + if len(out) != len(cids) { + t.Fatal("mismatch in number of entries") + } + + for _, c := range cids { + if _, ok := out[c.KeyString()]; !ok { + t.Fatal("missing cid: ", c) + } + } + }) } } diff --git a/filestore/fsrefstore.go b/filestore/fsrefstore.go index 158eadf7a2..eb7f190d07 100644 --- a/filestore/fsrefstore.go +++ b/filestore/fsrefstore.go @@ -25,6 +25,8 @@ import ( // FilestorePrefix identifies the key prefix for FileManager blocks. var FilestorePrefix = ds.NewKey("filestore") +type Option func(*FileManager) + // FileManager is a blockstore implementation which stores special // blocks FilestoreNode type. These nodes only contain a reference // to the actual location of the block data in the filesystem @@ -34,6 +36,7 @@ type FileManager struct { AllowUrls bool ds ds.Batching root string + makeReader func(path string) (FileReader, error) } // CorruptReferenceError implements the error interface. @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string { return c.Err.Error() } +// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O. +// On Windows, when reading and writing to a file simultaneously, the system would consume +// a significant amount of memory due to caching. This memory usage is not reflected in +// the application but in the system. Using memory-mapped files (implemented with +// CreateFileMapping on Windows) avoids this issue. +func WithMMapReader() Option { + return func(f *FileManager) { + f.makeReader = newMmapReader + } +} + // NewFileManager initializes a new file manager with the given // datastore and root. All FilestoreNodes paths are relative to the // root path given here, which is prepended for any operations. -func NewFileManager(ds ds.Batching, root string) *FileManager { - return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root} +func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager { + f := &FileManager{ + ds: dsns.Wrap(ds, FilestorePrefix), + root: root, + makeReader: newStdReader, + } + + for _, option := range options { + option(f) + } + + return f } // AllKeysChan returns a channel from which to read the keys stored in @@ -175,7 +199,7 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er p := filepath.FromSlash(d.GetFilePath()) abspath := filepath.Join(f.root, p) - fi, err := os.Open(abspath) + fi, err := f.makeReader(abspath) if os.IsNotExist(err) { return nil, &CorruptReferenceError{StatusFileNotFound, err} } else if err != nil { @@ -183,13 +207,8 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er } defer fi.Close() - _, err = fi.Seek(int64(d.GetOffset()), io.SeekStart) - if err != nil { - return nil, &CorruptReferenceError{StatusFileError, err} - } - outbuf := make([]byte, d.GetSize_()) - _, err = io.ReadFull(fi, outbuf) + _, err = fi.ReadAt(outbuf, int64(d.GetOffset())) if err == io.EOF || err == io.ErrUnexpectedEOF { return nil, &CorruptReferenceError{StatusFileChanged, err} } else if err != nil { diff --git a/go.mod b/go.mod index c98f3028c9..789c6c8bc6 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( go.opentelemetry.io/otel/trace v1.27.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.8.0 golang.org/x/sys v0.26.0 @@ -183,7 +184,6 @@ require ( go.uber.org/fx v1.23.0 // indirect go.uber.org/mock v0.5.0 // indirect golang.org/x/crypto v0.28.0 // indirect - golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/text v0.19.0 // indirect