Skip to content

Commit

Permalink
feat: check orphan children for existing block
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Oct 31, 2024
1 parent 3170539 commit e9a545a
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 355 deletions.
185 changes: 99 additions & 86 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,22 @@ func (p *Processor) StartBlockProcessing() {
timeStart := time.Now()

err = p.processBlock(blockMsg)

if err == nil {
err = p.store.MarkBlockAsDone(p.ctx, &blockHash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes)))
if err != nil {
p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
} else {
// add the total block processing time to the stats
p.logger.Info("Processed block", slog.String("hash", blockHash.String()), slog.Int("txs", len(blockMsg.TransactionHashes)), slog.String("duration", time.Since(timeStart).String()))
}
}

if err != nil {
_, errDel := p.store.DelBlockProcessing(p.ctx, &blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
continue
}

err = p.store.MarkBlockAsDone(p.ctx, &blockHash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes)))
if err != nil {
p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
continue
}

// add the total block processing time to the stats
p.logger.Info("Processed block", slog.String("hash", blockHash.String()), slog.Int("txs", len(blockMsg.TransactionHashes)), slog.String("duration", time.Since(timeStart).String()))
}
}
}()
Expand Down Expand Up @@ -426,71 +424,36 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
}

blockHash := msg.Header.BlockHash()
previousBlockHash := msg.Header.PrevBlock
blockHeight := msg.Height

p.logger.Info("processing incoming block", slog.String("hash", blockHash.String()))

// don't process block that was already processed
existingBlock, _ := p.store.GetBlock(ctx, &blockHash)
if existingBlock != nil {
// TODO: udpate orphans and if returned chain != 0 -> enter the flow
p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String()))
return nil
}
var chain chain
var competing bool
var err error

prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash)
if err != nil {
p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error()))
return err
}
// check if we've already processed that block
existingBlock, _ := p.store.GetBlock(ctx, &blockHash)

longestTipExists := true
if prevBlock == nil {
// This check is only in case there's a fresh, empty database
// with no blocks, to mark the first block as the LONGEST chain
longestTipExists, err = p.longestTipExists(ctx)
if existingBlock != nil && existingBlock.Processed {
// if the block was already processed, check and update
// possible orphan children of that block
chain, competing, err = p.updateOrphans(ctx, existingBlock, competing)
if err != nil {
p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return err
}
}

incomingBlock := createBlock(msg, prevBlock, longestTipExists)

competing, err := p.competingChainsExist(ctx, incomingBlock)
if err != nil {
p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
return err
}

if competing {
p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height))
incomingBlock.Status = blocktx_api.Status_STALE
}

p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String()))

err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot)
if err != nil {
p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return err
}

// if the block is ORPHANED, there's no need to process it any further
if incomingBlock.Status == blocktx_api.Status_ORPHANED {
return nil
}

chain, err := p.updateOrphans(ctx, incomingBlock)
if err != nil {
p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return err
}

// if there were any orphans found, we need to verify
// whether they are part of the longest or stale chain
if len(chain) > 1 {
competing = true
if len(chain) == 1 { // this means that no orphans were found
p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String()))
return nil
}
} else {
// if the block was not yet processed, proceed normally
chain, competing, err = p.verifyAndInsertBlock(ctx, msg)
if err != nil {
return err
}
}

chainTip, err := chain.getTip()
Expand All @@ -503,12 +466,12 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
if competing {
hasGreatestChainwork, err := p.hasGreatestChainwork(ctx, chainTip)
if err != nil {
p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
return err
}

if hasGreatestChainwork {
p.logger.Info("chain reorg detected", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height))
p.logger.Info("chain reorg detected", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight))
shouldPerformReorg = true
}
}
Expand All @@ -518,19 +481,19 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
if shouldPerformReorg {
txsToPublish, err = p.performReorg(ctx, chainTip)
if err != nil {
p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
return err
}
} else if chainTip.Status == blocktx_api.Status_STALE {
txsToPublish, err = p.getStaleTxs(ctx, chain)
if err != nil {
p.logger.Error("unable to get stale transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
p.logger.Error("unable to get stale transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
return err
}
} else if chainTip.Status == blocktx_api.Status_LONGEST {
txsToPublish, err = p.store.GetRegisteredTransactions(ctx, chain.getHashes())
if err != nil {
p.logger.Error("unable to get registered transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
p.logger.Error("unable to get registered transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error()))
return err
}
}
Expand All @@ -555,22 +518,69 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
return nil
}

func (p *Processor) getPrevBlock(ctx context.Context, prevHash *chainhash.Hash) (*blocktx_api.Block, error) {
prevBlock, err := p.store.GetBlock(ctx, prevHash)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
return nil, err
func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (chain, bool, error) {
blockHash := msg.Header.BlockHash()
previousBlockHash := msg.Header.PrevBlock

prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash)
if err != nil {
p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error()))
return nil, false, err
}

return prevBlock, nil
}
longestTipExists := true
if prevBlock == nil {
// This check is only in case there's a fresh, empty database
// with no blocks, to mark the first block as the LONGEST chain
longestTipExists, err = p.longestTipExists(ctx)
if err != nil {
p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
return nil, false, err
}
}

incomingBlock := createBlock(msg, prevBlock, longestTipExists)

competing, err := p.competingChainsExist(ctx, incomingBlock)
if err != nil {
p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
return nil, false, err
}

if competing {
p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height))
incomingBlock.Status = blocktx_api.Status_STALE
}

