Commit 9dce415
remove maximum cafs file size limit
Signed-off-by: Ransom Williams <[email protected]>
ransomw1c committed May 30, 2019
1 parent 5b1e218 commit 9dce415
Showing 2 changed files with 58 additions and 57 deletions.
28 changes: 13 additions & 15 deletions pkg/cafs/cafs.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"io"
 	"log"
-	"sync"

 	lru "github.com/hashicorp/golang-lru"

@@ -152,21 +151,20 @@ func (d *defaultFs) GetAt(ctx context.Context, hash Key) (io.ReaderAt, error) {
 }

 func (d *defaultFs) writer(prefix string) Writer {
-	return &fsWriter{
-		store:         d.store.backend,
-		leafSize:      d.leafSize,
-		leafs:         nil,
-		buf:           make([]byte, d.leafSize),
-		offset:        0,
-		flushed:       0,
-		pather:        nil,
-		prefix:        prefix,
-		count:         0,
-		flushChan:     make(chan blobFlush, sizeOfFlushChan),
-		errC:          make(chan error, sizeOfFlushChan),
-		maxGoRoutines: make(chan struct{}, maxGoRoutinesPerPut),
-		wg:            sync.WaitGroup{},
+	w := &fsWriter{
+		store:               d.store.backend,
+		leafSize:            d.leafSize,
+		buf:                 make([]byte, d.leafSize),
+		prefix:              prefix,
+		flushChan:           make(chan blobFlush),
+		errC:                make(chan error),
+		flushThreadDoneChan: make(chan struct{}),
+		maxGoRoutines:       make(chan struct{}, maxGoRoutinesPerPut),
+		blobFlushes:         make([]blobFlush, 0),
+		errors:              make([]error, 0),
 	}
+	go w.flushThread()
+	return w
 }

 func (d *defaultFs) Delete(ctx context.Context, hash Key) error {
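The new construction wires up a collector goroutine (flushThread, shown in the second file below) in place of the old sync.WaitGroup plus a buffered channel sized for the worst-case leaf count. Because a dedicated receiver drains flushChan continuously, the channels can be unbuffered, and no capacity constant has to bound the file size. A minimal, self-contained sketch of the pattern; the result type and every name here are illustrative, not datamon's API:

package main

import "fmt"

// result stands in for datamon's blobFlush; all names in this sketch
// are hypothetical.
type result struct {
	count int
	key   string
}

func main() {
	results := make(chan result) // unbuffered: capacity no longer encodes a size limit
	done := make(chan struct{})  // plays the role of flushThreadDoneChan

	var collected []result

	// Collector goroutine: receive until the channel is closed, then signal.
	go func() {
		for r := range results {
			collected = append(collected, r)
		}
		done <- struct{}{}
	}()

	// Producer: any number of sends succeeds, because the collector is
	// always ready to receive; nothing is sized up front.
	for i := 1; i <= 5; i++ {
		results <- result{count: i, key: fmt.Sprintf("leaf-%d", i)}
	}
	close(results)
	<-done

	fmt.Println(len(collected), "results collected")
}

The slice grows as results arrive, so the practical limit becomes memory for the collected keys rather than a pre-sized channel.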
87 changes: 45 additions & 42 deletions pkg/cafs/writer.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
-	"sync"
 	"sync/atomic"

 	"github.com/oneconcern/datamon/pkg/storage"
@@ -16,8 +15,6 @@ import (

 const (
 	maxGoRoutinesPerPut = 10
-	maxCAFSFileSize     = 1024 * 1024 * 1024 * 300          // 300 Gig
-	sizeOfFlushChan     = maxCAFSFileSize / DefaultLeafSize // https://github.com/oneconcern/datamon/issues/171
 )

 // Writer interface for a content addressable FS
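The deleted constants are what actually imposed the ceiling: sizeOfFlushChan reserved one buffered-channel slot per leaf of the largest permissible file, and since nothing drained flushChan until Flush, a file with more leaves than slots would block its flush goroutines forever (the situation tracked in the linked issue). As a rough illustration, assuming a hypothetical 2 MiB leaf (not necessarily the package's real DefaultLeafSize), the old buffer worked out to 153,600 slots:

package main

import "fmt"

func main() {
	const (
		maxCAFSFileSize = 1024 * 1024 * 1024 * 300 // the removed 300 GiB cap
		leafSize        = 2 * 1024 * 1024          // hypothetical 2 MiB leaf
	)
	// One channel slot was reserved per potential leaf.
	fmt.Println(maxCAFSFileSize / leafSize) // 153600
}

With unbuffered channels and a live collector, no such pre-sizing is needed.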
@@ -27,19 +24,21 @@ type Writer interface {
 }

 type fsWriter struct {
-	store         storage.Store       // CAFS backing store
-	prefix        string              // Prefix for store paths
-	leafSize      uint32              // Size of chunks
-	leafs         []Key               // List of keys backing a file
-	buf           []byte              // Buffer stage a chunk == leafsize
-	offset        int                 // till where buffer is used
-	flushed       uint32              // writer has been flushed to store
-	pather        func(string) string // pathing logic
-	count         uint64              // total number of parallel writes
-	flushChan     chan blobFlush      // channel for parallel writes
-	errC          chan error          // channel for errors during parallel writes
-	maxGoRoutines chan struct{}       // Max number of concurrent writes
-	wg            sync.WaitGroup      // Sync
+	store               storage.Store       // CAFS backing store
+	prefix              string              // Prefix for store paths
+	leafSize            uint32              // Size of chunks
+	leafs               []Key               // List of keys backing a file
+	buf                 []byte              // Buffer stage a chunk == leafsize
+	offset              int                 // till where buffer is used
+	flushed             uint32              // writer has been flushed to store
+	pather              func(string) string // pathing logic
+	count               uint64              // total number of parallel writes
+	flushChan           chan blobFlush      // channel for parallel writes
+	errC                chan error          // channel for errors during parallel writes
+	flushThreadDoneChan chan struct{}
+	maxGoRoutines       chan struct{}       // Max number of concurrent writes
+	blobFlushes         []blobFlush
+	errors              []error
 }

 func (w *fsWriter) Write(p []byte) (n int, err error) {
@@ -57,7 +56,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
 		w.offset += c
 		written += c
 		if w.offset == len(w.buf) { // sizes line up, flush and continue
-			w.wg.Add(1)
 			w.count++ // next leaf
 			w.maxGoRoutines <- struct{}{}
 			go pFlush(
@@ -71,7 +69,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
 				w.maxGoRoutines,
 				w.pather,
 				w.store,
-				&w.wg,
 			)
 			w.buf = make([]byte, w.leafSize) // new buffer
 			w.offset = 0                     // new offset for new buffer
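maxGoRoutines is a counting semaphore built from a buffered channel: Write sends into it before launching pFlush, blocking once maxGoRoutinesPerPut flushes are in flight, and each pFlush releases its slot via the deferred receive shown in the next hunk. A generic, runnable sketch of the idiom, with illustrative names only:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const maxWorkers = 10 // mirrors maxGoRoutinesPerPut
	sem := make(chan struct{}, maxWorkers)

	var sum int64
	for i := 1; i <= 100; i++ {
		sem <- struct{}{} // acquire: blocks while maxWorkers goroutines run
		go func(i int) {
			defer func() { <-sem }() // release, as pFlush's deferred receive does
			atomic.AddInt64(&sum, int64(i))
		}(i)
	}

	// Wait for stragglers by taking every slot ourselves, the same
	// trick the rewritten Flush() uses below.
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	fmt.Println("sum:", sum) // 5050
}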
Expand All @@ -96,13 +93,10 @@ func pFlush(
maxGoRoutines chan struct{},
pather func(string) string,
destination storage.Store,
wg *sync.WaitGroup,
) {
done := func() {
wg.Done()
defer func() {
<-maxGoRoutines
}
defer done()
}()
// Calculate hash value
hasher, err := blake2b.New(&blake2b.Config{
Size: blake2b.Size,
@@ -217,28 +211,37 @@ func (w *fsWriter) flush(isLastNode bool) (int, error) {
 	return n, nil
 }

-func (w *fsWriter) Flush() (Key, []byte, error) {
-	w.leafs = make([]Key, w.count)
-	if w.count > 0 {
-		w.wg.Wait()
-		for {
-			select {
-			case bf := <-w.flushChan:
-				w.count--
-				w.leafs[bf.count-1] = bf.key
-				if w.count == 0 {
-					break
-				}
-			case err := <-w.errC:
-				return Key{}, nil, err
-
-			default:
-			}
-			if w.count == 0 {
-				break
+func (w *fsWriter) flushThread() {
+	var err error
+	var bf blobFlush
+	notDone := true
+	for notDone {
+		select {
+		case bf, notDone = <-w.flushChan:
+			if notDone {
+				w.blobFlushes = append(w.blobFlushes, bf)
 			}
+		case err = <-w.errC:
+			w.errors = append(w.errors, err)
 		}
 	}
+	w.flushThreadDoneChan <- struct{}{}
+}
+
+// don't Write() during Flush()
+func (w *fsWriter) Flush() (Key, []byte, error) {
+	for i := 0; i < cap(w.maxGoRoutines); i++ {
+		w.maxGoRoutines <- struct{}{}
+	}
+	close(w.flushChan)
+	<-w.flushThreadDoneChan
+	if len(w.errors) != 0 {
+		return Key{}, nil, w.errors[0]
+	}
+	w.leafs = make([]Key, len(w.blobFlushes))
+	for _, bf := range w.blobFlushes {
+		w.leafs[bf.count-1] = bf.key
+	}
 	atomic.StoreUint32(&w.flushed, 1)

 	_, err := w.flush(true)
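The rewritten Flush performs a three-step shutdown handshake: acquire every semaphore slot so no pFlush can still be running (each worker releases its slot only after its send has completed), close flushChan so the collector's receive reports notDone == false, then wait on flushThreadDoneChan before reading blobFlushes and errors without locks. A compact, runnable model of that choreography; the worker logic and the failure are invented for illustration:

package main

import (
	"errors"
	"fmt"
)

func main() {
	const maxWorkers = 4
	sem := make(chan struct{}, maxWorkers)
	flushChan := make(chan int)
	errC := make(chan error)
	doneChan := make(chan struct{})

	var flushes []int
	var errs []error

	// Collector, like flushThread(): loop until flushChan is closed.
	go func() {
		open := true
		var v int
		for open {
			select {
			case v, open = <-flushChan:
				if open {
					flushes = append(flushes, v)
				}
			case err := <-errC:
				errs = append(errs, err)
			}
		}
		doneChan <- struct{}{}
	}()

	// Workers: acquire a slot, send a result or an error, release.
	for i := 1; i <= 8; i++ {
		sem <- struct{}{}
		go func(i int) {
			defer func() { <-sem }() // release happens after the send completes
			if i == 3 {
				errC <- errors.New("leaf 3 failed") // invented failure
				return
			}
			flushChan <- i
		}(i)
	}

	// Flush(): own every slot, so no worker is mid-send ...
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	close(flushChan) // ... then stop the collector ...
	<-doneChan       // ... and wait before touching the slices.

	fmt.Println("flushes:", len(flushes), "errors:", len(errs))
}

Because every producer provably finished before close, closing the channel cannot race a send, and the done-channel receive gives the happens-before edge that makes the plain slices safe to read.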
