Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added sequencer set polling and consensus msgs queue #1144

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ca7a5a4
temp commit
Sep 19, 2024
e382846
temp commit
Sep 23, 2024
c6639e7
add consensus messages to the block data
Sep 23, 2024
336e3ca
Execute block
Sep 23, 2024
9209ab6
previous proto gen
Sep 23, 2024
d08e81b
add consensus messages stream
Sep 24, 2024
45164f1
add some fixes
Sep 26, 2024
adb695c
goimports
Oct 4, 2024
f24ea2f
remove nil check
Oct 4, 2024
0fa4c8d
goimports
Oct 4, 2024
4b061ec
remove line from proto
Oct 4, 2024
038a2a3
remove proto comment
Oct 4, 2024
130d86c
Merge branch 'main' into feat/consensus-messages
keruch Oct 4, 2024
102fa8e
merge commit
keruch Oct 4, 2024
dc2cafe
feat(executor): added MsgUpsertSequencer consensus message
keruch Oct 4, 2024
9f507a4
feat(executor): MsgUpsertSequecner
keruch Oct 7, 2024
d55070b
tests fix 1
keruch Oct 7, 2024
ef2a708
tests fix 2
keruch Oct 7, 2024
f357f7d
Updated cometbft version.
omritoptix Oct 8, 2024
08965fc
Merge branch 'feat/consensus-messages' into kirill/1248-sequencer-rew…
keruch Oct 9, 2024
0f93728
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 9, 2024
ae84159
linked tasks
keruch Oct 9, 2024
6b340dd
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 14, 2024
ab8d8d2
feat(proto): added hub and rdk x/sequencer
keruch Oct 14, 2024
fae24da
feat(proto): added hub and rdk x/sequencer
keruch Oct 14, 2024
cffbeab
Merge branch 'kirill/1083-proto' into kirill/1248-sequencer-reward-addr
keruch Oct 14, 2024
05b4930
fix build
keruch Oct 15, 2024
0fea268
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 15, 2024
e3fc776
threads
keruch Oct 16, 2024
3a7aebd
threads 1
keruch Oct 16, 2024
c1b5049
feat: added sequencer set polling and consensus msgs queue
keruch Oct 16, 2024
d0d0076
completed polling
keruch Oct 17, 2024
327d493
tests
keruch Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion block/consensus.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,60 @@
package block

import (
"fmt"
"sync"

"github.com/gogo/protobuf/proto"

"github.com/dymensionxyz/dymint/types"
rdktypes "github.com/dymensionxyz/dymint/types/pb/rollapp/sequencers/types"
protoutils "github.com/dymensionxyz/dymint/utils/proto"
"github.com/dymensionxyz/dymint/utils/queue"
)

// ConsensusMessagesStream is the source of consensus messages that the block
// executor includes in produced blocks.
type ConsensusMessagesStream interface {
	// GetConsensusMessages returns the pending consensus messages.
	GetConsensusMessages() ([]proto.Message, error)
	// Add appends one or more consensus messages to the stream.
	Add(...proto.Message)
	// Get returns the currently queued consensus messages.
	// NOTE(review): the queue implementation drains the queue on Get — confirm
	// all implementations are expected to do the same.
	Get() []proto.Message
}

// ConsensusMessagesQueue is a concurrency-safe, in-memory queue of consensus
// messages. Add and Get may be called from different goroutines.
type ConsensusMessagesQueue struct {
	mu    sync.Mutex // guards queue against concurrent Add/Get
	queue *queue.Queue[proto.Message]
}

// NewConsensusMessagesQueue returns an empty, ready-to-use ConsensusMessagesQueue.
func NewConsensusMessagesQueue() *ConsensusMessagesQueue {
	// The zero value of sync.Mutex is an unlocked mutex; no explicit
	// initialization is needed.
	return &ConsensusMessagesQueue{
		queue: queue.New[proto.Message](),
	}
}

// Add enqueues one or more consensus messages. It is safe for concurrent use.
func (q *ConsensusMessagesQueue) Add(msgs ...proto.Message) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.queue.Enqueue(msgs...)
}

