Skip to content

Commit

Permalink
handle reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouop0 committed Sep 11, 2024
1 parent 5ea2afd commit d170b6c
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 76 deletions.
30 changes: 17 additions & 13 deletions internal/handler/disputeGame.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ func (r *RetryDisputeGameClient) ProcessDisputeGameMove(ctx context.Context, evt
return fmt.Errorf("[processDisputeGameMove] event data to disputeGameMove err: %s", err)
}
var storageClaimSize int64
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=?", evt.ContractAddress).Count(&storageClaimSize)
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=? and on_chain_status = ?",
evt.ContractAddress, schema.GameClaimDataOnChainStatusValid).Count(&storageClaimSize)
data, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, big.NewInt(storageClaimSize))
if err != nil {
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, storageClaimSize, errors.WithStack(err))
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s",
evt.ContractAddress, storageClaimSize, errors.WithStack(err))
}

pos := types.NewPositionFromGIndex(data.Position)
Expand Down Expand Up @@ -172,17 +174,18 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
}

gameClaim := &schema.GameClaimData{
GameContract: strings.ToLower(disputeGame.DisputeProxy),
DataIndex: 0,
ParentIndex: claimData.ParentIndex,
CounteredBy: claimData.CounteredBy.Hex(),
Claimant: claimData.Claimant.Hex(),
Bond: cast.ToString(claimData.Bond),
Claim: hex.EncodeToString(claimData.Claim[:]),
Position: cast.ToString(claimData.Position),
Clock: claimData.Clock.Int64(),
OutputBlock: l2Block.Uint64(),
EventID: evt.ID,
GameContract: strings.ToLower(disputeGame.DisputeProxy),
DataIndex: 0,
ParentIndex: claimData.ParentIndex,
CounteredBy: claimData.CounteredBy.Hex(),
Claimant: claimData.Claimant.Hex(),
Bond: cast.ToString(claimData.Bond),
Claim: hex.EncodeToString(claimData.Claim[:]),
Position: cast.ToString(claimData.Position),
Clock: claimData.Clock.Int64(),
OutputBlock: l2Block.Uint64(),
EventID: evt.ID,
OnChainStatus: schema.GameClaimDataOnChainStatusValid,
}

game := &schema.DisputeGame{
Expand All @@ -201,6 +204,7 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
GameType: disputeGame.GameType,
L2BlockNumber: l2Block.Int64(),
Status: schema.DisputeGameStatusInProgress,
OnChainStatus: schema.DisputeGameOnChainStatusValid,
}
err = r.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(gameClaim).Error
Expand Down
9 changes: 4 additions & 5 deletions internal/handler/latestBlockNumber.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package handler

import (
"context"
"time"

"github.com/optimism-java/dispute-explorer/pkg/rpc"
"github.com/pkg/errors"

"github.com/optimism-java/dispute-explorer/internal/svc"
Expand All @@ -12,16 +12,15 @@ import (

func LatestBlackNumber(ctx *svc.ServiceContext) {
for {
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"finalized\", false],\"id\":1}")
latest, err := ctx.L1RPC.BlockNumber(context.Background())
if err != nil {
log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err))
time.Sleep(3 * time.Second)
continue
}
block := rpc.ParseJSONBlock(string(blockJSON))

ctx.LatestBlockNumber = block.Number()
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", block.Number())
ctx.LatestBlockNumber = int64(latest)

Check failure on line 22 in internal/handler/latestBlockNumber.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

G115: integer overflow conversion uint64 -> int64 (gosec)
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest)
time.Sleep(3 * time.Second)
}
}
47 changes: 46 additions & 1 deletion internal/handler/syncBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func SyncBlock(ctx *svc.ServiceContext) {
// 防止服务启停切换时同时存在2个服务同步数据
// Prevent data synchronization between two services during service start/stop switchover
time.Sleep(10 * time.Second)
var syncedBlock schema.SyncBlock
err := ctx.DB.Where("status = ? or status = ? ", schema.BlockValid, schema.BlockPending).Order("block_number desc").First(&syncedBlock).Error
Expand Down Expand Up @@ -54,6 +54,7 @@ func SyncBlock(ctx *svc.ServiceContext) {

if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
rollbackBlock(ctx)
continue
}

Expand All @@ -80,3 +81,47 @@ func SyncBlock(ctx *svc.ServiceContext) {
ctx.SyncedBlockHash = common.HexToHash(block.Hash())
}
}

