Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor): added MsgUpsertSequencer consensus message #1120

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ca7a5a4
temp commit
Sep 19, 2024
e382846
temp commit
Sep 23, 2024
c6639e7
add consensus messages to the block data
Sep 23, 2024
336e3ca
Execute block
Sep 23, 2024
9209ab6
previous proto gen
Sep 23, 2024
d08e81b
add consensus messages stream
Sep 24, 2024
45164f1
add some fixes
Sep 26, 2024
adb695c
goimports
Oct 4, 2024
f24ea2f
remove nil check
Oct 4, 2024
0fa4c8d
goimports
Oct 4, 2024
4b061ec
remove line from proto
Oct 4, 2024
038a2a3
remove proto comment
Oct 4, 2024
130d86c
Merge branch 'main' into feat/consensus-messages
keruch Oct 4, 2024
102fa8e
merge commit
keruch Oct 4, 2024
dc2cafe
feat(executor): added MsgUpsertSequencer consensus message
keruch Oct 4, 2024
9f507a4
feat(executor): MsgUpsertSequecner
keruch Oct 7, 2024
d55070b
tests fix 1
keruch Oct 7, 2024
ef2a708
tests fix 2
keruch Oct 7, 2024
f357f7d
Updated cometbft version.
omritoptix Oct 8, 2024
08965fc
Merge branch 'feat/consensus-messages' into kirill/1248-sequencer-rew…
keruch Oct 9, 2024
0f93728
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 9, 2024
ae84159
linked tasks
keruch Oct 9, 2024
6b340dd
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 14, 2024
ab8d8d2
feat(proto): added hub and rdk x/sequencer
keruch Oct 14, 2024
fae24da
feat(proto): added hub and rdk x/sequencer
keruch Oct 14, 2024
cffbeab
Merge branch 'kirill/1083-proto' into kirill/1248-sequencer-reward-addr
keruch Oct 14, 2024
05b4930
fix build
keruch Oct 15, 2024
0fea268
Merge branch 'main' into kirill/1248-sequencer-reward-addr
keruch Oct 15, 2024
e3fc776
threads
keruch Oct 16, 2024
3a7aebd
threads 1
keruch Oct 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions block/consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package block

import (
"github.com/gogo/protobuf/proto"
)

type ConsensusMessagesStream interface {
GetConsensusMessages() ([]proto.Message, error)
}
80 changes: 65 additions & 15 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"time"

proto2 "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/types"
abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
Expand All @@ -21,11 +23,12 @@ const minBlockMaxBytes = 10000

