Skip to content

Commit

Permalink
#306: Implement force block application for child blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
mvandeberg committed Nov 13, 2024
1 parent b912d0f commit 4b14aa4
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 47 deletions.
36 changes: 21 additions & 15 deletions internal/options/applicator_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,35 @@ func min(x, y int) int {
}

const (
maxPendingBlocksDefault = 2500
maxHeightDeltaDefault = 60
delayThresholdDefault = time.Second * 4
delayTimeoutDefault = time.Second * 60
applicationJobsDefault = 8
maxPendingBlocksDefault = 2500
maxHeightDeltaDefault = 60
delayThresholdDefault = time.Second * 4
delayTimeoutDefault = time.Second * 60
applicationJobsDefault = 8
forceChildRequestThresholdDefault = time.Minute
forceApplicationRetryDelayDefault = 50 * time.Millisecond
)

// ApplicatorOptions are options for Applicator
type ApplicatorOptions struct {
MaxPendingBlocks uint64
MaxHeightDelta uint64
DelayThreshold time.Duration
DelayTimeout time.Duration
ApplicationJobs int
MaxPendingBlocks uint64
MaxHeightDelta uint64
DelayThreshold time.Duration
DelayTimeout time.Duration
ApplicationJobs int
ForceChildRequestThreshold time.Duration
ForceApplicationRetryDelay time.Duration
}

// NewApplicatorOptions returns default initialized ApplicatorOptions
func NewApplicatorOptions() *ApplicatorOptions {
return &ApplicatorOptions{
MaxPendingBlocks: maxPendingBlocksDefault,
MaxHeightDelta: maxHeightDeltaDefault,
DelayThreshold: delayThresholdDefault,
DelayTimeout: delayTimeoutDefault,
ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()),
MaxPendingBlocks: maxPendingBlocksDefault,
MaxHeightDelta: maxHeightDeltaDefault,
DelayThreshold: delayThresholdDefault,
DelayTimeout: delayTimeoutDefault,
ApplicationJobs: min(applicationJobsDefault, runtime.NumCPU()),
ForceChildRequestThreshold: forceChildRequestThresholdDefault,
ForceApplicationRetryDelay: forceApplicationRetryDelayDefault,
}
}
108 changes: 76 additions & 32 deletions internal/p2p/applicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ type blockEntry struct {
errChans []chan<- error
}

