Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: internal websocket connection is heavy and unstable #373

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ test-import:
go test -run TestImporterTestSuite -v --vet=off github.com/evmos/ethermint/tests/importer

test-rpc:
./scripts/integration-test-all.sh -t "rpc" -q 1 -z 1 -s 2 -m "rpc" -r "true"
./scripts/integration-test-all.sh -t "rpc" -q 1 -z 1 -s 5 -m "rpc" -r "true"

run-integration-tests:
@nix-shell ./tests/integration_tests/shell.nix --run ./scripts/run-integration-tests.sh
Expand Down
20 changes: 10 additions & 10 deletions rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/evmos/ethermint/rpc/namespaces/ethereum/web3"
ethermint "github.com/evmos/ethermint/types"

rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
rpcclient "github.com/cometbft/cometbft/rpc/client"
)

// RPC namespaces and API version
Expand All @@ -60,7 +60,7 @@ const (
type APICreator = func(
ctx *server.Context,
clientCtx client.Context,
tendermintWebsocketClient *rpcclient.WSClient,
tendermintWebsocketClient rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
) []rpc.API
Expand All @@ -72,7 +72,7 @@ func init() {
apiCreators = map[string]APICreator{
EthNamespace: func(ctx *server.Context,
clientCtx client.Context,
tmWSClient *rpcclient.WSClient,
tmWSClient rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
) []rpc.API {
Expand All @@ -92,7 +92,7 @@ func init() {
},
}
},
Web3Namespace: func(*server.Context, client.Context, *rpcclient.WSClient, bool, ethermint.EVMTxIndexer) []rpc.API {
Web3Namespace: func(*server.Context, client.Context, rpcclient.EventsClient, bool, ethermint.EVMTxIndexer) []rpc.API {
return []rpc.API{
{
Namespace: Web3Namespace,
Expand All @@ -102,7 +102,7 @@ func init() {
},
}
},
NetNamespace: func(_ *server.Context, clientCtx client.Context, _ *rpcclient.WSClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API {
NetNamespace: func(_ *server.Context, clientCtx client.Context, _ rpcclient.EventsClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API {
return []rpc.API{
{
Namespace: NetNamespace,
Expand All @@ -114,7 +114,7 @@ func init() {
},
PersonalNamespace: func(ctx *server.Context,
clientCtx client.Context,
_ *rpcclient.WSClient,
_ rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
) []rpc.API {
Expand All @@ -128,7 +128,7 @@ func init() {
},
}
},
TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ *rpcclient.WSClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API {
TxPoolNamespace: func(ctx *server.Context, _ client.Context, _ rpcclient.EventsClient, _ bool, _ ethermint.EVMTxIndexer) []rpc.API {
return []rpc.API{
{
Namespace: TxPoolNamespace,
Expand All @@ -140,7 +140,7 @@ func init() {
},
DebugNamespace: func(ctx *server.Context,
clientCtx client.Context,
_ *rpcclient.WSClient,
_ rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
) []rpc.API {
Expand All @@ -156,7 +156,7 @@ func init() {
},
MinerNamespace: func(ctx *server.Context,
clientCtx client.Context,
_ *rpcclient.WSClient,
_ rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
) []rpc.API {
Expand All @@ -176,7 +176,7 @@ func init() {
// GetRPCAPIs returns the list of all APIs
func GetRPCAPIs(ctx *server.Context,
clientCtx client.Context,
tmWSClient *rpcclient.WSClient,
tmWSClient rpcclient.EventsClient,
allowUnprotectedTxs bool,
indexer ethermint.EVMTxIndexer,
selectedAPIs []string,
Expand Down
4 changes: 2 additions & 2 deletions rpc/namespaces/ethereum/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

"github.com/cometbft/cometbft/libs/log"

rpcclient "github.com/cometbft/cometbft/rpc/client"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
tmtypes "github.com/cometbft/cometbft/types"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -93,7 +93,7 @@ type PublicFilterAPI struct {
}

// NewPublicAPI returns a new PublicFilterAPI instance.
func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient *rpcclient.WSClient, backend Backend) *PublicFilterAPI {
func NewPublicAPI(logger log.Logger, clientCtx client.Context, tmWSClient rpcclient.EventsClient, backend Backend) *PublicFilterAPI {
logger = logger.With("api", "filter")
api := &PublicFilterAPI{
logger: logger,
Expand Down
123 changes: 34 additions & 89 deletions rpc/namespaces/ethereum/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

"github.com/pkg/errors"

tmjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
tmpubsub "github.com/cometbft/cometbft/libs/pubsub"
tmquery "github.com/cometbft/cometbft/libs/pubsub/query"
rpcclient "github.com/cometbft/cometbft/rpc/client"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
rpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
tmtypes "github.com/cometbft/cometbft/types"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,6 +41,8 @@
evmtypes "github.com/evmos/ethermint/x/evm/types"
)

const subscriberName = "eth_filter"

var (
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'",
Expand All @@ -56,7 +58,7 @@
type EventSystem struct {
logger log.Logger
ctx context.Context
tmWSClient *rpcclient.WSClient
tmWSClient rpcclient.EventsClient

// light client mode
lightMode bool
Expand All @@ -66,7 +68,6 @@
indexMux *sync.RWMutex

// Channels
install chan *Subscription // install filter for event notification
uninstall chan *Subscription // remove filter for event notification
eventBus pubsub.EventBus
}
Expand All @@ -77,7 +78,7 @@
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(logger log.Logger, tmWSClient *rpcclient.WSClient) *EventSystem {
func NewEventSystem(logger log.Logger, tmWSClient rpcclient.EventsClient) *EventSystem {
index := make(filterIndex)
for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*Subscription)
Expand All @@ -91,13 +92,11 @@
index: index,
topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)),
indexMux: new(sync.RWMutex),
install: make(chan *Subscription),
uninstall: make(chan *Subscription),
eventBus: pubsub.NewEventBus(),
}

go es.eventLoop()
go es.consumeEvents()
return es
}

Expand All @@ -113,6 +112,7 @@
var (
err error
cancelFn context.CancelFunc
chEvents <-chan coretypes.ResultEvent
)

ctx, cancelFn := context.WithCancel(context.Background())
Expand All @@ -134,23 +134,22 @@

switch sub.typ {
case filters.LogsSubscription:
err = es.tmWSClient.Subscribe(ctx, sub.event)
chEvents, err = es.tmWSClient.Subscribe(ctx, subscriberName, sub.event)
case filters.BlocksSubscription:
err = es.tmWSClient.Subscribe(ctx, sub.event)
chEvents, err = es.tmWSClient.Subscribe(ctx, subscriberName, sub.event)
case filters.PendingTransactionsSubscription:
err = es.tmWSClient.Subscribe(ctx, sub.event)
chEvents, err = es.tmWSClient.Subscribe(ctx, subscriberName, sub.event)
default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
}

if err != nil {
sub.err <- err
if err != nil && !errors.Is(err, tmpubsub.ErrAlreadySubscribed) {
return nil, nil, err
}

// wrap events in a go routine to prevent blocking
es.install <- sub
<-sub.installed
if err := es.eventBus.AddTopic(sub.event, chEvents); err != nil {
return nil, nil, err
}

eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
Expand Down Expand Up @@ -194,42 +193,39 @@
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.LogsSubscription,
event: evmEvents,
logsCrit: crit,
created: time.Now().UTC(),
logs: make(chan []*ethtypes.Log),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
id: rpc.NewID(),
typ: filters.LogsSubscription,
event: evmEvents,
logsCrit: crit,
created: time.Now().UTC(),
logs: make(chan []*ethtypes.Log),
err: make(chan error, 1),
}
return es.subscribe(sub)
}

// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
event: headerEvents,
created: time.Now().UTC(),
headers: make(chan *ethtypes.Header),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
id: rpc.NewID(),
typ: filters.BlocksSubscription,
event: headerEvents,
created: time.Now().UTC(),
headers: make(chan *ethtypes.Header),
err: make(chan error, 1),
}
return es.subscribe(sub)
}

// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
event: txEvents,
created: time.Now().UTC(),
hashes: make(chan []common.Hash),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
event: txEvents,
created: time.Now().UTC(),
hashes: make(chan []common.Hash),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
Expand All @@ -238,19 +234,8 @@

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
for {

Check failure on line 237 in rpc/namespaces/ethereum/eth/filters/filter_system.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

S1000: should use for range instead of for { select {} } (gosimple)
select {
case f := <-es.install:
es.indexMux.Lock()
es.index[f.typ][f.id] = f
ch := make(chan coretypes.ResultEvent)
if err := es.eventBus.AddTopic(f.event, ch); err != nil {
es.logger.Error("failed to add event topic to event bus", "topic", f.event, "error", err.Error())
} else {
es.topicChans[f.event] = ch
}
es.indexMux.Unlock()
close(f.installed)
case f := <-es.uninstall:
es.indexMux.Lock()
delete(es.index[f.typ], f.id)
Expand All @@ -265,7 +250,7 @@

// remove topic only when channel is not used by other subscriptions
if !channelInUse {
if err := es.tmWSClient.Unsubscribe(es.ctx, f.event); err != nil {
if err := es.tmWSClient.Unsubscribe(es.ctx, subscriberName, f.event); err != nil {
es.logger.Error("failed to unsubscribe from query", "query", f.event, "error", err.Error())
}

Expand All @@ -282,43 +267,3 @@
}
}
}

func (es *EventSystem) consumeEvents() {
for {
for rpcResp := range es.tmWSClient.ResponsesCh {
var ev coretypes.ResultEvent

if rpcResp.Error != nil {
time.Sleep(5 * time.Second)
continue
} else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil {
es.logger.Error("failed to JSON unmarshal ResponsesCh result event", "error", err.Error())
continue
}

if len(ev.Query) == 0 {
// skip empty responses
continue
}

es.indexMux.RLock()
ch, ok := es.topicChans[ev.Query]
es.indexMux.RUnlock()
if !ok {
es.logger.Debug("channel for subscription not found", "topic", ev.Query)
es.logger.Debug("list of available channels", "channels", es.eventBus.Topics())
continue
}

// gracefully handle lagging subscribers
t := time.NewTimer(time.Second)
select {
case <-t.C:
es.logger.Debug("dropped event during lagging subscription", "topic", ev.Query)
case ch <- ev:
}
}

time.Sleep(time.Second)
}
}
21 changes: 10 additions & 11 deletions rpc/namespaces/ethereum/eth/filters/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@ import (

// Subscription defines a wrapper for the private subscription
type Subscription struct {
id rpc.ID
typ filters.Type
event string
created time.Time
logsCrit filters.FilterCriteria
logs chan []*ethtypes.Log
hashes chan []common.Hash
headers chan *ethtypes.Header
installed chan struct{} // closed when the filter is installed
eventCh <-chan coretypes.ResultEvent
err chan error
id rpc.ID
typ filters.Type
event string
created time.Time
logsCrit filters.FilterCriteria
logs chan []*ethtypes.Log
hashes chan []common.Hash
headers chan *ethtypes.Header
eventCh <-chan coretypes.ResultEvent
err chan error
}

// ID returns the underlying subscription RPC identifier.
Expand Down
Loading
Loading