From e9a545affa9d9cd6e9798497295ad34eea8234d5 Mon Sep 17 00:00:00 2001 From: kuba-4chain Date: Thu, 31 Oct 2024 14:18:15 +0100 Subject: [PATCH] feat: check orphan children for existing block --- internal/blocktx/processor.go | 185 ++++++++++-------- internal/blocktx/processor_test.go | 26 ++- .../blocktx/store/mocks/blocktx_db_tx_mock.go | 92 ++++----- .../blocktx/store/mocks/blocktx_store_mock.go | 56 ------ .../get_previous_blocks/blocktx.blocks.yaml | 60 ------ .../store/postgresql/get_previous_blocks.go | 53 ----- .../store/postgresql/mark_block_as_done.go | 2 +- .../blocktx/store/postgresql/postgres_test.go | 46 ----- internal/blocktx/store/store.go | 1 - 9 files changed, 166 insertions(+), 355 deletions(-) delete mode 100644 internal/blocktx/store/postgresql/fixtures/get_previous_blocks/blocktx.blocks.yaml delete mode 100644 internal/blocktx/store/postgresql/get_previous_blocks.go diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 458dc4116..175c50564 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -194,17 +194,6 @@ func (p *Processor) StartBlockProcessing() { timeStart := time.Now() err = p.processBlock(blockMsg) - - if err == nil { - err = p.store.MarkBlockAsDone(p.ctx, &blockHash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) - if err != nil { - p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - } else { - // add the total block processing time to the stats - p.logger.Info("Processed block", slog.String("hash", blockHash.String()), slog.Int("txs", len(blockMsg.TransactionHashes)), slog.String("duration", time.Since(timeStart).String())) - } - } - if err != nil { _, errDel := p.store.DelBlockProcessing(p.ctx, &blockHash, p.hostname) if errDel != nil { @@ -212,6 +201,15 @@ func (p *Processor) StartBlockProcessing() { } continue } + + err = p.store.MarkBlockAsDone(p.ctx, &blockHash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) + if err != nil { + p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) + continue + } + + // add the total block processing time to the stats + p.logger.Info("Processed block", slog.String("hash", blockHash.String()), slog.Int("txs", len(blockMsg.TransactionHashes)), slog.String("duration", time.Since(timeStart).String())) } } }() @@ -426,71 +424,36 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error { } blockHash := msg.Header.BlockHash() - previousBlockHash := msg.Header.PrevBlock + blockHeight := msg.Height p.logger.Info("processing incoming block", slog.String("hash", blockHash.String())) - // don't process block that was already processed - existingBlock, _ := p.store.GetBlock(ctx, &blockHash) - if existingBlock != nil { - // TODO: udpate orphans and if returned chain != 0 -> enter the flow - p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String())) - return nil - } + var chain chain + var competing bool + var err error - prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash) - if err != nil { - p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error())) - return err - } + // check if we've already processed that block + existingBlock, _ := p.store.GetBlock(ctx, &blockHash) - longestTipExists := true - if prevBlock == nil { - // This check is only in case there's a fresh, empty database - // with no blocks, to mark the first block as the LONGEST chain - longestTipExists, err = p.longestTipExists(ctx) + if existingBlock != nil && existingBlock.Processed { + // if the block was already processed, check and update + // possible orphan children of that block + chain, competing, err = p.updateOrphans(ctx, existingBlock, competing) if err != nil { - p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) + p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) return err } - } - - incomingBlock := createBlock(msg, prevBlock, longestTipExists) - - competing, err := p.competingChainsExist(ctx, incomingBlock) - if err != nil { - p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) - return err - } - if competing { - p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height)) - incomingBlock.Status = blocktx_api.Status_STALE - } - - p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String())) - - err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot) - if err != nil { - p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - return err - } - - // if the block is ORPHANED, there's no need to process it any further - if incomingBlock.Status == blocktx_api.Status_ORPHANED { - return nil - } - - chain, err := p.updateOrphans(ctx, incomingBlock) - if err != nil { - p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) - return err - } - - // if there were any orphans found, we need to verify - // whether they are part of the longest or stale chain - if len(chain) > 1 { - competing = true + if len(chain) == 1 { // this means that no orphans were found + p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String())) + return nil + } + } else { + // if the block was not yet processed, proceed normally + chain, competing, err = p.verifyAndInsertBlock(ctx, msg) + if err != nil { + return err + } } chainTip, err := chain.getTip() @@ -503,12 +466,12 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error { if competing { hasGreatestChainwork, err := p.hasGreatestChainwork(ctx, chainTip) if err != nil { - p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) return err } if hasGreatestChainwork { - p.logger.Info("chain reorg detected", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height)) + p.logger.Info("chain reorg detected", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight)) shouldPerformReorg = true } } @@ -518,19 +481,19 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error { if shouldPerformReorg { txsToPublish, err = p.performReorg(ctx, chainTip) if err != nil { - p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + p.logger.Error("unable to perform reorg", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) return err } } else if chainTip.Status == blocktx_api.Status_STALE { txsToPublish, err = p.getStaleTxs(ctx, chain) if err != nil { - p.logger.Error("unable to get stale transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + p.logger.Error("unable to get stale transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) return err } } else if chainTip.Status == blocktx_api.Status_LONGEST { txsToPublish, err = p.store.GetRegisteredTransactions(ctx, chain.getHashes()) if err != nil { - p.logger.Error("unable to get registered transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error())) + p.logger.Error("unable to get registered transactions", slog.String("hash", blockHash.String()), slog.Uint64("height", blockHeight), slog.String("err", err.Error())) return err } } @@ -555,22 +518,69 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error { return nil } -func (p *Processor) getPrevBlock(ctx context.Context, prevHash *chainhash.Hash) (*blocktx_api.Block, error) { - prevBlock, err := p.store.GetBlock(ctx, prevHash) - if err != nil && !errors.Is(err, store.ErrBlockNotFound) { - return nil, err +func (p *Processor) verifyAndInsertBlock(ctx context.Context, msg *p2p.BlockMessage) (chain, bool, error) { + blockHash := msg.Header.BlockHash() + previousBlockHash := msg.Header.PrevBlock + + prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash) + if err != nil { + p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error())) + return nil, false, err } - return prevBlock, nil -} + longestTipExists := true + if prevBlock == nil { + // This check is only in case there's a fresh, empty database + // with no blocks, to mark the first block as the LONGEST chain + longestTipExists, err = p.longestTipExists(ctx) + if err != nil { + p.logger.Error("unable to verify the longest tip existance in db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) + return nil, false, err + } + } + + incomingBlock := createBlock(msg, prevBlock, longestTipExists) + + competing, err := p.competingChainsExist(ctx, incomingBlock) + if err != nil { + p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error())) + return nil, false, err + } + + if competing { + p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height)) + incomingBlock.Status = blocktx_api.Status_STALE + } -func (p *Processor) getPrevChain(ctx context.Context, prevHash *chainhash.Hash, n int) (chain, error) { - prevBlocks, err := p.store.GetPreviousBlocks(ctx, prevHash, n) + p.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("status", incomingBlock.Status.String())) + + err = p.insertBlockAndStoreTransactions(ctx, incomingBlock, msg.TransactionHashes, msg.Header.MerkleRoot) if err != nil { + p.logger.Error("unable to insert block and store its transactions", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) + return nil, false, err + } + + // if the block is ORPHANED, there's no need to process it any further + if incomingBlock.Status == blocktx_api.Status_ORPHANED { + return chain{incomingBlock}, false, nil + } + + chain, competing, err := p.updateOrphans(ctx, incomingBlock, competing) + if err != nil { + p.logger.Error("unable to check and update possible orphaned child blocks", slog.String("hash", blockHash.String()), slog.String("err", err.Error())) + return nil, false, err + } + + return chain, competing, nil +} + +func (p *Processor) getPrevBlock(ctx context.Context, prevHash *chainhash.Hash) (*blocktx_api.Block, error) { + prevBlock, err := p.store.GetBlock(ctx, prevHash) + if err != nil && !errors.Is(err, store.ErrBlockNotFound) { return nil, err } - return prevBlocks, nil + return prevBlock, nil } func (p *Processor) longestTipExists(ctx context.Context) (bool, error) { @@ -730,15 +740,15 @@ func (p *Processor) storeTransactions(ctx context.Context, blockId uint64, block return nil } -func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_api.Block) (chain, error) { +func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_api.Block, competing bool) (chain, bool, error) { chain := []*blocktx_api.Block{incomingBlock} orphanedBlocks, err := p.store.GetOrphanedChainUpFromHash(ctx, incomingBlock.Hash) if err != nil { - return nil, err + return nil, false, err } if len(orphanedBlocks) == 0 { - return chain, nil + return chain, competing, nil } blockStatusUpdates := make([]store.BlockStatusUpdate, len(orphanedBlocks)) @@ -760,12 +770,15 @@ func (p *Processor) updateOrphans(ctx context.Context, incomingBlock *blocktx_ap err = p.store.UpdateBlocksStatuses(ctx, blockStatusUpdates) if err != nil { - return nil, err + return nil, false, err } chain = append(chain, orphanedBlocks...) - return chain, nil + // if we found any orphans and marked them as STALE + // we need to find out if they are part of the longest + // or stale chain, so competing is returned as true + return chain, true, nil } func (p *Processor) performReorg(ctx context.Context, staleChainTip *blocktx_api.Block) ([]store.TransactionBlock, error) { diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index a728452b6..b4bedd861 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -152,7 +152,7 @@ func TestHandleBlock(t *testing.T) { RollbackFunc: func() error { return nil }, - LockBlocksTableFunc: func(ctx context.Context) error { + WriteLockBlocksTableFunc: func(ctx context.Context) error { return nil }, } @@ -162,7 +162,7 @@ func TestHandleBlock(t *testing.T) { }, GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) { if tc.blockAlreadyExists { - return &blocktx_api.Block{}, nil + return &blocktx_api.Block{Processed: true}, nil } return nil, store.ErrBlockNotFound }, @@ -276,12 +276,25 @@ func TestHandleBlock(t *testing.T) { func TestHandleBlockReorgAndOrphans(t *testing.T) { testCases := []struct { name string + blockAlreadyExists bool prevBlockStatus blocktx_api.Status hasCompetingBlock bool hasGreaterChainwork bool expectedStatus blocktx_api.Status shouldFindOrphanChain bool }{ + { + name: "block already exists - no orphans - should be ingored", + blockAlreadyExists: true, + shouldFindOrphanChain: false, + expectedStatus: blocktx_api.Status_UNKNOWN, + }, + { + name: "block already exists - orphans found - reorg", + blockAlreadyExists: true, + shouldFindOrphanChain: true, + expectedStatus: blocktx_api.Status_LONGEST, + }, { name: "previous block longest - no competing - no reorg", prevBlockStatus: blocktx_api.Status_LONGEST, @@ -367,7 +380,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { Chainwork: "34364008516618225545", // greatest chainwork - should cause reorg if STALE } - shouldReturnNoBlock := true + shouldReturnNoBlock := !tc.blockAlreadyExists shouldCheckUpdateStatuses := true txMock := &storeMocks.DbTransactionMock{ @@ -377,7 +390,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { RollbackFunc: func() error { return nil }, - LockBlocksTableFunc: func(ctx context.Context) error { + WriteLockBlocksTableFunc: func(ctx context.Context) error { return nil }, } @@ -392,7 +405,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { } return &blocktx_api.Block{ - Status: tc.prevBlockStatus, + Status: tc.prevBlockStatus, + Processed: true, }, nil }, GetBlockByHeightFunc: func(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) { @@ -444,7 +458,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { shouldCheckUpdateStatuses = false tipStatusUpdate := blockStatusUpdates[len(blockStatusUpdates)-1] require.Equal(t, orphanedChainTip.Hash, tipStatusUpdate.Hash) - require.Equal(t, insertedBlockStatus, tipStatusUpdate.Status) + require.Equal(t, blocktx_api.Status_STALE, tipStatusUpdate.Status) mtx.Unlock() } return nil diff --git a/internal/blocktx/store/mocks/blocktx_db_tx_mock.go b/internal/blocktx/store/mocks/blocktx_db_tx_mock.go index c43f1a976..6096aa9ba 100644 --- a/internal/blocktx/store/mocks/blocktx_db_tx_mock.go +++ b/internal/blocktx/store/mocks/blocktx_db_tx_mock.go @@ -22,12 +22,12 @@ var _ store.DbTransaction = &DbTransactionMock{} // CommitFunc: func() error { // panic("mock out the Commit method") // }, -// LockBlocksTableFunc: func(ctx context.Context) error { -// panic("mock out the LockBlocksTable method") -// }, // RollbackFunc: func() error { // panic("mock out the Rollback method") // }, +// WriteLockBlocksTableFunc: func(ctx context.Context) error { +// panic("mock out the WriteLockBlocksTable method") +// }, // } // // // use mockedDbTransaction in code that requires store.DbTransaction @@ -38,29 +38,29 @@ type DbTransactionMock struct { // CommitFunc mocks the Commit method. CommitFunc func() error - // LockBlocksTableFunc mocks the LockBlocksTable method. - LockBlocksTableFunc func(ctx context.Context) error - // RollbackFunc mocks the Rollback method. RollbackFunc func() error + // WriteLockBlocksTableFunc mocks the WriteLockBlocksTable method. + WriteLockBlocksTableFunc func(ctx context.Context) error + // calls tracks calls to the methods. calls struct { // Commit holds details about calls to the Commit method. Commit []struct { } - // LockBlocksTable holds details about calls to the LockBlocksTable method. - LockBlocksTable []struct { - // Ctx is the ctx argument value. - Ctx context.Context - } // Rollback holds details about calls to the Rollback method. Rollback []struct { } + // WriteLockBlocksTable holds details about calls to the WriteLockBlocksTable method. + WriteLockBlocksTable []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } } - lockCommit sync.RWMutex - lockLockBlocksTable sync.RWMutex - lockRollback sync.RWMutex + lockCommit sync.RWMutex + lockRollback sync.RWMutex + lockWriteLockBlocksTable sync.RWMutex } // Commit calls CommitFunc. @@ -90,38 +90,6 @@ func (mock *DbTransactionMock) CommitCalls() []struct { return calls } -// LockBlocksTable calls LockBlocksTableFunc. -func (mock *DbTransactionMock) LockBlocksTable(ctx context.Context) error { - if mock.LockBlocksTableFunc == nil { - panic("DbTransactionMock.LockBlocksTableFunc: method is nil but DbTransaction.LockBlocksTable was just called") - } - callInfo := struct { - Ctx context.Context - }{ - Ctx: ctx, - } - mock.lockLockBlocksTable.Lock() - mock.calls.LockBlocksTable = append(mock.calls.LockBlocksTable, callInfo) - mock.lockLockBlocksTable.Unlock() - return mock.LockBlocksTableFunc(ctx) -} - -// LockBlocksTableCalls gets all the calls that were made to LockBlocksTable. -// Check the length with: -// -// len(mockedDbTransaction.LockBlocksTableCalls()) -func (mock *DbTransactionMock) LockBlocksTableCalls() []struct { - Ctx context.Context -} { - var calls []struct { - Ctx context.Context - } - mock.lockLockBlocksTable.RLock() - calls = mock.calls.LockBlocksTable - mock.lockLockBlocksTable.RUnlock() - return calls -} - // Rollback calls RollbackFunc. func (mock *DbTransactionMock) Rollback() error { if mock.RollbackFunc == nil { @@ -148,3 +116,35 @@ func (mock *DbTransactionMock) RollbackCalls() []struct { mock.lockRollback.RUnlock() return calls } + +// WriteLockBlocksTable calls WriteLockBlocksTableFunc. +func (mock *DbTransactionMock) WriteLockBlocksTable(ctx context.Context) error { + if mock.WriteLockBlocksTableFunc == nil { + panic("DbTransactionMock.WriteLockBlocksTableFunc: method is nil but DbTransaction.WriteLockBlocksTable was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockWriteLockBlocksTable.Lock() + mock.calls.WriteLockBlocksTable = append(mock.calls.WriteLockBlocksTable, callInfo) + mock.lockWriteLockBlocksTable.Unlock() + return mock.WriteLockBlocksTableFunc(ctx) +} + +// WriteLockBlocksTableCalls gets all the calls that were made to WriteLockBlocksTable. +// Check the length with: +// +// len(mockedDbTransaction.WriteLockBlocksTableCalls()) +func (mock *DbTransactionMock) WriteLockBlocksTableCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockWriteLockBlocksTable.RLock() + calls = mock.calls.WriteLockBlocksTable + mock.lockWriteLockBlocksTable.RUnlock() + return calls +} diff --git a/internal/blocktx/store/mocks/blocktx_store_mock.go b/internal/blocktx/store/mocks/blocktx_store_mock.go index 635554645..36ec992b6 100644 --- a/internal/blocktx/store/mocks/blocktx_store_mock.go +++ b/internal/blocktx/store/mocks/blocktx_store_mock.go @@ -57,9 +57,6 @@ var _ store.BlocktxStore = &BlocktxStoreMock{} // GetOrphanedChainUpFromHashFunc: func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) { // panic("mock out the GetOrphanedChainUpFromHash method") // }, -// GetPreviousBlocksFunc: func(ctx context.Context, hash *chainhash.Hash, n int) ([]*blocktx_api.Block, error) { -// panic("mock out the GetPreviousBlocks method") -// }, // GetRegisteredTransactionsFunc: func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) { // panic("mock out the GetRegisteredTransactions method") // }, @@ -136,9 +133,6 @@ type BlocktxStoreMock struct { // GetOrphanedChainUpFromHashFunc mocks the GetOrphanedChainUpFromHash method. GetOrphanedChainUpFromHashFunc func(ctx context.Context, hash []byte) ([]*blocktx_api.Block, error) - // GetPreviousBlocksFunc mocks the GetPreviousBlocks method. - GetPreviousBlocksFunc func(ctx context.Context, hash *chainhash.Hash, n int) ([]*blocktx_api.Block, error) - // GetRegisteredTransactionsFunc mocks the GetRegisteredTransactions method. GetRegisteredTransactionsFunc func(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) @@ -256,15 +250,6 @@ type BlocktxStoreMock struct { // Hash is the hash argument value. Hash []byte } - // GetPreviousBlocks holds details about calls to the GetPreviousBlocks method. - GetPreviousBlocks []struct { - // Ctx is the ctx argument value. - Ctx context.Context - // Hash is the hash argument value. - Hash *chainhash.Hash - // N is the n argument value. - N int - } // GetRegisteredTransactions holds details about calls to the GetRegisteredTransactions method. GetRegisteredTransactions []struct { // Ctx is the ctx argument value. @@ -363,7 +348,6 @@ type BlocktxStoreMock struct { lockGetLongestChainFromHeight sync.RWMutex lockGetMinedTransactions sync.RWMutex lockGetOrphanedChainUpFromHash sync.RWMutex - lockGetPreviousBlocks sync.RWMutex lockGetRegisteredTransactions sync.RWMutex lockGetRegisteredTxsByBlockHashes sync.RWMutex lockGetStaleChainBackFromHash sync.RWMutex @@ -804,46 +788,6 @@ func (mock *BlocktxStoreMock) GetOrphanedChainUpFromHashCalls() []struct { return calls } -// GetPreviousBlocks calls GetPreviousBlocksFunc. -func (mock *BlocktxStoreMock) GetPreviousBlocks(ctx context.Context, hash *chainhash.Hash, n int) ([]*blocktx_api.Block, error) { - if mock.GetPreviousBlocksFunc == nil { - panic("BlocktxStoreMock.GetPreviousBlocksFunc: method is nil but BlocktxStore.GetPreviousBlocks was just called") - } - callInfo := struct { - Ctx context.Context - Hash *chainhash.Hash - N int - }{ - Ctx: ctx, - Hash: hash, - N: n, - } - mock.lockGetPreviousBlocks.Lock() - mock.calls.GetPreviousBlocks = append(mock.calls.GetPreviousBlocks, callInfo) - mock.lockGetPreviousBlocks.Unlock() - return mock.GetPreviousBlocksFunc(ctx, hash, n) -} - -// GetPreviousBlocksCalls gets all the calls that were made to GetPreviousBlocks. -// Check the length with: -// -// len(mockedBlocktxStore.GetPreviousBlocksCalls()) -func (mock *BlocktxStoreMock) GetPreviousBlocksCalls() []struct { - Ctx context.Context - Hash *chainhash.Hash - N int -} { - var calls []struct { - Ctx context.Context - Hash *chainhash.Hash - N int - } - mock.lockGetPreviousBlocks.RLock() - calls = mock.calls.GetPreviousBlocks - mock.lockGetPreviousBlocks.RUnlock() - return calls -} - // GetRegisteredTransactions calls GetRegisteredTransactionsFunc. func (mock *BlocktxStoreMock) GetRegisteredTransactions(ctx context.Context, blockHashes [][]byte) ([]store.TransactionBlock, error) { if mock.GetRegisteredTransactionsFunc == nil { diff --git a/internal/blocktx/store/postgresql/fixtures/get_previous_blocks/blocktx.blocks.yaml b/internal/blocktx/store/postgresql/fixtures/get_previous_blocks/blocktx.blocks.yaml deleted file mode 100644 index c27f530c0..000000000 --- a/internal/blocktx/store/postgresql/fixtures/get_previous_blocks/blocktx.blocks.yaml +++ /dev/null @@ -1,60 +0,0 @@ -- inserted_at: 2023-12-15 14:00:00 - id: 0 - hash: 0x000000000000000005aa39a25e7e8bf440c270ec9a1bd30e99ab026f39207ef9 - prevhash: 0xb71ab063c5f96cad71cdc59dcc94182a20a69cbd7eed2d070000000000000000 - merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 - height: 822013 - processed_at: 2023-12-15 14:10:00 - size: 86840000 - tx_count: 23477 - status: 10 # LONGEST - chainwork: '123456' - is_longest: true -- inserted_at: 2023-12-15 14:00:00 - id: 1 - hash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 - prevhash: 0x000000000000000005aa39a25e7e8bf440c270ec9a1bd30e99ab026f39207ef9 - merkleroot: 0x7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483 - height: 822014 - processed_at: 2023-12-15 14:10:00 - size: 86840000 - tx_count: 23477 - status: 10 # LONGEST - chainwork: '123456' - is_longest: true -- inserted_at: 2023-12-15 14:30:00 - id: 2 - hash: 0x000000000000000003b15d668b54c4b91ae81a86298ee209d9f39fd7a769bcde - prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 - merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 - height: 822015 - processed_at: 2023-12-15 14:30:00 - size: 20160000 - tx_count: 6523 - status: 10 # LONGEST - chainwork: '123456' - is_longest: true -- inserted_at: 2023-12-15 14:30:00 - id: 3 - hash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 - prevhash: 0x0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067 - merkleroot: 0x7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257 - height: 822015 - processed_at: 2023-12-15 14:30:00 - size: 20160000 - tx_count: 6523 - status: 20 # STALE - competing block - chainwork: '123456' - is_longest: false -- inserted_at: 2023-12-15 14:40:00 - id: 4 - hash: 0x0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1 - prevhash: 0x00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9 - merkleroot: 0x4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1 - height: 822016 - processed_at: 2023-12-15 14:40:00 - size: 299650000 - tx_count: 62162 - status: 20 # STALE - chainwork: '123456' - is_longest: false diff --git a/internal/blocktx/store/postgresql/get_previous_blocks.go b/internal/blocktx/store/postgresql/get_previous_blocks.go deleted file mode 100644 index 394c0e4ca..000000000 --- a/internal/blocktx/store/postgresql/get_previous_blocks.go +++ /dev/null @@ -1,53 +0,0 @@ -package postgresql - -import ( - "context" - - "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/libsv/go-p2p/chaincfg/chainhash" -) - -func (p *PostgreSQL) GetPreviousBlocks(ctx context.Context, hash *chainhash.Hash, n int) ([]*blocktx_api.Block, error) { - q := ` - WITH RECURSIVE prevBlocks AS ( - SELECT - hash - ,prevhash - ,merkleroot - ,height - ,processed_at - ,status - ,chainwork - ,1 AS n - FROM blocktx.blocks WHERE hash = $1 - UNION ALL - SELECT - b.hash - ,b.prevhash - ,b.merkleroot - ,b.height - ,b.processed_at - ,b.status - ,b.chainwork - ,p.n+1 AS n - FROM blocktx.blocks b JOIN prevBlocks p ON b.hash = p.prevhash AND p.n < $2 - ) - SELECT - hash - ,prevhash - ,merkleroot - ,height - ,processed_at - ,status - ,chainwork - FROM prevBlocks - ` - - rows, err := p.db.QueryContext(ctx, q, hash[:], n) - if err != nil { - return nil, err - } - defer rows.Close() - - return p.parseBlocks(rows) -} diff --git a/internal/blocktx/store/postgresql/mark_block_as_done.go b/internal/blocktx/store/postgresql/mark_block_as_done.go index 573dce4f0..3e379426c 100644 --- a/internal/blocktx/store/postgresql/mark_block_as_done.go +++ b/internal/blocktx/store/postgresql/mark_block_as_done.go @@ -19,7 +19,7 @@ func (p *PostgreSQL) MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, SET processed_at = $4, size = $1, tx_count = $2 - WHERE hash = $3 AND processed_at IS NOT NULL + WHERE hash = $3 AND processed_at IS NULL ` if _, err := p.db.ExecContext(ctx, q, size, txCount, hash[:], p.now()); err != nil { diff --git a/internal/blocktx/store/postgresql/postgres_test.go b/internal/blocktx/store/postgresql/postgres_test.go index fef786ce8..98d9d770f 100644 --- a/internal/blocktx/store/postgresql/postgres_test.go +++ b/internal/blocktx/store/postgresql/postgres_test.go @@ -202,52 +202,6 @@ func TestPostgresDB(t *testing.T) { require.Equal(t, expectedTipHeight, actualBlock.Height) }) - t.Run("get previous blocks", func(t *testing.T) { - // given - prepareDb(t, postgresDB, "fixtures/get_previous_blocks") - - blockHash := testutils.RevChainhash(t, "0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1") - numberOfBlocks := 3 - - expectedBlocks := []*blocktx_api.Block{ - { - Hash: testutils.RevChainhash(t, "0000000000000000082ec88d757ddaeb0aa87a5d5408b5960f27e7e67312dfe1")[:], - PreviousHash: testutils.RevChainhash(t, "00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9")[:], - MerkleRoot: testutils.RevChainhash(t, "4b58b0402a84012269b124f78c91a78a814eb3c9caa03f1df1d33172b23082d1")[:], - Height: 822016, - Processed: true, - Status: blocktx_api.Status_STALE, - Chainwork: "123456", - }, - { - Hash: testutils.RevChainhash(t, "00000000000000000659df0d3cf98ebe46931b67117502168418f9dce4e1b4c9")[:], - PreviousHash: testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067")[:], - MerkleRoot: testutils.RevChainhash(t, "7382df1b717287ab87e5e3e25759697c4c45eea428f701cdd0c77ad3fc707257")[:], - Height: 822015, - Processed: true, - Status: blocktx_api.Status_STALE, - Chainwork: "123456", - }, - { - Hash: testutils.RevChainhash(t, "0000000000000000025855b62f4c2e3732dad363a6f2ead94e4657ef96877067")[:], - PreviousHash: testutils.RevChainhash(t, "000000000000000005aa39a25e7e8bf440c270ec9a1bd30e99ab026f39207ef9")[:], - MerkleRoot: testutils.RevChainhash(t, "7f4019eb006f5333cce752df387fa8443035c22291eb771ee5b16a02b81c8483")[:], - Height: 822014, - Processed: true, - Status: blocktx_api.Status_LONGEST, - Chainwork: "123456", - }, - } - - // when - actualBlocks, err := postgresDB.GetPreviousBlocks(context.Background(), blockHash, numberOfBlocks) - - // then - require.NoError(t, err) - require.Equal(t, numberOfBlocks, len(actualBlocks)) - require.Equal(t, expectedBlocks, actualBlocks) - }) - t.Run("get block gaps", func(t *testing.T) { // given prepareDb(t, postgresDB, "fixtures/get_block_gaps") diff --git a/internal/blocktx/store/store.go b/internal/blocktx/store/store.go index a62d665a7..452a62c12 100644 --- a/internal/blocktx/store/store.go +++ b/internal/blocktx/store/store.go @@ -29,7 +29,6 @@ type BlocktxStore interface { GetBlock(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) GetBlockByHeight(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) GetChainTip(ctx context.Context) (*blocktx_api.Block, error) - GetPreviousBlocks(ctx context.Context, hash *chainhash.Hash, n int) ([]*blocktx_api.Block, error) InsertBlock(ctx context.Context, block *blocktx_api.Block) (uint64, error) UpsertBlockTransactions(ctx context.Context, blockId uint64, txsWithMerklePaths []TxWithMerklePath) error MarkBlockAsDone(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error