Skip to content

Commit

Permalink
fix upload wg
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Jan 22, 2024
1 parent 15783a0 commit 0b23592
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 11 deletions.
13 changes: 8 additions & 5 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func CreateChunkedUpload(

su.isRepair = isRepair
su.uploadChan = make(chan UploadData, 10)
su.uploadWG = sizedwaitgroup.New(5)
su.uploadWG.Add(1)
go su.uploadProcessor()

return su, nil
Expand Down Expand Up @@ -393,7 +393,6 @@ func (su *ChunkedUpload) process() error {
}
alreadyUploadedData := 0
defer su.chunkReader.Close()
defer close(su.uploadChan)
defer su.ctxCncl(nil)
for {

Expand Down Expand Up @@ -652,6 +651,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
}

if isFinal {
close(su.uploadChan)
su.uploadWG.Wait()
select {
case <-su.ctx.Done():
Expand Down Expand Up @@ -739,18 +739,21 @@ func getShardSize(dataSize int64, dataShards int, isEncrypted bool) int64 {
}

func (su *ChunkedUpload) uploadProcessor() {
defer su.uploadWG.Done()
swg := sizedwaitgroup.New(5)
for {
select {
case <-su.ctx.Done():
return
case uploadData, ok := <-su.uploadChan:
if !ok {
swg.Wait()
return
}
su.uploadWG.Add()
swg.Add()
go func() {
su.uploadToBlobbers(uploadData)
su.uploadWG.Done()
swg.Done()
}()
}
}
Expand Down Expand Up @@ -780,7 +783,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) {
wg.Add(1)
go func(pos uint64) {
defer wg.Done()
err := su.blobbers[pos].sendUploadRequest(ctx, su, uploadData.chunkEndIndex, uploadData.isFinal, su.encryptedKey, uploadData.uploadBody[pos].dataBuffers, uploadData.uploadBody[pos].formData, pos)
err := su.blobbers[pos].sendUploadRequest(ctx, su, uploadData.chunkEndIndex, uploadData.isFinal, su.encryptedKey, uploadData.uploadBody[pos].dataBuffers, uploadData.uploadBody[pos].formData, pos, &consensus)

if err != nil {
if strings.Contains(err.Error(), "duplicate") {
Expand Down
6 changes: 3 additions & 3 deletions zboxcore/sdk/chunked_upload_blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
chunkIndex int, isFinal bool,
encryptedKey string, dataBuffers []*bytes.Buffer,
formData ChunkedUploadFormMetadata,
pos uint64) (err error) {
pos uint64, consensus *Consensus) (err error) {

defer func() {

Expand All @@ -62,7 +62,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(

sb.fileRef.EncryptedKey = encryptedKey
sb.fileRef.CalculateHash()
su.consensus.Done()
consensus.Done()
}

return nil
Expand Down Expand Up @@ -160,7 +160,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest(
if err != nil {
return err
}
su.consensus.Done()
consensus.Done()

if formData.ThumbnailBytesLen > 0 {

Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_form_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (b *chunkedUploadFormBuilder) Build(
return nil, metadata, err
}

if i == numBodies-1 && isFinal {
if isFinal && i == numBodies-1 {
formData.IsFinal = true
}

Expand Down
3 changes: 1 addition & 2 deletions zboxcore/sdk/chunked_upload_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/google/uuid"
"github.com/klauspost/reedsolomon"
"github.com/remeh/sizedwaitgroup"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ type ChunkedUpload struct {
encryptedKeyPoint string
encryptedKey string
uploadChan chan UploadData
uploadWG sizedwaitgroup.SizedWaitGroup
uploadWG sync.WaitGroup
}

// FileMeta metadata of stream input/local
Expand Down

0 comments on commit 0b23592

Please sign in to comment.