diff --git a/txcache/config.go b/txcache/config.go index 40236d3d..b48a31bf 100644 --- a/txcache/config.go +++ b/txcache/config.go @@ -14,7 +14,7 @@ const maxNumBytesLowerBound = maxNumItemsLowerBound * 1 const maxNumBytesUpperBound = 1_073_741_824 // one GB const maxNumItemsPerSenderLowerBound = 1 const maxNumBytesPerSenderLowerBound = maxNumItemsPerSenderLowerBound * 1 -const maxNumBytesPerSenderUpperBound = 33_554_432 // 32 MB +const maxNumBytesPerSenderUpperBound = 4_000_000_000 // 32 MB const numTxsToPreemptivelyEvictLowerBound = 1 const numSendersToPreemptivelyEvictLowerBound = 1 diff --git a/txcache/monitoring.go b/txcache/monitoring.go index 7d8ad284..d27295c9 100644 --- a/txcache/monitoring.go +++ b/txcache/monitoring.go @@ -120,10 +120,11 @@ func (journal *evictionJournal) display() { // Diagnose checks the state of the cache for inconsistencies and displays a summary func (cache *TxCache) Diagnose(deep bool) { - cache.diagnoseShallowly() - if deep { - cache.diagnoseDeeply() - } + // Disabled for benchmark. + // cache.diagnoseShallowly() + // if deep { + // cache.diagnoseDeeply() + // } } func (cache *TxCache) diagnoseShallowly() { diff --git a/txcache/txCache.go b/txcache/txCache.go index d938b976..b993876f 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -33,6 +33,8 @@ type TxCache struct { // NewTxCache creates a new transaction cache func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, error) { + config.EvictionEnabled = false + log.Debug("NewTxCache", "config", config.String()) monitoring.MonitorNewCache(config.Name, uint64(config.NumBytesThreshold)) @@ -124,7 +126,7 @@ func (cache *TxCache) doSelectTransactions(numRequested int, batchSizePerSender copiedInThisPass := 0 for _, txList := range snapshotOfSenders { - batchSizeWithScoreCoefficient := batchSizePerSender * int(txList.getLastComputedScore()+1) + batchSizeWithScoreCoefficient := batchSizePerSender // * int(txList.getLastComputedScore()+1) // Reset happens on first pass only isFirstBatch := pass == 0 journal := txList.selectBatchTo(isFirstBatch, result[resultFillIndex:], batchSizeWithScoreCoefficient, bandwidthPerSender) diff --git a/txcache/txListBySenderMap.go b/txcache/txListBySenderMap.go index ccda1ce0..4a9629aa 100644 --- a/txcache/txListBySenderMap.go +++ b/txcache/txListBySenderMap.go @@ -7,7 +7,7 @@ import ( "github.com/multiversx/mx-chain-storage-go/txcache/maps" ) -const numberOfScoreChunks = uint32(100) +const numberOfScoreChunks = uint32(1) // txListBySenderMap is a map-like structure for holding and accessing transactions by sender type txListBySenderMap struct { @@ -75,20 +75,21 @@ func (txMap *txListBySenderMap) getListForSender(sender string) (*txListForSende } func (txMap *txListBySenderMap) addSender(sender string) *txListForSender { - listForSender := newTxListForSender(sender, &txMap.senderConstraints, txMap.notifyScoreChange) + listForSender := newTxListForSender(sender, &txMap.senderConstraints) txMap.backingMap.Set(listForSender) + txMap.backingMap.NotifyScoreChange(listForSender, 0) txMap.counter.Increment() return listForSender } -// This function should only be called in a critical section managed by a "txListForSender" -func (txMap *txListBySenderMap) notifyScoreChange(txList *txListForSender, scoreParams senderScoreParams) { - score := txMap.scoreComputer.computeScore(scoreParams) - txList.setLastComputedScore(score) - txMap.backingMap.NotifyScoreChange(txList, score) -} +// // This function should only be called in a critical section managed by a "txListForSender" +// func (txMap *txListBySenderMap) notifyScoreChange(txList *txListForSender, scoreParams senderScoreParams) { +// score := uint32(0) +// txList.setLastComputedScore(score) +// txMap.backingMap.NotifyScoreChange(txList, score) +// } // removeTx removes a transaction from the map func (txMap *txListBySenderMap) removeTx(tx *WrappedTransaction) bool { diff --git a/txcache/txListForSender.go b/txcache/txListForSender.go index a12a91d1..de1b983d 100644 --- a/txcache/txListForSender.go +++ b/txcache/txListForSender.go @@ -1,7 +1,6 @@ package txcache import ( - "bytes" "container/list" "sync" @@ -14,22 +13,22 @@ var _ maps.BucketSortedMapItem = (*txListForSender)(nil) // txListForSender represents a sorted list of transactions of a particular sender type txListForSender struct { - copyDetectedGap bool - lastComputedScore atomic.Uint32 - accountNonceKnown atomic.Flag - sweepable atomic.Flag - copyPreviousNonce uint64 - sender string - items *list.List - copyBatchIndex *list.Element - constraints *senderConstraints - scoreChunk *maps.MapChunk - accountNonce atomic.Uint64 - totalBytes atomic.Counter - totalGas atomic.Counter - totalFeeScore atomic.Counter + copyDetectedGap bool + lastComputedScore atomic.Uint32 + accountNonceKnown atomic.Flag + sweepable atomic.Flag + copyPreviousNonce uint64 + sender string + items *list.List + copyBatchIndex *list.Element + constraints *senderConstraints + scoreChunk *maps.MapChunk + accountNonce atomic.Uint64 + //totalBytes atomic.Counter + //totalGas atomic.Counter + //totalFeeScore atomic.Counter numFailedSelections atomic.Counter - onScoreChange scoreChangeCallback + //onScoreChange scoreChangeCallback scoreChunkMutex sync.RWMutex mutex sync.RWMutex @@ -38,12 +37,11 @@ type txListForSender struct { type scoreChangeCallback func(value *txListForSender, scoreParams senderScoreParams) // newTxListForSender creates a new (sorted) list of transactions -func newTxListForSender(sender string, constraints *senderConstraints, onScoreChange scoreChangeCallback) *txListForSender { +func newTxListForSender(sender string, constraints *senderConstraints) *txListForSender { return &txListForSender{ - items: list.New(), - sender: sender, - constraints: constraints, - onScoreChange: onScoreChange, + items: list.New(), + sender: sender, + constraints: constraints, } } @@ -66,8 +64,8 @@ func (listForSender *txListForSender) AddTx(tx *WrappedTransaction, gasHandler T } listForSender.onAddedTransaction(tx, gasHandler, txFeeHelper) - evicted := listForSender.applySizeConstraints() - listForSender.triggerScoreChange() + evicted := [][]byte{} + //listForSender.triggerScoreChange() return true, evicted } @@ -93,65 +91,65 @@ func (listForSender *txListForSender) applySizeConstraints() [][]byte { } func (listForSender *txListForSender) isCapacityExceeded() bool { - maxBytes := int64(listForSender.constraints.maxNumBytes) - maxNumTxs := uint64(listForSender.constraints.maxNumTxs) - tooManyBytes := listForSender.totalBytes.Get() > maxBytes - tooManyTxs := listForSender.countTx() > maxNumTxs - - return tooManyBytes || tooManyTxs + return false } func (listForSender *txListForSender) onAddedTransaction(tx *WrappedTransaction, gasHandler TxGasHandler, txFeeHelper feeHelper) { - listForSender.totalBytes.Add(tx.Size) - listForSender.totalGas.Add(int64(estimateTxGas(tx))) - listForSender.totalFeeScore.Add(int64(estimateTxFeeScore(tx, gasHandler, txFeeHelper))) + // listForSender.totalBytes.Add(tx.Size) + // listForSender.totalGas.Add(int64(estimateTxGas(tx))) + // listForSender.totalFeeScore.Add(int64(estimateTxFeeScore(tx, gasHandler, txFeeHelper))) } func (listForSender *txListForSender) triggerScoreChange() { - scoreParams := listForSender.getScoreParams() - listForSender.onScoreChange(listForSender, scoreParams) + // scoreParams := listForSender.getScoreParams() + // listForSender.onScoreChange(listForSender, scoreParams) } // This function should only be used in critical section (listForSender.mutex) func (listForSender *txListForSender) getScoreParams() senderScoreParams { - fee := listForSender.totalFeeScore.GetUint64() - gas := listForSender.totalGas.GetUint64() - count := listForSender.countTx() + // fee := listForSender.totalFeeScore.GetUint64() + // gas := listForSender.totalGas.GetUint64() + // count := listForSender.countTx() - return senderScoreParams{count: count, feeScore: fee, gas: gas} + // return senderScoreParams{count: count, feeScore: fee, gas: gas} + return senderScoreParams{} } // This function should only be used in critical section (listForSender.mutex) func (listForSender *txListForSender) findInsertionPlace(incomingTx *WrappedTransaction) (*list.Element, error) { - incomingNonce := incomingTx.Tx.GetNonce() - incomingGasPrice := incomingTx.Tx.GetGasPrice() + incomingNonce := incomingTx.TxDirectPointer.Nonce + //incomingGasPrice := incomingTx.Tx.GetGasPrice() for element := listForSender.items.Back(); element != nil; element = element.Prev() { currentTx := element.Value.(*WrappedTransaction) - currentTxNonce := currentTx.Tx.GetNonce() - currentTxGasPrice := currentTx.Tx.GetGasPrice() + currentTxNonce := currentTx.TxDirectPointer.Nonce + //currentTxGasPrice := currentTx.Tx.GetGasPrice() - if incomingTx.sameAs(currentTx) { - // The incoming transaction will be discarded - return nil, common.ErrItemAlreadyInCache - } + // if incomingTx.sameAsKnowingThatSenderIsSame(currentTx) { + // // The incoming transaction will be discarded + // return nil, common.ErrItemAlreadyInCache + // } if currentTxNonce == incomingNonce { - if currentTxGasPrice > incomingGasPrice { - // The incoming transaction will be placed right after the existing one, which has same nonce but higher price. - // If the nonces are the same, but the incoming gas price is higher or equal, the search loop continues. - return element, nil - } - if currentTxGasPrice == incomingGasPrice { - // The incoming transaction will be placed right after the existing one, which has same nonce and the same price. - // (but different hash, because of some other fields like receiver, value or data) - // This will order out the transactions having the same nonce and gas price - if bytes.Compare(currentTx.TxHash, incomingTx.TxHash) < 0 { - return element, nil - } - } + return nil, common.ErrItemAlreadyInCache } + // if currentTxNonce == incomingNonce { + // if currentTxGasPrice > incomingGasPrice { + // // The incoming transaction will be placed right after the existing one, which has same nonce but higher price. + // // If the nonces are the same, but the incoming gas price is higher or equal, the search loop continues. + // return element, nil + // } + // if currentTxGasPrice == incomingGasPrice { + // // The incoming transaction will be placed right after the existing one, which has same nonce and the same price. + // // (but different hash, because of some other fields like receiver, value or data) + // // This will order out the transactions having the same nonce and gas price + // if bytes.Compare(currentTx.TxHash, incomingTx.TxHash) < 0 { + // return element, nil + // } + // } + // } + if currentTxNonce < incomingNonce { // We've found the first transaction with a lower nonce than the incoming one, // thus the incoming transaction will be placed right after this one. @@ -174,34 +172,35 @@ func (listForSender *txListForSender) RemoveTx(tx *WrappedTransaction) bool { if isFound { listForSender.items.Remove(marker) listForSender.onRemovedListElement(marker) - listForSender.triggerScoreChange() + //listForSender.triggerScoreChange() } return isFound } func (listForSender *txListForSender) onRemovedListElement(element *list.Element) { - value := element.Value.(*WrappedTransaction) + // value := element.Value.(*WrappedTransaction) - listForSender.totalBytes.Subtract(value.Size) - listForSender.totalGas.Subtract(int64(estimateTxGas(value))) - listForSender.totalFeeScore.Subtract(int64(value.TxFeeScoreNormalized)) + // listForSender.totalBytes.Subtract(value.Size) + // listForSender.totalGas.Subtract(int64(estimateTxGas(value))) + // listForSender.totalFeeScore.Subtract(int64(value.TxFeeScoreNormalized)) } // This function should only be used in critical section (listForSender.mutex) func (listForSender *txListForSender) findListElementWithTx(txToFind *WrappedTransaction) *list.Element { - txToFindHash := txToFind.TxHash - txToFindNonce := txToFind.Tx.GetNonce() + // txToFindHash := txToFind.TxHash + txToFindNonce := txToFind.TxDirectPointer.Nonce for element := listForSender.items.Front(); element != nil; element = element.Next() { value := element.Value.(*WrappedTransaction) + thisNonce := value.TxDirectPointer.Nonce - if bytes.Equal(value.TxHash, txToFindHash) { + if thisNonce == txToFindNonce { return element } // Optimization: stop search at this point, since the list is sorted by nonce - if value.Tx.GetNonce() > txToFindNonce { + if thisNonce > txToFindNonce { break } } @@ -245,6 +244,7 @@ func (listForSender *txListForSender) selectBatchTo(isFirstBatch bool, destinati // There is an exception though: if this is the first read operation for the sender in the current selection process and the sender is in the grace period, // then one transaction will be returned. But subsequent reads for this sender will return nothing. if detectedGap { + log.Debug("Detected gap for sender", "sender", listForSender.sender, "nonce", previousNonce) if isFirstBatch && listForSender.isInGracePeriod() { journal.isGracePeriod = true batchSize = 1 @@ -266,6 +266,7 @@ func (listForSender *txListForSender) selectBatchTo(isFirstBatch bool, destinati lastTxGasLimit = value.Tx.GetGasLimit() if previousNonce > 0 && txNonce > previousNonce+1 { + log.Debug("Detected gap for sender - middle", "sender", listForSender.sender, "nonce", previousNonce) listForSender.copyDetectedGap = true journal.hasMiddleGap = true break diff --git a/txcache/wrappedTransaction.go b/txcache/wrappedTransaction.go index 281dd8ab..e995cc94 100644 --- a/txcache/wrappedTransaction.go +++ b/txcache/wrappedTransaction.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-core-go/data/transaction" ) const processFeeFactor = float64(0.8) // 80% @@ -11,6 +12,7 @@ const processFeeFactor = float64(0.8) // 80% // WrappedTransaction contains a transaction, its hash and extra information type WrappedTransaction struct { Tx data.TransactionHandler + TxDirectPointer *transaction.Transaction TxHash []byte SenderShardID uint32 ReceiverShardID uint32