Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Qi denom aggregation for first tx in block #2146

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
var totalEtxAppendTime time.Duration
var totalEtxCoinbaseTime time.Duration
totalQiProcessTimes := make(map[string]time.Duration)
firstQiTx := true
for i, tx := range block.Transactions() {
startProcess := time.Now()

Expand All @@ -346,10 +347,11 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
if _, ok := senders[tx.Hash()]; ok {
checkSig = false
}
fees, etxs, err, timing := ProcessQiTx(tx, p.hc, checkSig, header, batch, p.hc.headerDb, gp, usedGas, p.hc.pool.signer, p.hc.NodeLocation(), *p.config.ChainID, &etxRLimit, &etxPLimit, utxosCreatedDeleted)
fees, etxs, err, timing := ProcessQiTx(tx, p.hc, checkSig, firstQiTx, header, batch, p.hc.headerDb, gp, usedGas, p.hc.pool.signer, p.hc.NodeLocation(), *p.config.ChainID, &etxRLimit, &etxPLimit, utxosCreatedDeleted)
if err != nil {
return nil, nil, nil, nil, 0, 0, nil, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
firstQiTx = false
startEtxAppend := time.Now()
for _, etx := range etxs {
emittedEtxs = append(emittedEtxs, types.NewTx(etx))
Expand Down Expand Up @@ -813,9 +815,6 @@ func ValidateQiTxInputs(tx *types.Transaction, chain ChainContext, db ethdb.Read
outputs[uint(txOut.Denomination)] -= 1 // This output no longer exists because it has been aggregated
}
}
if err := CheckDenominations(inputs, outputs); err != nil {
return nil, err
}
return totalQitIn, nil

}
Expand Down Expand Up @@ -966,7 +965,7 @@ func ValidateQiTxOutputsAndSignature(tx *types.Transaction, chain ChainContext,
return txFeeInQit, nil
}

func ProcessQiTx(tx *types.Transaction, chain ChainContext, checkSig bool, currentHeader *types.WorkObject, batch ethdb.Batch, db ethdb.Reader, gp *types.GasPool, usedGas *uint64, signer types.Signer, location common.Location, chainId big.Int, etxRLimit, etxPLimit *int, utxosCreatedDeleted *UtxosCreatedDeleted) (*big.Int, []*types.ExternalTx, error, map[string]time.Duration) {
func ProcessQiTx(tx *types.Transaction, chain ChainContext, checkSig, isFirstQiTx bool, currentHeader *types.WorkObject, batch ethdb.Batch, db ethdb.Reader, gp *types.GasPool, usedGas *uint64, signer types.Signer, location common.Location, chainId big.Int, etxRLimit, etxPLimit *int, utxosCreatedDeleted *UtxosCreatedDeleted) (*big.Int, []*types.ExternalTx, error, map[string]time.Duration) {
var elapsedTime time.Duration
stepTimings := make(map[string]time.Duration)

Expand Down Expand Up @@ -1191,8 +1190,10 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, checkSig bool, curre

// Start timing for signature check
stepStart = time.Now()
if err := CheckDenominations(inputs, outputs); err != nil {
return nil, nil, err, nil
if !isFirstQiTx {
if err := CheckDenominations(inputs, outputs); err != nil {
return nil, nil, err, nil
}
}
// Ensure the transaction signature is valid
if checkSig {
Expand Down
47 changes: 43 additions & 4 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ var (
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 1 * time.Minute // Time interval to report transaction pool stats
qiExpirationCheckInterval = 10 * time.Minute // Time interval to check for expired Qi transactions
qiExpirationCheckDivisor = 5 // Check 1/nth of the pool for expired Qi transactions every interval
)

var (
Expand Down Expand Up @@ -187,6 +189,7 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
QiPoolSize uint64 // Maximum number of Qi transactions to store
QiTxLifetime time.Duration // Maximum amount of time Qi transactions are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReorgFrequency time.Duration // Frequency of reorgs outside of new head events
}
Expand All @@ -208,6 +211,7 @@ var DefaultTxPoolConfig = TxPoolConfig{
AccountQueue: 3,
GlobalQueue: 20048,
QiPoolSize: 10024,
QiTxLifetime: 30 * time.Minute,
Lifetime: 3 * time.Hour,
ReorgFrequency: 1 * time.Second,
}
Expand Down Expand Up @@ -272,6 +276,13 @@ func (config *TxPoolConfig) sanitize(logger *log.Logger) TxPoolConfig {
}).Warn("Sanitizing invalid txpool Qi pool size")
conf.QiPoolSize = DefaultTxPoolConfig.QiPoolSize
}
if conf.QiTxLifetime < time.Second {
logger.WithFields(log.Fields{
"provided": conf.QiTxLifetime,
"updated": DefaultTxPoolConfig.QiTxLifetime,
}).Warn("Sanitizing invalid txpool Qi transaction lifetime")
conf.QiTxLifetime = DefaultTxPoolConfig.QiTxLifetime
}
if conf.Lifetime < 1 {
logger.WithFields(log.Fields{
"provided": conf.Lifetime,
Expand Down Expand Up @@ -462,6 +473,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
go pool.poolLimiterGoroutine()
go pool.feesGoroutine()
go pool.invalidQiTxGoroutine()
go pool.qiTxExpirationGoroutine()
return pool
}

Expand Down Expand Up @@ -1228,7 +1240,7 @@ func (pool *TxPool) addQiTxs(txs types.Transactions) []error {
errs = append(errs, err)
continue
}
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, misc.QiToQuai(currentBlock.WorkObjectHeader(), fee))
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, misc.QiToQuai(currentBlock.WorkObjectHeader(), fee), time.Now())
if err != nil {
errs = append(errs, err)
continue
Expand Down Expand Up @@ -1306,7 +1318,7 @@ func (pool *TxPool) addQiTxsWithoutValidationLocked(txs types.Transactions) {
pool.logger.Error("feesCh is full, skipping until there is room")
}
}
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, misc.QiToQuai(pool.chain.CurrentBlock().WorkObjectHeader(), fee))
txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, misc.QiToQuai(pool.chain.CurrentBlock().WorkObjectHeader(), fee), time.Now())
if err != nil {
pool.logger.Error("Error creating txWithMinerFee: " + err.Error())
continue
Expand Down Expand Up @@ -2356,6 +2368,33 @@ func (pool *TxPool) invalidQiTxGoroutine() {
}
}

func (pool *TxPool) qiTxExpirationGoroutine() {
defer func() {
if r := recover(); r != nil {
pool.logger.WithFields(log.Fields{
"error": r,
"stacktrace": string(debug.Stack()),
}).Error("Go-Quai Panicked")
}
}()
ticker := time.NewTicker(qiExpirationCheckInterval)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
// Remove expired QiTxs
// Grabbing lock is not necessary as LRU already has lock internally
for i := 0; i < pool.qiPool.Len()/qiExpirationCheckDivisor; i++ {
_, oldestTx, _ := pool.qiPool.GetOldest()
if time.Since(oldestTx.Received()) > pool.config.QiTxLifetime {
pool.qiPool.Remove(oldestTx.Tx().Hash())
}
}
}
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.InternalAddress
Expand Down
13 changes: 8 additions & 5 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,19 +953,22 @@ func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type TxWithMinerFee struct {
tx *Transaction
minerFee *big.Int
received time.Time
}

func (tx *TxWithMinerFee) Tx() *Transaction { return tx.tx }
func (tx *TxWithMinerFee) MinerFee() *big.Int { return tx.minerFee }
func (tx *TxWithMinerFee) Tx() *Transaction { return tx.tx }
func (tx *TxWithMinerFee) MinerFee() *big.Int { return tx.minerFee }
func (tx *TxWithMinerFee) Received() time.Time { return tx.received }

// NewTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap.
func NewTxWithMinerFee(tx *Transaction, baseFee *big.Int, qiTxFee *big.Int) (*TxWithMinerFee, error) {
func NewTxWithMinerFee(tx *Transaction, baseFee *big.Int, qiTxFee *big.Int, received time.Time) (*TxWithMinerFee, error) {
if tx.Type() == QiTxType {
return &TxWithMinerFee{
tx: tx,
minerFee: qiTxFee,
received: received,
}, nil
}
minerFee, err := tx.EffectiveGasTip(baseFee)
Expand Down Expand Up @@ -1030,7 +1033,7 @@ func NewTransactionsByPriceAndNonce(signer Signer, qiTxs []*TxWithMinerFee, txs
if err != nil {
continue
}
wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee, nil)
wrapped, err := NewTxWithMinerFee(accTxs[0], baseFee, nil, time.Time{})
// Remove transaction if sender doesn't match from, or if wrapping fails.
if acc.Bytes20() != from || err != nil {
delete(txs, from)
Expand Down Expand Up @@ -1073,7 +1076,7 @@ func (t *TransactionsByPriceAndNonce) GetFee() *big.Int {
// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsByPriceAndNonce) Shift(acc common.AddressBytes, sort bool) {
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
if wrapped, err := NewTxWithMinerFee(txs[0], t.baseFee, nil); err == nil {
if wrapped, err := NewTxWithMinerFee(txs[0], t.baseFee, nil, time.Time{}); err == nil {
t.heads[0], t.txs[acc] = wrapped, txs[1:]
if sort {
heap.Fix(&t.heads, 0)
Expand Down
12 changes: 8 additions & 4 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ func (w *worker) commitTransactions(env *environment, parent *types.WorkObject,
w.logger.WithField("err", err).Error("Failed to commit an etx")
}
}
firstQiTx := true
for {
// In the following two cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
Expand Down Expand Up @@ -965,7 +966,7 @@ func (w *worker) commitTransactions(env *environment, parent *types.WorkObject,
}).Trace("Not enough gas for further transactions")
break
}
if err := w.processQiTx(tx, env, parent); err != nil {
if err := w.processQiTx(tx, env, parent, firstQiTx); err != nil {
if strings.Contains(err.Error(), "emits too many") || strings.Contains(err.Error(), "double spends") {
// This is not an invalid tx, our block is just full of ETXs
// Alternatively, a tx double spends a cached deleted UTXO, likely replaced-by-fee
Expand All @@ -989,6 +990,7 @@ func (w *worker) commitTransactions(env *environment, parent *types.WorkObject,
// It's unlikely that this transaction will be valid in the future so remove it asynchronously
qiTxsToRemove = append(qiTxsToRemove, &hash)
}
firstQiTx = false
txs.PopNoSort()
continue
}
Expand Down Expand Up @@ -1535,7 +1537,7 @@ func (w *worker) CurrentInfo(header *types.WorkObject) bool {
return header.NumberU64(w.hc.NodeCtx())+c_startingPrintLimit > w.hc.CurrentHeader().NumberU64(w.hc.NodeCtx())
}

func (w *worker) processQiTx(tx *types.Transaction, env *environment, parent *types.WorkObject) error {
func (w *worker) processQiTx(tx *types.Transaction, env *environment, parent *types.WorkObject, firstQiTx bool) error {
location := w.hc.NodeLocation()
if tx.Type() != types.QiTxType {
return fmt.Errorf("tx %032x is not a QiTx", tx.Hash())
Expand Down Expand Up @@ -1744,8 +1746,10 @@ func (w *worker) processQiTx(tx *types.Transaction, env *environment, parent *ty
env.utxosDelete = append(env.utxosDelete, utxosDeleteHashes...)
env.utxosCreate = append(env.utxosCreate, utxosCreateHashes...)

if err := CheckDenominations(inputs, outputs); err != nil {
return err
if !firstQiTx { // The first transaction in the block can skip denominations check
if err := CheckDenominations(inputs, outputs); err != nil {
return err
}
}
// We could add signature verification here, but it's already checked in the mempool and the signature can't be changed, so duplication is largely unnecessary
return nil
Expand Down
Loading