diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go
index ebe163b94..3f4add853 100644
--- a/wasmsdk/blobber.go
+++ b/wasmsdk/blobber.go
@@ -778,11 +778,10 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
 			RemotePath: fullRemotePath,
 			CustomMeta: option.CustomMeta,
 		}
-		// numBlocks := option.NumBlocks
-		// if numBlocks <= 1 {
-		// 	numBlocks = 100
-		// }
-		numBlocks := 60
+		numBlocks := option.NumBlocks
+		if numBlocks <= 1 {
+			numBlocks = 100
+		}
 		options := []sdk.ChunkedUploadOption{
 			sdk.WithThumbnail(option.ThumbnailBytes.Buffer),
diff --git a/zboxcore/sdk/chunked_upload_model.go b/zboxcore/sdk/chunked_upload_model.go
index 356272ee1..0eb5b9724 100644
--- a/zboxcore/sdk/chunked_upload_model.go
+++ b/zboxcore/sdk/chunked_upload_model.go
@@ -91,7 +91,7 @@ type ChunkedUpload struct {
 	//used in wasm check chunked_upload_process_js.go
 	listenChan chan struct{} //nolint:unused
 	//used in wasm check chunked_upload_process_js.go
-	processMap map[int]int //nolint:unused
+	processMap map[int]zboxutil.Uint128 //nolint:unused
 	//used in wasm check chunked_upload_process_js.go
 	processMapLock sync.Mutex //nolint:unused
 }
diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go
index 6c4022056..de135b572 100644
--- a/zboxcore/sdk/chunked_upload_process_js.go
+++ b/zboxcore/sdk/chunked_upload_process_js.go
@@ -151,6 +151,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
 		var thumbnailChunkData []byte
 		worker := jsbridge.GetWorker(blobber.blobber.ID)
 		if worker == nil {
+			logger.Logger.Error("worker not found for blobber: ", blobber.blobber.Baseurl)
 			continue
 		}
 		if len(thumbnailShards) > 0 {
@@ -192,6 +193,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
 		err = worker.PostMessage(safejs.Safe(obj), []safejs.Value{safejs.Safe(fileshardUint8.Get("buffer"))})
 		if err == nil {
 			successCount++
+		} else {
+			logger.Logger.Error("error posting message to worker: ", err)
+			su.uploadMask = su.uploadMask.And(zboxutil.NewUint128(1).Lsh(pos).Not())
 		}
 		if isFinal {
 			blobber.fileRef.ChunkSize = su.chunkSize
@@ -300,7 +304,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
 				//get error message
 				//get final result
 				var err error
-				isFinal, err = su.processWebWorkerUpload(data, blobber)
+				isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
 				if err != nil {
 					errC := atomic.AddInt32(&errCount, 1)
 					if errC >= int32(su.consensus.consensusThresh) {
@@ -308,13 +312,11 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
 					}
 				} else {
 					uploadSuccess = true
-					su.consensus.Done()
 				}
 				return
 			default:
 				logger.Logger.Error("unknown msg type: ", msgType)
 			}
-			return
 		}
 	}(pos)
@@ -335,9 +337,9 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
 		su.ctxCncl(err)
 		respChan <- err
 	}
-	for chunkEndIndex, count := range su.processMap {
-		if count >= su.consensus.consensusThresh {
-			su.updateProgress(chunkEndIndex, su.uploadMask)
+	for chunkEndIndex, mask := range su.processMap {
+		if mask.CountOnes() >= su.consensus.consensusThresh {
+			su.updateProgress(chunkEndIndex, mask)
 			delete(su.processMap, chunkEndIndex)
 		}
 	}
@@ -349,7 +351,7 @@
 	}
 }
 
-func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber) (bool, error) {
+func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber, pos uint64) (bool, error) {
 	var isFinal bool
 	success, err := data.Get("success")
 	if err != nil {
@@ -368,7 +370,7 @@ func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *Chu
 	chunkEndIndexObj, _ := data.Get("chunkEndIndex")
 	chunkEndIndex, _ := chunkEndIndexObj.Int()
-	su.updateChunkProgress(chunkEndIndex)
+	su.updateChunkProgress(chunkEndIndex, pos)
 	finalRequestObject, _ := data.Get("isFinal")
 	finalRequest, _ := finalRequestObject.Bool()
 	if finalRequest {
@@ -693,7 +695,7 @@ type eventChanWorker struct {
 
 func (su *ChunkedUpload) startProcessor() {
 	su.listenChan = make(chan struct{}, su.uploadWorkers)
-	su.processMap = make(map[int]int)
+	su.processMap = make(map[int]zboxutil.Uint128)
 	su.uploadWG.Add(1)
 	go func() {
 		respChan := make(chan error, 1)
@@ -734,8 +736,8 @@ func (su *ChunkedUpload) startProcessor() {
 	}()
 }
 
-func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int) {
+func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int, pos uint64) {
 	su.processMapLock.Lock()
-	su.processMap[chunkEndIndex] += 1
+	su.processMap[chunkEndIndex] = su.processMap[chunkEndIndex].Or(zboxutil.NewUint128(1).Lsh(pos))
 	su.processMapLock.Unlock()
 }
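
The diff above replaces the per-chunk success counter (`processMap map[int]int`) with a per-chunk bitmask of blobber positions, so a blobber that reports the same chunk more than once is no longer double-counted toward the consensus threshold. Below is a minimal standalone sketch of that bitmask pattern, using only the `zboxutil.Uint128` helpers that appear in the diff (`NewUint128`, `Lsh`, `Or`, `CountOnes`); the `progressTracker` type, its method names, and the threshold value are illustrative assumptions, not code from the repository.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/0chain/gosdk/zboxcore/zboxutil"
)

// progressTracker is a hypothetical stand-in for the ChunkedUpload fields
// touched by the diff: a map from chunk end index to a bitmask of blobber
// positions, guarded by a mutex, checked against a consensus threshold.
type progressTracker struct {
	mu         sync.Mutex
	processMap map[int]zboxutil.Uint128
	thresh     int
}

// markDone sets the bit for blobber position pos on the given chunk,
// mirroring updateChunkProgress in the diff.
func (p *progressTracker) markDone(chunkEndIndex int, pos uint64) {
	p.mu.Lock()
	p.processMap[chunkEndIndex] = p.processMap[chunkEndIndex].Or(zboxutil.NewUint128(1).Lsh(pos))
	p.mu.Unlock()
}

// hasConsensus reports whether enough distinct blobbers confirmed the chunk,
// mirroring the mask.CountOnes() >= consensusThresh check in listen.
func (p *progressTracker) hasConsensus(chunkEndIndex int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.processMap[chunkEndIndex].CountOnes() >= p.thresh
}

func main() {
	t := &progressTracker{processMap: make(map[int]zboxutil.Uint128), thresh: 3}
	// A duplicate report from blobber 0 sets the same bit twice and is not
	// double-counted, unlike the old integer counter (processMap[i] += 1).
	t.markDone(1, 0)
	t.markDone(1, 0)
	t.markDone(1, 2)
	t.markDone(1, 5)
	fmt.Println(t.hasConsensus(1)) // true: bits 0, 2, and 5 are set
}
```

Counting set bits gives the number of distinct blobbers that acknowledged the chunk, which is the quantity the consensus threshold is meant to measure; the resulting mask can also be passed directly to progress updates, as the diff does with `su.updateProgress(chunkEndIndex, mask)`.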