Skip to content

Commit

Permalink
Add the dependency of read-before-write to the DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
welkin22 committed Sep 24, 2024
1 parent df85193 commit 9c77c0c
Showing 1 changed file with 170 additions and 32 deletions.
202 changes: 170 additions & 32 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type RWTxList struct {
list []int
}

func NewStateWrites() *RWTxList {
func NewRWTxList() *RWTxList {
return &RWTxList{
list: make([]int, 0),
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *RWTxList) SearchTxIndex(txIndex int) (int, bool) {
return i, i < n && w.list[i] == txIndex
}

func (w *RWTxList) FindLastWrite(txIndex int) int {
func (w *RWTxList) FindLastTx(txIndex int) int {
var i, _ = w.SearchTxIndex(txIndex)
for j := i - 1; j >= 0; j-- {
if w.list[j] < txIndex {
Expand All @@ -296,7 +296,7 @@ func (w *RWTxList) FindLastWrite(txIndex int) int {
return -1
}

func (w *RWTxList) FindPrevWrites(txIndex int) []int {
func (w *RWTxList) FindPrevTxs(txIndex int) []int {
var i, _ = w.SearchTxIndex(txIndex)
for j := i - 1; j >= 0; j-- {
if w.list[j] < txIndex {
Expand Down Expand Up @@ -329,10 +329,9 @@ var (
)

type MVStates struct {
rwSets []RWSet
accWriteSet map[common.Address]map[AccountState]*RWTxList
slotWriteSet map[common.Address]map[common.Hash]*RWTxList
// TODO: maintain read tx list for states here
rwSets []RWSet
accWriteSet map[common.Address]map[AccountState]*RWTxList
slotWriteSet map[common.Address]map[common.Hash]*RWTxList
accReadSet map[common.Address]map[AccountState]*RWTxList
slotReadSet map[common.Address]map[common.Hash]*RWTxList
nextFinaliseIndex int
Expand All @@ -358,6 +357,8 @@ func NewMVStates(txCount int, gasFeeReceivers []common.Address) *MVStates {
s := &MVStates{
accWriteSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
slotWriteSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
accReadSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
slotReadSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
rwEventCh: make(chan []RWEventItem, 100),
gasFeeReceivers: gasFeeReceivers,
}
Expand Down Expand Up @@ -394,6 +395,14 @@ func (s *MVStates) Copy() *MVStates {
ns.accWriteSet[addr][state] = writes.Copy()
}
}
for addr, sub := range s.accReadSet {
for state, reads := range sub {
if _, ok := ns.accReadSet[addr]; !ok {
ns.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
ns.accReadSet[addr][state] = reads.Copy()
}
}
for addr, sub := range s.slotWriteSet {
for slot, writes := range sub {
if _, ok := ns.slotWriteSet[addr]; !ok {
Expand All @@ -402,6 +411,14 @@ func (s *MVStates) Copy() *MVStates {
ns.slotWriteSet[addr][slot] = writes.Copy()
}
}
for addr, sub := range s.slotReadSet {
for slot, reads := range sub {
if _, ok := ns.slotReadSet[addr]; !ok {
ns.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
ns.slotReadSet[addr][slot] = reads.Copy()
}
}
return ns
}

Expand All @@ -421,18 +438,24 @@ func (s *MVStates) asyncRWEventLoop() {

func (s *MVStates) handleRWEvents(items []RWEventItem) {
readFrom, readTo := -1, -1
writeFrom, writeTo := -1, -1
recordNewTx := false
for i, item := range items {
// init next RWSet, and finalise previous RWSet
if item.Event == NewTxRWEvent {
// handle previous rw set
if recordNewTx {
var prevItems []RWEventItem
var prevReadItems []RWEventItem
if readFrom >= 0 && readTo > readFrom {
prevItems = items[readFrom:readTo]
prevReadItems = items[readFrom:readTo]
}
s.finalisePreviousRWSet(prevItems)
var prevWriteItems []RWEventItem
if writeFrom >= 0 && writeTo > writeFrom {
prevWriteItems = items[writeFrom:writeTo]
}
s.finalisePreviousRWSet(prevReadItems, prevWriteItems)
readFrom, readTo = -1, -1
writeFrom, writeTo = -1, -1
}
recordNewTx = true
s.asyncRWSet = RWSet{
Expand All @@ -445,15 +468,29 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) {
}
switch item.Event {
// recorde current read/write event
case ReadAccRWEvent, ReadSlotRWEvent:
// TODO: maintain read list here
case ReadAccRWEvent:
if readFrom < 0 {
readFrom = i
}
readTo = i + 1
s.finaliseAccRead(s.asyncRWSet.index, item.Addr, item.State)
case ReadSlotRWEvent:
if readFrom < 0 {
readFrom = i
}
readTo = i + 1
s.finaliseSlotRead(s.asyncRWSet.index, item.Addr, item.Slot)
case WriteAccRWEvent:
if writeFrom < 0 {
writeFrom = i
}
writeTo = i + 1
s.finaliseAccWrite(s.asyncRWSet.index, item.Addr, item.State)
case WriteSlotRWEvent:
if writeFrom < 0 {
writeFrom = i
}
writeTo = i + 1
s.finaliseSlotWrite(s.asyncRWSet.index, item.Addr, item.Slot)
// recorde current as cannot gas fee delay
case CannotGasFeeDelayRWEvent:
Expand All @@ -462,15 +499,19 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) {
}
// handle last tx rw set
if recordNewTx {
var prevItems []RWEventItem
var prevReadItems []RWEventItem
if readFrom >= 0 && readTo > readFrom {
prevItems = items[readFrom:readTo]
prevReadItems = items[readFrom:readTo]
}
var prevWriteItems []RWEventItem
if writeFrom >= 0 && writeTo > writeFrom {
prevWriteItems = items[writeFrom:writeTo]
}
s.finalisePreviousRWSet(prevItems)
s.finalisePreviousRWSet(prevReadItems, prevWriteItems)
}
}

func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) {
func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem, writes []RWEventItem) {
if s.asyncRWSet.index < 0 {
return
}
Expand All @@ -486,7 +527,7 @@ func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) {
}
// reset nextFinaliseIndex to index+1, it may revert to previous txs
s.nextFinaliseIndex = index + 1
s.resolveDepsMapCacheByWrites(index, reads)
s.resolveDepsMapCacheByWrites(index, reads, writes)
}

func (s *MVStates) RecordNewTx(index int) {
Expand Down Expand Up @@ -669,7 +710,26 @@ func (s *MVStates) FinaliseWithRWSet(rwSet *RWSet) error {
})
}
}
s.resolveDepsMapCacheByWrites(i, reads)
writes := make([]RWEventItem, 0, len(s.rwSets[i].accWriteSet)+len(s.rwSets[i].slotWriteSet))
for addr, sub := range s.rwSets[i].accWriteSet {
for state := range sub {
writes = append(writes, RWEventItem{
Event: WriteAccRWEvent,
Addr: addr,
State: state,
})
}
}
for addr, sub := range s.rwSets[i].slotWriteSet {
for slot := range sub {
writes = append(writes, RWEventItem{
Event: WriteSlotRWEvent,
Addr: addr,
Slot: slot,
})
}
}
s.resolveDepsMapCacheByWrites(i, reads, writes)
}

return nil
Expand Down Expand Up @@ -698,22 +758,44 @@ func (s *MVStates) innerFinalise(index int, applyWriteSet bool) error {
}
for state := range sub {
if _, exist := s.accWriteSet[addr][state]; !exist {
s.accWriteSet[addr][state] = NewStateWrites()
s.accWriteSet[addr][state] = NewRWTxList()
}
s.accWriteSet[addr][state].Append(index)
}
}
for addr, sub := range rwSet.accReadSet {
if _, exist := s.accReadSet[addr]; !exist {
s.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
for state := range sub {
if _, exist := s.accReadSet[addr][state]; !exist {
s.accReadSet[addr][state] = NewRWTxList()
}
s.accReadSet[addr][state].Append(index)
}
}
for addr, sub := range rwSet.slotWriteSet {
if _, exist := s.slotWriteSet[addr]; !exist {
s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList)
}
for slot := range sub {
if _, exist := s.slotWriteSet[addr][slot]; !exist {
s.slotWriteSet[addr][slot] = NewStateWrites()
s.slotWriteSet[addr][slot] = NewRWTxList()
}
s.slotWriteSet[addr][slot].Append(index)
}
}
for addr, sub := range rwSet.slotReadSet {
if _, exist := s.slotReadSet[addr]; !exist {
s.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
for slot := range sub {
if _, exist := s.slotReadSet[addr][slot]; !exist {
s.slotReadSet[addr][slot] = NewRWTxList()
}
s.slotReadSet[addr][slot].Append(index)
}
}
return nil
}

Expand All @@ -723,38 +805,74 @@ func (s *MVStates) finaliseSlotWrite(index int, addr common.Address, slot common
s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList)
}
if _, exist := s.slotWriteSet[addr][slot]; !exist {
s.slotWriteSet[addr][slot] = NewStateWrites()
s.slotWriteSet[addr][slot] = NewRWTxList()
}
s.slotWriteSet[addr][slot].Append(index)
}

func (s *MVStates) finaliseSlotRead(index int, addr common.Address, slot common.Hash) {
// append to pending read set
if _, exist := s.slotReadSet[addr]; !exist {
s.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
if _, exist := s.slotReadSet[addr][slot]; !exist {
s.slotReadSet[addr][slot] = NewRWTxList()
}
s.slotReadSet[addr][slot].Append(index)
}

func (s *MVStates) finaliseAccWrite(index int, addr common.Address, state AccountState) {
// append to pending write set
if _, exist := s.accWriteSet[addr]; !exist {
s.accWriteSet[addr] = make(map[AccountState]*RWTxList)
}
if _, exist := s.accWriteSet[addr][state]; !exist {
s.accWriteSet[addr][state] = NewStateWrites()
s.accWriteSet[addr][state] = NewRWTxList()
}
s.accWriteSet[addr][state].Append(index)
}

func (s *MVStates) finaliseAccRead(index int, addr common.Address, state AccountState) {
// append to pending read set
if _, exist := s.accReadSet[addr]; !exist {
s.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
if _, exist := s.accReadSet[addr][state]; !exist {
s.accReadSet[addr][state] = NewRWTxList()
}
s.accReadSet[addr][state].Append(index)
}

func (s *MVStates) queryAccWrites(addr common.Address, state AccountState) *RWTxList {
if _, exist := s.accWriteSet[addr]; !exist {
return nil
}
return s.accWriteSet[addr][state]
}

func (s *MVStates) queryAccReads(addr common.Address, state AccountState) *RWTxList {
if _, exist := s.accReadSet[addr]; !exist {
return nil
}
return s.accReadSet[addr][state]
}

func (s *MVStates) querySlotWrites(addr common.Address, slot common.Hash) *RWTxList {
if _, exist := s.slotWriteSet[addr]; !exist {
return nil
}
return s.slotWriteSet[addr][slot]
}

func (s *MVStates) querySlotReads(addr common.Address, slot common.Hash) *RWTxList {
if _, exist := s.slotReadSet[addr]; !exist {
return nil
}
return s.slotReadSet[addr][slot]
}

// resolveDepsMapCacheByWrites must be executed in order
func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, writes []RWEventItem) {
for index >= len(s.txDepCache) {
s.txDepCache = append(s.txDepCache, TxDep{})
}
Expand All @@ -769,14 +887,14 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
// check tx dependency, only check key
for _, item := range reads {
// check account states & slots
var writes *RWTxList
var depWrites *RWTxList
if item.Event == ReadAccRWEvent {
writes = s.queryAccWrites(item.Addr, item.State)
depWrites = s.queryAccWrites(item.Addr, item.State)
} else {
writes = s.querySlotWrites(item.Addr, item.Slot)
depWrites = s.querySlotWrites(item.Addr, item.Slot)
}
if writes != nil {
if find := writes.FindLastWrite(index); find >= 0 {
if depWrites != nil {
if find := depWrites.FindLastTx(index); find >= 0 {
if tx := uint64(find); !depSlice.exist(tx) {
depSlice.add(tx)
}
Expand All @@ -788,16 +906,36 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
continue
}
addrMap[item.Addr] = struct{}{}
writes = s.queryAccWrites(item.Addr, AccountSuicide)
if writes != nil {
if find := writes.FindLastWrite(index); find >= 0 {
depWrites = s.queryAccWrites(item.Addr, AccountSuicide)
if depWrites != nil {
if find := depWrites.FindLastTx(index); find >= 0 {
if tx := uint64(find); !depSlice.exist(tx) {
depSlice.add(tx)
}
}
}
}
// TODO: check read before write dependency here
// Looking for read operations before write operations, similar to a read->read->read/write execution sequence,
// we need the write transaction to occur after the read transactions.
for _, item := range writes {
var depReads *RWTxList
if item.Event == WriteAccRWEvent {
depReads = s.queryAccReads(item.Addr, item.State)
} else {
depReads = s.querySlotReads(item.Addr, item.Slot)
}
if depReads != nil {
if finds := depReads.FindPrevTxs(index); len(finds) >= 0 {
for _, tx := range finds {
tx := uint64(tx)
if !depSlice.exist(tx) {
depSlice.add(tx)
}
}
}
}
}

for _, addr := range s.gasFeeReceivers {
if _, ok := addrMap[addr]; ok {
rwSet.cannotGasFeeDelay = true
Expand Down

0 comments on commit 9c77c0c

Please sign in to comment.