Skip to content

Commit

Permalink
txdag: support multi flags, and supported in pevm; (bnb-chain#22)
Browse files Browse the repository at this point in the history
* txdag: add excluded flag;
mvstates: generate txdag with excluded flag;

* pevm: support txdag with excluded tx;

* blockchain: opt txdag file mode;

* pevm: fix dispatch bugs;

* pevm: opt txdag dispatch;

---------

Co-authored-by: galaio <[email protected]>
  • Loading branch information
2 people authored and sunny2022da committed Oct 11, 2024
1 parent 1b1bd09 commit 11ae91c
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 238 deletions.
5 changes: 2 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ var (
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil)
txDAGDispatchTimer = metrics.NewRegisteredTimer("chain/block/txdag/dispatch", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)
Expand Down Expand Up @@ -2708,9 +2707,9 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string) {

// write handler
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output)
log.Error("OpenFile when open the txDAG output file", "file", output, "err", err)
return
}
bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000)
Expand Down
15 changes: 9 additions & 6 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4723,12 +4723,15 @@ func TestEIP3651(t *testing.T) {

func TestTxDAGFile_ReadWrite(t *testing.T) {
path := filepath.Join(os.TempDir(), "test.csv")
defer func() {
os.Remove(path)
}()
except := map[uint64]types.TxDAG{
0: types.NewEmptyTxDAG(),
1: makeEmptyPlainTxDAG(1),
2: makeEmptyPlainTxDAG(2),
2: makeEmptyPlainTxDAG(2, types.NonDependentRelFlag),
}
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
require.NoError(t, err)
for num, dag := range except {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
Expand All @@ -4737,9 +4740,9 @@ func TestTxDAGFile_ReadWrite(t *testing.T) {

except2 := map[uint64]types.TxDAG{
3: types.NewEmptyTxDAG(),
4: makeEmptyPlainTxDAG(4),
4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag),
}
writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
require.NoError(t, err)
for num, dag := range except2 {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
Expand All @@ -4756,10 +4759,10 @@ func TestTxDAGFile_ReadWrite(t *testing.T) {
}
}

func makeEmptyPlainTxDAG(cnt int) *types.PlainTxDAG {
func makeEmptyPlainTxDAG(cnt int, flags ...uint8) *types.PlainTxDAG {
dag := types.NewPlainTxDAG(cnt)
for i := range dag.TxDeps {
dag.TxDeps[i].TxIndexes = make([]uint64, 0)
dag.TxDeps[i] = types.NewTxDep(make([]uint64, 0), flags...)
}
return dag
}
122 changes: 61 additions & 61 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -49,7 +47,6 @@ type ParallelStateProcessor struct {
inConfirmStage2 bool
targetStage2Count int // when executed txNUM reach it, enter stage2 RT confirm
nextStage2TxIndex int
disableStealTx bool
delayGasFee bool // it is provided by TxDAG
}

Expand Down Expand Up @@ -180,45 +177,6 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
p.nextStage2TxIndex = 0
}

// doStaticDispatchV2 could dispatch by TxDAG metadata
// txReqs must order by TxIndex
// txDAG must convert to dependency relation
// 1. The TxDAG generates parallel execution merge paths that will ignore cross slot tx dep;
// 2. It will dispatch the most hungry slot for every isolate execution path;
// 3. TODO(galaio) it need to schedule the slow dep tx path properly;
// 4. TODO(galaio) it is unfriendly for cross slot deps, maybe we can delay dispatch when tx cross in slots, it may increase PEVM parallelism;
func (p *ParallelStateProcessor) doStaticDispatchV2(txReqs []*ParallelTxRequest, txDAG types.TxDAG) {
p.disableStealTx = false
p.delayGasFee = false
// only support PlainTxDAG dispatch now.
if txDAG == nil || txDAG.Type() != types.PlainTxDAGType {
p.doStaticDispatch(txReqs)
return
}

if metrics.EnabledExpensive {
defer func(start time.Time) {
txDAGDispatchTimer.Update(time.Since(start))
}(time.Now())
}
// resolve isolate execution paths from TxDAG, it indicates the tx dispatch
paths := types.MergeTxDAGExecutionPaths(txDAG)
log.Debug("doStaticDispatchV2 merge parallel execution paths", "slots", len(p.slotState), "paths", len(paths))

for _, path := range paths {
slotIndex := p.mostHungrySlot()
for _, index := range path {
txReqs[index].staticSlotIndex = slotIndex // txReq is better to be executed in this slot
slot := p.slotState[slotIndex]
slot.pendingTxReqList = append(slot.pendingTxReqList, txReqs[index])
}
}

// it's unnecessary to enable slot steal mechanism, opt the steal mechanism later;
p.disableStealTx = true
p.delayGasFee = true
}

// Benefits of StaticDispatch:
//
// ** try best to make Txs with same From() in same slot
Expand Down Expand Up @@ -555,7 +513,7 @@ func (p *ParallelStateProcessor) runSlotLoop(slotIndex int, slotType int32) {
// fmt.Printf("Dav -- runInLoop, - loopbody tail - TxREQ: %d\n", txReq.txIndex)
}
// switched to the other slot.
if interrupted || p.disableStealTx {
if interrupted {
continue
}

Expand Down Expand Up @@ -742,11 +700,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

misc.EnsureCreate2Deployer(p.config, block.Time(), statedb)

txNum := len(block.Transactions())
p.resetState(txNum, statedb)

// Iterate over and process the individual transactions
commonTxs := make([]*types.Transaction, 0, txNum)
allTxs := block.Transactions()
p.resetState(len(allTxs), statedb)

var (
// with parallel mode, vmenv will be created inside of slot
Expand All @@ -758,10 +713,47 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
if beaconRoot := block.BeaconRoot(); beaconRoot != nil {
ProcessBeaconBlockRoot(*beaconRoot, vmenv, statedb)
}

statedb.MarkFullProcessed()

var (
txDAG types.TxDAG
)
if p.bc.enableTxDAG {
// TODO(galaio): load TxDAG from block
// or load cache txDAG from file
if txDAG == nil && len(p.bc.txDAGMapping) > 0 {
txDAG = p.bc.txDAGMapping[block.NumberU64()]
}
if txDAG != nil && txDAG.TxCount() != len(block.Transactions()) {
log.Warn("parallel process cannot apply the TxDAG with wrong txs length",
"block", block.NumberU64(), "txs", len(block.Transactions()), "txdag", txDAG.TxCount())
txDAG = nil
}
// TODO(galaio): check TxDAG validation & excludedTxs in head and continuous
// we only support this format
// convert to normal plain txdag
//parallelIndex := -1
//if txDAG != nil && txDAG.Type() == types.PlainTxDAGType {
// for i := range allTxs {
// if !txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
// if parallelIndex == -1 {
// parallelIndex = i
// }
// continue
// }
// if i > 0 && !txDAG.TxDep(i-1).CheckFlag(types.ExcludedTxFlag) {
// return nil, nil, 0, errors.New("cannot support non-continuous excludedTxs")
// }
// }
//}
}

txNum := len(allTxs)
latestExcludedTx := -1
// Iterate over and process the individual transactions
commonTxs := make([]*types.Transaction, 0, txNum)
// var txReqs []*ParallelTxRequest
for i, tx := range block.Transactions() {
for i, tx := range allTxs {
// can be moved it into slot for efficiency, but signer is not concurrent safe
// Parallel Execution 1.0&2.0 is for full sync mode, Nonce PreCheck is not necessary
// And since we will do out-of-order execution, the Nonce PreCheck could fail.
Expand All @@ -771,6 +763,15 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if txDAG != nil && txDAG.TxDep(i).Count() > 0 {
latestDepTx = int(txDAG.TxDep(i).TxIndexes[txDAG.TxDep(i).Count()-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
Expand All @@ -789,7 +790,13 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))
}
p.allTxReqs = append(p.allTxReqs, txReq)
if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
latestExcludedTx = i
}
}
// set up stage2 enter criteria
p.targetStage2Count = len(p.allTxReqs)
Expand All @@ -799,18 +806,11 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.targetStage2Count = p.targetStage2Count - stage2AheadNum
}

var (
txDAG types.TxDAG
)
if p.bc.enableTxDAG {
// TODO(galaio): load TxDAG from block
// or load cache txDAG from file
if txDAG == nil && len(p.bc.txDAGMapping) > 0 {
txDAG = p.bc.txDAGMapping[block.NumberU64()]
}
p.delayGasFee = false
p.doStaticDispatch(p.allTxReqs)
if txDAG != nil && txDAG.DelayGasFeeDistribution() {
p.delayGasFee = true
}
// From now on, entering parallel execution.
p.doStaticDispatchV2(p.allTxReqs, txDAG) // todo: put txReqs in unit?

// after static dispatch, we notify the slot to work.
for _, slot := range p.slotState {
Expand Down
18 changes: 11 additions & 7 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,7 @@ func (s *StateDB) RecordRead(key types.RWKey, val interface{}) {
if s.isParallel && s.parallel.isSlotDB {
return
}
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return
}
s.rwSet.RecordRead(key, types.StateVersion{
Expand All @@ -2424,7 +2424,7 @@ func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) {
if s.isParallel && s.parallel.isSlotDB {
return
}
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return
}
s.rwSet.RecordWrite(key, val)
Expand All @@ -2442,9 +2442,11 @@ func (s *StateDB) FinaliseRWSet() error {
if s.isParallel && s.parallel.isSlotDB {
return nil
}
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return nil
}
rwSet := s.rwSet
stat := s.stat
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.TxDAGGenerate += time.Since(start)
Expand All @@ -2453,7 +2455,7 @@ func (s *StateDB) FinaliseRWSet() error {
ver := types.StateVersion{
TxIndex: s.txIndex,
}
if ver != s.rwSet.Version() {
if ver != rwSet.Version() {
return errors.New("you finalize a wrong ver of RWSet")
}

Expand All @@ -2480,8 +2482,10 @@ func (s *StateDB) FinaliseRWSet() error {
}
}

s.rwSet.SetRWRecordDone()
return s.mvStates.FulfillRWSet(s.rwSet, s.stat)
// reset stateDB
s.rwSet = nil
s.stat = nil
return s.mvStates.FulfillRWSet(rwSet, stat)
}

func (s *StateDB) getStateObjectsDegetstruct(addr common.Address) (*types.StateAccount, bool) {
Expand Down Expand Up @@ -2554,7 +2558,7 @@ func (s *StateDB) RecordSystemTxRWSet(index int) {
}
s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{
TxIndex: index,
}).WithSerialFlag(), types.NewExeStat(index).WithSerialFlag())
}).WithExcludedTxFlag(), types.NewExeStat(index).WithSerialFlag())
}

// copySet returns a deep-copied set.
Expand Down
4 changes: 4 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// if systemTx or depositTx, tag it
if tx.IsSystemTx() || tx.IsDepositTx() {
statedb.RecordSystemTxRWSet(i)
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if metrics.EnabledExpensive {
Expand Down
6 changes: 4 additions & 2 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ func (st *StateTransition) preCheck() error {
// However if any consensus issue encountered, return the error directly with
// nil evm execution result.
func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
// start record rw set in here
if !st.msg.IsSystemTx && !st.msg.IsDepositTx {
st.state.BeforeTxTransition()
}
if mint := st.msg.Mint; mint != nil {
mintU256, overflow := uint256.FromBig(mint)
if overflow {
Expand Down Expand Up @@ -463,8 +467,6 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
}

func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {
// start record rw set in here
st.state.BeforeTxTransition()
// First check this message satisfies all consensus rules before
// applying the message. The rules include these clauses
//
Expand Down
Loading

0 comments on commit 11ae91c

Please sign in to comment.