Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmarks tps blue #44

Draft
wants to merge 6 commits into
base: rc/v1.7.next1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion txcache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const maxNumBytesLowerBound = maxNumItemsLowerBound * 1
const maxNumBytesUpperBound = 1_073_741_824 // one GB
const maxNumItemsPerSenderLowerBound = 1
const maxNumBytesPerSenderLowerBound = maxNumItemsPerSenderLowerBound * 1
const maxNumBytesPerSenderUpperBound = 33_554_432 // 32 MB
const maxNumBytesPerSenderUpperBound = 4_000_000_000 // 32 MB
const numTxsToPreemptivelyEvictLowerBound = 1
const numSendersToPreemptivelyEvictLowerBound = 1

Expand Down
9 changes: 5 additions & 4 deletions txcache/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ func (journal *evictionJournal) display() {

// Diagnose checks the state of the cache for inconsistencies and displays a summary
func (cache *TxCache) Diagnose(deep bool) {
cache.diagnoseShallowly()
if deep {
cache.diagnoseDeeply()
}
// Disabled for benchmark.
// cache.diagnoseShallowly()
// if deep {
// cache.diagnoseDeeply()
// }
}

func (cache *TxCache) diagnoseShallowly() {
Expand Down
4 changes: 3 additions & 1 deletion txcache/txCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type TxCache struct {

// NewTxCache creates a new transaction cache
func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, error) {
config.EvictionEnabled = false

log.Debug("NewTxCache", "config", config.String())
monitoring.MonitorNewCache(config.Name, uint64(config.NumBytesThreshold))

Expand Down Expand Up @@ -124,7 +126,7 @@ func (cache *TxCache) doSelectTransactions(numRequested int, batchSizePerSender
copiedInThisPass := 0

for _, txList := range snapshotOfSenders {
batchSizeWithScoreCoefficient := batchSizePerSender * int(txList.getLastComputedScore()+1)
batchSizeWithScoreCoefficient := batchSizePerSender // * int(txList.getLastComputedScore()+1)
// Reset happens on first pass only
isFirstBatch := pass == 0
journal := txList.selectBatchTo(isFirstBatch, result[resultFillIndex:], batchSizeWithScoreCoefficient, bandwidthPerSender)
Expand Down
17 changes: 9 additions & 8 deletions txcache/txListBySenderMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/multiversx/mx-chain-storage-go/txcache/maps"
)

const numberOfScoreChunks = uint32(100)
const numberOfScoreChunks = uint32(1)

// txListBySenderMap is a map-like structure for holding and accessing transactions by sender
type txListBySenderMap struct {
Expand Down Expand Up @@ -75,20 +75,21 @@ func (txMap *txListBySenderMap) getListForSender(sender string) (*txListForSende
}

func (txMap *txListBySenderMap) addSender(sender string) *txListForSender {
listForSender := newTxListForSender(sender, &txMap.senderConstraints, txMap.notifyScoreChange)
listForSender := newTxListForSender(sender, &txMap.senderConstraints)

txMap.backingMap.Set(listForSender)
txMap.backingMap.NotifyScoreChange(listForSender, 0)
txMap.counter.Increment()

return listForSender
}

// This function should only be called in a critical section managed by a "txListForSender"
func (txMap *txListBySenderMap) notifyScoreChange(txList *txListForSender, scoreParams senderScoreParams) {
score := txMap.scoreComputer.computeScore(scoreParams)
txList.setLastComputedScore(score)
txMap.backingMap.NotifyScoreChange(txList, score)
}
// // This function should only be called in a critical section managed by a "txListForSender"
// func (txMap *txListBySenderMap) notifyScoreChange(txList *txListForSender, scoreParams senderScoreParams) {
// score := uint32(0)
// txList.setLastComputedScore(score)
// txMap.backingMap.NotifyScoreChange(txList, score)
// }

// removeTx removes a transaction from the map
func (txMap *txListBySenderMap) removeTx(tx *WrappedTransaction) bool {
Expand Down
137 changes: 69 additions & 68 deletions txcache/txListForSender.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package txcache

import (
"bytes"
"container/list"
"sync"

Expand All @@ -14,22 +13,22 @@ var _ maps.BucketSortedMapItem = (*txListForSender)(nil)

// txListForSender represents a sorted list of transactions of a particular sender
type txListForSender struct {
copyDetectedGap bool
lastComputedScore atomic.Uint32
accountNonceKnown atomic.Flag
sweepable atomic.Flag
copyPreviousNonce uint64
sender string
items *list.List
copyBatchIndex *list.Element
constraints *senderConstraints
scoreChunk *maps.MapChunk
accountNonce atomic.Uint64
totalBytes atomic.Counter
totalGas atomic.Counter
totalFeeScore atomic.Counter
copyDetectedGap bool
lastComputedScore atomic.Uint32
accountNonceKnown atomic.Flag
sweepable atomic.Flag
copyPreviousNonce uint64
sender string
items *list.List
copyBatchIndex *list.Element
constraints *senderConstraints
scoreChunk *maps.MapChunk
accountNonce atomic.Uint64
//totalBytes atomic.Counter
//totalGas atomic.Counter
//totalFeeScore atomic.Counter
numFailedSelections atomic.Counter
onScoreChange scoreChangeCallback
//onScoreChange scoreChangeCallback

scoreChunkMutex sync.RWMutex
mutex sync.RWMutex
Expand All @@ -38,12 +37,11 @@ type txListForSender struct {
type scoreChangeCallback func(value *txListForSender, scoreParams senderScoreParams)

// newTxListForSender creates a new (sorted) list of transactions
func newTxListForSender(sender string, constraints *senderConstraints, onScoreChange scoreChangeCallback) *txListForSender {
func newTxListForSender(sender string, constraints *senderConstraints) *txListForSender {
return &txListForSender{
items: list.New(),
sender: sender,
constraints: constraints,
onScoreChange: onScoreChange,
items: list.New(),
sender: sender,
constraints: constraints,
}
}

Expand All @@ -66,8 +64,8 @@ func (listForSender *txListForSender) AddTx(tx *WrappedTransaction, gasHandler T
}

listForSender.onAddedTransaction(tx, gasHandler, txFeeHelper)
evicted := listForSender.applySizeConstraints()
listForSender.triggerScoreChange()
evicted := [][]byte{}
//listForSender.triggerScoreChange()
return true, evicted
}

Expand All @@ -93,65 +91,65 @@ func (listForSender *txListForSender) applySizeConstraints() [][]byte {
}

func (listForSender *txListForSender) isCapacityExceeded() bool {
maxBytes := int64(listForSender.constraints.maxNumBytes)
maxNumTxs := uint64(listForSender.constraints.maxNumTxs)
tooManyBytes := listForSender.totalBytes.Get() > maxBytes
tooManyTxs := listForSender.countTx() > maxNumTxs

return tooManyBytes || tooManyTxs
return false
}

func (listForSender *txListForSender) onAddedTransaction(tx *WrappedTransaction, gasHandler TxGasHandler, txFeeHelper feeHelper) {
listForSender.totalBytes.Add(tx.Size)
listForSender.totalGas.Add(int64(estimateTxGas(tx)))
listForSender.totalFeeScore.Add(int64(estimateTxFeeScore(tx, gasHandler, txFeeHelper)))
// listForSender.totalBytes.Add(tx.Size)
// listForSender.totalGas.Add(int64(estimateTxGas(tx)))
// listForSender.totalFeeScore.Add(int64(estimateTxFeeScore(tx, gasHandler, txFeeHelper)))
}

func (listForSender *txListForSender) triggerScoreChange() {
scoreParams := listForSender.getScoreParams()
listForSender.onScoreChange(listForSender, scoreParams)
// scoreParams := listForSender.getScoreParams()
// listForSender.onScoreChange(listForSender, scoreParams)
}

// This function should only be used in critical section (listForSender.mutex)
func (listForSender *txListForSender) getScoreParams() senderScoreParams {
fee := listForSender.totalFeeScore.GetUint64()
gas := listForSender.totalGas.GetUint64()
count := listForSender.countTx()
// fee := listForSender.totalFeeScore.GetUint64()
// gas := listForSender.totalGas.GetUint64()
// count := listForSender.countTx()

return senderScoreParams{count: count, feeScore: fee, gas: gas}
// return senderScoreParams{count: count, feeScore: fee, gas: gas}
return senderScoreParams{}
}

// This function should only be used in critical section (listForSender.mutex)
func (listForSender *txListForSender) findInsertionPlace(incomingTx *WrappedTransaction) (*list.Element, error) {
incomingNonce := incomingTx.Tx.GetNonce()
incomingGasPrice := incomingTx.Tx.GetGasPrice()
incomingNonce := incomingTx.TxDirectPointer.Nonce
//incomingGasPrice := incomingTx.Tx.GetGasPrice()

for element := listForSender.items.Back(); element != nil; element = element.Prev() {
currentTx := element.Value.(*WrappedTransaction)
currentTxNonce := currentTx.Tx.GetNonce()
currentTxGasPrice := currentTx.Tx.GetGasPrice()
currentTxNonce := currentTx.TxDirectPointer.Nonce
//currentTxGasPrice := currentTx.Tx.GetGasPrice()

if incomingTx.sameAs(currentTx) {
// The incoming transaction will be discarded
return nil, common.ErrItemAlreadyInCache
}
// if incomingTx.sameAsKnowingThatSenderIsSame(currentTx) {
// // The incoming transaction will be discarded
// return nil, common.ErrItemAlreadyInCache
// }

if currentTxNonce == incomingNonce {
if currentTxGasPrice > incomingGasPrice {
// The incoming transaction will be placed right after the existing one, which has same nonce but higher price.
// If the nonces are the same, but the incoming gas price is higher or equal, the search loop continues.
return element, nil
}
if currentTxGasPrice == incomingGasPrice {
// The incoming transaction will be placed right after the existing one, which has same nonce and the same price.
// (but different hash, because of some other fields like receiver, value or data)
// This will order out the transactions having the same nonce and gas price
if bytes.Compare(currentTx.TxHash, incomingTx.TxHash) < 0 {
return element, nil
}
}
return nil, common.ErrItemAlreadyInCache
}

// if currentTxNonce == incomingNonce {
// if currentTxGasPrice > incomingGasPrice {
// // The incoming transaction will be placed right after the existing one, which has same nonce but higher price.
// // If the nonces are the same, but the incoming gas price is higher or equal, the search loop continues.
// return element, nil
// }
// if currentTxGasPrice == incomingGasPrice {
// // The incoming transaction will be placed right after the existing one, which has same nonce and the same price.
// // (but different hash, because of some other fields like receiver, value or data)
// // This will order out the transactions having the same nonce and gas price
// if bytes.Compare(currentTx.TxHash, incomingTx.TxHash) < 0 {
// return element, nil
// }
// }
// }

if currentTxNonce < incomingNonce {
// We've found the first transaction with a lower nonce than the incoming one,
// thus the incoming transaction will be placed right after this one.
Expand All @@ -174,34 +172,35 @@ func (listForSender *txListForSender) RemoveTx(tx *WrappedTransaction) bool {
if isFound {
listForSender.items.Remove(marker)
listForSender.onRemovedListElement(marker)
listForSender.triggerScoreChange()
//listForSender.triggerScoreChange()
}

return isFound
}

func (listForSender *txListForSender) onRemovedListElement(element *list.Element) {
value := element.Value.(*WrappedTransaction)
// value := element.Value.(*WrappedTransaction)

listForSender.totalBytes.Subtract(value.Size)
listForSender.totalGas.Subtract(int64(estimateTxGas(value)))
listForSender.totalFeeScore.Subtract(int64(value.TxFeeScoreNormalized))
// listForSender.totalBytes.Subtract(value.Size)
// listForSender.totalGas.Subtract(int64(estimateTxGas(value)))
// listForSender.totalFeeScore.Subtract(int64(value.TxFeeScoreNormalized))
}

// This function should only be used in critical section (listForSender.mutex)
func (listForSender *txListForSender) findListElementWithTx(txToFind *WrappedTransaction) *list.Element {
txToFindHash := txToFind.TxHash
txToFindNonce := txToFind.Tx.GetNonce()
// txToFindHash := txToFind.TxHash
txToFindNonce := txToFind.TxDirectPointer.Nonce

for element := listForSender.items.Front(); element != nil; element = element.Next() {
value := element.Value.(*WrappedTransaction)
thisNonce := value.TxDirectPointer.Nonce

if bytes.Equal(value.TxHash, txToFindHash) {
if thisNonce == txToFindNonce {
return element
}

// Optimization: stop search at this point, since the list is sorted by nonce
if value.Tx.GetNonce() > txToFindNonce {
if thisNonce > txToFindNonce {
break
}
}
Expand Down Expand Up @@ -245,6 +244,7 @@ func (listForSender *txListForSender) selectBatchTo(isFirstBatch bool, destinati
// There is an exception though: if this is the first read operation for the sender in the current selection process and the sender is in the grace period,
// then one transaction will be returned. But subsequent reads for this sender will return nothing.
if detectedGap {
log.Debug("Detected gap for sender", "sender", listForSender.sender, "nonce", previousNonce)
if isFirstBatch && listForSender.isInGracePeriod() {
journal.isGracePeriod = true
batchSize = 1
Expand All @@ -266,6 +266,7 @@ func (listForSender *txListForSender) selectBatchTo(isFirstBatch bool, destinati
lastTxGasLimit = value.Tx.GetGasLimit()

if previousNonce > 0 && txNonce > previousNonce+1 {
log.Debug("Detected gap for sender - middle", "sender", listForSender.sender, "nonce", previousNonce)
listForSender.copyDetectedGap = true
journal.hasMiddleGap = true
break
Expand Down
2 changes: 2 additions & 0 deletions txcache/wrappedTransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"

"github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/transaction"
)

const processFeeFactor = float64(0.8) // 80%

// WrappedTransaction contains a transaction, its hash and extra information
type WrappedTransaction struct {
Tx data.TransactionHandler
TxDirectPointer *transaction.Transaction
TxHash []byte
SenderShardID uint32
ReceiverShardID uint32
Expand Down
Loading