Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chaining WM #1388

Merged
merged 34 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3e2b5e8
wm chain setup
Hitenjain14 Feb 15, 2024
0f4a46d
Merge branch 'staging' of https://github.com/0chain/blobber into feat…
Hitenjain14 Feb 15, 2024
cdfa217
Merge branch 'sprint-1.13' of https://github.com/0chain/blobber into …
Hitenjain14 Feb 15, 2024
b9f4d18
fix lint
Hitenjain14 Feb 15, 2024
1d5d87e
add sql migration
Hitenjain14 Feb 15, 2024
b51a8c5
fix migration
Hitenjain14 Feb 15, 2024
7d50fa5
add chain data in refpath
Hitenjain14 Feb 15, 2024
72d2603
add save logs
Hitenjain14 Feb 15, 2024
8435fea
add query param chain data
Hitenjain14 Feb 15, 2024
8dc720c
unlock wm on commit
Hitenjain14 Feb 15, 2024
b4b7a56
export chain length
Hitenjain14 Feb 15, 2024
42e0ced
fix uncommitted markers
Hitenjain14 Feb 15, 2024
9189ecf
fix sequence
Hitenjain14 Feb 15, 2024
b2772c7
change chainhash verify
Hitenjain14 Feb 16, 2024
1491cb1
Revert "change chainhash verify"
Hitenjain14 Feb 16, 2024
157ee28
fix chain size
Hitenjain14 Feb 16, 2024
87b1921
calculate chain hash
Hitenjain14 Feb 17, 2024
be1740c
fix wm update
Hitenjain14 Feb 17, 2024
92ae4a2
add chain hash logs
Hitenjain14 Feb 18, 2024
734ff94
add chain lenght logs
Hitenjain14 Feb 18, 2024
861f741
add commit log
Hitenjain14 Feb 19, 2024
6a4a4c8
Revert "fix wm update"
Hitenjain14 Feb 19, 2024
fc1c11f
revert commit marker logs
Hitenjain14 Feb 19, 2024
f88bf2a
move chain values to config
Hitenjain14 Feb 19, 2024
09eddac
add support for old wm in validators
Hitenjain14 Feb 23, 2024
b1dd3a3
Merge branch 'sprint-1.13' of https://github.com/0chain/blobber into …
Hitenjain14 Feb 23, 2024
35b95bf
Merge branch 'sprint-1.13' into feat/wm-chain
Hitenjain14 Feb 26, 2024
44090b9
Merge branch 'sprint-1.13' into feat/wm-chain
Hitenjain14 Mar 4, 2024
4753b4e
fix lint nil check
Hitenjain14 Mar 8, 2024
e307c23
add wm version
Hitenjain14 Apr 6, 2024
da2e84a
Merge remote-tracking branch 'origin' into feat/wm-chain
Hitenjain14 Apr 6, 2024
9bae8c6
Merge branch 'sprint-1.14' of https://github.com/0chain/blobber into …
Hitenjain14 Apr 6, 2024
ab9404c
Merge branch 'sprint-1.14' into feat/wm-chain
Hitenjain14 Apr 8, 2024
66ea139
change base go version
Hitenjain14 Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Allocation struct {
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
Expand Down
5 changes: 4 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *Repository) GetAllocationIds(ctx context.Context) []Res {

}

