From 9c77c0c375138e25904edd56377516f3af0eb555 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Tue, 24 Sep 2024 16:31:48 +0800 Subject: [PATCH 1/2] Add the dependency of read-before-write to the DAG --- core/types/mvstates.go | 202 ++++++++++++++++++++++++++++++++++------- 1 file changed, 170 insertions(+), 32 deletions(-) diff --git a/core/types/mvstates.go b/core/types/mvstates.go index b93bb415a6..f6d0e0a49a 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -249,7 +249,7 @@ type RWTxList struct { list []int } -func NewStateWrites() *RWTxList { +func NewRWTxList() *RWTxList { return &RWTxList{ list: make([]int, 0), } @@ -285,7 +285,7 @@ func (w *RWTxList) SearchTxIndex(txIndex int) (int, bool) { return i, i < n && w.list[i] == txIndex } -func (w *RWTxList) FindLastWrite(txIndex int) int { +func (w *RWTxList) FindLastTx(txIndex int) int { var i, _ = w.SearchTxIndex(txIndex) for j := i - 1; j >= 0; j-- { if w.list[j] < txIndex { @@ -296,7 +296,7 @@ func (w *RWTxList) FindLastWrite(txIndex int) int { return -1 } -func (w *RWTxList) FindPrevWrites(txIndex int) []int { +func (w *RWTxList) FindPrevTxs(txIndex int) []int { var i, _ = w.SearchTxIndex(txIndex) for j := i - 1; j >= 0; j-- { if w.list[j] < txIndex { @@ -329,10 +329,9 @@ var ( ) type MVStates struct { - rwSets []RWSet - accWriteSet map[common.Address]map[AccountState]*RWTxList - slotWriteSet map[common.Address]map[common.Hash]*RWTxList - // TODO: maintain read tx list for states here + rwSets []RWSet + accWriteSet map[common.Address]map[AccountState]*RWTxList + slotWriteSet map[common.Address]map[common.Hash]*RWTxList accReadSet map[common.Address]map[AccountState]*RWTxList slotReadSet map[common.Address]map[common.Hash]*RWTxList nextFinaliseIndex int @@ -358,6 +357,8 @@ func NewMVStates(txCount int, gasFeeReceivers []common.Address) *MVStates { s := &MVStates{ accWriteSet: make(map[common.Address]map[AccountState]*RWTxList, txCount), slotWriteSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount), + accReadSet: make(map[common.Address]map[AccountState]*RWTxList, txCount), + slotReadSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount), rwEventCh: make(chan []RWEventItem, 100), gasFeeReceivers: gasFeeReceivers, } @@ -394,6 +395,14 @@ func (s *MVStates) Copy() *MVStates { ns.accWriteSet[addr][state] = writes.Copy() } } + for addr, sub := range s.accReadSet { + for state, reads := range sub { + if _, ok := ns.accReadSet[addr]; !ok { + ns.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + ns.accReadSet[addr][state] = reads.Copy() + } + } for addr, sub := range s.slotWriteSet { for slot, writes := range sub { if _, ok := ns.slotWriteSet[addr]; !ok { @@ -402,6 +411,14 @@ func (s *MVStates) Copy() *MVStates { ns.slotWriteSet[addr][slot] = writes.Copy() } } + for addr, sub := range s.slotReadSet { + for slot, reads := range sub { + if _, ok := ns.slotReadSet[addr]; !ok { + ns.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + ns.slotReadSet[addr][slot] = reads.Copy() + } + } return ns } @@ -421,18 +438,24 @@ func (s *MVStates) asyncRWEventLoop() { func (s *MVStates) handleRWEvents(items []RWEventItem) { readFrom, readTo := -1, -1 + writeFrom, writeTo := -1, -1 recordNewTx := false for i, item := range items { // init next RWSet, and finalise previous RWSet if item.Event == NewTxRWEvent { // handle previous rw set if recordNewTx { - var prevItems []RWEventItem + var prevReadItems []RWEventItem if readFrom >= 0 && readTo > readFrom { - prevItems = items[readFrom:readTo] + prevReadItems = items[readFrom:readTo] } - s.finalisePreviousRWSet(prevItems) + var prevWriteItems []RWEventItem + if writeFrom >= 0 && writeTo > writeFrom { + prevWriteItems = items[writeFrom:writeTo] + } + s.finalisePreviousRWSet(prevReadItems, prevWriteItems) readFrom, readTo = -1, -1 + writeFrom, writeTo = -1, -1 } recordNewTx = true s.asyncRWSet = RWSet{ @@ -445,15 +468,29 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) { } switch item.Event { // recorde current read/write event - case ReadAccRWEvent, ReadSlotRWEvent: - // TODO: maintain read list here + case ReadAccRWEvent: + if readFrom < 0 { + readFrom = i + } + readTo = i + 1 + s.finaliseAccRead(s.asyncRWSet.index, item.Addr, item.State) + case ReadSlotRWEvent: if readFrom < 0 { readFrom = i } readTo = i + 1 + s.finaliseSlotRead(s.asyncRWSet.index, item.Addr, item.Slot) case WriteAccRWEvent: + if writeFrom < 0 { + writeFrom = i + } + writeTo = i + 1 s.finaliseAccWrite(s.asyncRWSet.index, item.Addr, item.State) case WriteSlotRWEvent: + if writeFrom < 0 { + writeFrom = i + } + writeTo = i + 1 s.finaliseSlotWrite(s.asyncRWSet.index, item.Addr, item.Slot) // recorde current as cannot gas fee delay case CannotGasFeeDelayRWEvent: @@ -462,15 +499,19 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) { } // handle last tx rw set if recordNewTx { - var prevItems []RWEventItem + var prevReadItems []RWEventItem if readFrom >= 0 && readTo > readFrom { - prevItems = items[readFrom:readTo] + prevReadItems = items[readFrom:readTo] + } + var prevWriteItems []RWEventItem + if writeFrom >= 0 && writeTo > writeFrom { + prevWriteItems = items[writeFrom:writeTo] } - s.finalisePreviousRWSet(prevItems) + s.finalisePreviousRWSet(prevReadItems, prevWriteItems) } } -func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) { +func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem, writes []RWEventItem) { if s.asyncRWSet.index < 0 { return } @@ -486,7 +527,7 @@ func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) { } // reset nextFinaliseIndex to index+1, it may revert to previous txs s.nextFinaliseIndex = index + 1 - s.resolveDepsMapCacheByWrites(index, reads) + s.resolveDepsMapCacheByWrites(index, reads, writes) } func (s *MVStates) RecordNewTx(index int) { @@ -669,7 +710,26 @@ func (s *MVStates) FinaliseWithRWSet(rwSet *RWSet) error { }) } } - s.resolveDepsMapCacheByWrites(i, reads) + writes := make([]RWEventItem, 0, len(s.rwSets[i].accWriteSet)+len(s.rwSets[i].slotWriteSet)) + for addr, sub := range s.rwSets[i].accWriteSet { + for state := range sub { + writes = append(writes, RWEventItem{ + Event: WriteAccRWEvent, + Addr: addr, + State: state, + }) + } + } + for addr, sub := range s.rwSets[i].slotWriteSet { + for slot := range sub { + writes = append(writes, RWEventItem{ + Event: WriteSlotRWEvent, + Addr: addr, + Slot: slot, + }) + } + } + s.resolveDepsMapCacheByWrites(i, reads, writes) } return nil @@ -698,22 +758,44 @@ func (s *MVStates) innerFinalise(index int, applyWriteSet bool) error { } for state := range sub { if _, exist := s.accWriteSet[addr][state]; !exist { - s.accWriteSet[addr][state] = NewStateWrites() + s.accWriteSet[addr][state] = NewRWTxList() } s.accWriteSet[addr][state].Append(index) } } + for addr, sub := range rwSet.accReadSet { + if _, exist := s.accReadSet[addr]; !exist { + s.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + for state := range sub { + if _, exist := s.accReadSet[addr][state]; !exist { + s.accReadSet[addr][state] = NewRWTxList() + } + s.accReadSet[addr][state].Append(index) + } + } for addr, sub := range rwSet.slotWriteSet { if _, exist := s.slotWriteSet[addr]; !exist { s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList) } for slot := range sub { if _, exist := s.slotWriteSet[addr][slot]; !exist { - s.slotWriteSet[addr][slot] = NewStateWrites() + s.slotWriteSet[addr][slot] = NewRWTxList() } s.slotWriteSet[addr][slot].Append(index) } } + for addr, sub := range rwSet.slotReadSet { + if _, exist := s.slotReadSet[addr]; !exist { + s.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + for slot := range sub { + if _, exist := s.slotReadSet[addr][slot]; !exist { + s.slotReadSet[addr][slot] = NewRWTxList() + } + s.slotReadSet[addr][slot].Append(index) + } + } return nil } @@ -723,22 +805,44 @@ func (s *MVStates) finaliseSlotWrite(index int, addr common.Address, slot common s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList) } if _, exist := s.slotWriteSet[addr][slot]; !exist { - s.slotWriteSet[addr][slot] = NewStateWrites() + s.slotWriteSet[addr][slot] = NewRWTxList() } s.slotWriteSet[addr][slot].Append(index) } +func (s *MVStates) finaliseSlotRead(index int, addr common.Address, slot common.Hash) { + // append to pending read set + if _, exist := s.slotReadSet[addr]; !exist { + s.slotReadSet[addr] = make(map[common.Hash]*RWTxList) + } + if _, exist := s.slotReadSet[addr][slot]; !exist { + s.slotReadSet[addr][slot] = NewRWTxList() + } + s.slotReadSet[addr][slot].Append(index) +} + func (s *MVStates) finaliseAccWrite(index int, addr common.Address, state AccountState) { // append to pending write set if _, exist := s.accWriteSet[addr]; !exist { s.accWriteSet[addr] = make(map[AccountState]*RWTxList) } if _, exist := s.accWriteSet[addr][state]; !exist { - s.accWriteSet[addr][state] = NewStateWrites() + s.accWriteSet[addr][state] = NewRWTxList() } s.accWriteSet[addr][state].Append(index) } +func (s *MVStates) finaliseAccRead(index int, addr common.Address, state AccountState) { + // append to pending read set + if _, exist := s.accReadSet[addr]; !exist { + s.accReadSet[addr] = make(map[AccountState]*RWTxList) + } + if _, exist := s.accReadSet[addr][state]; !exist { + s.accReadSet[addr][state] = NewRWTxList() + } + s.accReadSet[addr][state].Append(index) +} + func (s *MVStates) queryAccWrites(addr common.Address, state AccountState) *RWTxList { if _, exist := s.accWriteSet[addr]; !exist { return nil @@ -746,6 +850,13 @@ func (s *MVStates) queryAccWrites(addr common.Address, state AccountState) *RWTx return s.accWriteSet[addr][state] } +func (s *MVStates) queryAccReads(addr common.Address, state AccountState) *RWTxList { + if _, exist := s.accReadSet[addr]; !exist { + return nil + } + return s.accReadSet[addr][state] +} + func (s *MVStates) querySlotWrites(addr common.Address, slot common.Hash) *RWTxList { if _, exist := s.slotWriteSet[addr]; !exist { return nil @@ -753,8 +864,15 @@ func (s *MVStates) querySlotWrites(addr common.Address, slot common.Hash) *RWTxL return s.slotWriteSet[addr][slot] } +func (s *MVStates) querySlotReads(addr common.Address, slot common.Hash) *RWTxList { + if _, exist := s.slotReadSet[addr]; !exist { + return nil + } + return s.slotReadSet[addr][slot] +} + // resolveDepsMapCacheByWrites must be executed in order -func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) { +func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, writes []RWEventItem) { for index >= len(s.txDepCache) { s.txDepCache = append(s.txDepCache, TxDep{}) } @@ -769,14 +887,14 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) { // check tx dependency, only check key for _, item := range reads { // check account states & slots - var writes *RWTxList + var depWrites *RWTxList if item.Event == ReadAccRWEvent { - writes = s.queryAccWrites(item.Addr, item.State) + depWrites = s.queryAccWrites(item.Addr, item.State) } else { - writes = s.querySlotWrites(item.Addr, item.Slot) + depWrites = s.querySlotWrites(item.Addr, item.Slot) } - if writes != nil { - if find := writes.FindLastWrite(index); find >= 0 { + if depWrites != nil { + if find := depWrites.FindLastTx(index); find >= 0 { if tx := uint64(find); !depSlice.exist(tx) { depSlice.add(tx) } @@ -788,16 +906,36 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) { continue } addrMap[item.Addr] = struct{}{} - writes = s.queryAccWrites(item.Addr, AccountSuicide) - if writes != nil { - if find := writes.FindLastWrite(index); find >= 0 { + depWrites = s.queryAccWrites(item.Addr, AccountSuicide) + if depWrites != nil { + if find := depWrites.FindLastTx(index); find >= 0 { if tx := uint64(find); !depSlice.exist(tx) { depSlice.add(tx) } } } } - // TODO: check read before write dependency here + // Looking for read operations before write operations, similar to a read->read->read/write execution sequence, + // we need the write transaction to occur after the read transactions. + for _, item := range writes { + var depReads *RWTxList + if item.Event == WriteAccRWEvent { + depReads = s.queryAccReads(item.Addr, item.State) + } else { + depReads = s.querySlotReads(item.Addr, item.Slot) + } + if depReads != nil { + if finds := depReads.FindPrevTxs(index); len(finds) >= 0 { + for _, tx := range finds { + tx := uint64(tx) + if !depSlice.exist(tx) { + depSlice.add(tx) + } + } + } + } + } + for _, addr := range s.gasFeeReceivers { if _, ok := addrMap[addr]; ok { rwSet.cannotGasFeeDelay = true From 96393738429a6f76b45162dfb9e59db075566f19 Mon Sep 17 00:00:00 2001 From: welkin22 Date: Tue, 24 Sep 2024 16:38:33 +0800 Subject: [PATCH 2/2] comments --- core/types/mvstates.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/types/mvstates.go b/core/types/mvstates.go index f6d0e0a49a..be3d0ad20b 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -915,7 +915,7 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, w } } } - // Looking for read operations before write operations, similar to a read->read->read/write execution sequence, + // Looking for read operations before write operations, e.g: read->read->read/write execution sequence, // we need the write transaction to occur after the read transactions. for _, item := range writes { var depReads *RWTxList