Skip to content

Commit

Permalink
add initial status server
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Jan 22, 2025
1 parent 2e377f4 commit 9757729
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 12 deletions.
23 changes: 22 additions & 1 deletion cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ func main() {
err = cpuprofile.StartCPUProfiler()
terror.MustNil(err)

startDebugHelpers()

if config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler {
err = tiflashcompute.InitGlobalTopoFetcher(
config.GetGlobalConfig().TiFlashComputeAutoScalerType,
Expand Down Expand Up @@ -324,7 +326,7 @@ func main() {
svr := createServer(storage, dom)

exited := make(chan struct{})
signal.SetupSignalHandler(func() {
signal.SetupCloseSignalHandler(func() {
svr.Close()
resourcemanager.InstanceResourceManager.Stop()
cleanup(svr, storage, dom)
Expand Down Expand Up @@ -990,3 +992,22 @@ func enablePyroscope() {
}
}
}

// startDebugHelpers starts the following tools to help debug
// 1. Setup the signal handler of `SIGUSR1` to print stacks.
// 2. Start the initial status server to handle `/debug/pprof` and `/metrics`
//
// Please remember to call `server.StopInitialStatusServer` before starting the
// real status server, as they'll share the same address.
func startDebugHelpers() {
// setup the
signal.SetupStackSignalHandler()

cfg := config.GetGlobalConfig()
if cfg.Status.ReportStatus {
err := server.StartInitialStatusServer(cfg)
if err != nil {
terror.MustNil(err)
}
}
}
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go_test(
"conn_stmt_test.go",
"conn_test.go",
"driver_tidb_test.go",
"http_status_test.go",
"main_test.go",
"mock_conn_test.go",
"server_test.go",
Expand Down Expand Up @@ -192,6 +193,7 @@ go_test(
"//pkg/util/arena",
"//pkg/util/chunk",
"//pkg/util/context",
"//pkg/util/cpuprofile",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/plancodec",
"//pkg/util/replayer",
Expand Down
94 changes: 85 additions & 9 deletions pkg/server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func (b *Ballast) GenHTTPHandler() func(w http.ResponseWriter, r *http.Request)
func (s *Server) startHTTPServer() {
router := mux.NewRouter()

setupCommonPaths(router)

router.HandleFunc("/status", s.handleStatus).Name("Status")
// HTTP path for prometheus.
router.Handle("/metrics", promhttp.Handler()).Name("Metrics")

// HTTP path for dump statistics.
router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler()).
Expand Down Expand Up @@ -296,13 +296,6 @@ func (s *Server) startHTTPServer() {
router.PathPrefix("/static/").Handler(http.StripPrefix("/static", http.FileServer(static.Data)))
}

router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", cpuprofile.ProfileHTTPHandler)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
// Other /debug/pprof paths not covered above are redirected to pprof.Index.
router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)

ballast := newBallast(s.cfg.MaxBallastObjectSize)
{
err := ballast.SetSize(s.cfg.BallastObjectSize)
Expand Down Expand Up @@ -637,3 +630,86 @@ func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHan

return optimizor.NewStatsPriorityQueueHandler(do)
}

// setupCommonPaths adds the paths that can be handled by both `*Server` and `initialStatusServer` to the router
// It should be kept as simple as possible, and only include the tools which are helpful for the developers to debug
// issues before starting the `*Server`
func setupCommonPaths(router *mux.Router) {
// HTTP path for prometheus.
router.Handle("/metrics", promhttp.Handler()).Name("Metrics")

router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", cpuprofile.ProfileHTTPHandler)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)
// Other /debug/pprof paths not covered above are redirected to pprof.Index.
router.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
}

// initialStatusServer is a temporary http server which handles the useful toolkits to debug in the boot stage. It'll be stopped
// before starting the real status server.
var initialStatusServer *http.Server

// StartInitialStatusServer starts the initial status server. It should guarantee that when the `err != nil`,
// the server is not started and it's safe to listen on the same port.
func StartInitialStatusServer(cfg *config.Config) error {
router := mux.NewRouter()

setupCommonPaths(router)
serverMux := http.NewServeMux()
serverMux.Handle("/", router)

statusAddr := net.JoinHostPort(cfg.Status.StatusHost, strconv.Itoa(int(cfg.Status.StatusPort)))
clusterSecurity := cfg.Security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
logutil.BgLogger().Error("invalid TLS config", zap.Error(err))
return err
}

initialStatusServer = &http.Server{
Addr: statusAddr,
Handler: util2.NewCorsHandler(serverMux, cfg),
TLSConfig: tlsConfig,
}

go func() {
var err error
if tlsConfig != nil {
// the TLS configuration should have been included in `tlsConfig`, it's fine
// to path empty strings to `ListenAndServeTLS`
err = initialStatusServer.ListenAndServeTLS("", "")
} else {
err = initialStatusServer.ListenAndServe()
}
if err != http.ErrServerClosed {
logutil.BgLogger().Error("initial status server error", zap.Error(err))
} else {
logutil.BgLogger().Info("initial status server stopped")
}
}()

return nil
}

const stopInitialStatusServerTimeout = 5 * time.Second

// StopInitialStatusServer tries to stop the initial status server. It should be called before starting the fully functional status server
func StopInitialStatusServer() {
if initialStatusServer == nil {
return
}

ctx, cancel := context.WithTimeout(context.Background(), stopInitialStatusServerTimeout)
defer cancel()

err := initialStatusServer.Shutdown(ctx)
if err != nil {
logutil.BgLogger().Error("fail to shutdown the initial status server gracefully, close it directly", zap.Error(err))

err := initialStatusServer.Close()
if err != nil {
logutil.BgLogger().Error("fail to close the initial status server", zap.Error(err))
}
}
}
196 changes: 196 additions & 0 deletions pkg/server/http_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
"math/big"
mrand "math/rand"
"net"
"net/http"
"os"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/util/cpuprofile"
"github.com/stretchr/testify/require"
)

func generateCertificates(t *testing.T, certPath string, keyPath string) {
priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
require.NoError(t, err)

notBefore := time.Now()
notAfter := notBefore.Add(365 * 24 * time.Hour)

serialNumber, err := rand.Int(rand.Reader, new(big.Int).Lsh(big.NewInt(1), 128))
require.NoError(t, err)

template := x509.Certificate{
SerialNumber: serialNumber,
NotBefore: notBefore,
NotAfter: notAfter,

IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
}

derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
require.NoError(t, err)

certOut, err := os.Create(certPath)
require.NoError(t, err)
defer certOut.Close()

err = pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
require.NoError(t, err)

keyOut, err := os.Create(keyPath)
require.NoError(t, err)
defer keyOut.Close()

privBytes, err := x509.MarshalECPrivateKey(priv)
require.NoError(t, err)

err = pem.Encode(keyOut, &pem.Block{Type: "EC PRIVATE KEY", Bytes: privBytes})
require.NoError(t, err)
}

func TestInitialStatusServer(t *testing.T) {
randPort := (mrand.Int() % 40000) + 20000

// it's required for the `/debug/pprof/profile` to work.
err := cpuprofile.StartCPUProfiler()
terror.MustNil(err)
defer cpuprofile.StopCPUProfiler()

testInitialStatusServer := func(httpClient *http.Client, url string) {
// Wait for the server to start
require.Eventually(t, func() bool {
resp, err := httpClient.Get(url + "/debug/pprof/cmdline")
if resp != nil {
defer resp.Body.Close()
}

return err == nil && resp.StatusCode == http.StatusOK
}, time.Second, time.Millisecond*100)

// `/debug` endpoints should work
endpoints := []string{
"/debug/pprof/cmdline",
"/debug/pprof/profile?seconds=1",
"/debug/pprof/symbol",
"/debug/pprof/trace?seconds=1",
"/debug/pprof/goroutine",
"/debug/pprof/",
}

for _, endpoint := range endpoints {
resp, err := httpClient.Get(url + endpoint)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode, "endpoint %s should return 200", endpoint)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotEmpty(t, body, "endpoint %s should return non-empty body", endpoint)
resp.Body.Close()
}
}

t.Run("test start server", func(t *testing.T) {
cfg := &config.Config{
Status: config.Status{
StatusHost: "127.0.0.1",
StatusPort: uint(randPort),
},
}

err := StartInitialStatusServer(cfg)
require.NoError(t, err)
require.NotNil(t, initialStatusServer)
require.Equal(t, fmt.Sprintf("127.0.0.1:%d", randPort), initialStatusServer.Addr)

url := "http://" + initialStatusServer.Addr
testInitialStatusServer(http.DefaultClient, url)

StopInitialStatusServer()

// Then the port should have been released
resp, err := http.Get(url + "/debug/pprof/cmdline")
require.Error(t, err)
defer resp.Body.Close()

wg := &sync.WaitGroup{}
wg.Add(1)
server := &http.Server{}
go func() {
defer wg.Done()

err := server.ListenAndServe()
require.Equal(t, http.ErrServerClosed, err)
}()
time.Sleep(time.Millisecond * 100)
server.Close()
wg.Wait()
})

t.Run("test start server with tls", func(t *testing.T) {
certPath := t.TempDir() + "/cert.pem"
keyPath := t.TempDir() + "/key.pem"
generateCertificates(t, certPath, keyPath)
// to keep it simple, use the cert as CA directly
caPool := x509.NewCertPool()
certData, err := os.ReadFile(certPath)
require.NoError(t, err)
caPool.AppendCertsFromPEM(certData)

cfg := &config.Config{
Status: config.Status{
StatusHost: "127.0.0.1",
StatusPort: uint(randPort),
},
Security: config.Security{
ClusterSSLCA: certPath,
ClusterSSLCert: certPath,
ClusterSSLKey: keyPath,
},
}

err = StartInitialStatusServer(cfg)
require.NoError(t, err)
require.NotNil(t, initialStatusServer)
require.Equal(t, fmt.Sprintf("127.0.0.1:%d", randPort), initialStatusServer.Addr)

url := "https://" + initialStatusServer.Addr
testInitialStatusServer(&http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caPool,
},
},
}, url)

StopInitialStatusServer()
})
}
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ func (s *Server) initTiDBListener() (err error) {

func (s *Server) initHTTPListener() (err error) {
if s.cfg.Status.ReportStatus {
StopInitialStatusServer()

if err = s.listenStatusHTTPServer(); err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/util/signal/signal_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"go.uber.org/zap"
)

// SetupSignalHandler setup signal handler for TiDB Server
func SetupSignalHandler(shutdownFunc func()) {
// SetupStackSignalHandler setup signal handler for `SIGUSR1`. It'll dump goroutine stack and print it out.
func SetupStackSignalHandler() {
usrDefSignalChan := make(chan os.Signal, 1)

signal.Notify(usrDefSignalChan, syscall.SIGUSR1)
Expand All @@ -41,7 +41,11 @@ func SetupSignalHandler(shutdownFunc func()) {
}
}
}()
}

// SetupCloseSignalHandler setup signal handler for `SIGHUP`, `SIGINT`, `SIGTERM` and `SIGQUIT` for TiDB Server
// These signals will close TiDB server.
func SetupCloseSignalHandler(shutdownFunc func()) {
closeSignalChan := make(chan os.Signal, 1)
signal.Notify(closeSignalChan,
syscall.SIGHUP,
Expand Down

0 comments on commit 9757729

Please sign in to comment.