type blockApplicationRequest struct {
type tryBlockApplicationRequest struct {
block *protocol.Block
force bool
}

type applyBlockRequest struct {
block *protocol.Block
errChan chan<- error
ctx context.Context
Expand All @@ -29,7 +34,7 @@ type blockApplicationStatus struct {
err error
}

type transactionApplicationRequest struct {
type applyTransactionRequest struct {
trx *protocol.Transaction
errChan chan<- error
ctx context.Context
Expand All @@ -54,9 +59,10 @@ type Applicator struct {
forkHeadsChan chan *broadcast.ForkHeads
blockBroadcastChan chan *broadcast.BlockAccepted
blockStatusChan chan *blockApplicationStatus
tryBlockChan chan *tryBlockApplicationRequest

applyBlockChan chan *blockApplicationRequest
applyTransactionChan chan *transactionApplicationRequest
applyBlockChan chan *applyBlockRequest
applyTransactionChan chan *applyTransactionRequest

opts options.ApplicatorOptions
}
Expand All @@ -82,8 +88,9 @@ func NewApplicator(ctx context.Context, rpc rpc.LocalRPC, cache *TransactionCach
forkHeadsChan: make(chan *broadcast.ForkHeads, 10),
blockBroadcastChan: make(chan *broadcast.BlockAccepted, 10),
blockStatusChan: make(chan *blockApplicationStatus, 10),
applyBlockChan: make(chan *blockApplicationRequest, 10),
applyTransactionChan: make(chan *transactionApplicationRequest, 10),
tryBlockChan: make(chan *tryBlockApplicationRequest, 10),
applyBlockChan: make(chan *applyBlockRequest, 10),
applyTransactionChan: make(chan *applyTransactionRequest, 10),
opts: opts,
}, nil
}
Expand Down Expand Up @@ -111,7 +118,7 @@ func (a *Applicator) ApplyBlock(ctx context.Context, block *protocol.Block) erro
func (a *Applicator) ApplyTransaction(ctx context.Context, trx *protocol.Transaction) error {
errChan := make(chan error, 1)

a.applyTransactionChan <- &transactionApplicationRequest{trx, errChan, ctx}
a.applyTransactionChan <- &applyTransactionRequest{trx, errChan, ctx}

select {
case err := <-errChan:
Expand Down Expand Up @@ -139,24 +146,32 @@ func (a *Applicator) addBlockEntry(ctx context.Context, entry *blockEntry) {

if oldEntry, ok := a.blocksById[id]; ok {
oldEntry.errChans = append(oldEntry.errChans, entry.errChans...)
return
} else {
a.blocksById[id] = entry
}

if _, ok := a.blocksByPrevious[previousId]; !ok {
a.blocksByPrevious[previousId] = make(map[string]void)
if _, ok := a.blocksByPrevious[previousId]; !ok {
a.blocksByPrevious[previousId] = make(map[string]void)
}
a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{}

if _, ok := a.blocksByHeight[height]; !ok {
a.blocksByHeight[height] = make(map[string]void)
}
a.blocksByHeight[height][id] = void{}
}
a.blocksByPrevious[string(entry.block.Header.Previous)][id] = void{}

if _, ok := a.blocksByHeight[height]; !ok {
a.blocksByHeight[height] = make(map[string]void)
// If the block height is greater than the highest block we have seen plus one,
// we know we cannot apply it yet. Wait
if entry.block.Header.Height > a.highestBlock+1 {
return
}
a.blocksByHeight[height][id] = void{}

if entry.block.Header.Height <= a.highestBlock+1 {
a.requestBlockApplication(ctx, entry.block)
// If the parent block is currently being applied, we cannot apply it yet. Wait
if _, ok := a.pendingBlocks[string(entry.block.Header.Previous)]; ok {
return
}

a.tryBlockApplication(ctx, entry.block, false)
}

func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error) {
Expand Down Expand Up @@ -192,21 +207,43 @@ func (a *Applicator) removeBlockEntry(ctx context.Context, id string, err error)
}
}

func (a *Applicator) requestBlockApplication(ctx context.Context, block *protocol.Block) {
func (a *Applicator) tryBlockApplication(ctx context.Context, block *protocol.Block, force bool) {
go func() {
select {
case a.tryBlockChan <- &tryBlockApplicationRequest{
block: block,
force: force,
}:
case <-ctx.Done():
}
}()
}

func (a *Applicator) handleTryBlockApplication(ctx context.Context, request *tryBlockApplicationRequest) {
// If there is already a pending application of the block, return
if _, ok := a.pendingBlocks[string(block.Id)]; ok {
if _, ok := a.pendingBlocks[string(request.block.Id)]; ok {
if request.force {
go func() {
select {
case <-time.After(a.opts.ForceApplicationRetryDelay):
a.tryBlockApplication(ctx, request.block, request.force)
case <-ctx.Done():
}
}()
}

return
}

a.pendingBlocks[string(block.Id)] = void{}
a.pendingBlocks[string(request.block.Id)] = void{}

go func() {
errChan := make(chan error, 1)

// If block is more than 4 seconds in the future, do not apply it until
// it is less than 4 seconds in the future.
applicationThreshold := time.Now().Add(a.opts.DelayThreshold)
blockTime := time.Unix(int64(block.Header.Timestamp/1000), int64(block.Header.Timestamp%1000))
blockTime := time.Unix(int64(request.block.Header.Timestamp/1000), int64(request.block.Header.Timestamp%1000))

if blockTime.After(applicationThreshold) {
delayCtx, delayCancel := context.WithTimeout(ctx, a.opts.DelayTimeout)
Expand All @@ -218,22 +255,26 @@ func (a *Applicator) requestBlockApplication(ctx context.Context, block *protoco
case <-timerCtx.Done():
case <-delayCtx.Done():
a.blockStatusChan <- &blockApplicationStatus{
block: block,
block: request.block,
err: ctx.Err(),
}
return
case <-ctx.Done():
return
}
}

a.applyBlockChan <- &blockApplicationRequest{block, errChan, ctx}
a.applyBlockChan <- &applyBlockRequest{request.block, errChan, ctx}

select {
case err := <-errChan:
a.blockStatusChan <- &blockApplicationStatus{
block: block,
select {
case a.blockStatusChan <- &blockApplicationStatus{
block: request.block,
err: err,
}:
case <-ctx.Done():
}

case <-ctx.Done():
}
}()
Expand All @@ -243,7 +284,7 @@ func (a *Applicator) handleBlockStatus(ctx context.Context, status *blockApplica
delete(a.pendingBlocks, string(status.block.Id))

if status.err != nil && (errors.Is(status.err, p2perrors.ErrBlockState)) {
a.requestBlockApplication(ctx, status.block)
a.tryBlockApplication(ctx, status.block, false)
} else if status.err == nil || !errors.Is(status.err, p2perrors.ErrUnknownPreviousBlock) {
a.removeBlockEntry(ctx, string(status.block.Id), status.err)
}
Expand All @@ -258,8 +299,6 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) {
err = p2perrors.ErrMaxPendingBlocks
} else if entry.block.Header.Height <= a.lib {
err = p2perrors.ErrBlockIrreversibility
} else {
a.addBlockEntry(ctx, entry)
}

if err != nil {
Expand All @@ -270,14 +309,17 @@ func (a *Applicator) handleNewBlock(ctx context.Context, entry *blockEntry) {
case <-ctx.Done():
}
}
} else {
a.addBlockEntry(ctx, entry)
}
}

func (a *Applicator) checkBlockChildren(ctx context.Context, blockID string) {
if children, ok := a.blocksByPrevious[string(blockID)]; ok {
for id := range children {
if entry, ok := a.blocksById[id]; ok {
a.requestBlockApplication(ctx, entry.block)
force := time.Since(time.UnixMilli(int64(entry.block.Header.Timestamp))) < a.opts.ForceChildRequestThreshold
a.tryBlockApplication(ctx, entry.block, force)
}
}
}
Expand Down Expand Up @@ -320,7 +362,7 @@ func (a *Applicator) handleBlockBroadcast(ctx context.Context, blockAccept *broa
a.checkBlockChildren(ctx, string(blockAccept.Block.Id))
}

func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) {
func (a *Applicator) handleApplyBlock(request *applyBlockRequest) {
var err error
if request.block.Header.Height <= atomic.LoadUint64(&a.lib) {
err = p2perrors.ErrBlockIrreversibility
Expand All @@ -332,7 +374,7 @@ func (a *Applicator) handleApplyBlock(request *blockApplicationRequest) {
close(request.errChan)
}

func (a *Applicator) handleApplyTransaction(request *transactionApplicationRequest) {
func (a *Applicator) handleApplyTransaction(request *applyTransactionRequest) {
var err error
if a.transactionCache.CheckTransactions(request.trx) == 0 {
_, err = a.rpc.ApplyTransaction(request.ctx, request.trx)
Expand All @@ -354,6 +396,8 @@ func (a *Applicator) Start(ctx context.Context) {
a.handleForkHeads(ctx, forkHeads)
case blockBroadcast := <-a.blockBroadcastChan:
a.handleBlockBroadcast(ctx, blockBroadcast)
case tryApplyBlock := <-a.tryBlockChan:
a.handleTryBlockApplication(ctx, tryApplyBlock)

case <-ctx.Done():
return
Expand Down
46 changes: 46 additions & 0 deletions internal/p2p/applicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,52 @@ func TestApplicatorLimits(t *testing.T) {
<-testChan
}

func TestDelayBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rpc := applicatorTestRPC{
blocksToFail: make(map[string]void),
unlinkableBlocks: make(map[string]void),
head: []byte{0x00},
invalidNonceTrxs: make(map[string]void),
}

applicator, err := NewApplicator(ctx, &rpc, NewTransactionCache(time.Minute), *options.NewApplicatorOptions())
if err != nil {
t.Error(err)
}

block := &protocol.Block{
Id: []byte{0x01},
Header: &protocol.BlockHeader{
Height: 1,
Previous: []byte{0},
Timestamp: uint64(time.Now().Add(6 * time.Second).UnixMilli()),
},
}

applicator.Start(ctx)

timer, timerCancel := context.WithTimeout(ctx, 6*time.Second)
defer timerCancel()

go func() {
select {
case <-timer.Done():
t.Error("block not applied in time")
case <-ctx.Done():
}
}()

err = applicator.ApplyBlock(ctx, block)

if err != nil {
t.Error(err)
}

cancel()
}

func TestInvalidNonce(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 4b14aa4

Please sign in to comment.