diff --git a/mod/consensus/pkg/cometbft/service/abci.go b/mod/consensus/pkg/cometbft/service/abci.go index ac820c23d4..cc8cda64fe 100644 --- a/mod/consensus/pkg/cometbft/service/abci.go +++ b/mod/consensus/pkg/cometbft/service/abci.go @@ -43,7 +43,11 @@ import ( "github.com/sourcegraph/conc/iter" ) -//nolint:gocognit // todo fix. +var ( + errInvalidHeight = errors.New("invalid height") + errNilFinalizeBlockState = errors.New("finalizeBlockState is nil") +) + func (s *Service[LoggerT]) InitChain( _ context.Context, req *cmtabci.InitChainRequest, @@ -56,9 +60,6 @@ func (s *Service[LoggerT]) InitChain( ) } - // On a new chain, we consider the init chain block height as 0, even though - // req.InitialHeight is 1 by default. - initHeader := cmtproto.Header{ChainID: req.ChainId, Time: req.Time} s.logger.Info( "InitChain", "initialHeight", @@ -67,88 +68,81 @@ func (s *Service[LoggerT]) InitChain( req.ChainId, ) - // Set the initial height, which will be used to determine if we are - // proposing + // Set the initial height, which will be used to determine if we are proposing // or processing the first block or not. s.initialHeight = req.InitialHeight - if s.initialHeight == 0 { // If initial height is 0, set it to 1 + if s.initialHeight == 0 { s.initialHeight = 1 } - // if req.InitialHeight is > 1, then we set the initial version on all - // stores + // if req.InitialHeight is > 1, then we set the initial version on all stores if req.InitialHeight > 1 { - initHeader.Height = req.InitialHeight if err := s.sm.CommitMultiStore(). SetInitialVersion(req.InitialHeight); err != nil { return nil, err } } - s.setState(execModeFinalize) + s.finalizeBlockState = s.resetState() defer func() { // InitChain represents the state of the application BEFORE the first // block, i.e. the genesis block. This means that when processing the // app's InitChain handler, the block height is zero by default. - // However, after Commit is called - // the height needs to reflect the true block height. - initHeader.Height = req.InitialHeight + // However, after genesis is handled the height needs to reflect + // the true block height. + initHeader := cmtproto.Header{ + ChainID: req.ChainId, + Time: req.Time, + Height: req.InitialHeight, + } s.finalizeBlockState.SetContext( s.finalizeBlockState.Context().WithBlockHeader(initHeader), ) }() - if s.finalizeBlockState == nil { - return nil, errors.New("finalizeBlockState is nil") - } - - res, err := s.initChainer(s.finalizeBlockState.Context(), req) + resValidators, err := s.initChainer( + s.finalizeBlockState.Context(), + req.AppStateBytes, + ) if err != nil { return nil, err } - if res == nil { - return nil, errors.New( - "application init chain handler returned a nil response", - ) - } - + // check validators if len(req.Validators) > 0 { - if len(req.Validators) != len(res.Validators) { + if len(req.Validators) != len(resValidators) { return nil, fmt.Errorf( "len(RequestInitChain.Validators) != len(GenesisValidators) (%d != %d)", len(req.Validators), - len(res.Validators), + len(resValidators), ) } sort.Sort(cmtabci.ValidatorUpdates(req.Validators)) - for i := range res.Validators { - if req.Validators[i].Power != res.Validators[i].Power { + for i := range resValidators { + if req.Validators[i].Power != resValidators[i].Power { return nil, errors.New("mismatched power") } if !bytes.Equal( - req.Validators[i].PubKeyBytes, res.Validators[i]. + req.Validators[i].PubKeyBytes, resValidators[i]. PubKeyBytes) { return nil, errors.New("mismatched pubkey bytes") } - if req. - Validators[i].PubKeyType != res. - Validators[i].PubKeyType { + if req.Validators[i].PubKeyType != + resValidators[i].PubKeyType { return nil, errors.New("mismatched pubkey types") } } } - // NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts - // from + // NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts from // this FinalizeBlockState. return &cmtabci.InitChainResponse{ - ConsensusParams: res.ConsensusParams, - Validators: res.Validators, + ConsensusParams: req.ConsensusParams, + Validators: resValidators, AppHash: s.sm.CommitMultiStore().LastCommitID().Hash, }, nil } @@ -156,10 +150,10 @@ func (s *Service[LoggerT]) InitChain( // InitChainer initializes the chain. func (s *Service[LoggerT]) initChainer( ctx sdk.Context, - req *cmtabci.InitChainRequest, -) (*cmtabci.InitChainResponse, error) { + appStateBytes []byte, +) ([]cmtabci.ValidatorUpdate, error) { var genesisState map[string]json.RawMessage - if err := json.Unmarshal(req.AppStateBytes, &genesisState); err != nil { + if err := json.Unmarshal(appStateBytes, &genesisState); err != nil { return nil, err } valUpdates, err := s.Middleware.InitGenesis( @@ -170,17 +164,10 @@ func (s *Service[LoggerT]) initChainer( return nil, err } - convertedValUpdates, err := iter.MapErr( + return iter.MapErr( valUpdates, convertValidatorUpdate[cmtabci.ValidatorUpdate], ) - if err != nil { - return nil, err - } - - return &cmtabci.InitChainResponse{ - Validators: convertedValUpdates, - }, nil } func (s *Service[LoggerT]) Info( @@ -212,13 +199,18 @@ func (s *Service[LoggerT]) PrepareProposal( _ context.Context, req *cmtabci.PrepareProposalRequest, ) (*cmtabci.PrepareProposalResponse, error) { - s.setState(execModePrepareProposal) - // CometBFT must never call PrepareProposal with a height of 0. if req.Height < 1 { - return nil, errors.New("PrepareProposal called with invalid height") + return nil, fmt.Errorf( + "prepareProposal at height %v: %w", + req.Height, + errInvalidHeight, + ) } + // Always reset state given that PrepareProposal can timeout + // and be called again in a subsequent round. + s.prepareProposalState = s.resetState() s.prepareProposalState.SetContext( s.getContextForProposal( s.prepareProposalState.Context(), @@ -260,20 +252,21 @@ func (s *Service[LoggerT]) ProcessProposal( ) (*cmtabci.ProcessProposalResponse, error) { // CometBFT must never call ProcessProposal with a height of 0. if req.Height < 1 { - return nil, errors.New("ProcessProposal called with invalid height") + return nil, fmt.Errorf( + "processProposal at height %v: %w", + req.Height, + errInvalidHeight, + ) } - s.setState(execModeProcessProposal) - - // Since the application can get access to FinalizeBlock state and write to - // it, we must be sure to reset it in case ProcessProposal timeouts and is - // called + // Since the application can get access to FinalizeBlock state and write to it, + // we must be sure to reset it in case ProcessProposal timeouts and is called // again in a subsequent round. However, we only want to do this after we've - // processed the first block, as we want to avoid overwriting the - // finalizeState + // processed the first block, as we want to avoid overwriting the finalizeState // after state changes during InitChain. + s.processProposalState = s.resetState() if req.Height > s.initialHeight { - s.setState(execModeFinalize) + s.finalizeBlockState = s.resetState() } s.processProposalState.SetContext( @@ -308,27 +301,17 @@ func (s *Service[LoggerT]) ProcessProposal( } func (s *Service[LoggerT]) internalFinalizeBlock( - ctx context.Context, req *cmtabci.FinalizeBlockRequest, ) (*cmtabci.FinalizeBlockResponse, error) { if err := s.validateFinalizeBlockHeight(req); err != nil { return nil, err } + // finalizeBlockState should be set on InitChain or ProcessProposal. If it is + // nil, it means we are replaying this block and we need to set the state here + // given that during block replay ProcessProposal is not executed by CometBFT. if s.finalizeBlockState == nil { - s.setState(execModeFinalize) - } - if s.finalizeBlockState == nil { - return nil, errors.New("finalizeBlockState is nil") - } - - // First check for an abort signal after beginBlock, as it's the first place - // we spend any significant amount of time. - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue + s.finalizeBlockState = s.resetState() } // Iterate over all raw transactions in the proposal and attempt to execute @@ -338,14 +321,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock( // vote extensions, so skip those. txResults := make([]*cmtabci.ExecTxResult, 0, len(req.Txs)) for range req.Txs { - // check after every tx if we should abort - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue - } - //nolint:mnd // its okay for now. txResults = append(txResults, &cmtabci.ExecTxResult{ Codespace: "sdk", @@ -372,15 +347,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock( return nil, err } - // check after finalizeBlock if we should abort, to avoid propagating the - // result - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue - } - return &cmtabci.FinalizeBlockResponse{ TxResults: txResults, ValidatorUpdates: valUpdates, @@ -392,7 +358,11 @@ func (s *Service[_]) validateFinalizeBlockHeight( req *cmtabci.FinalizeBlockRequest, ) error { if req.Height < 1 { - return fmt.Errorf("invalid height: %d", req.Height) + return fmt.Errorf( + "finalizeBlock at height %v: %w", + req.Height, + errInvalidHeight, + ) } lastBlockHeight := s.LastBlockHeight() @@ -428,7 +398,7 @@ func (s *Service[_]) FinalizeBlock( _ context.Context, req *cmtabci.FinalizeBlockRequest, ) (*cmtabci.FinalizeBlockResponse, error) { - res, err := s.internalFinalizeBlock(context.Background(), req) + res, err := s.internalFinalizeBlock(req) if res != nil { res.AppHash = s.workingHash() } @@ -447,7 +417,9 @@ func (s *Service[LoggerT]) Commit( context.Context, *cmtabci.CommitRequest, ) (*cmtabci.CommitResponse, error) { if s.finalizeBlockState == nil { - return nil, errors.New("finalizeBlockState is nil") + // This is unexpected since CometBFT should call Commit only + // after FinalizeBlock has been called. Panic appeases nilaway. + panic(fmt.Errorf("commit: %w", errNilFinalizeBlockState)) } header := s.finalizeBlockState.Context().BlockHeader() retainHeight := s.GetBlockRetentionHeight(header.Height) @@ -456,16 +428,13 @@ func (s *Service[LoggerT]) Commit( if ok { rms.SetCommitHeader(header) } - s.sm.CommitMultiStore().Commit() - resp := &cmtabci.CommitResponse{ - RetainHeight: retainHeight, - } - s.finalizeBlockState = nil - return resp, nil + return &cmtabci.CommitResponse{ + RetainHeight: retainHeight, + }, nil } // workingHash gets the apphash that will be finalized in commit. @@ -478,11 +447,12 @@ func (s *Service[LoggerT]) Commit( func (s *Service[LoggerT]) workingHash() []byte { // Write the FinalizeBlock state into branched storage and commit the // MultiStore. The write to the FinalizeBlock state writes all state - // transitions to the root - // MultiStore (s.sm.CommitMultiStore()) so when Commit() is called it - // persists those values. + // transitions to the root MultiStore (s.sm.CommitMultiStore()) + // so when Commit() is called it persists those values. if s.finalizeBlockState == nil { - panic("workingHash() called before FinalizeBlock()") + // this is unexpected since workingHash is called only after + // internalFinalizeBlock. Panic appeases nilaway. + panic(fmt.Errorf("workingHash: %w", errNilFinalizeBlockState)) } s.finalizeBlockState.ms.Write() @@ -505,14 +475,16 @@ func (s *Service[LoggerT]) getContextForProposal( ctx sdk.Context, height int64, ) sdk.Context { - if height == s.initialHeight { - if s.finalizeBlockState == nil { - return ctx - } - ctx, _ = s.finalizeBlockState.Context().CacheContext() + if height != s.initialHeight { return ctx } + if s.finalizeBlockState == nil { + // this is unexpected since cometBFT won't call PrepareProposal + // on initialHeight. Panic appeases nilaway. + panic(fmt.Errorf("getContextForProposal: %w", errNilFinalizeBlockState)) + } + ctx, _ = s.finalizeBlockState.Context().CacheContext() return ctx } diff --git a/mod/consensus/pkg/cometbft/service/service.go b/mod/consensus/pkg/cometbft/service/service.go index 9e6357ab5b..f3acf90530 100644 --- a/mod/consensus/pkg/cometbft/service/service.go +++ b/mod/consensus/pkg/cometbft/service/service.go @@ -23,7 +23,6 @@ package cometbft import ( "context" "errors" - "fmt" storetypes "cosmossdk.io/store/types" servercmtlog "github.com/berachain/beacon-kit/mod/consensus/pkg/cometbft/service/log" @@ -43,16 +42,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -type ( - execMode uint8 -) - -const ( - execModePrepareProposal execMode = iota - execModeProcessProposal - execModeFinalize -) - const ( initialAppVersion uint64 = 0 appName string = "beacond" @@ -68,13 +57,27 @@ type Service[ sm *statem.Manager Middleware MiddlewareI + // prepareProposalState is used for PrepareProposal, which is set based on the + // previous block's state. This state is never committed. In case of multiple + // consensus rounds, the state is always reset to the previous block's state. prepareProposalState *state + + // processProposalState is used for ProcessProposal, which is set based on the + // previous block's state. This state is never committed. In case of multiple + // consensus rounds, the state is always reset to the previous block's state. processProposalState *state - finalizeBlockState *state - interBlockCache storetypes.MultiStorePersistentCache - paramStore *params.ConsensusParamsStore - initialHeight int64 - minRetainBlocks uint64 + + // finalizeBlockState is used for FinalizeBlock, which is set based on the + // previous block's state. This state is committed. finalizeBlockState is set + // on InitChain and FinalizeBlock and set to nil on Commit. + finalizeBlockState *state + + interBlockCache storetypes.MultiStorePersistentCache + paramStore *params.ConsensusParamsStore + + // initialHeight is the initial height at which we start the node + initialHeight int64 + minRetainBlocks uint64 chainID string } @@ -160,8 +163,6 @@ func (s *Service[_]) Close() error { _ = s.node.Stop() } - // Close s.db (opened by cosmos-sdk/server/start.go call to openDB) - s.logger.Info("Closing application.db") if err := s.sm.Close(); err != nil { errs = append(errs, err) @@ -213,26 +214,16 @@ func (s *Service[_]) setInterBlockCache( s.interBlockCache = cache } -func (s *Service[LoggerT]) setState(mode execMode) { +// resetState provides a fresh state which can be used to reset +// prepareProposal/processProposal/finalizeBlock State. +// A state is explicitly returned to avoid false positives from +// nilaway tool. +func (s *Service[LoggerT]) resetState() *state { ms := s.sm.CommitMultiStore().CacheMultiStore() - baseState := &state{ + return &state{ ms: ms, ctx: sdk.NewContext(ms, false, servercmtlog.WrapSDKLogger(s.logger)), } - - switch mode { - case execModePrepareProposal: - s.prepareProposalState = baseState - - case execModeProcessProposal: - s.processProposalState = baseState - - case execModeFinalize: - s.finalizeBlockState = baseState - - default: - panic(fmt.Sprintf("invalid runTxMode for setState: %d", mode)) - } } // convertValidatorUpdate abstracts the conversion of a