func rollbackBlock(ctx *svc.ServiceContext) {
for {
rollbackBlockNumber := ctx.SyncedBlockNumber

log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)

blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
continue
}

rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)

if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Model(schema.SyncBlock{}).Where(" (status = ? or status = ?) AND block_number>?",
schema.BlockValid, schema.BlockPending, ctx.SyncedBlockNumber).Update("status", schema.BlockRollback).Error
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback Block err: %s\n", errors.WithStack(err))
return err
}
return nil
})
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback db transaction err: %s\n", errors.WithStack(err))
continue
}
log.Infof("[Handler.SyncBlock.RollRackBlock] Rollback blocks is Stop\n")
return
}
var previousBlock schema.SyncBlock
rest := ctx.DB.Where("block_number = ? AND (status = ? or status = ?) ", rollbackBlockNumber-1, schema.BlockValid, schema.BlockPending).First(&previousBlock)
if rest.Error != nil {
log.Errorf("[Handler.RollRackBlock] Previous block by number error: %s\n", errors.WithStack(rest.Error))
continue
}
ctx.SyncedBlockNumber = previousBlock.BlockNumber
ctx.SyncedBlockHash = common.HexToHash(previousBlock.BlockHash)
}

}

Check failure on line 127 in internal/handler/syncBlock.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

unnecessary trailing newline (whitespace)
204 changes: 160 additions & 44 deletions internal/handler/syncDispute.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,186 @@
package handler

import (
"fmt"
"github.com/optimism-java/dispute-explorer/internal/blockchain"
"github.com/pkg/errors"
"gorm.io/gorm"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/optimism-java/dispute-explorer/internal/schema"
"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/event"
evt "github.com/optimism-java/dispute-explorer/pkg/event"
"github.com/optimism-java/dispute-explorer/pkg/log"
"golang.org/x/time/rate"
)

func SyncDispute(ctx *svc.ServiceContext) {
for {
var events []schema.SyncEvent
err := ctx.DB.Where("status=?", schema.EventPending).Limit(20).Find(&events).Error
err := ctx.DB.Where("status=? OR status=?", schema.EventPending, schema.EventRollback).Order("block_number").Limit(50).Find(&events).Error
if err != nil {
time.Sleep(3 * time.Second)
continue
}
for _, evt := range events {
disputeCreated := event.DisputeGameCreated{}
disputeMove := event.DisputeGameMove{}
disputeResolved := event.DisputeGameResolved{}
switch {
case evt.EventName == disputeCreated.Name() && evt.EventHash == disputeCreated.EventHash().String():
err = disputeCreated.ToObj(evt.Data)
if err != nil {
log.Errorf("[handle.SyncDispute] event data to DisputeGameCreated err: %s", err)
}
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for created err: %s", err)
}
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
case evt.EventName == disputeMove.Name() && evt.EventHash == disputeMove.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for move err: %s", err)
}
err = disputeClient.ProcessDisputeGameMove(ctx.Context, evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
case evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for resolved err: %s", err)
}
err = disputeClient.ProcessDisputeGameResolve(evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
if len(events) == 0 {
log.Infof("[Handler.SyncDispute] Pending events count is 0\n")
time.Sleep(2 * time.Second)
continue
}

var wg sync.WaitGroup
for _, event := range events {
wg.Add(1)
go func(_wg *sync.WaitGroup, ctx *svc.ServiceContext, event schema.SyncEvent) {
defer _wg.Done()
if event.Status == schema.EventPending {
// add events & block.status= valid
err = HandlePendingEvent(ctx, event)
if err != nil {
log.Errorf("[Handler.SyncEvent] HandlePendingBlock err: %s\n", errors.WithStack(err))
time.Sleep(500 * time.Millisecond)
}
} else if event.Status == schema.EventRollback {
// event.status=rollback & block.status=invalid
err = HandleRollbackEvent(ctx, event)
if err != nil {
log.Errorf("[Handler.SyncEvent] HandleRollbackBlock err: %s\n", errors.WithStack(err))
time.Sleep(500 * time.Millisecond)
}
}
default:
log.Infof("this event does not be monitored %s, hash %s", evt.EventName, evt.EventHash)
}(&wg, ctx, event)
}
wg.Wait()
time.Sleep(3 * time.Second)
}
}

func HandleRollbackEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
disputeCreated := evt.DisputeGameCreated{}
disputeMove := evt.DisputeGameMove{}
disputeResolved := evt.DisputeGameResolved{}
switch {
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
// rollback created event include: dispute_game, game_data_claim
err := disputeCreated.ToObj(event.Data)
if err != nil {
log.Errorf("[handle.SyncDispute.RollbackEvent] event data to DisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
// rollback dispute_game
var disputeGame schema.DisputeGame
err = ctx.DB.Where("game_contract=?", strings.ToLower(disputeCreated.DisputeProxy)).First(&disputeGame).Error
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created event err: %s", err)
}
disputeGame.OnChainStatus = schema.DisputeGameOnChainStatusRollBack

// rollback game_claim_data
var gameDataClaim schema.GameClaimData
err = ctx.DB.Where("game_contract=? and data_index=0", strings.ToLower(disputeCreated.DisputeProxy)).First(&gameDataClaim).Error
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created the first claim data err: %s", err)
}
gameDataClaim.OnChainStatus = schema.GameClaimDataOnChainStatusRollBack

err = ctx.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(disputeGame).Error
if err != nil {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update dispute game status err: %s\n ", err)
}
err = tx.Save(gameDataClaim).Error
if err != nil {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update game data claim status err: %s\n ", err)
}

event.Status = schema.EventValid
err = tx.Save(event).Error
if err != nil {
panic(err)
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update event err: %s\n ", err)
}
return nil
})
// remove contract
blockchain.RemoveContract(event.ContractAddress)
log.Infof("remove contract: %s", event.ContractAddress)

