Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): max skew based on time instead of batches #1140

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstat
Votes: nil,
},
ByzantineValidators: nil,
ConsensusMessages: block.Data.ConsensusMessages,
//ConsensusMessages: block.Data.ConsensusMessages,
srene marked this conversation as resolved.
Show resolved Hide resolved
})
if err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func (m *Manager) syncFromSettlement() error {
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
Expand Down
5 changes: 4 additions & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"bytes"
"errors"
"fmt"
"time"

errorsmod "cosmossdk.io/errors"

Expand Down Expand Up @@ -65,6 +66,7 @@
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
s.SetHeight(0)
s.LastBlockTime = time.Now()
Fixed Show fixed Hide fixed
copy(s.AppHash[:], genDoc.AppHash)

err = s.SetRollappParamsFromGenesis(genDoc.AppState)
Expand Down Expand Up @@ -110,6 +112,7 @@
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))
s.LastSubmittedBlockTime = time.Now()
Fixed Show fixed Hide fixed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand this, why it happens after init chain?

Copy link
Contributor Author

@srene srene Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a way to initialize the LastSubmittedBlockTime to measure the gap between produced and submitted. this way the gap can be still measured when there is no submitted batch yet...

}

func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
Expand All @@ -123,7 +126,7 @@
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

s.SetHeight(height)

s.LastBlockTime = time.Now()
Fixed Show fixed Hide fixed
if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
Expand Down
24 changes: 15 additions & 9 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.State.GetSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -43,8 +44,9 @@
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocksNum func() uint64,
skewTime func() time.Duration,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
Expand All @@ -60,7 +62,7 @@
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
if maxBatchSkew < skewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -79,14 +81,15 @@
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
Expand All @@ -96,15 +99,17 @@
case <-submitter.C:
}
pending := pendingBytes.Load()

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime

lastSubmissionIsRecent := skewTime() < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
Expand All @@ -125,7 +130,6 @@
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
Expand Down Expand Up @@ -237,6 +241,8 @@

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSubmittedHeight.Store(batch.EndHeight())
m.State.LastSubmittedBlockTime = time.Now()
Fixed Show fixed Hide fixed

return nil
}

