diff --git a/pkg/cafs/cafs.go b/pkg/cafs/cafs.go
index 8cfd4936..42cc4722 100644
--- a/pkg/cafs/cafs.go
+++ b/pkg/cafs/cafs.go
@@ -4,7 +4,6 @@ import (
     "context"
     "io"
     "log"
-    "sync"
 
     lru "github.com/hashicorp/golang-lru"
 
@@ -152,21 +151,20 @@ func (d *defaultFs) GetAt(ctx context.Context, hash Key) (io.ReaderAt, error) {
 }
 
 func (d *defaultFs) writer(prefix string) Writer {
-    return &fsWriter{
-        store:         d.store.backend,
-        leafSize:      d.leafSize,
-        leafs:         nil,
-        buf:           make([]byte, d.leafSize),
-        offset:        0,
-        flushed:       0,
-        pather:        nil,
-        prefix:        prefix,
-        count:         0,
-        flushChan:     make(chan blobFlush, sizeOfFlushChan),
-        errC:          make(chan error, sizeOfFlushChan),
-        maxGoRoutines: make(chan struct{}, maxGoRoutinesPerPut),
-        wg:            sync.WaitGroup{},
+    w := &fsWriter{
+        store:               d.store.backend,
+        leafSize:            d.leafSize,
+        buf:                 make([]byte, d.leafSize),
+        prefix:              prefix,
+        flushChan:           make(chan blobFlush),
+        errC:                make(chan error),
+        flushThreadDoneChan: make(chan struct{}),
+        maxGoRoutines:       make(chan struct{}, maxGoRoutinesPerPut),
+        blobFlushes:         make([]blobFlush, 0),
+        errors:              make([]error, 0),
     }
+    go w.flushThread()
+    return w
 }
 
 func (d *defaultFs) Delete(ctx context.Context, hash Key) error {
diff --git a/pkg/cafs/writer.go b/pkg/cafs/writer.go
index f88af555..15ab0463 100644
--- a/pkg/cafs/writer.go
+++ b/pkg/cafs/writer.go
@@ -6,7 +6,6 @@ import (
     "fmt"
     "hash/crc32"
     "io"
-    "sync"
     "sync/atomic"
 
     "github.com/oneconcern/datamon/pkg/storage"
@@ -16,8 +15,6 @@ import (
 
 const (
     maxGoRoutinesPerPut = 10
-    maxCAFSFileSize     = 1024 * 1024 * 1024 * 300              // 300 Gig
-    sizeOfFlushChan     = maxCAFSFileSize / DefaultLeafSize // https://github.com/oneconcern/datamon/issues/171
 )
 
 // Writer interface for a content addressable FS
@@ -27,19 +24,21 @@ type Writer interface {
 }
 
 type fsWriter struct {
-    store         storage.Store       // CAFS backing store
-    prefix        string              // Prefix for store paths
-    leafSize      uint32              // Size of chunks
-    leafs         []Key               // List of keys backing a file
-    buf           []byte              // Buffer stage a chunk == leafsize
-    offset        int                 // till where buffer is used
-    flushed       uint32              // writer has been flushed to store
-    pather        func(string) string // pathing logic
-    count         uint64              // total number of parallel writes
-    flushChan     chan blobFlush      // channel for parallel writes
-    errC          chan error          // channel for errors during parallel writes
-    maxGoRoutines chan struct{}       // Max number of concurrent writes
-    wg            sync.WaitGroup      // Sync
+    store               storage.Store       // CAFS backing store
+    prefix              string              // Prefix for store paths
+    leafSize            uint32              // Size of chunks
+    leafs               []Key               // List of keys backing a file
+    buf                 []byte              // Buffer stage a chunk == leafsize
+    offset              int                 // till where buffer is used
+    flushed             uint32              // writer has been flushed to store
+    pather              func(string) string // pathing logic
+    count               uint64              // total number of parallel writes
+    flushChan           chan blobFlush      // channel for parallel writes
+    errC                chan error          // channel for errors during parallel writes
+    flushThreadDoneChan chan struct{}
+    maxGoRoutines       chan struct{}       // Max number of concurrent writes
+    blobFlushes         []blobFlush
+    errors              []error
 }
 
 func (w *fsWriter) Write(p []byte) (n int, err error) {
@@ -57,7 +56,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
         w.offset += c
         written += c
         if w.offset == len(w.buf) { // sizes line up, flush and continue
-            w.wg.Add(1)
             w.count++ // next leaf
             w.maxGoRoutines <- struct{}{}
             go pFlush(
@@ -71,7 +69,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
                 w.maxGoRoutines,
                 w.pather,
                 w.store,
-                &w.wg,
             )
             w.buf = make([]byte, w.leafSize) // new buffer
             w.offset = 0                     // new offset for new buffer
@@ -96,13 +93,10 @@ func pFlush(
     maxGoRoutines chan struct{},
     pather func(string) string,
     destination storage.Store,
-    wg *sync.WaitGroup,
 ) {
-    done := func() {
-        wg.Done()
+    defer func() {
         <-maxGoRoutines
-    }
-    defer done()
+    }()
     // Calculate hash value
     hasher, err := blake2b.New(&blake2b.Config{
         Size: blake2b.Size,
@@ -217,28 +211,37 @@ func (w *fsWriter) flush(isLastNode bool) (int, error) {
     return n, nil
 }
 
-func (w *fsWriter) Flush() (Key, []byte, error) {
-    w.leafs = make([]Key, w.count)
-    if w.count > 0 {
-        w.wg.Wait()
-        for {
-            select {
-            case bf := <-w.flushChan:
-                w.count--
-                w.leafs[bf.count-1] = bf.key
-                if w.count == 0 {
-                    break
-                }
-            case err := <-w.errC:
-                return Key{}, nil, err
-
-            default:
-            }
-            if w.count == 0 {
-                break
+func (w *fsWriter) flushThread() {
+    var err error
+    var bf blobFlush
+    notDone := true
+    for notDone {
+        select {
+        case bf, notDone = <-w.flushChan:
+            if notDone {
+                w.blobFlushes = append(w.blobFlushes, bf)
             }
+        case err = <-w.errC:
+            w.errors = append(w.errors, err)
         }
     }
+    w.flushThreadDoneChan <- struct{}{}
+}
+
+// don't Write() during Flush()
+func (w *fsWriter) Flush() (Key, []byte, error) {
+    for i := 0; i < cap(w.maxGoRoutines); i++ {
+        w.maxGoRoutines <- struct{}{}
+    }
+    close(w.flushChan)
+    <-w.flushThreadDoneChan
+    if len(w.errors) != 0 {
+        return Key{}, nil, w.errors[0]
+    }
+    w.leafs = make([]Key, len(w.blobFlushes))
+    for _, bf := range w.blobFlushes {
+        w.leafs[bf.count-1] = bf.key
+    }
     atomic.StoreUint32(&w.flushed, 1)
    _, err := w.flush(true)
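
The diff replaces the per-writer `sync.WaitGroup` and the huge buffered channels (previously sized as `maxCAFSFileSize / DefaultLeafSize`, i.e. worst-case leaf count for a 300 GiB file, per issue #171) with unbuffered channels drained by a long-lived `flushThread` goroutine. `Flush()` first acquires every slot of the `maxGoRoutines` semaphore; that can only succeed once all in-flight `pFlush` goroutines have released their slots, and each releases only after its send on `flushChan`/`errC` has been received. Flush can then safely close `flushChan` and wait on `flushThreadDoneChan`. The sketch below distills that pattern into a self-contained program; it is not code from the datamon repo, and `collector`, `submit`, and `wait` are illustrative stand-ins for `fsWriter`, the `pFlush` launch in `Write`, and `Flush`.

```go
package main

import "fmt"

const maxWorkers = 4 // stands in for maxGoRoutinesPerPut

// collector mirrors the fsWriter bookkeeping: only the collector
// goroutine appends to the slices, so no mutex or WaitGroup is needed.
type collector struct {
	results   chan int      // stands in for flushChan (unbuffered)
	errC      chan error    // stands in for errC (unbuffered)
	done      chan struct{} // stands in for flushThreadDoneChan
	sem       chan struct{} // stands in for maxGoRoutines
	collected []int         // stands in for blobFlushes
	errs      []error       // stands in for errors
}

func newCollector() *collector {
	c := &collector{
		results: make(chan int),
		errC:    make(chan error),
		done:    make(chan struct{}),
		sem:     make(chan struct{}, maxWorkers),
	}
	go c.loop() // long-lived drain goroutine, like fsWriter.flushThread
	return c
}

// loop receives results and errors until results is closed, then signals done.
func (c *collector) loop() {
	open := true
	var v int
	for open {
		select {
		case v, open = <-c.results:
			if open {
				c.collected = append(c.collected, v)
			}
		case err := <-c.errC:
			c.errs = append(c.errs, err)
		}
	}
	c.done <- struct{}{}
}

// submit launches a bounded worker, like the pFlush goroutines in Write.
func (c *collector) submit(n int) {
	c.sem <- struct{}{} // acquire a slot; blocks when maxWorkers are in flight
	go func() {
		defer func() { <-c.sem }() // slot is released only after the send completes
		c.results <- n * n         // unbuffered send: completes when loop receives it
	}()
}

// wait mirrors the new Flush: fill every semaphore slot (possible only once
// no worker still holds one, i.e. every result has been delivered), close
// the results channel, then wait for the collector to finish.
func (c *collector) wait() ([]int, error) {
	for i := 0; i < cap(c.sem); i++ {
		c.sem <- struct{}{}
	}
	close(c.results)
	<-c.done
	if len(c.errs) != 0 {
		return nil, c.errs[0]
	}
	return c.collected, nil
}

func main() {
	c := newCollector()
	for i := 1; i <= 10; i++ {
		c.submit(i)
	}
	out, err := c.wait()
	fmt.Println(out, err)
}
```

Two consequences of this design, as far as the diff shows: memory now grows with the actual number of leaves written rather than a preallocated worst-case channel, and the hard 300 GiB file-size implied by the removed constants disappears. The trade-off is that draining the semaphore is one-way, hence the `// don't Write() during Flush()` comment on the new `Flush`.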