no file size limit on upload #185

Merged (3 commits) on Jun 3, 2019
pkg/cafs/cafs.go (28 changes: 13 additions & 15 deletions)
@@ -4,7 +4,6 @@ import (
     "context"
     "io"
     "log"
-    "sync"
 
     lru "github.com/hashicorp/golang-lru"

@@ -152,21 +151,20 @@ func (d *defaultFs) GetAt(ctx context.Context, hash Key) (io.ReaderAt, error) {
 }
 
 func (d *defaultFs) writer(prefix string) Writer {
-    return &fsWriter{
-        store:         d.store.backend,
-        leafSize:      d.leafSize,
-        leafs:         nil,
-        buf:           make([]byte, d.leafSize),
-        offset:        0,
-        flushed:       0,
-        pather:        nil,
-        prefix:        prefix,
-        count:         0,
-        flushChan:     make(chan blobFlush, sizeOfFlushChan),
-        errC:          make(chan error, sizeOfFlushChan),
-        maxGoRoutines: make(chan struct{}, maxGoRoutinesPerPut),
-        wg:            sync.WaitGroup{},
+    w := &fsWriter{
+        store:               d.store.backend,
+        leafSize:            d.leafSize,
+        buf:                 make([]byte, d.leafSize),
+        prefix:              prefix,
+        flushChan:           make(chan blobFlush),
+        errC:                make(chan error),
+        flushThreadDoneChan: make(chan struct{}),
+        maxGoRoutines:       make(chan struct{}, maxGoRoutinesPerPut),
+        blobFlushes:         make([]blobFlush, 0),
+        errors:              make([]error, 0),
     }
+    go w.flushThread()
+    return w
 }

 func (d *defaultFs) Delete(ctx context.Context, hash Key) error {
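The new writer() wires up unbuffered channels and starts a dedicated collector goroutine (flushThread) before returning, so no channel capacity, and therefore no maximum file size, has to be chosen up front. A minimal sketch of this construct-then-collect pattern, using invented result and writer types rather than the datamon API:

package main

import "fmt"

// result stands in for blobFlush: one entry per flushed leaf.
type result struct {
    index int
    key   string
}

type writer struct {
    results   chan result   // unbuffered: drained continuously by the collector
    done      chan struct{} // closed when the collector has exited
    collected []result      // owned exclusively by the collector goroutine
}

func newWriter() *writer {
    w := &writer{
        results: make(chan result),
        done:    make(chan struct{}),
    }
    go w.collect() // start draining before any producer can send
    return w
}

func (w *writer) collect() {
    for r := range w.results { // loop ends when results is closed
        w.collected = append(w.collected, r)
    }
    close(w.done)
}

func main() {
    w := newWriter()
    go func() {
        w.results <- result{index: 1, key: "leaf-1"}
        close(w.results)
    }()
    <-w.done
    fmt.Println(w.collected)
}

Because the collector is draining from the moment the writer exists, producers block only briefly on a rendezvous, never on a full buffer.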
pkg/cafs/writer.go (87 changes: 45 additions & 42 deletions)
@@ -6,7 +6,6 @@ import (
     "fmt"
     "hash/crc32"
     "io"
-    "sync"
     "sync/atomic"
 
     "github.com/oneconcern/datamon/pkg/storage"
@@ -16,8 +15,6 @@ import (
 
 const (
     maxGoRoutinesPerPut = 10
-    maxCAFSFileSize     = 1024 * 1024 * 1024 * 300          // 300 Gig
-    sizeOfFlushChan     = maxCAFSFileSize / DefaultLeafSize // https://github.com/oneconcern/datamon/issues/171
 )
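These two deleted constants were the upload limit: flushChan and errC were buffered with one slot per possible leaf of a 300 GiB file (see the issue linked above), so a file needing more leaves than slots could not be uploaded, and every writer paid for those buffers even when writing tiny files. The arithmetic, assuming for illustration a 2 MiB DefaultLeafSize (the real constant is defined elsewhere in pkg/cafs):

package main

import "fmt"

const (
    assumedLeafSize = 2 * 1024 * 1024          // illustrative stand-in for DefaultLeafSize
    maxCAFSFileSize = 1024 * 1024 * 1024 * 300 // the old hard cap: 300 GiB
)

func main() {
    slots := maxCAFSFileSize / assumedLeafSize
    // Every fsWriter preallocated this many slots in flushChan and errC,
    // and no upload could ever produce more than this many leaves.
    fmt.Printf("buffered slots per writer: %d\n", slots) // 153600
}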

 // Writer interface for a content addressable FS
@@ -27,19 +24,21 @@ type Writer interface {
 }
 
 type fsWriter struct {
-    store         storage.Store       // CAFS backing store
-    prefix        string              // Prefix for store paths
-    leafSize      uint32              // Size of chunks
-    leafs         []Key               // List of keys backing a file
-    buf           []byte              // Buffer stage a chunk == leafsize
-    offset        int                 // till where buffer is used
-    flushed       uint32              // writer has been flushed to store
-    pather        func(string) string // pathing logic
-    count         uint64              // total number of parallel writes
-    flushChan     chan blobFlush      // channel for parallel writes
-    errC          chan error          // channel for errors during parallel writes
-    maxGoRoutines chan struct{}       // Max number of concurrent writes
-    wg            sync.WaitGroup      // Sync
+    store               storage.Store       // CAFS backing store
+    prefix              string              // Prefix for store paths
+    leafSize            uint32              // Size of chunks
+    leafs               []Key               // List of keys backing a file
+    buf                 []byte              // Buffer stage a chunk == leafsize
+    offset              int                 // till where buffer is used
+    flushed             uint32              // writer has been flushed to store
+    pather              func(string) string // pathing logic
+    count               uint64              // total number of parallel writes
+    flushChan           chan blobFlush      // channel for parallel writes
+    errC                chan error          // channel for errors during parallel writes
+    flushThreadDoneChan chan struct{}       // flushThread signals exit on this channel
+    maxGoRoutines       chan struct{}       // Max number of concurrent writes
+    blobFlushes         []blobFlush         // leaf results collected by flushThread
+    errors              []error             // errors collected by flushThread
 }
 
 func (w *fsWriter) Write(p []byte) (n int, err error) {
@@ -57,7 +56,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
         w.offset += c
         written += c
         if w.offset == len(w.buf) { // sizes line up, flush and continue
-            w.wg.Add(1)
             w.count++ // next leaf
             w.maxGoRoutines <- struct{}{}
             go pFlush(
@@ -71,7 +69,6 @@ func (w *fsWriter) Write(p []byte) (n int, err error) {
                 w.maxGoRoutines,
                 w.pather,
                 w.store,
-                &w.wg,
             )
             w.buf = make([]byte, w.leafSize) // new buffer
             w.offset = 0                     // new offset for new buffer
Expand All @@ -96,13 +93,10 @@ func pFlush(
maxGoRoutines chan struct{},
pather func(string) string,
destination storage.Store,
wg *sync.WaitGroup,
) {
done := func() {
wg.Done()
defer func() {
<-maxGoRoutines
}
defer done()
}()
// Calculate hash value
hasher, err := blake2b.New(&blake2b.Config{
Size: blake2b.Size,
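With the WaitGroup removed, the maxGoRoutines semaphore does double duty: Write acquires a slot before launching each pFlush, capping concurrency at maxGoRoutinesPerPut, and the deferred receive above releases the slot only after the goroutine's result or error has been handed off on the unbuffered channel. A runnable sketch of that acquire/send/release ordering, with invented names:

package main

import "fmt"

func main() {
    const maxWorkers = 10
    sem := make(chan struct{}, maxWorkers) // counting semaphore, like maxGoRoutines
    results := make(chan int)              // unbuffered, like flushChan
    done := make(chan struct{})

    go func() { // collector, like flushThread
        for r := range results {
            fmt.Println("collected", r)
        }
        close(done)
    }()

    for task := 1; task <= 25; task++ {
        sem <- struct{}{} // acquire: blocks while maxWorkers flushes are in flight
        go func(t int) {
            defer func() { <-sem }() // release: runs only after the send below completes
            results <- t * t         // rendezvous with the collector
        }(task)
    }

    for i := 0; i < maxWorkers; i++ { // barrier: reclaim every slot
        sem <- struct{}{}
    }
    close(results) // all producers are done; stop the collector
    <-done
}

Because a slot is freed only after the matching send completes, whoever manages to take every slot knows every result has already reached the collector.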
@@ -217,28 +211,37 @@ func (w *fsWriter) flush(isLastNode bool) (int, error) {
     return n, nil
 }
 
-func (w *fsWriter) Flush() (Key, []byte, error) {
-    w.leafs = make([]Key, w.count)
-    if w.count > 0 {
-        w.wg.Wait()
-        for {
-            select {
-            case bf := <-w.flushChan:
-                w.count--
-                w.leafs[bf.count-1] = bf.key
-                if w.count == 0 {
-                    break
-                }
-            case err := <-w.errC:
-                return Key{}, nil, err
-            default:
-            }
-            if w.count == 0 {
-                break
-            }
-        }
-    }
+func (w *fsWriter) flushThread() {
+    var err error
+    var bf blobFlush
+    notDone := true
+    for notDone {
+        select {
+        case bf, notDone = <-w.flushChan:
+            if notDone {
+                w.blobFlushes = append(w.blobFlushes, bf)
+            }
+        case err = <-w.errC:
+            w.errors = append(w.errors, err)
+        }
+    }
+    w.flushThreadDoneChan <- struct{}{}
+}
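flushThread is the only goroutine that touches blobFlushes and errors, so the slices need no mutex. Its loop keys off the comma-ok form of channel receive: once Flush closes flushChan, the receive returns a zero blobFlush and false, which ends the loop. That is safe only because Flush closes the channel after every producer has finished, so no late result can race against the closed case. A small sketch of the comma-ok shutdown idiom (names invented):

package main

import "fmt"

func main() {
    ch := make(chan int)
    errC := make(chan error)

    go func() {
        ch <- 42
        close(ch) // no more values will be sent
    }()

    open := true
    for open {
        select {
        case v, ok := <-ch:
            if ok {
                fmt.Println("value:", v)
            }
            open = ok // a closed channel yields (0, false) without blocking
        case err := <-errC:
            fmt.Println("error:", err)
        }
    }
}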

+// don't Write() during Flush()
+func (w *fsWriter) Flush() (Key, []byte, error) {
+    for i := 0; i < cap(w.maxGoRoutines); i++ {
+        w.maxGoRoutines <- struct{}{}
+    }
+    close(w.flushChan)
+    <-w.flushThreadDoneChan
+    if len(w.errors) != 0 {
+        return Key{}, nil, w.errors[0]
+    }
+    w.leafs = make([]Key, len(w.blobFlushes))
+    for _, bf := range w.blobFlushes {
+        w.leafs[bf.count-1] = bf.key
+    }
     atomic.StoreUint32(&w.flushed, 1)
 
     _, err := w.flush(true)
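The new Flush waits for outstanding writes without a WaitGroup: filling maxGoRoutines to capacity is a barrier that can only succeed once every pFlush goroutine has released its slot, and each slot is released only after that goroutine's result was delivered. Closing flushChan then tells the collector to finish, and the handshake on flushThreadDoneChan makes blobFlushes and errors safe to read. A condensed sketch of that shutdown sequence (invented types, not the datamon API):

package main

import "fmt"

type uploader struct {
    sem     chan struct{} // counting semaphore, like maxGoRoutines
    results chan string   // unbuffered, like flushChan
    done    chan struct{} // like flushThreadDoneChan
    keys    []string      // like blobFlushes, owned by the collector
}

func (u *uploader) collect() {
    for k := range u.results {
        u.keys = append(u.keys, k)
    }
    close(u.done)
}

func (u *uploader) flush() []string {
    for i := 0; i < cap(u.sem); i++ { // barrier: every worker slot reclaimed
        u.sem <- struct{}{}
    }
    close(u.results) // no producers remain; stop the collector
    <-u.done         // collector finished: keys is now safe to read
    return u.keys
}

func main() {
    u := &uploader{
        sem:     make(chan struct{}, 4),
        results: make(chan string),
        done:    make(chan struct{}),
    }
    go u.collect()
    for i := 0; i < 8; i++ {
        u.sem <- struct{}{} // acquire before launching, as Write does
        go func(n int) {
            defer func() { <-u.sem }() // release after the send completes
            u.results <- fmt.Sprintf("leaf-%d", n)
        }(i)
    }
    fmt.Println(u.flush())
}

The "don't Write() during Flush()" comment matters: Write also acquires semaphore slots, so a concurrent Write would corrupt the barrier.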