diff --git a/cosmos/config/config.go b/cosmos/config/config.go index 1c3859f44..79da5b9d9 100644 --- a/cosmos/config/config.go +++ b/cosmos/config/config.go @@ -94,6 +94,20 @@ func readConfigFromAppOptsParser(parser AppOptionsParser) (*Config, error) { return nil, err } + if conf.Polar.IsValidator, err = parser.GetBool(flags.IsValidator); err != nil { + return nil, err + } + + if conf.Polar.ValidatorJSONRPCEndpoint, err = + parser.GetString(flags.ValidatorJSONRPCEndpoint); err != nil { + return nil, err + } + + if conf.Polar.ForceForwardReCheckTxs, err = parser.GetBool( + flags.ForceForwardReCheckTxs); err != nil { + return nil, err + } + // Polar Miner settings if conf.Polar.Miner.Etherbase, err = parser.GetCommonAddress(flags.MinerEtherbase); err != nil { diff --git a/cosmos/config/flags/flags.go b/cosmos/config/flags/flags.go index 90ae84edc..8627aae9f 100644 --- a/cosmos/config/flags/flags.go +++ b/cosmos/config/flags/flags.go @@ -24,9 +24,12 @@ const ( OptimisticExecution = "polaris.optimistic-execution" // Polar Root. - RPCEvmTimeout = "polaris.polar.rpc-evm-timeout" - RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap" - RPCGasCap = "polaris.polar.rpc-gas-cap" + RPCEvmTimeout = "polaris.polar.rpc-evm-timeout" + RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap" + RPCGasCap = "polaris.polar.rpc-gas-cap" + IsValidator = "polaris.polar.is-validator" + ValidatorJSONRPCEndpoint = "polaris.polar.validator-jsonrpc-endpoint" + ForceForwardReCheckTxs = "polaris.polar.force-forward-recheck-txs" // Miner. MinerEtherbase = "polaris.polar.miner.etherbase" diff --git a/cosmos/config/template.go b/cosmos/config/template.go index 9a04dae72..a4ec9ee17 100644 --- a/cosmos/config/template.go +++ b/cosmos/config/template.go @@ -40,6 +40,15 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}" # Transaction fee cap for RPC requests rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}" +# JSON-RPC endpoint for forwarding ethereum transactions directly to validators. +validator-jsonrpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}" + +# Whether the node is a validator +is-validator = {{ .Polaris.Polar.IsValidator }} + +# If we want to force forwarding on ReCheckTxs +force-forward-recheck-txs = {{ .Polaris.Polar.ForceForwardReCheckTxs }} + # Chain config [polaris.polar.chain] chain-id = "{{ .Polaris.Polar.Chain.ChainID }}" diff --git a/cosmos/runtime/ante/ante.go b/cosmos/runtime/ante/ante.go index 05c4de043..05f056188 100644 --- a/cosmos/runtime/ante/ante.go +++ b/cosmos/runtime/ante/ante.go @@ -39,12 +39,13 @@ import ( type Provider struct { evmAnteHandler sdk.AnteHandler // Ante handler for EVM transactions cosmosAnteHandler sdk.AnteHandler // Ante handler for Cosmos transactions + isValidator bool } // NewAnteHandler creates a new Provider with a mempool and Cosmos ante handler. // It sets up the EVM ante handler with the necessary decorators. func NewAnteHandler( - mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, + mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, isValidator bool, ) *Provider { evmAnteDecorators := []sdk.AnteDecorator{ ante.NewSetUpContextDecorator(), // Set up the context decorator for the EVM ante handler @@ -55,6 +56,7 @@ func NewAnteHandler( return &Provider{ evmAnteHandler: sdk.ChainAnteDecorators(evmAnteDecorators...), cosmosAnteHandler: cosmosAnteHandler, + isValidator: isValidator, } } @@ -65,8 +67,11 @@ func (ah *Provider) AnteHandler() func( ) (sdk.Context, error) { return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { // If the transaction contains a single EVM transaction, use the EVM ante handler - if len(tx.GetMsgs()) == 1 { + if len(tx.GetMsgs()) == 1 { //nolint:nestif // todo:fix. if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok { + if ah.isValidator { + return ctx, errors.New("validator cannot accept EVM from comet") + } return ah.evmAnteHandler(ctx, tx, simulate) } else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok { if ctx.ExecMode() != sdk.ExecModeCheck { diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 880ccaf84..322e5c3f9 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -92,6 +92,8 @@ type Polaris struct { // into the txpool are happening during this process. The mempool object then read locks for // adding transactions into the txpool. blockBuilderMu sync.RWMutex + + cfg *eth.Config } // New creates a new Polaris runtime from the provided dependencies. @@ -105,6 +107,7 @@ func New( var err error p := &Polaris{ logger: logger, + cfg: cfg, } ctx := sdk.Context{}. @@ -124,11 +127,15 @@ func New( priceLimit := big.NewInt(0).SetUint64(cfg.Polar.LegacyTxPool.PriceLimit) p.WrappedTxPool = txpool.New( + p.logger, p.ExecutionLayer.Backend().Blockchain(), p.ExecutionLayer.Backend().TxPool(), - int64(cfg.Polar.LegacyTxPool.Lifetime), + cfg.Polar.LegacyTxPool.Lifetime, &p.blockBuilderMu, priceLimit, + p.cfg.Polar.IsValidator, + p.cfg.Polar.ForceForwardReCheckTxs, + p.cfg.Polar.ValidatorJSONRPCEndpoint, ) return p @@ -163,7 +170,7 @@ func (p *Polaris) Build( } app.SetAnteHandler( - antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler).AnteHandler(), + antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler, p.cfg.Polar.IsValidator).AnteHandler(), ) return nil @@ -177,7 +184,7 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error { clientCtx.TxConfig, evmtypes.WrapPayload)) // Initialize the txpool with a new transaction serializer. - p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( + p.WrappedTxPool.Init(clientCtx, libtx.NewSerializer[*ethtypes.Transaction]( clientCtx.TxConfig, evmtypes.WrapTx)) // Register services with Polaris. diff --git a/cosmos/runtime/txpool/ante.go b/cosmos/runtime/txpool/ante.go index 00215f04b..0f1906ac6 100644 --- a/cosmos/runtime/txpool/ante.go +++ b/cosmos/runtime/txpool/ante.go @@ -33,28 +33,25 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" ) -// AnteHandle implements sdk.AnteHandler. -// It is used to determine whether transactions should be ejected -// from the comet mempool. +// AnteHandle implements sdk.AnteHandler. It is used to determine whether transactions should be +// ejected from the comet mempool. func (m *Mempool) AnteHandle( ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler, ) (sdk.Context, error) { - // The transaction put into this function by CheckTx - // is a transaction from CometBFT mempool. + // The transaction put into this function by CheckTx is a transaction from CometBFT mempool. telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs) msgs := tx.GetMsgs() - // TODO: Record the time it takes to build a payload. - // We only want to eject transactions from comet on recheck. if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck { if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok { ethTx := wet.Unwrap() - if shouldEject := m.shouldEjectFromCometMempool( - ctx.BlockTime().Unix(), ethTx, - ); shouldEject { + if shouldEject := m.shouldEjectFromCometMempool(ethTx); shouldEject { telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs) return ctx, errors.New("eject from comet mempool") + } else if ctx.ExecMode() == sdk.ExecModeReCheck && m.forceBroadcastOnRecheck { + // We optionally force a re-broadcast. + m.ForwardToValidator(ethTx) } } } @@ -63,51 +60,45 @@ func (m *Mempool) AnteHandle( // shouldEject returns true if the transaction should be ejected from the CometBFT mempool. func (m *Mempool) shouldEjectFromCometMempool( - currentTime int64, tx *ethtypes.Transaction, + tx *ethtypes.Transaction, ) bool { defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject) if tx == nil { return false } - // First check things that are stateless. - if m.validateStateless(tx, currentTime) { - return true - } - - // Then check for things that are stateful. - return m.validateStateful(tx) + return m.isInvalidLocal(tx) || m.includedCanonicalChain(tx) } -// validateStateless returns whether the tx of the given hash is stateless. -func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool { - txHash := tx.Hash() - // 1. If the transaction has been in the mempool for longer than the configured timeout. - // 2. If the transaction's gas params are less than or equal to the configured limit. - expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime +// isInvalidLocal returns whether the tx is invalid for the local node's mempool settings. +func (m *Mempool) isInvalidLocal(tx *ethtypes.Transaction) bool { + // 1. If the transaction's gas params are less than or equal to the configured limit. priceLeLimit := tx.GasPrice().Cmp(m.priceLimit) <= 0 + if priceLeLimit { + telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) + } + + // If the transaction was not seen before, it is valid for now. + timeFirstSeen, notSeen := m.crc.TimeFirstSeen(tx.Hash()) + if !notSeen { + return false + } + // 2. If the transaction has been in the mempool for longer than the configured lifetime. + expired := time.Since(timeFirstSeen) > m.lifetime if expired { telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectExpiredTx) } - if priceLeLimit { - telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit) - } - return expired || priceLeLimit + return priceLeLimit || expired } // includedCanonicalChain returns whether the tx of the given hash is included in the canonical // Eth chain. -func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool { - // // 1. If the transaction has been included in a block. - // signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID) - // if _, err := ethtypes.Sender(signer, tx); err != nil { - // return true - // } - - // tx.Nonce() < +func (m *Mempool) includedCanonicalChain(tx *ethtypes.Transaction) bool { included := m.chain.GetTransactionLookup(tx.Hash()) != nil - telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + if included { + telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion) + } return included } diff --git a/cosmos/runtime/txpool/comet.go b/cosmos/runtime/txpool/comet.go index 4e5c5c68d..aecd06bb6 100644 --- a/cosmos/runtime/txpool/comet.go +++ b/cosmos/runtime/txpool/comet.go @@ -36,18 +36,17 @@ const ( type CometRemoteCache interface { IsRemoteTx(txHash common.Hash) bool MarkRemoteSeen(txHash common.Hash) bool - TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp + TimeFirstSeen(txHash common.Hash) (time.Time, bool) } // Thread-safe implementation of CometRemoteCache. type cometRemoteCache struct { - timeInserted *lru.Cache[common.Hash, int64] + timeInserted *lru.Cache[common.Hash, time.Time] } func newCometRemoteCache() *cometRemoteCache { return &cometRemoteCache{ - - timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize), + timeInserted: lru.NewCache[common.Hash, time.Time](defaultCacheSize), } } @@ -58,13 +57,12 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool { // Record the time the tx was inserted from Comet successfully. func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool { if !crc.timeInserted.Contains(txHash) { - crc.timeInserted.Add(txHash, time.Now().Unix()) + crc.timeInserted.Add(txHash, time.Now()) return true } return false } -func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 { - i, _ := crc.timeInserted.Get(txHash) - return i +func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) (time.Time, bool) { + return crc.timeInserted.Get(txHash) } diff --git a/cosmos/runtime/txpool/handler.go b/cosmos/runtime/txpool/handler.go index 069003b68..a3ae8afd2 100644 --- a/cosmos/runtime/txpool/handler.go +++ b/cosmos/runtime/txpool/handler.go @@ -96,28 +96,35 @@ type handler struct { // Queue for failed transactions failedTxs chan *failedTx + + isValidator bool } // newHandler creates a new handler. func newHandler( clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer, - crc CometRemoteCache, logger log.Logger, + crc CometRemoteCache, logger log.Logger, isValidator bool, ) *handler { h := &handler{ - logger: logger, - clientCtx: clientCtx, - serializer: serializer, - crc: crc, - txPool: txPool, - txsCh: make(chan core.NewTxsEvent, txChanSize), - stopCh: make(chan struct{}), - failedTxs: make(chan *failedTx, txChanSize), + logger: logger, + clientCtx: clientCtx, + serializer: serializer, + crc: crc, + txPool: txPool, + txsCh: make(chan core.NewTxsEvent, txChanSize), + stopCh: make(chan struct{}), + failedTxs: make(chan *failedTx, txChanSize), + isValidator: isValidator, } return h } // Start starts the handler. func (h *handler) Start() error { + if h.isValidator { + return nil + } + if h.running.Load() { return errors.New("handler already started") } @@ -129,6 +136,9 @@ func (h *handler) Start() error { // Stop stops the handler. func (h *handler) Stop() error { + if h.isValidator { + return nil + } if !h.Running() { return errors.New("handler already stopped") } diff --git a/cosmos/runtime/txpool/handler_test.go b/cosmos/runtime/txpool/handler_test.go index a99202a26..6606c50ab 100644 --- a/cosmos/runtime/txpool/handler_test.go +++ b/cosmos/runtime/txpool/handler_test.go @@ -60,7 +60,9 @@ var _ = Describe("", func() { subprovider = mocks.NewTxSubProvider(t) subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription) serializer = mocks.NewTxSerializer(t) - h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t)) + h = newHandler( + broadcaster, subprovider, serializer, + newCometRemoteCache(), log.NewTestLogger(t), false) err := h.Start() Expect(err).NotTo(HaveOccurred()) for !h.Running() { diff --git a/cosmos/runtime/txpool/mempool.go b/cosmos/runtime/txpool/mempool.go index 0d58c8b2a..87eaf6dfe 100644 --- a/cosmos/runtime/txpool/mempool.go +++ b/cosmos/runtime/txpool/mempool.go @@ -25,6 +25,7 @@ import ( "errors" "math/big" "sync" + "time" "cosmossdk.io/log" @@ -39,6 +40,12 @@ import ( ethtxpool "github.com/ethereum/go-ethereum/core/txpool" ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +const ( + attempts = 5 + retryInterval = 5 * time.Second ) // Mempool implements the mempool.Mempool & Lifecycle interfaces. @@ -64,36 +71,75 @@ type GethTxPool interface { // geth txpool during `CheckTx`, that is the only purpose of `Mempool“. type Mempool struct { eth.TxPool - lifetime int64 + logger log.Logger + lifetime time.Duration chain core.ChainReader handler Lifecycle crc CometRemoteCache blockBuilderMu *sync.RWMutex priceLimit *big.Int + + isValidator bool + validatorJSONRPC string + forceBroadcastOnRecheck bool + ethclient *ethclient.Client } // New creates a new Mempool. func New( - chain core.ChainReader, txpool eth.TxPool, lifetime int64, - blockBuilderMu *sync.RWMutex, priceLimit *big.Int, + logger log.Logger, + chain core.ChainReader, txpool eth.TxPool, lifetime time.Duration, + blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator, + forceBroadcastOnRecheck bool, + validatorJSONRPC string, ) *Mempool { + var ( + ec *ethclient.Client + err error + ) + + if validatorJSONRPC != "" { + for attempt := 0; attempt < attempts; attempt++ { + logger.Info("Attempting to dial validator JSON RPC", + "url", validatorJSONRPC, "attempt", attempt+1) + ec, err = ethclient.Dial(validatorJSONRPC) + if err == nil { + logger.Info( + "Successfully connected to validator JSON RPC", "url", validatorJSONRPC) + break + } + if attempt < attempts-1 { + logger.Error("Failed to dial validator JSON RPC, retrying...", "error", err) + time.Sleep(retryInterval) + } else { + logger.Error( + "Failed to dial validator JSON RPC, no more retries left", "error", err) + panic(err) + } + } + } + return &Mempool{ - TxPool: txpool, - chain: chain, - lifetime: lifetime, - crc: newCometRemoteCache(), - blockBuilderMu: blockBuilderMu, - priceLimit: priceLimit, + logger: logger, + TxPool: txpool, + chain: chain, + lifetime: lifetime, + crc: newCometRemoteCache(), + blockBuilderMu: blockBuilderMu, + priceLimit: priceLimit, + isValidator: isValidator, + forceBroadcastOnRecheck: forceBroadcastOnRecheck, + validatorJSONRPC: validatorJSONRPC, + ethclient: ec, } } // Init initializes the Mempool (notably the TxHandler). func (m *Mempool) Init( - logger log.Logger, txBroadcaster TxBroadcaster, txSerializer TxSerializer, ) { - m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger) + m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, m.logger, m.isValidator) } // Start starts the Mempool TxHandler. @@ -108,6 +154,10 @@ func (m *Mempool) Stop() error { // Insert attempts to insert a Tx into the app-side mempool returning an error upon failure. func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { + if m.isValidator { + return errors.New("validator cannot insert transactions into the mempool from comet") + } + sCtx := sdk.UnwrapSDKContext(ctx) msgs := sdkTx.GetMsgs() if len(msgs) != 1 { @@ -123,6 +173,9 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { // Add the eth tx to the Geth txpool. ethTx := wet.Unwrap() + // Fowrad to a validator if we have one. + m.ForwardToValidator(ethTx) + // Insert the tx into the txpool as a remote. m.blockBuilderMu.RLock() errs := m.TxPool.Add([]*ethtypes.Transaction{ethTx}, false, false) @@ -145,6 +198,18 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error { return nil } +func (m *Mempool) ForwardToValidator(ethTx *ethtypes.Transaction) { + if m.ethclient != nil { + // Broadcast the transaction to the validator. + // Note: we don't care about the response here. + go func() { + if err := m.ethclient.SendTransaction(context.Background(), ethTx); err != nil { + m.logger.Error("failed to broadcast transaction to validator", "error", err) + } + }() + } +} + // CountTx returns the number of transactions currently in the mempool. func (m *Mempool) CountTx() int { runnable, blocked := m.TxPool.Stats() diff --git a/eth/polar/config.go b/eth/polar/config.go index 1de919bc6..8c7fc1bf4 100644 --- a/eth/polar/config.go +++ b/eth/polar/config.go @@ -102,4 +102,15 @@ type Config struct { // RPCTxFeeCap is the global transaction fee(price * gaslimit) cap for // send-transaction variants. The unit is ether. RPCTxFeeCap float64 + + // ValidatorJSONRPCEndpoint is the JSON-RPC endpoint of a validator, you + // want to forward transactions to. + ValidatorJSONRPCEndpoint string + + // IsValidator is a flag to indicate if the node is a validator. + IsValidator bool + + // ForceForwardReCheckTxs is a flag to indicate if the node should forward + // transactions on recheck. + ForceForwardReCheckTxs bool }