From e98d8b8b65d86296014cd310c0e438043ebbac22 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 00:22:59 -0500 Subject: [PATCH 01/15] ugh --- cosmos/runtime/txpool/ante.go | 1 - cosmos/runtime/txpool/comet.go | 31 +++++++++---------------------- cosmos/runtime/txpool/mempool.go | 4 ---- 3 files changed, 9 insertions(+), 27 deletions(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 18ea5905c..00215f04b 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -53,7 +53,6 @@ func (m *Mempool) AnteHandle( if shouldEject := m.shouldEjectFromCometMempool( ctx.BlockTime().Unix(), ethTx, ); shouldEject { - m.crc.DropRemoteTx(ethTx.Hash()) telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c40809db9..82d3c68d6 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -21,14 +21,14 @@ package txpool import ( - "sync" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" ) const ( - defaultCacheSize = 4096 + defaultCacheSize = 100000 ) // CometRemoteCache is used to mark which txs are added to our Polaris node remotely from @@ -37,43 +37,30 @@ type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool MarkRemoteSeen(txHash common.Hash) TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp - DropRemoteTx(txHash common.Hash) } // Thread-safe implementation of CometRemoteCache. type cometRemoteCache struct { - timeInserted map[common.Hash]int64 - timeInsertedMu sync.RWMutex + timeInserted *lru.Cache[common.Hash, int64] } func newCometRemoteCache() *cometRemoteCache { return &cometRemoteCache{ - timeInserted: make(map[common.Hash]int64, defaultCacheSize), + + timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize), } } func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { - crc.timeInsertedMu.RLock() - defer crc.timeInsertedMu.RUnlock() - _, ok := crc.timeInserted[txHash] - return ok + return crc.timeInserted.Contains(txHash) } // Record the time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { - crc.timeInsertedMu.Lock() - crc.timeInserted[txHash] = time.Now().Unix() - crc.timeInsertedMu.Unlock() + crc.timeInserted.Add(txHash, time.Now().Unix()) } func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 { - crc.timeInsertedMu.RLock() - defer crc.timeInsertedMu.RUnlock() - return crc.timeInserted[txHash] -} - -func (crc *cometRemoteCache) DropRemoteTx(txHash common.Hash) { - crc.timeInsertedMu.Lock() - delete(crc.timeInserted, txHash) - crc.timeInsertedMu.Unlock() + i, _ := crc.timeInserted.Get(txHash) + return i } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index cb63403fe..4a043d516 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -172,10 +172,6 @@ func (m *Mempool) Remove(tx sdk.Tx) error { if err := ethTx.UnmarshalBinary(txBz); err != nil { continue } - txHash := ethTx.Hash() - - // Remove the eth tx from comet seen tx cache. - m.crc.DropRemoteTx(txHash) } } return nil From fdfada924b88c6cad4b9be49a699823e3f305fd7 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 00:27:46 -0500 Subject: [PATCH 02/15] boom --- cosmos/runtime/txpool/mempool.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 4a043d516..db7f9bad0 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -158,21 +158,5 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator { // Remove is an intentional no-op as the eth txpool handles removals. func (m *Mempool) Remove(tx sdk.Tx) error { - // Get the Eth payload envelope from the Cosmos transaction. - msgs := tx.GetMsgs() - if len(msgs) == 1 { - env, ok := utils.GetAs[*types.WrappedPayloadEnvelope](msgs[0]) - if !ok { - return nil - } - - // Unwrap the payload to unpack the individual eth transactions to remove from the txpool. - for _, txBz := range env.UnwrapPayload().ExecutionPayload.Transactions { - ethTx := new(ethtypes.Transaction) - if err := ethTx.UnmarshalBinary(txBz); err != nil { - continue - } - } - } return nil } From 7a0f5e9248eefbcba76ccf395b5c8bb2cff7caa9 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 00:31:56 -0500 Subject: [PATCH 03/15] mark seen --- cosmos/runtime/txpool/comet.go | 10 +++++++--- cosmos/runtime/txpool/mempool.go | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index 82d3c68d6..4e5c5c68d 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -35,7 +35,7 @@ const ( // Comet CheckTX and when. type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool - MarkRemoteSeen(txHash common.Hash) + MarkRemoteSeen(txHash common.Hash) bool TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp } @@ -56,8 +56,12 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { } // Record the time the tx was inserted from Comet successfully. -func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) { - crc.timeInserted.Add(txHash, time.Now().Unix()) +func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool { + if !crc.timeInserted.Contains(txHash) { + crc.timeInserted.Add(txHash, time.Now().Unix()) + return true + } + return false } func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 { diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index db7f9bad0..0d58c8b2a 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -140,7 +140,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { } // Add the eth tx to the remote cache. - m.crc.MarkRemoteSeen(ethTx.Hash()) + _ = m.crc.MarkRemoteSeen(ethTx.Hash()) return nil } @@ -157,6 +157,6 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator { } // Remove is an intentional no-op as the eth txpool handles removals. -func (m *Mempool) Remove(tx sdk.Tx) error { +func (m *Mempool) Remove(_ sdk.Tx) error { return nil } From a6378e0607deb727a0b57686c70443531fcf681c Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 15:26:03 -0500 Subject: [PATCH 04/15] based --- cosmos/config/config.go | 9 +++++ cosmos/config/flags/flags.go | 8 +++-- cosmos/config/template.go | 3 ++ cosmos/runtime/ante/ante.go | 7 +++- cosmos/runtime/runtime.go | 9 +++-- cosmos/runtime/txpool/handler.go | 28 ++++++++++----- cosmos/runtime/txpool/handler_test.go | 2 +- cosmos/runtime/txpool/mempool.go | 49 ++++++++++++++++++++++----- eth/polar/config.go | 3 ++ 9 files changed, 94 insertions(+), 24 deletions(-) diff --git a/cosmos/config/config.go b/cosmos/config/config.go index 1c3859f44..f98679b78 100644 --- a/cosmos/config/config.go +++ b/cosmos/config/config.go @@ -94,6 +94,15 @@ func readConfigFromAppOptsParser(parser AppOptionsParser) (*Config, error) { return nil, err } + if conf.Polar.IsValidator, err = parser.GetBool(flags.IsValidator); err != nil { + return nil, err + } + + if conf.Polar.ValidatorJSONRPCEndpoint, err = + parser.GetString(flags.ValidatorJSONRPCEndpoint); err != nil { + return nil, err + } + // Polar Miner settings if conf.Polar.Miner.Etherbase, err = parser.GetCommonAddress(flags.MinerEtherbase); err != nil { diff --git a/cosmos/config/flags/flags.go b/cosmos/config/flags/flags.go index 90ae84edc..02fc271b9 100644 --- a/cosmos/config/flags/flags.go +++ b/cosmos/config/flags/flags.go @@ -24,9 +24,11 @@ const ( OptimisticExecution = "polaris.optimistic-execution" // Polar Root. - RPCEvmTimeout = "polaris.polar.rpc-evm-timeout" - RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap" - RPCGasCap = "polaris.polar.rpc-gas-cap" + RPCEvmTimeout = "polaris.polar.rpc-evm-timeout" + RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap" + RPCGasCap = "polaris.polar.rpc-gas-cap" + IsValidator = "polaris.polar.is-validator" + ValidatorJSONRPCEndpoint = "polaris.polar.validator-jsonrpc-endpoint" // Miner. MinerEtherbase = "polaris.polar.miner.etherbase" diff --git a/cosmos/config/template.go b/cosmos/config/template.go index 9a04dae72..8e188ee95 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -40,6 +40,9 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}" # Transaction fee cap for RPC requests rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}" +validator-json-rpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}" +is-validator = {{ .Polaris.Polar.IsValidator }} + # Chain config [polaris.polar.chain] chain-id = "{{ .Polaris.Polar.Chain.ChainID }}" diff --git a/cosmos/runtime/ante/ante.go b/cosmos/runtime/ante/ante.go index 05c4de043..f27c336c4 100644 --- a/cosmos/runtime/ante/ante.go +++ b/cosmos/runtime/ante/ante.go @@ -39,12 +39,13 @@ import ( type Provider struct { evmAnteHandler sdk.AnteHandler // Ante handler for EVM transactions cosmosAnteHandler sdk.AnteHandler // Ante handler for Cosmos transactions + isValidator bool } // NewAnteHandler creates a new Provider with a mempool and Cosmos ante handler. // It sets up the EVM ante handler with the necessary decorators. func NewAnteHandler( - mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, + mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, isValidator bool, ) *Provider { evmAnteDecorators := []sdk.AnteDecorator{ ante.NewSetUpContextDecorator(), // Set up the context decorator for the EVM ante handler @@ -55,6 +56,7 @@ func NewAnteHandler( return &Provider{ evmAnteHandler: sdk.ChainAnteDecorators(evmAnteDecorators...), cosmosAnteHandler: cosmosAnteHandler, + isValidator: isValidator, } } @@ -67,6 +69,9 @@ func (ah *Provider) AnteHandler() func( // If the transaction contains a single EVM transaction, use the EVM ante handler if len(tx.GetMsgs()) == 1 { if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + if ah.isValidator { + return ctx, errors.New("validator cannot accept EVM from comet") + } return ah.evmAnteHandler(ctx, tx, simulate) } else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok { if ctx.ExecMode() != sdk.ExecModeCheck { diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 880ccaf84..4b4e91586 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -92,6 +92,8 @@ type Polaris struct { // into the txpool are happening during this process. The mempool object then read locks for // adding transactions into the txpool. blockBuilderMu sync.RWMutex + + cfg *eth.Config } // New creates a new Polaris runtime from the provided dependencies. @@ -105,6 +107,7 @@ func New( var err error p := &Polaris{ logger: logger, + cfg: cfg, } ctx := sdk.Context{}. @@ -129,6 +132,8 @@ func New( int64(cfg.Polar.LegacyTxPool.Lifetime), &p.blockBuilderMu, priceLimit, + p.cfg.Polar.IsValidator, + p.cfg.Polar.ValidatorJSONRPCEndpoint, ) return p @@ -163,7 +168,7 @@ func (p *Polaris) Build( } app.SetAnteHandler( - antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler).AnteHandler(), + antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler, p.cfg.Polar.IsValidator).AnteHandler(), ) return nil @@ -178,7 +183,7 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error { // Initialize the txpool with a new transaction serializer. p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( - clientCtx.TxConfig, evmtypes.WrapTx)) + clientCtx.TxConfig, evmtypes.WrapTx), p.cfg.Polar.IsValidator) // Register services with Polaris. p.RegisterLifecycles([]node.Lifecycle{ diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 069003b68..a3ae8afd2 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -96,28 +96,35 @@ type handler struct { // Queue for failed transactions failedTxs chan *failedTx + + isValidator bool } // newHandler creates a new handler. func newHandler( clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, - crc CometRemoteCache, logger log.Logger, + crc CometRemoteCache, logger log.Logger, isValidator bool, ) *handler { h := &handler{ - logger: logger, - clientCtx: clientCtx, - serializer: serializer, - crc: crc, - txPool: txPool, - txsCh: make(chan core.NewTxsEvent, txChanSize), - stopCh: make(chan struct{}), - failedTxs: make(chan *failedTx, txChanSize), + logger: logger, + clientCtx: clientCtx, + serializer: serializer, + crc: crc, + txPool: txPool, + txsCh: make(chan core.NewTxsEvent, txChanSize), + stopCh: make(chan struct{}), + failedTxs: make(chan *failedTx, txChanSize), + isValidator: isValidator, } return h } // Start starts the handler. func (h *handler) Start() error { + if h.isValidator { + return nil + } + if h.running.Load() { return errors.New("handler already started") } @@ -129,6 +136,9 @@ func (h *handler) Start() error { // Stop stops the handler. func (h *handler) Stop() error { + if h.isValidator { + return nil + } if !h.Running() { return errors.New("handler already stopped") } diff --git a/cosmos/runtime/txpool/handler_test.go b/cosmos/runtime/txpool/handler_test.go index a99202a26..f0f8172cc 100644 --- a/cosmos/runtime/txpool/handler_test.go +++ b/cosmos/runtime/txpool/handler_test.go @@ -60,7 +60,7 @@ var _ = Describe("", func() { subprovider = mocks.NewTxSubProvider(t) subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription) serializer = mocks.NewTxSerializer(t) - h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) + h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t), false) err := h.Start() Expect(err).NotTo(HaveOccurred()) for !h.Running() { diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 0d58c8b2a..cedf4a31d 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -39,6 +39,7 @@ import ( ethtxpool "github.com/ethereum/go-ethereum/core/txpool" ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" ) // Mempool implements the mempool.Mempool & Lifecycle interfaces. @@ -70,20 +71,40 @@ type Mempool struct { crc CometRemoteCache blockBuilderMu *sync.RWMutex priceLimit *big.Int + + isValidator bool + validatorJsonRPC string + ethclient *ethclient.Client } // New creates a new Mempool. func New( chain core.ChainReader, txpool eth.TxPool, lifetime int64, - blockBuilderMu *sync.RWMutex, priceLimit *big.Int, + blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator bool, + validatorJsonRPC string, ) *Mempool { + var ( + ec *ethclient.Client + err error + ) + + if !isValidator && validatorJsonRPC == "" { + ec, err = ethclient.Dial(validatorJsonRPC) + if err != nil { + panic(err) + } + } + return &Mempool{ - TxPool: txpool, - chain: chain, - lifetime: lifetime, - crc: newCometRemoteCache(), - blockBuilderMu: blockBuilderMu, - priceLimit: priceLimit, + TxPool: txpool, + chain: chain, + lifetime: lifetime, + crc: newCometRemoteCache(), + blockBuilderMu: blockBuilderMu, + priceLimit: priceLimit, + isValidator: isValidator, + validatorJsonRPC: validatorJsonRPC, + ethclient: ec, } } @@ -92,8 +113,9 @@ func (m *Mempool) Init( logger log.Logger, txBroadcaster TxBroadcaster, txSerializer TxSerializer, + isValidator bool, ) { - m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger) + m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger, isValidator) } // Start starts the Mempool TxHandler. @@ -108,6 +130,10 @@ func (m *Mempool) Stop() error { // Insert attempts to insert a Tx into the app-side mempool returning an error upon failure. func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { + if m.isValidator { + return errors.New("validator cannot insert transactions into the mempool from comet") + } + sCtx := sdk.UnwrapSDKContext(ctx) msgs := sdkTx.GetMsgs() if len(msgs) != 1 { @@ -123,6 +149,13 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { // Add the eth tx to the Geth txpool. ethTx := wet.Unwrap() + // Optmistically send to the validator. + if !m.isValidator && m.ethclient != nil { + // Broadcast the transaction to the validator. + // Note: we don't care about the response here. + _ = m.ethclient.SendTransaction(context.Background(), ethTx) + } + // Insert the tx into the txpool as a remote. m.blockBuilderMu.RLock() errs := m.TxPool.Add([]*ethtypes.Transaction{ethTx}, false, false) diff --git a/eth/polar/config.go b/eth/polar/config.go index 1de919bc6..0ec70fcab 100644 --- a/eth/polar/config.go +++ b/eth/polar/config.go @@ -102,4 +102,7 @@ type Config struct { // RPCTxFeeCap is the global transaction fee(price * gaslimit) cap for // send-transaction variants. The unit is ether. RPCTxFeeCap float64 + + ValidatorJSONRPCEndpoint string + IsValidator bool } From 56e7390e490887032468b05afad1868d3eeb083e Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 15:28:45 -0500 Subject: [PATCH 05/15] fix != --- cosmos/runtime/txpool/mempool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index cedf4a31d..bd073c92c 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -88,7 +88,7 @@ func New( err error ) - if !isValidator && validatorJsonRPC == "" { + if !isValidator && validatorJsonRPC != "" { ec, err = ethclient.Dial(validatorJsonRPC) if err != nil { panic(err) From 50bc4dc4f3accaeb8439630e3775bfb33ee80286 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 15:31:49 -0500 Subject: [PATCH 06/15] need 4 speed --- cosmos/runtime/txpool/mempool.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index bd073c92c..f542ab2fe 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -150,10 +150,13 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { ethTx := wet.Unwrap() // Optmistically send to the validator. - if !m.isValidator && m.ethclient != nil { + if m.ethclient != nil { // Broadcast the transaction to the validator. // Note: we don't care about the response here. - _ = m.ethclient.SendTransaction(context.Background(), ethTx) + go func() { + _ = m.ethclient.SendTransaction(context.Background(), ethTx) + + }() } // Insert the tx into the txpool as a remote. From 037780617b58aa36de290f9b0ad4db7dbcc60293 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 15:45:36 -0500 Subject: [PATCH 07/15] fix --- cosmos/config/template.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config/template.go b/cosmos/config/template.go index 8e188ee95..6401dba12 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -40,7 +40,7 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}" # Transaction fee cap for RPC requests rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}" -validator-json-rpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}" +validator-jsonrpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}" is-validator = {{ .Polaris.Polar.IsValidator }} # Chain config From 9c10313b3efd9a1ddbc137e1179a82de02382b71 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 17:03:02 -0500 Subject: [PATCH 08/15] hood --- cosmos/runtime/txpool/mempool.go | 10 ++++++++-- e2e/testapp/app.go | 4 ++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index f542ab2fe..4052f94be 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -23,6 +23,7 @@ package txpool import ( "context" "errors" + "fmt" "math/big" "sync" @@ -88,7 +89,9 @@ func New( err error ) - if !isValidator && validatorJsonRPC != "" { + if validatorJsonRPC != "" { + fmt.Println("ATTEMPING TO CONNECTO TO VALIDATOR JSON RPC") + fmt.Println(validatorJsonRPC) ec, err = ethclient.Dial(validatorJsonRPC) if err != nil { panic(err) @@ -154,7 +157,10 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { // Broadcast the transaction to the validator. // Note: we don't care about the response here. go func() { - _ = m.ethclient.SendTransaction(context.Background(), ethTx) + sCtx.Logger().Info("broadcasting transaction to validator", "hash", ethTx.Hash().Hex()) + if err := m.ethclient.SendTransaction(context.Background(), ethTx); err != nil { + sCtx.Logger().Error("failed to broadcast transaction to validator", "error", err) + } }() } diff --git a/e2e/testapp/app.go b/e2e/testapp/app.go index 393c1161c..73ee4e91b 100644 --- a/e2e/testapp/app.go +++ b/e2e/testapp/app.go @@ -21,6 +21,7 @@ package testapp import ( + "fmt" "io" "os" "path/filepath" @@ -193,6 +194,9 @@ func NewPolarisApp( baseAppOptions = append(baseAppOptions, baseapp.SetOptimisticExecution()) } + fmt.Println(polarisConfig.Polar.IsValidator, "ISVALIDATOR") + fmt.Println(polarisConfig.Polar.ValidatorJSONRPCEndpoint, "VALIDATORJSONRPCEndpoint") + // Build the app using the app builder. app.App = appBuilder.Build(db, traceStore, baseAppOptions...) app.Polaris = polarruntime.New(app, From 4751e4e2459bfd01c8b0f522511168c1650076b8 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 18:12:33 -0500 Subject: [PATCH 09/15] cleanup --- cosmos/runtime/ante/ante.go | 2 +- cosmos/runtime/runtime.go | 5 +-- cosmos/runtime/txpool/handler_test.go | 4 ++- cosmos/runtime/txpool/mempool.go | 45 ++++++++++++++++++--------- e2e/testapp/app.go | 4 --- 5 files changed, 38 insertions(+), 22 deletions(-) diff --git a/cosmos/runtime/ante/ante.go b/cosmos/runtime/ante/ante.go index f27c336c4..05f056188 100644 --- a/cosmos/runtime/ante/ante.go +++ b/cosmos/runtime/ante/ante.go @@ -67,7 +67,7 @@ func (ah *Provider) AnteHandler() func( ) (sdk.Context, error) { return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { // If the transaction contains a single EVM transaction, use the EVM ante handler - if len(tx.GetMsgs()) == 1 { + if len(tx.GetMsgs()) == 1 { //nolint:nestif // todo:fix. if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { if ah.isValidator { return ctx, errors.New("validator cannot accept EVM from comet") diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 4b4e91586..077a16259 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -127,6 +127,7 @@ func New( priceLimit := big.NewInt(0).SetUint64(cfg.Polar.LegacyTxPool.PriceLimit) p.WrappedTxPool = txpool.New( + p.logger, p.ExecutionLayer.Backend().Blockchain(), p.ExecutionLayer.Backend().TxPool(), int64(cfg.Polar.LegacyTxPool.Lifetime), @@ -182,8 +183,8 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error { clientCtx.TxConfig, evmtypes.WrapPayload)) // Initialize the txpool with a new transaction serializer. - p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( - clientCtx.TxConfig, evmtypes.WrapTx), p.cfg.Polar.IsValidator) + p.WrappedTxPool.Init(clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( + clientCtx.TxConfig, evmtypes.WrapTx)) // Register services with Polaris. p.RegisterLifecycles([]node.Lifecycle{ diff --git a/cosmos/runtime/txpool/handler_test.go b/cosmos/runtime/txpool/handler_test.go index f0f8172cc..6606c50ab 100644 --- a/cosmos/runtime/txpool/handler_test.go +++ b/cosmos/runtime/txpool/handler_test.go @@ -60,7 +60,9 @@ var _ = Describe("", func() { subprovider = mocks.NewTxSubProvider(t) subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription) serializer = mocks.NewTxSerializer(t) - h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t), false) + h = newHandler( + broadcaster, subprovider, serializer, + newCometRemoteCache(), log.NewTestLogger(t), false) err := h.Start() Expect(err).NotTo(HaveOccurred()) for !h.Running() { diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 4052f94be..961dc7809 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -23,9 +23,9 @@ package txpool import ( "context" "errors" - "fmt" "math/big" "sync" + "time" "cosmossdk.io/log" @@ -43,6 +43,11 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) +const ( + attempts = 5 + retryInterval = 5 * time.Second +) + // Mempool implements the mempool.Mempool & Lifecycle interfaces. var ( _ mempool.Mempool = (*Mempool)(nil) @@ -66,6 +71,7 @@ type GethTxPool interface { // geth txpool during `CheckTx`, that is the only purpose of `Mempool“. type Mempool struct { eth.TxPool + logger log.Logger lifetime int64 chain core.ChainReader handler Lifecycle @@ -74,31 +80,45 @@ type Mempool struct { priceLimit *big.Int isValidator bool - validatorJsonRPC string + validatorJSONRPC string ethclient *ethclient.Client } // New creates a new Mempool. func New( + logger log.Logger, chain core.ChainReader, txpool eth.TxPool, lifetime int64, blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator bool, - validatorJsonRPC string, + validatorJSONRPC string, ) *Mempool { var ( ec *ethclient.Client err error ) - if validatorJsonRPC != "" { - fmt.Println("ATTEMPING TO CONNECTO TO VALIDATOR JSON RPC") - fmt.Println(validatorJsonRPC) - ec, err = ethclient.Dial(validatorJsonRPC) - if err != nil { - panic(err) + if validatorJSONRPC != "" { + for attempt := 0; attempt < attempts; attempt++ { + logger.Info("Attempting to dial validator JSON RPC", + "url", validatorJSONRPC, "attempt", attempt+1) + ec, err = ethclient.Dial(validatorJSONRPC) + if err == nil { + logger.Info( + "Successfully connected to validator JSON RPC", "url", validatorJSONRPC) + break + } + if attempt < attempts-1 { + logger.Error("Failed to dial validator JSON RPC, retrying...", "error", err) + time.Sleep(retryInterval) + } else { + logger.Error( + "Failed to dial validator JSON RPC, no more retries left", "error", err) + panic(err) + } } } return &Mempool{ + logger: logger, TxPool: txpool, chain: chain, lifetime: lifetime, @@ -106,19 +126,17 @@ func New( blockBuilderMu: blockBuilderMu, priceLimit: priceLimit, isValidator: isValidator, - validatorJsonRPC: validatorJsonRPC, + validatorJSONRPC: validatorJSONRPC, ethclient: ec, } } // Init initializes the Mempool (notably the TxHandler). func (m *Mempool) Init( - logger log.Logger, txBroadcaster TxBroadcaster, txSerializer TxSerializer, - isValidator bool, ) { - m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger, isValidator) + m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, m.logger, m.isValidator) } // Start starts the Mempool TxHandler. @@ -161,7 +179,6 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { if err := m.ethclient.SendTransaction(context.Background(), ethTx); err != nil { sCtx.Logger().Error("failed to broadcast transaction to validator", "error", err) } - }() } diff --git a/e2e/testapp/app.go b/e2e/testapp/app.go index 73ee4e91b..393c1161c 100644 --- a/e2e/testapp/app.go +++ b/e2e/testapp/app.go @@ -21,7 +21,6 @@ package testapp import ( - "fmt" "io" "os" "path/filepath" @@ -194,9 +193,6 @@ func NewPolarisApp( baseAppOptions = append(baseAppOptions, baseapp.SetOptimisticExecution()) } - fmt.Println(polarisConfig.Polar.IsValidator, "ISVALIDATOR") - fmt.Println(polarisConfig.Polar.ValidatorJSONRPCEndpoint, "VALIDATORJSONRPCEndpoint") - // Build the app using the app builder. app.App = appBuilder.Build(db, traceStore, baseAppOptions...) app.Polaris = polarruntime.New(app, From 1f5b8120571f592bbf1193f9c89a72c0b77d1084 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Wed, 31 Jan 2024 19:37:23 -0500 Subject: [PATCH 10/15] force recheck --- cosmos/config/config.go | 5 +++ cosmos/config/flags/flags.go | 1 + cosmos/config/template.go | 6 ++++ cosmos/runtime/runtime.go | 1 + cosmos/runtime/txpool/ante.go | 5 +-- cosmos/runtime/txpool/mempool.go | 56 ++++++++++++++++++-------------- eth/polar/config.go | 10 +++++- 7 files changed, 56 insertions(+), 28 deletions(-) diff --git a/cosmos/config/config.go b/cosmos/config/config.go index f98679b78..79da5b9d9 100644 --- a/cosmos/config/config.go +++ b/cosmos/config/config.go @@ -103,6 +103,11 @@ func readConfigFromAppOptsParser(parser AppOptionsParser) (*Config, error) { return nil, err } + if conf.Polar.ForceForwardReCheckTxs, err = parser.GetBool( + flags.ForceForwardReCheckTxs); err != nil { + return nil, err + } + // Polar Miner settings if conf.Polar.Miner.Etherbase, err = parser.GetCommonAddress(flags.MinerEtherbase); err != nil { diff --git a/cosmos/config/flags/flags.go b/cosmos/config/flags/flags.go index 02fc271b9..8627aae9f 100644 --- a/cosmos/config/flags/flags.go +++ b/cosmos/config/flags/flags.go @@ -29,6 +29,7 @@ const ( RPCGasCap = "polaris.polar.rpc-gas-cap" IsValidator = "polaris.polar.is-validator" ValidatorJSONRPCEndpoint = "polaris.polar.validator-jsonrpc-endpoint" + ForceForwardReCheckTxs = "polaris.polar.force-forward-recheck-txs" // Miner. MinerEtherbase = "polaris.polar.miner.etherbase" diff --git a/cosmos/config/template.go b/cosmos/config/template.go index 6401dba12..a4ec9ee17 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -40,9 +40,15 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}" # Transaction fee cap for RPC requests rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}" +# JSON-RPC endpoint for forwarding ethereum transactions directly to validators. validator-jsonrpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}" + +# Whether the node is a validator is-validator = {{ .Polaris.Polar.IsValidator }} +# If we want to force forwarding on ReCheckTxs +force-forward-recheck-txs = {{ .Polaris.Polar.ForceForwardReCheckTxs }} + # Chain config [polaris.polar.chain] chain-id = "{{ .Polaris.Polar.Chain.ChainID }}" diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 077a16259..79d80d3aa 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -134,6 +134,7 @@ func New( &p.blockBuilderMu, priceLimit, p.cfg.Polar.IsValidator, + p.cfg.Polar.ForceForwardReCheckTxs, p.cfg.Polar.ValidatorJSONRPCEndpoint, ) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 00215f04b..dc5fee89a 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -44,8 +44,6 @@ func (m *Mempool) AnteHandle( telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() - // TODO: Record the time it takes to build a payload. - // We only want to eject transactions from comet on recheck. if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { @@ -55,6 +53,9 @@ func (m *Mempool) AnteHandle( ); shouldEject { telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") + } else if ctx.ExecMode() == sdk.ExecModeReCheck && m.forceBroadcastOnRecheck { + // We optionally force a re-broadcast. + m.ForwardToValidator(ethTx) } } } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 961dc7809..e267ee175 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -79,16 +79,18 @@ type Mempool struct { blockBuilderMu *sync.RWMutex priceLimit *big.Int - isValidator bool - validatorJSONRPC string - ethclient *ethclient.Client + isValidator bool + validatorJSONRPC string + forceBroadcastOnRecheck bool + ethclient *ethclient.Client } // New creates a new Mempool. func New( logger log.Logger, chain core.ChainReader, txpool eth.TxPool, lifetime int64, - blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator bool, + blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator, + forceBroadcastOnRecheck bool, validatorJSONRPC string, ) *Mempool { var ( @@ -118,16 +120,17 @@ func New( } return &Mempool{ - logger: logger, - TxPool: txpool, - chain: chain, - lifetime: lifetime, - crc: newCometRemoteCache(), - blockBuilderMu: blockBuilderMu, - priceLimit: priceLimit, - isValidator: isValidator, - validatorJSONRPC: validatorJSONRPC, - ethclient: ec, + logger: logger, + TxPool: txpool, + chain: chain, + lifetime: lifetime, + crc: newCometRemoteCache(), + blockBuilderMu: blockBuilderMu, + priceLimit: priceLimit, + isValidator: isValidator, + forceBroadcastOnRecheck: forceBroadcastOnRecheck, + validatorJSONRPC: validatorJSONRPC, + ethclient: ec, } } @@ -170,17 +173,8 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { // Add the eth tx to the Geth txpool. ethTx := wet.Unwrap() - // Optmistically send to the validator. - if m.ethclient != nil { - // Broadcast the transaction to the validator. - // Note: we don't care about the response here. - go func() { - sCtx.Logger().Info("broadcasting transaction to validator", "hash", ethTx.Hash().Hex()) - if err := m.ethclient.SendTransaction(context.Background(), ethTx); err != nil { - sCtx.Logger().Error("failed to broadcast transaction to validator", "error", err) - } - }() - } + // Fowrad to a validator if we have one. + m.ForwardToValidator(ethTx) // Insert the tx into the txpool as a remote. m.blockBuilderMu.RLock() @@ -204,6 +198,18 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { return nil } +func (m *Mempool) ForwardToValidator(ethTx *ethtypes.Transaction) { + if m.ethclient != nil { + // Broadcast the transaction to the validator. + // Note: we don't care about the response here. + go func() { + if err := m.ethclient.SendTransaction(context.Background(), ethTx); err != nil { + m.logger.Error("failed to broadcast transaction to validator", "error", err) + } + }() + } +} + // CountTx returns the number of transactions currently in the mempool. func (m *Mempool) CountTx() int { runnable, blocked := m.TxPool.Stats() diff --git a/eth/polar/config.go b/eth/polar/config.go index 0ec70fcab..8c7fc1bf4 100644 --- a/eth/polar/config.go +++ b/eth/polar/config.go @@ -103,6 +103,14 @@ type Config struct { // send-transaction variants. The unit is ether. RPCTxFeeCap float64 + // ValidatorJSONRPCEndpoint is the JSON-RPC endpoint of a validator, you + // want to forward transactions to. ValidatorJSONRPCEndpoint string - IsValidator bool + + // IsValidator is a flag to indicate if the node is a validator. + IsValidator bool + + // ForceForwardReCheckTxs is a flag to indicate if the node should forward + // transactions on recheck. + ForceForwardReCheckTxs bool } From c67c25371e937f4d27103d3aa6eedf3f05063f81 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Thu, 1 Feb 2024 13:49:24 -0500 Subject: [PATCH 11/15] nanoseconds to seconds fix --- cosmos/runtime/runtime.go | 2 +- cosmos/runtime/txpool/mempool.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 79d80d3aa..322e5c3f9 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -130,7 +130,7 @@ func New( p.logger, p.ExecutionLayer.Backend().Blockchain(), p.ExecutionLayer.Backend().TxPool(), - int64(cfg.Polar.LegacyTxPool.Lifetime), + cfg.Polar.LegacyTxPool.Lifetime, &p.blockBuilderMu, priceLimit, p.cfg.Polar.IsValidator, diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index e267ee175..d7621ae4b 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -72,7 +72,7 @@ type GethTxPool interface { type Mempool struct { eth.TxPool logger log.Logger - lifetime int64 + lifetime int64 // mempool uses seconds as a unit for lifetime chain core.ChainReader handler Lifecycle crc CometRemoteCache @@ -88,7 +88,7 @@ type Mempool struct { // New creates a new Mempool. func New( logger log.Logger, - chain core.ChainReader, txpool eth.TxPool, lifetime int64, + chain core.ChainReader, txpool eth.TxPool, lifetime time.Duration, blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator, forceBroadcastOnRecheck bool, validatorJSONRPC string, @@ -123,7 +123,7 @@ func New( logger: logger, TxPool: txpool, chain: chain, - lifetime: lifetime, + lifetime: int64(lifetime.Seconds()), crc: newCometRemoteCache(), blockBuilderMu: blockBuilderMu, priceLimit: priceLimit, From 46fa37a8146240520d94b2d0ad0755e81f06ad86 Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Thu, 1 Feb 2024 14:12:30 -0500 Subject: [PATCH 12/15] use time duration --- cosmos/runtime/txpool/ante.go | 15 +++++++-------- cosmos/runtime/txpool/comet.go | 11 +++++------ cosmos/runtime/txpool/mempool.go | 4 ++-- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index dc5fee89a..8b1d7c1df 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -49,7 +49,7 @@ func (m *Mempool) AnteHandle( if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime().Unix(), ethTx, + ctx.BlockTime(), ethTx, ); shouldEject { telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") @@ -64,7 +64,7 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime int64, tx *ethtypes.Transaction, + currentTime time.Time, tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { @@ -81,16 +81,15 @@ func (m *Mempool) shouldEjectFromCometMempool( } // validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool { - txHash := tx.Hash() +func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime time.Time) bool { // 1. If the transaction has been in the mempool for longer than the configured timeout. - // 2. If the transaction's gas params are less than or equal to the configured limit. - expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime - priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 - + expired := currentTime.Sub(m.crc.TimeFirstSeen(tx.Hash())) > m.lifetime if expired { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx) } + + // 2. If the transaction's gas params are less than or equal to the configured limit. + priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 if priceLeLimit { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index 4e5c5c68d..c672ad92b 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -36,18 +36,17 @@ const ( type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool MarkRemoteSeen(txHash common.Hash) bool - TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp + TimeFirstSeen(txHash common.Hash) time.Time } // Thread-safe implementation of CometRemoteCache. type cometRemoteCache struct { - timeInserted *lru.Cache[common.Hash, int64] + timeInserted *lru.Cache[common.Hash, time.Time] } func newCometRemoteCache() *cometRemoteCache { return &cometRemoteCache{ - - timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize), + timeInserted: lru.NewCache[common.Hash, time.Time](defaultCacheSize), } } @@ -58,13 +57,13 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { // Record the time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool { if !crc.timeInserted.Contains(txHash) { - crc.timeInserted.Add(txHash, time.Now().Unix()) + crc.timeInserted.Add(txHash, time.Now()) return true } return false } -func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 { +func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) time.Time { i, _ := crc.timeInserted.Get(txHash) return i } diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index d7621ae4b..87eaf6dfe 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -72,7 +72,7 @@ type GethTxPool interface { type Mempool struct { eth.TxPool logger log.Logger - lifetime int64 // mempool uses seconds as a unit for lifetime + lifetime time.Duration chain core.ChainReader handler Lifecycle crc CometRemoteCache @@ -123,7 +123,7 @@ func New( logger: logger, TxPool: txpool, chain: chain, - lifetime: int64(lifetime.Seconds()), + lifetime: lifetime, crc: newCometRemoteCache(), blockBuilderMu: blockBuilderMu, priceLimit: priceLimit, From 60a34daca0037e6dbf8cee92fd9ed71c53825f12 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 1 Feb 2024 16:21:29 -0500 Subject: [PATCH 13/15] fix time bug --- cosmos/config/template.go | 2 +- cosmos/runtime/txpool/ante.go | 28 ++++++++++++++++------------ cosmos/runtime/txpool/comet.go | 7 +++---- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cosmos/config/template.go b/cosmos/config/template.go index a4ec9ee17..ca8ff7fa7 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -50,7 +50,7 @@ is-validator = {{ .Polaris.Polar.IsValidator }} force-forward-recheck-txs = {{ .Polaris.Polar.ForceForwardReCheckTxs }} # Chain config -[polaris.polar.chain] +[polaris.polar.chain] cccccbgkuclekfkddebkvictfhu chain-id = "{{ .Polaris.Polar.Chain.ChainID }}" # Homestead switch block diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 8b1d7c1df..4be7b6783 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -49,7 +49,7 @@ func (m *Mempool) AnteHandle( if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime(), ethTx, + ethTx, ); shouldEject { telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") @@ -64,7 +64,7 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime time.Time, tx *ethtypes.Transaction, + tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { @@ -72,7 +72,7 @@ func (m *Mempool) shouldEjectFromCometMempool( } // First check things that are stateless. - if m.validateStateless(tx, currentTime) { + if m.validateStateless(tx) { return true } @@ -81,20 +81,24 @@ func (m *Mempool) shouldEjectFromCometMempool( } // validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime time.Time) bool { - // 1. If the transaction has been in the mempool for longer than the configured timeout. - expired := currentTime.Sub(m.crc.TimeFirstSeen(tx.Hash())) > m.lifetime - if expired { - telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx) - } - - // 2. If the transaction's gas params are less than or equal to the configured limit. +func (m *Mempool) validateStateless(tx *ethtypes.Transaction) bool { + // 1. If the transaction's gas params are less than or equal to the configured limit. priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 if priceLeLimit { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } - return expired || priceLeLimit + // 2. If the transaction has been in the mempool for longer than the configured timeout. + tfs, found := m.crc.TimeFirstSeen(tx.Hash()) + if !found { + return false + } + + expired := time.Since(tfs) > m.lifetime + if expired { + telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx) + } + return priceLeLimit || expired } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index c672ad92b..aecd06bb6 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -36,7 +36,7 @@ const ( type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool MarkRemoteSeen(txHash common.Hash) bool - TimeFirstSeen(txHash common.Hash) time.Time + TimeFirstSeen(txHash common.Hash) (time.Time, bool) } // Thread-safe implementation of CometRemoteCache. @@ -63,7 +63,6 @@ func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool { return false } -func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) time.Time { - i, _ := crc.timeInserted.Get(txHash) - return i +func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) (time.Time, bool) { + return crc.timeInserted.Get(txHash) } From d4cd9a9f2f56dd5d463bfe47cef6e26f2253e537 Mon Sep 17 00:00:00 2001 From: itsdevbear Date: Thu, 1 Feb 2024 16:22:08 -0500 Subject: [PATCH 14/15] fix --- cosmos/config/template.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/config/template.go b/cosmos/config/template.go index ca8ff7fa7..a4ec9ee17 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -50,7 +50,7 @@ is-validator = {{ .Polaris.Polar.IsValidator }} force-forward-recheck-txs = {{ .Polaris.Polar.ForceForwardReCheckTxs }} # Chain config -[polaris.polar.chain] cccccbgkuclekfkddebkvictfhu +[polaris.polar.chain] chain-id = "{{ .Polaris.Polar.Chain.ChainID }}" # Homestead switch block From 2100f45a1fa3416b528fb6310bf0e699027e018f Mon Sep 17 00:00:00 2001 From: Cal Bera Date: Thu, 1 Feb 2024 16:54:58 -0500 Subject: [PATCH 15/15] clean --- cosmos/runtime/txpool/ante.go | 47 +++++++++++++---------------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 4be7b6783..0f1906ac6 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -33,14 +33,12 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" ) -// AnteHandle implements sdk.AnteHandler. -// It is used to determine whether transactions should be ejected -// from the comet mempool. +// AnteHandle implements sdk.AnteHandler. It is used to determine whether transactions should be +// ejected from the comet mempool. func (m *Mempool) AnteHandle( ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler, ) (sdk.Context, error) { - // The transaction put into this function by CheckTx - // is a transaction from CometBFT mempool. + // The transaction put into this function by CheckTx is a transaction from CometBFT mempool. telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() @@ -48,9 +46,7 @@ func (m *Mempool) AnteHandle( if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() - if shouldEject := m.shouldEjectFromCometMempool( - ethTx, - ); shouldEject { + if shouldEject := m.shouldEjectFromCometMempool(ethTx); shouldEject { telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") } else if ctx.ExecMode() == sdk.ExecModeReCheck && m.forceBroadcastOnRecheck { @@ -71,47 +67,38 @@ func (m *Mempool) shouldEjectFromCometMempool( return false } - // First check things that are stateless. - if m.validateStateless(tx) { - return true - } - - // Then check for things that are stateful. - return m.validateStateful(tx) + return m.isInvalidLocal(tx) || m.includedCanonicalChain(tx) } -// validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction) bool { +// isInvalidLocal returns whether the tx is invalid for the local node's mempool settings. +func (m *Mempool) isInvalidLocal(tx *ethtypes.Transaction) bool { // 1. If the transaction's gas params are less than or equal to the configured limit. priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 if priceLeLimit { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) } - // 2. If the transaction has been in the mempool for longer than the configured timeout. - tfs, found := m.crc.TimeFirstSeen(tx.Hash()) - if !found { + // If the transaction was not seen before, it is valid for now. + timeFirstSeen, notSeen := m.crc.TimeFirstSeen(tx.Hash()) + if !notSeen { return false } - expired := time.Since(tfs) > m.lifetime + // 2. If the transaction has been in the mempool for longer than the configured lifetime. + expired := time.Since(timeFirstSeen) > m.lifetime if expired { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx) } + return priceLeLimit || expired } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical // Eth chain. -func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool { - // // 1. If the transaction has been included in a block. - // signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID) - // if _, err := ethtypes.Sender(signer, tx); err != nil { - // return true - // } - - // tx.Nonce() < +func (m *Mempool) includedCanonicalChain(tx *ethtypes.Transaction) bool { included := m.chain.GetTransactionLookup(tx.Hash()) != nil - telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + if included { + telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + } return included }