diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index bd19d08c7d7..c6eed0f469b 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -122,6 +122,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) { rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage) rootCmd.PersistentFlags().StringVar(&cfg.L2RpcUrl, utils.L2RpcUrlFlag.Name, utils.L2RpcUrlFlag.Value, utils.L2RpcUrlFlag.Usage) + rootCmd.PersistentFlags().Int64Var(&cfg.L2RpcLimit, utils.L2RpcLimitFlag.Name, utils.L2RpcLimitFlag.Value, utils.L2RpcLimitFlag.Usage) // X Layer API Keys rootCmd.PersistentFlags().StringVar(&cfg.HttpApiKeys, utils.HTTPApiKeysFlag.Name, utils.HTTPApiKeysFlag.Value, utils.HTTPApiKeysFlag.Usage) rootCmd.PersistentFlags().StringVar(&cfg.MethodRateLimit, utils.MethodRateLimitFlag.Name, utils.MethodRateLimitFlag.Value, utils.MethodRateLimitFlag.Usage) diff --git a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go index 083d72a1a8e..734d033bede 100644 --- a/cmd/rpcdaemon/cli/httpcfg/http_cfg.go +++ b/cmd/rpcdaemon/cli/httpcfg/http_cfg.go @@ -76,4 +76,5 @@ type HttpCfg struct { // For X Layer HttpApiKeys string MethodRateLimit string + L2RpcLimit int64 } diff --git a/cmd/rpcdaemon/commands/daemon.go b/cmd/rpcdaemon/commands/daemon.go index 6939562029c..d2db149bd1c 100644 --- a/cmd/rpcdaemon/commands/daemon.go +++ b/cmd/rpcdaemon/commands/daemon.go @@ -12,9 +12,9 @@ import ( "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/rpchelper" "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/zk/sequencer" "github.com/ledgerwatch/erigon/zk/syncer" txpool2 "github.com/ledgerwatch/erigon/zk/txpool" - "github.com/ledgerwatch/erigon/zk/sequencer" ) // APIList describes the list of available RPC apis @@ -33,6 +33,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool. base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs) base.SetL2RpcUrl(ethCfg.L2RpcUrl) base.SetGasless(ethCfg.Gasless) + base.SetL2RpcLimit(ethCfg.Zk.XLayer.L2RpcLimit) // For X Layer ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, ethCfg) erigonImpl := NewErigonAPI(base, db, eth) txpoolImpl := NewTxPoolAPI(base, db, txPool, rawPool, rpcUrl) @@ -165,6 +166,7 @@ func AuthAPIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClien base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs) base.SetL2RpcUrl(ethCfg.L2RpcUrl) base.SetGasless(ethCfg.Gasless) + base.SetL2RpcLimit(ethCfg.Zk.XLayer.L2RpcLimit) // For X Layer ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, ethCfg) engineImpl := NewEngineAPI(base, db, eth, cfg.InternalCL) diff --git a/cmd/rpcdaemon/commands/eth_accounts_zk.go b/cmd/rpcdaemon/commands/eth_accounts_zk.go index 3243d7a5c5c..ebc753b9bf4 100644 --- a/cmd/rpcdaemon/commands/eth_accounts_zk.go +++ b/cmd/rpcdaemon/commands/eth_accounts_zk.go @@ -1,13 +1,14 @@ package commands import ( - "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" "fmt" - "github.com/ledgerwatch/erigon/common/hexutil" libcommon "github.com/gateway-fm/cdk-erigon-lib/common" + "github.com/ledgerwatch/erigon/common/hexutil" "github.com/ledgerwatch/erigon/rpc" - "strings" "github.com/ledgerwatch/erigon/zkevm/hex" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" + "strings" ) func (api *APIImpl) sendGetTransactionCountToSequencer(rpcUrl string, address libcommon.Address, blockNrOrHash *rpc.BlockNumberOrHash) (*hexutil.Uint64, error) { @@ -22,7 +23,7 @@ func (api *APIImpl) sendGetTransactionCountToSequencer(rpcUrl string, address li } } - res, err := client.JSONRPCCall(rpcUrl, "eth_getTransactionCount", addressHex, blockNrOrHashValue) + res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{rpcUrl, api.l2RpcLimit}, rpcUrl, "eth_getTransactionCount", addressHex, blockNrOrHashValue) if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/eth_api.go b/cmd/rpcdaemon/commands/eth_api.go index d6c51b6f4c8..8bd6981513d 100644 --- a/cmd/rpcdaemon/commands/eth_api.go +++ b/cmd/rpcdaemon/commands/eth_api.go @@ -131,6 +131,9 @@ type BaseAPI struct { dirs datadir.Dirs l2RpcUrl string gasless bool + + // For X Layer + l2RpcLimit int64 } func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.AggregatorV3, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader, dirs datadir.Dirs) *BaseAPI { diff --git a/cmd/rpcdaemon/commands/eth_api_zkevm.go b/cmd/rpcdaemon/commands/eth_api_zkevm.go index c81cee659a5..495595a4e24 100644 --- a/cmd/rpcdaemon/commands/eth_api_zkevm.go +++ b/cmd/rpcdaemon/commands/eth_api_zkevm.go @@ -10,3 +10,8 @@ func (api *BaseAPI) GetL2RpcUrl() string { } return api.l2RpcUrl } + +// For X Layer +func (api *BaseAPI) SetL2RpcLimit(limit int64) { + api.l2RpcLimit = limit +} diff --git a/cmd/rpcdaemon/commands/eth_system_xlayer.go b/cmd/rpcdaemon/commands/eth_system_xlayer.go index e6a3b18af4c..2e25a410012 100644 --- a/cmd/rpcdaemon/commands/eth_system_xlayer.go +++ b/cmd/rpcdaemon/commands/eth_system_xlayer.go @@ -17,6 +17,7 @@ import ( "github.com/ledgerwatch/erigon/zk/apollo" "github.com/ledgerwatch/erigon/zk/sequencer" "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" "github.com/ledgerwatch/log/v3" ) @@ -72,7 +73,7 @@ func (api *APIImpl) getLatestBlockTxNum(ctx context.Context) (int, error) { } func (api *APIImpl) getGPFromTrustedNode(method string) (*big.Int, error) { - res, err := client.JSONRPCCall(api.l2RpcUrl, method) + res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RpcUrl, method) if err != nil { return nil, errors.New("failed to get gas price from trusted node") } diff --git a/cmd/rpcdaemon/commands/eth_system_zk.go b/cmd/rpcdaemon/commands/eth_system_zk.go index 1e825bedd40..df8ad6ad8cb 100644 --- a/cmd/rpcdaemon/commands/eth_system_zk.go +++ b/cmd/rpcdaemon/commands/eth_system_zk.go @@ -11,6 +11,7 @@ import ( "github.com/ledgerwatch/erigon/common/hexutil" "github.com/ledgerwatch/erigon/zkevm/encoding" "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" "github.com/ledgerwatch/log/v3" ) @@ -44,7 +45,7 @@ func (api *APIImpl) GasPrice(ctx context.Context) (*hexutil.Big, error) { return &price, nil } - res, err := client.JSONRPCCall(api.l2RpcUrl, "eth_gasPrice") + res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RpcUrl, "eth_gasPrice") if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/send_transaction_zk.go b/cmd/rpcdaemon/commands/send_transaction_zk.go index dca43bc8128..38ff113df1c 100644 --- a/cmd/rpcdaemon/commands/send_transaction_zk.go +++ b/cmd/rpcdaemon/commands/send_transaction_zk.go @@ -2,15 +2,15 @@ package commands import ( "fmt" - "strings" - "math/big" + "strings" "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/common/hexutility" "github.com/ledgerwatch/erigon/zk/sequencer" "github.com/ledgerwatch/erigon/zk/zkchainconfig" "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" ) func (api *APIImpl) isPoolManagerAddressSet() bool { @@ -22,7 +22,7 @@ func (api *APIImpl) isZkNonSequencer(chainId *big.Int) bool { } func (api *APIImpl) sendTxZk(rpcUrl string, encodedTx hexutility.Bytes, chainId uint64) (common.Hash, error) { - res, err := client.JSONRPCCall(rpcUrl, "eth_sendRawTransaction", encodedTx) + res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, rpcUrl, "eth_sendRawTransaction", encodedTx) if err != nil { return common.Hash{}, err } diff --git a/cmd/rpcdaemon/commands/txpool_api.go b/cmd/rpcdaemon/commands/txpool_api.go index 0065a7dbb7d..8c363482998 100644 --- a/cmd/rpcdaemon/commands/txpool_api.go +++ b/cmd/rpcdaemon/commands/txpool_api.go @@ -14,8 +14,9 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/rlp" - "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" "github.com/ledgerwatch/erigon/zk/txpool" + "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + jsonrpc "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" ) // NetAPI the interface for the net_ RPC commands @@ -46,7 +47,7 @@ func NewTxPoolAPI(base *BaseAPI, db kv.RoDB, pool proto_txpool.TxpoolClient, raw func (api *TxPoolAPIImpl) Content(ctx context.Context) (interface{}, error) { if api.l2RPCUrl != "" { - res, err := client.JSONRPCCall(api.l2RPCUrl, "txpool_content") + res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RPCUrl, "txpool_content") if err != nil { return nil, err } @@ -137,7 +138,7 @@ func (api *TxPoolAPIImpl) Content(ctx context.Context) (interface{}, error) { // Status returns the number of pending and queued transaction in the pool. func (api *TxPoolAPIImpl) Status(ctx context.Context) (interface{}, error) { if api.l2RPCUrl != "" { - res, err := client.JSONRPCCall(api.l2RPCUrl, "txpool_status") + res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RPCUrl, "txpool_status") if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/zkevm_api.go b/cmd/rpcdaemon/commands/zkevm_api.go index c72144d2ee6..fdda0f15d97 100644 --- a/cmd/rpcdaemon/commands/zkevm_api.go +++ b/cmd/rpcdaemon/commands/zkevm_api.go @@ -33,6 +33,7 @@ import ( "github.com/ledgerwatch/erigon/zk/witness" "github.com/ledgerwatch/erigon/zkevm/hex" "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client" + jsonrpc "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" "github.com/ledgerwatch/log/v3" ) @@ -1145,7 +1146,7 @@ func (api *ZkEvmAPIImpl) GetExitRootTable(ctx context.Context) ([]l1InfoTreeData } func (api *ZkEvmAPIImpl) sendGetBatchWitness(rpcUrl string, batchNumber uint64, mode *WitnessMode) (json.RawMessage, error) { - res, err := client.JSONRPCCall(rpcUrl, "zkevm_getBatchWitness", batchNumber, mode) + res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{rpcUrl, api.ethApi.l2RpcLimit}, rpcUrl, "zkevm_getBatchWitness", batchNumber, mode) if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 2d5cbb13e0e..a6761c213b7 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -31,6 +31,7 @@ func main() { ethConfig := ethconfig.Defaults ethConfig.L2RpcUrl = cfg.L2RpcUrl + ethConfig.Zk.XLayer.L2RpcLimit = cfg.L2RpcLimit // TODO: Replace with correct consensus Engine engine := ethash.NewFaker() diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7efe1f88ef1..83b4f2a54e2 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -360,6 +360,11 @@ var ( Usage: "Upstream L2 node RPC endpoint", Value: "", } + L2RpcLimitFlag = cli.Int64Flag{ // For X Layer + Name: "zkevm.l2-sequencer-rpc-limit", + Usage: "The limit of call L2 seq RPC endpoint", + Value: 0, + } L2DataStreamerUrlFlag = cli.StringFlag{ Name: "zkevm.l2-datastreamer-url", Usage: "L2 datastreamer endpoint", diff --git a/eth/ethconfig/config_xlayer.go b/eth/ethconfig/config_xlayer.go index 98694c22411..35211fd3d20 100644 --- a/eth/ethconfig/config_xlayer.go +++ b/eth/ethconfig/config_xlayer.go @@ -11,6 +11,8 @@ type XLayerConfig struct { EnableInnerTx bool // Sequencer SequencerBatchSleepDuration time.Duration + + L2RpcLimit int64 } var DefaultXLayerConfig = XLayerConfig{} diff --git a/test/config/test.erigon.rpc.config.yaml b/test/config/test.erigon.rpc.config.yaml index dce424d9534..f4c2d36e474 100644 --- a/test/config/test.erigon.rpc.config.yaml +++ b/test/config/test.erigon.rpc.config.yaml @@ -5,6 +5,7 @@ private.api.addr: localhost:9091 zkevm.l2-chain-id: 195 zkevm.l2-sequencer-rpc-url: http://erigon-seq:8545 zkevm.l2-datastreamer-url: erigon-seq:6900 +zkevm.l2-sequencer-rpc-limit: 1000 zkevm.l1-chain-id: 1337 zkevm.l1-rpc-url: http://erigon-mock-l1-network:8545 diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index da8813aba12..e14a89b5e6d 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -269,4 +269,5 @@ var DefaultFlags = []cli.Flag{ &utils.TxPoolFreeGasCountPerAddr, &utils.TxPoolFreeGasExAddrs, &utils.TxPoolFreeGasLimit, + &utils.L2RpcLimitFlag, } diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index b8cf4608d8a..9bf0eee604d 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -401,13 +401,14 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) { TxPoolApiAddr: ctx.String(utils.TxpoolApiAddrFlag.Name), StateCache: kvcache.DefaultCoherentConfig, - + DataStreamPort: ctx.Int(utils.DataStreamPort.Name), DataStreamHost: ctx.String(utils.DataStreamHost.Name), DataStreamWriteTimeout: ctx.Duration(utils.DataStreamWriteTimeout.Name), DataStreamInactivityTimeout: ctx.Duration(utils.DataStreamInactivityTimeout.Name), DataStreamInactivityCheckInterval: ctx.Duration(utils.DataStreamInactivityCheckInterval.Name), L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name), + L2RpcLimit: ctx.Int64(utils.L2RpcLimitFlag.Name), } if ctx.IsSet(utils.HttpCompressionFlag.Name) { c.HttpCompression = ctx.Bool(utils.HttpCompressionFlag.Name) diff --git a/turbo/cli/flags_xlayer.go b/turbo/cli/flags_xlayer.go index 692d5b920e8..05860ed1544 100644 --- a/turbo/cli/flags_xlayer.go +++ b/turbo/cli/flags_xlayer.go @@ -24,6 +24,7 @@ func ApplyFlagsForEthXLayerConfig(ctx *cli.Context, cfg *ethconfig.Config) { }, EnableInnerTx: ctx.Bool(utils.AllowInternalTransactions.Name), SequencerBatchSleepDuration: ctx.Duration(utils.SequencerBatchSleepDuration.Name), + L2RpcLimit: ctx.Int64(utils.L2RpcLimitFlag.Name), } if ctx.IsSet(utils.ApolloNamespaceName.Name) { diff --git a/zk/apollo/utils.go b/zk/apollo/utils.go index 477c6005f2b..4a4618154fe 100644 --- a/zk/apollo/utils.go +++ b/zk/apollo/utils.go @@ -142,6 +142,9 @@ func loadZkConfig(ctx *cli.Context, ethCfg *ethconfig.Config) { if ctx.IsSet(utils.AllowInternalTransactions.Name) { ethCfg.Zk.XLayer.EnableInnerTx = ctx.Bool(utils.AllowInternalTransactions.Name) } + if ctx.IsSet(utils.L2RpcLimitFlag.Name) { + ethCfg.Zk.XLayer.L2RpcLimit = ctx.Int64(utils.L2RpcLimitFlag.Name) + } } func getNamespacePrefix(namespace string) (string, error) { diff --git a/zkevm/jsonrpc/client/client.go b/zkevm/jsonrpc/client/client.go index 950731e947f..78c0e573a14 100644 --- a/zkevm/jsonrpc/client/client.go +++ b/zkevm/jsonrpc/client/client.go @@ -3,9 +3,11 @@ package client import ( "bytes" "encoding/json" + "errors" "fmt" "io" "net/http" + "sync/atomic" "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types" ) @@ -31,6 +33,8 @@ func (e *HTTPError) Error() string { return fmt.Sprintf("invalid status code, expected: %d, found: %d", http.StatusOK, e.StatusCode) } +var l2RpcCallCount atomic.Int64 + // JSONRPCCall executes a 2.0 JSON RPC HTTP Post Request to the provided URL with // the provided method and parameters, which is compatible with the Ethereum // JSON RPC Server. @@ -147,3 +151,19 @@ func JSONRPCBatchCall(url string, methods []string, parameterGroups ...[]interfa return batchResponse, nil } + +// For X Layer +func JSONRPCCallWhitLimit(l2RpcLimit types.L2RpcLimit, url, method string, parameters ...interface{}) (types.Response, error) { + if url == l2RpcLimit.L2Url { + if l2RpcLimit.CallLimit > 0 && l2RpcCallCount.Load() >= l2RpcLimit.CallLimit { + return types.Response{}, errors.New("rpc is too busy") + } + + l2RpcCallCount.Add(1) + defer func() { + l2RpcCallCount.Add(-1) + }() + } + + return JSONRPCCall(url, method, parameters...) +} diff --git a/zkevm/jsonrpc/types/xlayer.go b/zkevm/jsonrpc/types/xlayer.go new file mode 100644 index 00000000000..83a0f6b8916 --- /dev/null +++ b/zkevm/jsonrpc/types/xlayer.go @@ -0,0 +1,6 @@ +package types + +type L2RpcLimit struct { + L2Url string + CallLimit int64 +}