Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graceful shutdown of websocket #547

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
logger log.Logger
}

func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer {
func NewWebsocketsServer(
ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config,
) WebsocketsServer {

Check warning on line 91 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L91

Added line #L91 was not covered by tests
logger = logger.With("api", "websocket-server")
_, port, _ := net.SplitHostPort(cfg.JSONRPC.Address)

Expand All @@ -95,7 +97,7 @@
wsAddr: cfg.JSONRPC.WsAddress,
certFile: cfg.TLS.CertificatePath,
keyFile: cfg.TLS.KeyPath,
api: newPubSubAPI(clientCtx, logger, stream),
api: newPubSubAPI(ctx, clientCtx, logger, stream),

Check warning on line 100 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L100

Added line #L100 was not covered by tests
logger: logger,
}
}
Expand Down Expand Up @@ -347,18 +349,20 @@

// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
type pubSubAPI struct {
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
cancelContext context.Context
}

// newPubSubAPI creates an instance of the ethereum PubSub API.
func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {

Check warning on line 359 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L359

Added line #L359 was not covered by tests
logger = logger.With("module", "websocket-client")
return &pubSubAPI{
events: stream,
logger: logger,
clientCtx: clientCtx,
events: stream,
logger: logger,
clientCtx: clientCtx,
cancelContext: ctx,

Check warning on line 365 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L362-L365

Added lines #L362 - L365 were not covered by tests
}
}

Expand Down Expand Up @@ -411,7 +415,8 @@
}

func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)

Check warning on line 419 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L418-L419

Added lines #L418 - L419 were not covered by tests
//nolint: errcheck
go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error {
for _, header := range headers {
Expand Down Expand Up @@ -569,7 +574,7 @@
}
}

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)

Check warning on line 577 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L577

Added line #L577 was not covered by tests
//nolint: errcheck
go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error {
logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
Expand All @@ -589,6 +594,7 @@

err := wsConn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing header, will drop peer", "error", err.Error())

Check warning on line 597 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L597

Added line #L597 was not covered by tests
try(func() {
if err != websocket.ErrCloseSent {
_ = wsConn.Close()
Expand All @@ -605,7 +611,7 @@
}

func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(api.cancelContext)

Check warning on line 614 in rpc/websockets.go

View check run for this annotation

Codecov / codecov/patch

rpc/websockets.go#L614

Added line #L614 was not covered by tests
//nolint: errcheck
go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
for _, hash := range items {
Expand Down
2 changes: 1 addition & 1 deletion server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@

srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config)

Check warning on line 168 in server/json_rpc.go

View check run for this annotation

Codecov / codecov/patch

server/json_rpc.go#L168

Added line #L168 was not covered by tests
wsSrv.Start()
return httpSrv, nil
}
Loading