func (p *Processor) getPrevChain(ctx context.Context, prevHash *chainhash.Hash, n int) (chain, error) {
prevBlocks, err := p.store.GetPreviousBlocks(ctx, prevHash, n)
p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String()))

err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot)
if err != nil {
p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return nil, false, err
}

// if the block is ORPHANED, there's no need to process it any further
if incomingBlock.Status == blocktx_api.Status_ORPHANED {
return chain{incomingBlock}, false, nil
}

chain, competing, err := p.updateOrphans(ctx, incomingBlock, competing)
if err != nil {
p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return nil, false, err
}

return chain, competing, nil
}

func (p *Processor) getPrevBlock(ctx context.Context, prevHash *chainhash.Hash) (*blocktx_api.Block, error) {
prevBlock, err := p.store.GetBlock(ctx, prevHash)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
return nil, err
}

return prevBlocks, nil
return prevBlock, nil
}

func (p *Processor) longestTipExists(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -730,15 +740,15 @@ func (p *Processor) storeTransactions(ctx context.Context, blockId uint64, block
return nil
}

func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_api.Block) (chain, error) {
func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_api.Block, competing bool) (chain, bool, error) {
chain := []*blocktx_api.Block{incomingBlock}

orphanedBlocks, err := p.store.GetOrphanedChainUpFromHash(ctx, incomingBlock.Hash)
if err != nil {
return nil, err
return nil, false, err
}
if len(orphanedBlocks) == 0 {
return chain, nil
return chain, competing, nil
}

blockStatusUpdates := make([]store.BlockStatusUpdate, len(orphanedBlocks))
Expand All @@ -760,12 +770,15 @@ func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_ap

err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates)
if err != nil {
return nil, err
return nil, false, err
}

chain = append(chain, orphanedBlocks...)

return chain, nil
// if we found any orphans and marked them as STALE
// we need to find out if they are part of the longest
// or stale chain, so competing is returned as true
return chain, true, nil
}

func (p *Processor) performReorg(ctx context.Context, staleChainTip *blocktx_api.Block) ([]store.TransactionBlock, error) {
Expand Down
26 changes: 20 additions & 6 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestHandleBlock(t *testing.T) {
RollbackFunc: func() error {
return nil
},
LockBlocksTableFunc: func(ctx context.Context) error {
WriteLockBlocksTableFunc: func(ctx context.Context) error {
return nil
},
}
Expand All @@ -162,7 +162,7 @@ func TestHandleBlock(t *testing.T) {
},
GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) {
if tc.blockAlreadyExists {
return &blocktx_api.Block{}, nil
return &blocktx_api.Block{Processed: true}, nil
}
return nil, store.ErrBlockNotFound
},
Expand Down Expand Up @@ -276,12 +276,25 @@ func TestHandleBlock(t *testing.T) {
func TestHandleBlockReorgAndOrphans(t *testing.T) {
testCases := []struct {
name string
blockAlreadyExists bool
prevBlockStatus blocktx_api.Status
hasCompetingBlock bool
hasGreaterChainwork bool
expectedStatus blocktx_api.Status
shouldFindOrphanChain bool
}{
{
name: "block already exists - no orphans - should be ingored",
blockAlreadyExists: true,
shouldFindOrphanChain: false,
expectedStatus: blocktx_api.Status_UNKNOWN,
},
{
name: "block already exists - orphans found - reorg",
blockAlreadyExists: true,
shouldFindOrphanChain: true,
expectedStatus: blocktx_api.Status_LONGEST,
},
{
name: "previous block longest - no competing - no reorg",
prevBlockStatus: blocktx_api.Status_LONGEST,
Expand Down Expand Up @@ -367,7 +380,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) {
Chainwork: "34364008516618225545", // greatest chainwork - should cause reorg if STALE
}

shouldReturnNoBlock := true
shouldReturnNoBlock := !tc.blockAlreadyExists
shouldCheckUpdateStatuses := true

txMock := &storeMocks.DbTransactionMock{
Expand All @@ -377,7 +390,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) {
RollbackFunc: func() error {
return nil
},
LockBlocksTableFunc: func(ctx context.Context) error {
WriteLockBlocksTableFunc: func(ctx context.Context) error {
return nil
},
}
Expand All @@ -392,7 +405,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) {
}

return &blocktx_api.Block{
Status: tc.prevBlockStatus,
Status: tc.prevBlockStatus,
Processed: true,
}, nil
},
GetBlockByHeightFunc: func(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) {
Expand Down Expand Up @@ -444,7 +458,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) {
shouldCheckUpdateStatuses = false
tipStatusUpdate := blockStatusUpdates[len(blockStatusUpdates)-1]
require.Equal(t, orphanedChainTip.Hash, tipStatusUpdate.Hash)
require.Equal(t, insertedBlockStatus, tipStatusUpdate.Status)
require.Equal(t, blocktx_api.Status_STALE, tipStatusUpdate.Status)
mtx.Unlock()
}
return nil
Expand Down
Loading

0 comments on commit e9a545a

Please sign in to comment.