feat(manager): max skew based on time instead of batches #1140

Open · wants to merge 16 commits into base: main
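The gist of the change: back-pressure on block production is now budgeted by the time gap between the last produced block and the last submitted block (`max_skew_time`), rather than by a count of pending batches (`max_batch_skew`). A minimal sketch of the skew measure the diff is built on, using illustrative names rather than the exact dymint API:

```go
package main

import (
	"fmt"
	"time"
)

// skewTime returns how far block production has run ahead of submission.
// A zero result means submission has caught up.
func skewTime(lastProduced, lastSubmitted time.Time) time.Duration {
	if lastProduced.Before(lastSubmitted) {
		return 0
	}
	return lastProduced.Sub(lastSubmitted)
}

func main() {
	now := time.Now()
	fmt.Println(skewTime(now, now.Add(-2*time.Hour))) // 2h0m0s of pending work
}
```

Production is paused once this value exceeds the configured `max_skew_time`.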
4 changes: 3 additions & 1 deletion block/block.go
@@ -69,6 +69,9 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("commit block: %w", err)
}

// update last block time
m.State.SetLastBlockTime(block.Header.GetTimestamp())

// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.
@@ -80,7 +83,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
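`applyBlock` now stamps the state with the block's timestamp right after commit; this is one half of the skew computation, the other half being the last submitted block time set in `submit.go` below. A sketch of what such accessors could look like, assuming atomically stored nanosecond timestamps (the real State type in dymint may guard these fields differently):

```go
package types

import (
	"sync/atomic"
	"time"
)

// Illustrative only: thread-safe timestamp accessors backing the skew
// computation.
type State struct {
	lastBlockTimeNano     atomic.Int64 // last produced block, UnixNano
	lastSubmittedTimeNano atomic.Int64 // last submitted block, UnixNano
}

func (s *State) SetLastBlockTime(t time.Time) {
	s.lastBlockTimeNano.Store(t.UTC().UnixNano())
}

func (s *State) SetLastSubmittedBlockTime(t time.Time) {
	s.lastSubmittedTimeNano.Store(t.UTC().UnixNano())
}

// GetSkewTime reports how far production is ahead of submission.
func (s *State) GetSkewTime() time.Duration {
	produced := time.Unix(0, s.lastBlockTimeNano.Load())
	submitted := time.Unix(0, s.lastSubmittedTimeNano.Load())
	if produced.Before(submitted) {
		return 0
	}
	return produced.Sub(submitted)
}
```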
4 changes: 3 additions & 1 deletion block/manager.go
@@ -286,12 +286,14 @@ func (m *Manager) syncFromSettlement() error {
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
lastBlockTimestamp := res.BlockDescriptors[len(res.BlockDescriptors)-1].GetTimestamp()
m.State.SetLastSubmittedBlockTime(lastBlockTimestamp)

err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
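On startup, `syncFromSettlement` re-seeds the last submitted block time from the newest block descriptor of the latest settlement batch, so the skew clock survives restarts. Note that `res.BlockDescriptors[len(res.BlockDescriptors)-1]` assumes the slice is non-empty; a defensive variant (hypothetical, not part of this PR) would be:

```go
// Hypothetical guard: only re-seed the timestamp when descriptors exist.
if n := len(res.BlockDescriptors); n > 0 {
	m.State.SetLastSubmittedBlockTime(res.BlockDescriptors[n-1].GetTimestamp())
}
```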
4 changes: 3 additions & 1 deletion block/state.go
@@ -4,6 +4,7 @@
"bytes"
"errors"
"fmt"
"time"

errorsmod "cosmossdk.io/errors"

@@ -110,6 +111,8 @@
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))

s.SetLastSubmittedBlockTime(time.Now())
Check warning (Code scanning / CodeQL): Calling the system time may be a possible source of non-determinism.
}

func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
@@ -123,7 +126,6 @@
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

s.SetHeight(height)

if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
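The CodeQL warning above flags the `time.Now()` call as a potential source of non-determinism. Here it only seeds the submission clock when a fresh state is created, so the node starts with no accumulated skew. If determinism were a concern, the usual remedy is to inject a clock instead of calling the system time directly; a sketch under that assumption, not what the PR does:

```go
package block

import "time"

// Clock abstracts the time source so tests and replay can pin it.
type Clock func() time.Time

type Executor struct {
	now Clock
}

func NewExecutor(clock Clock) *Executor {
	if clock == nil {
		clock = time.Now // default: system clock
	}
	return &Executor{now: clock}
}
```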
47 changes: 27 additions & 20 deletions block/submit.go
@@ -32,6 +32,7 @@
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.State.GetSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
@@ -43,8 +44,9 @@
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchSkew time.Duration, // max allowed time between the last submitted block and the last produced block
unsubmittedBlocksNum func() uint64,
skewTime func() time.Duration,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
@@ -60,33 +62,35 @@
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

submitter.Nudge()

if maxBatchSkew < skewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
case <-ctx.Done():
return ctx.Err()
case <-trigger.C:
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if it isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
@@ -96,15 +100,17 @@
case <-submitter.C:
}
pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))

types.RollappPendingSubmissionsSkewBytes.Set(float64(pending))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime

lastSubmissionIsRecent := skewTime() < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
@@ -125,7 +131,6 @@
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
@@ -237,6 +242,8 @@

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSubmittedHeight.Store(batch.EndHeight())
m.State.SetLastSubmittedBlockTime(time.Now())

return nil
}

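Structurally, the rewritten 'trigger' goroutine always drains `bytesProduced`, updates the skew metrics, nudges the submitter, and only then decides whether to park until the submitter reports progress; the 'submitter' goroutine now also uses `skewTime()` instead of a locally tracked last-submission time. A stripped-down model of the gate, with metrics and byte accounting elided (illustrative, not the dymint code):

```go
package block

import (
	"context"
	"time"
)

// gateLoop models the 'trigger' thread: absorb produced bytes, nudge the
// submitter, then park once the time-skew budget is exhausted. Parking stops
// reads from 'produced', which in turn blocks the producer's send.
func gateLoop(
	ctx context.Context,
	maxSkew time.Duration,
	skew func() time.Duration, // last produced minus last submitted
	produced <-chan int, // block+commit byte counts from the producer
	progress <-chan struct{}, // nudges from the submitter thread
	nudgeSubmitter func(),
) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-produced:
		}
		nudgeSubmitter()
		if maxSkew < skew() {
			select { // wait for the submitter to report progress
			case <-ctx.Done():
				return ctx.Err()
			case <-progress:
			}
		}
	}
}
```

Since the producer's send on the bytes channel is unbuffered (as it is in the tests below), parking here is what ultimately stalls block production.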
52 changes: 31 additions & 21 deletions block/submit_loop_test.go
@@ -16,7 +16,8 @@ import (
type testArgs struct {
nParallel int // number of instances to run in parallel
testDuration time.Duration // how long to run one instance of the test (should be short)
batchSkew uint64 // max number of batches to get ahead
batchSkew time.Duration // max allowed time between the last produced and last submitted block
skewMargin time.Duration // extra skew tolerated by the test assertions
batchBytes uint64 // max number of bytes in a batch
maxTime time.Duration // maximum time to wait before submitting submissions
submitTime time.Duration // how long it takes to submit a batch
@@ -59,9 +60,20 @@ func testSubmitLoopInner(
nProducedBytes := atomic.Uint64{} // tracking how many actual bytes have been produced but not submitted so far
producedBytesC := make(chan int) // producer sends on here, and can be blocked by not consuming from here

// the time of the last block produced or the last batch submitted or the last starting of the node
timeLastProgress := atomic.Int64{}
lastSubmittedBlockTime := atomic.Uint64{}
lastProducedBlockTime := atomic.Uint64{}
lastProducedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))
lastSubmittedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))

skewTime := func() time.Duration {
lastSubmitted := time.Unix(0, int64(lastSubmittedBlockTime.Load()))
lastProduced := time.Unix(0, int64(lastProducedBlockTime.Load()))
if lastProduced.Before(lastSubmitted) {
return 0
}
return lastProduced.Sub(lastSubmitted)
}
go func() { // simulate block production
go func() { // another thread to check system properties
for {
@@ -71,9 +83,7 @@
default:
}
// producer shall not get too far ahead
absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
nProduced := nProducedBytes.Load()
require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
require.True(t, skewTime() < args.batchSkew+args.skewMargin, "last produced blocks time not less than maximum skew time", "produced block skew time", skewTime(), "max skew time", args.batchSkew)
}
}()
for {
@@ -82,39 +92,37 @@
return
default:
}

time.Sleep(approx(args.produceTime))

if args.batchSkew <= skewTime() {
continue
}
nBytes := rand.Intn(args.produceBytes) // simulate block production
nProducedBytes.Add(uint64(nBytes))
producedBytesC <- nBytes
pendingBlocks.Add(1) // increase pending blocks to be submitted counter

timeLastProgress.Store(time.Now().Unix())
lastProducedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))
}
}()

submitBatch := func(maxSize uint64) (uint64, error) { // mock the batch submission
time.Sleep(approx(args.submitTime))
if rand.Float64() < args.submissionHaltProbability {
time.Sleep(args.submissionHaltTime)
timeLastProgress.Store(time.Now().Unix()) // we have now recovered
}
consumed := rand.Intn(int(maxSize))
nProducedBytes.Add(^uint64(consumed - 1)) // subtract

timeLastProgressT := time.Unix(timeLastProgress.Load(), 0)
absoluteMax := int64(2 * float64(args.maxTime)) // allow some leeway for code execution. Tests may run on small boxes (GH actions)
timeSinceLast := time.Since(timeLastProgressT).Milliseconds()
require.True(t, timeSinceLast < absoluteMax, "too long since last update", "timeSinceLast", timeSinceLast, "max", absoluteMax)

pendingBlocks.Store(0) // no pending blocks to be submitted
timeLastProgress.Store(time.Now().Unix()) // we have submitted batch
lastSubmittedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))

return uint64(consumed), nil
}
accumulatedBlocks := func() uint64 {
return pendingBlocks.Load()
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch)
block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, skewTime, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand All @@ -124,7 +132,8 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
testArgs{
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchSkew: 40 * time.Millisecond,
skewMargin: 5 * time.Millisecond,
batchBytes: 100,
maxTime: 10 * time.Millisecond,
submitTime: 2 * time.Millisecond,
Expand All @@ -133,7 +142,7 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
// a relatively long possibility of the submitter halting
// tests the case where we need to stop the producer getting too far ahead
submissionHaltTime: 50 * time.Millisecond,
submissionHaltProbability: 0.01,
submissionHaltProbability: 0.05,
},
)
}
@@ -143,9 +152,10 @@ func TestSubmitLoopTimer(t *testing.T) {
testSubmitLoop(
t,
testArgs{
nParallel: 50,
nParallel: 1,
testDuration: 2 * time.Second,
batchSkew: 10,
batchSkew: 150 * time.Millisecond,
skewMargin: 5 * time.Millisecond,
batchBytes: 100,
maxTime: 10 * time.Millisecond,
submitTime: 2 * time.Millisecond,
4 changes: 3 additions & 1 deletion block/submit_test.go
@@ -242,7 +242,7 @@ func TestSubmissionByTime(t *testing.T) {
managerConfig := config.BlockManagerConfig{
BlockTime: blockTime,
MaxIdleTime: 0,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
BatchSubmitTime: submitTimeout,
BatchSubmitBytes: 1000,
}
@@ -253,6 +253,7 @@
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.State.SetLastSubmittedBlockTime(time.Now())
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
@@ -325,6 +326,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.State.SetLastSubmittedBlockTime(time.Now())

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
6 changes: 3 additions & 3 deletions config/config.go
@@ -56,7 +56,7 @@ type BlockManagerConfig struct {
// BatchSubmitMaxTime is how long should block manager wait for before submitting batch
BatchSubmitTime time.Duration `mapstructure:"batch_submit_time"`
// BatchSkew is the maximum allowed time between the last produced block and the last submitted batch. Block production will be paused if this limit is reached.
BatchSkew uint64 `mapstructure:"max_batch_skew"`
BatchSkew time.Duration `mapstructure:"max_skew_time"`
// The size of the batch of blocks and commits in Bytes. We'll write every batch to the DA and the settlement layer.
BatchSubmitBytes uint64 `mapstructure:"batch_submit_bytes"`
}
@@ -159,8 +159,8 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("batch_submit_bytes must be positive")
}

if c.BatchSkew <= 0 {
return fmt.Errorf("max_batch_skew must be positive")
if c.BatchSkew < c.BatchSubmitTime {
return fmt.Errorf("max_skew_time cannot be less than batch_submit_time. max_skew_time: %s batch_submit_time: %s", c.BatchSkew, c.BatchSubmitTime)
}

return nil
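The validation change replaces the old positivity check with a relational one: the skew budget must be at least the batch submit interval, otherwise production could be paused before the timer-driven submitter ever fires. A sketch of the rejected pairing (the field set shown is illustrative; Validate may check other fields first):

```go
package main

import (
	"fmt"
	"time"

	"github.com/dymensionxyz/dymint/config"
)

func main() {
	cfg := config.BlockManagerConfig{
		BlockTime:        time.Second,
		BatchSubmitTime:  time.Hour,
		BatchSubmitBytes: 500000,
		BatchSkew:        30 * time.Minute, // smaller than BatchSubmitTime
	}
	// Expected: "max_skew_time cannot be less than batch_submit_time. ..."
	fmt.Println(cfg.Validate())
}
```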
4 changes: 2 additions & 2 deletions config/config_test.go
@@ -98,7 +98,7 @@ func TestNodeConfig_Validate(t *testing.T) {
},
wantErr: assert.Error,
}, {
name: "max_batch_skew 0",
name: "max_skew_time 0",
malleate: func(nc *config.NodeConfig) {
nc.BlockManagerConfig.BatchSkew = 0
},
@@ -187,7 +187,7 @@ func fullNodeConfig() config.NodeConfig {
MaxIdleTime: 20 * time.Second,
MaxProofTime: 20 * time.Second,
BatchSubmitTime: 20 * time.Second,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
BatchSubmitBytes: 10000,
},
DAConfig: "da-config",
2 changes: 1 addition & 1 deletion config/defaults.go
@@ -26,7 +26,7 @@ func DefaultConfig(home string) *NodeConfig {
MaxIdleTime: 3600 * time.Second,
MaxProofTime: 100 * time.Second,
BatchSubmitTime: 3600 * time.Second,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
BatchSubmitBytes: 500000,
},
SettlementLayer: "mock",
2 changes: 1 addition & 1 deletion config/toml.go
@@ -70,7 +70,7 @@ block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
max_batch_skew = {{ .BlockManagerConfig.BatchSkew }}
max_skew_time = "{{ .BlockManagerConfig.BatchSkew }}"


# triggers to submit batch to DA and settlement (both required)
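In the rendered TOML the setting changes from a bare integer to a duration string. With the new defaults, the relevant block would render roughly as follows (hypothetical, abbreviated rendering):

```toml
max_idle_time = "1h0m0s"
max_proof_time = "1m40s"
max_skew_time = "168h0m0s"   # was: max_batch_skew = 10

# triggers to submit batch to DA and settlement (both required)
batch_submit_time = "1h0m0s"
batch_submit_bytes = 500000
```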
3 changes: 2 additions & 1 deletion indexers/blockindexer/kv/kv.go
@@ -561,6 +561,8 @@ func (idx *BlockerIndexer) pruneBlocks(from, to uint64, logger log.Logger) (uint
if !ok {
continue
}
pruned++

key, err := heightKey(h)
if err != nil {
logger.Debug("pruning block indexer getting height key", "err", err)
@@ -574,7 +576,6 @@
logger.Debug("pruning block indexer events", "err", err)
continue
}
pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
2 changes: 1 addition & 1 deletion node/node_test.go
@@ -75,7 +75,7 @@ func TestMempoolDirectly(t *testing.T) {
BlockTime: 1 * time.Second,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 100000,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",