// Executor creates and applies blocks and maintains state.
type Executor struct {
localAddress []byte
chainID string
proxyAppConsensusConn proxy.AppConnConsensus
proxyAppQueryConn proxy.AppConnQuery
mempool mempool.Mempool
localAddress []byte
chainID string
proxyAppConsensusConn proxy.AppConnConsensus
proxyAppQueryConn proxy.AppConnQuery
mempool mempool.Mempool
consensusMessagesStream ConsensusMessagesStream

eventBus *tmtypes.EventBus

Expand All @@ -34,15 +37,24 @@ type Executor struct {

// NewExecutor creates new instance of BlockExecutor.
// localAddress will be used in sequencer mode only.
func NewExecutor(localAddress []byte, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger types.Logger) (*Executor, error) {
func NewExecutor(
localAddress []byte,
chainID string,
mempool mempool.Mempool,
proxyApp proxy.AppConns,
eventBus *tmtypes.EventBus,
consensusMessagesStream ConsensusMessagesStream,
logger types.Logger,
) (*Executor, error) {
be := Executor{
localAddress: localAddress,
chainID: chainID,
proxyAppConsensusConn: proxyApp.Consensus(),
proxyAppQueryConn: proxyApp.Query(),
mempool: mempool,
eventBus: eventBus,
logger: logger,
localAddress: localAddress,
chainID: chainID,
proxyAppConsensusConn: proxyApp.Consensus(),
proxyAppQueryConn: proxyApp.Query(),
mempool: mempool,
eventBus: eventBus,
consensusMessagesStream: consensusMessagesStream,
logger: logger,
}
return &be, nil
}
Expand Down Expand Up @@ -91,11 +103,27 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Vali
})
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash, nextSeqHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block {
// CreateBlock reaps transactions from mempool and builds a block. Optionally, executes consensus messages that
// gets from the consensus messages stream or from the method args.
func (e *Executor) CreateBlock(
height uint64,
lastCommit *types.Commit,
lastHeaderHash, nextSeqHash [32]byte,
state *types.State,
maxBlockDataSizeBytes uint64,
consensusMsgs ...proto2.Message,
) *types.Block {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)

if e.consensusMessagesStream != nil {
consensusMessages, err := e.consensusMessagesStream.GetConsensusMessages()
if err != nil {
e.logger.Error("Failed to get consensus messages", "error", err)
}
consensusMsgs = append(consensusMsgs, consensusMessages...)
}

block := &types.Block{
Header: types.Header{
Version: types.Version{
Expand All @@ -116,6 +144,7 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
Txs: toDymintTxs(mempoolTxs),
IntermediateStateRoots: types.IntermediateStateRoots{RawRootsList: nil},
Evidence: types.EvidenceData{Evidence: nil},
ConsensusMessages: fromProtoMsgSliceToAnySlice(consensusMsgs...),
},
LastCommit: *lastCommit,
}
Expand Down Expand Up @@ -209,6 +238,7 @@ func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstat
Votes: nil,
},
ByzantineValidators: nil,
ConsensusMessages: block.Data.ConsensusMessages,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -284,3 +314,23 @@ func fromDymintTxs(optiTxs types.Txs) tmtypes.Txs {
}
return txs
}

func fromProtoMsgToAny(msg proto2.Message) *proto.Any {
theType, err := proto2.Marshal(msg)
if err != nil {
return nil
}

return &proto.Any{
TypeUrl: proto2.MessageName(msg),
Value: theType,
}
}

func fromProtoMsgSliceToAnySlice(msgs ...proto2.Message) []*proto.Any {
result := make([]*proto.Any, len(msgs))
for i, msg := range msgs {
result[i] = fromProtoMsgToAny(msg)
}
return result
}
95 changes: 93 additions & 2 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/groupcache/testpb"

"github.com/dymensionxyz/dymint/block"

cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
Expand Down Expand Up @@ -47,7 +51,7 @@ func TestCreateBlock(t *testing.T) {
require.NotNil(abciClient)

mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, nil, logger)
assert.NoError(err)

maxBytes := uint64(100)
Expand Down Expand Up @@ -86,6 +90,93 @@ func TestCreateBlock(t *testing.T) {
assert.Len(block.Data.Txs, 2)
}

func TestCreateBlockWithConsensusMessages(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logger := log.TestingLogger()
app := &tmmocks.MockApplication{}
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
clientCreator := proxy.NewLocalClientCreator(app)
abciClient, err := clientCreator.NewABCIClient()
require.NoError(err)
require.NotNil(clientCreator)
require.NotNil(abciClient)
mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)

name, city := "test1", ""
theMsg1 := &testpb.TestMessage{
Name: &name,
City: &city,
}

name, city = "test2", ""
theMsg2 := &testpb.TestMessage{
Name: &name,
City: &city,
}

// Create a mock ConsensusMessagesStream
mockStream := &MockConsensusMessagesStream{}
mockStream.On("GetConsensusMessages").Return([]proto.Message{
theMsg1,
theMsg2,
}, nil)

executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, mockStream, logger)
assert.NoError(err)

maxBytes := uint64(1000)
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000

block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()[:]), state, maxBytes)

require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.ConsensusMessages, 2)

// Verify the content of ConsensusMessages
theType, err := proto.Marshal(theMsg1)
require.NoError(err)

anyMsg1 := &prototypes.Any{
TypeUrl: proto.MessageName(theMsg1),
Value: theType,
}
require.NoError(err)

theType, err = proto.Marshal(theMsg2)
require.NoError(err)

anyMsg2 := &prototypes.Any{
TypeUrl: proto.MessageName(theMsg2),
Value: theType,
}
require.NoError(err)

assert.True(proto.Equal(anyMsg1, block.Data.ConsensusMessages[0]))
assert.True(proto.Equal(anyMsg2, block.Data.ConsensusMessages[1]))

mockStream.AssertExpectations(t)
}

// MockConsensusMessagesStream is a mock implementation of ConsensusMessagesStream
type MockConsensusMessagesStream struct {
mock.Mock
}

func (m *MockConsensusMessagesStream) GetConsensusMessages() ([]proto.Message, error) {
args := m.Called()
return args.Get(0).([]proto.Message), args.Error(1)
}

func TestApplyBlock(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand Down Expand Up @@ -138,7 +229,7 @@ func TestApplyBlock(t *testing.T) {
appConns := &tmmocksproxy.MockAppConns{}
appConns.On("Consensus").Return(abciClient)
appConns.On("Query").Return(abciClient)
executor, err := block.NewExecutor([]byte("test address"), chainID, mpool, appConns, eventBus, logger)
executor, err := block.NewExecutor([]byte("test address"), chainID, mpool, appConns, eventBus, nil, logger)
assert.NoError(err)

// Subscribe to tx events
Expand Down
10 changes: 9 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,15 @@ func NewManager(
if err != nil {
return nil, err
}
exec, err := NewExecutor(localAddress, genesis.ChainID, mempool, proxyApp, eventBus, logger)
exec, err := NewExecutor(
localAddress,
genesis.ChainID,
mempool,
proxyApp,
eventBus,
nil, // TODO add ConsensusMessagesStream
logger,
)
if err != nil {
return nil, fmt.Errorf("create block executor: %w", err)
}
Expand Down
Loading
Loading