Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consensus in upload #1628

Merged
merged 4 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,11 +778,10 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) {
RemotePath: fullRemotePath,
CustomMeta: option.CustomMeta,
}
// numBlocks := option.NumBlocks
// if numBlocks <= 1 {
// numBlocks = 100
// }
numBlocks := 60
numBlocks := option.NumBlocks
if numBlocks <= 1 {
numBlocks = 100
}

options := []sdk.ChunkedUploadOption{
sdk.WithThumbnail(option.ThumbnailBytes.Buffer),
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type ChunkedUpload struct {
//used in wasm check chunked_upload_process_js.go
listenChan chan struct{} //nolint:unused
//used in wasm check chunked_upload_process_js.go
processMap map[int]int //nolint:unused
processMap map[int]zboxutil.Uint128 //nolint:unused
//used in wasm check chunked_upload_process_js.go
processMapLock sync.Mutex //nolint:unused
}
Expand Down
24 changes: 13 additions & 11 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
var thumbnailChunkData []byte
worker := jsbridge.GetWorker(blobber.blobber.ID)
if worker == nil {
logger.Logger.Error("worker not found for blobber: ", blobber.blobber.Baseurl)
continue
}
if len(thumbnailShards) > 0 {
Expand Down Expand Up @@ -192,6 +193,9 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int,
err = worker.PostMessage(safejs.Safe(obj), []safejs.Value{safejs.Safe(fileshardUint8.Get("buffer"))})
if err == nil {
successCount++
} else {
logger.Logger.Error("error posting message to worker: ", err)
su.uploadMask = su.uploadMask.And(zboxutil.NewUint128(1).Lsh(pos).Not())
}
if isFinal {
blobber.fileRef.ChunkSize = su.chunkSize
Expand Down Expand Up @@ -300,21 +304,19 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
//get error message
//get final result
var err error
isFinal, err = su.processWebWorkerUpload(data, blobber)
isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
wgErrors <- err
}
} else {
uploadSuccess = true
su.consensus.Done()
}
return
default:
logger.Logger.Error("unknown msg type: ", msgType)
}
return
}

}(pos)
Expand All @@ -335,9 +337,9 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
su.ctxCncl(err)
respChan <- err
}
for chunkEndIndex, count := range su.processMap {
if count >= su.consensus.consensusThresh {
su.updateProgress(chunkEndIndex, su.uploadMask)
for chunkEndIndex, mask := range su.processMap {
if mask.CountOnes() >= su.consensus.consensusThresh {
su.updateProgress(chunkEndIndex, mask)
delete(su.processMap, chunkEndIndex)
}
}
Expand All @@ -349,7 +351,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
}
}

func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber) (bool, error) {
func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *ChunkedUploadBlobber, pos uint64) (bool, error) {
var isFinal bool
success, err := data.Get("success")
if err != nil {
Expand All @@ -368,7 +370,7 @@ func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *Chu

chunkEndIndexObj, _ := data.Get("chunkEndIndex")
chunkEndIndex, _ := chunkEndIndexObj.Int()
su.updateChunkProgress(chunkEndIndex)
su.updateChunkProgress(chunkEndIndex, pos)
finalRequestObject, _ := data.Get("isFinal")
finalRequest, _ := finalRequestObject.Bool()
if finalRequest {
Expand Down Expand Up @@ -693,7 +695,7 @@ type eventChanWorker struct {

func (su *ChunkedUpload) startProcessor() {
su.listenChan = make(chan struct{}, su.uploadWorkers)
su.processMap = make(map[int]int)
su.processMap = make(map[int]zboxutil.Uint128)
su.uploadWG.Add(1)
go func() {
respChan := make(chan error, 1)
Expand Down Expand Up @@ -734,8 +736,8 @@ func (su *ChunkedUpload) startProcessor() {
}()
}

func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int) {
func (su *ChunkedUpload) updateChunkProgress(chunkEndIndex int, pos uint64) {
su.processMapLock.Lock()
su.processMap[chunkEndIndex] += 1
su.processMap[chunkEndIndex] = su.processMap[chunkEndIndex].Or(zboxutil.NewUint128(1).Lsh(pos))
su.processMapLock.Unlock()
}
Loading