// Get drains the queue and returns every message accumulated so far.
// It is safe for concurrent use.
func (q *ConsensusMessagesQueue) Get() []proto.Message {
	q.mu.Lock()
	defer q.mu.Unlock()
	drained := q.queue.DequeueAll()
	return drained
}

// ConsensusMsgsOnSequencerSetUpdate forms a list of consensus messages to handle the sequencer set update.
// One upsert message is produced per sequencer. Returns an error if any
// sequencer's consensus public key cannot be converted.
func ConsensusMsgsOnSequencerSetUpdate(newSequencers []types.Sequencer) ([]proto.Message, error) {
	result := make([]proto.Message, 0, len(newSequencers))
	for _, seq := range newSequencers {
		pubKey, err := seq.AnyConsPubKey()
		if err != nil {
			return nil, fmt.Errorf("sequencer consensus public key: %w", err)
		}
		upsert := &rdktypes.ConsensusMsgUpsertSequencer{
			Operator:   seq.SettlementAddress,
			ConsPubKey: protoutils.CosmosToGogo(pubKey),
			RewardAddr: seq.RewardAddr,
			Relayers:   seq.WhitelistedRelayers,
		}
		result = append(result, upsert)
	}
	return result, nil
}
21 changes: 10 additions & 11 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

proto2 "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/types"

abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
Expand Down Expand Up @@ -104,7 +103,8 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Vali
})
}

// CreateBlock reaps transactions from mempool and builds a block.
// CreateBlock reaps transactions from mempool and builds a block. Optionally, it executes consensus messages
// obtained from the consensus messages stream or from the method args.
func (e *Executor) CreateBlock(
height uint64,
lastCommit *types.Commit,
Expand All @@ -115,14 +115,9 @@ func (e *Executor) CreateBlock(
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)

var consensusAnyMessages []*proto.Any
var consensusMsgs []proto2.Message
if e.consensusMessagesStream != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in what situation is this nil? possibly redundant validation?

consensusMessages, err := e.consensusMessagesStream.GetConsensusMessages()
if err != nil {
e.logger.Error("Failed to get consensus messages", "error", err)
}

consensusAnyMessages = fromProtoMsgSliceToAnySlice(consensusMessages)
consensusMsgs = e.consensusMessagesStream.Get()
}

block := &types.Block{
Expand All @@ -145,7 +140,7 @@ func (e *Executor) CreateBlock(
Txs: toDymintTxs(mempoolTxs),
IntermediateStateRoots: types.IntermediateStateRoots{RawRootsList: nil},
Evidence: types.EvidenceData{Evidence: nil},
ConsensusMessages: consensusAnyMessages,
ConsensusMessages: fromProtoMsgSliceToAnySlice(consensusMsgs...),
},
LastCommit: *lastCommit,
}
Expand Down Expand Up @@ -300,6 +295,10 @@ func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block
return err
}

func (e *Executor) GetConsensusMessagesStream() ConsensusMessagesStream {
return e.consensusMessagesStream
}