Expand Down
41 changes: 22 additions & 19 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
type testArgs struct {
nParallel int // number of instances to run in parallel
testDuration time.Duration // how long to run one instance of the test (should be short)
batchSkew uint64 // max number of batches to get ahead
batchSkew time.Duration // time between last block produced and submitted
skewMargin time.Duration // skew margin allowed
batchBytes uint64 // max number of bytes in a batch
maxTime time.Duration // maximum time to wait before submitting submissions
submitTime time.Duration // how long it takes to submit a batch
Expand Down Expand Up @@ -59,9 +60,12 @@ func testSubmitLoopInner(
nProducedBytes := atomic.Uint64{} // tracking how many actual bytes have been produced but not submitted so far
producedBytesC := make(chan int) // producer sends on here, and can be blocked by not consuming from here

// the time of the last block produced or the last batch submitted or the last starting of the node
timeLastProgress := atomic.Int64{}
lastSubmittedBlockTime := time.Now()
lastProducedBlockTime := time.Now()

skewTime := func() time.Duration {
return lastProducedBlockTime.Sub(lastSubmittedBlockTime)
}
go func() { // simulate block production
go func() { // another thread to check system properties
for {
Expand All @@ -71,9 +75,7 @@ func testSubmitLoopInner(
default:
}
// producer shall not get too far ahead
absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
nProduced := nProducedBytes.Load()
require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
require.True(t, skewTime() < args.batchSkew+args.skewMargin, "last produced blocks time not less than maximum skew time", "produced block skew time", skewTime(), "max skew time", args.batchSkew)
}
}()
for {
Expand All @@ -82,39 +84,38 @@ func testSubmitLoopInner(
return
default:
}

time.Sleep(approx(args.produceTime))

if args.batchSkew <= skewTime() {
continue
}
nBytes := rand.Intn(args.produceBytes) // simulate block production
nProducedBytes.Add(uint64(nBytes))
producedBytesC <- nBytes
pendingBlocks.Add(1) // increase pending blocks to be submitted counter
lastProducedBlockTime = time.Now()

timeLastProgress.Store(time.Now().Unix())
}
}()

submitBatch := func(maxSize uint64) (uint64, error) { // mock the batch submission
time.Sleep(approx(args.submitTime))
if rand.Float64() < args.submissionHaltProbability {
time.Sleep(args.submissionHaltTime)
timeLastProgress.Store(time.Now().Unix()) // we have now recovered
}
consumed := rand.Intn(int(maxSize))
nProducedBytes.Add(^uint64(consumed - 1)) // subtract

timeLastProgressT := time.Unix(timeLastProgress.Load(), 0)
absoluteMax := int64(2 * float64(args.maxTime)) // allow some leeway for code execution. Tests may run on small boxes (GH actions)
timeSinceLast := time.Since(timeLastProgressT).Milliseconds()
require.True(t, timeSinceLast < absoluteMax, "too long since last update", "timeSinceLast", timeSinceLast, "max", absoluteMax)

pendingBlocks.Store(0) // no pending blocks to be submitted
timeLastProgress.Store(time.Now().Unix()) // we have submitted batch
lastSubmittedBlockTime = time.Now()

return uint64(consumed), nil
}
accumulatedBlocks := func() uint64 {
return pendingBlocks.Load()
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch)
block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, skewTime, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand All @@ -124,7 +125,8 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
testArgs{
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchSkew: 40 * time.Millisecond,
skewMargin: 5 * time.Millisecond,
batchBytes: 100,
maxTime: 10 * time.Millisecond,
submitTime: 2 * time.Millisecond,
Expand All @@ -133,7 +135,7 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
// a relatively long possibility of the submitter halting
// tests the case where we need to stop the producer getting too far ahead
submissionHaltTime: 50 * time.Millisecond,
submissionHaltProbability: 0.01,
submissionHaltProbability: 0.05,
},
)
}
Expand All @@ -145,7 +147,8 @@ func TestSubmitLoopTimer(t *testing.T) {
testArgs{
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchSkew: 150 * time.Millisecond,
skewMargin: 5 * time.Millisecond,
batchBytes: 100,
maxTime: 10 * time.Millisecond,
submitTime: 2 * time.Millisecond,
Expand Down
4 changes: 3 additions & 1 deletion block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestSubmissionByTime(t *testing.T) {
managerConfig := config.BlockManagerConfig{
BlockTime: blockTime,
MaxIdleTime: 0,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
BatchSubmitTime: submitTimeout,
BatchSubmitBytes: 1000,
}
Expand All @@ -253,6 +253,7 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.State.LastSubmittedBlockTime = time.Now()
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -325,6 +326,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.State.LastSubmittedBlockTime = time.Now()

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type BlockManagerConfig struct {
// BatchSubmitMaxTime is how long should block manager wait for before submitting batch
BatchSubmitTime time.Duration `mapstructure:"batch_submit_time"`
// BatchSkew is the number of batches waiting to be submitted. Block production will be paused if this limit is reached.
BatchSkew uint64 `mapstructure:"max_batch_skew"`
BatchSkew time.Duration `mapstructure:"max_skew_time"`
// The size of the batch of blocks and commits in Bytes. We'll write every batch to the DA and the settlement layer.
BatchSubmitBytes uint64 `mapstructure:"batch_submit_bytes"`
}
Expand Down Expand Up @@ -159,8 +159,8 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("batch_submit_bytes must be positive")
}

if c.BatchSkew <= 0 {
return fmt.Errorf("max_batch_skew must be positive")
if c.BatchSkew < c.BatchSubmitTime {
return fmt.Errorf("max_skew_time cannot be less than batch_submit_time. max_skew_time: %s batch_submit_time: %s", c.BatchSkew, c.BatchSubmitTime)
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestNodeConfig_Validate(t *testing.T) {
},
wantErr: assert.Error,
}, {
name: "max_batch_skew 0",
name: "max_skew_time 0",
malleate: func(nc *config.NodeConfig) {
nc.BlockManagerConfig.BatchSkew = 0
},
Expand Down Expand Up @@ -187,7 +187,7 @@ func fullNodeConfig() config.NodeConfig {
MaxIdleTime: 20 * time.Second,
MaxProofTime: 20 * time.Second,
BatchSubmitTime: 20 * time.Second,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
BatchSubmitBytes: 10000,
},
DAConfig: "da-config",
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func DefaultConfig(home string) *NodeConfig {
MaxIdleTime: 3600 * time.Second,
MaxProofTime: 100 * time.Second,
BatchSubmitTime: 3600 * time.Second,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
BatchSubmitBytes: 500000,
},
SettlementLayer: "mock",
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
max_batch_skew = {{ .BlockManagerConfig.BatchSkew }}
max_skew_time = "{{ .BlockManagerConfig.BatchSkew }}"


# triggers to submit batch to DA and settlement (both required)
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMempoolDirectly(t *testing.T) {
BlockTime: 1 * time.Second,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 100000,
BatchSkew: 10,
BatchSkew: 24 * 7 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",
Expand Down
1 change: 1 addition & 0 deletions proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message State {

SequencerSet sequencerSet = 18 [(gogoproto.nullable) = false];
RollappParams rollapp_params = 19 [(gogoproto.nullable) = false];
google.protobuf.Timestamp last_submitted_block_time = 20 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];

}

Expand Down
10 changes: 5 additions & 5 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestGenesisChunked(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestValidatorSetHandling(t *testing.T) {
BlockTime: 10 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
SettlementConfig: settlement.Config{
ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes),
Expand Down Expand Up @@ -873,7 +873,7 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -978,7 +978,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
}, key1, signingKey1, proxy.NewLocalClientCreator(app), genesis, log.TestingLogger(), mempool.NopMetrics())
Expand All @@ -994,7 +994,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
P2PConfig: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
Expand Down
Loading
Loading