Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extra logging and algorithm analysis #1667

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README_CHIESA.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## Build

Just run `make all` and the binaries will be in `build\bin`

Either add that directory to your path or copy the binaries to a location that is already on the path.

## Running

Set up two nodes on a private network based on the instructions at [this link](https://docs.goquorum.consensys.net/tutorials/private-network/create-qbft-network)

Run the nodes with the following command lines:

### Node 1
`geth --datadir data --networkid 1337 --nodiscover --verbosity 5 --syncmode full --istanbul.blockperiod 5 --mine --miner.threads 1 --miner.gasprice 0 --emitcheckpoints --http --http.addr 127.0.0.1 --http.port 22000 --http.corsdomain "*" --http.vhosts "*" --ws --ws.addr 127.0.0.1 --ws.port 32000 --ws.origins "*" --http.api admin,eth,debug,miner,net,txpool,personal,web3,istanbul --ws.api admin,eth,debug,miner,net,txpool,personal,web3,istanbul --unlock ${ADDRESS} --allow-insecure-unlock --password ./data/keystore/accountPassword --port 30300 --miner.gaslimit 100000000000000000`

### Node 2
`geth --datadir data --networkid 1337 --nodiscover --verbosity 5 --syncmode full --istanbul.blockperiod 5 --mine --miner.threads 1 --miner.gasprice 0 --emitcheckpoints --http --http.addr 127.0.0.1 --http.port 22001 --http.corsdomain "*" --http.vhosts "*" --ws --ws.addr 127.0.0.1 --ws.port 32001 --ws.origins "*" --http.api admin,eth,debug,miner,net,txpool,personal,web3,istanbul --ws.api admin,eth,debug,miner,net,txpool,personal,web3,istanbul --unlock ${ADDRESS} --allow-insecure-unlock --password ./data/keystore/accountPassword --port 30301 --miner.gaslimit 1000000000000000`

## Explanation of Behavior

When there are a quorum of nodes available that are well-behaved, QBFT appears to behave as follows:

* The validators wait for requests to arrive
* When a request is received, the block commit process starts with a pre-prepare message from the proposer
* The other validators receive the pre-prepare message and broadcast their own prepare messages
* Once a quorum of prepare messages has been received, they validators move to the commit phase
* When a quorum of commit messages has been received, the state transitions from prepared to committed
* Once in the committed state, the new block is added
* The validators go back to waiting for requests

When things are in a bad state, such as when there are no longer enough nodes for a quorum:

* The round change timer expires.
* The round increases and the timeout grows.
* While the number of nodes is under the minimum needed for consensus, no blocks can be validated/committed
* With our simple two-node network, both nodes are needed for a quorum.
* When the number of nodes reaches the threshold required for a quorum, block processing can continue.
* The round number will reset and the round timeouts will fall back to the lowest setting (10 seconds)
3 changes: 3 additions & 0 deletions consensus/istanbul/qbft/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

Expand Down Expand Up @@ -87,6 +88,7 @@ func (c *core) broadcastCommit() {
// - when quorum of COMMIT messages is reached then update state and commits
func (c *core) handleCommitMsg(commit *qbfttypes.Commit) error {
logger := c.currentLogger(true, commit)
logger.Marc(log.LvlTrace, "VALIDATING COMMIT MESSAGE")

logger.Info("QBFT: handle COMMIT message", "commits.count", c.current.QBFTCommits.Size(), "quorum", c.QuorumSize())

Expand All @@ -107,6 +109,7 @@ func (c *core) handleCommitMsg(commit *qbfttypes.Commit) error {
// If we reached thresho
if c.current.QBFTCommits.Size() >= c.QuorumSize() {
logger.Info("QBFT: received quorum of COMMIT messages")
logger.Marc(log.LvlTrace, "RECEIVED QUORUM OF COMMIT MESSAGES, MOVING TO COMMIT QBFT")
c.commitQBFT()
} else {
logger.Debug("QBFT: accepted new COMMIT messages")
Expand Down
20 changes: 20 additions & 0 deletions consensus/istanbul/qbft/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"fmt"
"math"
"math/big"
"sync"
Expand Down Expand Up @@ -107,6 +108,9 @@ func (c *core) IsProposer() bool {
if v == nil {
return false
}
if v.IsProposer(c.backend.Address()) {
c.logger.Marc(log.LvlTrace, "I AM THE PROPOSER NODE")
}
return v.IsProposer(c.backend.Address())
}

Expand Down Expand Up @@ -136,6 +140,7 @@ func (c *core) startNewRound(round *big.Int) {

if c.current == nil {
logger.Debug("QBFT: start at the initial round")
logger.Marc(log.LvlTrace, "STARTING AT THE INITIAL ROUND BECAUSE CURRENT STATE IS NIL")
} else if lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
diff := new(big.Int).Sub(lastProposal.Number(), c.current.Sequence())
sequenceMeter.Mark(new(big.Int).Add(diff, common.Big1).Int64())
Expand All @@ -144,16 +149,20 @@ func (c *core) startNewRound(round *big.Int) {
consensusTimer.UpdateSince(c.consensusTimestamp)
c.consensusTimestamp = time.Time{}
}
logger.Marc(log.LvlTrace, "CATCHING UP TO LAST PROPOSAL")
logger.Debug("QBFT: catch up last block proposal")
} else if lastProposal.Number().Cmp(big.NewInt(c.current.Sequence().Int64()-1)) == 0 {
if round.Cmp(common.Big0) == 0 {
// same seq and round, don't need to start new round
logger.Marc(log.LvlTrace, "THE PROPOSED ROUND IS EQUAL TO THE CURRENT ROUND, NO CHANGE NECESSARY")
logger.Debug("QBFT: same round, no need to start new round")
return
} else if round.Cmp(c.current.Round()) < 0 {
logger.Marc(log.LvlWarn, "THE PROPOSED ROUND IS LESS THAN THE CURRENT ROUND, NO CHANGE NECESSARY")
logger.Warn("QBFT: next round is inferior to current round")
return
}
logger.Marc(log.LvlInfo, "A ROUND CHANGE IS NEEDED")
roundChange = true
} else {
logger.Warn("QBFT: next sequence is before last block proposal")
Expand All @@ -162,8 +171,10 @@ func (c *core) startNewRound(round *big.Int) {

var oldLogger log.Logger
if c.current == nil {
logger.Marc(log.LvlInfo, "JUST STARTED, NO PREVIOUS ROUND DATA TO RECORD")
oldLogger = c.logger.New("old.round", -1, "old.seq", 0)
} else {
logger.Marc(log.LvlInfo, fmt.Sprintf("ROUND: %d, SEQUENCE: %d", c.current.Round().Uint64(), c.current.Sequence().Uint64()))
oldLogger = c.logger.New("old.round", c.current.Round().Uint64(), "old.sequence", c.current.Sequence().Uint64(), "old.state", c.state.String(), "old.proposer", c.valSet.GetProposer())
}

Expand All @@ -187,6 +198,7 @@ func (c *core) startNewRound(round *big.Int) {

// Calculate new proposer
c.valSet.CalcProposer(lastProposer, newView.Round.Uint64())
c.logger.Marc(log.LvlTrace, fmt.Sprintf("PROPOSER CALCULATION RESULT: %s", c.valSet.GetProposer().String()))
c.setState(StateAcceptRequest)

if c.current != nil && round.Cmp(c.current.Round()) > 0 {
Expand All @@ -201,9 +213,11 @@ func (c *core) startNewRound(round *big.Int) {
// Clear earlier round messages
c.roundChangeSet.ClearLowerThan(round)
}
logger.Marc(log.LvlTrace, "STARTING NEW ROUND")
c.roundChangeSet.NewRound(round)

if round.Uint64() > 0 {
logger.Marc(log.LvlTrace, "STARTING ROUND CHANGE TIMER")
c.newRoundChangeTimer()
}

Expand All @@ -223,14 +237,17 @@ func (c *core) setState(state State) {
if c.state != state {
oldState := c.state
c.state = state
c.logger.Marc(log.LvlTrace, fmt.Sprintf("STATE CHANGE: OLD STATE: %s; NEW STATE: %s", oldState.String(), state.String()))
c.currentLogger(false, nil).Info("QBFT: changed state", "old.state", oldState.String(), "new.state", state.String())
}
if state == StateAcceptRequest {
c.logger.Marc(log.LvlInfo, "STATE: PROCESSING PENDING REQUESTS")
c.processPendingRequests()
}

// each time we change state, we process backlog for possible message that are
// now ready
c.logger.Marc(log.LvlInfo, "PROCESSING BACKLOG")
c.processBacklog()
}

Expand Down Expand Up @@ -270,6 +287,7 @@ func (c *core) newRoundChangeTimer() {
timeout = maxRequestTimeout
}

c.logger.Marc(log.LvlTrace, fmt.Sprintf("NEW ROUND CHANGE TIME WITH TIMEOUT %g SECONDS", timeout.Seconds()))
c.currentLogger(true, nil).Trace("QBFT: start new ROUND-CHANGE timer", "timeout", timeout.Seconds())
c.roundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(timeoutEvent{})
Expand All @@ -283,9 +301,11 @@ func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address,
func (c *core) QuorumSize() int {
if c.config.Get2FPlus1Enabled(c.current.sequence) || c.config.Ceil2Nby3Block == nil || (c.current != nil && c.current.sequence.Cmp(c.config.Ceil2Nby3Block) < 0) {
c.currentLogger(true, nil).Trace("QBFT: confirmation Formula used 2F+ 1")
c.logger.Marc(log.LvlInfo, fmt.Sprintf("2F+1 Quorum size: %d", (2*c.valSet.F())+1))
return (2 * c.valSet.F()) + 1
}
c.currentLogger(true, nil).Trace("QBFT: confirmation Formula used ceil(2N/3)")
c.logger.Marc(log.LvlInfo, fmt.Sprintf("ceil(2N/3) Quorum size: %d", int(math.Ceil(float64(2*c.valSet.Size())/3))))
return int(math.Ceil(float64(2*c.valSet.Size()) / 3))
}

Expand Down
13 changes: 13 additions & 0 deletions consensus/istanbul/qbft/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (c *core) handleEvents() {
}()

for {
c.logger.Marc(log.LvlTrace, "MAIN EVENT HANDLING LOOP")
select {
case event, ok := <-c.events.Chan():
if !ok {
Expand All @@ -112,13 +113,15 @@ func (c *core) handleEvents() {
r := &Request{
Proposal: ev.Proposal,
}
c.logger.Marc(log.LvlTrace, "HANDLE REQUEST")
err := c.handleRequest(r)
if err == errFutureMessage {
// store request for later treatment
c.storeRequestMsg(r)
}
case istanbul.MessageEvent:
// we received a message from another validator
c.logger.Marc(log.LvlTrace, "HANDLE MESSAGE FROM VALIDATOR")
if err := c.handleEncodedMsg(ev.Code, ev.Payload); err != nil {
continue
}
Expand All @@ -128,6 +131,7 @@ func (c *core) handleEvents() {
case backlogEvent:
// we process again a future message that was backlogged
// no need to check signature as it was already node when we first received message
c.logger.Marc(log.LvlTrace, "HANDLE DECODED MESSAGE ON BACKLOG")
if err := c.handleDecodedMessage(ev.msg); err != nil {
continue
}
Expand All @@ -152,6 +156,7 @@ func (c *core) handleEvents() {
if !ok {
return
}
c.logger.Marc(log.LvlTrace, "HANDLE COMMITTED")
switch event.Data.(type) {
case istanbul.FinalCommittedEvent:
c.handleFinalCommitted()
Expand Down Expand Up @@ -180,6 +185,8 @@ func (c *core) handleEncodedMsg(code uint64, data []byte) error {
return err
}

c.logger.Marc(log.LvlTrace, "MESSAGE DECODED, HANDLING")

// Verify signatures and set source address
if err = c.verifySignatures(m); err != nil {
return err
Expand All @@ -193,6 +200,7 @@ func (c *core) handleDecodedMessage(m qbfttypes.QBFTMessage) error {
if err := c.checkMessage(m.Code(), &view); err != nil {
// Store in the backlog it it's a future message
if err == errFutureMessage {
c.logger.Marc(log.LvlTrace, "FUTURE MESSAGE, ADDING TO BACKLOG")
c.addToBacklog(m)
}
return err
Expand All @@ -207,12 +215,16 @@ func (c *core) deliverMessage(m qbfttypes.QBFTMessage) error {

switch m.Code() {
case qbfttypes.PreprepareCode:
c.logger.Marc(log.LvlTrace, "MESSAGE WAS PREPREPARE")
err = c.handlePreprepareMsg(m.(*qbfttypes.Preprepare))
case qbfttypes.PrepareCode:
c.logger.Marc(log.LvlTrace, "MESSAGE WAS PREPARE")
err = c.handlePrepare(m.(*qbfttypes.Prepare))
case qbfttypes.CommitCode:
c.logger.Marc(log.LvlTrace, "MESSAGE WAS COMMIT")
err = c.handleCommitMsg(m.(*qbfttypes.Commit))
case qbfttypes.RoundChangeCode:
c.logger.Marc(log.LvlTrace, "MESSAGE WAS ROUND CHANGE")
err = c.handleRoundChange(m.(*qbfttypes.RoundChange))
default:
c.logger.Error("QBFT: invalid message code", "code", m.Code())
Expand All @@ -224,6 +236,7 @@ func (c *core) deliverMessage(m qbfttypes.QBFTMessage) error {

func (c *core) handleTimeoutMsg() {
logger := c.currentLogger(true, nil)
logger.Marc(log.LvlTrace, "HANDLING ROUND CHANGE TIMEOUT MESSAGE")
// Start the new round
round := c.current.Round()
nextRound := new(big.Int).Add(round, common.Big1)
Expand Down
5 changes: 5 additions & 0 deletions consensus/istanbul/qbft/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package core
import (
"github.com/ethereum/go-ethereum/common/hexutil"
qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

Expand Down Expand Up @@ -72,6 +73,7 @@ func (c *core) broadcastPrepare() {
// - when quorum is reached update states to "Prepared" and broadcast COMMIT
func (c *core) handlePrepare(prepare *qbfttypes.Prepare) error {
logger := c.currentLogger(true, prepare).New()
logger.Marc(log.LvlTrace, "VALIDATING PREPARE MESSAGE")

logger.Info("QBFT: handle PREPARE message", "prepares.count", c.current.QBFTPrepares.Size(), "quorum", c.QuorumSize())

Expand All @@ -94,6 +96,8 @@ func (c *core) handlePrepare(prepare *qbfttypes.Prepare) error {
if (c.current.QBFTPrepares.Size() >= c.QuorumSize()) && c.state.Cmp(StatePrepared) < 0 {
logger.Info("QBFT: received quorum of PREPARE messages")

logger.Marc(log.LvlTrace, "QUORUM OF PREPARE MESSAGES RECEIVED")

// Accumulates PREPARE messages
c.current.preparedRound = c.currentView().Round
c.QBFTPreparedPrepares = make([]*qbfttypes.Prepare, 0)
Expand All @@ -109,6 +113,7 @@ func (c *core) handlePrepare(prepare *qbfttypes.Prepare) error {
c.current.preparedBlock = c.current.Proposal()
}

logger.Marc(log.LvlTrace, "SETTING STATE TO PREPARED AND MOVING TO BROADCAST COMMIT")
c.setState(StatePrepared)
c.broadcastCommit()
} else {
Expand Down
8 changes: 8 additions & 0 deletions consensus/istanbul/qbft/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
qbfttypes "github.com/ethereum/go-ethereum/consensus/istanbul/qbft/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

Expand All @@ -38,6 +39,7 @@ func (c *core) sendPreprepareMsg(request *Request) {

// If I'm the proposer and I have the same sequence with the proposal
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
c.logger.Marc(log.LvlTrace, "CREATING AND SIGNING PREPREPARE MESASGE WITH BLOCK PROPOSED AS PROPOSER")
// Creates PRE-PREPARE message
curView := c.currentView()
preprepare := qbfttypes.NewPreprepare(curView.Sequence, curView.Round, request.Proposal)
Expand Down Expand Up @@ -102,6 +104,7 @@ func (c *core) sendPreprepareMsg(request *Request) {
// - validates PRE-PREPARE message block proposal
func (c *core) handlePreprepareMsg(preprepare *qbfttypes.Preprepare) error {
logger := c.currentLogger(true, preprepare)
logger.Marc(log.LvlTrace, "HANDLING PRE-PREPARE MESSAGE")

logger = logger.New("proposal.number", preprepare.Proposal.Number().Uint64(), "proposal.hash", preprepare.Proposal.Hash().String())

Expand All @@ -123,8 +126,10 @@ func (c *core) handlePreprepareMsg(preprepare *qbfttypes.Preprepare) error {

// Validates PRE-PREPARE block proposal we received
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
logger.Marc(log.LvlTrace, "PRE-PREPARE VERIFICATION ERROR")
// if it's a future block, we will handle it again after the duration
if err == consensus.ErrFutureBlock {
logger.Marc(log.LvlTrace, "PROPOSAL IS FOR FUTURE BLOCK, TO BE HANDLED LATER")
logger.Info("QBFT: PRE-PREPARE block proposal is in the future (will be treated again later)", "duration", duration)

// start a timer to re-input PRE-PREPARE message as a backlog event
Expand All @@ -146,8 +151,10 @@ func (c *core) handlePreprepareMsg(preprepare *qbfttypes.Preprepare) error {
// Here is about to accept the PRE-PREPARE
if c.state == StateAcceptRequest {
c.logger.Info("QBFT: accepted PRE-PREPARE message")
logger.Marc(log.LvlTrace, "ACCEPTED PRE-PREPARE MESSAGE")

// Re-initialize ROUND-CHANGE timer
logger.Marc(log.LvlTrace, "STARTING NEW ROUND CHANGE TIMER")
c.newRoundChangeTimer()
c.consensusTimestamp = time.Now()

Expand All @@ -156,6 +163,7 @@ func (c *core) handlePreprepareMsg(preprepare *qbfttypes.Preprepare) error {
c.setState(StatePreprepared)

// Broadcast prepare message to other validators
logger.Marc(log.LvlTrace, "BROADCASTING PREPARE MESSAGE TO VALIDATORS")
c.broadcastPrepare()
}

Expand Down
8 changes: 8 additions & 0 deletions consensus/istanbul/qbft/core/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"fmt"
"time"

"github.com/ethereum/go-ethereum/consensus/istanbul"
Expand Down Expand Up @@ -46,12 +47,15 @@ func (c *core) handleRequest(request *Request) error {

c.current.pendingRequest = request
if c.state == StateAcceptRequest {
logger.Marc(log.LvlTrace, "STATE: ACCEPT REQUEST")
config := c.config.GetConfig(c.current.Sequence())
if config.EmptyBlockPeriod == 0 { // emptyBlockPeriod is not set
// Start ROUND-CHANGE timer
c.logger.Marc(log.LvlTrace, "START ROUND CHANGE TIMER")
c.newRoundChangeTimer()

// Send PRE-PREPARE message to other validators
c.logger.Marc(log.LvlTrace, "SENDING PREPREPARE")
c.sendPreprepareMsg(request)
} else { // emptyBlockPeriod is set
c.newRoundMutex.Lock()
Expand Down Expand Up @@ -79,15 +83,19 @@ func (c *core) handleRequest(request *Request) error {
}
}
if delay > 0 {
logger.Marc(log.LvlTrace, fmt.Sprintf("DELAYING START OF ROUND CHANGE TIMER BY %d millis", delay.Milliseconds()))
c.newRoundTimer = time.AfterFunc(delay, func() {
c.newRoundTimer = nil
// Start ROUND-CHANGE timer
c.newRoundChangeTimer()

logger.Marc(log.LvlTrace, "ROUND CHANGE TIMER STARTED, SENDING PREPREPARE MSG")
// Send PRE-PREPARE message to other validators
c.sendPreprepareMsg(request)
})
} else {
logger.Marc(log.LvlTrace, "STARTING ROUND CHANGE TIMER IMMEDIATELY AND SENDING PREPREPARE MSG")

// Start ROUND-CHANGE timer
c.newRoundChangeTimer()

Expand Down
Loading