Skip to content

Commit

Permalink
feat: implement metamorph store methods and test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain committed Aug 27, 2024
1 parent 6a748e2 commit 125648a
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 136 deletions.
94 changes: 49 additions & 45 deletions internal/blocktx/blocktx_api/blocktx_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/blocktx/blocktx_api/blocktx_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum Status {
UNKNOWN = 0; // This value is unused for now, but it's required by protobuf to have enum field with value 0
LONGEST = 10;
STALE = 20;
ORPHANED = 30;
}

// swagger:model Block {
Expand Down
150 changes: 61 additions & 89 deletions internal/blocktx/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -57,72 +56,6 @@ type Processor struct {
ctx context.Context
}

func WithMessageQueueClient(mqClient MessageQueueClient) func(handler *Processor) {
return func(p *Processor) {
p.mqClient = mqClient
}
}

func WithTransactionBatchSize(size int) func(handler *Processor) {
return func(p *Processor) {
p.transactionStorageBatchSize = size
}
}

func WithRetentionDays(dataRetentionDays int) func(handler *Processor) {
return func(p *Processor) {
p.dataRetentionDays = dataRetentionDays
}
}

func WithFillGapsInterval(interval time.Duration) func(handler *Processor) {
return func(handler *Processor) {
handler.fillGapsInterval = interval
}
}

func WithRegisterTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerTxsInterval = d
}
}

func WithRegisterRequestTxsInterval(d time.Duration) func(handler *Processor) {
return func(p *Processor) {
p.registerRequestTxsInterval = d
}
}

func WithRegisterTxsChan(registerTxsChan chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsChan = registerTxsChan
}
}

func WithRequestTxChan(requestTxChannel chan []byte) func(handler *Processor) {
return func(handler *Processor) {
handler.requestTxChannel = requestTxChannel
}
}

func WithRegisterTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerTxsBatchSize = size
}
}

func WithRegisterRequestTxsBatchSize(size int) func(handler *Processor) {
return func(handler *Processor) {
handler.registerRequestTxsBatchSize = size
}
}

func WithTracer() func(handler *Processor) {
return func(_ *Processor) {
tracer = otel.GetTracerProvider().Tracer("")
}
}

