Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of L2 rpc calls #125

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
rootCmd.PersistentFlags().IntVar(&cfg.ReturnDataLimit, utils.RpcReturnDataLimit.Name, utils.RpcReturnDataLimit.Value, utils.RpcReturnDataLimit.Usage)

rootCmd.PersistentFlags().StringVar(&cfg.L2RpcUrl, utils.L2RpcUrlFlag.Name, utils.L2RpcUrlFlag.Value, utils.L2RpcUrlFlag.Usage)
rootCmd.PersistentFlags().Int64Var(&cfg.L2RpcLimit, utils.L2RpcLimitFlag.Name, utils.L2RpcLimitFlag.Value, utils.L2RpcLimitFlag.Usage)
// X Layer API Keys
rootCmd.PersistentFlags().StringVar(&cfg.HttpApiKeys, utils.HTTPApiKeysFlag.Name, utils.HTTPApiKeysFlag.Value, utils.HTTPApiKeysFlag.Usage)
rootCmd.PersistentFlags().StringVar(&cfg.MethodRateLimit, utils.MethodRateLimitFlag.Name, utils.MethodRateLimitFlag.Value, utils.MethodRateLimitFlag.Usage)
Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon/cli/httpcfg/http_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ type HttpCfg struct {
// For X Layer
HttpApiKeys string
MethodRateLimit string
L2RpcLimit int64
}
4 changes: 3 additions & 1 deletion cmd/rpcdaemon/commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/zk/sequencer"
"github.com/ledgerwatch/erigon/zk/syncer"
txpool2 "github.com/ledgerwatch/erigon/zk/txpool"
"github.com/ledgerwatch/erigon/zk/sequencer"
)

// APIList describes the list of available RPC apis
Expand All @@ -33,6 +33,7 @@ func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
base.SetL2RpcUrl(ethCfg.L2RpcUrl)
base.SetGasless(ethCfg.Gasless)
base.SetL2RpcLimit(ethCfg.Zk.XLayer.L2RpcLimit) // For X Layer
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, ethCfg)
erigonImpl := NewErigonAPI(base, db, eth)
txpoolImpl := NewTxPoolAPI(base, db, txPool, rawPool, rpcUrl)
Expand Down Expand Up @@ -165,6 +166,7 @@ func AuthAPIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClien
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
base.SetL2RpcUrl(ethCfg.L2RpcUrl)
base.SetGasless(ethCfg.Gasless)
base.SetL2RpcLimit(ethCfg.Zk.XLayer.L2RpcLimit) // For X Layer

ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, ethCfg)
engineImpl := NewEngineAPI(base, db, eth, cfg.InternalCL)
Expand Down
9 changes: 5 additions & 4 deletions cmd/rpcdaemon/commands/eth_accounts_zk.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package commands

import (
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"fmt"
"github.com/ledgerwatch/erigon/common/hexutil"
libcommon "github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/rpc"
"strings"
"github.com/ledgerwatch/erigon/zkevm/hex"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
"strings"
)

func (api *APIImpl) sendGetTransactionCountToSequencer(rpcUrl string, address libcommon.Address, blockNrOrHash *rpc.BlockNumberOrHash) (*hexutil.Uint64, error) {
Expand All @@ -22,7 +23,7 @@ func (api *APIImpl) sendGetTransactionCountToSequencer(rpcUrl string, address li
}
}

res, err := client.JSONRPCCall(rpcUrl, "eth_getTransactionCount", addressHex, blockNrOrHashValue)
res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{rpcUrl, api.l2RpcLimit}, rpcUrl, "eth_getTransactionCount", addressHex, blockNrOrHashValue)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type BaseAPI struct {
dirs datadir.Dirs
l2RpcUrl string
gasless bool

// For X Layer
l2RpcLimit int64
}

func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.AggregatorV3, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader, dirs datadir.Dirs) *BaseAPI {
Expand Down
5 changes: 5 additions & 0 deletions cmd/rpcdaemon/commands/eth_api_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ func (api *BaseAPI) GetL2RpcUrl() string {
}
return api.l2RpcUrl
}

// For X Layer
func (api *BaseAPI) SetL2RpcLimit(limit int64) {
api.l2RpcLimit = limit
}
3 changes: 2 additions & 1 deletion cmd/rpcdaemon/commands/eth_system_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ledgerwatch/erigon/zk/apollo"
"github.com/ledgerwatch/erigon/zk/sequencer"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -72,7 +73,7 @@ func (api *APIImpl) getLatestBlockTxNum(ctx context.Context) (int, error) {
}

