diff --git a/internal/options/applicator_options.go b/internal/options/applicator_options.go index 243aeb1..7f35de4 100644 --- a/internal/options/applicator_options.go +++ b/internal/options/applicator_options.go @@ -14,29 +14,35 @@ func min(x, y int) int { } const ( - maxPendingBlocksDefault = 2500 - maxHeightDeltaDefault = 60 - delayThresholdDefault = time.Second * 4 - delayTimeoutDefault = time.Second * 60 - applicationJobsDefault = 8 + maxPendingBlocksDefault = 2500 + maxHeightDeltaDefault = 60 + delayThresholdDefault = time.Second * 4 + delayTimeoutDefault = time.Second * 60 + applicationJobsDefault = 8 + forceChildRequestThresholdDefault = time.Minute + forceApplicationRetryDelayDefault = 50 * time.Millisecond ) // ApplicatorOptions are options for Applicator type ApplicatorOptions struct { - MaxPendingBlocks uint64 - MaxHeightDelta uint64 - DelayThreshold time.Duration - DelayTimeout time.Duration - ApplicationJobs int + MaxPendingBlocks uint64 + MaxHeightDelta uint64 + DelayThreshold time.Duration + DelayTimeout time.Duration + ApplicationJobs int + ForceChildRequestThreshold time.Duration + ForceApplicationRetryDelay time.Duration } // NewApplicatorOptions returns default initialized ApplicatorOptions func NewApplicatorOptions() *ApplicatorOptions { return &ApplicatorOptions{ - MaxPendingBlocks: maxPendingBlocksDefault, - MaxHeightDelta: maxHeightDeltaDefault, - DelayThreshold: delayThresholdDefault, - DelayTimeout: delayTimeoutDefault, - ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()), + MaxPendingBlocks: maxPendingBlocksDefault, + MaxHeightDelta: maxHeightDeltaDefault, + DelayThreshold: delayThresholdDefault, + DelayTimeout: delayTimeoutDefault, + ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()), + ForceChildRequestThreshold: forceChildRequestThresholdDefault, + ForceApplicationRetryDelay: forceApplicationRetryDelayDefault, } } diff --git a/internal/p2p/applicator.go b/internal/p2p/applicator.go index 605d06c..c6133a1 100644 --- a/internal/p2p/applicator.go +++ b/internal/p2p/applicator.go @@ -18,7 +18,12 @@ type blockEntry struct { errChans []chan<- error } -type blockApplicationRequest struct { +type tryBlockApplicationRequest struct { + block *protocol.Block + force bool +} + +type applyBlockRequest struct { block *protocol.Block errChan chan<- error ctx context.Context @@ -29,7 +34,7 @@ type blockApplicationStatus struct { err error } -type transactionApplicationRequest struct { +type applyTransactionRequest struct { trx *protocol.Transaction errChan chan<- error ctx context.Context @@ -54,9 +59,10 @@ type Applicator struct { forkHeadsChan chan *broadcast.ForkHeads blockBroadcastChan chan *broadcast.BlockAccepted blockStatusChan chan *blockApplicationStatus + tryBlockChan chan *tryBlockApplicationRequest - applyBlockChan chan *blockApplicationRequest - applyTransactionChan chan *transactionApplicationRequest + applyBlockChan chan *applyBlockRequest + applyTransactionChan chan *applyTransactionRequest opts options.ApplicatorOptions } @@ -82,8 +88,9 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, cache *TransactionCach forkHeadsChan: make(chan *broadcast.ForkHeads, 10), blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10), blockStatusChan: make(chan *blockApplicationStatus, 10), - applyBlockChan: make(chan *blockApplicationRequest, 10), - applyTransactionChan: make(chan *transactionApplicationRequest, 10), + tryBlockChan: make(chan *tryBlockApplicationRequest, 10), + applyBlockChan: make(chan *applyBlockRequest, 10), + applyTransactionChan: make(chan *applyTransactionRequest, 10), opts: opts, }, nil } @@ -111,7 +118,7 @@ func (a *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) erro func (a *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transaction) error { errChan := make(chan error, 1) - a.applyTransactionChan <- &transactionApplicationRequest{trx, errChan, ctx} + a.applyTransactionChan <- &applyTransactionRequest{trx, errChan, ctx} select { case err := <-errChan: @@ -139,24 +146,32 @@ func (a *Applicator) addBlockEntry(ctx context.Context, entry *blockEntry) { if oldEntry, ok := a.blocksById[id]; ok { oldEntry.errChans = append(oldEntry.errChans, entry.errChans...) - return } else { a.blocksById[id] = entry - } - if _, ok := a.blocksByPrevious[previousId]; !ok { - a.blocksByPrevious[previousId] = make(map[string]void) + if _, ok := a.blocksByPrevious[previousId]; !ok { + a.blocksByPrevious[previousId] = make(map[string]void) + } + a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} + + if _, ok := a.blocksByHeight[height]; !ok { + a.blocksByHeight[height] = make(map[string]void) + } + a.blocksByHeight[height][id] = void{} } - a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} - if _, ok := a.blocksByHeight[height]; !ok { - a.blocksByHeight[height] = make(map[string]void) + // If the block height is greater than the highest block we have seen plus one, + // we know we cannot apply it yet. Wait + if entry.block.Header.Height > a.highestBlock+1 { + return } - a.blocksByHeight[height][id] = void{} - if entry.block.Header.Height <= a.highestBlock+1 { - a.requestBlockApplication(ctx, entry.block) + // If the parent block is currently being applied, we cannot apply it yet. Wait + if _, ok := a.pendingBlocks[string(entry.block.Header.Previous)]; ok { + return } + + a.tryBlockApplication(ctx, entry.block, false) } func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) { @@ -192,13 +207,35 @@ func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) } } -func (a *Applicator) requestBlockApplication(ctx context.Context, block *protocol.Block) { +func (a *Applicator) tryBlockApplication(ctx context.Context, block *protocol.Block, force bool) { + go func() { + select { + case a.tryBlockChan <- &tryBlockApplicationRequest{ + block: block, + force: force, + }: + case <-ctx.Done(): + } + }() +} + +func (a *Applicator) handleTryBlockApplication(ctx context.Context, request *tryBlockApplicationRequest) { // If there is already a pending application of the block, return - if _, ok := a.pendingBlocks[string(block.Id)]; ok { + if _, ok := a.pendingBlocks[string(request.block.Id)]; ok { + if request.force { + go func() { + select { + case <-time.After(a.opts.ForceApplicationRetryDelay): + a.tryBlockApplication(ctx, request.block, request.force) + case <-ctx.Done(): + } + }() + } + return } - a.pendingBlocks[string(block.Id)] = void{} + a.pendingBlocks[string(request.block.Id)] = void{} go func() { errChan := make(chan error, 1) @@ -206,7 +243,7 @@ func (a *Applicator) requestBlockApplication(ctx context.Context, block *protoco // If block is more than 4 seconds in the future, do not apply it until // it is less than 4 seconds in the future. applicationThreshold := time.Now().Add(a.opts.DelayThreshold) - blockTime := time.Unix(int64(block.Header.Timestamp/1000), int64(block.Header.Timestamp%1000)) + blockTime := time.Unix(int64(request.block.Header.Timestamp/1000), int64(request.block.Header.Timestamp%1000)) if blockTime.After(applicationThreshold) { delayCtx, delayCancel := context.WithTimeout(ctx, a.opts.DelayTimeout) @@ -218,22 +255,26 @@ func (a *Applicator) requestBlockApplication(ctx context.Context, block *protoco case <-timerCtx.Done(): case <-delayCtx.Done(): a.blockStatusChan <- &blockApplicationStatus{ - block: block, + block: request.block, err: ctx.Err(), } return + case <-ctx.Done(): + return } } - a.applyBlockChan <- &blockApplicationRequest{block, errChan, ctx} + a.applyBlockChan <- &applyBlockRequest{request.block, errChan, ctx} select { case err := <-errChan: - a.blockStatusChan <- &blockApplicationStatus{ - block: block, + select { + case a.blockStatusChan <- &blockApplicationStatus{ + block: request.block, err: err, + }: + case <-ctx.Done(): } - case <-ctx.Done(): } }() @@ -243,7 +284,7 @@ func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplica delete(a.pendingBlocks, string(status.block.Id)) if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) { - a.requestBlockApplication(ctx, status.block) + a.tryBlockApplication(ctx, status.block, false) } else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) { a.removeBlockEntry(ctx, string(status.block.Id), status.err) } @@ -258,8 +299,6 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { err = p2perrors.ErrMaxPendingBlocks } else if entry.block.Header.Height <= a.lib { err = p2perrors.ErrBlockIrreversibility - } else { - a.addBlockEntry(ctx, entry) } if err != nil { @@ -270,6 +309,8 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { case <-ctx.Done(): } } + } else { + a.addBlockEntry(ctx, entry) } } @@ -277,7 +318,8 @@ func (a *Applicator) checkBlockChildren(ctx context.Context, blockID string) { if children, ok := a.blocksByPrevious[string(blockID)]; ok { for id := range children { if entry, ok := a.blocksById[id]; ok { - a.requestBlockApplication(ctx, entry.block) + force := time.Since(time.UnixMilli(int64(entry.block.Header.Timestamp))) < a.opts.ForceChildRequestThreshold + a.tryBlockApplication(ctx, entry.block, force) } } } @@ -320,7 +362,7 @@ func (a *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broa a.checkBlockChildren(ctx, string(blockAccept.Block.Id)) } -func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) { +func (a *Applicator) handleApplyBlock(request *applyBlockRequest) { var err error if request.block.Header.Height <= atomic.LoadUint64(&a.lib) { err = p2perrors.ErrBlockIrreversibility @@ -332,7 +374,7 @@ func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) { close(request.errChan) } -func (a *Applicator) handleApplyTransaction(request *transactionApplicationRequest) { +func (a *Applicator) handleApplyTransaction(request *applyTransactionRequest) { var err error if a.transactionCache.CheckTransactions(request.trx) == 0 { _, err = a.rpc.ApplyTransaction(request.ctx, request.trx) @@ -354,6 +396,8 @@ func (a *Applicator) Start(ctx context.Context) { a.handleForkHeads(ctx, forkHeads) case blockBroadcast := <-a.blockBroadcastChan: a.handleBlockBroadcast(ctx, blockBroadcast) + case tryApplyBlock := <-a.tryBlockChan: + a.handleTryBlockApplication(ctx, tryApplyBlock) case <-ctx.Done(): return diff --git a/internal/p2p/applicator_test.go b/internal/p2p/applicator_test.go index 602cdea..606a193 100644 --- a/internal/p2p/applicator_test.go +++ b/internal/p2p/applicator_test.go @@ -328,6 +328,52 @@ func TestApplicatorLimits(t *testing.T) { <-testChan } +func TestDelayBlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rpc := applicatorTestRPC{ + blocksToFail: make(map[string]void), + unlinkableBlocks: make(map[string]void), + head: []byte{0x00}, + invalidNonceTrxs: make(map[string]void), + } + + applicator, err := NewApplicator(ctx, &rpc, NewTransactionCache(time.Minute), *options.NewApplicatorOptions()) + if err != nil { + t.Error(err) + } + + block := &protocol.Block{ + Id: []byte{0x01}, + Header: &protocol.BlockHeader{ + Height: 1, + Previous: []byte{0}, + Timestamp: uint64(time.Now().Add(6 * time.Second).UnixMilli()), + }, + } + + applicator.Start(ctx) + + timer, timerCancel := context.WithTimeout(ctx, 6*time.Second) + defer timerCancel() + + go func() { + select { + case <-timer.Done(): + t.Error("block not applied in time") + case <-ctx.Done(): + } + }() + + err = applicator.ApplyBlock(ctx, block) + + if err != nil { + t.Error(err) + } + + cancel() +} + func TestInvalidNonce(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()