func NewProcessor(
logger *slog.Logger,
storeI store.BlocktxStore,
Expand Down Expand Up @@ -471,12 +404,8 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) {
previousBlockHash := msg.Header.PrevBlock
merkleRoot := msg.Header.MerkleRoot

prevBlock, err := p.store.GetBlock(ctx, &previousBlockHash)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
_, errDel := p.store.DelBlockProcessing(ctx, &blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing - after getting previous block failed", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
prevBlock, err := p.getPrevBlock(ctx, &previousBlockHash, &blockHash)
if err != nil {
p.logger.Error("unable to get previous block from db", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("prevHash", previousBlockHash.String()), slog.String("err", err.Error()))
return
}
Expand All @@ -485,32 +414,22 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) {

competing, err := p.competingChainsExist(ctx, incomingBlock)
if err != nil {
_, errDel := p.store.DelBlockProcessing(ctx, &blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing - after checking for competing chains", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
p.logger.Error("unable to check competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
p.logger.Error("unable to check for competing chains", slog.String("hash", blockHash.String()), slog.Uint64("height", msg.Height), slog.String("err", err.Error()))
return
}

if competing {
p.logger.Info("Competing blocks found", slog.String("incoming block hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height))

tip, err := p.store.GetChainTip(ctx)
greatestChainwork, err := p.hasGreatestChainwork(ctx, incomingBlock)
if err != nil {
// TODO: handle this
p.logger.Error("unable to get the chain tip to verify chainwork", slog.String("hash", blockHash.String()), slog.Uint64("height", incomingBlock.Height), slog.String("err", err.Error()))
return
}

tipChainWork := new(big.Int)
tipChainWork.SetString(tip.Chainwork, 10)

incomingBlockChainwork := new(big.Int)
incomingBlockChainwork.SetString(incomingBlock.Chainwork, 10)

if tipChainWork.Cmp(incomingBlockChainwork) < 0 {
if greatestChainwork {
// TODO: perform reorg - next ticket
incomingBlock.Status = blocktx_api.Status_LONGEST
// TODO: perform reorg
} else {
incomingBlock.Status = blocktx_api.Status_STALE
}
Expand Down Expand Up @@ -570,10 +489,33 @@ func (p *Processor) processBlock(msg *p2p.BlockMessage) {
p.logger.Info("Processed block", slog.String("hash", blockHash.String()), slog.Int("txs", len(msg.TransactionHashes)), slog.String("duration", time.Since(timeStart).String()))
}

func (p *Processor) getPrevBlock(ctx context.Context, prevHash, blockHash *chainhash.Hash) (*blocktx_api.Block, error) {
prevBlock, err := p.store.GetBlock(ctx, prevHash)

if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
_, errDel := p.store.DelBlockProcessing(ctx, blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing - after getting previous block failed", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
return nil, err
}

return prevBlock, nil
}

func (p *Processor) competingChainsExist(ctx context.Context, block *blocktx_api.Block) (bool, error) {
if block.Status == blocktx_api.Status_ORPHANED {
return false, nil
}

if block.Status == blocktx_api.Status_LONGEST {
competingBlock, err := p.store.GetBlockByHeight(ctx, block.Height)
competingBlock, err := p.store.GetBlockByHeight(ctx, block.Height, blocktx_api.Status_LONGEST)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
blockHash := (*chainhash.Hash)(block.Hash)
_, errDel := p.store.DelBlockProcessing(ctx, blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing - after checking for competing chains", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
return false, err
}

Expand All @@ -583,10 +525,40 @@ func (p *Processor) competingChainsExist(ctx context.Context, block *blocktx_api

return false, nil
}

// If STALE status
return true, nil
}

func (p *Processor) hasGreatestChainwork(ctx context.Context, incomingBlock *blocktx_api.Block) (bool, error) {
tip, err := p.store.GetChainTip(ctx)
if err != nil && !errors.Is(err, store.ErrBlockNotFound) {
blockHash := (*chainhash.Hash)(incomingBlock.Hash)
_, errDel := p.store.DelBlockProcessing(ctx, blockHash, p.hostname)
if errDel != nil {
p.logger.Error("failed to delete block processing - after checking for competing chains", slog.String("hash", blockHash.String()), slog.String("err", errDel.Error()))
}
return false, err
}

// this can happen only in case the blocks table is empty
if tip == nil {
return true, nil
}

tipChainWork := new(big.Int)
tipChainWork.SetString(tip.Chainwork, 10)

incomingBlockChainwork := new(big.Int)
incomingBlockChainwork.SetString(incomingBlock.Chainwork, 10)

if tipChainWork.Cmp(incomingBlockChainwork) < 0 {
return true, nil
}

return false, nil
}

func (p *Processor) markTransactionsAsMined(ctx context.Context, blockId uint64, merkleTree []*chainhash.Hash, blockHeight uint64, blockhash *chainhash.Hash) error {
if tracer != nil {
var span trace.Span
Expand Down
9 changes: 8 additions & 1 deletion internal/blocktx/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,19 @@ func createBlock(msg *p2p.BlockMessage, prevBlock *blocktx_api.Block) *blocktx_a
merkleRoot := msg.Header.MerkleRoot
chainwork := calculateChainwork(msg.Header.Bits)

var status blocktx_api.Status
if prevBlock == nil {
status = blocktx_api.Status_ORPHANED
} else {
status = prevBlock.Status
}

return &blocktx_api.Block{
Hash: hash[:],
PreviousHash: prevHash[:],
MerkleRoot: merkleRoot[:],
Height: msg.Height,
Status: prevBlock.Status,
Status: status,
Chainwork: chainwork.String(),
}
}
Expand Down
5 changes: 5 additions & 0 deletions internal/blocktx/processor_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func TestChainWork(t *testing.T) {
bits: 0x1d00ffff,
expectedChainWork: "4295032833",
},
{
height: 50_000,
bits: 0x1c2a1115,
expectedChainWork: "26137323115",
},
{
height: 100_000,
bits: 0x1b04864c,
Expand Down
Loading

0 comments on commit 125648a

Please sign in to comment.