Skip to content

Commit

Permalink
Merge pull request #2673 from OffchainLabs/fix-inbox-reader-reorg-window
Browse files Browse the repository at this point in the history
Fix inbox reading window after a reorg
  • Loading branch information
tsahee authored Nov 4, 2024
2 parents 30c39c3 + bc00529 commit b4e27fc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 19 deletions.
85 changes: 74 additions & 11 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ func (r *InboxReader) CaughtUp() chan struct{} {
return r.caughtUpChan
}

type lazyHashLogging struct {
f func() common.Hash
}

func (l lazyHashLogging) String() string {
return l.f().String()
}

func (l lazyHashLogging) TerminalString() string {
return l.f().TerminalString()
}

func (l lazyHashLogging) MarshalText() ([]byte, error) {
return l.f().MarshalText()
}

func (l lazyHashLogging) Format(s fmt.State, c rune) {
l.f().Format(s, c)
}

func (r *InboxReader) run(ctx context.Context, hadError bool) error {
readMode := r.config().ReadMode
from, err := r.getNextBlockToRead(ctx)
Expand Down Expand Up @@ -334,6 +354,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if ourLatestDelayedCount < checkingDelayedCount {
log.Debug("Expecting to find delayed messages", "checkingDelayedCount", checkingDelayedCount, "ourLatestDelayedCount", ourLatestDelayedCount, "currentHeight", currentHeight)
checkingDelayedCount = ourLatestDelayedCount
missingDelayed = true
} else if ourLatestDelayedCount > checkingDelayedCount {
Expand All @@ -354,6 +375,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if dbDelayedAcc != l1DelayedAcc {
log.Debug("Latest delayed accumulator mismatch", "delayedSeqNum", checkingDelayedSeqNum, "dbDelayedAcc", dbDelayedAcc, "l1DelayedAcc", l1DelayedAcc)
reorgingDelayed = true
}
}
Expand All @@ -371,6 +393,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if ourLatestBatchCount < checkingBatchCount {
log.Debug("Expecting to find sequencer batches", "checkingBatchCount", checkingBatchCount, "ourLatestBatchCount", ourLatestBatchCount, "currentHeight", currentHeight)
checkingBatchCount = ourLatestBatchCount
missingSequencer = true
} else if ourLatestBatchCount > checkingBatchCount && config.HardReorg {
Expand All @@ -390,6 +413,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
return err
}
if dbBatchAcc != l1BatchAcc {
log.Debug("Latest sequencer batch accumulator mismatch", "batchSeqNum", checkingBatchSeqNum, "dbBatchAcc", dbBatchAcc, "l1BatchAcc", l1BatchAcc)
reorgingSequencer = true
}
}
Expand Down Expand Up @@ -432,6 +456,15 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if to.Cmp(currentHeight) > 0 {
to.Set(currentHeight)
}
log.Debug(
"Looking up messages",
"from", from.String(),
"to", to.String(),
"missingDelayed", missingDelayed,
"missingSequencer", missingSequencer,
"reorgingDelayed", reorgingDelayed,
"reorgingSequencer", reorgingSequencer,
)
sequencerBatches, err := r.sequencerInbox.LookupBatchesInRange(ctx, from, to)
if err != nil {
return err
Expand All @@ -457,6 +490,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if len(sequencerBatches) > 0 {
missingSequencer = false
reorgingSequencer = false
var havePrevAcc common.Hash
firstBatch := sequencerBatches[0]
if firstBatch.SequenceNumber > 0 {
haveAcc, err := r.tracker.GetBatchAcc(firstBatch.SequenceNumber - 1)
Expand All @@ -467,7 +501,10 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != firstBatch.BeforeInboxAcc {
reorgingSequencer = true
}
havePrevAcc = haveAcc
}
readLastAcc := sequencerBatches[len(sequencerBatches)-1].AfterInboxAcc
var duplicateBatches int
if !reorgingSequencer {
// Skip any batches we already have in the database
for len(sequencerBatches) > 0 {
Expand All @@ -482,14 +519,26 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc == batch.AfterInboxAcc {
// Skip this batch, as we already have it in the database
sequencerBatches = sequencerBatches[1:]
duplicateBatches++
} else {
// The first batch AfterInboxAcc matches, but this batch doesn't,
// so we'll successfully reorg it when we hit the addMessages
break
}
}
}
log.Debug(
"Found sequencer batches",
"firstSequenceNumber", firstBatch.SequenceNumber,
"newBatchesCount", len(sequencerBatches),
"duplicateBatches", duplicateBatches,
"reorgingSequencer", reorgingSequencer,
"readBeforeAcc", firstBatch.BeforeInboxAcc,
"haveBeforeAcc", havePrevAcc,
"readLastAcc", readLastAcc,
)
} else if missingSequencer && to.Cmp(currentHeight) >= 0 {
log.Debug("Didn't find expected sequencer batches", "from", from, "to", to, "currentHeight", currentHeight)
// We were missing sequencer batches but didn't find any.
// This must mean that the sequencer batches are in the past.
reorgingSequencer = true
Expand All @@ -504,6 +553,7 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
if err != nil {
return err
}
var havePrevAcc common.Hash
if beforeCount > 0 {
haveAcc, err := r.tracker.GetDelayedAcc(beforeCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
Expand All @@ -513,14 +563,27 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if haveAcc != beforeAcc {
reorgingDelayed = true
}
havePrevAcc = haveAcc
}
log.Debug(
"Found delayed messages",
"firstSequenceNumber", beforeCount,
"count", len(delayedMessages),
"reorgingDelayed", reorgingDelayed,
"readBeforeAcc", beforeAcc,
"haveBeforeAcc", havePrevAcc,
"readLastAcc", lazyHashLogging{func() common.Hash {
// Only compute this if we need to log it, as it's somewhat expensive
return delayedMessages[len(delayedMessages)-1].AfterInboxAcc()
}},
)
} else if missingDelayed && to.Cmp(currentHeight) >= 0 {
log.Debug("Didn't find expected delayed messages", "from", from, "to", to, "currentHeight", currentHeight)
// We were missing delayed messages but didn't find any.
// This must mean that the delayed messages are in the past.
reorgingDelayed = true
}

log.Trace("looking up messages", "from", from.String(), "to", to.String(), "missingDelayed", missingDelayed, "missingSequencer", missingSequencer, "reorgingDelayed", reorgingDelayed, "reorgingSequencer", reorgingSequencer)
if !reorgingDelayed && !reorgingSequencer && (len(delayedMessages) != 0 || len(sequencerBatches) != 0) {
delayedMismatch, err := r.addMessages(ctx, sequencerBatches, delayedMessages)
if err != nil {
Expand All @@ -535,14 +598,6 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
storeSeenBatchCount()
}
}
if reorgingDelayed || reorgingSequencer {
from, err = r.getPrevBlockForReorg(from)
if err != nil {
return err
}
} else {
from = arbmath.BigAddByUint(to, 1)
}
// #nosec G115
haveMessages := uint64(len(delayedMessages) + len(sequencerBatches))
if haveMessages <= (config.TargetMessagesRead / 2) {
Expand All @@ -556,6 +611,14 @@ func (r *InboxReader) run(ctx context.Context, hadError bool) error {
} else if blocksToFetch > config.MaxBlocksToRead {
blocksToFetch = config.MaxBlocksToRead
}
if reorgingDelayed || reorgingSequencer {
from, err = r.getPrevBlockForReorg(from, blocksToFetch)
if err != nil {
return err
}
} else {
from = arbmath.BigAddByUint(to, 1)
}
}

if !readAnyBatches {
Expand All @@ -579,11 +642,11 @@ func (r *InboxReader) addMessages(ctx context.Context, sequencerBatches []*Seque
return false, nil
}

func (r *InboxReader) getPrevBlockForReorg(from *big.Int) (*big.Int, error) {
func (r *InboxReader) getPrevBlockForReorg(from *big.Int, maxBlocksBackwards uint64) (*big.Int, error) {
if from.Cmp(r.firstMessageBlock) <= 0 {
return nil, errors.New("can't get older messages")
}
newFrom := arbmath.BigSub(from, big.NewInt(10))
newFrom := arbmath.BigSub(from, new(big.Int).SetUint64(maxBlocksBackwards))
if newFrom.Cmp(r.firstMessageBlock) < 0 {
newFrom = new(big.Int).Set(r.firstMessageBlock)
}
Expand Down
20 changes: 12 additions & 8 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,22 +697,26 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client *ethclien

for _, batch := range batches {
if batch.SequenceNumber != pos {
return errors.New("unexpected batch sequence number")
return fmt.Errorf("unexpected batch sequence number %v expected %v", batch.SequenceNumber, pos)
}
if nextAcc != batch.BeforeInboxAcc {
return errors.New("previous batch accumulator mismatch")
return fmt.Errorf("previous batch accumulator %v mismatch expected %v", batch.BeforeInboxAcc, nextAcc)
}

if batch.AfterDelayedCount > 0 {
haveDelayedAcc, err := t.GetDelayedAcc(batch.AfterDelayedCount - 1)
if errors.Is(err, AccumulatorNotFoundErr) {
// We somehow missed a referenced delayed message; go back and look for it
return delayedMessagesMismatch
}
if err != nil {
notFound := errors.Is(err, AccumulatorNotFoundErr)
if err != nil && !notFound {
return err
}
if haveDelayedAcc != batch.AfterDelayedAcc {
if notFound || haveDelayedAcc != batch.AfterDelayedAcc {
log.Debug(
"Delayed message accumulator doesn't match sequencer batch",
"batch", batch.SequenceNumber,
"delayedPosition", batch.AfterDelayedCount-1,
"haveDelayedAcc", haveDelayedAcc,
"batchDelayedAcc", batch.AfterDelayedAcc,
)
// We somehow missed a delayed message reorg; go back and look for it
return delayedMessagesMismatch
}
Expand Down
4 changes: 4 additions & 0 deletions arbnode/message_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g
}
msgCount := endBatchMetadata.MessageCount
delayedCount := endBatchMetadata.DelayedMessageCount
if delayedCount > 0 {
// keep an extra delayed message for the inbox reader to use
delayedCount--
}

return m.deleteOldMessagesFromDB(ctx, msgCount, delayedCount)
}
Expand Down

0 comments on commit b4e27fc

Please sign in to comment.