diff --git a/CHANGELOG.md b/CHANGELOG.md index 208f661084..4695190a1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (rpc) [#534](https://github.com/crypto-org-chain/ethermint/pull/534), [#540](https://github.com/crypto-org-chain/ethermint/pull/540) Fix opBlockhash when no block header in abci request. * (rpc) [#536](https://github.com/crypto-org-chain/ethermint/pull/536) Fix validate basic after transaction conversion with raw field. * (cli) [#537](https://github.com/crypto-org-chain/ethermint/pull/537) Fix unsuppored sign mode SIGN_MODE_TEXTUAL for bank transfer. +* (cli) [#]() Fix graceful shutdown. ### Improvements diff --git a/server/json_rpc.go b/server/json_rpc.go index 9156aeb6ec..ead7c6fcdf 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -16,6 +16,7 @@ package server import ( + "context" "fmt" "net/http" "time" @@ -47,18 +48,20 @@ type AppWithPendingTxStream interface { } // StartJSONRPC starts the JSON-RPC server -func StartJSONRPC(srvCtx *server.Context, +func StartJSONRPC( + ctx context.Context, + srvCtx *server.Context, clientCtx client.Context, g *errgroup.Group, config *config.Config, indexer ethermint.EVMTxIndexer, app AppWithPendingTxStream, -) (*http.Server, chan struct{}, error) { +) (*http.Server, error) { logger := srvCtx.Logger.With("module", "geth") evtClient, ok := clientCtx.Client.(rpcclient.EventsClient) if !ok { - return nil, nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client) + return nil, fmt.Errorf("client %T does not implement EventsClient", clientCtx.Client) } var rpcStream *stream.RPCStream @@ -73,7 +76,7 @@ func StartJSONRPC(srvCtx *server.Context, } if err != nil { - return nil, nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err) + return nil, fmt.Errorf("failed to create rpc streams after %d attempts: %w", MaxRetry, err) } app.RegisterPendingTxListener(rpcStream.ListenPendingTx) @@ -104,7 +107,7 @@ func StartJSONRPC(srvCtx *server.Context, "namespace", api.Namespace, "service", api.Service, ) - return nil, nil, err + return nil, err } } @@ -128,12 +131,27 @@ func StartJSONRPC(srvCtx *server.Context, ln, err := Listen(httpSrv.Addr, config) if err != nil { - return nil, nil, err + return nil, err } g.Go(func() error { srvCtx.Logger.Info("Starting JSON-RPC server", "address", config.JSONRPC.Address) - if err := httpSrv.Serve(ln); err != nil { + errCh := make(chan error) + go func() { + errCh <- httpSrv.Serve(ln) + }() + + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. + select { + case <-ctx.Done(): + // The calling process canceled or closed the provided context, so we must + // gracefully stop the gRPC server. + logger.Info("stopping JSON-RPC server...", "address", config.JSONRPC.Address) + shutdownCtx, _ := context.WithTimeout(context.Background(), 10*time.Second) + httpSrv.Shutdown(shutdownCtx) + + case err := <-errCh: if err == http.ErrServerClosed { close(httpSrvDone) } @@ -148,5 +166,5 @@ func StartJSONRPC(srvCtx *server.Context, wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config) wsSrv.Start() - return httpSrv, httpSrvDone, nil + return httpSrv, nil } diff --git a/server/start.go b/server/start.go index b1e6c290f3..8a1963f2ae 100644 --- a/server/start.go +++ b/server/start.go @@ -20,11 +20,9 @@ import ( "fmt" "io" "net" - "net/http" "os" "path/filepath" "runtime/pprof" - "time" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/crypto/keyring" @@ -419,10 +417,7 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start idxer = indexer.NewKVIndexer(idxDB, idxLogger, clientCtx) indexerService := NewEVMIndexerService(idxer, clientCtx.Client.(rpcclient.Client), config.JSONRPC.AllowIndexerGap) indexerService.SetLogger(servercmtlog.CometLoggerWrapper{Logger: idxLogger}) - - g.Go(func() error { - return indexerService.Start() - }) + go indexerService.Start() } if config.API.Enable || config.JSONRPC.Enable { @@ -443,30 +438,12 @@ func startInProcess(svrCtx *server.Context, clientCtx client.Context, opts Start if err != nil { return err } - if grpcSrv != nil { - defer grpcSrv.GracefulStop() - } - apiSrv := startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics) - if apiSrv != nil { - defer apiSrv.Close() - } + startAPIServer(ctx, svrCtx, clientCtx, g, config.Config, app, grpcSrv, metrics) - clientCtx, httpSrv, httpSrvDone, err := startJSONRPCServer(svrCtx, clientCtx, g, config, genDocProvider, idxer, app) - if httpSrv != nil { - defer func() { - shutdownCtx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) - defer cancelFn() - if err := httpSrv.Shutdown(shutdownCtx); err != nil { - logger.Error("HTTP server shutdown produced a warning", "error", err.Error()) - } else { - logger.Info("HTTP server shut down, waiting 5 sec") - select { - case <-time.Tick(5 * time.Second): - case <-httpSrvDone: - } - } - }() + clientCtx, err = startJSONRPCServer(ctx, svrCtx, clientCtx, g, config, genDocProvider, idxer, app) + if err != nil { + return err } // At this point it is safe to block the process if we're in query only mode as @@ -619,9 +596,9 @@ func startAPIServer( app types.Application, grpcSrv *grpc.Server, metrics *telemetry.Metrics, -) *api.Server { +) { if !svrCfg.API.Enable { - return nil + return } apiSrv := api.New(clientCtx, svrCtx.Logger.With("server", "api"), grpcSrv) @@ -634,10 +611,10 @@ func startAPIServer( g.Go(func() error { return apiSrv.Start(ctx, svrCfg) }) - return apiSrv } func startJSONRPCServer( + stdCtx context.Context, svrCtx *server.Context, clientCtx client.Context, g *errgroup.Group, @@ -645,7 +622,7 @@ func startJSONRPCServer( genDocProvider node.GenesisDocProvider, idxer ethermint.EVMTxIndexer, app types.Application, -) (ctx client.Context, httpSrv *http.Server, httpSrvDone chan struct{}, err error) { +) (ctx client.Context, err error) { ctx = clientCtx if !config.JSONRPC.Enable { return @@ -653,19 +630,16 @@ func startJSONRPCServer( txApp, ok := app.(AppWithPendingTxStream) if !ok { - return ctx, httpSrv, httpSrvDone, fmt.Errorf("json-rpc server requires AppWithPendingTxStream") + return ctx, fmt.Errorf("json-rpc server requires AppWithPendingTxStream") } genDoc, err := genDocProvider() if err != nil { - return ctx, httpSrv, httpSrvDone, err + return ctx, err } ctx = clientCtx.WithChainID(genDoc.ChainID) - g.Go(func() error { - httpSrv, httpSrvDone, err = StartJSONRPC(svrCtx, clientCtx, g, &config, idxer, txApp) - return err - }) + _, err = StartJSONRPC(stdCtx, svrCtx, clientCtx, g, &config, idxer, txApp) return } diff --git a/testutil/network/network.go b/testutil/network/network.go index e3d5e0c52d..d5ec078fb7 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -196,14 +196,13 @@ type ( RPCClient tmrpcclient.Client JSONRPCClient *ethclient.Client - tmNode *node.Node - api *api.Server - grpc *grpc.Server - grpcWeb *http.Server - jsonrpc *http.Server - jsonrpcDone chan struct{} - errGroup *errgroup.Group - cancelFn context.CancelFunc + tmNode *node.Node + api *api.Server + grpc *grpc.Server + grpcWeb *http.Server + jsonrpc *http.Server + errGroup *errgroup.Group + cancelFn context.CancelFunc } ) @@ -653,12 +652,6 @@ func (n *Network) Cleanup() { if err := v.jsonrpc.Shutdown(shutdownCtx); err != nil { v.tmNode.Logger.Error("HTTP server shutdown produced a warning", "error", err.Error()) - } else { - v.tmNode.Logger.Info("HTTP server shut down, waiting 5 sec") - select { - case <-time.Tick(5 * time.Second): - case <-v.jsonrpcDone: - } } } } diff --git a/testutil/network/util.go b/testutil/network/util.go index ebc5e1e044..9b8721fe19 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -143,8 +143,8 @@ func startInProcess(cfg Config, val *Validator) error { return fmt.Errorf("validator %s context is nil", val.Moniker) } - val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC( - val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig, + val.jsonrpc, err = server.StartJSONRPC( + ctx, val.Ctx, val.ClientCtx, val.errGroup, val.AppConfig, nil, app.(server.AppWithPendingTxStream), ) if err != nil {