Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

protecccc validators bls #1473

Closed
wants to merge 16 commits into from
9 changes: 9 additions & 0 deletions cosmos/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ func readConfigFromAppOptsParser(parser AppOptionsParser) (*Config, error) {
return nil, err
}

if conf.Polar.IsValidator, err = parser.GetBool(flags.IsValidator); err != nil {
return nil, err
}

if conf.Polar.ValidatorJSONRPCEndpoint, err =
parser.GetString(flags.ValidatorJSONRPCEndpoint); err != nil {
return nil, err
}

// Polar Miner settings
if conf.Polar.Miner.Etherbase, err =
parser.GetCommonAddress(flags.MinerEtherbase); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions cosmos/config/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ const (
OptimisticExecution = "polaris.optimistic-execution"

// Polar Root.
RPCEvmTimeout = "polaris.polar.rpc-evm-timeout"
RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap"
RPCGasCap = "polaris.polar.rpc-gas-cap"
RPCEvmTimeout = "polaris.polar.rpc-evm-timeout"
RPCTxFeeCap = "polaris.polar.rpc-tx-fee-cap"
RPCGasCap = "polaris.polar.rpc-gas-cap"
IsValidator = "polaris.polar.is-validator"
ValidatorJSONRPCEndpoint = "polaris.polar.validator-jsonrpc-endpoint"

// Miner.
MinerEtherbase = "polaris.polar.miner.etherbase"
Expand Down
3 changes: 3 additions & 0 deletions cosmos/config/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ rpc-evm-timeout = "{{ .Polaris.Polar.RPCEVMTimeout }}"
# Transaction fee cap for RPC requests
rpc-tx-fee-cap = "{{ .Polaris.Polar.RPCTxFeeCap }}"

validator-json-rpc-endpoint = "{{ .Polaris.Polar.ValidatorJSONRPCEndpoint }}"
is-validator = {{ .Polaris.Polar.IsValidator }}

# Chain config
[polaris.polar.chain]
chain-id = "{{ .Polaris.Polar.Chain.ChainID }}"
Expand Down
7 changes: 6 additions & 1 deletion cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ import (
type Provider struct {
evmAnteHandler sdk.AnteHandler // Ante handler for EVM transactions
cosmosAnteHandler sdk.AnteHandler // Ante handler for Cosmos transactions
isValidator bool
}

// NewAnteHandler creates a new Provider with a mempool and Cosmos ante handler.
// It sets up the EVM ante handler with the necessary decorators.
func NewAnteHandler(
mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler,
mempool *txpool.Mempool, cosmosAnteHandler sdk.AnteHandler, isValidator bool,
) *Provider {
evmAnteDecorators := []sdk.AnteDecorator{
ante.NewSetUpContextDecorator(), // Set up the context decorator for the EVM ante handler
Expand All @@ -55,6 +56,7 @@ func NewAnteHandler(
return &Provider{
evmAnteHandler: sdk.ChainAnteDecorators(evmAnteDecorators...),
cosmosAnteHandler: cosmosAnteHandler,
isValidator: isValidator,
}
}

Expand All @@ -67,6 +69,9 @@ func (ah *Provider) AnteHandler() func(
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 {
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ah.isValidator {
return ctx, errors.New("validator cannot accept EVM from comet")
}
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
Expand Down
9 changes: 7 additions & 2 deletions cosmos/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type Polaris struct {
// into the txpool are happening during this process. The mempool object then read locks for
// adding transactions into the txpool.
blockBuilderMu sync.RWMutex

cfg *eth.Config
}

// New creates a new Polaris runtime from the provided dependencies.
Expand All @@ -105,6 +107,7 @@ func New(
var err error
p := &Polaris{
logger: logger,
cfg: cfg,
}

ctx := sdk.Context{}.
Expand All @@ -129,6 +132,8 @@ func New(
int64(cfg.Polar.LegacyTxPool.Lifetime),
&p.blockBuilderMu,
priceLimit,
p.cfg.Polar.IsValidator,
p.cfg.Polar.ValidatorJSONRPCEndpoint,
)

return p
Expand Down Expand Up @@ -163,7 +168,7 @@ func (p *Polaris) Build(
}

app.SetAnteHandler(
antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler).AnteHandler(),
antelib.NewAnteHandler(p.WrappedTxPool, cosmHandler, p.cfg.Polar.IsValidator).AnteHandler(),
)

return nil
Expand All @@ -178,7 +183,7 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error {

// Initialize the txpool with a new transaction serializer.
p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction](
clientCtx.TxConfig, evmtypes.WrapTx))
clientCtx.TxConfig, evmtypes.WrapTx), p.cfg.Polar.IsValidator)

// Register services with Polaris.
p.RegisterLifecycles([]node.Lifecycle{
Expand Down
1 change: 0 additions & 1 deletion cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (m *Mempool) AnteHandle(
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
); shouldEject {
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
Expand Down
39 changes: 15 additions & 24 deletions cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,50 @@
package txpool

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
)

const (
defaultCacheSize = 4096
defaultCacheSize = 100000
)

// CometRemoteCache is used to mark which txs are added to our Polaris node remotely from
// Comet CheckTX and when.
type CometRemoteCache interface {
IsRemoteTx(txHash common.Hash) bool
MarkRemoteSeen(txHash common.Hash)
MarkRemoteSeen(txHash common.Hash) bool
TimeFirstSeen(txHash common.Hash) int64 // Unix timestamp
DropRemoteTx(txHash common.Hash)
}

// Thread-safe implementation of CometRemoteCache.
type cometRemoteCache struct {
timeInserted map[common.Hash]int64
timeInsertedMu sync.RWMutex
timeInserted *lru.Cache[common.Hash, int64]
}

func newCometRemoteCache() *cometRemoteCache {
return &cometRemoteCache{
timeInserted: make(map[common.Hash]int64, defaultCacheSize),

timeInserted: lru.NewCache[common.Hash, int64](defaultCacheSize),
}
}

func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
_, ok := crc.timeInserted[txHash]
return ok
return crc.timeInserted.Contains(txHash)
}

// Record the time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) bool {
if !crc.timeInserted.Contains(txHash) {
crc.timeInserted.Add(txHash, time.Now().Unix())
return true
}
return false
}

func (crc *cometRemoteCache) TimeFirstSeen(txHash common.Hash) int64 {
crc.timeInsertedMu.RLock()
defer crc.timeInsertedMu.RUnlock()
return crc.timeInserted[txHash]
}

func (crc *cometRemoteCache) DropRemoteTx(txHash common.Hash) {
crc.timeInsertedMu.Lock()
delete(crc.timeInserted, txHash)
crc.timeInsertedMu.Unlock()
i, _ := crc.timeInserted.Get(txHash)
return i
}
28 changes: 19 additions & 9 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,35 @@ type handler struct {

// Queue for failed transactions
failedTxs chan *failedTx

isValidator bool
}

// newHandler creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer,
crc CometRemoteCache, logger log.Logger,
crc CometRemoteCache, logger log.Logger, isValidator bool,
) *handler {
h := &handler{
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
stopCh: make(chan struct{}),
failedTxs: make(chan *failedTx, txChanSize),
isValidator: isValidator,
}
return h
}

// Start starts the handler.
func (h *handler) Start() error {
if h.isValidator {
return nil
}

if h.running.Load() {
return errors.New("handler already started")
}
Expand All @@ -129,6 +136,9 @@ func (h *handler) Start() error {

// Stop stops the handler.
func (h *handler) Stop() error {
if h.isValidator {
return nil
}
if !h.Running() {
return errors.New("handler already stopped")
}
Expand Down
2 changes: 1 addition & 1 deletion cosmos/runtime/txpool/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var _ = Describe("", func() {
subprovider = mocks.NewTxSubProvider(t)
subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription)
serializer = mocks.NewTxSerializer(t)
h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t), false)
err := h.Start()
Expect(err).NotTo(HaveOccurred())
for !h.Running() {
Expand Down
73 changes: 43 additions & 30 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

ethtxpool "github.com/ethereum/go-ethereum/core/txpool"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

// Mempool implements the mempool.Mempool & Lifecycle interfaces.
Expand Down Expand Up @@ -70,20 +71,40 @@ type Mempool struct {
crc CometRemoteCache
blockBuilderMu *sync.RWMutex
priceLimit *big.Int

isValidator bool
validatorJsonRPC string
ethclient *ethclient.Client
}

// New creates a new Mempool.
func New(
chain core.ChainReader, txpool eth.TxPool, lifetime int64,
blockBuilderMu *sync.RWMutex, priceLimit *big.Int,
blockBuilderMu *sync.RWMutex, priceLimit *big.Int, isValidator bool,
validatorJsonRPC string,
) *Mempool {
var (
ec *ethclient.Client
err error
)

if !isValidator && validatorJsonRPC == "" {
ec, err = ethclient.Dial(validatorJsonRPC)
if err != nil {
panic(err)
}
}
itsdevbear marked this conversation as resolved.
Show resolved Hide resolved

return &Mempool{
TxPool: txpool,
chain: chain,
lifetime: lifetime,
crc: newCometRemoteCache(),
blockBuilderMu: blockBuilderMu,
priceLimit: priceLimit,
TxPool: txpool,
chain: chain,
lifetime: lifetime,
crc: newCometRemoteCache(),
blockBuilderMu: blockBuilderMu,
priceLimit: priceLimit,
isValidator: isValidator,
validatorJsonRPC: validatorJsonRPC,
ethclient: ec,
}
}

Expand All @@ -92,8 +113,9 @@ func (m *Mempool) Init(
logger log.Logger,
txBroadcaster TxBroadcaster,
txSerializer TxSerializer,
isValidator bool,
) {
m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger)
m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger, isValidator)
}

// Start starts the Mempool TxHandler.
Expand All @@ -108,6 +130,10 @@ func (m *Mempool) Stop() error {

// Insert attempts to insert a Tx into the app-side mempool returning an error upon failure.
func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
if m.isValidator {
return errors.New("validator cannot insert transactions into the mempool from comet")
}
itsdevbear marked this conversation as resolved.
Show resolved Hide resolved

sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
Expand All @@ -123,6 +149,13 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
// Add the eth tx to the Geth txpool.
ethTx := wet.Unwrap()

// Optmistically send to the validator.
if !m.isValidator && m.ethclient != nil {
// Broadcast the transaction to the validator.
// Note: we don't care about the response here.
_ = m.ethclient.SendTransaction(context.Background(), ethTx)
}

// Insert the tx into the txpool as a remote.
m.blockBuilderMu.RLock()
errs := m.TxPool.Add([]*ethtypes.Transaction{ethTx}, false, false)
Expand All @@ -140,7 +173,7 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
}

// Add the eth tx to the remote cache.
m.crc.MarkRemoteSeen(ethTx.Hash())
_ = m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
}
Expand All @@ -157,26 +190,6 @@ func (m *Mempool) Select(context.Context, [][]byte) mempool.Iterator {
}

// Remove is an intentional no-op as the eth txpool handles removals.
func (m *Mempool) Remove(tx sdk.Tx) error {
// Get the Eth payload envelope from the Cosmos transaction.
msgs := tx.GetMsgs()
if len(msgs) == 1 {
env, ok := utils.GetAs[*types.WrappedPayloadEnvelope](msgs[0])
if !ok {
return nil
}

// Unwrap the payload to unpack the individual eth transactions to remove from the txpool.
for _, txBz := range env.UnwrapPayload().ExecutionPayload.Transactions {
ethTx := new(ethtypes.Transaction)
if err := ethTx.UnmarshalBinary(txBz); err != nil {
continue
}
txHash := ethTx.Hash()

// Remove the eth tx from comet seen tx cache.
m.crc.DropRemoteTx(txHash)
}
}
func (m *Mempool) Remove(_ sdk.Tx) error {
return nil
}
Loading
Loading