Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txdag: Add the dependency of read-before-write to the DAG #184

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 170 additions & 32 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ type RWTxList struct {
list []int
}

func NewStateWrites() *RWTxList {
func NewRWTxList() *RWTxList {
return &RWTxList{
list: make([]int, 0),
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *RWTxList) SearchTxIndex(txIndex int) (int, bool) {
return i, i < n && w.list[i] == txIndex
}

func (w *RWTxList) FindLastWrite(txIndex int) int {
func (w *RWTxList) FindLastTx(txIndex int) int {
var i, _ = w.SearchTxIndex(txIndex)
for j := i - 1; j >= 0; j-- {
if w.list[j] < txIndex {
Expand All @@ -296,7 +296,7 @@ func (w *RWTxList) FindLastWrite(txIndex int) int {
return -1
}

func (w *RWTxList) FindPrevWrites(txIndex int) []int {
func (w *RWTxList) FindPrevTxs(txIndex int) []int {
var i, _ = w.SearchTxIndex(txIndex)
for j := i - 1; j >= 0; j-- {
if w.list[j] < txIndex {
Expand Down Expand Up @@ -329,10 +329,9 @@ var (
)

type MVStates struct {
rwSets []RWSet
accWriteSet map[common.Address]map[AccountState]*RWTxList
slotWriteSet map[common.Address]map[common.Hash]*RWTxList
// TODO: maintain read tx list for states here
rwSets []RWSet
accWriteSet map[common.Address]map[AccountState]*RWTxList
slotWriteSet map[common.Address]map[common.Hash]*RWTxList
accReadSet map[common.Address]map[AccountState]*RWTxList
slotReadSet map[common.Address]map[common.Hash]*RWTxList
nextFinaliseIndex int
Expand All @@ -358,6 +357,8 @@ func NewMVStates(txCount int, gasFeeReceivers []common.Address) *MVStates {
s := &MVStates{
accWriteSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
slotWriteSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
accReadSet: make(map[common.Address]map[AccountState]*RWTxList, txCount),
slotReadSet: make(map[common.Address]map[common.Hash]*RWTxList, txCount),
rwEventCh: make(chan []RWEventItem, 100),
gasFeeReceivers: gasFeeReceivers,
}
Expand Down Expand Up @@ -394,6 +395,14 @@ func (s *MVStates) Copy() *MVStates {
ns.accWriteSet[addr][state] = writes.Copy()
}
}
for addr, sub := range s.accReadSet {
for state, reads := range sub {
if _, ok := ns.accReadSet[addr]; !ok {
ns.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
ns.accReadSet[addr][state] = reads.Copy()
}
}
for addr, sub := range s.slotWriteSet {
for slot, writes := range sub {
if _, ok := ns.slotWriteSet[addr]; !ok {
Expand All @@ -402,6 +411,14 @@ func (s *MVStates) Copy() *MVStates {
ns.slotWriteSet[addr][slot] = writes.Copy()
}
}
for addr, sub := range s.slotReadSet {
for slot, reads := range sub {
if _, ok := ns.slotReadSet[addr]; !ok {
ns.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
ns.slotReadSet[addr][slot] = reads.Copy()
}
}
return ns
}

Expand All @@ -421,18 +438,24 @@ func (s *MVStates) asyncRWEventLoop() {

func (s *MVStates) handleRWEvents(items []RWEventItem) {
readFrom, readTo := -1, -1
writeFrom, writeTo := -1, -1
recordNewTx := false
for i, item := range items {
// init next RWSet, and finalise previous RWSet
if item.Event == NewTxRWEvent {
// handle previous rw set
if recordNewTx {
var prevItems []RWEventItem
var prevReadItems []RWEventItem
if readFrom >= 0 && readTo > readFrom {
prevItems = items[readFrom:readTo]
prevReadItems = items[readFrom:readTo]
}
s.finalisePreviousRWSet(prevItems)
var prevWriteItems []RWEventItem
if writeFrom >= 0 && writeTo > writeFrom {
prevWriteItems = items[writeFrom:writeTo]
}
s.finalisePreviousRWSet(prevReadItems, prevWriteItems)
readFrom, readTo = -1, -1
writeFrom, writeTo = -1, -1
}
recordNewTx = true
s.asyncRWSet = RWSet{
Expand All @@ -445,15 +468,29 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) {
}
switch item.Event {
// recorde current read/write event
case ReadAccRWEvent, ReadSlotRWEvent:
// TODO: maintain read list here
case ReadAccRWEvent:
if readFrom < 0 {
readFrom = i
}
readTo = i + 1
s.finaliseAccRead(s.asyncRWSet.index, item.Addr, item.State)
case ReadSlotRWEvent:
if readFrom < 0 {
readFrom = i
}
readTo = i + 1
s.finaliseSlotRead(s.asyncRWSet.index, item.Addr, item.Slot)
case WriteAccRWEvent:
if writeFrom < 0 {
writeFrom = i
}
writeTo = i + 1
s.finaliseAccWrite(s.asyncRWSet.index, item.Addr, item.State)
case WriteSlotRWEvent:
if writeFrom < 0 {
writeFrom = i
}
writeTo = i + 1
s.finaliseSlotWrite(s.asyncRWSet.index, item.Addr, item.Slot)
// recorde current as cannot gas fee delay
case CannotGasFeeDelayRWEvent:
Expand All @@ -462,15 +499,19 @@ func (s *MVStates) handleRWEvents(items []RWEventItem) {
}
// handle last tx rw set
if recordNewTx {
var prevItems []RWEventItem
var prevReadItems []RWEventItem
if readFrom >= 0 && readTo > readFrom {
prevItems = items[readFrom:readTo]
prevReadItems = items[readFrom:readTo]
}
var prevWriteItems []RWEventItem
if writeFrom >= 0 && writeTo > writeFrom {
prevWriteItems = items[writeFrom:writeTo]
}
s.finalisePreviousRWSet(prevItems)
s.finalisePreviousRWSet(prevReadItems, prevWriteItems)
}
}

func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) {
func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem, writes []RWEventItem) {
if s.asyncRWSet.index < 0 {
return
}
Expand All @@ -486,7 +527,7 @@ func (s *MVStates) finalisePreviousRWSet(reads []RWEventItem) {
}
// reset nextFinaliseIndex to index+1, it may revert to previous txs
s.nextFinaliseIndex = index + 1
s.resolveDepsMapCacheByWrites(index, reads)
s.resolveDepsMapCacheByWrites(index, reads, writes)
}

func (s *MVStates) RecordNewTx(index int) {
Expand Down Expand Up @@ -669,7 +710,26 @@ func (s *MVStates) FinaliseWithRWSet(rwSet *RWSet) error {
})
}
}
s.resolveDepsMapCacheByWrites(i, reads)
writes := make([]RWEventItem, 0, len(s.rwSets[i].accWriteSet)+len(s.rwSets[i].slotWriteSet))
for addr, sub := range s.rwSets[i].accWriteSet {
for state := range sub {
writes = append(writes, RWEventItem{
Event: WriteAccRWEvent,
Addr: addr,
State: state,
})
}
}
for addr, sub := range s.rwSets[i].slotWriteSet {
for slot := range sub {
writes = append(writes, RWEventItem{
Event: WriteSlotRWEvent,
Addr: addr,
Slot: slot,
})
}
}
s.resolveDepsMapCacheByWrites(i, reads, writes)
}

return nil
Expand Down Expand Up @@ -698,22 +758,44 @@ func (s *MVStates) innerFinalise(index int, applyWriteSet bool) error {
}
for state := range sub {
if _, exist := s.accWriteSet[addr][state]; !exist {
s.accWriteSet[addr][state] = NewStateWrites()
s.accWriteSet[addr][state] = NewRWTxList()
}
s.accWriteSet[addr][state].Append(index)
}
}
for addr, sub := range rwSet.accReadSet {
if _, exist := s.accReadSet[addr]; !exist {
s.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
for state := range sub {
if _, exist := s.accReadSet[addr][state]; !exist {
s.accReadSet[addr][state] = NewRWTxList()
}
s.accReadSet[addr][state].Append(index)
}
}
for addr, sub := range rwSet.slotWriteSet {
if _, exist := s.slotWriteSet[addr]; !exist {
s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList)
}
for slot := range sub {
if _, exist := s.slotWriteSet[addr][slot]; !exist {
s.slotWriteSet[addr][slot] = NewStateWrites()
s.slotWriteSet[addr][slot] = NewRWTxList()
}
s.slotWriteSet[addr][slot].Append(index)
}
}
for addr, sub := range rwSet.slotReadSet {
if _, exist := s.slotReadSet[addr]; !exist {
s.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
for slot := range sub {
if _, exist := s.slotReadSet[addr][slot]; !exist {
s.slotReadSet[addr][slot] = NewRWTxList()
}
s.slotReadSet[addr][slot].Append(index)
}
}
return nil
}

Expand All @@ -723,38 +805,74 @@ func (s *MVStates) finaliseSlotWrite(index int, addr common.Address, slot common
s.slotWriteSet[addr] = make(map[common.Hash]*RWTxList)
}
if _, exist := s.slotWriteSet[addr][slot]; !exist {
s.slotWriteSet[addr][slot] = NewStateWrites()
s.slotWriteSet[addr][slot] = NewRWTxList()
}
s.slotWriteSet[addr][slot].Append(index)
}

func (s *MVStates) finaliseSlotRead(index int, addr common.Address, slot common.Hash) {
// append to pending read set
if _, exist := s.slotReadSet[addr]; !exist {
s.slotReadSet[addr] = make(map[common.Hash]*RWTxList)
}
if _, exist := s.slotReadSet[addr][slot]; !exist {
s.slotReadSet[addr][slot] = NewRWTxList()
}
s.slotReadSet[addr][slot].Append(index)
}

func (s *MVStates) finaliseAccWrite(index int, addr common.Address, state AccountState) {
// append to pending write set
if _, exist := s.accWriteSet[addr]; !exist {
s.accWriteSet[addr] = make(map[AccountState]*RWTxList)
}
if _, exist := s.accWriteSet[addr][state]; !exist {
s.accWriteSet[addr][state] = NewStateWrites()
s.accWriteSet[addr][state] = NewRWTxList()
}
s.accWriteSet[addr][state].Append(index)
}

func (s *MVStates) finaliseAccRead(index int, addr common.Address, state AccountState) {
// append to pending read set
if _, exist := s.accReadSet[addr]; !exist {
s.accReadSet[addr] = make(map[AccountState]*RWTxList)
}
if _, exist := s.accReadSet[addr][state]; !exist {
s.accReadSet[addr][state] = NewRWTxList()
}
s.accReadSet[addr][state].Append(index)
}

func (s *MVStates) queryAccWrites(addr common.Address, state AccountState) *RWTxList {
if _, exist := s.accWriteSet[addr]; !exist {
return nil
}
return s.accWriteSet[addr][state]
}

func (s *MVStates) queryAccReads(addr common.Address, state AccountState) *RWTxList {
if _, exist := s.accReadSet[addr]; !exist {
return nil
}
return s.accReadSet[addr][state]
}

func (s *MVStates) querySlotWrites(addr common.Address, slot common.Hash) *RWTxList {
if _, exist := s.slotWriteSet[addr]; !exist {
return nil
}
return s.slotWriteSet[addr][slot]
}

func (s *MVStates) querySlotReads(addr common.Address, slot common.Hash) *RWTxList {
if _, exist := s.slotReadSet[addr]; !exist {
return nil
}
return s.slotReadSet[addr][slot]
}

// resolveDepsMapCacheByWrites must be executed in order
func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem, writes []RWEventItem) {
for index >= len(s.txDepCache) {
s.txDepCache = append(s.txDepCache, TxDep{})
}
Expand All @@ -769,14 +887,14 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
// check tx dependency, only check key
for _, item := range reads {
// check account states & slots
var writes *RWTxList
var depWrites *RWTxList
if item.Event == ReadAccRWEvent {
writes = s.queryAccWrites(item.Addr, item.State)
depWrites = s.queryAccWrites(item.Addr, item.State)
} else {
writes = s.querySlotWrites(item.Addr, item.Slot)
depWrites = s.querySlotWrites(item.Addr, item.Slot)
}
if writes != nil {
if find := writes.FindLastWrite(index); find >= 0 {
if depWrites != nil {
if find := depWrites.FindLastTx(index); find >= 0 {
if tx := uint64(find); !depSlice.exist(tx) {
depSlice.add(tx)
}
Expand All @@ -788,16 +906,36 @@ func (s *MVStates) resolveDepsMapCacheByWrites(index int, reads []RWEventItem) {
continue
}
addrMap[item.Addr] = struct{}{}
writes = s.queryAccWrites(item.Addr, AccountSuicide)
if writes != nil {
if find := writes.FindLastWrite(index); find >= 0 {
depWrites = s.queryAccWrites(item.Addr, AccountSuicide)
if depWrites != nil {
if find := depWrites.FindLastTx(index); find >= 0 {
if tx := uint64(find); !depSlice.exist(tx) {
depSlice.add(tx)
}
}
}
}
// TODO: check read before write dependency here
// Looking for read operations before write operations, e.g: read->read->read/write execution sequence,
// we need the write transaction to occur after the read transactions.
for _, item := range writes {
var depReads *RWTxList
if item.Event == WriteAccRWEvent {
depReads = s.queryAccReads(item.Addr, item.State)
} else {
depReads = s.querySlotReads(item.Addr, item.Slot)
}
if depReads != nil {
if finds := depReads.FindPrevTxs(index); len(finds) >= 0 {
for _, tx := range finds {
tx := uint64(tx)
if !depSlice.exist(tx) {
depSlice.add(tx)
}
}
}
}
}

for _, addr := range s.gasFeeReceivers {
if _, ok := addrMap[addr]; ok {
rwSet.cannotGasFeeDelay = true
Expand Down
Loading