diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 4207a5eb3..ca50d7d8e 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -940,3 +940,54 @@ func getBlobbers(stakable bool) ([]*sdk.Blobber, error) { } return blobbs, err } + +func repairAllocation(allocationID string) error { + alloc, err := getAllocation(allocationID) + if err != nil { + return err + } + statusBar := sdk.NewRepairBar(allocationID) + err = alloc.RepairAlloc(statusBar) + if err != nil { + return err + } + statusBar.Wait() + return statusBar.CheckError() +} + +func checkAllocStatus(allocationID string) (string, error) { + alloc, err := getAllocation(allocationID) + if err != nil { + return "", err + } + status, blobberStatus, err := alloc.CheckAllocStatus() + var statusStr string + switch status { + case sdk.Repair: + statusStr = "repair" + case sdk.Broken: + statusStr = "broken" + default: + statusStr = "ok" + } + statusResult := CheckStatusResult{ + Status: statusStr, + Err: err, + BlobberStatus: blobberStatus, + } + statusBytes, err := json.Marshal(statusResult) + if err != nil { + return "", err + } + + return string(statusBytes), err +} + +func skipStatusCheck(allocationID string, checkStatus bool) error { + alloc, err := getAllocation(allocationID) + if err != nil { + return err + } + alloc.SetCheckStatus(checkStatus) + return nil +} diff --git a/wasmsdk/demo/index.html b/wasmsdk/demo/index.html index d25c4178d..5fb9dca82 100644 --- a/wasmsdk/demo/index.html +++ b/wasmsdk/demo/index.html @@ -171,10 +171,10 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates } const getWallet = () => { - const clientID = "b1d533fa60431a76014c4f94a7e8e19a3b1a7f34eebd4cacd29a8dd948b3844c" - const publicKey = "1f30a07b34146435cabc3244a1452fb8933f6982e0b33f384e5d25b9d6531e24e342003349990483f8481052a0748cbc72355d9cbda621ec914f7ed03c127791" - const privateKey = "fb98f46969be6921586e547b0f6f70c6b92d7823359f00bafa3900523910661a" - const mnemonic = "snake second property crush thrive monkey already lake fire sort cheap lake census adult this cloth panic filter taste punch pistol project rack obscure" + const clientID = "ab5b8ab19abe574d92238e0a5dc0c2abd53614cd12cab6b09576fab2a0f64a83" + const publicKey = "d430b1b33eab43bd09886e6125e246600e36e3f88d658d00bf836aa564251e2364bccbfb18f1cb1b5fa4d96ba602b59bc009e6e0223b6a8ebdafc14822b78d23" + const privateKey = "0c2cb85c8c33b3cc35dc12b4754e61ae9488bba857982c382caa461ecac19d19" + const mnemonic = "grace fiscal menu squeeze certain drum ostrich lunar ugly remember cousin observe oxygen brisk toward notable shoot cushion develop marble open aspect couch noise" return { clientID, publicKey, privateKey, mnemonic } @@ -211,7 +211,7 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates let network = query.get('network') if (!network || network == 'undefined') { - network = "dev.zus.network" + network = "mainnet.zus.network" } const blockWorker = 'https://' + network + '/dns'; @@ -441,7 +441,7 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates alert("please selection allocationID") return } - const { list = [] } = await goWasm.sdk.listObjects(allocationId, '/') + const { list = [] } = await goWasm.sdk.listObjects(allocationId, '/',-1,50) files = list || [] bindFiles() }) @@ -799,7 +799,7 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates } objects.push({ - remotePath: path, + //remotePath: path, downloadOp: 1, numBlocks: 0, downloadToDisk: true, @@ -807,7 +807,7 @@

please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates let stringifiedArray = JSON.stringify(objects); try { - const results = await goWasm.sdk.multiDownload(allocationId, stringifiedArray, '', '') + const results = await goWasm.sdk.multiDownload('', stringifiedArray, 'eyJjbGllbnRfaWQiOiIiLCJvd25lcl9pZCI6IjI2ZTIzMjFhZWMxZmEyZDY1NGQ1MDQ5OWY3ZjhmYWJhNjNkYWMxYTExYTQwZDU3NDJkNDAzMWJmMzEzMzAxMTYiLCJhbGxvY2F0aW9uX2lkIjoiMDAwMzAzOTA1MGI3ZDdiM2FlNmI3MGEwZTVjMWU4ZjRhOTkxNzc1YWJiOTQ2NjljMDg4YzczNzJlMzYwMzkyYiIsImZpbGVfcGF0aF9oYXNoIjoiYWEzODE0NTM2ZWI2OWQwNjU4ZWM0OTgyZmE3ZTIwM2I2ZGI2ZWExYmU4ZmMxODRiMWJhOTZhMTk3NmMwM2JlOCIsImFjdHVhbF9maWxlX2hhc2giOiIxMjUwMjJhZGRiZTIwZDNhOWUzYjcxZTA0NjUzZjY3YiIsImZpbGVfbmFtZSI6InVidW50dS0yMi4wNC40LWxpdmUtc2VydmVyLWFtZDY0LmlzbyIsInJlZmVyZW5jZV90eXBlIjoiZiIsImV4cGlyYXRpb24iOjAsInRpbWVzdGFtcCI6MTcxNjM3ODIxNiwiZW5jcnlwdGVkIjpmYWxzZSwic2lnbmF0dXJlIjoiYmEzNzQ1NzlmZTczZDc1MWIwMTNiMjM2NjUzZDRiMGYyYzNjZDJlYTMyNTFkODg0MmRiNWQxNTlhNjBiN2ExMiJ9', '') console.log(JSON.stringify(results)) } catch (e) { alert(e) diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go index 83c0fc9ee..f705d2b24 100644 --- a/wasmsdk/proxy.go +++ b/wasmsdk/proxy.go @@ -189,6 +189,9 @@ func main() { "send": send, "cancelUpload": cancelUpload, "pauseUpload": pauseUpload, + "repairAllocation": repairAllocation, + "checkAllocStatus": checkAllocStatus, + "skipStatusCheck": skipStatusCheck, // player "play": play, diff --git a/wasmsdk/response.go b/wasmsdk/response.go index 0d4bf09dc..0f0c754b6 100644 --- a/wasmsdk/response.go +++ b/wasmsdk/response.go @@ -1,5 +1,7 @@ package main +import "github.com/0chain/gosdk/zboxcore/sdk" + type FileCommandResponse struct { CommandSuccess bool `json:"commandSuccess,omitempty"` Error string `json:"error,omitempty"` @@ -12,3 +14,9 @@ type DownloadCommandResponse struct { FileName string `json:"fileName,omitempty"` Url string `json:"url,omitempty"` } + +type CheckStatusResult struct { + Status string `json:"status"` + Err error `json:"error"` + BlobberStatus []sdk.BlobberStatus `json:"blobberStatus"` +} diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 50410c8f9..bcd2838f5 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -256,6 +256,10 @@ func SetWasm() { extraCount = 0 } +func (a *Allocation) SetCheckStatus(checkStatus bool) { + a.checkStatus = checkStatus +} + func getPriceRange(name string) (PriceRange, error) { conf, err := GetStorageSCConfig() if err != nil { diff --git a/zboxcore/sdk/copyworker.go b/zboxcore/sdk/copyworker.go index 7cb216ec6..546b28d60 100644 --- a/zboxcore/sdk/copyworker.go +++ b/zboxcore/sdk/copyworker.go @@ -219,7 +219,7 @@ func (req *CopyRequest) ProcessCopy() error { defer writeMarkerMutex.Unlock(req.ctx, req.copyMask, req.blobbers, time.Minute, req.connectionID) //nolint: errcheck //Check if the allocation is to be repaired or rolled back - status, err := req.allocationObj.CheckAllocStatus() + status, _, err := req.allocationObj.CheckAllocStatus() if err != nil { logger.Logger.Error("Error checking allocation status: ", err) return fmt.Errorf("Copy failed: %s", err.Error()) diff --git a/zboxcore/sdk/moveworker.go b/zboxcore/sdk/moveworker.go index 2380cd542..9e9e08dc8 100644 --- a/zboxcore/sdk/moveworker.go +++ b/zboxcore/sdk/moveworker.go @@ -214,7 +214,7 @@ func (req *MoveRequest) ProcessMove() error { } //Check if the allocation is to be repaired or rolled back - status, err := req.allocationObj.CheckAllocStatus() + status, _, err := req.allocationObj.CheckAllocStatus() if err != nil { logger.Logger.Error("Error checking allocation status: ", err) return fmt.Errorf("Move failed: %s", err.Error()) diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index 61a0691d3..246f1efcb 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -240,7 +240,7 @@ func (mo *MultiOperation) Process() error { start = time.Now() status := Commit if !mo.isRepair && !mo.allocationObj.checkStatus { - status, err = mo.allocationObj.CheckAllocStatus() + status, _, err = mo.allocationObj.CheckAllocStatus() if err != nil { logger.Logger.Error("Error checking allocation status", err) if singleClientMode { @@ -251,34 +251,15 @@ func (mo *MultiOperation) Process() error { return fmt.Errorf("Check allocation status failed: %s", err.Error()) } if status == Repair { - logger.Logger.Info("Repairing allocation") if singleClientMode { mo.allocationObj.commitMutex.Unlock() } else { writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck } - statusBar := NewRepairBar(mo.allocationObj.ID) - if statusBar == nil { - for _, op := range mo.operations { - op.Error(mo.allocationObj, 0, ErrRetryOperation) - } - return ErrRetryOperation - } - statusBar.wg.Add(1) - err = mo.allocationObj.RepairAlloc(statusBar) - if err != nil { - return err - } - statusBar.wg.Wait() - if statusBar.success { - l.Logger.Info("Repair success") - } else { - l.Logger.Error("Repair failed") - } for _, op := range mo.operations { - op.Error(mo.allocationObj, 0, ErrRetryOperation) + op.Error(mo.allocationObj, 0, ErrRepairRequired) } - return ErrRetryOperation + return ErrRepairRequired } } if singleClientMode { diff --git a/zboxcore/sdk/renameworker.go b/zboxcore/sdk/renameworker.go index c366e527f..42a4cfd6c 100644 --- a/zboxcore/sdk/renameworker.go +++ b/zboxcore/sdk/renameworker.go @@ -212,7 +212,7 @@ func (req *RenameRequest) ProcessRename() error { defer writeMarkerMutex.Unlock(req.ctx, req.renameMask, req.blobbers, time.Minute, req.connectionID) //nolint: errcheck //Check if the allocation is to be repaired or rolled back - status, err := req.allocationObj.CheckAllocStatus() + status, _, err := req.allocationObj.CheckAllocStatus() if err != nil { logger.Logger.Error("Error checking allocation status: ", err) return fmt.Errorf("rename failed: %s", err.Error()) diff --git a/zboxcore/sdk/repairCallback.go b/zboxcore/sdk/repairCallback.go index 306225fe1..3a62d3308 100644 --- a/zboxcore/sdk/repairCallback.go +++ b/zboxcore/sdk/repairCallback.go @@ -55,12 +55,25 @@ func NewRepairBar(allocID string) *StatusBar { if !mutMap[allocID].TryLock() { return nil } + wg := &sync.WaitGroup{} + wg.Add(1) return &StatusBar{ - wg: &sync.WaitGroup{}, + wg: wg, allocID: allocID, } } +func (s *StatusBar) Wait() { + s.wg.Wait() +} + +func (s *StatusBar) CheckError() error { + if !s.success { + return s.err + } + return nil +} + func mutUnlock(allocID string) { mapLock.Lock() mutMap[allocID].Unlock() diff --git a/zboxcore/sdk/rollback.go b/zboxcore/sdk/rollback.go index 7f5bf0e07..e68cc20bf 100644 --- a/zboxcore/sdk/rollback.go +++ b/zboxcore/sdk/rollback.go @@ -43,12 +43,21 @@ const ( Rollback ) -var ErrRetryOperation = errors.New("retry_operation") +var ( + ErrRetryOperation = errors.New("retry_operation") + ErrRepairRequired = errors.New("repair_required") +) type RollbackBlobber struct { blobber *blockchain.StorageNode commitResult *CommitResult lpm *LatestPrevWriteMarker + blobIndex int +} + +type BlobberStatus struct { + ID string + Status string } func GetWritemarker(allocID, allocTx, id, baseUrl string) (*LatestPrevWriteMarker, error) { @@ -242,23 +251,29 @@ func (rb *RollbackBlobber) processRollback(ctx context.Context, tx string) error return thrown.New("rolback_error", fmt.Sprint("Rollback failed")) } -func (a *Allocation) CheckAllocStatus() (AllocStatus, error) { +func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) { wg := &sync.WaitGroup{} markerChan := make(chan *RollbackBlobber, len(a.Blobbers)) var errCnt int32 var markerError error - for _, blobber := range a.Blobbers { + blobberRes := make([]BlobberStatus, len(a.Blobbers)) + for ind, blobber := range a.Blobbers { wg.Add(1) - go func(blobber *blockchain.StorageNode) { + go func(blobber *blockchain.StorageNode, ind int) { defer wg.Done() + blobStatus := BlobberStatus{ + ID: blobber.ID, + Status: "available", + } wr, err := GetWritemarker(a.ID, a.Tx, blobber.ID, blobber.Baseurl) if err != nil { atomic.AddInt32(&errCnt, 1) markerError = err l.Logger.Error("error during getWritemarker", zap.Error(err)) + blobStatus.Status = "unavailable" } if wr == nil { markerChan <- nil @@ -267,15 +282,17 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) { blobber: blobber, lpm: wr, commitResult: &CommitResult{}, + blobIndex: ind, } } - }(blobber) + blobberRes[ind] = blobStatus + }(blobber, ind) } wg.Wait() close(markerChan) if a.ParityShards > 0 && errCnt > int32(a.ParityShards) { - return Broken, common.NewError("check_alloc_status_failed", markerError.Error()) + return Broken, blobberRes, common.NewError("check_alloc_status_failed", markerError.Error()) } versionMap := make(map[string][]*RollbackBlobber) @@ -312,18 +329,20 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) { } if len(versionMap) < 2 { - return Commit, nil + return Commit, blobberRes, nil } req := a.DataShards if len(versionMap[latestVersion]) > req { - return Commit, nil + return Commit, blobberRes, nil } if len(versionMap[latestVersion]) >= req || len(versionMap[prevVersion]) >= req || len(versionMap) > 2 { - // TODO: Return Repair after refactoring the repair function - return Repair, nil + for _, rb := range versionMap[prevVersion] { + blobberRes[rb.blobIndex].Status = "repair" + } + return Repair, blobberRes, nil } else { l.Logger.Info("versionMapLen", zap.Int("versionMapLen", len(versionMap)), zap.Int("latestLen", len(versionMap[latestVersion])), zap.Int("prevLen", len(versionMap[prevVersion]))) } @@ -351,14 +370,14 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) { wg.Wait() if errCnt > int32(fullConsensus) { - return Broken, common.NewError("rollback_failed", "Rollback failed") + return Broken, blobberRes, common.NewError("rollback_failed", "Rollback failed") } if errCnt == int32(fullConsensus) { - return Repair, nil + return Repair, blobberRes, nil } - return Rollback, nil + return Rollback, blobberRes, nil } func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) {