Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle reorg #2

Merged
merged 3 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions internal/handler/disputeGame.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ func (r *RetryDisputeGameClient) ProcessDisputeGameMove(ctx context.Context, evt
return fmt.Errorf("[processDisputeGameMove] event data to disputeGameMove err: %s", err)
}
var storageClaimSize int64
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=?", evt.ContractAddress).Count(&storageClaimSize)
r.DB.Model(&schema.GameClaimData{}).Where("game_contract=? and on_chain_status = ?",
evt.ContractAddress, schema.GameClaimDataOnChainStatusValid).Count(&storageClaimSize)
data, err := r.Client.RetryClaimData(ctx, &bind.CallOpts{}, big.NewInt(storageClaimSize))
if err != nil {
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s", evt.ContractAddress, storageClaimSize, errors.WithStack(err))
return fmt.Errorf("[processDisputeGameMove] contract: %s, index: %d move event get claim data err: %s",
evt.ContractAddress, storageClaimSize, errors.WithStack(err))
}

pos := types.NewPositionFromGIndex(data.Position)
Expand Down Expand Up @@ -172,17 +174,18 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
}

gameClaim := &schema.GameClaimData{
GameContract: strings.ToLower(disputeGame.DisputeProxy),
DataIndex: 0,
ParentIndex: claimData.ParentIndex,
CounteredBy: claimData.CounteredBy.Hex(),
Claimant: claimData.Claimant.Hex(),
Bond: cast.ToString(claimData.Bond),
Claim: hex.EncodeToString(claimData.Claim[:]),
Position: cast.ToString(claimData.Position),
Clock: claimData.Clock.Int64(),
OutputBlock: l2Block.Uint64(),
EventID: evt.ID,
GameContract: strings.ToLower(disputeGame.DisputeProxy),
DataIndex: 0,
ParentIndex: claimData.ParentIndex,
CounteredBy: claimData.CounteredBy.Hex(),
Claimant: claimData.Claimant.Hex(),
Bond: cast.ToString(claimData.Bond),
Claim: hex.EncodeToString(claimData.Claim[:]),
Position: cast.ToString(claimData.Position),
Clock: claimData.Clock.Int64(),
OutputBlock: l2Block.Uint64(),
EventID: evt.ID,
OnChainStatus: schema.GameClaimDataOnChainStatusValid,
}

game := &schema.DisputeGame{
Expand All @@ -201,6 +204,7 @@ func (r *RetryDisputeGameClient) addDisputeGame(ctx context.Context, evt *schema
GameType: disputeGame.GameType,
L2BlockNumber: l2Block.Int64(),
Status: schema.DisputeGameStatusInProgress,
OnChainStatus: schema.DisputeGameOnChainStatusValid,
}
err = r.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(gameClaim).Error
Expand Down
13 changes: 6 additions & 7 deletions internal/handler/latestBlockNumber.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package handler

import (
"context"
"time"

"github.com/optimism-java/dispute-explorer/pkg/rpc"
"github.com/pkg/errors"

"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/log"
"github.com/pkg/errors"
"github.com/spf13/cast"
)

func LatestBlackNumber(ctx *svc.ServiceContext) {
for {
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"finalized\", false],\"id\":1}")
latest, err := ctx.L1RPC.BlockNumber(context.Background())
if err != nil {
log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err))
time.Sleep(3 * time.Second)
continue
}
block := rpc.ParseJSONBlock(string(blockJSON))

ctx.LatestBlockNumber = block.Number()
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", block.Number())
ctx.LatestBlockNumber = cast.ToInt64(latest)
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest)
time.Sleep(3 * time.Second)
}
}
46 changes: 45 additions & 1 deletion internal/handler/syncBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func SyncBlock(ctx *svc.ServiceContext) {
// 防止服务启停切换时同时存在2个服务同步数据
// Prevent data synchronization between two services during service start/stop switchover
time.Sleep(10 * time.Second)
var syncedBlock schema.SyncBlock
err := ctx.DB.Where("status = ? or status = ? ", schema.BlockValid, schema.BlockPending).Order("block_number desc").First(&syncedBlock).Error
Expand Down Expand Up @@ -54,6 +54,7 @@ func SyncBlock(ctx *svc.ServiceContext) {

if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
rollbackBlock(ctx)
continue
}

Expand All @@ -80,3 +81,46 @@ func SyncBlock(ctx *svc.ServiceContext) {
ctx.SyncedBlockHash = common.HexToHash(block.Hash())
}
}