func (api *APIImpl) getGPFromTrustedNode(method string) (*big.Int, error) {
res, err := client.JSONRPCCall(api.l2RpcUrl, method)
res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RpcUrl, method)
if err != nil {
return nil, errors.New("failed to get gas price from trusted node")
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/rpcdaemon/commands/eth_system_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/zkevm/encoding"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ func (api *APIImpl) GasPrice(ctx context.Context) (*hexutil.Big, error) {
return &price, nil
}

res, err := client.JSONRPCCall(api.l2RpcUrl, "eth_gasPrice")
res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RpcUrl, "eth_gasPrice")
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/commands/send_transaction_zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package commands

import (
"fmt"
"strings"

"math/big"
"strings"

"github.com/gateway-fm/cdk-erigon-lib/common"
"github.com/gateway-fm/cdk-erigon-lib/common/hexutility"
"github.com/ledgerwatch/erigon/zk/sequencer"
"github.com/ledgerwatch/erigon/zk/zkchainconfig"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
)

func (api *APIImpl) isPoolManagerAddressSet() bool {
Expand All @@ -22,7 +22,7 @@ func (api *APIImpl) isZkNonSequencer(chainId *big.Int) bool {
}

func (api *APIImpl) sendTxZk(rpcUrl string, encodedTx hexutility.Bytes, chainId uint64) (common.Hash, error) {
res, err := client.JSONRPCCall(rpcUrl, "eth_sendRawTransaction", encodedTx)
res, err := client.JSONRPCCallWhitLimit(types.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, rpcUrl, "eth_sendRawTransaction", encodedTx)
if err != nil {
return common.Hash{}, err
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/rpcdaemon/commands/txpool_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import (
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
"github.com/ledgerwatch/erigon/zk/txpool"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
jsonrpc "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
)

// NetAPI the interface for the net_ RPC commands
Expand Down Expand Up @@ -46,7 +47,7 @@ func NewTxPoolAPI(base *BaseAPI, db kv.RoDB, pool proto_txpool.TxpoolClient, raw

func (api *TxPoolAPIImpl) Content(ctx context.Context) (interface{}, error) {
if api.l2RPCUrl != "" {
res, err := client.JSONRPCCall(api.l2RPCUrl, "txpool_content")
res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RPCUrl, "txpool_content")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,7 +138,7 @@ func (api *TxPoolAPIImpl) Content(ctx context.Context) (interface{}, error) {
// Status returns the number of pending and queued transaction in the pool.
func (api *TxPoolAPIImpl) Status(ctx context.Context) (interface{}, error) {
if api.l2RPCUrl != "" {
res, err := client.JSONRPCCall(api.l2RPCUrl, "txpool_status")
res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{api.l2RpcUrl, api.l2RpcLimit}, api.l2RPCUrl, "txpool_status")
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/rpcdaemon/commands/zkevm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ledgerwatch/erigon/zk/witness"
"github.com/ledgerwatch/erigon/zkevm/hex"
"github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
jsonrpc "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -1145,7 +1146,7 @@ func (api *ZkEvmAPIImpl) GetExitRootTable(ctx context.Context) ([]l1InfoTreeData
}

func (api *ZkEvmAPIImpl) sendGetBatchWitness(rpcUrl string, batchNumber uint64, mode *WitnessMode) (json.RawMessage, error) {
res, err := client.JSONRPCCall(rpcUrl, "zkevm_getBatchWitness", batchNumber, mode)
res, err := client.JSONRPCCallWhitLimit(jsonrpc.L2RpcLimit{rpcUrl, api.ethApi.l2RpcLimit}, rpcUrl, "zkevm_getBatchWitness", batchNumber, mode)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {

ethConfig := ethconfig.Defaults
ethConfig.L2RpcUrl = cfg.L2RpcUrl
ethConfig.Zk.XLayer.L2RpcLimit = cfg.L2RpcLimit

// TODO: Replace with correct consensus Engine
engine := ethash.NewFaker()
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ var (
Usage: "Upstream L2 node RPC endpoint",
Value: "",
}
L2RpcLimitFlag = cli.Int64Flag{ // For X Layer
Name: "zkevm.l2-sequencer-rpc-limit",
Usage: "The limit of call L2 seq RPC endpoint",
Value: 0,
}
L2DataStreamerUrlFlag = cli.StringFlag{
Name: "zkevm.l2-datastreamer-url",
Usage: "L2 datastreamer endpoint",
Expand Down
2 changes: 2 additions & 0 deletions eth/ethconfig/config_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type XLayerConfig struct {
EnableInnerTx bool
// Sequencer
SequencerBatchSleepDuration time.Duration

L2RpcLimit int64
}

var DefaultXLayerConfig = XLayerConfig{}
Expand Down
1 change: 1 addition & 0 deletions test/config/test.erigon.rpc.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ private.api.addr: localhost:9091
zkevm.l2-chain-id: 195
zkevm.l2-sequencer-rpc-url: http://erigon-seq:8545
zkevm.l2-datastreamer-url: erigon-seq:6900
zkevm.l2-sequencer-rpc-limit: 1000
zkevm.l1-chain-id: 1337
zkevm.l1-rpc-url: http://erigon-mock-l1-network:8545

Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,5 @@ var DefaultFlags = []cli.Flag{
&utils.TxPoolFreeGasCountPerAddr,
&utils.TxPoolFreeGasExAddrs,
&utils.TxPoolFreeGasLimit,
&utils.L2RpcLimitFlag,
}
3 changes: 2 additions & 1 deletion turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,14 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config) {
TxPoolApiAddr: ctx.String(utils.TxpoolApiAddrFlag.Name),

StateCache: kvcache.DefaultCoherentConfig,

DataStreamPort: ctx.Int(utils.DataStreamPort.Name),
DataStreamHost: ctx.String(utils.DataStreamHost.Name),
DataStreamWriteTimeout: ctx.Duration(utils.DataStreamWriteTimeout.Name),
DataStreamInactivityTimeout: ctx.Duration(utils.DataStreamInactivityTimeout.Name),
DataStreamInactivityCheckInterval: ctx.Duration(utils.DataStreamInactivityCheckInterval.Name),
L2RpcUrl: ctx.String(utils.L2RpcUrlFlag.Name),
L2RpcLimit: ctx.Int64(utils.L2RpcLimitFlag.Name),
}
if ctx.IsSet(utils.HttpCompressionFlag.Name) {
c.HttpCompression = ctx.Bool(utils.HttpCompressionFlag.Name)
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/flags_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func ApplyFlagsForEthXLayerConfig(ctx *cli.Context, cfg *ethconfig.Config) {
},
EnableInnerTx: ctx.Bool(utils.AllowInternalTransactions.Name),
SequencerBatchSleepDuration: ctx.Duration(utils.SequencerBatchSleepDuration.Name),
L2RpcLimit: ctx.Int64(utils.L2RpcLimitFlag.Name),
}

if ctx.IsSet(utils.ApolloNamespaceName.Name) {
Expand Down
3 changes: 3 additions & 0 deletions zk/apollo/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func loadZkConfig(ctx *cli.Context, ethCfg *ethconfig.Config) {
if ctx.IsSet(utils.AllowInternalTransactions.Name) {
ethCfg.Zk.XLayer.EnableInnerTx = ctx.Bool(utils.AllowInternalTransactions.Name)
}
if ctx.IsSet(utils.L2RpcLimitFlag.Name) {
ethCfg.Zk.XLayer.L2RpcLimit = ctx.Int64(utils.L2RpcLimitFlag.Name)
}
}

func getNamespacePrefix(namespace string) (string, error) {
Expand Down
20 changes: 20 additions & 0 deletions zkevm/jsonrpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package client
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync/atomic"

"github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
)
Expand All @@ -31,6 +33,8 @@ func (e *HTTPError) Error() string {
return fmt.Sprintf("invalid status code, expected: %d, found: %d", http.StatusOK, e.StatusCode)
}

var l2RpcCallCount atomic.Int64

// JSONRPCCall executes a 2.0 JSON RPC HTTP Post Request to the provided URL with
// the provided method and parameters, which is compatible with the Ethereum
// JSON RPC Server.
Expand Down Expand Up @@ -147,3 +151,19 @@ func JSONRPCBatchCall(url string, methods []string, parameterGroups ...[]interfa

return batchResponse, nil
}

// For X Layer
func JSONRPCCallWhitLimit(l2RpcLimit types.L2RpcLimit, url, method string, parameters ...interface{}) (types.Response, error) {
if url == l2RpcLimit.L2Url {
if l2RpcLimit.CallLimit > 0 && l2RpcCallCount.Load() >= l2RpcLimit.CallLimit {
return types.Response{}, errors.New("rpc is too busy")
}

l2RpcCallCount.Add(1)
defer func() {
l2RpcCallCount.Add(-1)
}()
}

return JSONRPCCall(url, method, parameters...)
}
6 changes: 6 additions & 0 deletions zkevm/jsonrpc/types/xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package types

type L2RpcLimit struct {
L2Url string
CallLimit int64
}
Loading