func toDymintTxs(txs tmtypes.Txs) types.Txs {
optiTxs := make(types.Txs, len(txs))
for i := range txs {
Expand Down Expand Up @@ -328,7 +327,7 @@ func fromProtoMsgToAny(msg proto2.Message) *proto.Any {
}
}

func fromProtoMsgSliceToAnySlice(msgs []proto2.Message) []*proto.Any {
func fromProtoMsgSliceToAnySlice(msgs ...proto2.Message) []*proto.Any {
result := make([]*proto.Any, len(msgs))
for i, msg := range msgs {
result[i] = fromProtoMsgToAny(msg)
Expand Down
10 changes: 7 additions & 3 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestCreateBlockWithConsensusMessages(t *testing.T) {

// Create a mock ConsensusMessagesStream
mockStream := &MockConsensusMessagesStream{}
mockStream.On("GetConsensusMessages").Return([]proto.Message{
mockStream.On("Get").Return([]proto.Message{
theMsg1,
theMsg2,
}, nil)
Expand Down Expand Up @@ -172,9 +172,13 @@ type MockConsensusMessagesStream struct {
mock.Mock
}

func (m *MockConsensusMessagesStream) GetConsensusMessages() ([]proto.Message, error) {
func (m *MockConsensusMessagesStream) Get() []proto.Message {
args := m.Called()
return args.Get(0).([]proto.Message), args.Error(1)
return args.Get(0).([]proto.Message)
}

func (m *MockConsensusMessagesStream) Add(msgs ...proto.Message) {
m.Called(msgs)
}

func TestApplyBlock(t *testing.T) {
Expand Down
14 changes: 9 additions & 5 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewManager(
mempool,
proxyApp,
eventBus,
nil, // TODO add ConsensusMessagesStream
NewConsensusMessagesQueue(), // TODO properly specify ConsensusMessagesStream: https://github.com/dymensionxyz/dymint/issues/1125
logger,
)
if err != nil {
Expand Down Expand Up @@ -181,9 +181,6 @@ func (m *Manager) Start(ctx context.Context) error {
return m.PruningLoop(ctx)
})

// listen to new bonded sequencers events to add them in the sequencer set
go uevent.MustSubscribe(ctx, m.Pubsub, "newBondedSequencer", settlement.EventQueryNewBondedSequencer, m.UpdateSequencerSet, m.logger)

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
Expand Down Expand Up @@ -229,12 +226,18 @@ func (m *Manager) Start(ctx context.Context) error {
// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

// channel for sequencer set updates
sequencerSetUpdatesC := make(chan []types.Sequencer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can move all this polling logic and channel initialization to a different method to not bloat the Start?
related to my comment — I'm not sure why the sequencer updates signal is in the block production loop


uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerSetUpdates(ctx, sequencerSetUpdatesC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- m.GetUnsubmittedBytes() // load unsubmitted bytes from previous run
return m.ProduceBlockLoop(ctx, bytesProducedC)
return m.ProduceBlockLoop(ctx, bytesProducedC, sequencerSetUpdatesC)
})
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorSequencerRotation(ctx, rotateSequencerC)
Expand Down Expand Up @@ -273,6 +276,7 @@ func (m *Manager) NextHeightToSubmit() uint64 {
}

// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
// The method modifies the state and is not thread-safe.
func (m *Manager) syncFromSettlement() error {
err := m.UpdateSequencerSetFromSL()
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@ import (
"time"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
uevent "github.com/dymensionxyz/dymint/utils/event"

tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
uevent "github.com/dymensionxyz/dymint/utils/event"
)

// ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
// A signal will be sent to the bytesProduced channel for each block produced
// In this way it's possible to pause block production by not consuming the channel
func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) error {
func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int, sequencerSetUpdates <-chan []types.Sequencer) error {
m.logger.Info("Started block producer loop.")

ticker := time.NewTicker(m.Conf.BlockTime)
Expand All @@ -39,6 +37,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
select {
case <-ctx.Done():
return nil

case update := <-sequencerSetUpdates:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about the location of this logic in the produce block loop.
wondering if there is a better place — maybe its own goroutine.

err := m.HandleSequencerSetUpdate(update)
if err != nil {
// occurs on serialization issues and shouldn't happen in practice
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
return err
}

case <-ticker.C:

// if empty blocks are configured to be enabled, and one is scheduled...
Expand Down Expand Up @@ -148,9 +155,11 @@ func (m *Manager) produceBlock(allowEmpty bool, nextProposerHash *[32]byte) (*ty
return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
}

maxBlockDataSize := uint64(float64(m.Conf.BatchSubmitBytes) * types.MaxBlockSizeAdjustment)
proposerHashForBlock := [32]byte(m.State.Sequencers.ProposerHash())
// if nextProposerHash is set, we create a last block
var (
maxBlockDataSize = uint64(float64(m.Conf.BatchSubmitBytes) * types.MaxBlockSizeAdjustment)
proposerHashForBlock = [32]byte(m.State.Sequencers.ProposerHash())
)
// if nextProposerHash is set, we create a last block
if nextProposerHash != nil {
maxBlockDataSize = 0
proposerHashForBlock = *nextProposerHash
Expand Down
Loading
Loading