func rollbackBlock(ctx *svc.ServiceContext) {
for {
rollbackBlockNumber := ctx.SyncedBlockNumber

log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)

blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
continue
}

rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)

if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Model(schema.SyncBlock{}).Where(" (status = ? or status = ?) AND block_number>?",
schema.BlockValid, schema.BlockPending, ctx.SyncedBlockNumber).Update("status", schema.BlockRollback).Error
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback Block err: %s\n", errors.WithStack(err))
return err
}
return nil
})
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock] Rollback db transaction err: %s\n", errors.WithStack(err))
continue
}
log.Infof("[Handler.SyncBlock.RollRackBlock] Rollback blocks is Stop\n")
return
}
var previousBlock schema.SyncBlock
rest := ctx.DB.Where("block_number = ? AND (status = ? or status = ?) ", rollbackBlockNumber-1, schema.BlockValid, schema.BlockPending).First(&previousBlock)
if rest.Error != nil {
log.Errorf("[Handler.RollRackBlock] Previous block by number error: %s\n", errors.WithStack(rest.Error))
continue
}
ctx.SyncedBlockNumber = previousBlock.BlockNumber
ctx.SyncedBlockHash = common.HexToHash(previousBlock.BlockHash)
}
}
205 changes: 161 additions & 44 deletions internal/handler/syncDispute.go
Original file line number Diff line number Diff line change
@@ -1,70 +1,187 @@
package handler

import (
"fmt"
"strings"
"sync"
"time"

"github.com/optimism-java/dispute-explorer/internal/blockchain"
"github.com/pkg/errors"
"gorm.io/gorm"

"github.com/ethereum/go-ethereum/common"
"github.com/optimism-java/dispute-explorer/internal/schema"
"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/event"
evt "github.com/optimism-java/dispute-explorer/pkg/event"
"github.com/optimism-java/dispute-explorer/pkg/log"
"golang.org/x/time/rate"
)

func SyncDispute(ctx *svc.ServiceContext) {
for {
var events []schema.SyncEvent
err := ctx.DB.Where("status=?", schema.EventPending).Limit(20).Find(&events).Error
err := ctx.DB.Where("status=? OR status=?", schema.EventPending, schema.EventRollback).Order("block_number").Limit(50).Find(&events).Error
if err != nil {
time.Sleep(3 * time.Second)
continue
}
for _, evt := range events {
disputeCreated := event.DisputeGameCreated{}
disputeMove := event.DisputeGameMove{}
disputeResolved := event.DisputeGameResolved{}
switch {
case evt.EventName == disputeCreated.Name() && evt.EventHash == disputeCreated.EventHash().String():
err = disputeCreated.ToObj(evt.Data)
if err != nil {
log.Errorf("[handle.SyncDispute] event data to DisputeGameCreated err: %s", err)
}
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for created err: %s", err)
}
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
case evt.EventName == disputeMove.Name() && evt.EventHash == disputeMove.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for move err: %s", err)
}
err = disputeClient.ProcessDisputeGameMove(ctx.Context, evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
}
case evt.EventName == disputeResolved.Name() && evt.EventHash == disputeResolved.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(evt.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute] init client for resolved err: %s", err)
}
err = disputeClient.ProcessDisputeGameResolve(evt)
if err != nil {
log.Errorf("[handle.SyncDispute] ProcessDisputeGameCreated err: %s", err)
if len(events) == 0 {
log.Infof("[Handler.SyncDispute] Pending events count is 0\n")
time.Sleep(2 * time.Second)
continue
}

var wg sync.WaitGroup
for _, event := range events {
wg.Add(1)
go func(_wg *sync.WaitGroup, ctx *svc.ServiceContext, event schema.SyncEvent) {
defer _wg.Done()
if event.Status == schema.EventPending {
// add events & block.status= valid
err = HandlePendingEvent(ctx, event)
if err != nil {
log.Errorf("[Handler.SyncEvent] HandlePendingBlock err: %s\n", errors.WithStack(err))
time.Sleep(500 * time.Millisecond)
}
} else if event.Status == schema.EventRollback {
// event.status=rollback & block.status=invalid
err = HandleRollbackEvent(ctx, event)
if err != nil {
log.Errorf("[Handler.SyncEvent] HandleRollbackBlock err: %s\n", errors.WithStack(err))
time.Sleep(500 * time.Millisecond)
}
}
default:
log.Infof("this event does not be monitored %s, hash %s", evt.EventName, evt.EventHash)
}(&wg, ctx, event)
}
wg.Wait()
time.Sleep(3 * time.Second)
}
}

func HandleRollbackEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
disputeCreated := evt.DisputeGameCreated{}
disputeMove := evt.DisputeGameMove{}
disputeResolved := evt.DisputeGameResolved{}
switch {
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
// rollback created event include: dispute_game, game_data_claim
err := disputeCreated.ToObj(event.Data)
if err != nil {
log.Errorf("[handle.SyncDispute.RollbackEvent] event data to DisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
// rollback dispute_game
var disputeGame schema.DisputeGame
err = ctx.DB.Where("game_contract=?", strings.ToLower(disputeCreated.DisputeProxy)).First(&disputeGame).Error
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created event err: %s", err)
}
disputeGame.OnChainStatus = schema.DisputeGameOnChainStatusRollBack

// rollback game_claim_data
var gameDataClaim schema.GameClaimData
err = ctx.DB.Where("game_contract=? and data_index=0", strings.ToLower(disputeCreated.DisputeProxy)).First(&gameDataClaim).Error
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] rollback created the first claim data err: %s", err)
}
gameDataClaim.OnChainStatus = schema.GameClaimDataOnChainStatusRollBack

err = ctx.DB.Transaction(func(tx *gorm.DB) error {
err = tx.Save(disputeGame).Error
if err != nil {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update dispute game status err: %s\n ", err)
}
err = tx.Save(gameDataClaim).Error
if err != nil {
panic(err)
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update game data claim status err: %s\n ", err)
}

event.Status = schema.EventValid
err = tx.Save(event).Error
if err != nil {
return fmt.Errorf("[handle.SyncDispute.RollbackEvent] update event err: %s\n ", err)
}
return nil
})
// remove contract
blockchain.RemoveContract(event.ContractAddress)
log.Infof("remove contract: %s", event.ContractAddress)

case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
// rollback move: rollback move depend on event_id
now := time.Now()
err := ctx.DB.Model(schema.GameClaimData{}).Where("event_id=?", event.ID).
Updates(map[string]interface{}{"on_chain_status": schema.GameClaimDataOnChainStatusRollBack, "updated_at": now}).Error
if err != nil {
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback move event err: %s ,id : %d \n", err, event.ID)
}
time.Sleep(3 * time.Second)
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
// rollback resolved
now := time.Now()
err := ctx.DB.Model(schema.DisputeGame{}).Where("game_contract=?", event.ContractAddress).
Updates(map[string]interface{}{"status": schema.DisputeGameStatusInProgress, "updated_at": now}).Error
if err != nil {
log.Errorf("[Handler.SyncDispute.RollbackBlock] rollback resolved event err: %s ,id : %d \n", err, event.ID)
}
blockchain.AddContract(event.ContractAddress)
log.Infof("[Handler.SyncDispute.RollbackBlock] rollback resolved event id : %d, contract: %s", event.ID, event.ContractAddress)
default:
log.Infof("[Handler.SyncDispute.RollbackBlock] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
return nil
}
return nil
}

func HandlePendingEvent(ctx *svc.ServiceContext, event schema.SyncEvent) error {
disputeCreated := evt.DisputeGameCreated{}
disputeMove := evt.DisputeGameMove{}
disputeResolved := evt.DisputeGameResolved{}
switch {
case event.EventName == disputeCreated.Name() && event.EventHash == disputeCreated.EventHash().String():
err := disputeCreated.ToObj(event.Data)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] event data to DisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(disputeCreated.DisputeProxy),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for created err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameCreated(ctx.Context, event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
case event.EventName == disputeMove.Name() && event.EventHash == disputeMove.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for move err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameMove(ctx.Context, event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
case event.EventName == disputeResolved.Name() && event.EventHash == disputeResolved.EventHash().String():
disputeClient, err := NewRetryDisputeGameClient(ctx.DB, common.HexToAddress(event.ContractAddress),
ctx.L1RPC, rate.Limit(ctx.Config.RPCRateLimit), ctx.Config.RPCRateBurst)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] init client for resolved err: %s", err)
return errors.WithStack(err)
}
err = disputeClient.ProcessDisputeGameResolve(event)
if err != nil {
log.Errorf("[handle.SyncDispute.HandlePendingEvent] ProcessDisputeGameCreated err: %s", err)
return errors.WithStack(err)
}
default:
log.Infof("[handle.SyncDispute.HandlePendingEvent] this event does not be monitored %s, hash %s", event.EventName, event.EventHash)
return nil
}
return nil
}
Loading
Loading