Skip to content

Commit

Permalink
Merge pull request #1628 from 0chain/hotfix/consensus-upload
Browse files Browse the repository at this point in the history
Fix consensus in upload
  • Loading branch information
dabasov authored Sep 29, 2024
2 parents 52d7773 + 6def636 commit 4cb7621
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 17 deletions.
9 changes: 4 additions & 5 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,11 +778,10 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
RemotePath: fullRemotePath,
CustomMeta: option.CustomMeta,
}
// numBlocks := option.NumBlocks
// if numBlocks <= 1 {
// numBlocks = 100
// }
numBlocks := 60
numBlocks := option.NumBlocks
if numBlocks <= 1 {
numBlocks = 100
}

options := []sdk.ChunkedUploadOption{
sdk.WithThumbnail(option.ThumbnailBytes.Buffer),
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type ChunkedUpload struct {
//used in wasm check chunked_upload_process_js.go
listenChan chan struct{} //nolint:unused
//used in wasm check chunked_upload_process_js.go
processMap map[int]int //nolint:unused
processMap map[int]zboxutil.Uint128 //nolint:unused
//used in wasm check chunked_upload_process_js.go
processMapLock sync.Mutex //nolint:unused
}
Expand Down
24 changes: 13 additions & 11 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
var thumbnailChunkData []byte
worker := jsbridge.GetWorker(blobber.blobber.ID)
if worker == nil {
logger.Logger.Error("worker not found for blobber: ", blobber.blobber.Baseurl)
continue
}
if len(thumbnailShards) > 0 {
Expand Down Expand Up @@ -192,6 +193,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
err = worker.PostMessage(safejs.Safe(obj), []safejs.Value{safejs.Safe(fileshardUint8.Get("buffer"))})
if err == nil {
successCount++
} else {
logger.Logger.Error("error posting message to worker: ", err)
su.uploadMask = su.uploadMask.And(zboxutil.NewUint128(1).Lsh(pos).Not())
}
if isFinal {
blobber.fileRef.ChunkSize = su.chunkSize
Expand Down Expand Up @@ -300,21 +304,19 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
//get error message
//get final result
var err error
isFinal, err = su.processWebWorkerUpload(data, blobber)
isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
wgErrors <- err
}
} else {
uploadSuccess = true
su.consensus.Done()
}
return
default:
logger.Logger.Error("unknown msg type: ", msgType)
}
return
}

}(pos)
Expand All @@ -335,9 +337,9 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
su.ctxCncl(err)
respChan <- err
}
for chunkEndIndex, count := range su.processMap {
if count >= su.consensus.consensusThresh {
su.updateProgress(chunkEndIndex, su.uploadMask)
for chunkEndIndex, mask := range su.processMap {
if mask.CountOnes() >= su.consensus.consensusThresh {
su.updateProgress(chunkEndIndex, mask)
delete(su.processMap, chunkEndIndex)
}
}
Expand All @@ -349,7 +351,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
}
}

func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber) (bool, error) {
func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber, pos uint64) (bool, error) {
var isFinal bool
success, err := data.Get("success")
if err != nil {
Expand All @@ -368,7 +370,7 @@ func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *Chu

chunkEndIndexObj, _ := data.Get("chunkEndIndex")
chunkEndIndex, _ := chunkEndIndexObj.Int()
su.updateChunkProgress(chunkEndIndex)
su.updateChunkProgress(chunkEndIndex, pos)
finalRequestObject, _ := data.Get("isFinal")
finalRequest, _ := finalRequestObject.Bool()
if finalRequest {
Expand Down Expand Up @@ -693,7 +695,7 @@ type eventChanWorker struct {

func (su *ChunkedUpload) startProcessor() {
su.listenChan = make(chan struct{}, su.uploadWorkers)
su.processMap = make(map[int]int)
su.processMap = make(map[int]zboxutil.Uint128)
su.uploadWG.Add(1)
go func() {
respChan := make(chan error, 1)
Expand Down Expand Up @@ -734,8 +736,8 @@ func (su *ChunkedUpload) startProcessor() {
}()
}

func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int) {
func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int, pos uint64) {
su.processMapLock.Lock()
su.processMap[chunkEndIndex] += 1
su.processMap[chunkEndIndex] = su.processMap[chunkEndIndex].Or(zboxutil.NewUint128(1).Lsh(pos))
su.processMapLock.Unlock()
}

0 comments on commit 4cb7621

Please sign in to comment.