From 2d45337ad83249b31e06238b7d0d3e617bfb9023 Mon Sep 17 00:00:00 2001
From: Ransom Williams
Date: Wed, 19 Jun 2019 16:14:03 -0700
Subject: [PATCH] unbuffered channels in bundle_pack.go

Signed-off-by: Ransom Williams
---
Review notes (text here is ignored when the patch is applied):
- uploadBundleFiles now returns right after reporting a ConsumableStore.Get
  error instead of falling through and starting an upload with the reader
  returned by the failed call.
- the final doneOk send now also selects on done, so the goroutine cannot
  block forever once uploadBundle has returned on an error.
- doc comments were added to the new functions and to uploadBundleChans.
- hunk offsets and the diffstat were recomputed for the extra lines; the
  post-image blob id on the index line predates these edits.

 pkg/core/bundle_pack.go | 234 +++++++++++++++++++++++++++-------------
 1 file changed, 161 insertions(+), 73 deletions(-)

diff --git a/pkg/core/bundle_pack.go b/pkg/core/bundle_pack.go
index acc72f5c..51a8a694 100644
--- a/pkg/core/bundle_pack.go
+++ b/pkg/core/bundle_pack.go
@@ -36,6 +36,31 @@ type filePacked struct {
 	duplicate bool
 }
 
