diff --git a/diskmap/diskmap.go b/diskmap/diskmap.go index 633f397..020782d 100644 --- a/diskmap/diskmap.go +++ b/diskmap/diskmap.go @@ -4,43 +4,43 @@ In addition the diskmap utilizes mmap on reads to make the random access faster. Usage is simplistic: - // Create a new diskmap. - p := path.Join(os.TempDir(), nextSuffix()) - w, err := diskmap.New(p) - if err != nil { - panic(err) - } - - // Write a key/value to the diskmap. - if err := w.Write([]byte("hello"), []byte("world")); err != nil { - panic(err) - } - - // Close the file to writing. - w.Close() - - // Open the file for reading. - m, err := diskmap.Open(p) - if err != nil { - panic(err) - } - - // Read the value at key "hello". - v, err := m.Read([]byte("hello")) - if err != nil { - panic(err) - } - - // Print the value at key "hello". - fmt.Println(string(v)) - - // Loop through all entries in the map. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // Make sure if we end the "range" early we don't leave any leaky goroutines. - - for kv := range m.Range(ctx) { - fmt.Printf("key: %s, value: %s", string(kv.Key), string(kv.Value)) - } + // Create a new diskmap. + p := path.Join(os.TempDir(), nextSuffix()) + w, err := diskmap.New(p) + if err != nil { + panic(err) + } + + // Write a key/value to the diskmap. + if err := w.Write([]byte("hello"), []byte("world")); err != nil { + panic(err) + } + + // Close the file to writing. + w.Close() + + // Open the file for reading. + m, err := diskmap.Open(p) + if err != nil { + panic(err) + } + + // Read the value at key "hello". + v, err := m.Read([]byte("hello")) + if err != nil { + panic(err) + } + + // Print the value at key "hello". + fmt.Println(string(v)) + + // Loop through all entries in the map. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure if we end the "range" early we don't leave any leaky goroutines. + + for kv := range m.Range(ctx) { + fmt.Printf("key: %s, value: %s", string(kv.Key), string(kv.Value)) + } Storage details: @@ -50,44 +50,45 @@ the offset where the index starts and the number of key/value pairs stored. The for future use. All numbers are int64 values. The file structure looks as follows: - - - [index offset] - [number of key/value pairs] - - - [byte value] - [byte value] - ... - - - [data offset] - [data length] - [key length] - [key] - ... - - + + + + [index offset] + [number of key/value pairs] + + + [byte value] + [byte value] + ... + + + [data offset] + [data length] + [key length] + [key] + ... + + Reading the file is simply: - * read the initial 8 bytes into a int64 to get the offset to the index - * seek to the index offset - * read the data storage offset - * read the key length - * read the key - * build map of key to disk offset using the data above. + - read the initial 8 bytes into a int64 to get the offset to the index + - seek to the index offset + - read the data storage offset + - read the key length + - read the key + - build map of key to disk offset using the data above. */ package diskmap import ( - "encoding/binary" - "fmt" - "os" - "sync" - - log "github.com/golang/glog" - "github.com/johnsiilver/golib/mmap" - "golang.org/x/net/context" + "encoding/binary" + "fmt" + "os" + "reflect" + "sync" + "unsafe" + + "golang.org/x/net/context" ) // reservedHeader is the size, in bytes, of the reserved header portion of the file. @@ -98,33 +99,36 @@ var endian = binary.LittleEndian // Reader provides read access to the the diskmap file. type Reader interface { - // Get fetches key "k" and returns the value. Errors when key not found. Thread-safe. - Read(k []byte) ([]byte, error) + // Get fetches key "k" and returns the value. Errors when key not found. Thread-safe. + Read(k []byte) ([]byte, error) - // Range allows iteration over all the key/value pairs stored in the diskmap. If not interating - // over all values, Cancel() or a timeout should be used on the Context to prevent a goroutine leak. - Range(ctx context.Context) chan KeyValue + // Range allows iteration over all the key/value pairs stored in the diskmap. If not interating + // over all values, Cancel() or a timeout should be used on the Context to prevent a goroutine leak. + Range(ctx context.Context) chan KeyValue - // Close closes the diskmap file. - Close() error + // Close closes the diskmap file. + Close() error } // Writer provides write access to the diskmap file. An error on write makes the Writer unusable. type Writer interface { - // Write writes a key/value pair to disk. Thread-safe. - Write(k, v []byte) error + // Write writes a key/value pair to disk. Thread-safe. + Write(k, v []byte) error - // Close syncronizes the file to disk and closes it. - Close() error + // Close syncronizes the file to disk and closes it. + Close() error } // KeyValue holds a key/value pair. type KeyValue struct { - // Key is the key the value was stored at. - Key []byte + // Key is the key the value was stored at. + Key []byte + + // Value is the value stored at Key. + Value []byte - // Value is the value stored at Key. - Value []byte + // Err indicates that there was an error in the return stream. + Err error } // table is a list of entries that are eventually encoded onto disk at the end of the file. @@ -132,240 +136,251 @@ type index []entry // entry is an entry in the index to allow locating data associated with a key. type entry struct { - // offset is the offset from the start of the file to locate the value associated with key. - offset int64 + // offset is the offset from the start of the file to locate the value associated with key. + offset int64 - // length is the length of the data from offset above. - length int64 + // length is the length of the data from offset above. + length int64 - // key is the key part of the key/value pair. - key []byte + // key is the key part of the key/value pair. + key []byte } // value holds the data needed to locate a value of a key/value pair on disk. type value struct { - // offset is the offset from the start of the file to locate the value associated with key. - offset int64 + // offset is the offset from the start of the file to locate the value associated with key. + offset int64 - // length is the length of the data from offset above. - length int64 + // length is the length of the data from offset above. + length int64 } // writer implements Writer. type writer struct { - file *os.File - index index - offset int64 - num int64 - sync.Mutex + file *os.File + index index + offset int64 + num int64 + sync.Mutex } // New returns a new Writer that writes to file "p". func New(p string) (Writer, error) { - f, err := os.Create(p) - if err != nil { - return nil, err - } - - if err := f.Chmod(0600); err != nil { - return nil, err - } - - if _, err = f.Seek(reservedHeader, 0); err != nil { - return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) - } - - return &writer{ - file: f, - offset: reservedHeader, - index: make(index, 0, 1000), - Mutex: sync.Mutex{}, - }, nil + f, err := os.Create(p) + if err != nil { + return nil, err + } + + if err := f.Chmod(0600); err != nil { + return nil, err + } + + if _, err = f.Seek(reservedHeader, 0); err != nil { + return nil, fmt.Errorf("was unable to seek %d bytes into the file: %q", reservedHeader, err) + } + + return &writer{ + file: f, + offset: reservedHeader, + index: make(index, 0, 1000), + Mutex: sync.Mutex{}, + }, nil } // Write implements Writer.Write(). func (w *writer) Write(k, v []byte) error { - w.Lock() - defer w.Unlock() - - if _, err := w.file.Write(v); err != nil { - return fmt.Errorf("problem writing key/value to disk: %q", err) - } - - w.index = append( - w.index, - entry{ - key: k, - offset: w.offset, - length: int64(len(v)), - }, - ) - - w.offset += int64(len(v)) - w.num += 1 - - return nil + w.Lock() + defer w.Unlock() + + if _, err := w.file.Write(v); err != nil { + return fmt.Errorf("problem writing key/value to disk: %q", err) + } + + w.index = append( + w.index, + entry{ + key: k, + offset: w.offset, + length: int64(len(v)), + }, + ) + + w.offset += int64(len(v)) + w.num += 1 + + return nil } // Close implements Writer.Close(). func (w *writer) Close() error { - // Write each data offset, then the length of the key, then finally the key to disk (our index) for each entry. - for _, entry := range w.index { - if err := binary.Write(w.file, endian, entry.offset); err != nil { - return fmt.Errorf("could not write offset value %d: %q", entry.offset, err) - } - - if err := binary.Write(w.file, endian, entry.length); err != nil { - return fmt.Errorf("could not write data length: %q", err) - } - - if err := binary.Write(w.file, endian, int64(len(entry.key))); err != nil { - return fmt.Errorf("could not write key length: %q", err) - } - - if _, err := w.file.Write(entry.key); err != nil { - return fmt.Errorf("could not write key to disk: %q", err) - } - } - - // Now that we've written all our data to the end of the file, we can go back to our reserved header - // and write our offset to the index at the beginnign of the file. - if _, err := w.file.Seek(0, 0); err != nil { - return fmt.Errorf("could not seek to beginning of the file: %q", err) - } - - // Write the offset to the index to our reserved header. - if err := binary.Write(w.file, endian, w.offset); err != nil { - return fmt.Errorf("could not write the index offset to the reserved header: %q", err) - } - - // Write the number of key/value pairs. - if err := binary.Write(w.file, endian, w.num); err != nil { - return fmt.Errorf("could not write the number of key/value pairs to the reserved header: %q", err) - } - - if err := w.file.Sync(); err != nil { - return err - } - - return w.file.Close() + // Write each data offset, then the length of the key, then finally the key to disk (our index) for each entry. + for _, entry := range w.index { + if err := binary.Write(w.file, endian, entry.offset); err != nil { + return fmt.Errorf("could not write offset value %d: %q", entry.offset, err) + } + + if err := binary.Write(w.file, endian, entry.length); err != nil { + return fmt.Errorf("could not write data length: %q", err) + } + + if err := binary.Write(w.file, endian, int64(len(entry.key))); err != nil { + return fmt.Errorf("could not write key length: %q", err) + } + + if _, err := w.file.Write(entry.key); err != nil { + return fmt.Errorf("could not write key to disk: %q", err) + } + } + + // Now that we've written all our data to the end of the file, we can go back to our reserved header + // and write our offset to the index at the beginnign of the file. + if _, err := w.file.Seek(0, 0); err != nil { + return fmt.Errorf("could not seek to beginning of the file: %q", err) + } + + // Write the offset to the index to our reserved header. + if err := binary.Write(w.file, endian, w.offset); err != nil { + return fmt.Errorf("could not write the index offset to the reserved header: %q", err) + } + + // Write the number of key/value pairs. + if err := binary.Write(w.file, endian, w.num); err != nil { + return fmt.Errorf("could not write the number of key/value pairs to the reserved header: %q", err) + } + + if err := w.file.Sync(); err != nil { + return err + } + + return w.file.Close() } // reader implements Reader. type reader struct { - // index is the key to offset data mapping. - index map[string]value + // index is the key to offset data mapping. + index map[string]value + + // file holds the mapping file in mmap. + file *os.File - // file holds the mapping file in mmap. - file mmap.Map + sync.Mutex } // Open returns a Reader for a file written by a Writer. func Open(p string) (Reader, error) { - f, err := os.Open(p) - if err != nil { - return nil, err - } + f, err := os.Open(p) + if err != nil { + return nil, err + } - m, err := mmap.NewMap(f, mmap.Flag(mmap.Shared), mmap.Prot(mmap.Read)) - if err != nil { - return nil, fmt.Errorf("problems mmapping %q: %q", p, err) - } + var ( + offset int64 + num int64 + ) - var ( - offset int64 - num int64 - ) + // Read our reserved header. + if err := binary.Read(f, endian, &offset); err != nil { + return nil, fmt.Errorf("cannot read index offset: %q", err) + } - // Read our reserved header. - if err := binary.Read(m, endian, &offset); err != nil { - return nil, fmt.Errorf("cannot read index offset: %q", err) - } + if err := binary.Read(f, endian, &num); err != nil { + return nil, fmt.Errorf("cannot read number of entries from reserved header: %q", err) + } - if err := binary.Read(m, endian, &num); err != nil { - return nil, fmt.Errorf("cannot read number of entries from reserved header: %q", err) - } + if _, err := f.Seek(offset, 0); err != nil { + return nil, fmt.Errorf("cannot seek to index offset: %q", err) + } - if _, err := m.Seek(offset, 0); err != nil { - return nil, fmt.Errorf("cannot seek to index offset: %q", err) - } + kv := make(map[string]value, num) - kv := make(map[string]value, num) + var dOff, dLen, kLen int64 - var dOff, dLen, kLen int64 + // Read the index data into a map. + for i := int64(0); i < num; i++ { + if err := binary.Read(f, endian, &dOff); err != nil { + return nil, fmt.Errorf("cannot read a data offset in index: %q", err) + } - // Read the index data into a map. - for i := int64(0); i < num; i++ { - if err := binary.Read(m, endian, &dOff); err != nil { - return nil, fmt.Errorf("cannot read a data offset in index: %q", err) - } + if err := binary.Read(f, endian, &dLen); err != nil { + return nil, fmt.Errorf("cannot read a key offset in index: %q", err) + } - if err := binary.Read(m, endian, &dLen); err != nil { - return nil, fmt.Errorf("cannot read a key offset in index: %q", err) - } + if err := binary.Read(f, endian, &kLen); err != nil { + return nil, fmt.Errorf("cannot read a key offset in index: %q", err) + } - if err := binary.Read(m, endian, &kLen); err != nil { - return nil, fmt.Errorf("cannot read a key offset in index: %q", err) - } + key := make([]byte, kLen) - key := make([]byte, kLen) + if _, err := f.Read(key); err != nil { + return nil, fmt.Errorf("error reading in a key from the index: %q", err) + } - if _, err := m.Read(key); err != nil { - return nil, fmt.Errorf("error reading in a key from the index: %q", err) - } + kv[string(key)] = value{ + offset: dOff, + length: dLen, + } + } - kv[string(key)] = value{ - offset: dOff, - length: dLen, - } - } - - return reader{index: kv, file: m}, nil + return &reader{index: kv, file: f}, nil } // Read implements Reader.Read(). -func (r reader) Read(k []byte) ([]byte, error) { - if v, ok := r.index[string(k)]; ok { - if _, err := r.file.Seek(v.offset, 0); err != nil { - return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err) - } - b := make([]byte, v.length) - if _, err := r.file.Read(b); err != nil { - return nil, fmt.Errorf("error reading value from file: %q", err) - } - return b, nil - } - - return nil, fmt.Errorf("key was not found") +func (r *reader) Read(k []byte) ([]byte, error) { + r.Lock() + defer r.Unlock() + + if v, ok := r.index[string(k)]; ok { + if _, err := r.file.Seek(v.offset, 0); err != nil { + return nil, fmt.Errorf("cannot reach offset for key supplied by the index: %q", err) + } + b := make([]byte, v.length) + if _, err := r.file.Read(b); err != nil { + return nil, fmt.Errorf("error reading value from file: %q", err) + } + return b, nil + } + + return nil, fmt.Errorf("key was not found") } // Range implements Reader.Range(). -func (r reader) Range(ctx context.Context) chan KeyValue { - ch := make(chan KeyValue, 10) - - go func() { - defer close(ch) - - for k, _ := range r.index { - v, err := r.Read([]byte(k)) - if err != nil { - log.Errorf("key %q had no data associated with it (this is bad!): %q", k, err) - continue - } - - select { - case ch <-KeyValue{[]byte(k), v}: - // Do nothing. - case <-ctx.Done(): - return - } - } - }() - - return ch +func (r *reader) Range(ctx context.Context) chan KeyValue { + ch := make(chan KeyValue, 10) + + go func() { + defer close(ch) + + for k := range r.index { + v, err := r.Read(UnsafeGetBytes(k)) + if err != nil { + ch <- KeyValue{Err: fmt.Errorf("key %q had no data associated with it (this is bad!): %q", k, err)} + return + } + + select { + case ch <- KeyValue{Key: UnsafeGetBytes(k), Value: v}: + // Do nothing. + case <-ctx.Done(): + return + } + } + }() + + return ch } // Close implements Reader.Close(). -func (r reader) Close() error { - return r.file.Close() +func (r *reader) Close() error { + return r.file.Close() +} + +// UnsafeGetBytes retrieves the underlying []byte held in string "s" without doing +// a copy. Do not modify the []byte or suffer the consequences. +func UnsafeGetBytes(s string) []byte { + if s == "" { + return nil + } + return (*[0x7fff0000]byte)(unsafe.Pointer( + (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), + )[:len(s):len(s)] } diff --git a/diskmap/diskmap_test.go b/diskmap/diskmap_test.go index d4bb08b..8deb404 100644 --- a/diskmap/diskmap_test.go +++ b/diskmap/diskmap_test.go @@ -1,73 +1,73 @@ package diskmap import ( - "bytes" - "math/rand" - "os" - "path" - "strconv" - "testing" - "time" + "bytes" + "math/rand" + "os" + "path" + "strconv" + "testing" + "time" ) const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" func randStringBytes() []byte { - b := make([]byte, 1000) - for i := range b { - b[i] = letters[rand.Intn(len(letters))] - } - return b + b := make([]byte, 1000) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return b } func TestDiskMap(t *testing.T) { - p := path.Join(os.TempDir(), nextSuffix()) - w, err := New(p) - if err != nil { - panic(err) - } - defer os.Remove(p) + p := path.Join(os.TempDir(), nextSuffix()) + w, err := New(p) + if err != nil { + panic(err) + } + defer os.Remove(p) - data := make(map[string][]byte, 1000) - for i := 0; i < 1000; i++ { - k := []byte(nextSuffix()) - v := randStringBytes() + data := make(map[string][]byte, 1000) + for i := 0; i < 1000; i++ { + k := []byte(nextSuffix()) + v := randStringBytes() - if err := w.Write(k, v); err != nil { - t.Fatalf("error writing:\nkey:%q\nvalue:%q\n", k, v) - } - data[string(k)] = v - } + if err := w.Write(k, v); err != nil { + t.Fatalf("error writing:\nkey:%q\nvalue:%q\n", k, v) + } + data[string(k)] = v + } - if err := w.Close(); err != nil { - t.Fatalf("error closing the Writer: %q", err) - } + if err := w.Close(); err != nil { + t.Fatalf("error closing the Writer: %q", err) + } - r, err := Open(p) - if err != nil { - t.Fatalf("error opening diskmap %q", err) - } + r, err := Open(p) + if err != nil { + t.Fatalf("error opening diskmap %q", err) + } - for k, v := range data { - val, err := r.Read([]byte(k)) - if err != nil { - t.Errorf("a key/value pair was lost: %q", err) - continue - } + for k, v := range data { + val, err := r.Read([]byte(k)) + if err != nil { + t.Errorf("a key/value pair was lost: %q", err) + continue + } - if bytes.Compare(val, v) != 0 { - t.Errorf("a value was not correctly stored") - } - } + if bytes.Compare(val, v) != 0 { + t.Errorf("a value was not correctly stored") + } + } - if _, err := r.Read([]byte("helloworld")); err == nil { - t.Errorf("a non-existant key passed to Read() did not return an error") - } + if _, err := r.Read([]byte("helloworld")); err == nil { + t.Errorf("a non-existant key passed to Read() did not return an error") + } } func nextSuffix() string { - r := uint32(time.Now().UnixNano() + int64(os.Getpid())) + r := uint32(time.Now().UnixNano() + int64(os.Getpid())) - r = r*1664525 + 1013904223 // constants from Numerical Recipes - return strconv.Itoa(int(1e9 + r%1e9))[1:] + r = r*1664525 + 1013904223 // constants from Numerical Recipes + return strconv.Itoa(int(1e9 + r%1e9))[1:] }