From 26e702194e718bd975623f6c7dd3702f77b64709 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 10 Jun 2024 14:31:41 -0700 Subject: [PATCH] objstorage: add readahead configuration This commit adds a way to configure the read-ahead method, for both cases of "informed readahead" (currently just compactions, but in the future possibly backup scans as well) and "speculative readahead". --- objstorage/objstorageprovider/provider.go | 48 +++++++ .../objstorageprovider/provider_test.go | 31 ++++- .../testdata/provider/local_readahead | 118 ++++++++++++++++++ objstorage/objstorageprovider/vfs.go | 6 +- objstorage/objstorageprovider/vfs_readable.go | 55 +++++--- open.go | 1 + options.go | 13 ++ 7 files changed, 246 insertions(+), 26 deletions(-) diff --git a/objstorage/objstorageprovider/provider.go b/objstorage/objstorageprovider/provider.go index d0f7450535..1652b7b6c1 100644 --- a/objstorage/objstorageprovider/provider.go +++ b/objstorage/objstorageprovider/provider.go @@ -92,6 +92,17 @@ type Settings struct { // out a large chunk of dirty filesystem buffers. BytesPerSync int + // Local contains fields that are only relevant for files stored on the local + // filesystem. + Local struct { + // TODO(radu): move FSCleaner, NoSyncOnClose, BytesPerSync here. + + // ReadaheadConfigFn is a function used to retrieve the current readahead + // mode. This function is run whenever a local object is open for reading. + // If it is nil, DefaultReadaheadConfig is used. + ReadaheadConfigFn func() ReadaheadConfig + } + // Fields here are set only if the provider is to support remote objects // (experimental). Remote struct { @@ -129,6 +140,43 @@ type Settings struct { } } +// ReadaheadConfig controls the use of read-ahead. +type ReadaheadConfig struct { + // Informed is the type of read-ahead for operations that are known to read a + // large consecutive chunk of a file. 
+ Informed ReadaheadMode + + // Speculative is the type of read-ahead used automatically, when consecutive + // reads are detected. + Speculative ReadaheadMode +} + +// DefaultReadaheadConfig is the readahead config used when ReadaheadConfigFn is +// not specified. +var DefaultReadaheadConfig = ReadaheadConfig{ + Informed: FadviseSequential, + Speculative: FadviseSequential, +} + +// ReadaheadMode indicates the type of read-ahead to use, either for informed +// read-ahead (e.g. compactions) or speculative read-ahead. +type ReadaheadMode uint8 + +const ( + // NoReadahead disables readahead altogether. + NoReadahead ReadaheadMode = iota + + // SysReadahead enables the use of SYS_READAHEAD call to prefetch data. + // The prefetch window grows dynamically as consecutive reads are detected. + SysReadahead + + // FadviseSequential enables the use of FADV_SEQUENTIAL. For informed + // read-ahead, FADV_SEQUENTIAL is used from the beginning. For speculative + // read-ahead SYS_READAHEAD is first used until the window reaches the maximum + // size, then we switch to FADV_SEQUENTIAL. + FadviseSequential +) + // DefaultSettings initializes default settings (with no remote storage), // suitable for tests and tools. 
func DefaultSettings(fs vfs.FS, dirName string) Settings { diff --git a/objstorage/objstorageprovider/provider_test.go b/objstorage/objstorageprovider/provider_test.go index 98847b344f..084e8b5521 100644 --- a/objstorage/objstorageprovider/provider_test.go +++ b/objstorage/objstorageprovider/provider_test.go @@ -40,7 +40,9 @@ func TestProvider(t *testing.T) { backings := make(map[string]objstorage.RemoteObjectBacking) backingHandles := make(map[string]objstorage.RemoteObjectBackingHandle) var curProvider objstorage.Provider + readaheadConfig := DefaultReadaheadConfig datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + readaheadConfig = DefaultReadaheadConfig scanArgs := func(desc string, args ...interface{}) { t.Helper() if len(d.CmdArgs) != len(args) { @@ -68,6 +70,9 @@ func TestProvider(t *testing.T) { st.Remote.CreateOnShared = remote.CreateOnSharedAll st.Remote.CreateOnSharedLocator = "" } + st.Local.ReadaheadConfigFn = func() ReadaheadConfig { + return readaheadConfig + } require.NoError(t, fs.MkdirAll(fsDir, 0755)) p, err := Open(st) require.NoError(t, err) @@ -170,13 +175,29 @@ func TestProvider(t *testing.T) { return log.String() case "read": - forCompaction := false - if len(d.CmdArgs) == 2 && d.CmdArgs[1].Key == "for-compaction" { - d.CmdArgs = d.CmdArgs[:1] - forCompaction = true + forCompaction := d.HasArg("for-compaction") + if arg, ok := d.Arg("readahead"); ok { + var mode ReadaheadMode + switch arg.Vals[0] { + case "off": + mode = NoReadahead + case "sys-readahead": + mode = SysReadahead + case "fadvise-sequential": + mode = FadviseSequential + default: + d.Fatalf(t, "unknown readahead mode %s", arg.Vals[0]) + } + if forCompaction { + readaheadConfig.Informed = mode + } else { + readaheadConfig.Speculative = mode + } } + + d.CmdArgs = d.CmdArgs[:1] var fileNum base.FileNum - scanArgs(" [for-compaction]", &fileNum) + scanArgs(" [for-compaction] [readahead|speculative-overhead=off|sys-readahead|fadvise-sequential]", 
&fileNum) r, err := curProvider.OpenForReading(ctx, base.FileTypeTable, fileNum.DiskFileNum(), objstorage.OpenOptions{}) if err != nil { return err.Error() diff --git a/objstorage/objstorageprovider/testdata/provider/local_readahead b/objstorage/objstorageprovider/testdata/provider/local_readahead index 4093aee275..8ad754edb1 100644 --- a/objstorage/objstorageprovider/testdata/provider/local_readahead +++ b/objstorage/objstorageprovider/testdata/provider/local_readahead @@ -64,3 +64,121 @@ size: 2000000 1000 15000: ok (salt 1) close: p1/000001.sst close: p1/000001.sst + +# Test non-default readahead modes. + +read 1 readahead=off +0 1000 +1000 15000 +16000 30000 +46000 10000 +56000 50000 +106000 30000 +140000 80000 +---- + open: p1/000001.sst (options: *vfs.randomReadsOption) +size: 2000000 + read-at(0, 1000): p1/000001.sst +0 1000: ok (salt 1) + read-at(1000, 15000): p1/000001.sst +1000 15000: ok (salt 1) + read-at(16000, 30000): p1/000001.sst +16000 30000: ok (salt 1) + read-at(46000, 10000): p1/000001.sst +46000 10000: ok (salt 1) + read-at(56000, 50000): p1/000001.sst +56000 50000: ok (salt 1) + read-at(106000, 30000): p1/000001.sst +106000 30000: ok (salt 1) + read-at(140000, 80000): p1/000001.sst +140000 80000: ok (salt 1) + close: p1/000001.sst + +read 1 for-compaction readahead=off +0 1000 +1000 15000 +16000 30000 +46000 10000 +56000 50000 +106000 30000 +140000 80000 +---- + open: p1/000001.sst (options: *vfs.randomReadsOption) +size: 2000000 + read-at(0, 1000): p1/000001.sst +0 1000: ok (salt 1) + read-at(1000, 15000): p1/000001.sst +1000 15000: ok (salt 1) + read-at(16000, 30000): p1/000001.sst +16000 30000: ok (salt 1) + read-at(46000, 10000): p1/000001.sst +46000 10000: ok (salt 1) + read-at(56000, 50000): p1/000001.sst +56000 50000: ok (salt 1) + read-at(106000, 30000): p1/000001.sst +106000 30000: ok (salt 1) + read-at(140000, 80000): p1/000001.sst +140000 80000: ok (salt 1) + close: p1/000001.sst + +read 1 readahead=sys-readahead +0 1000 +1000 15000 
+16000 30000 +46000 10000 +56000 50000 +106000 30000 +140000 80000 +---- + open: p1/000001.sst (options: *vfs.randomReadsOption) +size: 2000000 + read-at(0, 1000): p1/000001.sst +0 1000: ok (salt 1) + read-at(1000, 15000): p1/000001.sst +1000 15000: ok (salt 1) + prefetch(16000, 65536): p1/000001.sst + read-at(16000, 30000): p1/000001.sst +16000 30000: ok (salt 1) + read-at(46000, 10000): p1/000001.sst +46000 10000: ok (salt 1) + prefetch(56000, 131072): p1/000001.sst + read-at(56000, 50000): p1/000001.sst +56000 50000: ok (salt 1) + read-at(106000, 30000): p1/000001.sst +106000 30000: ok (salt 1) + prefetch(140000, 262144): p1/000001.sst + read-at(140000, 80000): p1/000001.sst +140000 80000: ok (salt 1) + close: p1/000001.sst + +# TODO(radu): for informed/sys-readahead, we should start with the maximum +# prefetch window. +read 1 for-compaction readahead=sys-readahead +0 1000 +1000 15000 +16000 30000 +46000 10000 +56000 50000 +106000 30000 +140000 80000 +---- + open: p1/000001.sst (options: *vfs.randomReadsOption) +size: 2000000 + read-at(0, 1000): p1/000001.sst +0 1000: ok (salt 1) + read-at(1000, 15000): p1/000001.sst +1000 15000: ok (salt 1) + prefetch(16000, 65536): p1/000001.sst + read-at(16000, 30000): p1/000001.sst +16000 30000: ok (salt 1) + read-at(46000, 10000): p1/000001.sst +46000 10000: ok (salt 1) + prefetch(56000, 131072): p1/000001.sst + read-at(56000, 50000): p1/000001.sst +56000 50000: ok (salt 1) + read-at(106000, 30000): p1/000001.sst +106000 30000: ok (salt 1) + prefetch(140000, 262144): p1/000001.sst + read-at(140000, 80000): p1/000001.sst +140000 80000: ok (salt 1) + close: p1/000001.sst diff --git a/objstorage/objstorageprovider/vfs.go b/objstorage/objstorageprovider/vfs.go index 5b2025179a..9e23815aac 100644 --- a/objstorage/objstorageprovider/vfs.go +++ b/objstorage/objstorageprovider/vfs.go @@ -31,7 +31,11 @@ func (p *provider) vfsOpenForReading( } return nil, err } - return newFileReadable(file, p.st.FS, filename) + readaheadConfig := 
DefaultReadaheadConfig + if f := p.st.Local.ReadaheadConfigFn; f != nil { + readaheadConfig = f() + } + return newFileReadable(file, p.st.FS, readaheadConfig, filename) } func (p *provider) vfsCreate( diff --git a/objstorage/objstorageprovider/vfs_readable.go b/objstorage/objstorageprovider/vfs_readable.go index 12ceee8e0b..584f6a9076 100644 --- a/objstorage/objstorageprovider/vfs_readable.go +++ b/objstorage/objstorageprovider/vfs_readable.go @@ -27,22 +27,26 @@ type fileReadable struct { // The following fields are used to possibly open the file again using the // sequential reads option (see vfsReadHandle). - filename string - fs vfs.FS + filename string + fs vfs.FS + readaheadConfig ReadaheadConfig } var _ objstorage.Readable = (*fileReadable)(nil) -func newFileReadable(file vfs.File, fs vfs.FS, filename string) (*fileReadable, error) { +func newFileReadable( + file vfs.File, fs vfs.FS, readaheadConfig ReadaheadConfig, filename string, +) (*fileReadable, error) { info, err := file.Stat() if err != nil { return nil, err } r := &fileReadable{ - file: file, - size: info.Size(), - filename: filename, - fs: fs, + file: file, + size: info.Size(), + filename: filename, + fs: fs, + readaheadConfig: readaheadConfig, } invariants.SetFinalizer(r, func(obj interface{}) { if obj.(*fileReadable).file != nil { @@ -81,8 +85,9 @@ func (r *fileReadable) NewReadHandle(_ context.Context) objstorage.ReadHandle { } type vfsReadHandle struct { - r *fileReadable - rs readaheadState + r *fileReadable + rs readaheadState + readaheadMode ReadaheadMode // sequentialFile holds a file descriptor to the same underlying File, // except with fadvise(FADV_SEQUENTIAL) called on it to take advantage of @@ -109,8 +114,9 @@ var readHandlePool = sync.Pool{ func (rh *vfsReadHandle) init(r *fileReadable) { *rh = vfsReadHandle{ - r: r, - rs: makeReadaheadState(fileMaxReadaheadSize), + r: r, + rs: makeReadaheadState(fileMaxReadaheadSize), + readaheadMode: r.readaheadConfig.Speculative, } } @@ -127,14 
+133,17 @@ func (rh *vfsReadHandle) Close() error { // ReadAt is part of the objstorage.ReadHandle interface. func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error { - var n int - var err error if rh.sequentialFile != nil { // Use OS-level read-ahead. - n, err = rh.sequentialFile.ReadAt(p, offset) - } else { + n, err := rh.sequentialFile.ReadAt(p, offset) + if invariants.Enabled && err == nil && n != len(p) { + panic("short read") + } + return err + } + if rh.readaheadMode != NoReadahead { if readaheadSize := rh.rs.maybeReadahead(offset, int64(len(p))); readaheadSize > 0 { - if readaheadSize >= fileMaxReadaheadSize { + if rh.readaheadMode == FadviseSequential && readaheadSize >= fileMaxReadaheadSize { // We've reached the maximum readahead size. Beyond this point, rely on // OS-level readahead. rh.switchToOSReadahead() @@ -142,8 +151,8 @@ func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error _ = rh.r.file.Prefetch(offset, readaheadSize) } } - n, err = rh.r.file.ReadAt(p, offset) } + n, err := rh.r.file.ReadAt(p, offset) if invariants.Enabled && err == nil && n != len(p) { panic("short read") } @@ -152,10 +161,16 @@ func (rh *vfsReadHandle) ReadAt(_ context.Context, p []byte, offset int64) error // SetupForCompaction is part of the objstorage.ReadHandle interface. func (rh *vfsReadHandle) SetupForCompaction() { - rh.switchToOSReadahead() + rh.readaheadMode = rh.r.readaheadConfig.Informed + if rh.readaheadMode == FadviseSequential { + rh.switchToOSReadahead() + } } func (rh *vfsReadHandle) switchToOSReadahead() { + if invariants.Enabled && rh.readaheadMode != FadviseSequential { + panic("readheadMode not respected") + } if rh.sequentialFile != nil { return } @@ -170,8 +185,8 @@ func (rh *vfsReadHandle) switchToOSReadahead() { // RecordCacheHit is part of the objstorage.ReadHandle interface. 
func (rh *vfsReadHandle) RecordCacheHit(_ context.Context, offset, size int64) { - if rh.sequentialFile != nil { - // Using OS-level readahead, so do nothing. + if rh.sequentialFile != nil || rh.readaheadMode == NoReadahead { + // Using OS-level or no readahead, so do nothing. return } rh.rs.recordCacheHit(offset, size) diff --git a/open.go b/open.go index 89b7c96da5..3963d9cce1 100644 --- a/open.go +++ b/open.go @@ -310,6 +310,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { NoSyncOnClose: opts.NoSyncOnClose, BytesPerSync: opts.BytesPerSync, } + providerSettings.Local.ReadaheadConfigFn = opts.Local.ReadaheadConfigFn providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator diff --git a/options.go b/options.go index 92120b91a8..b6f240d04a 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" @@ -492,6 +493,15 @@ type Options struct { // The default cleaner uses the DeleteCleaner. Cleaner Cleaner + // Local contains options that pertain to files stored on the local filesystem. + Local struct { + // ReadaheadConfigFn is a function used to retrieve the current readahead + // mode. This function is consulted when a table enters the table cache. + ReadaheadConfigFn func() ReadaheadConfig + + // TODO(radu): move BytesPerSync, LoadBlockSema, Cleaner here. + } + // Comparer defines a total ordering over the space of []byte keys: a 'less // than' relationship. 
The same comparison algorithm must be used for reads // and writes over the lifetime of the DB. @@ -1002,6 +1012,9 @@ type Options struct { } } +// ReadaheadConfig controls the use of read-ahead. +type ReadaheadConfig = objstorageprovider.ReadaheadConfig + // DebugCheckLevels calls CheckLevels on the provided database. // It may be set in the DebugCheck field of Options to check // level invariants whenever a new version is installed.