From a1f9b19751323220b9f5a718106e5350487c20ed Mon Sep 17 00:00:00 2001 From: valli0x Date: Fri, 25 Oct 2024 13:32:40 +0300 Subject: [PATCH 1/4] graceful shutdown of websocket --- rpc/websockets.go | 32 +++++++++++++++++--------------- server/json_rpc.go | 2 +- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/rpc/websockets.go b/rpc/websockets.go index 7303a7717a..9c1f012748 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -86,7 +86,7 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address) @@ -95,7 +95,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *st wsAddr: cfg.JSONRPC.WsAddress, certFile: cfg.TLS.CertificatePath, keyFile: cfg.TLS.KeyPath, - api: newPubSubAPI(clientCtx, logger, stream), + api: newPubSubAPI(ctx, clientCtx, logger, stream), logger: logger, } } @@ -347,18 +347,20 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { - events *stream.RPCStream - logger log.Logger - clientCtx client.Context + events *stream.RPCStream + logger log.Logger + clientCtx client.Context + cancelContext context.Context } // newPubSubAPI creates an instance of the ethereum PubSub API. -func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { +func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { logger = logger.With("module", "websocket-client") return &pubSubAPI{ - events: stream, - logger: logger, - clientCtx: clientCtx, + events: stream, + logger: logger, + clientCtx: clientCtx, + cancelContext: ctx, } } @@ -411,9 +413,9 @@ type Header struct { } func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error { + go api.events.HeaderStream().Subscribe(api.cancelContext, func(headers []stream.RPCHeader, _ int) error { for _, header := range headers { h := header.EthHeader var enc Header @@ -569,9 +571,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } } - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error { + go api.events.LogStream().Subscribe(api.cancelContext, func(txLogs []*ethtypes.Log, _ int) error { logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) if len(logs) == 0 { return nil @@ -605,9 +607,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error { + go api.events.PendingTxStream().Subscribe(api.cancelContext, func(items []common.Hash, _ int) error { for _, hash := range items { // write to ws conn res := &SubscriptionNotification{ diff --git a/server/json_rpc.go b/server/json_rpc.go index 130af9bc69..c8b5b5eedb 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -165,7 +165,7 @@ func StartJSONRPC( srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress) - wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config) + wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config) wsSrv.Start() return httpSrv, nil } From 6208ee78e6f06f25332005a135f822b52222e587 Mon Sep 17 00:00:00 2001 From: valli0x Date: Mon, 28 Oct 2024 15:49:03 +0300 Subject: [PATCH 2/4] context for unsubscribe logs --- rpc/websockets.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rpc/websockets.go b/rpc/websockets.go index 9c1f012748..61c0b14b65 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -413,9 +413,10 @@ type Header struct { } func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - _, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) + //nolint: errcheck - go api.events.HeaderStream().Subscribe(api.cancelContext, func(headers []stream.RPCHeader, _ int) error { + go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error { for _, header := range headers { h := header.EthHeader var enc Header @@ -571,9 +572,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } } - _, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) //nolint: errcheck - go api.events.LogStream().Subscribe(api.cancelContext, func(txLogs []*ethtypes.Log, _ int) error { + go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error { logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) if len(logs) == 0 { return nil @@ -607,9 +608,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - _, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(api.cancelContext) //nolint: errcheck - go api.events.PendingTxStream().Subscribe(api.cancelContext, func(items []common.Hash, _ int) error { + go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error { for _, hash := range items { // write to ws conn res := &SubscriptionNotification{ From b2d201476681604af5ba1309997db9f117c1c9d5 Mon Sep 17 00:00:00 2001 From: valli0x Date: Mon, 28 Oct 2024 18:42:24 +0300 Subject: [PATCH 3/4] added a logger to the log subscription --- rpc/websockets.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rpc/websockets.go b/rpc/websockets.go index 61c0b14b65..dde74e797b 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -592,6 +592,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac err := wsConn.WriteJSON(res) if err != nil { + api.logger.Error("error writing header, will drop peer", "error", err.Error()) try(func() { if err != websocket.ErrCloseSent { _ = wsConn.Close() From 622cf8f45d15960d085721e96f2dd89bcf8c2110 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Tue, 29 Oct 2024 09:31:23 +0800 Subject: [PATCH 4/4] lint --- rpc/websockets.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/websockets.go b/rpc/websockets.go index dde74e797b..b4d34b2283 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -86,7 +86,9 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer( + ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config, +) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address)