Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blocktx): move merkle_path to block_transactions_map table #582

Merged
merged 9 commits into from
Sep 17, 2024
256 changes: 92 additions & 164 deletions internal/blocktx/blocktx_api/blocktx_api.pb.go

Large diffs are not rendered by default.

8 changes: 0 additions & 8 deletions internal/blocktx/blocktx_api/blocktx_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,10 @@ message RowsAffectedResponse {
int64 rows = 1;
}


message TransactionAndSource {
kuba-4chain marked this conversation as resolved.
Show resolved Hide resolved
bytes hash = 1;
string source = 2;
}


message DelUnfinishedBlockProcessingRequest {
string processed_by = 1;
}


message MerkleRootVerificationRequest {
string merkle_root = 1;
uint64 block_height = 2;
Expand Down
2 changes: 1 addition & 1 deletion internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 26 additions & 40 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (p *Processor) StartFillGaps(peers []p2p.PeerI) {

func (p *Processor) StartProcessRegisterTxs() {
p.waitGroup.Add(1)
txHashes := make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes := make([][]byte, 0, p.registerTxsBatchSize)

ticker := time.NewTicker(p.registerTxsInterval)
go func() {
Expand All @@ -234,16 +234,14 @@ func (p *Processor) StartProcessRegisterTxs() {
case <-p.ctx.Done():
return
case txHash := <-p.registerTxsChan:
txHashes = append(txHashes, &blocktx_api.TransactionAndSource{
Hash: txHash,
})
txHashes = append(txHashes, txHash)

if len(txHashes) < p.registerTxsBatchSize {
continue
}

p.registerTransactions(txHashes[:])
txHashes = make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)

case <-ticker.C:
Expand All @@ -252,7 +250,7 @@ func (p *Processor) StartProcessRegisterTxs() {
}

p.registerTransactions(txHashes[:])
txHashes = make([]*blocktx_api.TransactionAndSource, 0, p.registerTxsBatchSize)
txHashes = txHashes[:0]
ticker.Reset(p.registerTxsInterval)
}
}
Expand Down Expand Up @@ -337,7 +335,7 @@ func (p *Processor) publishMinedTxs(txHashes []*chainhash.Hash) error {
return nil
}

func (p *Processor) registerTransactions(txHashes []*blocktx_api.TransactionAndSource) {
func (p *Processor) registerTransactions(txHashes [][]byte) {
updatedTxs, err := p.store.RegisterTransactions(p.ctx, txHashes)
if err != nil {
p.logger.Error("failed to register transactions", slog.String("err", err.Error()))
Expand Down Expand Up @@ -410,6 +408,12 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
previousBlockHash := msg.Header.PrevBlock
merkleRoot := msg.Header.MerkleRoot

// don't process block that was already processed
existingBlock, _ := p.store.GetBlock(ctx, &blockHash)
if existingBlock != nil {
return nil
}

prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash)
if err != nil {
p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error()))
Expand Down Expand Up @@ -444,6 +448,9 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
return err
}

// find competing chains back to the common ancestor
// get all registered transactions
// prepare msg with competing blocks
kuba-4chain marked this conversation as resolved.
Show resolved Hide resolved
incomingBlock.Status = blocktx_api.Status_STALE

if hasGreatestChainwork {
Expand Down Expand Up @@ -479,16 +486,7 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) error {
return err
}

block := &p2p.Block{
Hash: &blockHash,
MerkleRoot: &merkleRoot,
PreviousHash: &previousBlockHash,
Height: msg.Height,
Size: msg.Size,
TxCount: uint64(len(msg.TransactionHashes)),
}

if err = p.markBlockAsProcessed(ctx, block); err != nil {
if err = p.store.MarkBlockAsDone(ctx, &blockHash, msg.Size, uint64(len(msg.TransactionHashes))); err != nil {
p.logger.Error("unable to mark block as processed", slog.String("hash", blockHash.String()), slog.String("err", err.Error()))
return err
}
Expand Down Expand Up @@ -601,8 +599,7 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
ctx, span = tracer.Start(ctx, "markTransactionsAsMined")
defer span.End()
}
txs := make([]*blocktx_api.TransactionAndSource, 0, p.transactionStorageBatchSize)
merklePaths := make([]string, 0, p.transactionStorageBatchSize)
txs := make([]store.TxWithMerklePath, 0, p.transactionStorageBatchSize)
leaves := merkleTree[:(len(merkleTree)+1)/2]

var totalSize int
Expand All @@ -627,11 +624,6 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
break
}

// Otherwise they're txids, which should have merkle paths calculated.
txs = append(txs, &blocktx_api.TransactionAndSource{
Hash: hash[:],
})

bump, err := bc.NewBUMPFromMerkleTreeAndIndex(blockHeight, merkleTree, uint64(txIndex))
if err != nil {
return fmt.Errorf("failed to create new bump for tx hash %s from merkle tree and index at block height %d: %v", hash.String(), blockHeight, err)
Expand All @@ -642,19 +634,22 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
return fmt.Errorf("failed to get string from bump for tx hash %s at block height %d: %v", hash.String(), blockHeight, err)
}

merklePaths = append(merklePaths, bumpHex)
txs = append(txs, store.TxWithMerklePath{
Hash: hash[:],
MerklePath: bumpHex,
})

if (txIndex+1)%p.transactionStorageBatchSize == 0 {
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs, merklePaths)
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
}
// free up memory
txs = make([]*blocktx_api.TransactionAndSource, 0, p.transactionStorageBatchSize)
merklePaths = make([]string, 0, p.transactionStorageBatchSize)
txs = txs[:0]

