diff --git a/code/go/0chain.net/blobbercore/allocation/entity.go b/code/go/0chain.net/blobbercore/allocation/entity.go index 4022e813f..5d11b9d9a 100644 --- a/code/go/0chain.net/blobbercore/allocation/entity.go +++ b/code/go/0chain.net/blobbercore/allocation/entity.go @@ -53,6 +53,7 @@ type Allocation struct { BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"` BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"` LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"` + LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"` IsRedeemRequired bool `gorm:"column:is_redeem_required"` TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"` StartTime common.Timestamp `gorm:"column:start_time;not null"` diff --git a/code/go/0chain.net/blobbercore/allocation/repository.go b/code/go/0chain.net/blobbercore/allocation/repository.go index 52d2f1253..e793b3a13 100644 --- a/code/go/0chain.net/blobbercore/allocation/repository.go +++ b/code/go/0chain.net/blobbercore/allocation/repository.go @@ -191,7 +191,7 @@ func (r *Repository) GetAllocationIds(ctx context.Context) []Res { } -func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation) error { +func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation, redeemSeq int64) error { var tx = datastore.GetStore().GetTransaction(ctx) if tx == nil { logging.Logger.Panic("no transaction in the context") @@ -205,17 +205,20 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A allocationUpdates := make(map[string]interface{}) allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot allocationUpdates["is_redeem_required"] = false + allocationUpdates["last_redeemed_sequence"] = redeemSeq err = tx.Model(allocationObj).Updates(allocationUpdates).Error if err != nil { return err } allocationObj.LatestRedeemedWM = AllocationRoot allocationObj.IsRedeemRequired = false + allocationObj.LastRedeemedSeq = redeemSeq txnCache := cache[allocationID] txnCache.Allocation = allocationObj updateAlloc := func(a *Allocation) { a.LatestRedeemedWM = AllocationRoot a.IsRedeemRequired = false + a.LastRedeemedSeq = redeemSeq } txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc) cache[allocationID] = txnCache diff --git a/code/go/0chain.net/blobbercore/blobberhttp/response.go b/code/go/0chain.net/blobbercore/blobberhttp/response.go index b48fcdbd6..7bb0a707e 100644 --- a/code/go/0chain.net/blobbercore/blobberhttp/response.go +++ b/code/go/0chain.net/blobbercore/blobberhttp/response.go @@ -15,10 +15,10 @@ type ConnectionResult struct { // swagger:model CommitResult type CommitResult struct { - AllocationRoot string `json:"allocation_root"` - WriteMarker *writemarker.WriteMarker `json:"write_marker"` - Success bool `json:"success"` - ErrorMessage string `json:"error_msg,omitempty"` + AllocationRoot string `json:"allocation_root"` + WriteMarker *writemarker.WriteMarkerEntity `json:"write_marker"` + Success bool `json:"success"` + ErrorMessage string `json:"error_msg,omitempty"` //Result []*UploadResult `json:"result"` } diff --git a/code/go/0chain.net/blobbercore/config/config.go b/code/go/0chain.net/blobbercore/config/config.go index 71fa74014..3cef13b2d 100644 --- a/code/go/0chain.net/blobbercore/config/config.go +++ b/code/go/0chain.net/blobbercore/config/config.go @@ -19,6 +19,9 @@ func SetupDefaultConfig() { viper.SetDefault("openconnection_cleaner.frequency", 30) viper.SetDefault("writemarker_redeem.frequency", 10) viper.SetDefault("writemarker_redeem.num_workers", 5) + viper.SetDefault("writemarker_redeem.max_chain_length", 32) + viper.SetDefault("writemarker_redeem.max_timestamp_gap", 1800) + viper.SetDefault("writemarker_redeem.marker_redeem_interval", time.Minute*10) viper.SetDefault("readmarker_redeem.frequency", 10) viper.SetDefault("readmarker_redeem.num_workers", 5) viper.SetDefault("challenge_response.frequency", 10) @@ -100,6 +103,9 @@ type Config struct { OpenConnectionWorkerTolerance int64 WMRedeemFreq int64 WMRedeemNumWorkers int + MaxChainLength int + MaxTimestampGap int64 + MarkerRedeemInterval time.Duration RMRedeemFreq int64 RMRedeemNumWorkers int ChallengeResolveFreq int64 @@ -218,6 +224,9 @@ func ReadConfig(deploymentMode int) { Configuration.WMRedeemFreq = viper.GetInt64("writemarker_redeem.frequency") Configuration.WMRedeemNumWorkers = viper.GetInt("writemarker_redeem.num_workers") + Configuration.MaxChainLength = viper.GetInt("writemarker_redeem.max_chain_length") + Configuration.MaxTimestampGap = viper.GetInt64("writemarker_redeem.max_timestamp_gap") + Configuration.MarkerRedeemInterval = viper.GetDuration("writemarker_redeem.marker_redeem_interval") Configuration.RMRedeemFreq = viper.GetInt64("readmarker_redeem.frequency") Configuration.RMRedeemNumWorkers = viper.GetInt("readmarker_redeem.num_workers") diff --git a/code/go/0chain.net/blobbercore/convert/response_creator.go b/code/go/0chain.net/blobbercore/convert/response_creator.go index 39748dbdd..828bd2bcb 100644 --- a/code/go/0chain.net/blobbercore/convert/response_creator.go +++ b/code/go/0chain.net/blobbercore/convert/response_creator.go @@ -129,9 +129,9 @@ func CommitWriteResponseCreator(r interface{}) *blobbergrpc.CommitResponse { return &blobbergrpc.CommitResponse{ AllocationRoot: httpResp.AllocationRoot, - WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker), - ErrorMessage: httpResp.ErrorMessage, - Success: httpResp.Success, + // WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker), + ErrorMessage: httpResp.ErrorMessage, + Success: httpResp.Success, } } diff --git a/code/go/0chain.net/blobbercore/convert/response_handler.go b/code/go/0chain.net/blobbercore/convert/response_handler.go index ba4dc171e..3092c6b6d 100644 --- a/code/go/0chain.net/blobbercore/convert/response_handler.go +++ b/code/go/0chain.net/blobbercore/convert/response_handler.go @@ -87,9 +87,9 @@ func GetObjectTreeResponseHandler(getObjectTreeResponse *blobbergrpc.GetObjectTr func CommitWriteResponseHandler(resp *blobbergrpc.CommitResponse) *blobberhttp.CommitResult { return &blobberhttp.CommitResult{ AllocationRoot: resp.AllocationRoot, - WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker), - Success: resp.Success, - ErrorMessage: resp.ErrorMessage, + // WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker), + Success: resp.Success, + ErrorMessage: resp.ErrorMessage, } } diff --git a/code/go/0chain.net/blobbercore/handler/handler_common.go b/code/go/0chain.net/blobbercore/handler/handler_common.go index 3c3e19460..16d5bd590 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_common.go +++ b/code/go/0chain.net/blobbercore/handler/handler_common.go @@ -8,6 +8,8 @@ import ( "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" "github.com/0chain/blobber/code/go/0chain.net/core/build" "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" @@ -153,6 +155,13 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu return resp, statusCode, common.NewErrorf("commit_error", "error committing to meta store: %v", err) } + + if blobberRes, ok := resp.(*blobberhttp.CommitResult); ok { + // Save the write marker data + writemarker.SaveMarkerData(allocationID, blobberRes.WriteMarker.WM.Timestamp, blobberRes.WriteMarker.WM.ChainLength) + } else { + Logger.Error("Invalid response type for commit handler") + } return } } diff --git a/code/go/0chain.net/blobbercore/handler/handler_writemarker.go b/code/go/0chain.net/blobbercore/handler/handler_writemarker.go index 5704862a8..966306613 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_writemarker.go +++ b/code/go/0chain.net/blobbercore/handler/handler_writemarker.go @@ -2,20 +2,15 @@ package handler import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" - "github.com/0chain/blobber/code/go/0chain.net/core/common" . "github.com/0chain/blobber/code/go/0chain.net/core/logging" "go.uber.org/zap" ) -var WriteMarkerMutext = &writemarker.Mutex{ - ML: common.GetNewLocker(), -} - // LockWriteMarker try to lock writemarker for specified allocation id, and return latest RefTree func LockWriteMarker(ctx *Context) (interface{}, error) { connectionID, _ := ctx.FormValue("connection_id") - result, err := WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID) + result, err := writemarker.WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID) Logger.Info("Lock write marker result", zap.Any("result", result), zap.Error(err)) if err != nil { return nil, err @@ -28,7 +23,7 @@ func LockWriteMarker(ctx *Context) (interface{}, error) { func UnlockWriteMarker(ctx *Context) (interface{}, error) { connectionID := ctx.Vars["connection"] - err := WriteMarkerMutext.Unlock(ctx, ctx.AllocationId, connectionID) + err := writemarker.WriteMarkerMutext.Unlock(ctx.AllocationId, connectionID) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 44e6eb5fb..fe5ba7860 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -89,18 +89,10 @@ func readPreRedeem( } func checkPendingMarkers(ctx context.Context, allocationID string) error { - - mut := writemarker.GetLock(allocationID) - if mut == nil { - return nil - } - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - err := mut.Acquire(ctx, 1) - if err != nil { - return common.NewError("check_pending_markers", "write marker is still not redeemed") + pending := writemarker.CheckProcessingMarker(allocationID) + if pending { + return common.NewError("pending_markers", "previous marker is still pending to be redeemed") } - mut.Release(1) return nil } @@ -507,6 +499,7 @@ func (fsh *StorageHandler) CreateConnection(ctx context.Context, r *http.Request } func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*blobberhttp.CommitResult, error) { + var prevChainHash string startTime := time.Now() if r.Method == "GET" { return nil, common.NewError("invalid_method", "Invalid method used for the upload URL. Use POST instead") @@ -602,6 +595,20 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b return nil, common.NewErrorf("latest_write_marker_read_error", "Error reading the latest write marker for allocation: %v", err) } + if latestWriteMarkerEntity.Status == writemarker.Failed { + return nil, common.NewError("latest_write_marker_failed", + "Latest write marker is in failed state") + } + + if latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size != writeMarker.ChainSize { + return nil, common.NewErrorf("invalid_chain_size", + "Invalid chain size. expected:%v got %v", latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size, writeMarker.ChainSize) + } + + if latestWriteMarkerEntity.Status != writemarker.Committed { + writeMarker.ChainLength = latestWriteMarkerEntity.WM.ChainLength + } + prevChainHash = latestWriteMarkerEntity.WM.ChainHash } writemarkerEntity := &writemarker.WriteMarkerEntity{} @@ -613,7 +620,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b result.ErrorMessage = "Verification of write marker failed: " + err.Error() result.Success = false if latestWriteMarkerEntity != nil { - result.WriteMarker = &latestWriteMarkerEntity.WM + result.WriteMarker = latestWriteMarkerEntity } Logger.Error("verify_writemarker_failed", zap.Error(err)) return &result, common.NewError("write_marker_verification_failed", result.ErrorMessage) @@ -667,7 +674,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b if allocationRoot != writeMarker.AllocationRoot { result.AllocationRoot = allocationObj.AllocationRoot if latestWriteMarkerEntity != nil { - result.WriteMarker = &latestWriteMarkerEntity.WM + result.WriteMarker = latestWriteMarkerEntity } result.Success = false result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." + @@ -675,10 +682,15 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage) } + chainHash := writemarker.CalculateChainHash(prevChainHash, allocationRoot) + if chainHash != writeMarker.ChainHash { + return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash") + } + if fileMetaRoot != writeMarker.FileMetaRoot { // result.AllocationRoot = allocationObj.AllocationRoot if latestWriteMarkerEntity != nil { - result.WriteMarker = &latestWriteMarkerEntity.WM + result.WriteMarker = latestWriteMarkerEntity } result.Success = false result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." + @@ -688,6 +700,10 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b writemarkerEntity.ConnectionID = connectionObj.ID writemarkerEntity.ClientPublicKey = clientKey + writemarkerEntity.WM.ChainLength += 1 + if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength { + return nil, common.NewError("chain_length_exceeded", "Chain length exceeded") + } db := datastore.GetStore().GetTransaction(ctx) writemarkerEntity.Latest = true @@ -734,7 +750,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b db.Model(connectionObj).Updates(allocation.AllocationChangeCollector{Status: allocation.CommittedConnection}) result.AllocationRoot = allocationObj.AllocationRoot - result.WriteMarker = &writeMarker + result.WriteMarker = writemarkerEntity result.Success = true result.ErrorMessage = "" commitOperation := connectionObj.Changes[0].Operation @@ -743,10 +759,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b //Delete connection object and its changes db.Delete(connectionObj) - err = writemarkerEntity.SendToChan(ctx) - if err != nil { - return nil, common.NewError("write_marker_error", "Error redeeming the write marker") - } go allocation.DeleteConnectionObjEntry(connectionID) go AddWriteMarkerCount(clientID, connectionObj.Size <= 0) @@ -1390,6 +1402,10 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob return nil, common.NewError("write_marker_verification_failed", "Verification of the write marker failed: "+err.Error()) } + if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength { + return nil, common.NewError("chain_length_exceeded", "Chain length exceeded") + } + elapsedVerifyWM := time.Since(startTime) - elapsedAllocation - elapsedGetLock var clientIDForWriteRedeem = writeMarker.ClientID @@ -1425,7 +1441,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob if allocationRoot != writeMarker.AllocationRoot { result.AllocationRoot = allocationObj.AllocationRoot - result.WriteMarker = &latestWriteMarkerEntity.WM + result.WriteMarker = latestWriteMarkerEntity result.Success = false result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." + " Expected hash: " + allocationRoot @@ -1433,9 +1449,15 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage) } + chainHash := writemarker.CalculateChainHash(latestWriteMarkerEntity.WM.ChainHash, allocationRoot) + if chainHash != writeMarker.ChainHash { + txn.Rollback() + return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash") + } + if fileMetaRoot != writeMarker.FileMetaRoot { if latestWriteMarkerEntity != nil { - result.WriteMarker = &latestWriteMarkerEntity.WM + result.WriteMarker = latestWriteMarkerEntity } result.Success = false result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." + @@ -1490,10 +1512,6 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob if err != nil { return &result, common.NewError("allocation_commit_error", "Error committing the transaction "+err.Error()) } - err = writemarkerEntity.SendToChan(ctx) - if err != nil { - return nil, common.NewError("write_marker_error", "Error redeeming the write marker") - } err = allocation.CommitRollback(allocationID) if err != nil { Logger.Error("Error committing the rollback for allocation", zap.Error(err)) @@ -1501,7 +1519,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob elapsedCommitRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem result.AllocationRoot = allocationObj.AllocationRoot - result.WriteMarker = &writeMarker + result.WriteMarker = writemarkerEntity result.Success = true result.ErrorMessage = "" commitOperation := "rollback" diff --git a/code/go/0chain.net/blobbercore/handler/storage_handler.go b/code/go/0chain.net/blobbercore/handler/storage_handler.go index f472c1f20..b49c9064b 100644 --- a/code/go/0chain.net/blobbercore/handler/storage_handler.go +++ b/code/go/0chain.net/blobbercore/handler/storage_handler.go @@ -463,6 +463,9 @@ func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Req var result blobberhttp.LatestWriteMarkerResult if latestWM != nil { + if latestWM.Status == writemarker.Committed { + latestWM.WM.ChainLength = 0 // start a new chain + } result.LatestWM = &latestWM.WM } if prevWM != nil { @@ -560,6 +563,9 @@ func (fsh *StorageHandler) getReferencePath(ctx context.Context, r *http.Request var refPathResult blobberhttp.ReferencePathResult refPathResult.ReferencePath = refPath if latestWM != nil { + if latestWM.Status == writemarker.Committed { + latestWM.WM.ChainLength = 0 // start a new chain + } refPathResult.LatestWM = &latestWM.WM } @@ -628,6 +634,9 @@ func (fsh *StorageHandler) GetObjectTree(ctx context.Context, r *http.Request) ( var refPathResult blobberhttp.ReferencePathResult refPathResult.ReferencePath = refPath if latestWM != nil { + if latestWM.Status == writemarker.Committed { + latestWM.WM.ChainLength = 0 // start a new chain + } refPathResult.LatestWM = &latestWM.WM } return &refPathResult, nil diff --git a/code/go/0chain.net/blobbercore/writemarker/entity.go b/code/go/0chain.net/blobbercore/writemarker/entity.go index bac666ef9..1989846ee 100644 --- a/code/go/0chain.net/blobbercore/writemarker/entity.go +++ b/code/go/0chain.net/blobbercore/writemarker/entity.go @@ -2,6 +2,7 @@ package writemarker import ( "context" + "encoding/hex" "encoding/json" "fmt" "time" @@ -10,27 +11,33 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "github.com/minio/sha256-simd" "go.uber.org/zap" "gorm.io/gorm" ) type WriteMarker struct { - AllocationRoot string `gorm:"column:allocation_root;size:64;primaryKey" json:"allocation_root"` - PreviousAllocationRoot string `gorm:"column:prev_allocation_root;size:64" json:"prev_allocation_root"` - FileMetaRoot string `gorm:"column:file_meta_root;size:64" json:"file_meta_root"` - AllocationID string `gorm:"column:allocation_id;size:64;index:idx_seq,unique,priority:1" json:"allocation_id"` - Size int64 `gorm:"column:size" json:"size"` - BlobberID string `gorm:"column:blobber_id;size:64" json:"blobber_id"` - Timestamp common.Timestamp `gorm:"column:timestamp;primaryKey" json:"timestamp"` - ClientID string `gorm:"column:client_id;size:64" json:"client_id"` - Signature string `gorm:"column:signature;size:64" json:"signature"` + Version string `gorm:"-" json:"version"` + AllocationRoot string `gorm:"column:allocation_root;size:64;primaryKey" json:"allocation_root"` + PreviousAllocationRoot string `gorm:"column:prev_allocation_root;size:64" json:"prev_allocation_root"` + FileMetaRoot string `gorm:"column:file_meta_root;size:64" json:"file_meta_root"` + AllocationID string `gorm:"column:allocation_id;size:64;index:idx_seq,unique,priority:1" json:"allocation_id"` + Size int64 `gorm:"column:size" json:"size"` + ChainSize int64 `gorm:"column:chain_size" json:"chain_size"` + // ChainHash is the sha256 hash of the previous chain hash and the current allocation root + ChainHash string `gorm:"column:chain_hash;size:64" json:"chain_hash"` + ChainLength int `gorm:"column:chain_length" json:"chain_length"` + BlobberID string `gorm:"column:blobber_id;size:64" json:"blobber_id"` + Timestamp common.Timestamp `gorm:"column:timestamp;primaryKey" json:"timestamp"` + ClientID string `gorm:"column:client_id;size:64" json:"client_id"` + Signature string `gorm:"column:signature;size:64" json:"signature"` } func (wm *WriteMarker) GetHashData() string { - hashData := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d", + hashData := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%d:%d:%d", wm.AllocationRoot, wm.PreviousAllocationRoot, - wm.FileMetaRoot, wm.AllocationID, wm.BlobberID, - wm.ClientID, wm.Size, wm.Timestamp) + wm.FileMetaRoot, wm.ChainHash, wm.AllocationID, wm.BlobberID, + wm.ClientID, wm.Size, wm.ChainSize, wm.Timestamp) return hashData } @@ -43,6 +50,8 @@ const ( Rollbacked WriteMarkerStatus = 3 ) +const MARKER_VERSION = "v2" + type WriteMarkerEntity struct { // WM new WriteMarker from client WM WriteMarker `gorm:"embedded"` @@ -73,7 +82,7 @@ func (w *WriteMarkerEntity) BeforeSave(tx *gorm.DB) error { return nil } -func (wm *WriteMarkerEntity) UpdateStatus(ctx context.Context, status WriteMarkerStatus, statusMessage, redeemTxn string) (err error) { +func (wm *WriteMarkerEntity) UpdateStatus(ctx context.Context, status WriteMarkerStatus, statusMessage, redeemTxn string, startSeq, endSeq int64) (err error) { err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { db := datastore.GetStore().GetTransaction(ctx) statusBytes, _ := json.Marshal(statusMessage) @@ -100,9 +109,11 @@ func (wm *WriteMarkerEntity) UpdateStatus(ctx context.Context, status WriteMarke return err } - err = db.Exec("UPDATE write_markers SET latest = false WHERE allocation_id = ? AND allocation_root = ? AND sequence < ?", wm.WM.AllocationID, wm.WM.PreviousAllocationRoot, wm.Sequence).Error - if err != nil { - return err + if status == Committed { + err = db.Exec("UPDATE write_markers SET status=1 WHERE sequence BETWEEN ? AND ? AND allocation_id = ?", startSeq, endSeq, wm.WM.AllocationID).Error + if err != nil { + return err + } } // TODO (sfxdx): what about failed write markers ? @@ -136,6 +147,12 @@ func GetWriteMarkerEntity(ctx context.Context, allocation_root string) (*WriteMa if err != nil { return nil, err } + if wm.Status == Committed { + wm.WM.ChainLength = 0 + } + if wm.WM.ChainHash != "" { + wm.WM.Version = MARKER_VERSION + } return wm, nil } @@ -219,16 +236,65 @@ func (wm *WriteMarkerEntity) Create(ctx context.Context) error { return err } -func (wm *WriteMarkerEntity) SendToChan(ctx context.Context) error { +func GetUncommittedWriteMarkers(ctx context.Context, allocationID string, seq int64) ([]*WriteMarkerEntity, error) { + db := datastore.GetStore().GetTransaction(ctx) - sem := GetLock(wm.WM.AllocationID) - if sem == nil { - sem = SetLock(wm.WM.AllocationID) + unCommittedMarkers := make([]*WriteMarkerEntity, 0) + err := db.Table((WriteMarkerEntity{}).TableName()). + Where("allocation_id=? AND status=0 AND sequence > ?", allocationID, seq). + Order("sequence asc"). + Find(&unCommittedMarkers).Error + if err != nil && err != gorm.ErrRecordNotFound { + return nil, err } - err := sem.Acquire(context.TODO(), 1) + return unCommittedMarkers, nil +} + +func GetLatestCommittedWriteMarker(ctx context.Context, allocationID string) (*WriteMarkerEntity, error) { + db := datastore.GetStore().GetTransaction(ctx) + wm := &WriteMarkerEntity{} + err := db.Table((WriteMarkerEntity{}).TableName()). + Where("allocation_id=? AND status=1", allocationID). + Order("sequence desc"). + Take(wm).Error if err != nil { - return err + if err == gorm.ErrRecordNotFound { + return nil, nil + } + return nil, err } - writeMarkerChan <- wm - return nil + return wm, nil +} + +func GetMarkersForChain(ctx context.Context, allocationID string, startSeq, endSeq int64) ([]byte, error) { + db := datastore.GetStore().GetTransaction(ctx) + + unCommittedMarkers := make([]*WriteMarkerEntity, 0) + err := db.Table((WriteMarkerEntity{}).TableName()). + Where("allocation_id=? AND status=0 AND sequence BETWEEN ? AND ?", allocationID, startSeq, endSeq). + Order("sequence asc"). + Find(&unCommittedMarkers).Error + if err != nil && err != gorm.ErrRecordNotFound { + return nil, err + } + markers := make([]byte, 0, len(unCommittedMarkers)) + for _, marker := range unCommittedMarkers { + decodedHash, err := hex.DecodeString(marker.WM.AllocationRoot) + if err != nil { + return nil, err + } + markers = append(markers, decodedHash...) + } + return markers, nil +} + +func CalculateChainHash(prevChainHash, newRoot string) string { + hasher := sha256.New() + if prevChainHash != "" { + prevBytes, _ := hex.DecodeString(prevChainHash) + hasher.Write(prevBytes) //nolint:errcheck + } + newBytes, _ := hex.DecodeString(newRoot) + hasher.Write(newBytes) //nolint:errcheck + return hex.EncodeToString(hasher.Sum(nil)) } diff --git a/code/go/0chain.net/blobbercore/writemarker/mutex.go b/code/go/0chain.net/blobbercore/writemarker/mutex.go index a497aba88..a5a24ecf3 100644 --- a/code/go/0chain.net/blobbercore/writemarker/mutex.go +++ b/code/go/0chain.net/blobbercore/writemarker/mutex.go @@ -38,6 +38,10 @@ type Mutex struct { ML *common.MapLocker } +var WriteMarkerMutext = &Mutex{ + ML: common.GetNewLocker(), +} + // Lock will create/update lock in postgres. // If no lock exists for an allocation then new lock is created. // If lock exists and is of same connection ID then lock's createdAt is updated @@ -97,7 +101,7 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L }, nil } -func (*Mutex) Unlock(ctx context.Context, allocationID string, connectionID string) error { +func (*Mutex) Unlock(allocationID string, connectionID string) error { if allocationID == "" || connectionID == "" { return nil } diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol.go b/code/go/0chain.net/blobbercore/writemarker/protocol.go index 07279be5f..819ccb249 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol.go @@ -6,6 +6,7 @@ import ( "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/encryption" @@ -21,6 +22,7 @@ type CommitConnection struct { AllocationRoot string `json:"allocation_root"` PrevAllocationRoot string `json:"prev_allocation_root"` WriteMarker *WriteMarker `json:"write_marker"` + ChainData []byte `json:"chain_data"` } // VerifyMarker verify WriteMarker's hash and check allocation_root if it is unique @@ -109,22 +111,22 @@ func (wme *WriteMarkerEntity) VerifyMarker(ctx context.Context, dbAllocation *al return nil } -func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { +func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context, startSeq int64) error { if len(wme.CloseTxnID) > 0 { t, err := transaction.VerifyTransaction(wme.CloseTxnID, chain.GetServerChain()) if err == nil { wme.Status = Committed wme.StatusMessage = t.TransactionOutput wme.CloseTxnID = t.Hash - _ = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash) - return nil + err = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash, startSeq, wme.Sequence) + return err } } txn, err := transaction.NewTransactionEntity() if err != nil { wme.StatusMessage = "Error creating transaction entity. " + err.Error() - if err := wme.UpdateStatus(ctx, Failed, "Error creating transaction entity. "+err.Error(), ""); err != nil { + if err := wme.UpdateStatus(ctx, Failed, "Error creating transaction entity. "+err.Error(), "", startSeq, wme.Sequence); err != nil { Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err)) } return err @@ -134,6 +136,17 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { sn.AllocationRoot = wme.WM.AllocationRoot sn.PrevAllocationRoot = wme.WM.PreviousAllocationRoot sn.WriteMarker = &wme.WM + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + sn.ChainData, err = GetMarkersForChain(ctx, wme.WM.AllocationID, startSeq, wme.Sequence-1) + return err + }) + if err != nil { + wme.StatusMessage = "Error getting chain data. " + err.Error() + if err := wme.UpdateStatus(ctx, Failed, "Error getting chain data. "+err.Error(), "", startSeq, wme.Sequence); err != nil { + Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err)) + } + return err + } if sn.AllocationRoot == sn.PrevAllocationRoot { // get nonce of prev WM @@ -141,7 +154,7 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { prevWM, err = GetPreviousWM(ctx, sn.AllocationRoot, wme.WM.Timestamp) if err != nil { wme.StatusMessage = "Error getting previous write marker. " + err.Error() - if err := wme.UpdateStatus(ctx, Failed, "Error getting previous write marker. "+err.Error(), ""); err != nil { + if err := wme.UpdateStatus(ctx, Failed, "Error getting previous write marker. "+err.Error(), "", startSeq, wme.Sequence); err != nil { Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err)) } return err @@ -154,7 +167,7 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { Logger.Error("Failed during sending close connection to the miner. ", zap.String("err:", err.Error())) wme.Status = Failed wme.StatusMessage = "Failed during sending close connection to the miner. " + err.Error() - if err := wme.UpdateStatus(ctx, Failed, "Failed during sending close connection to the miner. "+err.Error(), ""); err != nil { + if err := wme.UpdateStatus(ctx, Failed, "Failed during sending close connection to the miner. "+err.Error(), "", startSeq, wme.Sequence); err != nil { Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err)) } return err @@ -169,15 +182,15 @@ func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { wme.Status = Failed wme.StatusMessage = "Error verifying the close connection transaction." + err.Error() // TODO Is this single try? - if err := wme.UpdateStatus(ctx, Failed, "Error verifying the close connection transaction."+err.Error(), txn.Hash); err != nil { + if err := wme.UpdateStatus(ctx, Failed, "Error verifying the close connection transaction."+err.Error(), txn.Hash, startSeq, wme.Sequence); err != nil { Logger.Error("WriteMarkerEntity_UpdateStatus", zap.Error(err)) } return err } wme.Status = Committed wme.StatusMessage = t.TransactionOutput - _ = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash) - return nil + err = wme.UpdateStatus(ctx, Committed, t.TransactionOutput, t.Hash, startSeq, wme.Sequence) + return err } func (wme *WriteMarkerEntity) VerifyRollbackMarker(ctx context.Context, dbAllocation *allocation.Allocation, latestWM *WriteMarkerEntity) error { @@ -196,10 +209,18 @@ func (wme *WriteMarkerEntity) VerifyRollbackMarker(ctx context.Context, dbAlloca return common.NewError("write_marker_validation_failed", "Write Marker is not for the same allocation transaction") } - if wme.WM.Size != 0 { + if wme.WM.Size != -latestWM.WM.Size { return common.NewError("empty write_marker_validation_failed", fmt.Sprintf("Write Marker size is %v but should be 0", wme.WM.Size)) } + if wme.WM.ChainSize != latestWM.WM.ChainSize+wme.WM.Size { + return common.NewError("empty write_marker_validation_failed", fmt.Sprintf("Write Marker chain size is %v but should be %v", wme.WM.ChainSize, latestWM.WM.ChainSize+wme.WM.Size)) + } + + if latestWM.Status != Committed { + wme.WM.ChainLength = latestWM.WM.ChainLength + } + if wme.WM.AllocationRoot == dbAllocation.AllocationRoot { return common.NewError("write_marker_validation_failed", "Write Marker allocation root is the same as the allocation root on record") } @@ -231,6 +252,6 @@ func (wme *WriteMarkerEntity) VerifyRollbackMarker(ctx context.Context, dbAlloca if !sigOK { return common.NewError("write_marker_validation_failed", "Write marker signature is not valid") } - + wme.WM.ChainLength += 1 return nil } diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go b/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go index 7f284eb49..79e70e74d 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go @@ -11,7 +11,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/node" ) -func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { +func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context, startSeq int64) error { for { state := conductrpc.Client().State() if state.StopWMCommit != nil && *state.StopWMCommit { @@ -20,7 +20,7 @@ func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { } break } - err := wme.redeemMarker(ctx) + err := wme.redeemMarker(ctx, startSeq) if err == nil { // send state to conductor server conductrpc.Client().BlobberCommitted(node.Self.ID) diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol_main.go b/code/go/0chain.net/blobbercore/writemarker/protocol_main.go index 77b2ebcbb..3ba3db585 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol_main.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol_main.go @@ -5,6 +5,6 @@ package writemarker import "context" -func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { - return wme.redeemMarker(ctx) +func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context, startSeq int64) error { + return wme.redeemMarker(ctx, startSeq) } diff --git a/code/go/0chain.net/blobbercore/writemarker/worker.go b/code/go/0chain.net/blobbercore/writemarker/worker.go index be0670620..4c8a63ae9 100644 --- a/code/go/0chain.net/blobbercore/writemarker/worker.go +++ b/code/go/0chain.net/blobbercore/writemarker/worker.go @@ -7,19 +7,67 @@ import ( "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" + "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "go.uber.org/zap" - "golang.org/x/sync/semaphore" "gorm.io/gorm" ) var ( - writeMarkerChan chan *WriteMarkerEntity - writeMarkerMap map[string]*semaphore.Weighted - mut sync.RWMutex + writeMarkerChan chan *markerData + markerDataMap = make(map[string]*markerData) + markerDataMut sync.Mutex ) +type markerData struct { + firstMarkerTimestamp common.Timestamp + allocationID string + retries int + chainLength int + processing bool +} + +func SaveMarkerData(allocationID string, timestamp common.Timestamp, chainLength int) { + logging.Logger.Info("SaveMarkerData", zap.Any("allocationID", allocationID), zap.Any("timestamp", timestamp), zap.Any("chainLength", chainLength)) + markerDataMut.Lock() + defer markerDataMut.Unlock() + if data, ok := markerDataMap[allocationID]; !ok { + markerDataMap[allocationID] = &markerData{ + firstMarkerTimestamp: timestamp, + allocationID: allocationID, + chainLength: 1, + } + } else { + data.chainLength = chainLength + if data.chainLength == 1 { + data.firstMarkerTimestamp = timestamp + } + if data.processMarker() { + logging.Logger.Info("ProcessMarkerData", zap.Any("allocationID", allocationID), zap.Any("timestamp", timestamp), zap.Any("chainLength", chainLength)) + data.processing = true + writeMarkerChan <- data + } + } +} + +func CheckProcessingMarker(allocationID string) bool { + markerDataMut.Lock() + defer markerDataMut.Unlock() + if data, ok := markerDataMap[allocationID]; ok { + return data.processing + } + return false +} + +func deleteMarkerData(allocationID string) { + markerDataMut.Lock() + delete(markerDataMap, allocationID) + markerDataMut.Unlock() +} + // const ( // timestampGap = 30 * 24 * 60 * 60 // 30 days // cleanupWorkerInterval = 24 * 7 * time.Hour // 7 days @@ -37,109 +85,84 @@ func SetupWorkers(ctx context.Context) { zap.Any("error", err)) } - writeMarkerMap = make(map[string]*semaphore.Weighted) - - for _, r := range res { - writeMarkerMap[r.ID] = semaphore.NewWeighted(1) - } - - go startRedeem(ctx) + startRedeem(ctx, res) + go startCollector(ctx) // go startCleanupWorker(ctx) } -func GetLock(allocationID string) *semaphore.Weighted { - mut.RLock() - defer mut.RUnlock() - return writeMarkerMap[allocationID] -} - -func SetLock(allocationID string) *semaphore.Weighted { - mut.Lock() - defer mut.Unlock() - writeMarkerMap[allocationID] = semaphore.NewWeighted(1) - return writeMarkerMap[allocationID] -} - -func redeemWriteMarker(wm *WriteMarkerEntity) error { +func redeemWriteMarker(md *markerData) error { ctx := datastore.GetStore().CreateTransaction(context.TODO()) db := datastore.GetStore().GetTransaction(ctx) - allocationID := wm.WM.AllocationID + allocationID := md.allocationID shouldRollback := false start := time.Now() + logging.Logger.Info("redeeming_write_marker", zap.String("allocationID", allocationID)) defer func() { if shouldRollback { if rollbackErr := db.Rollback().Error; rollbackErr != nil { logging.Logger.Error("Error rollback on redeeming the write marker.", zap.Any("allocation", allocationID), - zap.Any("wm", wm.WM.AllocationID), zap.Error(rollbackErr)) + zap.Error(rollbackErr)) } + + } else { + deleteMarkerData(allocationID) } }() - alloc, err := allocation.Repo.GetByIdAndLock(ctx, allocationID) + + allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), allocationID) + allocMu.RLock() + defer allocMu.RUnlock() + + alloc, err := allocation.Repo.GetAllocationFromDB(ctx, allocationID) if err != nil { - logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err)) + logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", allocationID), zap.Any("error", err)) if err != gorm.ErrRecordNotFound { - go tryAgain(wm) + go tryAgain(md) } shouldRollback = true return err } if alloc.Finalized { - logging.Logger.Info("Allocation is finalized. Skipping redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationID)) + logging.Logger.Info("Allocation is finalized. Skipping redeeming the write marker.", zap.Any("allocation", allocationID)) + deleteMarkerData(allocationID) shouldRollback = true return nil } - if alloc.AllocationRoot != wm.WM.AllocationRoot { - logging.Logger.Info("Stale write marker. Allocation root mismatch", - zap.Any("allocation", allocationID), - zap.Any("wm", wm.WM.AllocationRoot), zap.Any("alloc_root", alloc.AllocationRoot)) - if wm.ReedeemRetries == 0 && !alloc.IsRedeemRequired { - wm.ReedeemRetries++ - go tryAgain(wm) - shouldRollback = true - return nil - } - _ = wm.UpdateStatus(ctx, Rollbacked, "rollbacked", "") - err = db.Commit().Error - mut := GetLock(allocationID) - if mut != nil { - mut.Release(1) + wm, err := GetWriteMarkerEntity(ctx, alloc.AllocationRoot) + if err != nil { + logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", alloc.AllocationRoot), zap.Any("error", err)) + if err != gorm.ErrRecordNotFound { + go tryAgain(md) } + shouldRollback = true return err } - err = wm.RedeemMarker(ctx) + err = wm.RedeemMarker(ctx, alloc.LastRedeemedSeq+1) if err != nil { elapsedTime := time.Since(start) logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", wm), zap.Any("error", err), zap.Any("elapsedTime", elapsedTime)) if retryRedeem(err.Error()) { - go tryAgain(wm) + go tryAgain(md) } else { - mut := GetLock(allocationID) - if mut != nil { - mut.Release(1) - } + deleteMarkerData(allocationID) } shouldRollback = true - return err } - defer func() { - mut := GetLock(allocationID) - if mut != nil { - mut.Release(1) - } - }() - err = allocation.Repo.UpdateAllocationRedeem(ctx, allocationID, wm.WM.AllocationRoot, alloc) + + err = allocation.Repo.UpdateAllocationRedeem(ctx, allocationID, wm.WM.AllocationRoot, alloc, wm.Sequence) if err != nil { logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed", zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err)) shouldRollback = true + go tryAgain(md) return err } @@ -149,6 +172,7 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error { zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationRoot), zap.Error(err)) shouldRollback = true + go tryAgain(md) return err } elapsedTime := time.Since(start) @@ -159,45 +183,81 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error { return nil } -func startRedeem(ctx context.Context) { +func startRedeem(ctx context.Context, res []allocation.Res) { logging.Logger.Info("Start redeeming writemarkers") - writeMarkerChan = make(chan *WriteMarkerEntity, 200) + chanSize := 200 + if len(res) > chanSize { + chanSize = len(res) + } + writeMarkerChan = make(chan *markerData, chanSize) go startRedeemWorker(ctx) - - var writemarkers []*WriteMarkerEntity + markerDataMut.Lock() + defer markerDataMut.Unlock() err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { tx := datastore.GetStore().GetTransaction(ctx) - return tx.Not(WriteMarkerEntity{Status: Committed}).Find(&writemarkers).Error + for _, r := range res { + wm := WriteMarkerEntity{} + err := tx.Where("allocation_id = ?", r.ID). + Order("sequence desc"). + Take(&wm).Error + if err != nil && err != gorm.ErrRecordNotFound { + return err + } + if wm.WM.AllocationID != "" && wm.Status == Accepted { + md := &markerData{ + firstMarkerTimestamp: wm.WM.Timestamp, + allocationID: wm.WM.AllocationID, + chainLength: wm.WM.ChainLength, + processing: true, + retries: int(wm.ReedeemRetries), + } + markerDataMap[wm.WM.AllocationID] = md + writeMarkerChan <- md + } + } + return nil }) if err != nil && err != gorm.ErrRecordNotFound { logging.Logger.Error("Error redeeming the write marker. failed to load allocation's writemarker ", zap.Any("error", err)) return } - - for _, wm := range writemarkers { - mut := GetLock(wm.WM.AllocationID) - if mut == nil { - mut = SetLock(wm.WM.AllocationID) - } - err := mut.Acquire(ctx, 1) - if err != nil { - logging.Logger.Error("Error acquiring semaphore", zap.Error(err)) - continue - } - writeMarkerChan <- wm - } - } -func tryAgain(wm *WriteMarkerEntity) { - time.Sleep(time.Duration(wm.ReedeemRetries) * 5 * time.Second) - writeMarkerChan <- wm +func tryAgain(md *markerData) { + md.retries++ + time.Sleep(time.Duration(md.retries) * 5 * time.Second) + writeMarkerChan <- md } // Can add more cases where we don't want to retry func retryRedeem(errString string) bool { - return !strings.Contains(errString, "value not present") + return !strings.Contains(errString, "value not present") || !strings.Contains(errString, "Blobber is not part of the allocation") +} + +func startCollector(ctx context.Context) { + ticker := time.NewTicker(config.Configuration.MarkerRedeemInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + markerDataMut.Lock() + for _, data := range markerDataMap { + if data.processMarker() { + logging.Logger.Info("ProcessMarkerData", zap.Any("allocationID", data.allocationID), zap.Any("timestamp", data.firstMarkerTimestamp), zap.Any("chainLength", data.chainLength)) + data.processing = true + writeMarkerChan <- data + } + } + markerDataMut.Unlock() + } + } +} + +func (md *markerData) processMarker() bool { + return !md.processing && (md.chainLength >= config.Configuration.MaxChainLength || common.Now()-md.firstMarkerTimestamp > common.Timestamp(config.Configuration.MaxTimestampGap)) } // TODO: don't delete prev WM diff --git a/code/go/0chain.net/blobbercore/writemarker/writemarker.go b/code/go/0chain.net/blobbercore/writemarker/writemarker.go index fbdf97efe..dff2a364e 100644 --- a/code/go/0chain.net/blobbercore/writemarker/writemarker.go +++ b/code/go/0chain.net/blobbercore/writemarker/writemarker.go @@ -5,25 +5,23 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/core/logging" - "golang.org/x/sync/semaphore" ) func startRedeemWorker(ctx context.Context) { logging.Logger.Info("Starting redeem worker") - sem := semaphore.NewWeighted(int64(config.Configuration.WMRedeemNumWorkers)) + for i := 0; i < config.Configuration.WMRedeemNumWorkers; i++ { + go redeemWorker(ctx) + } +} + +func redeemWorker(ctx context.Context) { for { select { case <-ctx.Done(): logging.Logger.Info("Stopping redeem worker") return - case wm := <-writeMarkerChan: - err := sem.Acquire(ctx, 1) - if err == nil { - go func() { - _ = redeemWriteMarker(wm) - sem.Release(1) - }() - } + case dm := <-writeMarkerChan: + _ = redeemWriteMarker(dm) } } } diff --git a/code/go/0chain.net/validatorcore/storage/writemarker/entity.go b/code/go/0chain.net/validatorcore/storage/writemarker/entity.go index e232eb0c3..22de899a2 100644 --- a/code/go/0chain.net/validatorcore/storage/writemarker/entity.go +++ b/code/go/0chain.net/validatorcore/storage/writemarker/entity.go @@ -19,6 +19,8 @@ type WriteMarker struct { FileMetaRoot string `json:"file_meta_root"` AllocationID string `json:"allocation_id"` Size int64 `json:"size"` + ChainSize int64 `json:"chain_size"` + ChainHash string `json:"chain_hash"` BlobberID string `json:"blobber_id"` Timestamp common.Timestamp `json:"timestamp"` ClientID string `json:"client_id"` @@ -26,10 +28,18 @@ type WriteMarker struct { } func (wm *WriteMarker) GetHashData() string { - hashData := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d", - wm.AllocationRoot, wm.PreviousAllocationRoot, - wm.FileMetaRoot, wm.AllocationID, wm.BlobberID, - wm.ClientID, wm.Size, wm.Timestamp) + var hashData string + if wm.ChainHash != "" { + hashData = fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%d:%d:%d", + wm.AllocationRoot, wm.PreviousAllocationRoot, + wm.FileMetaRoot, wm.ChainHash, wm.AllocationID, wm.BlobberID, + wm.ClientID, wm.Size, wm.ChainSize, wm.Timestamp) + } else { + hashData = fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d", + wm.AllocationRoot, wm.PreviousAllocationRoot, + wm.FileMetaRoot, wm.AllocationID, wm.BlobberID, + wm.ClientID, wm.Size, wm.Timestamp) + } return hashData } diff --git a/code/go/0chain.net/validatorcore/storage/writemarker/entity_test.go b/code/go/0chain.net/validatorcore/storage/writemarker/entity_test.go index 7b3248c1e..7f02f46a1 100644 --- a/code/go/0chain.net/validatorcore/storage/writemarker/entity_test.go +++ b/code/go/0chain.net/validatorcore/storage/writemarker/entity_test.go @@ -18,7 +18,7 @@ func TestWriteMarker_GetHashData(t *testing.T) { wm, wallet, err := setupEntityTest(t) require.NoError(t, err) - want := fmt.Sprintf("%v:%v:%v:%v:%v:%v:%v:%v", "alloc_root", "prev_alloc_root", "file_meta_root", "alloc_id", "blobber_id", wallet.ClientID, 1, wm.Timestamp) + want := fmt.Sprintf("%v:%v:%v:%v:%v:%v:%v:%v:%v:%v", "alloc_root", "prev_alloc_root", "file_meta_root", "chain_hash", "alloc_id", "blobber_id", wallet.ClientID, 1, 1, wm.Timestamp) got := wm.GetHashData() t.Logf("Want: %s. Got: %s", want, got) assert.Equal(t, want, got) @@ -120,6 +120,8 @@ func setupEntityTest(t *testing.T) (*writemarker.WriteMarker, *zcncrypto.Wallet, Size: int64(1), BlobberID: "blobber_id", Timestamp: common.Now(), + ChainHash: "chain_hash", + ChainSize: int64(1), } // TODO: why the config param is not used here? diff --git a/config/0chain_blobber.yaml b/config/0chain_blobber.yaml index 1fc371fd5..eac82db9c 100755 --- a/config/0chain_blobber.yaml +++ b/config/0chain_blobber.yaml @@ -96,6 +96,9 @@ openconnection_cleaner: writemarker_redeem: frequency: 10 num_workers: 5 + max_chain_length: 32 + max_timestamp_gap: 1800 # max timestamp gap to redeem write marker in seconds + marker_redeem_interval: 10m # interval to check for write markers which are ready to redeem readmarker_redeem: frequency: 10 num_workers: 5 diff --git a/goose/migrations/1707996797_chain_wm.sql b/goose/migrations/1707996797_chain_wm.sql new file mode 100644 index 000000000..f76f4a00b --- /dev/null +++ b/goose/migrations/1707996797_chain_wm.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE write_markers +ADD COLUMN chain_hash character varying(64), +ADD COLUMN chain_size BIGINT, +ADD COLUMN chain_length integer; + +ALTER TABLE allocations ADD COLUMN last_redeemed_sequence BIGINT DEFAULT 0; +-- +goose StatementEnd \ No newline at end of file