From d49b462874bb4b51df8663ecfd1780618f225979 Mon Sep 17 00:00:00 2001 From: John Doak Date: Mon, 12 Dec 2022 09:50:50 -0800 Subject: [PATCH] Update diskmap/diskslice to use directio on linux. Fix filewatcher example --- diskmap/diskmap.go | 67 ++++++++++++++++++--------------- diskmap/diskmap_linux.go | 42 +++++++++++++++++++++ diskmap/diskmap_other.go | 32 ++++++++++++++++ diskmap/diskmap_test.go | 25 +++++++++++++ diskslice/diskslice.go | 72 ++++++++++++++++++------------------ diskslice/diskslice_linux.go | 46 +++++++++++++++++++++++ diskslice/diskslice_other.go | 36 ++++++++++++++++++ diskslice/diskslice_test.go | 25 +++++++++++++ filewatcher/examples_test.go | 2 +- go.mod | 1 + go.sum | 4 +- gotest.sh | 2 + 12 files changed, 287 insertions(+), 67 deletions(-) create mode 100644 diskmap/diskmap_linux.go create mode 100644 diskmap/diskmap_other.go create mode 100644 diskslice/diskslice_linux.go create mode 100644 diskslice/diskslice_other.go create mode 100755 gotest.sh diff --git a/diskmap/diskmap.go b/diskmap/diskmap.go index 020782d..9b14f31 100644 --- a/diskmap/diskmap.go +++ b/diskmap/diskmap.go @@ -1,6 +1,7 @@ /* Package diskmap provides disk storage of key/value pairs. The data is immutable once written. In addition the diskmap utilizes mmap on reads to make the random access faster. +On Linux, diskmap uses directio to speed up writes. Usage is simplistic: @@ -81,14 +82,17 @@ Reading the file is simply: package diskmap import ( + "bufio" + "context" "encoding/binary" "fmt" + "io" "os" "reflect" "sync" "unsafe" - "golang.org/x/net/context" + "github.com/brk0v/directio" ) // reservedHeader is the size, in bytes, of the reserved header portion of the file. @@ -157,42 +161,29 @@ type value struct { // writer implements Writer. type writer struct { + name string file *os.File + dio *directio.DirectIO + buf *bufio.Writer index index offset int64 num int64 sync.Mutex } -// New returns a new Writer that writes to file "p". -func New(p string) (Writer, error) { - f, err := os.Create(p) - if err != nil { - return nil, err - } - - if err := f.Chmod(0600); err != nil { - return nil, err - } - - if _, err = f.Seek(reservedHeader, 0); err != nil { - return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) - } - - return &writer{ - file: f, - offset: reservedHeader, - index: make(index, 0, 1000), - Mutex: sync.Mutex{}, - }, nil -} - // Write implements Writer.Write(). func (w *writer) Write(k, v []byte) error { w.Lock() defer w.Unlock() - if _, err := w.file.Write(v); err != nil { + var writer io.Writer + if w.buf != nil { + writer = w.buf + } else { + writer = w.file + } + + if _, err := writer.Write(v); err != nil { return fmt.Errorf("problem writing key/value to disk: %q", err) } @@ -213,24 +204,42 @@ func (w *writer) Write(k, v []byte) error { // Close implements Writer.Close(). func (w *writer) Close() error { + var writer io.Writer + if w.buf != nil { + writer = w.buf + } else { + writer = w.file + } + // Write each data offset, then the length of the key, then finally the key to disk (our index) for each entry. for _, entry := range w.index { - if err := binary.Write(w.file, endian, entry.offset); err != nil { + if err := binary.Write(writer, endian, entry.offset); err != nil { return fmt.Errorf("could not write offset value %d: %q", entry.offset, err) } - if err := binary.Write(w.file, endian, entry.length); err != nil { + if err := binary.Write(writer, endian, entry.length); err != nil { return fmt.Errorf("could not write data length: %q", err) } - if err := binary.Write(w.file, endian, int64(len(entry.key))); err != nil { + if err := binary.Write(writer, endian, int64(len(entry.key))); err != nil { return fmt.Errorf("could not write key length: %q", err) } - if _, err := w.file.Write(entry.key); err != nil { + if _, err := writer.Write(entry.key); err != nil { return fmt.Errorf("could not write key to disk: %q", err) } } + if w.buf != nil { + w.buf.Flush() + w.dio.Flush() + + w.file.Close() + f, err := os.OpenFile(w.name, os.O_RDWR, 0666) + if err != nil { + return err + } + w.file = f + } // Now that we've written all our data to the end of the file, we can go back to our reserved header // and write our offset to the index at the beginnign of the file. diff --git a/diskmap/diskmap_linux.go b/diskmap/diskmap_linux.go new file mode 100644 index 0000000..c3577ca --- /dev/null +++ b/diskmap/diskmap_linux.go @@ -0,0 +1,42 @@ +//go:build linux + +package diskmap + +import ( + "bufio" + "os" + "sync" + "syscall" + + "github.com/brk0v/directio" +) + +// New returns a new Writer that writes to file "p". +func New(p string) (Writer, error) { + f, err := os.OpenFile(p, os.O_CREATE+os.O_WRONLY+syscall.O_DIRECT, 0666) + if err != nil { + return nil, err + } + + dio, err := directio.New(f) + if err != nil { + return nil, err + } + + w := bufio.NewWriterSize(dio, 67108864) + header := [64]byte{} + _, err = w.Write(header[:]) + if err != nil { + return nil, err + } + + return &writer{ + name: p, + file: f, + dio: dio, + buf: w, + offset: reservedHeader, + index: make(index, 0, 1000), + Mutex: sync.Mutex{}, + }, nil +} diff --git a/diskmap/diskmap_other.go b/diskmap/diskmap_other.go new file mode 100644 index 0000000..d3b47a8 --- /dev/null +++ b/diskmap/diskmap_other.go @@ -0,0 +1,32 @@ +//go:build !linux + +package diskmap + +import ( + "fmt" + "os" + "sync" +) + +// New returns a new Writer that writes to file "p". +func New(p string) (Writer, error) { + f, err := os.Create(p) + if err != nil { + return nil, err + } + + if err := f.Chmod(0600); err != nil { + return nil, err + } + + if _, err = f.Seek(reservedHeader, 0); err != nil { + return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) + } + + return &writer{ + file: f, + offset: reservedHeader, + index: make(index, 0, 1000), + Mutex: sync.Mutex{}, + }, nil +} diff --git a/diskmap/diskmap_test.go b/diskmap/diskmap_test.go index 8deb404..f065991 100644 --- a/diskmap/diskmap_test.go +++ b/diskmap/diskmap_test.go @@ -65,6 +65,31 @@ func TestDiskMap(t *testing.T) { } } +func BenchmarkDiskMap(b *testing.B) { + b.ReportAllocs() + + p := path.Join(os.TempDir(), nextSuffix()) + w, err := New(p) + if err != nil { + panic(err) + } + defer os.Remove(p) + + b.ResetTimer() + for i := 0; i < 10000; i++ { + k := []byte(nextSuffix()) + v := randStringBytes() + + if err := w.Write(k, v); err != nil { + b.Fatalf("error writing:\nkey:%q\nvalue:%q\n", k, v) + } + } + + if err := w.Close(); err != nil { + b.Fatalf("error closing the Writer: %q", err) + } +} + func nextSuffix() string { r := uint32(time.Now().UnixNano() + int64(os.Getpid())) diff --git a/diskslice/diskslice.go b/diskslice/diskslice.go index b36097f..b52fe50 100644 --- a/diskslice/diskslice.go +++ b/diskslice/diskslice.go @@ -5,19 +5,21 @@ that can be managed, moved and trivially read. More complex use cases require mo involving multiple files, lock files, etc.... This makes no attempt to provide that. Read call without a cached index consist of: - * A single seek to an index entry - * Two 8 byte reads for data offset and len - * A seek to the data - * A single read of the value + - A single seek to an index entry + - Two 8 byte reads for data offset and len + - A seek to the data + - A single read of the value + Total: 2 seeks and 3 reads Read call with cached index consits of: - * A single read to the data + - A single read to the data If doing a range over a large file or lots of range calls, it is optimal to have the Reader cache the index. Every 131,072 entries consumes 1 MiB of cached memory. File format is as follows: + [index offset] @@ -38,6 +40,7 @@ File format is as follows: package diskslice import ( + "bufio" "bytes" "context" "encoding/binary" @@ -45,6 +48,8 @@ import ( "io" "os" "sync" + + "github.com/brk0v/directio" ) // reservedHeader is the size, in bytes, of the reserved header portion of the file. @@ -77,7 +82,10 @@ type value struct { // Writer provides methods for writing an array of values to disk that can be read without // reading the file back into memory. type Writer struct { + name string file *os.File + dio *directio.DirectIO + buf *bufio.Writer index index offset int64 num int64 @@ -97,33 +105,6 @@ func WriteIntercept(interceptor func(dst io.Writer) io.WriteCloser) WriteOption } } -// New is the constructor for Writer. -func New(fpath string, options ...WriteOption) (*Writer, error) { - f, err := os.Create(fpath) - if err != nil { - return nil, err - } - - if err := f.Chmod(0600); err != nil { - return nil, err - } - - if _, err = f.Seek(reservedHeader, 0); err != nil { - return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) - } - - w := &Writer{ - file: f, - offset: reservedHeader, - index: make(index, 0, 1000), - mu: sync.Mutex{}, - } - for _, option := range options { - option(w) - } - return w, nil -} - // Write writes a byte slice to the diskslice. func (w *Writer) Write(b []byte) error { if w.interceptor != nil { @@ -141,7 +122,11 @@ func (w *Writer) Write(b []byte) error { w.mu.Lock() defer w.mu.Unlock() - if _, err := w.file.Write(b); err != nil { + var writer io.Writer = w.file + if w.buf != nil { + writer = w.buf + } + if _, err := writer.Write(b); err != nil { return fmt.Errorf("problem writing value to disk: %q", err) } @@ -161,17 +146,34 @@ func (w *Writer) Write(b []byte) error { // Close closes the file for writing and writes our index to the file. func (w *Writer) Close() error { + var writer io.Writer = w.file + if w.buf != nil { + writer = w.buf + } + // Write each data offset, then the length of the key, then finally the key to disk (our index) for each entry. for _, entry := range w.index { - if err := binary.Write(w.file, endian, entry.offset); err != nil { + if err := binary.Write(writer, endian, entry.offset); err != nil { return fmt.Errorf("could not write offset value %d: %q", entry.offset, err) } - if err := binary.Write(w.file, endian, entry.length); err != nil { + if err := binary.Write(writer, endian, entry.length); err != nil { return fmt.Errorf("could not write data length: %q", err) } } + if w.buf != nil { + w.buf.Flush() + w.dio.Flush() + + w.file.Close() + f, err := os.OpenFile(w.name, os.O_RDWR, 0666) + if err != nil { + return err + } + w.file = f + } + // Now that we've written all our data to the end of the file, we can go back to our reserved header // and write our offset to the index at the beginnign of the file. if _, err := w.file.Seek(0, 0); err != nil { diff --git a/diskslice/diskslice_linux.go b/diskslice/diskslice_linux.go new file mode 100644 index 0000000..ae450dd --- /dev/null +++ b/diskslice/diskslice_linux.go @@ -0,0 +1,46 @@ +//go:build linux + +package diskslice + +import ( + "bufio" + "os" + "sync" + "syscall" + + "github.com/brk0v/directio" +) + +// New is the constructor for Writer. +func New(fpath string, options ...WriteOption) (*Writer, error) { + f, err := os.OpenFile(fpath, os.O_CREATE+os.O_WRONLY+syscall.O_DIRECT, 0666) + if err != nil { + return nil, err + } + + dio, err := directio.New(f) + if err != nil { + return nil, err + } + + w := bufio.NewWriterSize(dio, 67108864) + header := [64]byte{} + _, err = w.Write(header[:]) + if err != nil { + return nil, err + } + + wr := &Writer{ + name: fpath, + file: f, + buf: w, + dio: dio, + offset: reservedHeader, + index: make(index, 0, 1000), + mu: sync.Mutex{}, + } + for _, option := range options { + option(wr) + } + return wr, nil +} diff --git a/diskslice/diskslice_other.go b/diskslice/diskslice_other.go new file mode 100644 index 0000000..337e1b9 --- /dev/null +++ b/diskslice/diskslice_other.go @@ -0,0 +1,36 @@ +//go:build !linux + +package diskslice + +import ( + "fmt" + "os" + "sync" +) + +// New is the constructor for Writer. +func New(fpath string, options ...WriteOption) (*Writer, error) { + f, err := os.Create(fpath) + if err != nil { + return nil, err + } + + if err := f.Chmod(0600); err != nil { + return nil, err + } + + if _, err = f.Seek(reservedHeader, 0); err != nil { + return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) + } + + w := &Writer{ + file: f, + offset: reservedHeader, + index: make(index, 0, 1000), + mu: sync.Mutex{}, + } + for _, option := range options { + option(w) + } + return w, nil +} diff --git a/diskslice/diskslice_test.go b/diskslice/diskslice_test.go index 2f38f18..05234fe 100644 --- a/diskslice/diskslice_test.go +++ b/diskslice/diskslice_test.go @@ -106,6 +106,31 @@ func SubDiskList(t *testing.T, readOptions []ReadOption, writeOptions []WriteOpt } } +func BenchmarkDisksliceWrite(b *testing.B) { + b.ReportAllocs() + + p := path.Join(os.TempDir(), nextSuffix()) + + w, err := New(p) + if err != nil { + panic(err) + } + defer os.Remove(p) + + b.ResetTimer() + for i := 0; i < 100000; i++ { + v := randStringBytes() + + if err := w.Write(v); err != nil { + b.Fatalf("error writing:\nkey:%q\nvalue:%q\n", i, v) + } + } + if err := w.Close(); err != nil { + panic(err) + } + b.StopTimer() +} + func nextSuffix() string { r := uint32(time.Now().UnixNano() + int64(os.Getpid())) diff --git a/filewatcher/examples_test.go b/filewatcher/examples_test.go index 64b242a..9a11433 100644 --- a/filewatcher/examples_test.go +++ b/filewatcher/examples_test.go @@ -20,7 +20,7 @@ func Example() { fmt.Println(string(content)) // This fetches the content at each change. - for b := range <-ch { + for b := range ch { fmt.Println("New content of the file:") fmt.Println(string(b)) } diff --git a/go.mod b/go.mod index b1016f2..b154185 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/beeker1121/goque v2.1.0+incompatible + github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 github.com/dustin/go-humanize v1.0.0 github.com/frankban/quicktest v1.11.3 // indirect github.com/fsnotify/fsnotify v1.4.9 diff --git a/go.sum b/go.sum index 2d98689..f8ff6b1 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,11 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/beeker1121/goque v2.1.0+incompatible h1:m5pZ5b8nqzojS2DF2ioZphFYQUqGYsDORq6uefUItPM= github.com/beeker1121/goque v2.1.0+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= +github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 h1:7gNKWnX6OF+ERiXVw4I9RsHhZ52aumXdFE07nEx5v20= +github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7/go.mod h1:M/KA3XJG5PJaApPiv4gWNsgcSJquOQTqumZNLyYE0KM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -44,7 +45,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= diff --git a/gotest.sh b/gotest.sh new file mode 100755 index 0000000..0959c0f --- /dev/null +++ b/gotest.sh @@ -0,0 +1,2 @@ +#! /bin/sh +go test `go list ./... | grep -v development`