Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): max skew based on time instead of batches #1140

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,12 @@ func (m *Manager) syncFromSettlement() error {
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
m.State.LastSubmittedBlockTime = res.BlockDescriptors[len(res.BlockDescriptors)-1].Timestamp
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"bytes"
"errors"
"fmt"
"time"

errorsmod "cosmossdk.io/errors"

Expand Down Expand Up @@ -65,6 +66,7 @@
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
s.SetHeight(0)
s.LastBlockTime = time.Now()
Fixed Show fixed Hide fixed
copy(s.AppHash[:], genDoc.AppHash)

err = s.SetRollappParamsFromGenesis(genDoc.AppState)
Expand Down Expand Up @@ -123,7 +125,7 @@
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

s.SetHeight(height)

s.LastBlockTime = time.Now()
Fixed Show fixed Hide fixed
if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
Expand Down
40 changes: 31 additions & 9 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.GetTimeSkew,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -43,8 +44,9 @@
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksTime func() time.Duration,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
Expand All @@ -60,7 +62,8 @@
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
skewTime := unsubmittedBlocksTime()
if maxBatchSkew < skewTime {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -79,14 +82,15 @@
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
Expand All @@ -96,15 +100,19 @@
case <-submitter.C:
}
pending := pendingBytes.Load()
skewTime := unsubmittedBlocksTime()

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime
skewTime := unsubmittedBlocksTime()

lastSubmissionIsRecent := skewTime < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
Expand All @@ -125,7 +133,6 @@
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
Expand Down Expand Up @@ -237,6 +244,8 @@

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSubmittedHeight.Store(batch.EndHeight())
m.State.LastSubmittedBlockTime = time.Now()
Fixed Show fixed Hide fixed

return nil
}

Expand Down Expand Up @@ -273,6 +282,19 @@
return m.State.Height() - m.LastSubmittedHeight.Load()
}

func (m *Manager) GetTimeSkew() time.Duration {
currentBlock, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return time.Duration(0)
}
lastSubmittedBlock, err := m.Store.LoadBlock(m.LastSubmittedHeight.Load())
if err != nil {
return time.Duration(0)
}
return currentBlock.Header.GetTimestamp().Sub(lastSubmittedBlock.Header.GetTimestamp())

}

