unbuffered channels in bundle_pack.go
Signed-off-by: Ransom Williams <[email protected]>
ransomw1c committed Jun 24, 2019
1 parent 53f301b commit 2d45337
Showing 1 changed file with 140 additions and 73 deletions.
pkg/core/bundle_pack.go: 140 additions & 73 deletions
@@ -36,6 +36,26 @@ type filePacked struct {
 	duplicate bool
 }
 
+func filePacked2BundleEntry(packedFile filePacked) model.BundleEntry {
+	return model.BundleEntry{
+		Hash: packedFile.hash,
+		NameWithPath: packedFile.name,
+		FileMode: 0, // #TODO: #35 file mode support
+		Size: packedFile.size,
+	}
+}
+
+type uploadBundleChans struct {
+	// recv data from goroutines about uploaded files
+	filePacked chan<- filePacked
+	error chan<- errorHit
+	// broadcast to all goroutines not to block by closing this channel
+	done <-chan struct{}
+	// signal file upload goroutines done by writing to this channel
+	doneOk chan<- struct{}
+	concurrencyControl <-chan struct{}
+}
+
 func uploadBundleEntriesFileList(ctx context.Context, bundle *Bundle, fileList []model.BundleEntry) error {
 	buffer, err := yaml.Marshal(model.BundleEntries{
 		BundleEntries: fileList,
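
The comments on uploadBundleChans describe a send-or-abort protocol: a producer blocks on an unbuffered send until either the consumer accepts the value or the done channel is closed. Below is a minimal, self-contained sketch of that pattern; the result type and all names are illustrative stand-ins, not code from this repository.

    package main

    import "fmt"

    // result is a hypothetical stand-in for filePacked / errorHit.
    type result struct{ name string }

    // trySend blocks until out accepts the value or done is closed,
    // mirroring the two select statements in uploadBundleFile.
    func trySend(out chan<- result, done <-chan struct{}, r result) bool {
            select {
            case out <- r:
                    return true
            case <-done:
                    return false
            }
    }

    func main() {
            out := make(chan result) // unbuffered, like filePackedC below
            done := make(chan struct{})

            sent := make(chan bool)
            go func() { sent <- trySend(out, done, result{name: "a.txt"}) }()

            close(done)                  // the consumer gives up without draining out
            fmt.Println("sent:", <-sent) // prints "sent: false"; the producer is not stranded
    }

Closing done is what lets the consumer bail out early (for example on the first error) without leaking producer goroutines blocked on an unbuffered send.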
@@ -79,6 +99,89 @@ func (b *Bundle) skipFile(file string) bool {
 	return model.IsGeneratedFile(file) || (b.SkipOnError && !exist)
 }
 
+func uploadBundleFile(
+	ctx context.Context,
+	file string,
+	cafsArchive cafs.Fs,
+	fileReader io.Reader,
+	chans uploadBundleChans) {
+
+	defer func() {
+		<-chans.concurrencyControl
+	}()
+	written, key, keys, duplicate, e := cafsArchive.Put(ctx, fileReader)
+	if e != nil {
+		select {
+		case chans.error <- errorHit{
+			error: e,
+			file: file,
+		}:
+		case <-chans.done:
+		}
+		return
+	}
+
+	select {
+	case chans.filePacked <- filePacked{
+		hash: key.String(),
+		keys: keys,
+		name: file,
+		size: uint64(written),
+		duplicate: duplicate,
+	}:
+	case <-chans.done:
+	}
+}
+
+func uploadBundleFiles(
+	ctx context.Context,
+	bundle *Bundle,
+	files []string,
+	cafsArchive cafs.Fs,
+	chans uploadBundleChans) {
+	concurrencyControl := make(chan struct{}, bundle.concurrentFileUploads)
+	chans.concurrencyControl = concurrencyControl
+	for _, file := range files {
+		// Check to see if the file is to be skipped.
+		if bundle.skipFile(file) {
+			bundle.l.Info("skipping file",
+				zap.String("file", file),
+				zap.String("repo", bundle.RepoID),
+				zap.String("bundleID", bundle.BundleID),
+			)
+			continue
+		}
+		fileReader, err := bundle.ConsumableStore.Get(ctx, file)
+		if err != nil {
+			if bundle.SkipOnError {
+				bundle.l.Info("skipping file",
+					zap.String("file", file),
+					zap.String("repo", bundle.RepoID),
+					zap.String("bundleID", bundle.BundleID),
+					zap.Error(err),
+				)
+				continue
+			}
+			select {
+			case chans.error <- errorHit{
+				error: err,
+				file: file,
+			}:
+			case <-chans.done:
+			}
+			// don't fall through and launch the upload with a nil reader
+			continue
+		}
+		concurrencyControl <- struct{}{}
+		go uploadBundleFile(ctx, file, cafsArchive, fileReader, chans)
+	}
+	/* once the buffered channel semaphore is filled with sentinel entries,
+	 * all `uploadBundleFile` goroutines have exited.
+	 */
+	for i := 0; i < cap(concurrencyControl); i++ {
+		concurrencyControl <- struct{}{}
+	}
+	chans.doneOk <- struct{}{}
+}
+
 func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint, getKeys func() ([]string, error)) error {
 	// Walk the entire tree
 	// TODO: #53 handle large file count
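
uploadBundleFiles caps in-flight uploads with a buffered channel used as a counting semaphore: a send acquires a slot before each goroutine launches, the goroutine's deferred receive releases it, and refilling every cap(concurrencyControl) slot afterward can only complete once all workers have exited, which is why doneOk can then be signaled. A self-contained sketch of the same acquire/release/drain pattern, with illustrative names and a sleep standing in for the real upload:

    package main

    import (
            "fmt"
            "time"
    )

    func main() {
            const maxInFlight = 3 // plays the role of bundle.concurrentFileUploads
            sem := make(chan struct{}, maxInFlight)

            for i := 0; i < 10; i++ {
                    sem <- struct{}{} // acquire: blocks while maxInFlight workers are live
                    go func(n int) {
                            defer func() { <-sem }()          // release the slot on exit
                            time.Sleep(10 * time.Millisecond) // stands in for cafsArchive.Put
                            fmt.Println("worker", n, "done")
                    }(i)
            }

            // Drain-by-refill: these sends only all succeed once every worker
            // has released its slot, i.e. all goroutines have exited.
            for i := 0; i < cap(sem); i++ {
                    sem <- struct{}{}
            }
            fmt.Println("all workers finished") // safe to signal doneOk here
    }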
@@ -100,66 +203,25 @@ func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint
 		return err
 	}
 
-	fileList := make([]model.BundleEntry, 0)
-	var firstUploadBundleEntryIndex uint
 	// Upload the files and the bundle list
 	err = bundle.InitializeBundleID()
 	if err != nil {
 		return err
 	}
 
-	fC := make(chan filePacked, len(files))
-	eC := make(chan errorHit, len(files))
-	cC := make(chan struct{}, bundle.concurrentFileUploads)
-	var count int64
-	for _, file := range files {
-		// Check to see if the file is to be skipped.
-		if bundle.skipFile(file) {
-			bundle.l.Info("skipping file",
-				zap.String("file", file),
-				zap.String("repo", bundle.RepoID),
-				zap.String("bundleID", bundle.BundleID),
-			)
-			continue
-		}
+	filePackedC := make(chan filePacked)
+	errorC := make(chan errorHit)
+	doneC := make(chan struct{})
+	doneOkC := make(chan struct{})
+	defer close(doneC)
 
-		var fileReader io.ReadCloser
-		fileReader, err = bundle.ConsumableStore.Get(ctx, file)
-		if err != nil && bundle.SkipOnError {
-			bundle.l.Info("skipping file",
-				zap.String("file", file),
-				zap.String("repo", bundle.RepoID),
-				zap.String("bundleID", bundle.BundleID),
-				zap.Error(err),
-			)
-			continue
-		} else if err != nil {
-			return err
-		}
-		count++
-		cC <- struct{}{}
-		go func(file string) {
-			defer func() {
-				<-cC
-			}()
-			written, key, keys, duplicate, e := cafsArchive.Put(ctx, fileReader)
-			if e != nil {
-				eC <- errorHit{
-					error: e,
-					file: file,
-				}
-				return
-			}
+	go uploadBundleFiles(ctx, bundle, files, cafsArchive, uploadBundleChans{
+		filePacked: filePackedC,
+		error: errorC,
+		done: doneC,
+		doneOk: doneOkC,
+	})
 
-			fC <- filePacked{
-				hash: key.String(),
-				keys: keys,
-				name: file,
-				size: uint64(written),
-				duplicate: duplicate,
-			}
-		}(file)
-	}
 	if MemProfDir != "" {
 		var f *os.File
 		path := filepath.Join(MemProfDir, "upload_bundle.mem.prof")
@@ -173,31 +235,36 @@ func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint
 		}
 		f.Close()
 	}
-	for count > 0 {
+	filePackedList := make([]filePacked, 0, len(files))
+	for {
+		var gotDoneSignal bool
 		select {
-		case f := <-fC:
+		case f := <-filePackedC:
 			log.Printf("Uploaded file:%s, duplicate:%t, key:%s, keys:%d", f.name, f.duplicate, f.hash, len(f.keys))
-
-			count--
-
-			fileList = append(fileList, model.BundleEntry{
-				Hash: f.hash,
-				NameWithPath: f.name,
-				FileMode: 0, // #TODO: #35 file mode support
-				Size: f.size})
-
-			// Write the bundle entry file if reached max or the last one
-			if count == 0 || uint(len(fileList))%bundleEntriesPerFile == 0 {
-				err = uploadBundleEntriesFileList(ctx, bundle, fileList[firstUploadBundleEntryIndex:])
-				if err != nil {
-					fmt.Printf("Bundle upload failed. Failed to upload bundle entries list %v", err)
-					return err
-				}
-				firstUploadBundleEntryIndex = uint(len(fileList))
-			}
-		case e := <-eC:
+			filePackedList = append(filePackedList, f)
+		case e := <-errorC:
 			fmt.Printf("Bundle upload failed. Failed to upload file %s err: %s", e.file, e.error)
 			return e.error
+		case <-doneOkC:
+			gotDoneSignal = true
 		}
+		if gotDoneSignal {
+			break
+		}
 	}
+	fileList := make([]model.BundleEntry, 0, bundleEntriesPerFile)
+	for packedFileIdx, packedFile := range filePackedList {
+		fileList = append(fileList, filePacked2BundleEntry(packedFile))
+		// Write the bundle entry file if reached max or the last one
+		if packedFileIdx == len(filePackedList)-1 || len(fileList) == int(bundleEntriesPerFile) {
+			err = uploadBundleEntriesFileList(ctx, bundle, fileList)
+			if err != nil {
+				bundle.l.Error("Bundle upload failed. Failed to upload bundle entries list.",
+					zap.Error(err),
+				)
+				return err
+			}
+			fileList = fileList[:0]
+		}
+	}
 	err = uploadBundleDescriptor(ctx, bundle)
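
The rewritten tail of uploadBundle collects all packed files first, then writes bundle-entry lists in fixed-size batches, flushing on every full batch and once more at the final element. A standalone sketch of that flush logic, with a hypothetical upload callback standing in for uploadBundleEntriesFileList:

    package main

    import "fmt"

    // flushInBatches mirrors the new index-based loop over filePackedList:
    // accumulate entries, flush each full batch, and flush the remainder
    // at the last element. upload is a hypothetical callback standing in
    // for uploadBundleEntriesFileList.
    func flushInBatches(entries []string, batchSize int, upload func([]string) error) error {
            batch := make([]string, 0, batchSize)
            for i, e := range entries {
                    batch = append(batch, e)
                    if i == len(entries)-1 || len(batch) == batchSize {
                            if err := upload(batch); err != nil {
                                    return err
                            }
                            batch = batch[:0] // reuse the backing array, as fileList[:0] does
                    }
            }
            return nil
    }

    func main() {
            files := []string{"a", "b", "c", "d", "e"}
            _ = flushInBatches(files, 2, func(b []string) error {
                    fmt.Println("uploading", b)
                    return nil
            })
            // uploading [a b]
            // uploading [c d]
            // uploading [e]
    }

Truncating with fileList[:0] (batch[:0] here) keeps the backing array allocated across batches instead of reallocating per flush.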
