From 8e417941c82ed6f85f0fd7e7075d7a51b13b5acb Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 12 Nov 2024 14:22:25 -0700 Subject: [PATCH 1/3] #303: Rename applicator instance to 'a' and clarify block context of functions --- internal/p2p/applicator.go | 187 +++++++++++++++++++------------------ 1 file changed, 94 insertions(+), 93 deletions(-) diff --git a/internal/p2p/applicator.go b/internal/p2p/applicator.go index 5f04d79..2801af8 100644 --- a/internal/p2p/applicator.go +++ b/internal/p2p/applicator.go @@ -29,7 +29,7 @@ type blockApplicationStatus struct { err error } -type transactionApplicatorRequest struct { +type transactionApplicationRequest struct { trx *protocol.Transaction errChan chan<- error ctx context.Context @@ -50,12 +50,13 @@ type Applicator struct { blocksByHeight map[uint64]map[string]void pendingBlocks map[string]void - newBlockChan chan *blockEntry - forkHeadsChan chan *broadcast.ForkHeads - blockBroadcastChan chan *broadcast.BlockAccepted + newBlockChan chan *blockEntry + forkHeadsChan chan *broadcast.ForkHeads + blockBroadcastChan chan *broadcast.BlockAccepted + blockStatusChan chan *blockApplicationStatus + applyBlockChan chan *blockApplicationRequest - blockStatusChan chan *blockApplicationStatus - applyTransactionChan chan *transactionApplicatorRequest + applyTransactionChan chan *transactionApplicationRequest opts options.ApplicatorOptions } @@ -80,23 +81,23 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, cache *TransactionCach newBlockChan: make(chan *blockEntry, 10), forkHeadsChan: make(chan *broadcast.ForkHeads, 10), blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10), - applyBlockChan: make(chan *blockApplicationRequest, 10), blockStatusChan: make(chan *blockApplicationStatus, 10), - applyTransactionChan: make(chan *transactionApplicatorRequest, 10), + applyBlockChan: make(chan *blockApplicationRequest, 10), + applyTransactionChan: make(chan *transactionApplicationRequest, 10), opts: opts, }, nil } // ApplyBlock will apply the block to the chain at the appropriate time -func (b *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) error { - err := b.forkWatchdog.Add(block) +func (a *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) error { + err := a.forkWatchdog.Add(block) if err != nil { return err } errChan := make(chan error, 1) - b.newBlockChan <- &blockEntry{block: block, errChans: []chan<- error{errChan}} + a.newBlockChan <- &blockEntry{block: block, errChans: []chan<- error{errChan}} select { case err := <-errChan: @@ -107,10 +108,10 @@ func (b *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) erro } } -func (b *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transaction) error { +func (a *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transaction) error { errChan := make(chan error, 1) - b.applyTransactionChan <- &transactionApplicatorRequest{trx, errChan, ctx} + a.applyTransactionChan <- &transactionApplicationRequest{trx, errChan, ctx} select { case err := <-errChan: @@ -122,44 +123,44 @@ func (b *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transac } // HandleForkHeads handles a fork heads broadcast -func (b *Applicator) HandleForkHeads(forkHeads *broadcast.ForkHeads) { - b.forkHeadsChan <- forkHeads +func (a *Applicator) HandleForkHeads(forkHeads *broadcast.ForkHeads) { + a.forkHeadsChan <- forkHeads } // HandleBlockBroadcast handles a block broadcast -func (b *Applicator) HandleBlockBroadcast(blockAccept *broadcast.BlockAccepted) { - b.blockBroadcastChan <- blockAccept +func (a *Applicator) HandleBlockBroadcast(blockAccept *broadcast.BlockAccepted) { + a.blockBroadcastChan <- blockAccept } -func (b *Applicator) addEntry(ctx context.Context, entry *blockEntry) { +func (a *Applicator) addBlockEntry(ctx context.Context, entry *blockEntry) { id := string(entry.block.Id) previousId := string(entry.block.Header.Previous) height := entry.block.Header.Height - if oldEntry, ok := b.blocksById[id]; ok { + if oldEntry, ok := a.blocksById[id]; ok { oldEntry.errChans = append(oldEntry.errChans, entry.errChans...) return } else { - b.blocksById[id] = entry + a.blocksById[id] = entry } - if _, ok := b.blocksByPrevious[previousId]; !ok { - b.blocksByPrevious[previousId] = make(map[string]void) + if _, ok := a.blocksByPrevious[previousId]; !ok { + a.blocksByPrevious[previousId] = make(map[string]void) } - b.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} + a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} - if _, ok := b.blocksByHeight[height]; !ok { - b.blocksByHeight[height] = make(map[string]void) + if _, ok := a.blocksByHeight[height]; !ok { + a.blocksByHeight[height] = make(map[string]void) } - b.blocksByHeight[height][id] = void{} + a.blocksByHeight[height][id] = void{} - if entry.block.Header.Height <= b.highestBlock+1 { - b.requestApplication(ctx, entry.block) + if entry.block.Header.Height <= a.highestBlock+1 { + a.requestBlockApplication(ctx, entry.block) } } -func (b *Applicator) removeEntry(ctx context.Context, id string, err error) { - if entry, ok := b.blocksById[id]; ok { +func (a *Applicator) removeEntry(ctx context.Context, id string, err error) { + if entry, ok := a.blocksById[id]; ok { for _, ch := range entry.errChans { select { case ch <- err: @@ -168,47 +169,47 @@ func (b *Applicator) removeEntry(ctx context.Context, id string, err error) { } } - delete(b.blocksById, id) + delete(a.blocksById, id) previousId := string(entry.block.Header.Previous) height := entry.block.Header.Height - if blocks, ok := b.blocksByPrevious[previousId]; ok { + if blocks, ok := a.blocksByPrevious[previousId]; ok { delete(blocks, id) - if len(b.blocksByPrevious[previousId]) == 0 { - delete(b.blocksByPrevious, previousId) + if len(a.blocksByPrevious[previousId]) == 0 { + delete(a.blocksByPrevious, previousId) } } - if blocks, ok := b.blocksByHeight[height]; ok { + if blocks, ok := a.blocksByHeight[height]; ok { delete(blocks, id) - if len(b.blocksByHeight[height]) == 0 { - delete(b.blocksByHeight, height) + if len(a.blocksByHeight[height]) == 0 { + delete(a.blocksByHeight, height) } } } } -func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Block) { +func (a *Applicator) requestBlockApplication(ctx context.Context, block *protocol.Block) { // If there is already a pending application of the block, return - if _, ok := b.pendingBlocks[string(block.Id)]; ok { + if _, ok := a.pendingBlocks[string(block.Id)]; ok { return } - b.pendingBlocks[string(block.Id)] = void{} + a.pendingBlocks[string(block.Id)] = void{} go func() { errChan := make(chan error, 1) // If block is more than 4 seconds in the future, do not apply it until // it is less than 4 seconds in the future. - applicationThreshold := time.Now().Add(b.opts.DelayThreshold) + applicationThreshold := time.Now().Add(a.opts.DelayThreshold) blockTime := time.Unix(int64(block.Header.Timestamp/1000), int64(block.Header.Timestamp%1000)) if blockTime.After(applicationThreshold) { - delayCtx, delayCancel := context.WithTimeout(ctx, b.opts.DelayTimeout) + delayCtx, delayCancel := context.WithTimeout(ctx, a.opts.DelayTimeout) defer delayCancel() timerCtx, timerCancel := context.WithTimeout(ctx, blockTime.Sub(applicationThreshold)) defer timerCancel() @@ -216,7 +217,7 @@ func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Blo select { case <-timerCtx.Done(): case <-delayCtx.Done(): - b.blockStatusChan <- &blockApplicationStatus{ + a.blockStatusChan <- &blockApplicationStatus{ block: block, err: ctx.Err(), } @@ -224,11 +225,11 @@ func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Blo } } - b.applyBlockChan <- &blockApplicationRequest{block, errChan, ctx} + a.applyBlockChan <- &blockApplicationRequest{block, errChan, ctx} select { case err := <-errChan: - b.blockStatusChan <- &blockApplicationStatus{ + a.blockStatusChan <- &blockApplicationStatus{ block: block, err: err, } @@ -238,27 +239,27 @@ func (b *Applicator) requestApplication(ctx context.Context, block *protocol.Blo }() } -func (b *Applicator) handleBlockStatus(ctx context.Context, status *blockApplicationStatus) { - delete(b.pendingBlocks, string(status.block.Id)) +func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplicationStatus) { + delete(a.pendingBlocks, string(status.block.Id)) if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) { - b.requestApplication(ctx, status.block) + a.requestBlockApplication(ctx, status.block) } else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) { - b.removeEntry(ctx, string(status.block.Id), status.err) + a.removeEntry(ctx, string(status.block.Id), status.err) } } -func (b *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { +func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { var err error - if entry.block.Header.Height > b.highestBlock+b.opts.MaxHeightDelta { + if entry.block.Header.Height > a.highestBlock+a.opts.MaxHeightDelta { err = p2perrors.ErrMaxHeight - } else if len(b.blocksById) >= int(b.opts.MaxPendingBlocks) { + } else if len(a.blocksById) >= int(a.opts.MaxPendingBlocks) { err = p2perrors.ErrMaxPendingBlocks - } else if entry.block.Header.Height <= b.lib { + } else if entry.block.Header.Height <= a.lib { err = p2perrors.ErrBlockIrreversibility } else { - b.addEntry(ctx, entry) + a.addBlockEntry(ctx, entry) } if err != nil { @@ -272,87 +273,87 @@ func (b *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { } } -func (b *Applicator) checkChildren(ctx context.Context, blockID string) { - if children, ok := b.blocksByPrevious[string(blockID)]; ok { +func (a *Applicator) checkBlockChildren(ctx context.Context, blockID string) { + if children, ok := a.blocksByPrevious[string(blockID)]; ok { for id := range children { - if entry, ok := b.blocksById[id]; ok { - b.requestApplication(ctx, entry.block) + if entry, ok := a.blocksById[id]; ok { + a.requestBlockApplication(ctx, entry.block) } } } } -func (b *Applicator) handleForkHeads(ctx context.Context, forkHeads *broadcast.ForkHeads) { - oldLib := b.lib - atomic.StoreUint64(&b.lib, forkHeads.LastIrreversibleBlock.Height) +func (a *Applicator) handleForkHeads(ctx context.Context, forkHeads *broadcast.ForkHeads) { + oldLib := a.lib + atomic.StoreUint64(&a.lib, forkHeads.LastIrreversibleBlock.Height) - b.forkWatchdog.Purge(forkHeads.LastIrreversibleBlock.Height) + a.forkWatchdog.Purge(forkHeads.LastIrreversibleBlock.Height) for _, head := range forkHeads.Heads { - b.checkChildren(ctx, string(head.Id)) + a.checkBlockChildren(ctx, string(head.Id)) } // Blocks at or before LIB are automatically rejected, so all entries have height - // greater than previous LIB. We check every block height greater than old LIB, + // greater than previous LIA. We check every block height greater than old LIB, // up to and including the current LIB and remove their entry. // Some blocks may be on unreachable forks at this point. That's ok. // Because they are not reachable, we will never get a parent block that causes their // application and they'll be cleaned up using this logic once their height passes // beyond irreversibility for h := oldLib + 1; h <= forkHeads.LastIrreversibleBlock.Height; h++ { - if ids, ok := b.blocksByHeight[h]; ok { + if ids, ok := a.blocksByHeight[h]; ok { for id := range ids { - b.removeEntry(ctx, id, p2perrors.ErrBlockIrreversibility) + a.removeEntry(ctx, id, p2perrors.ErrBlockIrreversibility) } } } } -func (b *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broadcast.BlockAccepted) { - b.transactionCache.CheckBlock(blockAccept.Block) +func (a *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broadcast.BlockAccepted) { + a.transactionCache.CheckBlock(blockAccept.Block) // It is not possible for a block with a new highest height to not be head, so this check is sufficient - if blockAccept.Block.Header.Height > b.highestBlock { - b.highestBlock = blockAccept.Block.Header.Height + if blockAccept.Block.Header.Height > a.highestBlock { + a.highestBlock = blockAccept.Block.Header.Height } - b.checkChildren(ctx, string(blockAccept.Block.Id)) + a.checkBlockChildren(ctx, string(blockAccept.Block.Id)) } -func (b *Applicator) handleApplyBlock(request *blockApplicationRequest) { +func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) { var err error - if request.block.Header.Height <= atomic.LoadUint64(&b.lib) { + if request.block.Header.Height <= atomic.LoadUint64(&a.lib) { err = p2perrors.ErrBlockIrreversibility } else { - _, err = b.rpc.ApplyBlock(request.ctx, request.block) + _, err = a.rpc.ApplyBlock(request.ctx, request.block) } request.errChan <- err close(request.errChan) } -func (b *Applicator) handleApplyTransaction(request *transactionApplicatorRequest) { +func (a *Applicator) handleApplyTransaction(request *transactionApplicationRequest) { var err error - if b.transactionCache.CheckTransactions(request.trx) == 0 { - _, err = b.rpc.ApplyTransaction(request.ctx, request.trx) + if a.transactionCache.CheckTransactions(request.trx) == 0 { + _, err = a.rpc.ApplyTransaction(request.ctx, request.trx) } request.errChan <- err close(request.errChan) } -func (b *Applicator) Start(ctx context.Context) { +func (a *Applicator) Start(ctx context.Context) { go func() { for { select { - case status := <-b.blockStatusChan: - b.handleBlockStatus(ctx, status) - case entry := <-b.newBlockChan: - b.handleNewBlock(ctx, entry) - case forkHeads := <-b.forkHeadsChan: - b.handleForkHeads(ctx, forkHeads) - case blockBroadcast := <-b.blockBroadcastChan: - b.handleBlockBroadcast(ctx, blockBroadcast) + case status := <-a.blockStatusChan: + a.handleBlockStatus(ctx, status) + case entry := <-a.newBlockChan: + a.handleNewBlock(ctx, entry) + case forkHeads := <-a.forkHeadsChan: + a.handleForkHeads(ctx, forkHeads) + case blockBroadcast := <-a.blockBroadcastChan: + a.handleBlockBroadcast(ctx, blockBroadcast) case <-ctx.Done(): return @@ -360,18 +361,18 @@ func (b *Applicator) Start(ctx context.Context) { } }() - for i := 0; i < b.opts.ApplicationJobs; i++ { + for i := 0; i < a.opts.ApplicationJobs; i++ { go func() { for { select { - case request := <-b.applyBlockChan: - b.handleApplyBlock(request) + case request := <-a.applyBlockChan: + a.handleApplyBlock(request) default: select { - case request := <-b.applyBlockChan: - b.handleApplyBlock(request) - case request := <-b.applyTransactionChan: - b.handleApplyTransaction(request) + case request := <-a.applyBlockChan: + a.handleApplyBlock(request) + case request := <-a.applyTransactionChan: + a.handleApplyTransaction(request) case <-ctx.Done(): return } From b912d0f3590284dd0340d7c374cd9c7137414186 Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 12 Nov 2024 16:07:56 -0700 Subject: [PATCH 2/3] #303: 'removeEntry' -> 'removeBlockEntry' --- internal/p2p/applicator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/p2p/applicator.go b/internal/p2p/applicator.go index 2801af8..605d06c 100644 --- a/internal/p2p/applicator.go +++ b/internal/p2p/applicator.go @@ -159,7 +159,7 @@ func (a *Applicator) addBlockEntry(ctx context.Context, entry *blockEntry) { } } -func (a *Applicator) removeEntry(ctx context.Context, id string, err error) { +func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) { if entry, ok := a.blocksById[id]; ok { for _, ch := range entry.errChans { select { @@ -245,7 +245,7 @@ func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplica if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) { a.requestBlockApplication(ctx, status.block) } else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) { - a.removeEntry(ctx, string(status.block.Id), status.err) + a.removeBlockEntry(ctx, string(status.block.Id), status.err) } } @@ -303,7 +303,7 @@ func (a *Applicator) handleForkHeads(ctx context.Context, forkHeads *broadcast.F for h := oldLib + 1; h <= forkHeads.LastIrreversibleBlock.Height; h++ { if ids, ok := a.blocksByHeight[h]; ok { for id := range ids { - a.removeEntry(ctx, id, p2perrors.ErrBlockIrreversibility) + a.removeBlockEntry(ctx, id, p2perrors.ErrBlockIrreversibility) } } } From 4b14aa4bcaeb1af26b0e4b3b81b7916c34c77d6d Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 12 Nov 2024 16:20:58 -0700 Subject: [PATCH 3/3] #306: Implement force block application for child blocks --- internal/options/applicator_options.go | 36 +++++---- internal/p2p/applicator.go | 108 +++++++++++++++++-------- internal/p2p/applicator_test.go | 46 +++++++++++ 3 files changed, 143 insertions(+), 47 deletions(-) diff --git a/internal/options/applicator_options.go b/internal/options/applicator_options.go index 243aeb1..7f35de4 100644 --- a/internal/options/applicator_options.go +++ b/internal/options/applicator_options.go @@ -14,29 +14,35 @@ func min(x, y int) int { } const ( - maxPendingBlocksDefault = 2500 - maxHeightDeltaDefault = 60 - delayThresholdDefault = time.Second * 4 - delayTimeoutDefault = time.Second * 60 - applicationJobsDefault = 8 + maxPendingBlocksDefault = 2500 + maxHeightDeltaDefault = 60 + delayThresholdDefault = time.Second * 4 + delayTimeoutDefault = time.Second * 60 + applicationJobsDefault = 8 + forceChildRequestThresholdDefault = time.Minute + forceApplicationRetryDelayDefault = 50 * time.Millisecond ) // ApplicatorOptions are options for Applicator type ApplicatorOptions struct { - MaxPendingBlocks uint64 - MaxHeightDelta uint64 - DelayThreshold time.Duration - DelayTimeout time.Duration - ApplicationJobs int + MaxPendingBlocks uint64 + MaxHeightDelta uint64 + DelayThreshold time.Duration + DelayTimeout time.Duration + ApplicationJobs int + ForceChildRequestThreshold time.Duration + ForceApplicationRetryDelay time.Duration } // NewApplicatorOptions returns default initialized ApplicatorOptions func NewApplicatorOptions() *ApplicatorOptions { return &ApplicatorOptions{ - MaxPendingBlocks: maxPendingBlocksDefault, - MaxHeightDelta: maxHeightDeltaDefault, - DelayThreshold: delayThresholdDefault, - DelayTimeout: delayTimeoutDefault, - ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()), + MaxPendingBlocks: maxPendingBlocksDefault, + MaxHeightDelta: maxHeightDeltaDefault, + DelayThreshold: delayThresholdDefault, + DelayTimeout: delayTimeoutDefault, + ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()), + ForceChildRequestThreshold: forceChildRequestThresholdDefault, + ForceApplicationRetryDelay: forceApplicationRetryDelayDefault, } } diff --git a/internal/p2p/applicator.go b/internal/p2p/applicator.go index 605d06c..c6133a1 100644 --- a/internal/p2p/applicator.go +++ b/internal/p2p/applicator.go @@ -18,7 +18,12 @@ type blockEntry struct { errChans []chan<- error } -type blockApplicationRequest struct { +type tryBlockApplicationRequest struct { + block *protocol.Block + force bool +} + +type applyBlockRequest struct { block *protocol.Block errChan chan<- error ctx context.Context @@ -29,7 +34,7 @@ type blockApplicationStatus struct { err error } -type transactionApplicationRequest struct { +type applyTransactionRequest struct { trx *protocol.Transaction errChan chan<- error ctx context.Context @@ -54,9 +59,10 @@ type Applicator struct { forkHeadsChan chan *broadcast.ForkHeads blockBroadcastChan chan *broadcast.BlockAccepted blockStatusChan chan *blockApplicationStatus + tryBlockChan chan *tryBlockApplicationRequest - applyBlockChan chan *blockApplicationRequest - applyTransactionChan chan *transactionApplicationRequest + applyBlockChan chan *applyBlockRequest + applyTransactionChan chan *applyTransactionRequest opts options.ApplicatorOptions } @@ -82,8 +88,9 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, cache *TransactionCach forkHeadsChan: make(chan *broadcast.ForkHeads, 10), blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10), blockStatusChan: make(chan *blockApplicationStatus, 10), - applyBlockChan: make(chan *blockApplicationRequest, 10), - applyTransactionChan: make(chan *transactionApplicationRequest, 10), + tryBlockChan: make(chan *tryBlockApplicationRequest, 10), + applyBlockChan: make(chan *applyBlockRequest, 10), + applyTransactionChan: make(chan *applyTransactionRequest, 10), opts: opts, }, nil } @@ -111,7 +118,7 @@ func (a *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) erro func (a *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transaction) error { errChan := make(chan error, 1) - a.applyTransactionChan <- &transactionApplicationRequest{trx, errChan, ctx} + a.applyTransactionChan <- &applyTransactionRequest{trx, errChan, ctx} select { case err := <-errChan: @@ -139,24 +146,32 @@ func (a *Applicator) addBlockEntry(ctx context.Context, entry *blockEntry) { if oldEntry, ok := a.blocksById[id]; ok { oldEntry.errChans = append(oldEntry.errChans, entry.errChans...) - return } else { a.blocksById[id] = entry - } - if _, ok := a.blocksByPrevious[previousId]; !ok { - a.blocksByPrevious[previousId] = make(map[string]void) + if _, ok := a.blocksByPrevious[previousId]; !ok { + a.blocksByPrevious[previousId] = make(map[string]void) + } + a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} + + if _, ok := a.blocksByHeight[height]; !ok { + a.blocksByHeight[height] = make(map[string]void) + } + a.blocksByHeight[height][id] = void{} } - a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{} - if _, ok := a.blocksByHeight[height]; !ok { - a.blocksByHeight[height] = make(map[string]void) + // If the block height is greater than the highest block we have seen plus one, + // we know we cannot apply it yet. Wait + if entry.block.Header.Height > a.highestBlock+1 { + return } - a.blocksByHeight[height][id] = void{} - if entry.block.Header.Height <= a.highestBlock+1 { - a.requestBlockApplication(ctx, entry.block) + // If the parent block is currently being applied, we cannot apply it yet. Wait + if _, ok := a.pendingBlocks[string(entry.block.Header.Previous)]; ok { + return } + + a.tryBlockApplication(ctx, entry.block, false) } func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) { @@ -192,13 +207,35 @@ func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) } } -func (a *Applicator) requestBlockApplication(ctx context.Context, block *protocol.Block) { +func (a *Applicator) tryBlockApplication(ctx context.Context, block *protocol.Block, force bool) { + go func() { + select { + case a.tryBlockChan <- &tryBlockApplicationRequest{ + block: block, + force: force, + }: + case <-ctx.Done(): + } + }() +} + +func (a *Applicator) handleTryBlockApplication(ctx context.Context, request *tryBlockApplicationRequest) { // If there is already a pending application of the block, return - if _, ok := a.pendingBlocks[string(block.Id)]; ok { + if _, ok := a.pendingBlocks[string(request.block.Id)]; ok { + if request.force { + go func() { + select { + case <-time.After(a.opts.ForceApplicationRetryDelay): + a.tryBlockApplication(ctx, request.block, request.force) + case <-ctx.Done(): + } + }() + } + return } - a.pendingBlocks[string(block.Id)] = void{} + a.pendingBlocks[string(request.block.Id)] = void{} go func() { errChan := make(chan error, 1) @@ -206,7 +243,7 @@ func (a *Applicator) requestBlockApplication(ctx context.Context, block *protoco // If block is more than 4 seconds in the future, do not apply it until // it is less than 4 seconds in the future. applicationThreshold := time.Now().Add(a.opts.DelayThreshold) - blockTime := time.Unix(int64(block.Header.Timestamp/1000), int64(block.Header.Timestamp%1000)) + blockTime := time.Unix(int64(request.block.Header.Timestamp/1000), int64(request.block.Header.Timestamp%1000)) if blockTime.After(applicationThreshold) { delayCtx, delayCancel := context.WithTimeout(ctx, a.opts.DelayTimeout) @@ -218,22 +255,26 @@ func (a *Applicator) requestBlockApplication(ctx context.Context, block *protoco case <-timerCtx.Done(): case <-delayCtx.Done(): a.blockStatusChan <- &blockApplicationStatus{ - block: block, + block: request.block, err: ctx.Err(), } return + case <-ctx.Done(): + return } } - a.applyBlockChan <- &blockApplicationRequest{block, errChan, ctx} + a.applyBlockChan <- &applyBlockRequest{request.block, errChan, ctx} select { case err := <-errChan: - a.blockStatusChan <- &blockApplicationStatus{ - block: block, + select { + case a.blockStatusChan <- &blockApplicationStatus{ + block: request.block, err: err, + }: + case <-ctx.Done(): } - case <-ctx.Done(): } }() @@ -243,7 +284,7 @@ func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplica delete(a.pendingBlocks, string(status.block.Id)) if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) { - a.requestBlockApplication(ctx, status.block) + a.tryBlockApplication(ctx, status.block, false) } else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) { a.removeBlockEntry(ctx, string(status.block.Id), status.err) } @@ -258,8 +299,6 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { err = p2perrors.ErrMaxPendingBlocks } else if entry.block.Header.Height <= a.lib { err = p2perrors.ErrBlockIrreversibility - } else { - a.addBlockEntry(ctx, entry) } if err != nil { @@ -270,6 +309,8 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) { case <-ctx.Done(): } } + } else { + a.addBlockEntry(ctx, entry) } } @@ -277,7 +318,8 @@ func (a *Applicator) checkBlockChildren(ctx context.Context, blockID string) { if children, ok := a.blocksByPrevious[string(blockID)]; ok { for id := range children { if entry, ok := a.blocksById[id]; ok { - a.requestBlockApplication(ctx, entry.block) + force := time.Since(time.UnixMilli(int64(entry.block.Header.Timestamp))) < a.opts.ForceChildRequestThreshold + a.tryBlockApplication(ctx, entry.block, force) } } } @@ -320,7 +362,7 @@ func (a *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broa a.checkBlockChildren(ctx, string(blockAccept.Block.Id)) } -func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) { +func (a *Applicator) handleApplyBlock(request *applyBlockRequest) { var err error if request.block.Header.Height <= atomic.LoadUint64(&a.lib) { err = p2perrors.ErrBlockIrreversibility @@ -332,7 +374,7 @@ func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) { close(request.errChan) } -func (a *Applicator) handleApplyTransaction(request *transactionApplicationRequest) { +func (a *Applicator) handleApplyTransaction(request *applyTransactionRequest) { var err error if a.transactionCache.CheckTransactions(request.trx) == 0 { _, err = a.rpc.ApplyTransaction(request.ctx, request.trx) @@ -354,6 +396,8 @@ func (a *Applicator) Start(ctx context.Context) { a.handleForkHeads(ctx, forkHeads) case blockBroadcast := <-a.blockBroadcastChan: a.handleBlockBroadcast(ctx, blockBroadcast) + case tryApplyBlock := <-a.tryBlockChan: + a.handleTryBlockApplication(ctx, tryApplyBlock) case <-ctx.Done(): return diff --git a/internal/p2p/applicator_test.go b/internal/p2p/applicator_test.go index 602cdea..606a193 100644 --- a/internal/p2p/applicator_test.go +++ b/internal/p2p/applicator_test.go @@ -328,6 +328,52 @@ func TestApplicatorLimits(t *testing.T) { <-testChan } +func TestDelayBlock(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rpc := applicatorTestRPC{ + blocksToFail: make(map[string]void), + unlinkableBlocks: make(map[string]void), + head: []byte{0x00}, + invalidNonceTrxs: make(map[string]void), + } + + applicator, err := NewApplicator(ctx, &rpc, NewTransactionCache(time.Minute), *options.NewApplicatorOptions()) + if err != nil { + t.Error(err) + } + + block := &protocol.Block{ + Id: []byte{0x01}, + Header: &protocol.BlockHeader{ + Height: 1, + Previous: []byte{0}, + Timestamp: uint64(time.Now().Add(6 * time.Second).UnixMilli()), + }, + } + + applicator.Start(ctx) + + timer, timerCancel := context.WithTimeout(ctx, 6*time.Second) + defer timerCancel() + + go func() { + select { + case <-timer.Done(): + t.Error("block not applied in time") + case <-ctx.Done(): + } + }() + + err = applicator.ApplyBlock(ctx, block) + + if err != nil { + t.Error(err) + } + + cancel() +} + func TestInvalidNonce(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()