Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: rpc stream overhead even if not used #551

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1.
* [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus.
* [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo.
* [#]() Start event stream on demand.
yihuang marked this conversation as resolved.
Show resolved Hide resolved

## v0.21.x-cronos

Expand Down
42 changes: 28 additions & 14 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type validatorAccountFunc func(
) (*evmtypes.QueryValidatorAccountResponse, error)

// RPCStream provides data streams for newHeads, logs, and pendingTransactions.
// it's only started on demand, so there's no overhead if the filter apis are not called at all.
type RPCStream struct {
evtClient rpcclient.EventsClient
logger log.Logger
Expand All @@ -69,39 +70,49 @@ func NewRPCStreams(
logger log.Logger,
txDecoder sdk.TxDecoder,
validatorAccount validatorAccountFunc,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
) *RPCStream {
return &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,
validatorAccount: validatorAccount,
}
}

func (s *RPCStream) init() {
if s.headerStream != nil {
// already initialized
return
}

s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity)
s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity)
s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity)

ctx := context.Background()

chBlocks, err := s.evtClient.Subscribe(ctx, streamSubscriberName, blockEvents, subscribBufferSize)
if err != nil {
return nil, err
panic(err)
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
panic(err)
}

go s.start(&s.wg, chBlocks, chLogs)

return s, nil
}

func (s *RPCStream) Close() error {
if s.headerStream == nil {
// not initialized
return nil
}

if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
return err
}
Expand All @@ -110,20 +121,23 @@ func (s *RPCStream) Close() error {
}

func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
s.init()
return s.headerStream
}

func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
s.init()
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
s.init()
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(hash common.Hash) {
s.pendingTxStream.Add(hash)
s.PendingTxStream().Add(hash)
}

func (s *RPCStream) start(
Expand Down
Loading