Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: rpc stream overhead even if not used #551

Merged
merged 4 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#533](https://github.com/crypto-org-chain/ethermint/pull/533) Bump cosmos-sdk to v0.50.10, cometbft to v0.38.13 and ibc-go to v8.5.1.
* [#546](https://github.com/crypto-org-chain/ethermint/pull/546) Introduce `--async-check-tx` flag to run check-tx async with consensus.
* [#549](https://github.com/crypto-org-chain/ethermint/pull/549) Support build without cgo.
* [#551](https://github.com/crypto-org-chain/ethermint/pull/551) Start event stream on demand.

## v0.21.x-cronos

Expand Down
42 changes: 28 additions & 14 deletions rpc/stream/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
) (*evmtypes.QueryValidatorAccountResponse, error)

// RPCStream provides data streams for newHeads, logs, and pendingTransactions.
// it's only started on demand, so there's no overhead if the filter apis are not called at all.
type RPCStream struct {
evtClient rpcclient.EventsClient
logger log.Logger
Expand All @@ -69,39 +70,49 @@
logger log.Logger,
txDecoder sdk.TxDecoder,
validatorAccount validatorAccountFunc,
) (*RPCStream, error) {
s := &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity),
pendingTxStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity),
logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity),
) *RPCStream {
return &RPCStream{
evtClient: evtClient,
logger: logger,
txDecoder: txDecoder,

Check warning on line 77 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L73-L77

Added lines #L73 - L77 were not covered by tests
validatorAccount: validatorAccount,
}
}

func (s *RPCStream) init() {
if s.headerStream != nil {
// already initialized
return
}

Check warning on line 86 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L82-L86

Added lines #L82 - L86 were not covered by tests

s.headerStream = NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity)
s.pendingTxStream = NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity)
s.logStream = NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity)

Check warning on line 90 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L88-L90

Added lines #L88 - L90 were not covered by tests

ctx := context.Background()

chBlocks, err := s.evtClient.Subscribe(ctx, streamSubscriberName, blockEvents, subscribBufferSize)
if err != nil {
return nil, err
panic(err)

Check warning on line 96 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L96

Added line #L96 was not covered by tests
}

chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize)
if err != nil {
if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
s.logger.Error("failed to unsubscribe", "err", err)
}
return nil, err
panic(err)

Check warning on line 104 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L104

Added line #L104 was not covered by tests
}

go s.start(&s.wg, chBlocks, chLogs)

return s, nil
}

func (s *RPCStream) Close() error {
if s.headerStream == nil {
// not initialized
return nil
}

Check warning on line 114 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L111-L114

Added lines #L111 - L114 were not covered by tests

if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil {
return err
}
Expand All @@ -110,20 +121,23 @@
}

func (s *RPCStream) HeaderStream() *Stream[RPCHeader] {
s.init()

Check warning on line 124 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L124

Added line #L124 was not covered by tests
return s.headerStream
}

func (s *RPCStream) PendingTxStream() *Stream[common.Hash] {
s.init()

Check warning on line 129 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L129

Added line #L129 was not covered by tests
return s.pendingTxStream
}

func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] {
s.init()

Check warning on line 134 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L134

Added line #L134 was not covered by tests
return s.logStream
}

// ListenPendingTx is a callback passed to application to listen for pending transactions in CheckTx.
func (s *RPCStream) ListenPendingTx(hash common.Hash) {
s.pendingTxStream.Add(hash)
s.PendingTxStream().Add(hash)

Check warning on line 140 in rpc/stream/rpc.go

View check run for this annotation

Codecov / codecov/patch

rpc/stream/rpc.go#L140

Added line #L140 was not covered by tests
}

func (s *RPCStream) start(
Expand Down
19 changes: 2 additions & 17 deletions server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
ethermint "github.com/evmos/ethermint/types"
)

const (
ServerStartTime = 5 * time.Second
MaxRetry = 6
)
const ServerStartTime = 5 * time.Second

type AppWithPendingTxStream interface {
RegisterPendingTxListener(listener ante.PendingTxListener)
Expand All @@ -64,20 +61,8 @@
return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
}

var rpcStream *stream.RPCStream
var err error
queryClient := rpctypes.NewQueryClient(clientCtx)
for i := 0; i < MaxRetry; i++ {
rpcStream, err = stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount)
if err == nil {
break
}
time.Sleep(time.Second)
}

if err != nil {
return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err)
}
rpcStream := stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder(), queryClient.ValidatorAccount)

Check warning on line 65 in server/json_rpc.go

View check run for this annotation

Codecov / codecov/patch

server/json_rpc.go#L65

Added line #L65 was not covered by tests

app.RegisterPendingTxListener(rpcStream.ListenPendingTx)

Expand Down
Loading