case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
// rollback move: rollback move depend on event_id
now := time.Now()
err := ctx.DB.Model(schema.GameClaimData{}).Where("event_id=?", event.ID).
Updates(map[string]interface{}{"on_chain_status": schema.GameClaimDataOnChainStatusRollBack, "updated_at": now}).Error
if err != nil {
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback move event err: %s ,id : %d \n", err, event.ID)
}
time.Sleep(3 * time.Second)
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
// rollback resolved
now := time.Now()
err := ctx.DB.Model(schema.DisputeGame{}).Where("game_contract=?", event.ContractAddress).
Updates(map[string]interface{}{"status": schema.DisputeGameStatusInProgress, "updated_at": now}).Error
if err != nil {
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback resolved event err: %s ,id : %d \n", err, event.ID)
}
blockchain.AddContract(event.ContractAddress)
log.Infof("[Handler.SyncDispute.RollbackBlock] rollback resolved event id : %d, contract: %s", event.ID, event.ContractAddress)
default:
log.Infof("[Handler.SyncDispute.RollbackBlock] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
return nil
}
return nil
}

func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
disputeCreated := evt.DisputeGameCreated{}
disputeMove := evt.DisputeGameMove{}
disputeResolved := evt.DisputeGameResolved{}
switch {
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
err := disputeCreated.ToObj(event.Data)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] event data to DisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for created err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for move err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameMove(ctx.Context, event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for resolved err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameResolve(event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
default:
log.Infof("[handle.SyncDispute.HandlePendingEvent] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
return nil
}
return nil
}
4 changes: 4 additions & 0 deletions internal/schema/dispute_game.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const (
DisputeGameStatusInProgress = 0
DisputeGameStatusChallengerWin = 1
DisputeGameStatusDefenderWin = 2

DisputeGameOnChainStatusValid = "valid"
DisputeGameOnChainStatusRollBack = "rollback"
)

type DisputeGame struct {
Expand All @@ -27,6 +30,7 @@ type DisputeGame struct {
Status uint8 `json:"status"`
Computed bool `json:"computed"`
CalculateLost bool `json:"calculate_lost"`
OnChainStatus string `json:"on_chain_status"`
}

func (DisputeGame) TableName() string {
Expand Down
Loading

0 comments on commit d170b6c

Please sign in to comment.