// UpdateLastSubmittedHeight will update last height submitted height upon events.
// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer.
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
Expand Down
14 changes: 9 additions & 5 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type testArgs struct {
nParallel int // number of instances to run in parallel
testDuration time.Duration // how long to run one instance of the test (should be short)
batchSkew uint64 // max number of batches to get ahead
batchSkew time.Duration // time between last block produced and submitted
batchBytes uint64 // max number of bytes in a batch
maxTime time.Duration // maximum time to wait before submitting submissions
submitTime time.Duration // how long it takes to submit a batch
Expand Down Expand Up @@ -71,9 +71,9 @@ func testSubmitLoopInner(
default:
}
// producer shall not get too far ahead
absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
nProduced := nProducedBytes.Load()
require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
//absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
//nProduced := nProducedBytes.Load()
//require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
}
}()
for {
Expand Down Expand Up @@ -114,7 +114,11 @@ func testSubmitLoopInner(
return pendingBlocks.Load()
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch)
skewTime := func() time.Duration {
return time.Duration(0)
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, skewTime, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand Down
2 changes: 1 addition & 1 deletion block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestSubmissionByTime(t *testing.T) {
managerConfig := config.BlockManagerConfig{
BlockTime: blockTime,
MaxIdleTime: 0,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
BatchSubmitTime: submitTimeout,
BatchSubmitBytes: 1000,
}
Expand Down
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type BlockManagerConfig struct {
// BatchSubmitMaxTime is how long should block manager wait for before submitting batch
BatchSubmitTime time.Duration `mapstructure:"batch_submit_time"`
// BatchSkew is the number of batches waiting to be submitted. Block production will be paused if this limit is reached.
BatchSkew uint64 `mapstructure:"max_batch_skew"`
BatchSkew time.Duration `mapstructure:"max_skew_time"`
// The size of the batch of blocks and commits in Bytes. We'll write every batch to the DA and the settlement layer.
BatchSubmitBytes uint64 `mapstructure:"batch_submit_bytes"`
}
Expand Down Expand Up @@ -159,9 +159,9 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("batch_submit_bytes must be positive")
}

if c.BatchSkew <= 0 {
return fmt.Errorf("max_batch_skew must be positive")
}
/*if c.BatchSkew < c.BatchSubmitTime {
return fmt.Errorf("max_batch_skew cannot be less than batch_submit_time %s", c.BatchSubmitTime)
}*/

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func DefaultConfig(home string) *NodeConfig {
MaxIdleTime: 3600 * time.Second,
MaxProofTime: 100 * time.Second,
BatchSubmitTime: 3600 * time.Second,
BatchSkew: 10,
BatchSkew: 200 * time.Millisecond,
BatchSubmitBytes: 500000,
},
SettlementLayer: "mock",
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
max_batch_skew = {{ .BlockManagerConfig.BatchSkew }}
max_skew_time = {{ .BlockManagerConfig.BatchSkew }}


# triggers to submit batch to DA and settlement (both required)
Expand Down
10 changes: 5 additions & 5 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestGenesisChunked(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestValidatorSetHandling(t *testing.T) {
BlockTime: 10 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
SettlementConfig: settlement.Config{
ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes),
Expand Down Expand Up @@ -873,7 +873,7 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -978,7 +978,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
}, key1, signingKey1, proxy.NewLocalClientCreator(app), genesis, log.TestingLogger(), mempool.NopMetrics())
Expand All @@ -994,7 +994,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
},
P2PConfig: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
Expand Down
2 changes: 1 addition & 1 deletion rpc/json/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func getRPC(t *testing.T) (*tmmocks.MockApplication, *client.Client) {
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 1 * time.Second,
MaxIdleTime: 0,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
BatchSubmitTime: 30 * time.Minute,
BatchSubmitBytes: 1000,
},
Expand Down
1 change: 1 addition & 0 deletions settlement/dymension/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func convertStateInfoToResultRetrieveBatch(stateInfo *rollapptypes.StateInfo) (*
MetaData: &settlement.BatchMetaData{
DA: daMetaData,
},
BlockDescriptors: stateInfo.BDs.BD,
}
return &settlement.ResultRetrieveBatch{
ResultBase: settlement.ResultBase{Code: settlement.StatusSuccess, StateIndex: stateInfo.StateInfoIndex.Index},
Expand Down
10 changes: 6 additions & 4 deletions settlement/settlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package settlement
import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/dymint/types/pb/dymensionxyz/dymension/rollapp"
"github.com/tendermint/tendermint/libs/pubsub"
)

Expand Down Expand Up @@ -33,10 +34,11 @@ type BatchMetaData struct {

type Batch struct {
// sequencer is the bech32-encoded address of the sequencer sent the update
Sequencer string
StartHeight uint64
EndHeight uint64
AppHashes [][32]byte
Sequencer string
StartHeight uint64
EndHeight uint64
AppHashes [][32]byte
BlockDescriptors []rollapp.BlockDescriptor
// MetaData about the batch in the DA layer
MetaData *BatchMetaData
}
Expand Down
2 changes: 1 addition & 1 deletion testutil/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func GetManagerConfig() config.BlockManagerConfig {
BlockTime: 100 * time.Millisecond,
BatchSubmitBytes: 1000000,
BatchSubmitTime: 30 * time.Minute,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
}
}

Expand Down
2 changes: 1 addition & 1 deletion testutil/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func CreateNode(isSequencer bool, blockManagerConfig *config.BlockManagerConfig,
BlockTime: 100 * time.Millisecond,
BatchSubmitTime: 60 * time.Second,
BatchSubmitBytes: 1000,
BatchSkew: 10,
BatchSkew: 24 * time.Hour,
}
}
nodeConfig.BlockManagerConfig = *blockManagerConfig
Expand Down
5 changes: 5 additions & 0 deletions types/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ var RollappPendingSubmissionsSkewBatches = promauto.NewGauge(prometheus.GaugeOpt
Help: "The number of batches which have been accumulated but not yet submitted.",
})

var RollappPendingSubmissionsSkewTimeHours = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_pending_submissions_skew_time",
Help: "Time between the last block produced and the last block submitted in hours.",
})

var RollappPendingSubmissionsSkewBytes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_pending_submissions_skew_bytes",
Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.",
Expand Down
3 changes: 3 additions & 0 deletions types/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (s *State) ToProto() (*pb.State, error) {
ChainId: s.ChainID,
InitialHeight: int64(s.InitialHeight),
LastBlockHeight: int64(s.Height()),
LastBlockTime: s.LastBlockTime,
SequencerSet: *seqsProto,
BaseHeight: s.BaseHeight,
ConsensusParams: s.ConsensusParams,
Expand Down Expand Up @@ -289,6 +290,8 @@ func (s *State) FromProto(other *pb.State) error {
copy(s.LastResultsHash[:], other.LastResultsHash)
copy(s.AppHash[:], other.AppHash)
s.RollappParams = other.RollappParams
s.LastBlockTime = other.LastBlockTime

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"sync/atomic"
"time"

// TODO(tzdybal): copy to local project?

Expand Down Expand Up @@ -42,6 +43,11 @@ type State struct {

// New rollapp parameters .
RollappParams dymint.RollappParams

// Last block time
LastBlockTime time.Time
// Last submitted block time
LastSubmittedBlockTime time.Time
}

func (s *State) IsGenesis() bool {
Expand Down Expand Up @@ -71,6 +77,10 @@ func (s *State) NextHeight() uint64 {
return s.Height() + 1
}

func (s *State) GetSkewTime() time.Duration {
return s.LastBlockTime.Sub(s.LastSubmittedBlockTime)
}

// SetRollappParamsFromGenesis sets the rollapp consensus params from genesis
func (s *State) SetRollappParamsFromGenesis(appState json.RawMessage) error {
var objmap map[string]json.RawMessage
Expand Down
Loading