Skip to content

Commit

Permalink
Problem: rpc stream overhead even if not used (#551)
Browse files Browse the repository at this point in the history
* Problem: rpc stream overhead even if not used

Solution:
- init the underlying subscriptions lazily.

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* fix build

* ignore pending tx

---------

Signed-off-by: yihuang <[email protected]>
  • Loading branch information
yihuang authored Nov 1, 2024
1 parent 885530b commit 8afdc69
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1.
* [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus.
* [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo.
* [#551](https://github.com/crypto-org-chain/ethermint/pull/551) Start event stream on demand.

## v0.21.x-cronos

Expand Down
48 changes: 32 additions & 16 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ type validatorAccountFunc func(
) (*evmtypes.QueryValidatorAccountResponse, error)

// RPCStream provides data streams for newHeads, logs, and pendingTransactions.
// it's only started on demand, so there's no overhead if the filter apis are not called at all.
type RPCStream struct {
evtClient rpcclient.EventsClient
logger log.Logger
txDecoder sdk.TxDecoder

headerStream *Stream[RPCHeader]
// headerStream/logStream are backed by cometbft event subscription
headerStream *Stream[RPCHeader]
logStream *Stream[*ethtypes.Log]

// pendingTxStream is backed by check-tx ante handler
pendingTxStream *Stream[common.Hash]
logStream *Stream[*ethtypes.Log]

wg sync.WaitGroup
validatorAccount validatorAccountFunc
Expand All @@ -69,39 +73,49 @@ func NewRPCStreams(
logger log.Logger,
txDecoder sdk.TxDecoder,
validatorAccount validatorAccountFunc,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
) *RPCStream {
return &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,
validatorAccount: validatorAccount,
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
}
}

func (s *RPCStream) initSubscriptions() {
if s.headerStream != nil {
// already initialized
return
}

s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity)
s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity)

ctx := context.Background()

chBlocks, err := s.evtClient.Subscribe(ctx, streamSubscriberName, blockEvents, subscribBufferSize)
if err != nil {
return nil, err
panic(err)
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
panic(err)
}

go s.start(&s.wg, chBlocks, chLogs)

return s, nil
}

func (s *RPCStream) Close() error {
if s.headerStream == nil {
// not initialized
return nil
}

if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
return err
}
Expand All @@ -110,6 +124,7 @@ func (s *RPCStream) Close() error {
}

func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
s.initSubscriptions()
return s.headerStream
}

Expand All @@ -118,12 +133,13 @@ func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
s.initSubscriptions()
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(hash common.Hash) {
s.pendingTxStream.Add(hash)
s.PendingTxStream().Add(hash)
}

func (s *RPCStream) start(
Expand Down
19 changes: 2 additions & 17 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ import (
ethermint "github.com/evmos/ethermint/types"
)

const (
ServerStartTime = 5 * time.Second
MaxRetry = 6
)
const ServerStartTime = 5 * time.Second

type AppWithPendingTxStream interface {
RegisterPendingTxListener(listener ante.PendingTxListener)
Expand All @@ -64,20 +61,8 @@ func StartJSONRPC(
return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
}

var rpcStream *stream.RPCStream
var err error
queryClient := rpctypes.NewQueryClient(clientCtx)
for i := 0; i < MaxRetry; i++ {
rpcStream, err = stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount)
if err == nil {
break
}
time.Sleep(time.Second)
}

if err != nil {
return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}
rpcStream := stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount)

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)

Expand Down

0 comments on commit 8afdc69

Please sign in to comment.