Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(cometBFTService): simplified CometBFTService #2026

Merged
merged 20 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 81 additions & 109 deletions mod/consensus/pkg/cometbft/service/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ import (
"github.com/sourcegraph/conc/iter"
)

//nolint:gocognit // todo fix.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side-effect of InitChain changes

var (
errInvalidHeight = errors.New("invalid height")
errNilFinalizeBlockState = errors.New("finalizeBlockState is nil")
)

func (s *Service[LoggerT]) InitChain(
_ context.Context,
req *cmtabci.InitChainRequest,
Expand All @@ -56,9 +60,6 @@ func (s *Service[LoggerT]) InitChain(
)
}

// On a new chain, we consider the init chain block height as 0, even though
// req.InitialHeight is 1 by default.
initHeader := cmtproto.Header{ChainID: req.ChainId, Time: req.Time}
Copy link
Collaborator Author

@abi87 abi87 Sep 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initHeader is really used only in defer call, to prepare context for next block. Moved it to defer to simplify logic and reduce its scope. This is different from cosmos SDK BaseApp where the header is used to setup the state

s.logger.Info(
"InitChain",
"initialHeight",
Expand All @@ -67,99 +68,92 @@ func (s *Service[LoggerT]) InitChain(
req.ChainId,
)

// Set the initial height, which will be used to determine if we are
// proposing
// Set the initial height, which will be used to determine if we are proposing
// or processing the first block or not.
s.initialHeight = req.InitialHeight
if s.initialHeight == 0 { // If initial height is 0, set it to 1
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non informative comment, dropped

if s.initialHeight == 0 {
s.initialHeight = 1
}

// if req.InitialHeight is > 1, then we set the initial version on all
// stores
// if req.InitialHeight is > 1, then we set the initial version on all stores
if req.InitialHeight > 1 {
initHeader.Height = req.InitialHeight
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not really needed since Height will be overwritten in defer and initHeader is only read in defer.

if err := s.sm.CommitMultiStore().
SetInitialVersion(req.InitialHeight); err != nil {
return nil, err
}
}

s.setState(execModeFinalize)
s.finalizeBlockState = s.resetState()

defer func() {
// InitChain represents the state of the application BEFORE the first
// block, i.e. the genesis block. This means that when processing the
// app's InitChain handler, the block height is zero by default.
// However, after Commit is called
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Commit is not really called for setting initial state

// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
// However, after genesis is handled the height needs to reflect
// the true block height.
initHeader := cmtproto.Header{
nidhi-singh02 marked this conversation as resolved.
Show resolved Hide resolved
ChainID: req.ChainId,
Time: req.Time,
Height: req.InitialHeight,
}
s.finalizeBlockState.SetContext(
s.finalizeBlockState.Context().WithBlockHeader(initHeader),
)
}()

if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
}
Comment on lines -102 to -104
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant check since s.setState(execModeFinalize) is called right before. We don't do these checks after setState is called for proposals.


res, err := s.initChainer(s.finalizeBlockState.Context(), req)
resValidators, err := s.initChainer(
s.finalizeBlockState.Context(),
req.AppStateBytes,
)
if err != nil {
return nil, err
}

if res == nil {
return nil, errors.New(
"application init chain handler returned a nil response",
)
}

// check validators
if len(req.Validators) > 0 {
Copy link
Collaborator Author

@abi87 abi87 Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: this allows an empty validator set. I saw that cosmos sdk baseApp does the same. But should we? In which case we are fine having no validators defined at genesis? @ocnc @itsdevbear

if len(req.Validators) != len(res.Validators) {
if len(req.Validators) != len(resValidators) {
return nil, fmt.Errorf(
"len(RequestInitChain.Validators) != len(GenesisValidators) (%d != %d)",
len(req.Validators),
len(res.Validators),
len(resValidators),
abi87 marked this conversation as resolved.
Show resolved Hide resolved
)
}

sort.Sort(cmtabci.ValidatorUpdates(req.Validators))

for i := range res.Validators {
if req.Validators[i].Power != res.Validators[i].Power {
for i := range resValidators {
if req.Validators[i].Power != resValidators[i].Power {
return nil, errors.New("mismatched power")
}
if !bytes.Equal(
req.Validators[i].PubKeyBytes, res.Validators[i].
req.Validators[i].PubKeyBytes, resValidators[i].
PubKeyBytes) {
return nil, errors.New("mismatched pubkey bytes")
}

if req.
Validators[i].PubKeyType != res.
Validators[i].PubKeyType {
if req.Validators[i].PubKeyType !=
resValidators[i].PubKeyType {
return nil, errors.New("mismatched pubkey types")
}
}
}

// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts
// from
// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts from
// this FinalizeBlockState.
return &cmtabci.InitChainResponse{
ConsensusParams: res.ConsensusParams,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsensusParams is not set by the response generated in initChainer, so this effectively assigning default value of ConsensusParams.
Replaced with ConsensusParams from request, which I believe is what we need to do

Validators: res.Validators,
ConsensusParams: req.ConsensusParams,
Validators: resValidators,
AppHash: s.sm.CommitMultiStore().LastCommitID().Hash,
}, nil
}

// InitChainer initializes the chain.
func (s *Service[LoggerT]) initChainer(
ctx sdk.Context,
req *cmtabci.InitChainRequest,
) (*cmtabci.InitChainResponse, error) {
appStateBytes []byte,
) ([]cmtabci.ValidatorUpdate, error) {
var genesisState map[string]json.RawMessage
if err := json.Unmarshal(req.AppStateBytes, &genesisState); err != nil {
if err := json.Unmarshal(appStateBytes, &genesisState); err != nil {
return nil, err
}
valUpdates, err := s.Middleware.InitGenesis(
Expand All @@ -170,17 +164,10 @@ func (s *Service[LoggerT]) initChainer(
return nil, err
}

convertedValUpdates, err := iter.MapErr(
return iter.MapErr(
valUpdates,
convertValidatorUpdate[cmtabci.ValidatorUpdate],
)
if err != nil {
return nil, err
}

return &cmtabci.InitChainResponse{
Validators: convertedValUpdates,
}, nil
}

func (s *Service[LoggerT]) Info(
Expand Down Expand Up @@ -212,13 +199,18 @@ func (s *Service[LoggerT]) PrepareProposal(
_ context.Context,
req *cmtabci.PrepareProposalRequest,
) (*cmtabci.PrepareProposalResponse, error) {
s.setState(execModePrepareProposal)

// CometBFT must never call PrepareProposal with a height of 0.
if req.Height < 1 {
return nil, errors.New("PrepareProposal called with invalid height")
return nil, fmt.Errorf(
"prepareProposal at height %v: %w",
req.Height,
errInvalidHeight,
)
}

// Always reset state given that PrepareProposal can timeout
// and be called again in a subsequent round.
s.prepareProposalState = s.resetState()
s.prepareProposalState.SetContext(
s.getContextForProposal(
s.prepareProposalState.Context(),
Expand Down Expand Up @@ -260,20 +252,21 @@ func (s *Service[LoggerT]) ProcessProposal(
) (*cmtabci.ProcessProposalResponse, error) {
// CometBFT must never call ProcessProposal with a height of 0.
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
return nil, fmt.Errorf(
"processProposal at height %v: %w",
req.Height,
errInvalidHeight,
)
}

s.setState(execModeProcessProposal)

// Since the application can get access to FinalizeBlock state and write to
// it, we must be sure to reset it in case ProcessProposal timeouts and is
// called
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the
// finalizeState
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
s.processProposalState = s.resetState()
if req.Height > s.initialHeight {
s.setState(execModeFinalize)
s.finalizeBlockState = s.resetState()
}

s.processProposalState.SetContext(
Expand Down Expand Up @@ -308,27 +301,17 @@ func (s *Service[LoggerT]) ProcessProposal(
}

func (s *Service[LoggerT]) internalFinalizeBlock(
ctx context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
if err := s.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if s.finalizeBlockState == nil {
s.setState(execModeFinalize)
}
if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
}
Comment on lines -321 to -323
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant check since s.setState(execModeFinalize) is called right before. We don't do these checks after setState is called for proposals.


// First check for an abort signal after beginBlock, as it's the first place
// we spend any significant amount of time.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
Comment on lines -325 to -331
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useful in cosmos sdk baseApp but not here since we don't call delivarTx. Same applies below

s.finalizeBlockState = s.resetState()
}

// Iterate over all raw transactions in the proposal and attempt to execute
Expand All @@ -338,14 +321,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock(
// vote extensions, so skip those.
txResults := make([]*cmtabci.ExecTxResult, 0, len(req.Txs))
for range req.Txs {
// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

//nolint:mnd // its okay for now.
txResults = append(txResults, &cmtabci.ExecTxResult{
Codespace: "sdk",
Expand All @@ -372,15 +347,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock(
return nil, err
}

// check after finalizeBlock if we should abort, to avoid propagating the
// result
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

return &cmtabci.FinalizeBlockResponse{
TxResults: txResults,
ValidatorUpdates: valUpdates,
Expand All @@ -392,7 +358,11 @@ func (s *Service[_]) validateFinalizeBlockHeight(
req *cmtabci.FinalizeBlockRequest,
) error {
if req.Height < 1 {
return fmt.Errorf("invalid height: %d", req.Height)
return fmt.Errorf(
"finalizeBlock at height %v: %w",
req.Height,
errInvalidHeight,
)
}

lastBlockHeight := s.LastBlockHeight()
Expand Down Expand Up @@ -428,7 +398,7 @@ func (s *Service[_]) FinalizeBlock(
_ context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
res, err := s.internalFinalizeBlock(context.Background(), req)
res, err := s.internalFinalizeBlock(req)
if res != nil {
res.AppHash = s.workingHash()
}
Expand All @@ -447,7 +417,9 @@ func (s *Service[LoggerT]) Commit(
context.Context, *cmtabci.CommitRequest,
) (*cmtabci.CommitResponse, error) {
if s.finalizeBlockState == nil {
return nil, errors.New("finalizeBlockState is nil")
// This is unexpected since CometBFT should call Commit only
// after FinalizeBlock has been called. Panic appeases nilaway.
panic(fmt.Errorf("commit: %w", errNilFinalizeBlockState))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CometBFT calls ordering guarantees that this cannot happen. CosmosSDK baseApp, which is our model here, does not do this check, but we need to appease nilaway.
Turning this into a panic, as we do with workingHash, since I believe it's a similar situation

}
header := s.finalizeBlockState.Context().BlockHeader()
retainHeight := s.GetBlockRetentionHeight(header.Height)
Expand All @@ -456,16 +428,13 @@ func (s *Service[LoggerT]) Commit(
if ok {
rms.SetCommitHeader(header)
}

s.sm.CommitMultiStore().Commit()

resp := &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}

s.finalizeBlockState = nil

return resp, nil
return &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}, nil
}

// workingHash gets the apphash that will be finalized in commit.
Expand All @@ -478,11 +447,12 @@ func (s *Service[LoggerT]) Commit(
func (s *Service[LoggerT]) workingHash() []byte {
// Write the FinalizeBlock state into branched storage and commit the
// MultiStore. The write to the FinalizeBlock state writes all state
// transitions to the root
// MultiStore (s.sm.CommitMultiStore()) so when Commit() is called it
// persists those values.
// transitions to the root MultiStore (s.sm.CommitMultiStore())
// so when Commit() is called it persists those values.
if s.finalizeBlockState == nil {
panic("workingHash() called before FinalizeBlock()")
// this is unexpected since workingHash is called only after
// internalFinalizeBlock. Panic appeases nilaway.
panic(fmt.Errorf("workingHash: %w", errNilFinalizeBlockState))
abi87 marked this conversation as resolved.
Show resolved Hide resolved
}
s.finalizeBlockState.ms.Write()

Expand All @@ -505,14 +475,16 @@ func (s *Service[LoggerT]) getContextForProposal(
ctx sdk.Context,
height int64,
) sdk.Context {
if height == s.initialHeight {
if s.finalizeBlockState == nil {
return ctx
}
ctx, _ = s.finalizeBlockState.Context().CacheContext()
if height != s.initialHeight {
return ctx
}

if s.finalizeBlockState == nil {
// this is unexpected since cometBFT won't call PrepareProposal
// on initialHeight. Panic appeases nilaway.
panic(fmt.Errorf("getContextForProposal: %w", errNilFinalizeBlockState))
}
ctx, _ = s.finalizeBlockState.Context().CacheContext()
return ctx
}

Expand Down
Loading
Loading