From 7860b3858cf5dd0eeb621dda3865905b88343f6b Mon Sep 17 00:00:00 2001 From: bylingo <790653865@qq.com> Date: Wed, 2 Aug 2023 20:22:51 +0800 Subject: [PATCH 1/3] modify SimTx SimGasUsed log print and add AVC-PResMsg --- libs/tendermint/consensus/consensus_commit.go | 10 +++++- .../tendermint/consensus/consensus_propose.go | 35 ++++++++++++++++++- libs/tendermint/consensus/reactor.go | 9 +++-- libs/tendermint/mempool/clist_mempool.go | 23 ++++++++++-- libs/tendermint/state/execution.go | 9 +++++ 5 files changed, 79 insertions(+), 7 deletions(-) diff --git a/libs/tendermint/consensus/consensus_commit.go b/libs/tendermint/consensus/consensus_commit.go index 6f4ca9a7b..6e1778f28 100644 --- a/libs/tendermint/consensus/consensus_commit.go +++ b/libs/tendermint/consensus/consensus_commit.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/hex" "fmt" + "time" + "github.com/okx/okbchain/libs/iavl" iavlcfg "github.com/okx/okbchain/libs/iavl/config" "github.com/okx/okbchain/libs/system/trace" @@ -14,7 +16,6 @@ import ( sm "github.com/okx/okbchain/libs/tendermint/state" "github.com/okx/okbchain/libs/tendermint/types" tmtime "github.com/okx/okbchain/libs/tendermint/types/time" - "time" ) func (cs *State) dumpElapsed(trc *trace.Tracer, schema string) { @@ -256,6 +257,13 @@ func (cs *State) finalizeCommit(height int64) { return } + //Add trace.SimTx After ApplyBlock using isBLockProducer + isBlockProducer := cs.isBlockProducerAVC(height) + err = cs.blockExec.MempoolLogPgu(isBlockProducer) + if err != nil { + cs.Logger.Error("Failed to print PGU log from mempool", "Height", height, "err", err) + } + //reset offset after commitGap if iavl.EnableAsyncCommit && height%iavlcfg.DynamicConfig.GetCommitGapHeight() == iavl.GetFinalCommitGapOffset() { diff --git a/libs/tendermint/consensus/consensus_propose.go b/libs/tendermint/consensus/consensus_propose.go index 4ba90e73b..c545f5541 100644 --- a/libs/tendermint/consensus/consensus_propose.go +++ b/libs/tendermint/consensus/consensus_propose.go @@ -75,9 +75,41 @@ func (cs *State) isBlockProducer() (string, string) { return isBlockProducer, strings.ToLower(bpStr) } +// Used to determine if a node is still a block producer +// when an active view change event is triggered. +func (cs *State) isBlockProducerAVC(height int64) bool { + isStr, _ := cs.isBlockProducer() + if !GetActiveVC() { + if isStr == "y" { + return true + } else { + return false + } + } + + isBlockProducer := false + if cs.vcHeight[height] != "" && cs.Round == 0 { + avcStr := cs.vcHeight[height] + if cs.privValidator != nil && cs.privValidatorPubKey != nil { + address := cs.privValidatorPubKey.Address() + addrStr := address.String() + + if bytes.Equal([]byte(avcStr), []byte(addrStr)) { + isBlockProducer = true + } + } + } else if isStr == "y" { + isBlockProducer = true + } + + return isBlockProducer +} + // Enter (CreateEmptyBlocks): from enterNewRound(height,round) // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): -// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval +// +// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval +// // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool func (cs *State) enterPropose(height int64, round int) { logger := cs.Logger.With("height", height, "round", round) @@ -102,6 +134,7 @@ func (cs *State) enterPropose(height int64, round int) { newProposer = "-avc-" + cs.vcHeight[height][:6] } cs.stateMtx.RUnlock() + cs.trc.Pin("enterPropose-%d-%s-%s%s", round, isBlockProducer, bpAddr, newProposer) logger.Info(fmt.Sprintf("enterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) diff --git a/libs/tendermint/consensus/reactor.go b/libs/tendermint/consensus/reactor.go index 0015c2668..b81e7410a 100644 --- a/libs/tendermint/consensus/reactor.go +++ b/libs/tendermint/consensus/reactor.go @@ -3,12 +3,13 @@ package consensus import ( "bytes" "fmt" - "github.com/okx/okbchain/libs/tendermint/crypto" - "github.com/okx/okbchain/libs/tendermint/libs/automation" "reflect" "sync" "time" + "github.com/okx/okbchain/libs/tendermint/crypto" + "github.com/okx/okbchain/libs/tendermint/libs/automation" + "github.com/pkg/errors" amino "github.com/tendermint/go-amino" @@ -380,6 +381,10 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { conR.conS.peerMsgQueue <- msgInfo{msg, ""} case *ProposeResponseMessage: conR.conS.peerMsgQueue <- msgInfo{msg, ""} + conR.conS.stateMtx.Lock() + defer conR.conS.stateMtx.Unlock() + conR.conS.vcHeight[msg.Height] = conR.conS.privValidatorPubKey.Address().String() + conR.conS.trc.Pin("AVC-PResMsg-%s", conR.conS.privValidatorPubKey.Address().String()[:6]) case *ProposeRequestMessage: conR.conS.stateMtx.Lock() defer conR.conS.stateMtx.Unlock() diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 61371674e..75e846acf 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -1000,9 +1000,9 @@ func (mem *CListMempool) Update( if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses) } - trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d", mem.info.txCount)) - trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d", mem.info.gasUsed)) - mem.info.reset() + //trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d", mem.info.txCount)) + //trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d", mem.info.gasUsed)) + //mem.info.reset() // Set height atomic.StoreInt64(&mem.height, height) @@ -1246,6 +1246,23 @@ func (mem *CListMempool) GetConfig() *cfg.MempoolConfig { return mem.config } +func (mem *CListMempool) LogPgu(isBlockProducer bool) { + // no need to print pguInfo + if mem.config.Sealed { + return + } + + if isBlockProducer { + trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d", mem.info.txCount)) + trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d", mem.info.gasUsed)) + } else { + trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d", 0)) + trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d", 0)) + } + + mem.info.reset() +} + func MultiPriceBump(rawPrice *big.Int, priceBump int64) *big.Int { tmpPrice := new(big.Int).Div(rawPrice, big.NewInt(100)) inc := new(big.Int).Mul(tmpPrice, big.NewInt(priceBump)) diff --git a/libs/tendermint/state/execution.go b/libs/tendermint/state/execution.go index f53b6ff24..e9aabbecb 100644 --- a/libs/tendermint/state/execution.go +++ b/libs/tendermint/state/execution.go @@ -457,6 +457,15 @@ func (blockExec *BlockExecutor) commit( return res, res.RetainHeight, err } +func (blockExec *BlockExecutor) MempoolLogPgu(isBlockProducer bool) error { + clistmempool, ok := blockExec.mempool.(*mempl.CListMempool) + if !ok { + return fmt.Errorf("error on Printing PGU Log SimTx and SimGasUsed in mempool") + } + clistmempool.LogPgu(isBlockProducer) + return nil +} + func transTxsToBytes(txs types.Txs) [][]byte { ret := make([][]byte, 0) for _, v := range txs { From d1f5f27ffc55334941471a252c053f9c1a008f71 Mon Sep 17 00:00:00 2001 From: bylingo <790653865@qq.com> Date: Thu, 3 Aug 2023 15:55:00 +0800 Subject: [PATCH 2/3] add avcp in State --- libs/tendermint/consensus/consensus.go | 8 ++++++-- libs/tendermint/consensus/consensus_commit.go | 1 + .../consensus/consensus_main_routine.go | 9 ++++++--- .../tendermint/consensus/consensus_propose.go | 20 +++++-------------- libs/tendermint/consensus/reactor.go | 4 ---- libs/tendermint/mempool/clist_mempool.go | 3 --- 6 files changed, 18 insertions(+), 27 deletions(-) diff --git a/libs/tendermint/consensus/consensus.go b/libs/tendermint/consensus/consensus.go index 06c1aa08f..adf5a5f74 100644 --- a/libs/tendermint/consensus/consensus.go +++ b/libs/tendermint/consensus/consensus.go @@ -6,6 +6,9 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/spf13/viper" + "github.com/okx/okbchain/libs/system/trace" cfg "github.com/okx/okbchain/libs/tendermint/config" cstypes "github.com/okx/okbchain/libs/tendermint/consensus/types" @@ -16,8 +19,6 @@ import ( "github.com/okx/okbchain/libs/tendermint/p2p" sm "github.com/okx/okbchain/libs/tendermint/state" "github.com/okx/okbchain/libs/tendermint/types" - "github.com/pkg/errors" - "github.com/spf13/viper" ) //----------------------------------------------------------------------------- @@ -162,6 +163,8 @@ type State struct { vcMsg *ViewChangeMessage vcHeight map[int64]string + //AVC proposer + avcp bool preBlockTaskChan chan *preBlockTask taskResultChan chan *preBlockTaskRes @@ -206,6 +209,7 @@ func NewState( bt: &BlockTransport{}, blockTimeTrc: trace.NewTracer(trace.LastBlockTime), vcHeight: make(map[int64]string), + avcp: false, taskResultChan: make(chan *preBlockTaskRes, 1), preBlockTaskChan: make(chan *preBlockTask, 1), } diff --git a/libs/tendermint/consensus/consensus_commit.go b/libs/tendermint/consensus/consensus_commit.go index 6e1778f28..a738c9d01 100644 --- a/libs/tendermint/consensus/consensus_commit.go +++ b/libs/tendermint/consensus/consensus_commit.go @@ -409,6 +409,7 @@ func (cs *State) updateToState(state sm.State) { cs.LastValidators = state.LastValidators cs.TriggeredTimeoutPrecommit = false cs.state = state + cs.avcp = false // Finally, broadcast RoundState cs.newStep() diff --git a/libs/tendermint/consensus/consensus_main_routine.go b/libs/tendermint/consensus/consensus_main_routine.go index 4bc59535c..4958ee2b0 100644 --- a/libs/tendermint/consensus/consensus_main_routine.go +++ b/libs/tendermint/consensus/consensus_main_routine.go @@ -3,14 +3,15 @@ package consensus import ( "bytes" "fmt" + "reflect" + "runtime/debug" + "time" + cfg "github.com/okx/okbchain/libs/tendermint/config" cstypes "github.com/okx/okbchain/libs/tendermint/consensus/types" "github.com/okx/okbchain/libs/tendermint/libs/fail" "github.com/okx/okbchain/libs/tendermint/types" tmtime "github.com/okx/okbchain/libs/tendermint/types/time" - "reflect" - "runtime/debug" - "time" ) //----------------------------------------- @@ -117,6 +118,8 @@ func (cs *State) handleAVCProposal(proposal *types.Proposal) { part := res.blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } + cs.avcp = true + cs.trc.Pin("isAVCProposer") } // state transitions on complete-proposal, 2/3-any, 2/3-one diff --git a/libs/tendermint/consensus/consensus_propose.go b/libs/tendermint/consensus/consensus_propose.go index c545f5541..c94982eaa 100644 --- a/libs/tendermint/consensus/consensus_propose.go +++ b/libs/tendermint/consensus/consensus_propose.go @@ -86,19 +86,11 @@ func (cs *State) isBlockProducerAVC(height int64) bool { return false } } - + // determine producer when avc happen isBlockProducer := false - if cs.vcHeight[height] != "" && cs.Round == 0 { - avcStr := cs.vcHeight[height] - if cs.privValidator != nil && cs.privValidatorPubKey != nil { - address := cs.privValidatorPubKey.Address() - addrStr := address.String() - - if bytes.Equal([]byte(avcStr), []byte(addrStr)) { - isBlockProducer = true - } - } - } else if isStr == "y" { + if isStr == "y" && cs.vcHeight[height] == "" { + isBlockProducer = true + } else if isStr == "n" && cs.avcp { isBlockProducer = true } @@ -107,9 +99,7 @@ func (cs *State) isBlockProducerAVC(height int64) bool { // Enter (CreateEmptyBlocks): from enterNewRound(height,round) // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): -// -// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval -// +// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool func (cs *State) enterPropose(height int64, round int) { logger := cs.Logger.With("height", height, "round", round) diff --git a/libs/tendermint/consensus/reactor.go b/libs/tendermint/consensus/reactor.go index b81e7410a..db2ea8f99 100644 --- a/libs/tendermint/consensus/reactor.go +++ b/libs/tendermint/consensus/reactor.go @@ -381,10 +381,6 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { conR.conS.peerMsgQueue <- msgInfo{msg, ""} case *ProposeResponseMessage: conR.conS.peerMsgQueue <- msgInfo{msg, ""} - conR.conS.stateMtx.Lock() - defer conR.conS.stateMtx.Unlock() - conR.conS.vcHeight[msg.Height] = conR.conS.privValidatorPubKey.Address().String() - conR.conS.trc.Pin("AVC-PResMsg-%s", conR.conS.privValidatorPubKey.Address().String()[:6]) case *ProposeRequestMessage: conR.conS.stateMtx.Lock() defer conR.conS.stateMtx.Unlock() diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 75e846acf..4082c1aae 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -1000,9 +1000,6 @@ func (mem *CListMempool) Update( if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses) } - //trace.GetElapsedInfo().AddInfo(trace.SimTx, fmt.Sprintf("%d", mem.info.txCount)) - //trace.GetElapsedInfo().AddInfo(trace.SimGasUsed, fmt.Sprintf("%d", mem.info.gasUsed)) - //mem.info.reset() // Set height atomic.StoreInt64(&mem.height, height) From 22b298cc17a2dd9c37fa7946ebeacb305d5d12d5 Mon Sep 17 00:00:00 2001 From: bylingo <790653865@qq.com> Date: Thu, 3 Aug 2023 16:18:09 +0800 Subject: [PATCH 3/3] add needLogPgu --- libs/tendermint/consensus/consensus.go | 6 ++--- libs/tendermint/consensus/consensus_commit.go | 7 +++-- .../consensus/consensus_main_routine.go | 2 +- .../tendermint/consensus/consensus_propose.go | 27 +++---------------- 4 files changed, 11 insertions(+), 31 deletions(-) diff --git a/libs/tendermint/consensus/consensus.go b/libs/tendermint/consensus/consensus.go index adf5a5f74..80b780206 100644 --- a/libs/tendermint/consensus/consensus.go +++ b/libs/tendermint/consensus/consensus.go @@ -163,8 +163,8 @@ type State struct { vcMsg *ViewChangeMessage vcHeight map[int64]string - //AVC proposer - avcp bool + //actual proposer when avc + needLogPgu bool preBlockTaskChan chan *preBlockTask taskResultChan chan *preBlockTaskRes @@ -209,7 +209,7 @@ func NewState( bt: &BlockTransport{}, blockTimeTrc: trace.NewTracer(trace.LastBlockTime), vcHeight: make(map[int64]string), - avcp: false, + needLogPgu: false, taskResultChan: make(chan *preBlockTaskRes, 1), preBlockTaskChan: make(chan *preBlockTask, 1), } diff --git a/libs/tendermint/consensus/consensus_commit.go b/libs/tendermint/consensus/consensus_commit.go index a738c9d01..0e49cb695 100644 --- a/libs/tendermint/consensus/consensus_commit.go +++ b/libs/tendermint/consensus/consensus_commit.go @@ -257,9 +257,8 @@ func (cs *State) finalizeCommit(height int64) { return } - //Add trace.SimTx After ApplyBlock using isBLockProducer - isBlockProducer := cs.isBlockProducerAVC(height) - err = cs.blockExec.MempoolLogPgu(isBlockProducer) + //Add trace.SimTx After ApplyBlock using needLogPgu + err = cs.blockExec.MempoolLogPgu(cs.needLogPgu) if err != nil { cs.Logger.Error("Failed to print PGU log from mempool", "Height", height, "err", err) } @@ -409,7 +408,7 @@ func (cs *State) updateToState(state sm.State) { cs.LastValidators = state.LastValidators cs.TriggeredTimeoutPrecommit = false cs.state = state - cs.avcp = false + cs.needLogPgu = false // Finally, broadcast RoundState cs.newStep() diff --git a/libs/tendermint/consensus/consensus_main_routine.go b/libs/tendermint/consensus/consensus_main_routine.go index 4958ee2b0..06003289a 100644 --- a/libs/tendermint/consensus/consensus_main_routine.go +++ b/libs/tendermint/consensus/consensus_main_routine.go @@ -118,7 +118,7 @@ func (cs *State) handleAVCProposal(proposal *types.Proposal) { part := res.blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } - cs.avcp = true + cs.needLogPgu = true cs.trc.Pin("isAVCProposer") } diff --git a/libs/tendermint/consensus/consensus_propose.go b/libs/tendermint/consensus/consensus_propose.go index c94982eaa..1b5c9b327 100644 --- a/libs/tendermint/consensus/consensus_propose.go +++ b/libs/tendermint/consensus/consensus_propose.go @@ -75,31 +75,11 @@ func (cs *State) isBlockProducer() (string, string) { return isBlockProducer, strings.ToLower(bpStr) } -// Used to determine if a node is still a block producer -// when an active view change event is triggered. -func (cs *State) isBlockProducerAVC(height int64) bool { - isStr, _ := cs.isBlockProducer() - if !GetActiveVC() { - if isStr == "y" { - return true - } else { - return false - } - } - // determine producer when avc happen - isBlockProducer := false - if isStr == "y" && cs.vcHeight[height] == "" { - isBlockProducer = true - } else if isStr == "n" && cs.avcp { - isBlockProducer = true - } - - return isBlockProducer -} - // Enter (CreateEmptyBlocks): from enterNewRound(height,round) // Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): -// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval +// +// after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval +// // Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool func (cs *State) enterPropose(height int64, round int) { logger := cs.Logger.With("height", height, "round", round) @@ -202,6 +182,7 @@ func (cs *State) defaultDecideProposal(height int64, round int) { part := blockParts.GetPart(i) cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}) } + cs.needLogPgu = true cs.Logger.Info("Signed proposal", "height", height, "round", round, "proposal", proposal) cs.Logger.Debug(fmt.Sprintf("Signed proposal block: %v", block)) } else if !cs.replayMode {