Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for gaps in block numbers and throw if found during migration #282

Draft
wants to merge 18 commits into
base: celo10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 145 additions & 35 deletions op-chain-ops/cmd/celo-migrate/ancients.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/sync/errgroup"
)

Expand All @@ -23,6 +25,58 @@ type RLPBlockRange struct {
tds [][]byte
}

type RLPBlockElement struct {
decodedHeader *types.Header
hash []byte
header []byte
body []byte
receipts []byte
td []byte
}

func (r *RLPBlockRange) Element(i uint64) (*RLPBlockElement, error) {
header := types.Header{}
err := rlp.DecodeBytes(r.headers[i], &header)
if err != nil {
return nil, fmt.Errorf("can't decode header: %w", err)
}
return &RLPBlockElement{
decodedHeader: &header,
hash: r.hashes[i],
header: r.headers[i],
body: r.bodies[i],
receipts: r.receipts[i],
td: r.tds[i],
}, nil
}

func (r *RLPBlockRange) DropFirst() {
r.start = r.start + 1
r.hashes = r.hashes[1:]
r.headers = r.headers[1:]
r.bodies = r.bodies[1:]
r.receipts = r.receipts[1:]
r.tds = r.tds[1:]
}

func (e *RLPBlockElement) Header() *types.Header {

return e.decodedHeader
}

func (e *RLPBlockElement) Follows(prev *RLPBlockElement) error {
if e.Header().Number.Uint64() != prev.Header().Number.Uint64()+1 {
return fmt.Errorf("header number mismatch: expected %d, actual %d", prev.Header().Number.Uint64()+1, e.Header().Number.Uint64())
}
// We compare the parent hash with the stored hash of the previous block because
// at this point the header object will not calculate the correct hash since it
// first needs to be transformed.
if e.Header().ParentHash != common.Hash(prev.hash) {
return fmt.Errorf("parent hash mismatch between blocks %d and %d", e.Header().Number.Uint64(), prev.Header().Number.Uint64())
}
return nil
}