func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation) error {
func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation, redeemSeq int64) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
Expand All @@ -205,17 +205,20 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
allocationUpdates := make(map[string]interface{})
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
allocationUpdates["is_redeem_required"] = false
allocationUpdates["last_redeemed_sequence"] = redeemSeq
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
if err != nil {
return err
}
allocationObj.LatestRedeemedWM = AllocationRoot
allocationObj.IsRedeemRequired = false
allocationObj.LastRedeemedSeq = redeemSeq
txnCache := cache[allocationID]
txnCache.Allocation = allocationObj
updateAlloc := func(a *Allocation) {
a.LatestRedeemedWM = AllocationRoot
a.IsRedeemRequired = false
a.LastRedeemedSeq = redeemSeq
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[allocationID] = txnCache
Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type ConnectionResult struct {

// swagger:model CommitResult
type CommitResult struct {
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarkerEntity `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
//Result []*UploadResult `json:"result"`
}

Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func SetupDefaultConfig() {
viper.SetDefault("openconnection_cleaner.frequency", 30)
viper.SetDefault("writemarker_redeem.frequency", 10)
viper.SetDefault("writemarker_redeem.num_workers", 5)
viper.SetDefault("writemarker_redeem.max_chain_length", 32)
viper.SetDefault("writemarker_redeem.max_timestamp_gap", 1800)
viper.SetDefault("writemarker_redeem.marker_redeem_interval", time.Minute*10)
viper.SetDefault("readmarker_redeem.frequency", 10)
viper.SetDefault("readmarker_redeem.num_workers", 5)
viper.SetDefault("challenge_response.frequency", 10)
Expand Down Expand Up @@ -100,6 +103,9 @@ type Config struct {
OpenConnectionWorkerTolerance int64
WMRedeemFreq int64
WMRedeemNumWorkers int
MaxChainLength int
MaxTimestampGap int64
MarkerRedeemInterval time.Duration
RMRedeemFreq int64
RMRedeemNumWorkers int
ChallengeResolveFreq int64
Expand Down Expand Up @@ -218,6 +224,9 @@ func ReadConfig(deploymentMode int) {

Configuration.WMRedeemFreq = viper.GetInt64("writemarker_redeem.frequency")
Configuration.WMRedeemNumWorkers = viper.GetInt("writemarker_redeem.num_workers")
Configuration.MaxChainLength = viper.GetInt("writemarker_redeem.max_chain_length")
Configuration.MaxTimestampGap = viper.GetInt64("writemarker_redeem.max_timestamp_gap")
Configuration.MarkerRedeemInterval = viper.GetDuration("writemarker_redeem.marker_redeem_interval")

Configuration.RMRedeemFreq = viper.GetInt64("readmarker_redeem.frequency")
Configuration.RMRedeemNumWorkers = viper.GetInt("readmarker_redeem.num_workers")
Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func CommitWriteResponseCreator(r interface{}) *blobbergrpc.CommitResponse {

return &blobbergrpc.CommitResponse{
AllocationRoot: httpResp.AllocationRoot,
WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
// WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
}
}

Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func GetObjectTreeResponseHandler(getObjectTreeResponse *blobbergrpc.GetObjectTr
func CommitWriteResponseHandler(resp *blobbergrpc.CommitResponse) *blobberhttp.CommitResult {
return &blobberhttp.CommitResult{
AllocationRoot: resp.AllocationRoot,
WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
// WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
}
}

Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/handler/handler_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/build"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
Expand Down Expand Up @@ -153,6 +155,13 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu
return resp, statusCode, common.NewErrorf("commit_error",
"error committing to meta store: %v", err)
}

if blobberRes, ok := resp.(*blobberhttp.CommitResult); ok {
// Save the write marker data
writemarker.SaveMarkerData(allocationID, blobberRes.WriteMarker.WM.Timestamp, blobberRes.WriteMarker.WM.ChainLength)
} else {
Logger.Error("Invalid response type for commit handler")
}
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@ package handler

import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
)

var WriteMarkerMutext = &writemarker.Mutex{
ML: common.GetNewLocker(),
}

// LockWriteMarker try to lock writemarker for specified allocation id, and return latest RefTree
func LockWriteMarker(ctx *Context) (interface{}, error) {
connectionID, _ := ctx.FormValue("connection_id")

result, err := WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID)
result, err := writemarker.WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID)
Logger.Info("Lock write marker result", zap.Any("result", result), zap.Error(err))
if err != nil {
return nil, err
Expand All @@ -28,7 +23,7 @@ func LockWriteMarker(ctx *Context) (interface{}, error) {
func UnlockWriteMarker(ctx *Context) (interface{}, error) {
connectionID := ctx.Vars["connection"]

err := WriteMarkerMutext.Unlock(ctx, ctx.AllocationId, connectionID)
err := writemarker.WriteMarkerMutext.Unlock(ctx.AllocationId, connectionID)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,10 @@ func readPreRedeem(
}

func checkPendingMarkers(ctx context.Context, allocationID string) error {

mut := writemarker.GetLock(allocationID)
if mut == nil {
return nil
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
err := mut.Acquire(ctx, 1)
if err != nil {
return common.NewError("check_pending_markers", "write marker is still not redeemed")
pending := writemarker.CheckProcessingMarker(allocationID)
if pending {
return common.NewError("pending_markers", "previous marker is still pending to be redeemed")
}
mut.Release(1)
return nil
}

Expand Down Expand Up @@ -507,6 +499,7 @@ func (fsh *StorageHandler) CreateConnection(ctx context.Context, r *http.Request
}

func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*blobberhttp.CommitResult, error) {
var prevChainHash string
startTime := time.Now()
if r.Method == "GET" {
return nil, common.NewError("invalid_method", "Invalid method used for the upload URL. Use POST instead")
Expand Down Expand Up @@ -602,6 +595,20 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewErrorf("latest_write_marker_read_error",
"Error reading the latest write marker for allocation: %v", err)
}
if latestWriteMarkerEntity.Status == writemarker.Failed {
return nil, common.NewError("latest_write_marker_failed",
"Latest write marker is in failed state")
}

if latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size != writeMarker.ChainSize {
return nil, common.NewErrorf("invalid_chain_size",
"Invalid chain size. expected:%v got %v", latestWriteMarkerEntity.WM.ChainSize+connectionObj.Size, writeMarker.ChainSize)
}

if latestWriteMarkerEntity.Status != writemarker.Committed {
writeMarker.ChainLength = latestWriteMarkerEntity.WM.ChainLength
}
prevChainHash = latestWriteMarkerEntity.WM.ChainHash
}

writemarkerEntity := &writemarker.WriteMarkerEntity{}
Expand All @@ -613,7 +620,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
result.ErrorMessage = "Verification of write marker failed: " + err.Error()
result.Success = false
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
Logger.Error("verify_writemarker_failed", zap.Error(err))
return &result, common.NewError("write_marker_verification_failed", result.ErrorMessage)
Expand Down Expand Up @@ -667,18 +674,23 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
if allocationRoot != writeMarker.AllocationRoot {
result.AllocationRoot = allocationObj.AllocationRoot
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." +
" Expected hash: " + allocationRoot
return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage)
}

chainHash := writemarker.CalculateChainHash(prevChainHash, allocationRoot)
if chainHash != writeMarker.ChainHash {
return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash")
}

if fileMetaRoot != writeMarker.FileMetaRoot {
// result.AllocationRoot = allocationObj.AllocationRoot
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." +
Expand All @@ -688,6 +700,10 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b

writemarkerEntity.ConnectionID = connectionObj.ID
writemarkerEntity.ClientPublicKey = clientKey
writemarkerEntity.WM.ChainLength += 1
if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength {
return nil, common.NewError("chain_length_exceeded", "Chain length exceeded")
}

db := datastore.GetStore().GetTransaction(ctx)
writemarkerEntity.Latest = true
Expand Down Expand Up @@ -734,7 +750,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b

db.Model(connectionObj).Updates(allocation.AllocationChangeCollector{Status: allocation.CommittedConnection})
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &writeMarker
result.WriteMarker = writemarkerEntity
result.Success = true
result.ErrorMessage = ""
commitOperation := connectionObj.Changes[0].Operation
Expand All @@ -743,10 +759,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
//Delete connection object and its changes

db.Delete(connectionObj)
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
}
go allocation.DeleteConnectionObjEntry(connectionID)
go AddWriteMarkerCount(clientID, connectionObj.Size <= 0)

Expand Down Expand Up @@ -1390,6 +1402,10 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
return nil, common.NewError("write_marker_verification_failed", "Verification of the write marker failed: "+err.Error())
}

if writemarkerEntity.WM.ChainLength > config.Configuration.MaxChainLength {
return nil, common.NewError("chain_length_exceeded", "Chain length exceeded")
}

elapsedVerifyWM := time.Since(startTime) - elapsedAllocation - elapsedGetLock

var clientIDForWriteRedeem = writeMarker.ClientID
Expand Down Expand Up @@ -1425,17 +1441,23 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob

if allocationRoot != writeMarker.AllocationRoot {
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
result.Success = false
result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." +
" Expected hash: " + allocationRoot
txn.Rollback()
return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage)
}

chainHash := writemarker.CalculateChainHash(latestWriteMarkerEntity.WM.ChainHash, allocationRoot)
if chainHash != writeMarker.ChainHash {
txn.Rollback()
return nil, common.NewError("chain_hash_mismatch", "Chain hash in the write marker does not match the calculated chain hash")
}

if fileMetaRoot != writeMarker.FileMetaRoot {
if latestWriteMarkerEntity != nil {
result.WriteMarker = &latestWriteMarkerEntity.WM
result.WriteMarker = latestWriteMarkerEntity
}
result.Success = false
result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." +
Expand Down Expand Up @@ -1490,18 +1512,14 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob
if err != nil {
return &result, common.NewError("allocation_commit_error", "Error committing the transaction "+err.Error())
}
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
}
err = allocation.CommitRollback(allocationID)
if err != nil {
Logger.Error("Error committing the rollback for allocation", zap.Error(err))
}

elapsedCommitRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem
result.AllocationRoot = allocationObj.AllocationRoot
result.WriteMarker = &writeMarker
result.WriteMarker = writemarkerEntity
result.Success = true
result.ErrorMessage = ""
commitOperation := "rollback"
Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Req

var result blobberhttp.LatestWriteMarkerResult
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
result.LatestWM = &latestWM.WM
}
if prevWM != nil {
Expand Down Expand Up @@ -560,6 +563,9 @@ func (fsh *StorageHandler) getReferencePath(ctx context.Context, r *http.Request
var refPathResult blobberhttp.ReferencePathResult
refPathResult.ReferencePath = refPath
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
refPathResult.LatestWM = &latestWM.WM
}

Expand Down Expand Up @@ -628,6 +634,9 @@ func (fsh *StorageHandler) GetObjectTree(ctx context.Context, r *http.Request) (
var refPathResult blobberhttp.ReferencePathResult
refPathResult.ReferencePath = refPath
if latestWM != nil {
if latestWM.Status == writemarker.Committed {
latestWM.WM.ChainLength = 0 // start a new chain
}
refPathResult.LatestWM = &latestWM.WM
}
return &refPathResult, nil
Expand Down
Loading
Loading