+// filePacked2BundleEntry copies the hash, name and size of a packed file into
+// a model.BundleEntry; FileMode is always 0 for now.
+func filePacked2BundleEntry(packedFile filePacked) model.BundleEntry {
+	return model.BundleEntry{
+		Hash:         packedFile.hash,
+		NameWithPath: packedFile.name,
+		FileMode:     0, // #TODO: #35 file mode support
+		Size:         packedFile.size,
+	}
+}
+
+// uploadBundleChans groups the channels shared between uploadBundle and the
+// goroutines that upload individual files.
+type uploadBundleChans struct {
+	// recv data from goroutines about uploaded files
+	filePacked chan<- filePacked
+	error      chan<- errorHit
+	// broadcast to all goroutines not to block by closing this channel
+	done <-chan struct{}
+	// signal file upload goroutines done by writing to this channel
+	doneOk chan<- struct{}
+	// each upload goroutine receives once from this channel when it exits
+	concurrencyControl <-chan struct{}
+}
+
 func uploadBundleEntriesFileList(ctx context.Context, bundle *Bundle, fileList []model.BundleEntry) error {
 	buffer, err := yaml.Marshal(model.BundleEntries{
 		BundleEntries: fileList,
@@ -79,6 +104,105 @@ func (b *Bundle) skipFile(file string) bool {
 	return model.IsGeneratedFile(file) || (b.SkipOnError && !exist)
 }
 
+// uploadBundleFile stores the contents of fileReader in cafsArchive and
+// reports the outcome on chans.filePacked or chans.error. Either send is
+// abandoned once chans.done is closed. On exit it receives once from
+// chans.concurrencyControl.
+func uploadBundleFile(
+	ctx context.Context,
+	file string,
+	cafsArchive cafs.Fs,
+	fileReader io.Reader,
+	chans uploadBundleChans) {
+
+	defer func() {
+		<-chans.concurrencyControl
+	}()
+	written, key, keys, duplicate, e := cafsArchive.Put(ctx, fileReader)
+	if e != nil {
+		select {
+		case chans.error <- errorHit{
+			error: e,
+			file:  file,
+		}:
+		case <-chans.done:
+		}
+		return
+	}
+
+	select {
+	case chans.filePacked <- filePacked{
+		hash:      key.String(),
+		keys:      keys,
+		name:      file,
+		size:      uint64(written),
+		duplicate: duplicate,
+	}:
+	case <-chans.done:
+	}
+}
+
+// uploadBundleFiles starts one uploadBundleFile goroutine per file that is not
+// skipped, with at most bundle.concurrentFileUploads running at a time, and
+// writes to chans.doneOk after all of them have exited. It returns early,
+// without writing to chans.doneOk, when a file cannot be read.
+func uploadBundleFiles(
+	ctx context.Context,
+	bundle *Bundle,
+	files []string,
+	cafsArchive cafs.Fs,
+	chans uploadBundleChans) {
+	concurrencyControl := make(chan struct{}, bundle.concurrentFileUploads)
+	chans.concurrencyControl = concurrencyControl
+	for _, file := range files {
+		// Check to see if the file is to be skipped.
+		if bundle.skipFile(file) {
+			bundle.l.Info("skipping file",
+				zap.String("file", file),
+				zap.String("repo", bundle.RepoID),
+				zap.String("bundleID", bundle.BundleID),
+			)
+			continue
+		}
+		fileReader, err := bundle.ConsumableStore.Get(ctx, file)
+		if err != nil {
+			if bundle.SkipOnError {
+				bundle.l.Info("skipping file",
+					zap.String("file", file),
+					zap.String("repo", bundle.RepoID),
+					zap.String("bundleID", bundle.BundleID),
+					zap.Error(err),
+				)
+				continue
+			}
+			select {
+			case chans.error <- errorHit{
+				error: err,
+				file:  file,
+			}:
+			case <-chans.done:
+			}
+			// The error is fatal for the whole upload: do not fall through
+			// and start an upload with the reader of a failed Get.
+			return
+		}
+		concurrencyControl <- struct{}{}
+		go uploadBundleFile(ctx, file, cafsArchive, fileReader, chans)
+	}
+	/* once the buffered channel semaphore is filled with sentinel entries,
+	 * all `uploadBundleFile` goroutines have exited.
+	 */
+	for i := 0; i < cap(concurrencyControl); i++ {
+		concurrencyControl <- struct{}{}
+	}
+	// uploadBundle stops receiving once it has seen an error; select on done
+	// so this goroutine is not left blocked on the send forever.
+	select {
+	case chans.doneOk <- struct{}{}:
+	case <-chans.done:
+	}
+}
+
 func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint, getKeys func() ([]string, error)) error {
 	// Walk the entire tree
 	// TODO: #53 handle large file count
@@ -100,66 +224,25 @@ func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint
 		return err
 	}
 
-	fileList := make([]model.BundleEntry, 0)
-	var firstUploadBundleEntryIndex uint
 	// Upload the files and the bundle list
 	err = bundle.InitializeBundleID()
 	if err != nil {
 		return err
 	}
 
-	fC := make(chan filePacked, len(files))
-	eC := make(chan errorHit, len(files))
-	cC := make(chan struct{}, bundle.concurrentFileUploads)
-	var count int64
-	for _, file := range files {
-		// Check to see if the file is to be skipped.
-		if bundle.skipFile(file) {
-			bundle.l.Info("skipping file",
-				zap.String("file", file),
-				zap.String("repo", bundle.RepoID),
-				zap.String("bundleID", bundle.BundleID),
-			)
-			continue
-		}
+	filePackedC := make(chan filePacked)
+	errorC := make(chan errorHit)
+	doneC := make(chan struct{})
+	doneOkC := make(chan struct{})
+	defer close(doneC)
 
-		var fileReader io.ReadCloser
-		fileReader, err = bundle.ConsumableStore.Get(ctx, file)
-		if err != nil && bundle.SkipOnError {
-			bundle.l.Info("skipping file",
-				zap.String("file", file),
-				zap.String("repo", bundle.RepoID),
-				zap.String("bundleID", bundle.BundleID),
-				zap.Error(err),
-			)
-			continue
-		} else if err != nil {
-			return err
-		}
-		count++
-		cC <- struct{}{}
-		go func(file string) {
-			defer func() {
-				<-cC
-			}()
-			written, key, keys, duplicate, e := cafsArchive.Put(ctx, fileReader)
-			if e != nil {
-				eC <- errorHit{
-					error: e,
-					file:  file,
-				}
-				return
-			}
+	go uploadBundleFiles(ctx, bundle, files, cafsArchive, uploadBundleChans{
+		filePacked: filePackedC,
+		error:      errorC,
+		done:       doneC,
+		doneOk:     doneOkC,
+	})
 
-			fC <- filePacked{
-				hash:      key.String(),
-				keys:      keys,
-				name:      file,
-				size:      uint64(written),
-				duplicate: duplicate,
-			}
-		}(file)
-	}
 	if MemProfDir != "" {
 		var f *os.File
 		path := filepath.Join(MemProfDir, "upload_bundle.mem.prof")
@@ -173,31 +256,36 @@ func uploadBundle(ctx context.Context, bundle *Bundle, bundleEntriesPerFile uint
 		}
 		f.Close()
 	}
-	for count > 0 {
+	filePackedList := make([]filePacked, 0, len(files))
+	for {
+		var gotDoneSignal bool
 		select {
-		case f := <-fC:
+		case f := <-filePackedC:
 			log.Printf("Uploaded file:%s, duplicate:%t, key:%s, keys:%d", f.name, f.duplicate, f.hash, len(f.keys))
-
-			count--
-
-			fileList = append(fileList, model.BundleEntry{
-				Hash:         f.hash,
-				NameWithPath: f.name,
-				FileMode:     0, // #TODO: #35 file mode support
-				Size:         f.size})
-
-			// Write the bundle entry file if reached max or the last one
-			if count == 0 || uint(len(fileList))%bundleEntriesPerFile == 0 {
-				err = uploadBundleEntriesFileList(ctx, bundle, fileList[firstUploadBundleEntryIndex:])
-				if err != nil {
-					fmt.Printf("Bundle upload failed. Failed to upload bundle entries list %v", err)
-					return err
-				}
-				firstUploadBundleEntryIndex = uint(len(fileList))
-			}
-		case e := <-eC:
+			filePackedList = append(filePackedList, f)
+		case e := <-errorC:
 			fmt.Printf("Bundle upload failed. Failed to upload file %s err: %s", e.file, e.error)
 			return e.error
+		case <-doneOkC:
+			gotDoneSignal = true
+		}
+		if gotDoneSignal {
+			break
+		}
+	}
+	fileList := make([]model.BundleEntry, 0, bundleEntriesPerFile)
+	for packedFileIdx, packedFile := range filePackedList {
+		fileList = append(fileList, filePacked2BundleEntry(packedFile))
+		// Write the bundle entry file if reached max or the last one
+		if packedFileIdx == len(filePackedList)-1 || len(fileList) == int(bundleEntriesPerFile) {
+			err = uploadBundleEntriesFileList(ctx, bundle, fileList)
+			if err != nil {
+				bundle.l.Error("Bundle upload failed. Failed to upload bundle entries list.",
+					zap.Error(err),
+				)
+				return err
+			}
+			fileList = fileList[:0]
 		}
 	}
 	err = uploadBundleDescriptor(ctx, bundle)