// NewChainFreezer is a small utility method around NewFreezer that sets the
// default parameters for the chain storage.
func NewChainFreezer(datadir string, namespace string, readonly bool) (*rawdb.Freezer, error) {
Expand Down Expand Up @@ -82,7 +136,7 @@ func migrateAncientsDb(ctx context.Context, oldDBPath, newDBPath string, batchSi
g.Go(func() error {
return readAncientBlocks(ctx, oldFreezer, numAncientsNewBefore, numAncientsOld, batchSize, readChan)
})
g.Go(func() error { return transformBlocks(ctx, readChan, transformChan) })
g.Go(func() error { return transformBlocks(ctx, readChan, transformChan, numAncientsNewBefore) })
g.Go(func() error { return writeAncientBlocks(ctx, newFreezer, transformChan, numAncientsOld) })

if err = g.Wait(); err != nil {
Expand All @@ -104,55 +158,106 @@ func migrateAncientsDb(ctx context.Context, oldDBPath, newDBPath string, batchSi

func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock, endBlock, batchSize uint64, out chan<- RLPBlockRange) error {
defer close(out)

for i := startBlock; i < endBlock; i += batchSize {
select {
case <-ctx.Done():
return ctx.Err()
default:
count := min(batchSize, endBlock-i+1)
count := min(batchSize, endBlock-i)
start := i

blockRange := RLPBlockRange{
start: start,
hashes: make([][]byte, count),
headers: make([][]byte, count),
bodies: make([][]byte, count),
receipts: make([][]byte, count),
tds: make([][]byte, count),
// If we are not at genesis include the last block of
// the previous range so we can check for continuity between ranges.
if start > 0 {
start = start - 1
count = count + 1
}
var err error

blockRange.hashes, err = freezer.AncientRange(rawdb.ChainFreezerHashTable, start, count, 0)
if err != nil {
return fmt.Errorf("failed to read hashes from old freezer: %w", err)
}
blockRange.headers, err = freezer.AncientRange(rawdb.ChainFreezerHeaderTable, start, count, 0)
if err != nil {
return fmt.Errorf("failed to read headers from old freezer: %w", err)
}
blockRange.bodies, err = freezer.AncientRange(rawdb.ChainFreezerBodiesTable, start, count, 0)
if err != nil {
return fmt.Errorf("failed to read bodies from old freezer: %w", err)
}
blockRange.receipts, err = freezer.AncientRange(rawdb.ChainFreezerReceiptTable, start, count, 0)
blockRange, err := loadRange(freezer, start, count)
if err != nil {
return fmt.Errorf("failed to read receipts from old freezer: %w", err)
return fmt.Errorf("failed to load ancient block range: %w", err)
}
blockRange.tds, err = freezer.AncientRange(rawdb.ChainFreezerDifficultyTable, start, count, 0)
if err != nil {
return fmt.Errorf("failed to read tds from old freezer: %w", err)

// Check continuity between blocks
var prevElement *RLPBlockElement
for i := uint64(0); i < count; i++ {
currElement, err := blockRange.Element(i)
if err != nil {
return err
}
if prevElement != nil {
if err := currElement.Follows(prevElement); err != nil {
return err
}
}
prevElement = currElement
}

out <- blockRange
if start > 0 {
blockRange.DropFirst()
}
out <- *blockRange
}
}
return nil
}

func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RLPBlockRange) error {
func loadRange(freezer *rawdb.Freezer, start, count uint64) (*RLPBlockRange, error) {
blockRange := &RLPBlockRange{
start: start,
hashes: make([][]byte, count),
headers: make([][]byte, count),
bodies: make([][]byte, count),
receipts: make([][]byte, count),
tds: make([][]byte, count),
}

var err error
blockRange.hashes, err = freezer.AncientRange(rawdb.ChainFreezerHashTable, start, count, 0)
if err != nil {
return nil, fmt.Errorf("failed to read hashes from old freezer: %w", err)
}
blockRange.headers, err = freezer.AncientRange(rawdb.ChainFreezerHeaderTable, start, count, 0)
if err != nil {
return nil, fmt.Errorf("failed to read headers from old freezer: %w", err)
}
blockRange.bodies, err = freezer.AncientRange(rawdb.ChainFreezerBodiesTable, start, count, 0)
if err != nil {
return nil, fmt.Errorf("failed to read bodies from old freezer: %w", err)
}
blockRange.receipts, err = freezer.AncientRange(rawdb.ChainFreezerReceiptTable, start, count, 0)
if err != nil {
return nil, fmt.Errorf("failed to read receipts from old freezer: %w", err)
}
blockRange.tds, err = freezer.AncientRange(rawdb.ChainFreezerDifficultyTable, start, count, 0)
if err != nil {
return nil, fmt.Errorf("failed to read tds from old freezer: %w", err)
}

// Make sure the number of elements retrieved from each table matches the expected length
if uint64(len(blockRange.hashes)) != count {
err = fmt.Errorf("Expected count mismatch in block range hashes: expected %d, actual %d", count, len(blockRange.hashes))
}
if uint64(len(blockRange.bodies)) != count {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range bodies: expected %d, actual %d", count, len(blockRange.bodies)))
}
if uint64(len(blockRange.headers)) != count {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range headers: expected %d, actual %d", count, len(blockRange.headers)))
}
if uint64(len(blockRange.receipts)) != count {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range receipts: expected %d, actual %d", count, len(blockRange.receipts)))
}
if uint64(len(blockRange.tds)) != count {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range total difficulties: expected %d, actual %d", count, len(blockRange.tds)))
}
return blockRange, err
}

func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RLPBlockRange, startBlock uint64) error {
alecps marked this conversation as resolved.
Show resolved Hide resolved
// Transform blocks from the in channel and send them to the out channel
defer close(out)

prevBlockNumber := uint64(startBlock - 1) // Will underflow when startBlock is 0, but then overflow back to 0

for blockRange := range in {
select {
case <-ctx.Done():
Expand All @@ -161,6 +266,12 @@ func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RL
for i := range blockRange.hashes {
blockNumber := blockRange.start + uint64(i)

if blockNumber != prevBlockNumber+1 { // Overflows back to 0 when startBlock is 0
return fmt.Errorf("gap found between ancient blocks numbered %d and %d. Please delete the target directory and repeat the migration with an uncorrupted source directory", prevBlockNumber, blockNumber)
}
// Block ranges are in order because they are read sequentially from the freezer
prevBlockNumber = blockNumber

newHeader, err := transformHeader(blockRange.headers[i])
if err != nil {
return fmt.Errorf("can't transform header: %w", err)
Expand All @@ -170,9 +281,8 @@ func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RL
return fmt.Errorf("can't transform body: %w", err)
}

if yes, newHash := hasSameHash(newHeader, blockRange.hashes[i]); !yes {
log.Error("Hash mismatch", "block", blockNumber, "oldHash", common.BytesToHash(blockRange.hashes[i]), "newHash", newHash)
return fmt.Errorf("hash mismatch at block %d", blockNumber)
if err := checkTransformedHeader(newHeader, blockRange.hashes[i], blockNumber); err != nil {
return err
}

blockRange.headers[i] = newHeader
Expand Down Expand Up @@ -216,7 +326,7 @@ func writeAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, in <-chan R
return fmt.Errorf("failed to write block range: %w", err)
}
blockRangeEnd := blockRange.start + uint64(len(blockRange.hashes)) - 1
log.Info("Wrote ancient blocks", "start", blockRange.start, "end", blockRangeEnd, "count", len(blockRange.hashes), "remaining", totalAncientBlocks-blockRangeEnd)
log.Info("Wrote ancient blocks", "start", blockRange.start, "end", blockRangeEnd, "count", len(blockRange.hashes), "remaining", totalAncientBlocks-(blockRangeEnd+1))
}
}
return nil
Expand Down
33 changes: 32 additions & 1 deletion op-chain-ops/cmd/celo-migrate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ const (
)

var (
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash
headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian)

blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
)

// encodeBlockNumber encodes a block number as big endian uint64
Expand All @@ -36,6 +42,31 @@ func headerKey(number uint64, hash common.Hash) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix
func headerTDKey(number uint64, hash common.Hash) []byte {
return append(headerKey(number, hash), headerTDSuffix...)
}

// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
func headerHashKey(number uint64) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...)
}

// headerNumberKey = headerNumberPrefix + hash
func headerNumberKey(hash common.Hash) []byte {
return append(headerNumberPrefix, hash.Bytes()...)
}

// blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash
func blockBodyKey(number uint64, hash common.Hash) []byte {
return append(append(blockBodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// blockReceiptsKey = blockReceiptsPrefix + num (uint64 big endian) + hash
func blockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// Opens a database with access to AncientsDb
func openDB(chaindataPath string, readOnly bool) (ethdb.Database, error) {
// Will throw an error if the chaindataPath does not exist
Expand Down
4 changes: 3 additions & 1 deletion op-chain-ops/cmd/celo-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ func main() {
if isSubcommand {
return err
}
_ = cli.ShowAppHelp(ctx)
if err := cli.ShowAppHelp(ctx); err != nil {
log.Error("failed to show cli help", "err", err)
}
return fmt.Errorf("please provide a valid command")
},
}
Expand Down
Loading
Loading