diff --git a/rpc/websockets.go b/rpc/websockets.go index 7303a7717a..b4d34b2283 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -86,7 +86,9 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer( + ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config, +) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address) @@ -95,7 +97,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *st wsAddr: cfg.JSONRPC.WsAddress, certFile: cfg.TLS.CertificatePath, keyFile: cfg.TLS.KeyPath, - api: newPubSubAPI(clientCtx, logger, stream), + api: newPubSubAPI(ctx, clientCtx, logger, stream), logger: logger, } } @@ -347,18 +349,20 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { - events *stream.RPCStream - logger log.Logger - clientCtx client.Context + events *stream.RPCStream + logger log.Logger + clientCtx client.Context + cancelContext context.Context } // newPubSubAPI creates an instance of the ethereum PubSub API. -func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { +func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { logger = logger.With("module", "websocket-client") return &pubSubAPI{ - events: stream, - logger: logger, - clientCtx: clientCtx, + events: stream, + logger: logger, + clientCtx: clientCtx, + cancelContext: ctx, } } @@ -411,7 +415,8 @@ type Header struct { } func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) + //nolint: errcheck go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error { for _, header := range headers { @@ -569,7 +574,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) //nolint: errcheck go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error { logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) @@ -589,6 +594,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac err := wsConn.WriteJSON(res) if err != nil { + api.logger.Error("error writing header, will drop peer", "error", err.Error()) try(func() { if err != websocket.ErrCloseSent { _ = wsConn.Close() @@ -605,7 +611,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) //nolint: errcheck go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error { for _, hash := range items { diff --git a/server/json_rpc.go b/server/json_rpc.go index 130af9bc69..c8b5b5eedb 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -165,7 +165,7 @@ func StartJSONRPC( srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress) - wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config) + wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config) wsSrv.Start() return httpSrv, nil }