From 68b1496d1f66d00a349122b3042ea132057dc357 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 1 Nov 2024 12:08:46 +0800 Subject: [PATCH 1/4] Problem: rpc stream overhead even if not used Solution: - init the underlying subscriptions lazily. --- CHANGELOG.md | 1 + rpc/stream/rpc.go | 42 ++++++++++++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e82a4c87..9bfcaaa49b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1. * [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus. * [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo. +* [#]() Start event stream on demand. ## v0.21.x-cronos diff --git a/rpc/stream/rpc.go b/rpc/stream/rpc.go index baa2a59660..f3a46da642 100644 --- a/rpc/stream/rpc.go +++ b/rpc/stream/rpc.go @@ -51,6 +51,7 @@ type validatorAccountFunc func( ) (*evmtypes.QueryValidatorAccountResponse, error) // RPCStream provides data streams for newHeads, logs, and pendingTransactions. +// it's only started on demand, so there's no overhead if the filter apis are not called at all. type RPCStream struct { evtClient rpcclient.EventsClient logger log.Logger @@ -69,23 +70,30 @@ func NewRPCStreams( logger log.Logger, txDecoder sdk.TxDecoder, validatorAccount validatorAccountFunc, -) (*RPCStream, error) { - s := &RPCStream{ - evtClient: evtClient, - logger: logger, - txDecoder: txDecoder, - - headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity), - pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity), - logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity), +) *RPCStream { + return &RPCStream{ + evtClient: evtClient, + logger: logger, + txDecoder: txDecoder, validatorAccount: validatorAccount, } +} + +func (s *RPCStream) init() { + if s.headerStream != nil { + // already initialized + return + } + + s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity) + s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity) + s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity) ctx := context.Background() chBlocks, err := s.evtClient.Subscribe(ctx, streamSubscriberName, blockEvents, subscribBufferSize) if err != nil { - return nil, err + panic(err) } chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize) @@ -93,15 +101,18 @@ func NewRPCStreams( if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil { s.logger.Error("failed to unsubscribe", "err", err) } - return nil, err + panic(err) } go s.start(&s.wg, chBlocks, chLogs) - - return s, nil } func (s *RPCStream) Close() error { + if s.headerStream == nil { + // not initialized + return nil + } + if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil { return err } @@ -110,20 +121,23 @@ func (s *RPCStream) Close() error { } func (s *RPCStream) HeaderStream() *Stream[RPCHeader] { + s.init() return s.headerStream } func (s *RPCStream) PendingTxStream() *Stream[common.Hash] { + s.init() return s.pendingTxStream } func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] { + s.init() return s.logStream } // ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx. func (s *RPCStream) ListenPendingTx(hash common.Hash) { - s.pendingTxStream.Add(hash) + s.PendingTxStream().Add(hash) } func (s *RPCStream) start( From 0291350a6a8f0d4950840509b1008d9b88741636 Mon Sep 17 00:00:00 2001 From: yihuang Date: Fri, 1 Nov 2024 12:10:16 +0800 Subject: [PATCH 2/4] Update CHANGELOG.md Signed-off-by: yihuang --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bfcaaa49b..c9cf53a8f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,7 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1. * [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus. * [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo. -* [#]() Start event stream on demand. +* [#551](https://github.com/crypto-org-chain/ethermint/pull/551) Start event stream on demand. ## v0.21.x-cronos From e909eaf6710c34c95c1dab34ece7e7677b85febc Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 1 Nov 2024 12:13:38 +0800 Subject: [PATCH 3/4] fix build --- server/json_rpc.go | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/server/json_rpc.go b/server/json_rpc.go index c8b5b5eedb..5fee46da97 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -38,10 +38,7 @@ import ( ethermint "github.com/evmos/ethermint/types" ) -const ( - ServerStartTime = 5 * time.Second - MaxRetry = 6 -) +const ServerStartTime = 5 * time.Second type AppWithPendingTxStream interface { RegisterPendingTxListener(listener ante.PendingTxListener) @@ -64,20 +61,8 @@ func StartJSONRPC( return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client) } - var rpcStream *stream.RPCStream - var err error queryClient := rpctypes.NewQueryClient(clientCtx) - for i := 0; i < MaxRetry; i++ { - rpcStream, err = stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount) - if err == nil { - break - } - time.Sleep(time.Second) - } - - if err != nil { - return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err) - } + rpcStream := stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount) app.RegisterPendingTxListener(rpcStream.ListenPendingTx) From 7e3a2b786ff1aed61ed6667e0f3a5c3ad24e4adb Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 1 Nov 2024 13:12:31 +0800 Subject: [PATCH 4/4] ignore pending tx --- rpc/stream/rpc.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rpc/stream/rpc.go b/rpc/stream/rpc.go index f3a46da642..5e4d1bb78c 100644 --- a/rpc/stream/rpc.go +++ b/rpc/stream/rpc.go @@ -57,9 +57,12 @@ type RPCStream struct { logger log.Logger txDecoder sdk.TxDecoder - headerStream *Stream[RPCHeader] + // headerStream/logStream are backed by cometbft event subscription + headerStream *Stream[RPCHeader] + logStream *Stream[*ethtypes.Log] + + // pendingTxStream is backed by check-tx ante handler pendingTxStream *Stream[common.Hash] - logStream *Stream[*ethtypes.Log] wg sync.WaitGroup validatorAccount validatorAccountFunc @@ -76,17 +79,17 @@ func NewRPCStreams( logger: logger, txDecoder: txDecoder, validatorAccount: validatorAccount, + pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity), } } -func (s *RPCStream) init() { +func (s *RPCStream) initSubscriptions() { if s.headerStream != nil { // already initialized return } s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity) - s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity) s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity) ctx := context.Background() @@ -121,17 +124,16 @@ func (s *RPCStream) Close() error { } func (s *RPCStream) HeaderStream() *Stream[RPCHeader] { - s.init() + s.initSubscriptions() return s.headerStream } func (s *RPCStream) PendingTxStream() *Stream[common.Hash] { - s.init() return s.pendingTxStream } func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] { - s.init() + s.initSubscriptions() return s.logStream }