diff --git a/CHANGELOG.md b/CHANGELOG.md index da8c0cc551..3b2d1928d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - (evm) [#343](https://github.com/crypto-org-chain/ethermint/pull/343) Add native event converter APIs. - (ante) [#353](https://github.com/crypto-org-chain/ethermint/pull/353) Remove blocked address decorator and support custom decorators instead. - (statedb) [#359](https://github.com/crypto-org-chain/ethermint/pull/359) Add `CacheContext` method to StateDB, to support efficient read-only native actions. +- (rpc) [#375](https://github.com/crypto-org-chain/ethermint/pull/375) Refactor websocket/subscription system to improve performance and stability. ## [v0.21.0] - 2023-01-26 diff --git a/Makefile b/Makefile index f3aaa60b5d..25a419e01e 100644 --- a/Makefile +++ b/Makefile @@ -341,7 +341,7 @@ test-import: go test -run TestImporterTestSuite -v --vet=off github.com/evmos/ethermint/tests/importer test-rpc: - ./scripts/integration-test-all.sh -t "rpc" -q 1 -z 1 -s 2 -m "rpc" -r "true" + ./scripts/integration-test-all.sh -t "rpc" -q 1 -z 1 -s 5 -m "rpc" -r "true" run-integration-tests: @nix-shell ./tests/integration_tests/shell.nix --run ./scripts/run-integration-tests.sh diff --git a/rpc/apis.go b/rpc/apis.go index 02c217c55e..01cee8c09c 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -32,9 +32,8 @@ import ( "github.com/evmos/ethermint/rpc/namespaces/ethereum/personal" "github.com/evmos/ethermint/rpc/namespaces/ethereum/txpool" "github.com/evmos/ethermint/rpc/namespaces/ethereum/web3" + "github.com/evmos/ethermint/rpc/stream" ethermint "github.com/evmos/ethermint/types" - - rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" ) // RPC namespaces and API version @@ -60,7 +59,7 @@ const ( type APICreator = func( ctx *server.Context, clientCtx client.Context, - tendermintWebsocketClient *rpcclient.WSClient, + stream *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API @@ -72,7 +71,7 @@ func init() { apiCreators = map[string]APICreator{ EthNamespace: func(ctx *server.Context, clientCtx client.Context, - tmWSClient *rpcclient.WSClient, + stream *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -87,12 +86,12 @@ func init() { { Namespace: EthNamespace, Version: apiVersion, - Service: filters.NewPublicAPI(ctx.Logger, clientCtx, tmWSClient, evmBackend), + Service: filters.NewPublicAPI(ctx.Logger, clientCtx, stream, evmBackend), Public: true, }, } }, - Web3Namespace: func(*server.Context, client.Context, *rpcclient.WSClient, bool, ethermint.EVMTxIndexer) []rpc.API { + Web3Namespace: func(*server.Context, client.Context, *stream.RPCStream, bool, ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: Web3Namespace, @@ -102,7 +101,7 @@ func init() { }, } }, - NetNamespace: func(_ *server.Context, clientCtx client.Context, _ *rpcclient.WSClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { + NetNamespace: func(_ *server.Context, clientCtx client.Context, _ *stream.RPCStream, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: NetNamespace, @@ -114,7 +113,7 @@ func init() { }, PersonalNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, + _ *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -128,7 +127,7 @@ func init() { }, } }, - TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ *rpcclient.WSClient, _ bool, _ 
ethermint.EVMTxIndexer) []rpc.API { + TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ *stream.RPCStream, _ bool, _ ethermint.EVMTxIndexer) []rpc.API { return []rpc.API{ { Namespace: TxPoolNamespace, @@ -140,7 +139,7 @@ func init() { }, DebugNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, + _ *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -156,7 +155,7 @@ func init() { }, MinerNamespace: func(ctx *server.Context, clientCtx client.Context, - _ *rpcclient.WSClient, + _ *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, ) []rpc.API { @@ -176,7 +175,7 @@ func init() { // GetRPCAPIs returns the list of all APIs func GetRPCAPIs(ctx *server.Context, clientCtx client.Context, - tmWSClient *rpcclient.WSClient, + stream *stream.RPCStream, allowUnprotectedTxs bool, indexer ethermint.EVMTxIndexer, selectedAPIs []string, @@ -185,7 +184,7 @@ func GetRPCAPIs(ctx *server.Context, for _, ns := range selectedAPIs { if creator, ok := apiCreators[ns]; ok { - apis = append(apis, creator(ctx, clientCtx, tmWSClient, allowUnprotectedTxs, indexer)...) + apis = append(apis, creator(ctx, clientCtx, stream, allowUnprotectedTxs, indexer)...) } else { ctx.Logger.Error("invalid namespace value", "namespace", ns) } diff --git a/rpc/namespaces/ethereum/eth/filters/api.go b/rpc/namespaces/ethereum/eth/filters/api.go index 421dd08401..571a18481d 100644 --- a/rpc/namespaces/ethereum/eth/filters/api.go +++ b/rpc/namespaces/ethereum/eth/filters/api.go @@ -22,20 +22,18 @@ import ( "time" "github.com/cosmos/cosmos-sdk/client" - "github.com/evmos/ethermint/rpc/types" "github.com/cometbft/cometbft/libs/log" coretypes "github.com/cometbft/cometbft/rpc/core/types" - rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" - tmtypes "github.com/cometbft/cometbft/types" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" - evmtypes "github.com/evmos/ethermint/x/evm/types" + "github.com/evmos/ethermint/rpc/stream" + "github.com/evmos/ethermint/rpc/types" ) // FilterAPI gathers @@ -75,10 +73,9 @@ var deadline = 5 * time.Minute type filter struct { typ filters.Type deadline *time.Timer // filter is inactive when deadline triggers - hashes []common.Hash crit filters.FilterCriteria - logs []*ethtypes.Log - s *Subscription // associated subscription in event system + cancel context.CancelFunc + offset int // offset for stream subscription } // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various @@ -87,20 +84,21 @@ type PublicFilterAPI struct { logger log.Logger clientCtx client.Context backend Backend - events *EventSystem + events *stream.RPCStream filtersMu sync.Mutex filters map[rpc.ID]*filter } // NewPublicAPI returns a new PublicFilterAPI instance. 
-func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI { +func NewPublicAPI(logger log.Logger, clientCtx client.Context, stream *stream.RPCStream, backend Backend) *PublicFilterAPI { logger = logger.With("api", "filter") + api := &PublicFilterAPI{ logger: logger, clientCtx: clientCtx, backend: backend, filters: make(map[rpc.ID]*filter), - events: NewEventSystem(logger, tmWSClient), + events: stream, } go api.timeoutLoop() @@ -120,7 +118,7 @@ func (api *PublicFilterAPI) timeoutLoop() { for id, f := range api.filters { select { case <-f.deadline.C: - f.s.Unsubscribe(api.events) + f.cancel() delete(api.filters, id) default: continue @@ -145,127 +143,15 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { return rpc.ID("error creating pending tx filter: max limit reached") } - pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs() - if err != nil { - // wrap error on the ID - return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", err.Error())) - } - - api.filters[pendingTxSub.ID()] = &filter{ + id := rpc.NewID() + _, offset := api.events.TxStream().ReadNonBlocking(-1) + api.filters[id] = &filter{ typ: filters.PendingTransactionsSubscription, deadline: time.NewTimer(deadline), - hashes: make([]common.Hash, 0), - s: pendingTxSub, + offset: offset, } - go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { - defer cancelSubs() - - for { - select { - case ev, ok := <-txsCh: - if !ok { - api.filtersMu.Lock() - delete(api.filters, pendingTxSub.ID()) - api.filtersMu.Unlock() - return - } - - data, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) - if err != nil { - api.logger.Debug("fail to decode tx", "error", err.Error()) - continue - } - - api.filtersMu.Lock() - if f, found := api.filters[pendingTxSub.ID()]; found { - for _, msg := range tx.GetMsgs() { - ethTx, ok := msg.(*evmtypes.MsgEthereumTx) - if ok { - f.hashes = append(f.hashes, ethTx.AsTransaction().Hash()) - } - } - } - api.filtersMu.Unlock() - case <-errCh: - api.filtersMu.Lock() - delete(api.filters, pendingTxSub.ID()) - api.filtersMu.Unlock() - } - } - }(pendingTxSub.eventCh, pendingTxSub.Err()) - - return pendingTxSub.ID() -} - -// NewPendingTransactions creates a subscription that is triggered each time a transaction -// enters the transaction pool and was signed from one of the transactions this nodes manages. 
-func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - - rpcSub := notifier.CreateSubscription() - - ctx, cancelFn := context.WithTimeout(context.Background(), deadline) - defer cancelFn() - - api.events.WithContext(ctx) - - pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs() - if err != nil { - return nil, err - } - - go func(txsCh <-chan coretypes.ResultEvent) { - defer cancelSubs() - - for { - select { - case ev, ok := <-txsCh: - if !ok { - api.filtersMu.Lock() - delete(api.filters, pendingTxSub.ID()) - api.filtersMu.Unlock() - return - } - - data, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - tx, err := api.clientCtx.TxConfig.TxDecoder()(data.Tx) - if err != nil { - api.logger.Debug("fail to decode tx", "error", err.Error()) - continue - } - - for _, msg := range tx.GetMsgs() { - ethTx, ok := msg.(*evmtypes.MsgEthereumTx) - if ok { - _ = notifier.Notify(rpcSub.ID, ethTx.AsTransaction().Hash()) - } - } - case <-rpcSub.Err(): - pendingTxSub.Unsubscribe(api.events) - return - case <-notifier.Closed(): - pendingTxSub.Unsubscribe(api.events) - return - } - } - }(pendingTxSub.eventCh) - - return rpcSub, err + return id } // NewBlockFilter creates a filter that fetches blocks that are imported into the chain. @@ -280,160 +166,15 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { return rpc.ID("error creating block filter: max limit reached") } - headerSub, cancelSubs, err := api.events.SubscribeNewHeads() - if err != nil { - // wrap error on the ID - return rpc.ID(fmt.Sprintf("error creating block filter: %s", err.Error())) - } - - api.filters[headerSub.ID()] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: headerSub} - - go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) { - defer cancelSubs() - - for { - select { - case ev, ok := <-headersCh: - if !ok { - api.filtersMu.Lock() - delete(api.filters, headerSub.ID()) - api.filtersMu.Unlock() - return - } - - data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - api.filtersMu.Lock() - if f, found := api.filters[headerSub.ID()]; found { - f.hashes = append(f.hashes, common.BytesToHash(data.Header.Hash())) - } - api.filtersMu.Unlock() - case <-errCh: - api.filtersMu.Lock() - delete(api.filters, headerSub.ID()) - api.filtersMu.Unlock() - return - } - } - }(headerSub.eventCh, headerSub.Err()) - - return headerSub.ID() -} - -// NewHeads send a notification each time a new (header) block is appended to the chain. 
-func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - - api.events.WithContext(ctx) - rpcSub := notifier.CreateSubscription() - - headersSub, cancelSubs, err := api.events.SubscribeNewHeads() - if err != nil { - return &rpc.Subscription{}, err + id := rpc.NewID() + _, offset := api.events.HeaderStream().ReadNonBlocking(-1) + api.filters[id] = &filter{ + typ: filters.BlocksSubscription, + deadline: time.NewTimer(deadline), + offset: offset, } - go func(headersCh <-chan coretypes.ResultEvent) { - defer cancelSubs() - - for { - select { - case ev, ok := <-headersCh: - if !ok { - headersSub.Unsubscribe(api.events) - return - } - - data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - - baseFee := types.BaseFeeFromEvents(data.ResultBeginBlock.Events) - - // TODO: fetch bloom from events - header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) - _ = notifier.Notify(rpcSub.ID, header) - case <-rpcSub.Err(): - headersSub.Unsubscribe(api.events) - return - case <-notifier.Closed(): - headersSub.Unsubscribe(api.events) - return - } - } - }(headersSub.eventCh) - - return rpcSub, err -} - -// Logs creates a subscription that fires for all new log that match the given filter criteria. -func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported - } - - api.events.WithContext(ctx) - rpcSub := notifier.CreateSubscription() - - logsSub, cancelSubs, err := api.events.SubscribeLogs(crit) - if err != nil { - return &rpc.Subscription{}, err - } - - go func(logsCh <-chan coretypes.ResultEvent) { - defer cancelSubs() - - for { - select { - case ev, ok := <-logsCh: - if !ok { - logsSub.Unsubscribe(api.events) - return - } - - // filter only events from EVM module txs - _, isMsgEthereumTx := ev.Events[evmtypes.TypeMsgEthereumTx] - - if !isMsgEthereumTx { - // ignore transaction as it's not from the evm module - return - } - - // get transaction result data - dataTx, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data, uint64(dataTx.TxResult.Height)) - if err != nil { - api.logger.Error("fail to decode tx response", "error", err.Error()) - return - } - logs := FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) - for _, log := range logs { - _ = notifier.Notify(rpcSub.ID, log) - } - case <-rpcSub.Err(): // client send an unsubscribe request - logsSub.Unsubscribe(api.events) - return - case <-notifier.Closed(): // connection dropped - logsSub.Unsubscribe(api.events) - return - } - } - }(logsSub.eventCh) - - return rpcSub, err + return id } // NewFilter creates a new filter and returns the filter id. 
It can be @@ -457,65 +198,16 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, return rpc.ID(""), fmt.Errorf("error creating filter: max limit reached") } - var ( - filterID = rpc.ID("") - err error - ) - - logsSub, cancelSubs, err := api.events.SubscribeLogs(criteria) - if err != nil { - return rpc.ID(""), err - } - - filterID = logsSub.ID() - - api.filters[filterID] = &filter{ + id := rpc.NewID() + _, offset := api.events.LogStream().ReadNonBlocking(-1) + api.filters[id] = &filter{ typ: filters.LogsSubscription, - crit: criteria, deadline: time.NewTimer(deadline), - hashes: []common.Hash{}, - s: logsSub, + crit: criteria, + offset: offset, } - go func(eventCh <-chan coretypes.ResultEvent) { - defer cancelSubs() - - for { - select { - case ev, ok := <-eventCh: - if !ok { - api.filtersMu.Lock() - delete(api.filters, filterID) - api.filtersMu.Unlock() - return - } - dataTx, ok := ev.Data.(tmtypes.EventDataTx) - if !ok { - api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) - continue - } - txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data, uint64(dataTx.TxResult.Height)) - if err != nil { - api.logger.Error("fail to decode tx response", "error", err.Error()) - return - } - logs := FilterLogs(txLogs, criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) - - api.filtersMu.Lock() - if f, found := api.filters[filterID]; found { - f.logs = append(f.logs, logs...) - } - api.filtersMu.Unlock() - case <-logsSub.Err(): - api.filtersMu.Lock() - delete(api.filters, filterID) - api.filtersMu.Unlock() - return - } - } - }(logsSub.eventCh) - - return filterID, err + return id, nil } // GetLogs returns logs matching the given argument that are stored within the state. @@ -554,17 +246,13 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit filters.FilterCrit // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { api.filtersMu.Lock() - f, found := api.filters[id] + _, found := api.filters[id] if found { delete(api.filters, id) } api.filtersMu.Unlock() - if !found { - return false - } - f.s.Unsubscribe(api.events) - return true + return found } // GetFilterLogs returns the logs for the filter with the given id. @@ -633,14 +321,31 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.deadline.Reset(deadline) switch f.typ { - case filters.PendingTransactionsSubscription, filters.BlocksSubscription: - hashes := f.hashes - f.hashes = nil + case filters.PendingTransactionsSubscription: + var hashes []common.Hash + hashes, f.offset = api.events.TxStream().ReadAllNonBlocking(f.offset) return returnHashes(hashes), nil - case filters.LogsSubscription, filters.MinedAndPendingLogsSubscription: - logs := make([]*ethtypes.Log, len(f.logs)) - copy(logs, f.logs) - f.logs = []*ethtypes.Log{} + case filters.BlocksSubscription: + var headers []stream.RPCHeader + headers, f.offset = api.events.HeaderStream().ReadAllNonBlocking(f.offset) + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash + } + return hashes, nil + case filters.LogsSubscription: + var ( + logs []*ethtypes.Log + chunk []*ethtypes.Log + ) + for { + chunk, f.offset = api.events.LogStream().ReadNonBlocking(f.offset) + if len(chunk) == 0 { + break + } + chunk = FilterLogs(chunk, f.crit.FromBlock, f.crit.ToBlock, f.crit.Addresses, f.crit.Topics) + logs = append(logs, chunk...) 
+ } return returnLogs(logs), nil default: return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ) diff --git a/rpc/namespaces/ethereum/eth/filters/filter_system.go b/rpc/namespaces/ethereum/eth/filters/filter_system.go deleted file mode 100644 index 965d36a3d4..0000000000 --- a/rpc/namespaces/ethereum/eth/filters/filter_system.go +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2021 Evmos Foundation -// This file is part of Evmos' Ethermint library. -// -// The Ethermint library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The Ethermint library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the Ethermint library. If not, see https://github.com/evmos/ethermint/blob/main/LICENSE -package filters - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/pkg/errors" - - tmjson "github.com/cometbft/cometbft/libs/json" - "github.com/cometbft/cometbft/libs/log" - tmquery "github.com/cometbft/cometbft/libs/pubsub/query" - coretypes "github.com/cometbft/cometbft/rpc/core/types" - rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" - tmtypes "github.com/cometbft/cometbft/types" - - "github.com/ethereum/go-ethereum/common" - ethtypes "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/rpc" - - sdk "github.com/cosmos/cosmos-sdk/types" - - "github.com/evmos/ethermint/rpc/ethereum/pubsub" - evmtypes "github.com/evmos/ethermint/x/evm/types" -) - -var ( - txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() - evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", - tmtypes.EventTypeKey, - tmtypes.EventTx, - sdk.EventTypeMessage, - sdk.AttributeKeyModule, evmtypes.ModuleName)).String() - headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() -) - -// EventSystem creates subscriptions, processes events and broadcasts them to the -// subscription which match the subscription criteria using the Tendermint's RPC client. -type EventSystem struct { - logger log.Logger - ctx context.Context - tmWSClient *rpcclient.WSClient - - // light client mode - lightMode bool - - index filterIndex - topicChans map[string]chan<- coretypes.ResultEvent - indexMux *sync.RWMutex - - // Channels - install chan *Subscription // install filter for event notification - uninstall chan *Subscription // remove filter for event notification - eventBus pubsub.EventBus -} - -// NewEventSystem creates a new manager that listens for event on the given mux, -// parses and filters them. It uses the all map to retrieve filter changes. The -// work loop holds its own index that is used to forward events to filters. -// -// The returned manager has a loop that needs to be stopped with the Stop function -// or by stopping the given mux. 
-func NewEventSystem(logger log.Logger, tmWSClient *rpcclient.WSClient) *EventSystem { - index := make(filterIndex) - for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { - index[i] = make(map[rpc.ID]*Subscription) - } - - es := &EventSystem{ - logger: logger, - ctx: context.Background(), - tmWSClient: tmWSClient, - lightMode: false, - index: index, - topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), - indexMux: new(sync.RWMutex), - install: make(chan *Subscription), - uninstall: make(chan *Subscription), - eventBus: pubsub.NewEventBus(), - } - - go es.eventLoop() - go es.consumeEvents() - return es -} - -// WithContext sets a new context to the EventSystem. This is required to set a timeout context when -// a new filter is intantiated. -func (es *EventSystem) WithContext(ctx context.Context) { - es.ctx = ctx -} - -// subscribe performs a new event subscription to a given Tendermint event. -// The subscription creates a unidirectional receive event channel to receive the ResultEvent. -func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) { - var ( - err error - cancelFn context.CancelFunc - ) - - ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() - - existingSubs := es.eventBus.Topics() - for _, topic := range existingSubs { - if topic == sub.event { - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) - return nil, nil, err - } - - sub.eventCh = eventCh - return sub, unsubFn, nil - } - } - - switch sub.typ { - case filters.LogsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.BlocksSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - case filters.PendingTransactionsSubscription: - err = es.tmWSClient.Subscribe(ctx, sub.event) - default: - err = fmt.Errorf("invalid filter subscription type %d", sub.typ) - } - - if err != nil { - sub.err <- err - return nil, nil, err - } - - // wrap events in a go routine to prevent blocking - es.install <- sub - <-sub.installed - - eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) - } - - sub.eventCh = eventCh - return sub, unsubFn, nil -} - -// SubscribeLogs creates a subscription that will write all logs matching the -// given criteria to the given logs channel. Default value for the from and to -// block is "latest". If the fromBlock > toBlock an error is returned. 
-func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { - var from, to rpc.BlockNumber - if crit.FromBlock == nil { - from = rpc.LatestBlockNumber - } else { - from = rpc.BlockNumber(crit.FromBlock.Int64()) - } - if crit.ToBlock == nil { - to = rpc.LatestBlockNumber - } else { - to = rpc.BlockNumber(crit.ToBlock.Int64()) - } - - switch { - // only interested in new mined logs, mined logs within a specific block range, or - // logs from a specific block number to new mined blocks - case (from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber), - (from >= 0 && to >= 0 && to >= from), - (from >= 0 && to == rpc.LatestBlockNumber): - return es.subscribeLogs(crit) - - default: - return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to) - } -} - -// subscribeLogs creates a subscription that will write all logs matching the -// given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: rpc.NewID(), - typ: filters.LogsSubscription, - event: evmEvents, - logsCrit: crit, - created: time.Now().UTC(), - logs: make(chan []*ethtypes.Log), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -// SubscribeNewHeads subscribes to new block headers events. -func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: rpc.NewID(), - typ: filters.BlocksSubscription, - event: headerEvents, - created: time.Now().UTC(), - headers: make(chan *ethtypes.Header), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -// SubscribePendingTxs subscribes to new pending transactions events from the mempool. -func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { - sub := &Subscription{ - id: rpc.NewID(), - typ: filters.PendingTransactionsSubscription, - event: txEvents, - created: time.Now().UTC(), - hashes: make(chan []common.Hash), - installed: make(chan struct{}, 1), - err: make(chan error, 1), - } - return es.subscribe(sub) -} - -type filterIndex map[filters.Type]map[rpc.ID]*Subscription - -// eventLoop (un)installs filters and processes mux events. 
-func (es *EventSystem) eventLoop() { - for { - select { - case f := <-es.install: - es.indexMux.Lock() - es.index[f.typ][f.id] = f - ch := make(chan coretypes.ResultEvent) - if err := es.eventBus.AddTopic(f.event, ch); err != nil { - es.logger.Error("failed to add event topic to event bus", "topic", f.event, "error", err.Error()) - } else { - es.topicChans[f.event] = ch - } - es.indexMux.Unlock() - close(f.installed) - case f := <-es.uninstall: - es.indexMux.Lock() - delete(es.index[f.typ], f.id) - - var channelInUse bool - for _, sub := range es.index[f.typ] { - if sub.event == f.event { - channelInUse = true - break - } - } - - // remove topic only when channel is not used by other subscriptions - if !channelInUse { - if err := es.tmWSClient.Unsubscribe(es.ctx, f.event); err != nil { - es.logger.Error("failed to unsubscribe from query", "query", f.event, "error", err.Error()) - } - - ch, ok := es.topicChans[f.event] - if ok { - es.eventBus.RemoveTopic(f.event) - close(ch) - delete(es.topicChans, f.event) - } - } - - es.indexMux.Unlock() - close(f.err) - } - } -} - -func (es *EventSystem) consumeEvents() { - for { - for rpcResp := range es.tmWSClient.ResponsesCh { - var ev coretypes.ResultEvent - - if rpcResp.Error != nil { - time.Sleep(5 * time.Second) - continue - } else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil { - es.logger.Error("failed to JSON unmarshal ResponsesCh result event", "error", err.Error()) - continue - } - - if len(ev.Query) == 0 { - // skip empty responses - continue - } - - es.indexMux.RLock() - ch, ok := es.topicChans[ev.Query] - es.indexMux.RUnlock() - if !ok { - es.logger.Debug("channel for subscription not found", "topic", ev.Query) - es.logger.Debug("list of available channels", "channels", es.eventBus.Topics()) - continue - } - - // gracefully handle lagging subscribers - t := time.NewTimer(time.Second) - select { - case <-t.C: - es.logger.Debug("dropped event during lagging subscription", "topic", ev.Query) - case ch <- ev: - } - } - - time.Sleep(time.Second) - } -} diff --git a/rpc/namespaces/ethereum/eth/filters/filter_system_test.go b/rpc/namespaces/ethereum/eth/filters/filter_system_test.go deleted file mode 100644 index e615e781ab..0000000000 --- a/rpc/namespaces/ethereum/eth/filters/filter_system_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package filters - -import ( - "context" - "os" - "sync" - "testing" - "time" - - "github.com/cometbft/cometbft/libs/log" - coretypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/ethereum/go-ethereum/common" - ethtypes "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/rpc" - "github.com/evmos/ethermint/rpc/ethereum/pubsub" -) - -func makeSubscription(id, event string) *Subscription { - return &Subscription{ - id: rpc.ID(id), - typ: filters.LogsSubscription, - event: event, - created: time.Now(), - logs: make(chan []*ethtypes.Log), - hashes: make(chan []common.Hash), - headers: make(chan *ethtypes.Header), - installed: make(chan struct{}), - eventCh: make(chan coretypes.ResultEvent), - err: make(chan error), - } -} - -func TestFilterSystem(t *testing.T) { - index := make(filterIndex) - for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { - index[i] = make(map[rpc.ID]*Subscription) - } - es := &EventSystem{ - logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)), - ctx: context.Background(), - lightMode: false, - index: index, - topicChans: make(map[string]chan<- coretypes.ResultEvent, 
len(index)), - indexMux: new(sync.RWMutex), - install: make(chan *Subscription), - uninstall: make(chan *Subscription), - eventBus: pubsub.NewEventBus(), - } - go es.eventLoop() - - event := "event" - sub := makeSubscription("1", event) - es.install <- sub - <-sub.installed - ch, ok := es.topicChans[sub.event] - if !ok { - t.Error("expect topic channel exist") - } - - sub = makeSubscription("2", event) - es.install <- sub - <-sub.installed - newCh, ok := es.topicChans[sub.event] - if !ok { - t.Error("expect topic channel exist") - } - - if newCh != ch { - t.Error("expect topic channel unchanged") - } -} diff --git a/rpc/namespaces/ethereum/eth/filters/subscription.go b/rpc/namespaces/ethereum/eth/filters/subscription.go deleted file mode 100644 index 8566757bd8..0000000000 --- a/rpc/namespaces/ethereum/eth/filters/subscription.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2021 Evmos Foundation -// This file is part of Evmos' Ethermint library. -// -// The Ethermint library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The Ethermint library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the Ethermint library. If not, see https://github.com/evmos/ethermint/blob/main/LICENSE -package filters - -import ( - "time" - - coretypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/ethereum/go-ethereum/common" - ethtypes "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/rpc" -) - -// Subscription defines a wrapper for the private subscription -type Subscription struct { - id rpc.ID - typ filters.Type - event string - created time.Time - logsCrit filters.FilterCriteria - logs chan []*ethtypes.Log - hashes chan []common.Hash - headers chan *ethtypes.Header - installed chan struct{} // closed when the filter is installed - eventCh <-chan coretypes.ResultEvent - err chan error -} - -// ID returns the underlying subscription RPC identifier. -func (s Subscription) ID() rpc.ID { - return s.id -} - -// Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the -// subscription error channel if unsubscribe fails. -func (s *Subscription) Unsubscribe(es *EventSystem) { - go func() { - uninstallLoop: - for { - // write uninstall request and consume logs/hashes. This prevents - // the eventLoop broadcast method to deadlock when writing to the - // filter event channel while the subscription loop is waiting for - // this method to return (and thus not reading these events). 
- select { - case es.uninstall <- s: - break uninstallLoop - case <-s.logs: - case <-s.hashes: - case <-s.headers: - } - } - }() -} - -// Err returns the error channel -func (s *Subscription) Err() <-chan error { - return s.err -} - -// Event returns the tendermint result event channel -func (s *Subscription) Event() <-chan coretypes.ResultEvent { - return s.eventCh -} diff --git a/rpc/stream/cond.go b/rpc/stream/cond.go new file mode 100644 index 0000000000..8f629907ee --- /dev/null +++ b/rpc/stream/cond.go @@ -0,0 +1,37 @@ +package stream + +import ( + "context" + "sync" +) + +// Cond implements conditional variable with a channel +type Cond struct { + mu sync.Mutex // guards ch + ch chan struct{} +} + +func NewCond() *Cond { + return &Cond{ch: make(chan struct{})} +} + +// Wait returns true if the condition is signaled, false if the context is canceled +func (c *Cond) Wait(ctx context.Context) bool { + c.mu.Lock() + ch := c.ch + c.mu.Unlock() + + select { + case <-ch: + return true + case <-ctx.Done(): + return false + } +} + +func (c *Cond) Broadcast() { + c.mu.Lock() + defer c.mu.Unlock() + close(c.ch) + c.ch = make(chan struct{}) +} diff --git a/rpc/stream/queue.go b/rpc/stream/queue.go new file mode 100644 index 0000000000..abfe08d1bd --- /dev/null +++ b/rpc/stream/queue.go @@ -0,0 +1,152 @@ +/* +The MIT License (MIT) + +Copyright (c) 2014 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +/* +Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki. +Using this instead of other, simpler, queue implementations (slice+append or linked list) provides +substantial memory and time benefits, and fewer GC pauses. + +The queue implemented here is as fast as it is for an additional reason: it is *not* thread-safe. +*/ + +package stream + +// minQueueLen is smallest capacity that queue may have. +// Must be power of 2 for bitwise modulus: x % n == x & (n - 1). +const minQueueLen = 16 + +// Queue represents a single instance of the queue data structure. +type Queue[V any] struct { + buf []V + head, tail, count int +} + +// New constructs and returns a new Queue. +func New[V any]() *Queue[V] { + return &Queue[V]{ + buf: make([]V, minQueueLen), + } +} + +// Length returns the number of elements currently stored in the queue. 
+func (q *Queue[V]) Length() int { + return q.count +} + +// resizes the queue to fit exactly twice its current contents +// this can result in shrinking if the queue is less than half-full +func (q *Queue[V]) resize() { + newBuf := make([]V, q.count<<1) + + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} + +// Add puts an element on the end of the queue. +func (q *Queue[V]) Add(elem V) { + if q.count == len(q.buf) { + q.resize() + } + + q.buf[q.tail] = elem + // bitwise modulus + q.tail = (q.tail + 1) & (len(q.buf) - 1) + q.count++ +} + +// Peek returns the element at the head of the queue. This call panics +// if the queue is empty. +func (q *Queue[V]) Peek() V { + return *q.PeekP() +} + +// PeekP returns the element at the head of the queue. This call panics +// if the queue is empty. +func (q *Queue[V]) PeekP() *V { + if q.count <= 0 { + panic("queue: Peek() called on empty queue") + } + return &q.buf[q.head] +} + +// Tail returns the element at the tail of the queue. This call panics +// if the queue is empty. +func (q *Queue[V]) Tail() V { + return *q.TailP() +} + +// TailP returns the element at the tail of the queue. This call panics +// if the queue is empty. +func (q *Queue[V]) TailP() *V { + return q.GetP(-1) +} + +// Get returns the element at index i in the queue. If the index is +// invalid, the call will panic. This method accepts both positive and +// negative index values. Index 0 refers to the first element, and +// index -1 refers to the last. +func (q *Queue[V]) Get(i int) V { + return *q.GetP(i) +} + +// GetP returns the pointer to the element at index i in the queue. If the index is +// invalid, the call will panic. This method accepts both positive and +// negative index values. Index 0 refers to the first element, and +// index -1 refers to the last. +func (q *Queue[V]) GetP(i int) *V { + // If indexing backwards, convert to positive index. + if i < 0 { + i += q.count + } + if i < 0 || i >= q.count { + panic("queue: Get() called with index out of range") + } + // bitwise modulus + return &q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Remove removes and returns the element from the front of the queue. If the +// queue is empty, the call will panic. +func (q *Queue[V]) Remove() V { + if q.count <= 0 { + panic("queue: Remove() called on empty queue") + } + ret := q.buf[q.head] + // bitwise modulus + q.head = (q.head + 1) & (len(q.buf) - 1) + q.count-- + // Resize down if buffer 1/4 full. + if len(q.buf) > minQueueLen && (q.count<<2) == len(q.buf) { + q.resize() + } + return ret +} diff --git a/rpc/stream/queue_test.go b/rpc/stream/queue_test.go new file mode 100644 index 0000000000..d31d3b54c2 --- /dev/null +++ b/rpc/stream/queue_test.go @@ -0,0 +1,203 @@ +/* +The MIT License (MIT) + +Copyright (c) 2014 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package stream + +import "testing" + +func TestQueueSimple(t *testing.T) { + q := New[int]() + + for i := 0; i < minQueueLen; i++ { + q.Add(i) + } + for i := 0; i < minQueueLen; i++ { + if q.Peek() != i { + t.Error("peek", i, "had value", q.Peek()) + } + x := q.Remove() + if x != i { + t.Error("remove", i, "had value", x) + } + } +} + +func TestQueueWrapping(t *testing.T) { + q := New[int]() + + for i := 0; i < minQueueLen; i++ { + q.Add(i) + } + for i := 0; i < 3; i++ { + q.Remove() + q.Add(minQueueLen + i) + } + + for i := 0; i < minQueueLen; i++ { + if q.Peek() != i+3 { + t.Error("peek", i, "had value", q.Peek()) + } + q.Remove() + } +} + +func TestQueueLength(t *testing.T) { + q := New[int]() + + if q.Length() != 0 { + t.Error("empty queue length not 0") + } + + for i := 0; i < 1000; i++ { + q.Add(i) + if q.Length() != i+1 { + t.Error("adding: queue with", i, "elements has length", q.Length()) + } + } + for i := 0; i < 1000; i++ { + q.Remove() + if q.Length() != 1000-i-1 { + t.Error("removing: queue with", 1000-i-i, "elements has length", q.Length()) + } + } +} + +func TestQueueGet(t *testing.T) { + q := New[int]() + + for i := 0; i < 1000; i++ { + q.Add(i) + for j := 0; j < q.Length(); j++ { + if q.Get(j) != j { + t.Errorf("index %d doesn't contain %d", j, j) + } + } + } +} + +func TestQueueGetNegative(t *testing.T) { + q := New[int]() + + for i := 0; i < 1000; i++ { + q.Add(i) + for j := 1; j <= q.Length(); j++ { + if q.Get(-j) != q.Length()-j { + t.Errorf("index %d doesn't contain %d", -j, q.Length()-j) + } + } + } +} + +func TestQueueGetOutOfRangePanics(t *testing.T) { + q := New[int]() + + q.Add(1) + q.Add(2) + q.Add(3) + + assertPanics(t, "should panic when negative index", func() { + q.Get(-4) + }) + + assertPanics(t, "should panic when index greater than length", func() { + q.Get(4) + }) +} + +func TestQueuePeekOutOfRangePanics(t *testing.T) { + q := New[any]() + + assertPanics(t, "should panic when peeking empty queue", func() { + q.Peek() + }) + + q.Add(1) + q.Remove() + + assertPanics(t, "should panic when peeking emptied queue", func() { + q.Peek() + }) +} + +func TestQueueRemoveOutOfRangePanics(t *testing.T) { + q := New[int]() + + assertPanics(t, "should panic when removing empty queue", func() { + q.Remove() + }) + + q.Add(1) + q.Remove() + + assertPanics(t, "should panic when removing emptied queue", func() { + q.Remove() + }) +} + +func assertPanics(t *testing.T, name string, f func()) { + defer func() { + if r := recover(); r == nil { + t.Errorf("%s: didn't panic as expected", name) + } + }() + + f() +} + +// WARNING: Go's benchmark utility (go test -bench .) increases the number of +// iterations until the benchmarks take a reasonable amount of time to run; memory usage +// is *NOT* considered. On a fast CPU, these benchmarks can fill hundreds of GB of memory +// (and then hang when they start to swap). You can manually control the number of iterations +// with the `-benchtime` argument. Passing `-benchtime 1000000x` seems to be about right. 
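As an aside — a minimal usage sketch of the generic ring-buffer `Queue[V]` vendored into `rpc/stream/queue.go` above. This is not part of the diff; it assumes the exported `New`, `Add`, `Length`, `Peek`, `Get`, and `Remove` API shown there and the `github.com/evmos/ethermint/rpc/stream` import path introduced by this PR:

```go
package main

import (
	"fmt"

	"github.com/evmos/ethermint/rpc/stream"
)

func main() {
	// FIFO ring-buffer queue (initial capacity minQueueLen = 16,
	// doubled on demand by resize).
	q := stream.New[int]()

	for i := 0; i < 20; i++ {
		q.Add(i)
	}

	fmt.Println(q.Length()) // 20
	fmt.Println(q.Peek())   // 0  -- head of the queue
	fmt.Println(q.Get(-1))  // 19 -- negative index counts back from the tail

	// Remove pops from the head; calling it on an empty queue panics.
	for q.Length() > 0 {
		_ = q.Remove()
	}
}
```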
+ +func BenchmarkQueueSerial(b *testing.B) { + q := New[any]() + for i := 0; i < b.N; i++ { + q.Add(nil) + } + for i := 0; i < b.N; i++ { + q.Peek() + q.Remove() + } +} + +func BenchmarkQueueGet(b *testing.B) { + q := New[int]() + for i := 0; i < b.N; i++ { + q.Add(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Get(i) + } +} + +func BenchmarkQueueTickTock(b *testing.B) { + q := New[any]() + for i := 0; i < b.N; i++ { + q.Add(nil) + q.Peek() + q.Remove() + } +} diff --git a/rpc/stream/rpc.go b/rpc/stream/rpc.go new file mode 100644 index 0000000000..c71a79e1fd --- /dev/null +++ b/rpc/stream/rpc.go @@ -0,0 +1,208 @@ +package stream + +import ( + "context" + "fmt" + "sync" + + "github.com/cometbft/cometbft/libs/log" + tmquery "github.com/cometbft/cometbft/libs/pubsub/query" + rpcclient "github.com/cometbft/cometbft/rpc/client" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + tmtypes "github.com/cometbft/cometbft/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/evmos/ethermint/rpc/types" + evmtypes "github.com/evmos/ethermint/x/evm/types" +) + +const ( + streamSubscriberName = "ethermint-json-rpc" + subscribBufferSize = 1024 + + headerStreamSegmentSize = 128 + headerStreamCapacity = 128 * 32 + txStreamSegmentSize = 1024 + txStreamCapacity = 1024 * 32 + logStreamSegmentSize = 2048 + logStreamCapacity = 2048 * 32 +) + +var ( + txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() + evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", + tmtypes.EventTypeKey, + tmtypes.EventTx, + sdk.EventTypeMessage, + sdk.AttributeKeyModule, evmtypes.ModuleName)).String() + headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() + evmTxHashKey = fmt.Sprintf("%s.%s", evmtypes.TypeMsgEthereumTx, evmtypes.AttributeKeyEthereumTxHash) +) + +type RPCHeader struct { + EthHeader *ethtypes.Header + Hash common.Hash +} + +// RPCStream provides data streams for newHeads, logs, and pendingTransactions. 
+type RPCStream struct { + evtClient rpcclient.EventsClient + logger log.Logger + txDecoder sdk.TxDecoder + + headerStream *Stream[RPCHeader] + txStream *Stream[common.Hash] + logStream *Stream[*ethtypes.Log] + + wg sync.WaitGroup +} + +func NewRPCStreams(evtClient rpcclient.EventsClient, logger log.Logger, txDecoder sdk.TxDecoder) (*RPCStream, error) { + s := &RPCStream{ + evtClient: evtClient, + logger: logger, + txDecoder: txDecoder, + + headerStream: NewStream[RPCHeader](headerStreamSegmentSize, headerStreamCapacity), + txStream: NewStream[common.Hash](txStreamSegmentSize, txStreamCapacity), + logStream: NewStream[*ethtypes.Log](logStreamSegmentSize, logStreamCapacity), + } + + ctx := context.Background() + + chHeaders, err := s.evtClient.Subscribe(ctx, streamSubscriberName, headerEvents, subscribBufferSize) + if err != nil { + return nil, err + } + + chTx, err := s.evtClient.Subscribe(ctx, streamSubscriberName, txEvents, subscribBufferSize) + if err != nil { + if err := s.evtClient.UnsubscribeAll(ctx, streamSubscriberName); err != nil { + s.logger.Error("failed to unsubscribe", "err", err) + } + return nil, err + } + + chLogs, err := s.evtClient.Subscribe(ctx, streamSubscriberName, evmEvents, subscribBufferSize) + if err != nil { + if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil { + s.logger.Error("failed to unsubscribe", "err", err) + } + return nil, err + } + + go s.start(&s.wg, chHeaders, chTx, chLogs) + + return s, nil +} + +func (s *RPCStream) Close() error { + if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil { + return err + } + s.wg.Wait() + return nil +} + +func (s *RPCStream) HeaderStream() *Stream[RPCHeader] { + return s.headerStream +} + +func (s *RPCStream) TxStream() *Stream[common.Hash] { + return s.txStream +} + +func (s *RPCStream) LogStream() *Stream[*ethtypes.Log] { + return s.logStream +} + +func (s *RPCStream) start( + wg *sync.WaitGroup, + chHeaders <-chan coretypes.ResultEvent, + chTx <-chan coretypes.ResultEvent, + chLogs <-chan coretypes.ResultEvent, +) { + wg.Add(1) + defer func() { + wg.Done() + if err := s.evtClient.UnsubscribeAll(context.Background(), streamSubscriberName); err != nil { + s.logger.Error("failed to unsubscribe", "err", err) + } + }() + + for { + select { + case ev, ok := <-chHeaders: + if !ok { + chHeaders = nil + break + } + + data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) + if !ok { + s.logger.Error("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) + continue + } + + baseFee := types.BaseFeeFromEvents(data.ResultBeginBlock.Events) + + // TODO: fetch bloom from events + header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) + s.headerStream.Add(RPCHeader{EthHeader: header, Hash: common.BytesToHash(data.Header.Hash())}) + case ev, ok := <-chTx: + if !ok { + chTx = nil + break + } + + data, ok := ev.Data.(tmtypes.EventDataTx) + if !ok { + s.logger.Error("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) + continue + } + + tx, err := s.txDecoder(data.Tx) + if err != nil { + s.logger.Error("fail to decode tx", "error", err.Error()) + continue + } + + var hashes []common.Hash + for _, msg := range tx.GetMsgs() { + if ethTx, ok := msg.(*evmtypes.MsgEthereumTx); ok { + hashes = append(hashes, ethTx.AsTransaction().Hash()) + } + } + s.txStream.Add(hashes...) 
+ case ev, ok := <-chLogs: + if !ok { + chLogs = nil + break + } + + if _, ok := ev.Events[evmTxHashKey]; !ok { + // ignore transaction as it's not from the evm module + continue + } + + // get transaction result data + dataTx, ok := ev.Data.(tmtypes.EventDataTx) + if !ok { + s.logger.Error("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data)) + continue + } + txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data, uint64(dataTx.TxResult.Height)) + if err != nil { + s.logger.Error("fail to decode evm tx response", "error", err.Error()) + continue + } + + s.logStream.Add(txLogs...) + } + + if chHeaders == nil && chTx == nil && chLogs == nil { + break + } + } +} diff --git a/rpc/stream/stream.go b/rpc/stream/stream.go new file mode 100644 index 0000000000..794bcb2d5a --- /dev/null +++ b/rpc/stream/stream.go @@ -0,0 +1,187 @@ +package stream + +import ( + "context" + "sync" +) + +// Stream implements a data stream, user can subscribe the stream in blocking or non-blocking way using offsets. +// it use a segmented ring buffer to store the items, with a fixed capacity, when buffer is full, the old data get pruned when new data comes. +type Stream[V any] struct { + segments *Queue[[]V] + segmentSize int + maxSegments int + segmentOffset int + cond *Cond + mutex sync.RWMutex +} + +func NewStream[V any](segmentSize, capacity int) *Stream[V] { + maxSegments := (capacity + segmentSize - 1) / segmentSize + if maxSegments < 1 { + panic("capacity is too small") + } + + stream := &Stream[V]{ + segments: New[[]V](), + segmentSize: segmentSize, + maxSegments: maxSegments, + segmentOffset: 0, + cond: NewCond(), + } + return stream +} + +// Add appends items to the stream and returns the id of last one. +// item id start with 1. +func (s *Stream[V]) Add(vs ...V) int { + if len(vs) == 0 { + return 0 + } + + s.mutex.Lock() + defer s.mutex.Unlock() + + for _, v := range vs { + if s.segments.Length() == 0 || len(s.segments.Tail()) == s.segmentSize { + var seg []V + if s.segments.Length() > s.maxSegments { + // reuse the free segment + seg = s.segments.Remove()[:0] + s.segmentOffset++ + } else { + seg = make([]V, 0, s.segmentSize) + } + s.segments.Add(seg) + } + + tail := s.segments.TailP() + *tail = append(*tail, v) + } + + // notify the subscribers + s.cond.Broadcast() + + return s.lastID() +} + +// Subscribe subscribes the stream in a loop, pass the chunks of items to the callback, +// it only stops if the context is canceled. +// it returns the last id of the items. +func (s *Stream[V]) Subscribe(ctx context.Context, callback func([]V, int) error) error { + var ( + items []V + offset = -1 + ) + for { + items, offset = s.ReadBlocking(ctx, offset) + if len(items) == 0 { + // canceled + break + } + if err := callback(items, offset); err != nil { + return err + } + } + return nil +} + +// ReadNonBlocking returns items with id greater than the last received id reported by user, without blocking. +// if there are no new items, it also returns the largest id of the items. +func (s *Stream[V]) ReadNonBlocking(offset int) ([]V, int) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + return s.doRead(offset) +} + +// ReadAllNonBlocking returns all items in the stream, without blocking. +func (s *Stream[V]) ReadAllNonBlocking(offset int) ([]V, int) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + var ( + result []V + items []V + ) + for { + items, offset = s.doRead(offset) + if len(items) == 0 { + break + } + result = append(result, items...) 
+ } + + return result, offset +} + +// ReadBlocking returns items with id greater than the last received id reported by user. +// reads at most one segment at a time. +// negative offset means read from the end. +// it also returns the largest id of the items, if there are no new items, returns the id of the last item. +func (s *Stream[V]) ReadBlocking(ctx context.Context, offset int) ([]V, int) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + var items []V + for { + items, offset = s.doRead(offset) + if len(items) > 0 { + return items, offset + } + + s.mutex.RUnlock() + r := s.cond.Wait(ctx) + s.mutex.RLock() + + if !r { + // context canceled + return nil, 0 + } + } +} + +// lastID returns the id of the last item, 0 for empty stream. +func (s *Stream[V]) lastID() int { + if s.segments.Length() == 0 { + return 0 + } + + return (s.segmentOffset+s.segments.Length()-1)*s.segmentSize + len(s.segments.Tail()) +} + +// doRead is the underlying logic of Read. +func (s *Stream[V]) doRead(offset int) ([]V, int) { + if s.segments.Length() == 0 { + return nil, s.lastID() + } + + if offset < 0 { + return nil, s.lastID() + } + + segment := offset / s.segmentSize + if segment >= s.segmentOffset { + segment -= s.segmentOffset + } else { + // the target segment is pruned, ajust to earliest segment + segment = 0 + } + + if segment >= s.segments.Length() { + // offset is in the future + return nil, s.lastID() + } + + seg := s.segments.Get(segment) + items := seg[offset%s.segmentSize:] + if len(items) == 0 { + return nil, s.lastID() + } + + // copy the slice + clone := make([]V, len(items)) + copy(clone, items) + + return clone, (s.segmentOffset+segment)*s.segmentSize + len(seg) +} diff --git a/rpc/stream/stream_test.go b/rpc/stream/stream_test.go new file mode 100644 index 0000000000..c86df35890 --- /dev/null +++ b/rpc/stream/stream_test.go @@ -0,0 +1,120 @@ +package stream + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestStreamAdd(t *testing.T) { + testCases := []struct { + segmentSize, capacity int + }{ + {128, 1280}, + {1024, 2048}, + {1024, 2148}, + {1024, 1024}, + {2048, 100}, + } + + for _, tc := range testCases { + name := fmt.Sprintf("segmentSize=%d,capacity=%d", tc.segmentSize, tc.capacity) + t.Run(name, func(t *testing.T) { + stream := NewStream[int](tc.segmentSize, tc.capacity) + + amount := 100000 + for i := 0; i < amount; i++ { + require.Equal(t, i+1, stream.Add(i)) + } + + all, _ := stream.ReadAllNonBlocking(0) + maxSegments := (tc.capacity + tc.segmentSize - 1) / tc.segmentSize + require.Equal(t, maxSegments*tc.segmentSize+amount%tc.segmentSize, len(all)) + require.Equal(t, 100000-1, all[len(all)-1]) + for i, n := range all[:len(all)-1] { + require.Equal(t, n+1, all[i+1]) + } + }) + } +} + +func TestStreamReadNonBlocking(t *testing.T) { + stream := NewStream[int](16, 31) + + for i := 0; i < 32; i++ { + require.Equal(t, i+1, stream.Add(i)) + } + + items, offset := stream.ReadNonBlocking(0) + require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, items) + require.Equal(t, 16, offset) +} + +func TestStreamReadBlocking(t *testing.T) { + stream := NewStream[int](16, 31) + + wg := sync.WaitGroup{} + + ctx, cancel := context.WithCancel(context.Background()) + + // subscriber + subscribers := 10 + result := make([][]int, subscribers) + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + require.NoError(t, stream.Subscribe(ctx, func(items []int, offset int) error { + result[i] = 
append(result[i], items...) + return nil + })) + }(i) + } + + // wait for subscribers to setup + time.Sleep(100 * time.Millisecond) + + // publisher + for i := 0; i < 32; i++ { + require.Equal(t, i+1, stream.Add(i)) + } + + // wait for subscribers to finish + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + // check result + for i := 0; i < subscribers; i++ { + require.Equal(t, 32, len(result[i])) + require.Equal(t, 31, result[i][len(result[i])-1]) + for j, n := range result[i][:len(result[i])-1] { + require.Equal(t, n+1, result[i][j+1]) + } + } +} + +func TestStreamReadFromEnd(t *testing.T) { + stream := NewStream[int](16, 31) + + items, offset := stream.ReadNonBlocking(-1) + require.Empty(t, items) + require.Equal(t, 0, offset) + + stream.Add(1) + + items, offset = stream.ReadNonBlocking(-1) + require.Empty(t, items) + require.Equal(t, 1, offset) + + stream.Add(2) + + items, offset = stream.ReadNonBlocking(offset) + require.Equal(t, []int{2}, items) + require.Equal(t, 2, offset) +} diff --git a/rpc/websockets.go b/rpc/websockets.go index a800825732..1c65c4ef11 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -35,18 +35,13 @@ import ( "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/cometbft/cometbft/libs/log" - rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client" - tmtypes "github.com/cometbft/cometbft/types" - "github.com/evmos/ethermint/rpc/ethereum/pubsub" rpcfilters "github.com/evmos/ethermint/rpc/namespaces/ethereum/eth/filters" - "github.com/evmos/ethermint/rpc/types" + "github.com/evmos/ethermint/rpc/stream" "github.com/evmos/ethermint/server/config" - evmtypes "github.com/evmos/ethermint/x/evm/types" ) type WebsocketsServer interface { @@ -90,7 +85,7 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, tmWSClient *rpcclient.WSClient, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address) @@ -99,7 +94,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, tmWSClient wsAddr: cfg.JSONRPC.WsAddress, certFile: cfg.TLS.CertificatePath, keyFile: cfg.TLS.KeyPath, - api: newPubSubAPI(clientCtx, logger, tmWSClient), + api: newPubSubAPI(clientCtx, logger, stream), logger: logger, } } @@ -186,7 +181,7 @@ func (w *wsConn) ReadMessage() (messageType int, p []byte, err error) { func (s *websocketsServer) readLoop(wsConn *wsConn) { // subscriptions of current connection - subscriptions := make(map[rpc.ID]pubsub.UnsubscribeFunc) + subscriptions := make(map[rpc.ID]context.CancelFunc) defer func() { // cancel all subscriptions when connection closed for _, unsubFn := range subscriptions { @@ -353,22 +348,22 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { - events *rpcfilters.EventSystem + events *stream.RPCStream logger log.Logger clientCtx client.Context } // newPubSubAPI creates an instance of the ethereum PubSub API. 
-func newPubSubAPI(clientCtx client.Context, logger log.Logger, tmWSClient *rpcclient.WSClient) *pubSubAPI {
+func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
 	logger = logger.With("module", "websocket-client")
 
 	return &pubSubAPI{
-		events:    rpcfilters.NewEventSystem(logger, tmWSClient),
+		events:    stream,
 		logger:    logger,
 		clientCtx: clientCtx,
 	}
 }
 
-func (api *pubSubAPI) subscribe(wsConn *wsConn, subID rpc.ID, params []interface{}) (pubsub.UnsubscribeFunc, error) {
+func (api *pubSubAPI) subscribe(wsConn *wsConn, subID rpc.ID, params []interface{}) (context.CancelFunc, error) {
 	method, ok := params[0].(string)
 	if !ok {
 		return nil, errors.New("invalid parameters")
@@ -392,63 +387,36 @@ func (api *pubSubAPI) subscribe(wsConn *wsConn, subID rpc.ID, params []interface
 	}
 }
 
-func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
-	sub, unsubFn, err := api.events.SubscribeNewHeads()
-	if err != nil {
-		return nil, errors.Wrap(err, "error creating block filter")
-	}
-
-	// TODO: use events
-	baseFee := big.NewInt(params.InitialBaseFee)
-
-	go func() {
-		headersCh := sub.Event()
-		errCh := sub.Err()
-		for {
-			select {
-			case event, ok := <-headersCh:
-				if !ok {
-					return
-				}
-
-				data, ok := event.Data.(tmtypes.EventDataNewBlockHeader)
-				if !ok {
-					api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data))
-					continue
-				}
-
-				header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee)
-
-				// write to ws conn
-				res := &SubscriptionNotification{
-					Jsonrpc: "2.0",
-					Method:  "eth_subscription",
-					Params: &SubscriptionResult{
-						Subscription: subID,
-						Result:       header,
-					},
-				}
+func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	//nolint: errcheck
+	go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error {
+		for _, header := range headers {
+			// write to ws conn
+			res := &SubscriptionNotification{
+				Jsonrpc: "2.0",
+				Method:  "eth_subscription",
+				Params: &SubscriptionResult{
+					Subscription: subID,
+					Result:       header.EthHeader,
+				},
+			}
 
-				err = wsConn.WriteJSON(res)
-				if err != nil {
-					api.logger.Error("error writing header, will drop peer", "error", err.Error())
+			if err := wsConn.WriteJSON(res); err != nil {
+				api.logger.Error("error writing header, will drop peer", "error", err.Error())
 
-					try(func() {
-						if err != websocket.ErrCloseSent {
-							_ = wsConn.Close()
-						}
-					}, api.logger, "closing websocket peer sub")
-				}
-			case err, ok := <-errCh:
-				if !ok {
-					return
-				}
-				api.logger.Debug("dropping NewHeads WebSocket subscription", "subscription-id", subID, "error", err.Error())
+				try(func() {
+					if err != websocket.ErrCloseSent {
+						_ = wsConn.Close()
+					}
+				}, api.logger, "closing websocket peer sub")
+				return err
 			}
 		}
-	}()
+		return nil
+	})
 
-	return unsubFn, nil
+	return cancel, nil
 }
 
 func try(fn func(), l log.Logger, desc string) {
@@ -468,7 +436,7 @@ func try(fn func(), l log.Logger, desc string) {
 	fn()
 }
 
-func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interface{}) (pubsub.UnsubscribeFunc, error) {
+func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interface{}) (context.CancelFunc, error) {
 	crit := filters.FilterCriteria{}
 
 	if extra != nil {
@@ -480,30 +448,20 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
 		}
 
 		if params["address"] != nil {
-			address, isString := params["address"].(string)
-			addresses, isSlice := params["address"].([]interface{})
-			if !isString && !isSlice {
-				err := errors.New("invalid addresses; must be address or array of addresses")
-				api.logger.Debug("invalid addresses", "type", fmt.Sprintf("%T", params["address"]))
-				return nil, err
-			}
-
-			if ok {
+			switch address := params["address"].(type) {
+			case string:
 				crit.Addresses = []common.Address{common.HexToAddress(address)}
-			}
-
-			if isSlice {
-				crit.Addresses = []common.Address{}
-				for _, addr := range addresses {
+			case []interface{}:
+				for _, addr := range address {
 					address, ok := addr.(string)
 					if !ok {
-						err := errors.New("invalid address")
-						api.logger.Debug("invalid address", "type", fmt.Sprintf("%T", addr))
-						return nil, err
+						return nil, errors.New("invalid address")
 					}
 
 					crit.Addresses = append(crit.Addresses, common.HexToAddress(address))
 				}
+			default:
+				return nil, errors.New("invalid addresses; must be address or array of addresses")
 			}
 		}
@@ -568,127 +526,75 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
 		}
 	}
 
-	sub, unsubFn, err := api.events.SubscribeLogs(crit)
-	if err != nil {
-		api.logger.Error("failed to subscribe logs", "error", err.Error())
-		return nil, err
-	}
-
-	go func() {
-		ch := sub.Event()
-		errCh := sub.Err()
-		for {
-			select {
-			case event, ok := <-ch:
-				if !ok {
-					return
-				}
+	ctx, cancel := context.WithCancel(context.Background())
+	//nolint: errcheck
+	go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error {
+		logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
+		if len(logs) == 0 {
+			return nil
+		}
 
-				dataTx, ok := event.Data.(tmtypes.EventDataTx)
-				if !ok {
-					api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", event.Data))
-					continue
-				}
-				txLogs, err := evmtypes.DecodeTxLogsFromEvents(dataTx.TxResult.Result.Data, uint64(dataTx.TxResult.Height))
-				if err != nil {
-					api.logger.Error("failed to decode tx response", "error", err.Error())
-					return
-				}
-				logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
-				if len(logs) == 0 {
-					continue
-				}
 
+		for _, ethLog := range logs {
+			res := &SubscriptionNotification{
+				Jsonrpc: "2.0",
+				Method:  "eth_subscription",
+				Params: &SubscriptionResult{
+					Subscription: subID,
+					Result:       ethLog,
+				},
+			}
 
-				for _, ethLog := range logs {
-					res := &SubscriptionNotification{
-						Jsonrpc: "2.0",
-						Method:  "eth_subscription",
-						Params: &SubscriptionResult{
-							Subscription: subID,
-							Result:       ethLog,
-						},
+			err := wsConn.WriteJSON(res)
+			if err != nil {
+				try(func() {
+					if err != websocket.ErrCloseSent {
+						_ = wsConn.Close()
 					}
+				}, api.logger, "closing websocket peer sub")
 
-					err = wsConn.WriteJSON(res)
-					if err != nil {
-						try(func() {
-							if err != websocket.ErrCloseSent {
-								_ = wsConn.Close()
-							}
-						}, api.logger, "closing websocket peer sub")
-					}
-				}
-			case err, ok := <-errCh:
-				if !ok {
-					return
-				}
-				api.logger.Debug("dropping Logs WebSocket subscription", "subscription-id", subID, "error", err.Error())
+				return err
 			}
 		}
-	}()
+		return nil
+	})
 
-	return unsubFn, nil
+	return cancel, nil
 }
 
-func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
-	sub, unsubFn, err := api.events.SubscribePendingTxs()
-	if err != nil {
-		return nil, errors.Wrap(err, "error creating block filter: %s")
-	}
-
-	go func() {
-		txsCh := sub.Event()
-		errCh := sub.Err()
-		for {
-			select {
-			case ev := <-txsCh:
-				data, ok := ev.Data.(tmtypes.EventDataTx)
-				if !ok {
-					api.logger.Debug("event data type mismatch", "type", fmt.Sprintf("%T", ev.Data))
-					continue
-				}
-
-				ethTxs, err := types.RawTxToEthTx(api.clientCtx, data.Tx)
-				if err != nil {
-					// not ethereum tx
-					continue
-				}
-
-				for _, ethTx := range ethTxs {
-					// write to ws conn
-					res := &SubscriptionNotification{
-						Jsonrpc: "2.0",
-						Method:  "eth_subscription",
-						Params: &SubscriptionResult{
-							Subscription: subID,
-							Result:       ethTx.Hash,
-						},
-					}
+func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	//nolint: errcheck
+	go api.events.TxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
+		for _, hash := range items {
+			// write to ws conn
+			res := &SubscriptionNotification{
+				Jsonrpc: "2.0",
+				Method:  "eth_subscription",
+				Params: &SubscriptionResult{
+					Subscription: subID,
+					Result:       hash,
+				},
+			}
 
-					err = wsConn.WriteJSON(res)
-					if err != nil {
-						api.logger.Debug("error writing header, will drop peer", "error", err.Error())
+			err := wsConn.WriteJSON(res)
+			if err != nil {
+				api.logger.Debug("error writing header, will drop peer", "error", err.Error())
 
-						try(func() {
-							if err != websocket.ErrCloseSent {
-								_ = wsConn.Close()
-							}
-						}, api.logger, "closing websocket peer sub")
+				try(func() {
+					if err != websocket.ErrCloseSent {
+						_ = wsConn.Close()
 					}
-				}
-			case err, ok := <-errCh:
-				if !ok {
-					return
-				}
-				api.logger.Debug("dropping PendingTransactions WebSocket subscription", subID, "error", err.Error())
+				}, api.logger, "closing websocket peer sub")
+				return err
 			}
 		}
-	}()
+		return nil
+	})
 
-	return unsubFn, nil
+	return cancel, nil
 }
 
-func (api *pubSubAPI) subscribeSyncing(_ *wsConn, _ rpc.ID) (pubsub.UnsubscribeFunc, error) {
+func (api *pubSubAPI) subscribeSyncing(_ *wsConn, _ rpc.ID) (context.CancelFunc, error) {
 	return nil, errors.New("syncing subscription is not implemented")
 }
diff --git a/server/json_rpc.go b/server/json_rpc.go
index 7934a30c77..9223bbfb1d 100644
--- a/server/json_rpc.go
+++ b/server/json_rpc.go
@@ -16,18 +16,21 @@ package server
 
 import (
+	"fmt"
 	"net/http"
 	"time"
 
 	"github.com/gorilla/mux"
 	"github.com/rs/cors"
 
+	rpcclient "github.com/cometbft/cometbft/rpc/client"
 	"github.com/cosmos/cosmos-sdk/client"
 	"github.com/cosmos/cosmos-sdk/server"
 	"github.com/cosmos/cosmos-sdk/server/types"
 	ethlog "github.com/ethereum/go-ethereum/log"
 	ethrpc "github.com/ethereum/go-ethereum/rpc"
 	"github.com/evmos/ethermint/rpc"
+	"github.com/evmos/ethermint/rpc/stream"
 	"github.com/evmos/ethermint/server/config"
 	ethermint "github.com/evmos/ethermint/types"
@@ -36,14 +39,21 @@ import (
 
 // StartJSONRPC starts the JSON-RPC server
 func StartJSONRPC(ctx *server.Context,
 	clientCtx client.Context,
-	tmRPCAddr,
-	tmEndpoint string,
 	config *config.Config,
 	indexer ethermint.EVMTxIndexer,
 ) (*http.Server, chan struct{}, error) {
-	tmWsClient := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
-
 	logger := ctx.Logger.With("module", "geth")
+
+	evtClient, ok := clientCtx.Client.(rpcclient.EventsClient)
+	if !ok {
+		return nil, nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client)
+	}
+
+	stream, err := stream.NewRPCStreams(evtClient, logger, clientCtx.TxConfig.TxDecoder())
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create rpc streams: %w", err)
+	}
+
 	ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error {
 		switch r.Lvl {
 		case ethlog.LvlTrace, ethlog.LvlDebug:
@@ -61,7 +71,7 @@ func StartJSONRPC(ctx *server.Context,
 	allowUnprotectedTxs := config.JSONRPC.AllowUnprotectedTxs
 	rpcAPIArr := config.JSONRPC.API
 
-	apis := rpc.GetRPCAPIs(ctx, clientCtx, tmWsClient, allowUnprotectedTxs, indexer, rpcAPIArr)
+	apis := rpc.GetRPCAPIs(ctx, clientCtx, stream, allowUnprotectedTxs, indexer, rpcAPIArr)
 
 	for _, api := range apis {
 		if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil {
@@ -120,9 +130,7 @@ func StartJSONRPC(ctx *server.Context,
 
 	ctx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)
 
-	// allocate separate WS connection to Tendermint
-	tmWsClient = ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
-	wsSrv := rpc.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClient, config)
+	wsSrv := rpc.NewWebsocketsServer(clientCtx, ctx.Logger, stream, config)
 	wsSrv.Start()
 	return httpSrv, httpSrvDone, nil
 }
diff --git a/server/start.go b/server/start.go
index 608aebcb79..2ecfcc3da3 100644
--- a/server/start.go
+++ b/server/start.go
@@ -552,9 +552,7 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start
 	)
 
 	if config.JSONRPC.Enable {
-		tmEndpoint := "/websocket"
-		tmRPCAddr := cfg.RPC.ListenAddress
-		httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, tmRPCAddr, tmEndpoint, &config, idxer)
+		httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, &config, idxer)
 		if err != nil {
 			return err
 		}
diff --git a/server/util.go b/server/util.go
index e5d3d5e386..7eed497d25 100644
--- a/server/util.go
+++ b/server/util.go
@@ -22,7 +22,6 @@ import (
 	"os"
 	"os/signal"
 	"syscall"
-	"time"
 
 	"github.com/evmos/ethermint/server/config"
 	"github.com/gorilla/mux"
@@ -37,7 +36,6 @@ import (
 
 	tcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
 	tmlog "github.com/cometbft/cometbft/libs/log"
-	rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
 )
 
 // AddCommands adds server commands
@@ -77,34 +75,6 @@ func AddCommands(
 	)
 }
 
-func ConnectTmWS(tmRPCAddr, tmEndpoint string, logger tmlog.Logger) *rpcclient.WSClient {
-	tmWsClient, err := rpcclient.NewWS(tmRPCAddr, tmEndpoint,
-		rpcclient.MaxReconnectAttempts(256),
-		rpcclient.ReadWait(120*time.Second),
-		rpcclient.WriteWait(120*time.Second),
-		rpcclient.PingPeriod(50*time.Second),
-		rpcclient.OnReconnect(func() {
-			logger.Debug("EVM RPC reconnects to Tendermint WS", "address", tmRPCAddr+tmEndpoint)
-		}),
-	)
-
-	if err != nil {
-		logger.Error(
-			"Tendermint WS client could not be created",
-			"address", tmRPCAddr+tmEndpoint,
-			"error", err,
-		)
-	} else if err := tmWsClient.OnStart(); err != nil {
-		logger.Error(
-			"Tendermint WS client could not start",
-			"address", tmRPCAddr+tmEndpoint,
-			"error", err,
-		)
-	}
-
-	return tmWsClient
-}
-
 func MountGRPCWebServices(
 	router *mux.Router,
 	grpcWeb *grpcweb.WrappedGrpcServer,
diff --git a/testutil/network/util.go b/testutil/network/util.go
index b5e95e9cfb..2d5d18142a 100644
--- a/testutil/network/util.go
+++ b/testutil/network/util.go
@@ -144,10 +144,7 @@ func startInProcess(cfg Config, val *Validator) error {
 		return fmt.Errorf("validator %s context is nil", val.Moniker)
 	}
 
-	tmEndpoint := "/websocket"
-	tmRPCAddr := val.RPCAddress
-
-	val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, val.AppConfig, nil)
+	val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, val.AppConfig, nil)
 	if err != nil {
 		return err
 	}
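For reviewers who want to exercise the new `rpc/stream` package in isolation, here is a minimal, hypothetical usage sketch (not part of the patch itself). It relies only on the `NewStream`, `Add`, and `Subscribe` calls exercised by the tests above; the import path is assumed from this repository's module layout.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/evmos/ethermint/rpc/stream"
)

func main() {
	// Same segment size / capacity as the tests above; older segments are
	// pruned once the stream grows past its capacity.
	s := stream.NewStream[int](16, 31)

	ctx, cancel := context.WithCancel(context.Background())

	// Subscribe blocks until the context is canceled, so it runs in its own
	// goroutine; the callback receives each new batch of items plus the offset
	// to resume from.
	done := make(chan error, 1)
	go func() {
		done <- s.Subscribe(ctx, func(items []int, offset int) error {
			fmt.Println("items:", items, "offset:", offset)
			return nil
		})
	}()

	// Publish a few items; Add returns the id of the newly appended item.
	for i := 0; i < 5; i++ {
		s.Add(i)
	}

	// Give the subscriber a moment to drain, then shut it down.
	time.Sleep(100 * time.Millisecond)
	cancel()
	fmt.Println("subscribe returned:", <-done)
}
```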