Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filestore): add mmap reader option #665

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following emojis are used to highlight certain changes:
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`. [#720](https://github.com/ipfs/boxo/pull/720)
- `routing/http/server`: exposes Prometheus metrics on `prometheus.DefaultRegisterer` and a custom one can be provided via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `filestore`: add `WithMMapReader` option to `FileManager`
lidel marked this conversation as resolved.
Show resolved Hide resolved

### Changed

Expand Down
61 changes: 61 additions & 0 deletions filestore/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package filestore

import (
"io"
"os"

"golang.org/x/exp/mmap"
)

type FileReader interface {
io.ReaderAt
io.Closer
}

var _ FileReader = (*stdReader)(nil)

type stdReader struct {
f *os.File
}

// ReadAt implements the FileReader interface.
func (r *stdReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.f.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *stdReader) Close() error {
return r.f.Close()
}

func newStdReader(path string) (FileReader, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
return &stdReader{f: f}, nil
}

var _ FileReader = (*mmapReader)(nil)

type mmapReader struct {
m *mmap.ReaderAt
}

// ReadAt implements the FileReader interface.
func (r *mmapReader) ReadAt(p []byte, off int64) (n int, err error) {
return r.m.ReadAt(p, off)
}

// Close implements the FileReader interface.
func (r *mmapReader) Close() error {
return r.m.Close()
}

func newMmapReader(path string) (FileReader, error) {
m, err := mmap.Open(path)
if err != nil {
return nil, err
}
return &mmapReader{m: m}, nil
}
128 changes: 70 additions & 58 deletions filestore/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (

var bg = context.Background()

func newTestFilestore(t *testing.T) (string, *Filestore) {
func newTestFilestore(t *testing.T, option ...Option) (string, *Filestore) {
mds := ds.NewMapDatastore()

testdir, err := os.MkdirTemp("", "filestore-test")
if err != nil {
t.Fatal(err)
}
fm := NewFileManager(mds, testdir)
fm := NewFileManager(mds, testdir, option...)
fm.AllowFiles = true

bs := blockstore.NewBlockstore(mds)
Expand All @@ -48,62 +48,74 @@ func makeFile(dir string, data []byte) (string, error) {
}

func TestBasicFilestore(t *testing.T) {
dir, fs := newTestFilestore(t)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
cases := []struct {
name string
options []Option
}{
{"default", nil},
{"mmap", []Option{WithMMapReader()}},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dir, fs := newTestFilestore(t, c.options...)

buf := make([]byte, 1000)
rand.Read(buf)

fname, err := makeFile(dir, buf)
if err != nil {
t.Fatal(err)
}

var cids []cid.Cid
for i := 0; i < 100; i++ {
n := &posinfo.FilestoreNode{
PosInfo: &posinfo.PosInfo{
FullPath: fname,
Offset: uint64(i * 10),
},
Node: dag.NewRawNode(buf[i*10 : (i+1)*10]),
}

err := fs.Put(bg, n)
if err != nil {
t.Fatal(err)
}
cids = append(cids, n.Node.Cid())
}

for i, c := range cids {
blk, err := fs.Get(bg, c)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(blk.RawData(), buf[i*10:(i+1)*10]) {
t.Fatal("data didnt match on the way out")
}
}

kch, err := fs.AllKeysChan(context.Background())
if err != nil {
t.Fatal(err)
}

out := make(map[string]struct{})
for c := range kch {
out[c.KeyString()] = struct{}{}
}

if len(out) != len(cids) {
t.Fatal("mismatch in number of entries")
}

for _, c := range cids {
if _, ok := out[c.KeyString()]; !ok {
t.Fatal("missing cid: ", c)
}
}
})
}
}

Expand Down
37 changes: 28 additions & 9 deletions filestore/fsrefstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// FilestorePrefix identifies the key prefix for FileManager blocks.
var FilestorePrefix = ds.NewKey("filestore")

type Option func(*FileManager)

// FileManager is a blockstore implementation which stores special
// blocks FilestoreNode type. These nodes only contain a reference
// to the actual location of the block data in the filesystem
Expand All @@ -34,6 +36,7 @@ type FileManager struct {
AllowUrls bool
ds ds.Batching
root string
makeReader func(path string) (FileReader, error)
}

// CorruptReferenceError implements the error interface.
Expand All @@ -51,11 +54,32 @@ func (c CorruptReferenceError) Error() string {
return c.Err.Error()
}

// WithMMapReader sets the FileManager's reader factory to use memory-mapped file I/O.
// On Windows, when reading and writing to a file simultaneously, the system would consume
// a significant amount of memory due to caching. This memory usage is not reflected in
// the application but in the system. Using memory-mapped files (implemented with
// CreateFileMapping on Windows) avoids this issue.
func WithMMapReader() Option {
return func(f *FileManager) {
f.makeReader = newMmapReader
}
}

// NewFileManager initializes a new file manager with the given
// datastore and root. All FilestoreNodes paths are relative to the
// root path given here, which is prepended for any operations.
func NewFileManager(ds ds.Batching, root string) *FileManager {
return &FileManager{ds: dsns.Wrap(ds, FilestorePrefix), root: root}
func NewFileManager(ds ds.Batching, root string, options ...Option) *FileManager {
f := &FileManager{
ds: dsns.Wrap(ds, FilestorePrefix),
root: root,
makeReader: newStdReader,
}

for _, option := range options {
option(f)
}

return f
}

// AllKeysChan returns a channel from which to read the keys stored in
Expand Down Expand Up @@ -175,21 +199,16 @@ func (f *FileManager) readFileDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, er
p := filepath.FromSlash(d.GetFilePath())
abspath := filepath.Join(f.root, p)

fi, err := os.Open(abspath)
fi, err := f.makeReader(abspath)
if os.IsNotExist(err) {
return nil, &CorruptReferenceError{StatusFileNotFound, err}
} else if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}
defer fi.Close()

_, err = fi.Seek(int64(d.GetOffset()), io.SeekStart)
if err != nil {
return nil, &CorruptReferenceError{StatusFileError, err}
}

outbuf := make([]byte, d.GetSize_())
_, err = io.ReadFull(fi, outbuf)
_, err = fi.ReadAt(outbuf, int64(d.GetOffset()))
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, &CorruptReferenceError{StatusFileChanged, err}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
go.opentelemetry.io/otel/trace v1.31.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
Expand Down Expand Up @@ -185,7 +186,6 @@ require (
go.uber.org/fx v1.23.0 // indirect
go.uber.org/mock v0.5.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
Loading