for _, updResp := range updateResp {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: updResp.TxHash[:],
TransactionHash: updResp.Hash[:],
BlockHash: blockhash[:],
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
Expand All @@ -678,14 +673,14 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
}

// update all remaining transactions
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs, merklePaths)
updateResp, err := p.store.UpsertBlockTransactions(ctx, blockId, txs)
if err != nil {
return fmt.Errorf("failed to insert block transactions at block height %d: %v", blockHeight, err)
}

for _, updResp := range updateResp {
txBlock := &blocktx_api.TransactionBlock{
TransactionHash: updResp.TxHash[:],
TransactionHash: updResp.Hash[:],
BlockHash: blockhash[:],
BlockHeight: blockHeight,
MerklePath: updResp.MerklePath,
Expand All @@ -699,15 +694,6 @@ func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64,
return nil
}

func (p *Processor) markBlockAsProcessed(ctx context.Context, block *p2p.Block) error {
err := p.store.MarkBlockAsDone(ctx, block.Hash, block.Size, block.TxCount)
if err != nil {
return err
}

return nil
}

func (p *Processor) Shutdown() {
p.cancelAll()
p.waitGroup.Wait()
Expand Down
69 changes: 41 additions & 28 deletions internal/blocktx/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,25 @@ func TestHandleBlock(t *testing.T) {
merkleRootHash1585018, _ := chainhash.NewHashFromStr("9c1fe95a7ac4502e281f4f2eaa2902e12b0f486cf610977c73afb3cd060bebde")

tt := []struct {
name string
prevBlockHash chainhash.Hash
merkleRoot chainhash.Hash
height uint64
txHashes []string
size uint64
nonce uint32
setBlockProcessingErr error
bhsProcInProg []*chainhash.Hash
name string
prevBlockHash chainhash.Hash
merkleRoot chainhash.Hash
height uint64
txHashes []string
size uint64
nonce uint32
blockAlreadyExists bool
}{
{
name: "block height 1573650",
txHashes: []string{}, // expect this block to not be processed
prevBlockHash: *prevBlockHash1573650,
merkleRoot: *merkleRootHash1573650,
height: 1573650,
nonce: 3694498168,
size: 216,
blockAlreadyExists: true,
},
{
name: "block height 1573650",
txHashes: []string{"3d64b2bb6bd4e85aacb6d1965a2407fa21846c08dd9a8616866ad2f5c80fda7f"},
Expand Down Expand Up @@ -136,6 +145,9 @@ func TestHandleBlock(t *testing.T) {
batchSize := 4
storeMock := &storeMocks.BlocktxStoreMock{
GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) {
if tc.blockAlreadyExists {
return &blocktx_api.Block{}, nil
}
return nil, store.ErrBlockNotFound
},
GetBlockByHeightFunc: func(ctx context.Context, height uint64, status blocktx_api.Status) (*blocktx_api.Block, error) {
Expand Down Expand Up @@ -170,41 +182,35 @@ func TestHandleBlock(t *testing.T) {

processor.StartBlockProcessing()

var expectedInsertedTransactions []*blocktx_api.TransactionAndSource
var expectedInsertedTransactions [][]byte
transactionHashes := make([]*chainhash.Hash, len(tc.txHashes))
for i, hash := range tc.txHashes {
txHash, err := chainhash.NewHashFromStr(hash)
require.NoError(t, err)
transactionHashes[i] = txHash

expectedInsertedTransactions = append(expectedInsertedTransactions, &blocktx_api.TransactionAndSource{Hash: txHash[:]})
expectedInsertedTransactions = append(expectedInsertedTransactions, txHash[:])
}

var insertedBlockTransactions []*blocktx_api.TransactionAndSource
var insertedBlockTransactions [][]byte

storeMock.UpsertBlockTransactionsFunc = func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) {
require.True(t, len(merklePaths) <= batchSize)
require.True(t, len(transactions) <= batchSize)
storeMock.UpsertBlockTransactionsFunc = func(ctx context.Context, blockId uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) {
require.True(t, len(txsWithMerklePaths) <= batchSize)

for i, path := range merklePaths {
bump, err := bc.NewBUMPFromStr(path)
for _, tx := range txsWithMerklePaths {
bump, err := bc.NewBUMPFromStr(tx.MerklePath)
require.NoError(t, err)
tx, err := chainhash.NewHash(transactions[i].GetHash())
tx, err := chainhash.NewHash(tx.Hash)
require.NoError(t, err)
root, err := bump.CalculateRootGivenTxid(tx.String())
require.NoError(t, err)

require.Equal(t, root, tc.merkleRoot.String())
}

insertedBlockTransactions = append(insertedBlockTransactions, transactions...)

result := make([]store.UpsertBlockTransactionsResult, len(transactions))
for i, tx := range transactions {
result[i] = store.UpsertBlockTransactionsResult{TxHash: tx.Hash}
insertedBlockTransactions = append(insertedBlockTransactions, tx[:])
}

return result, nil
return txsWithMerklePaths, nil
}

peer := &mocks.PeerMock{
Expand Down Expand Up @@ -301,8 +307,15 @@ func TestHandleBlockReorg(t *testing.T) {
var mtx sync.Mutex
var insertedBlock *blocktx_api.Block

shouldReturnNoBlock := true

storeMock := &storeMocks.BlocktxStoreMock{
GetBlockFunc: func(ctx context.Context, hash *chainhash.Hash) (*blocktx_api.Block, error) {
if shouldReturnNoBlock {
shouldReturnNoBlock = false
return nil, nil
}

return &blocktx_api.Block{
Status: tc.prevBlockStatus,
}, nil
Expand Down Expand Up @@ -348,8 +361,8 @@ func TestHandleBlockReorg(t *testing.T) {
MarkBlockAsDoneFunc: func(ctx context.Context, hash *chainhash.Hash, size uint64, txCount uint64) error {
return nil
},
UpsertBlockTransactionsFunc: func(ctx context.Context, blockId uint64, transactions []*blocktx_api.TransactionAndSource, merklePaths []string) ([]store.UpsertBlockTransactionsResult, error) {
return []store.UpsertBlockTransactionsResult{}, nil
UpsertBlockTransactionsFunc: func(ctx context.Context, blockId uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) {
return []store.TxWithMerklePath{}, nil
},
}

Expand Down Expand Up @@ -510,7 +523,7 @@ func TestStartProcessRegisterTxs(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
registerErrTest := tc.registerErr
storeMock := &storeMocks.BlocktxStoreMock{
RegisterTransactionsFunc: func(ctx context.Context, transaction []*blocktx_api.TransactionAndSource) ([]*chainhash.Hash, error) {
RegisterTransactionsFunc: func(ctx context.Context, transaction [][]byte) ([]*chainhash.Hash, error) {
return nil, registerErrTest
},
}
Expand Down
Loading
Loading