Skip to content

Commit

Permalink
feat(internal/server): dynamically refresh containerd, docker, kubele…
Browse files Browse the repository at this point in the history
…t components (#78)

* feat(internal/server): dynamically refresh containerd, docker, kubelet components

Signed-off-by: Gyuho Lee <[email protected]>

* clean up dmesg

Signed-off-by: Gyuho Lee <[email protected]>

* use binary locate for default + fallback

Signed-off-by: Gyuho Lee <[email protected]>

---------

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho authored Oct 3, 2024
1 parent 9d1a205 commit 57c9c86
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 92 deletions.
9 changes: 8 additions & 1 deletion cmd/gpud/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ var (

pprof bool

retentionPeriod time.Duration
retentionPeriod time.Duration
refreshComponentsInterval time.Duration

webEnable bool
webAdmin bool
Expand Down Expand Up @@ -157,6 +158,12 @@ sudo rm /etc/systemd/system/gpud.service
Destination: &retentionPeriod,
Value: config.DefaultRetentionPeriod.Duration,
},
&cli.DurationFlag{
Name: "refresh-components-interval",
Usage: "set the time period to refresh selected components",
Destination: &refreshComponentsInterval,
Value: config.DefaultRefreshComponentsInterval.Duration,
},
&cli.BoolTFlag{
Name: "web-enable",
Usage: "enable local web interface (default: true)",
Expand Down
3 changes: 1 addition & 2 deletions components/docker/container/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
)

type Config struct {
Query query_config.Config `json:"query"`
Endpoint string `json:"endpoint"`
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type Config struct {
// Once elapsed, old states/metrics are purged/compacted.
RetentionPeriod metav1.Duration `json:"retention_period"`

// Interval at which to refresh selected components.
// Disables refresh if not set.
RefreshComponentsInterval metav1.Duration `json:"refresh_components_interval"`

// Set true to enable profiler.
Pprof bool `json:"pprof"`

Expand Down Expand Up @@ -65,6 +69,9 @@ func (config *Config) Validate() error {
if config.RetentionPeriod.Duration < time.Minute {
return fmt.Errorf("retention_period must be at least 1 minute, got %d", config.RetentionPeriod.Duration)
}
if config.RefreshComponentsInterval.Duration < time.Minute {
return fmt.Errorf("refresh_components_interval must be at least 1 minute, got %d", config.RefreshComponentsInterval.Duration)
}
if config.Web != nil && config.Web.RefreshPeriod.Duration < time.Minute {
return fmt.Errorf("web_refresh_period must be at least 1 minute, got %d", config.Web.RefreshPeriod.Duration)
}
Expand Down
223 changes: 145 additions & 78 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
component_systemd "github.com/leptonai/gpud/components/systemd"
"github.com/leptonai/gpud/components/tailscale"
"github.com/leptonai/gpud/log"
pkg_file "github.com/leptonai/gpud/pkg/file"
pkd_systemd "github.com/leptonai/gpud/pkg/systemd"
"github.com/leptonai/gpud/systemd"
"github.com/leptonai/gpud/version"
Expand All @@ -59,8 +60,9 @@ const (
)

var (
DefaultRefreshPeriod = metav1.Duration{Duration: time.Minute}
DefaultRetentionPeriod = metav1.Duration{Duration: 30 * time.Minute}
DefaultRefreshPeriod = metav1.Duration{Duration: time.Minute}
DefaultRetentionPeriod = metav1.Duration{Duration: 30 * time.Minute}
DefaultRefreshComponentsInterval = metav1.Duration{Duration: time.Minute}
)

func DefaultConfig(ctx context.Context, opts ...OpOption) (*Config, error) {
Expand All @@ -69,8 +71,6 @@ func DefaultConfig(ctx context.Context, opts ...OpOption) (*Config, error) {
return nil, err
}

asRoot := stdos.Geteuid() == 0 // running as root

cfg := &Config{
APIVersion: DefaultAPIVersion,

Expand All @@ -90,8 +90,9 @@ func DefaultConfig(ctx context.Context, opts ...OpOption) (*Config, error) {
os.Name: nil,
},

RetentionPeriod: DefaultRetentionPeriod,
Pprof: false,
RetentionPeriod: DefaultRetentionPeriod,
RefreshComponentsInterval: DefaultRefreshComponentsInterval,
Pprof: false,

Web: &Web{
Enable: true,
Expand All @@ -107,86 +108,22 @@ func DefaultConfig(ctx context.Context, opts ...OpOption) (*Config, error) {
cfg.Components[file.Name] = options.filesToCheck
}

if runtime.GOOS == "linux" {
containerdSocketExists := false
containerdRunning := false

if _, err := stdos.Stat(containerd_pod.DefaultSocketFile); err == nil {
log.Logger.Debugw("containerd default socket file exists, containerd installed", "file", containerd_pod.DefaultSocketFile)
containerdSocketExists = true
} else {
log.Logger.Debugw("containerd default socket file does not exist, skip containerd check", "file", containerd_pod.DefaultSocketFile, "error", err)
}

cctx, ccancel := context.WithTimeout(ctx, 5*time.Second)
defer ccancel()
if _, _, conn, err := containerd_pod.Connect(cctx, containerd_pod.DefaultContainerRuntimeEndpoint); err == nil {
log.Logger.Debugw("containerd default cri endpoint open, containerd running", "endpoint", containerd_pod.DefaultContainerRuntimeEndpoint)
containerdRunning = true
_ = conn.Close()
} else {
log.Logger.Debugw("containerd default cri endpoint not open, skip containerd checking", "endpoint", containerd_pod.DefaultContainerRuntimeEndpoint, "error", err)
}

if containerdSocketExists && containerdRunning {
log.Logger.Debugw("auto-detected containerd -- configuring containerd pod component")
cfg.Components[containerd_pod.Name] = containerd_pod.Config{
Query: query_config.DefaultConfig(),
Endpoint: containerd_pod.DefaultContainerRuntimeEndpoint,
}
}
} else {
log.Logger.Debugw("ignoring default containerd pod checking since it's not linux", "os", runtime.GOOS)
if cc, exists := DefaultDockerContainerComponent(ctx); exists {
cfg.Components[docker_container.Name] = cc
}

if runtime.GOOS == "linux" {
// check if the TCP port is open/used
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", k8s_pod.DefaultKubeletReadOnlyPort), 3*time.Second)
if err != nil {
log.Logger.Debugw("tcp port is not open", "port", k8s_pod.DefaultKubeletReadOnlyPort, "error", err)
} else {
log.Logger.Debugw("tcp port is open", "port", k8s_pod.DefaultKubeletReadOnlyPort)
conn.Close()

kerr := k8s_pod.CheckKubeletReadOnlyPort(ctx, k8s_pod.DefaultKubeletReadOnlyPort)
// check
if kerr != nil {
log.Logger.Debugw("kubelet readonly port is not open", "port", k8s_pod.DefaultKubeletReadOnlyPort, "error", kerr)
} else {
log.Logger.Debugw("auto-detected kubelet readonly port -- configuring k8s pod components", "port", k8s_pod.DefaultKubeletReadOnlyPort)

// "k8s_pod" requires kubelet read-only port
// assume if kubelet is running, it opens the most common read-only port 10255
cfg.Components[k8s_pod.Name] = k8s_pod.Config{
Query: query_config.DefaultConfig(),
Port: k8s_pod.DefaultKubeletReadOnlyPort,
}
}
}
} else {
log.Logger.Debugw("ignoring default kubelet checking since it's not linux", "os", runtime.GOOS)
if cc, exists := DefaultContainerdComponent(ctx); exists {
cfg.Components[containerd_pod.Name] = cc
}

if docker_container.IsDockerRunning() {
log.Logger.Debugw("auto-detected docker -- configuring docker container component")
cfg.Components[docker_container.Name] = nil
if cc, exists := DefaultK8sPodComponent(ctx); exists {
cfg.Components[k8s_pod.Name] = cc
}

if _, err := stdos.Stat(power_supply.DefaultBatteryCapacityFile); err == nil {
cfg.Components[power_supply.Name] = nil
}

if runtime.GOOS == "linux" {
if dmesg.DmesgExists() {
if asRoot {
log.Logger.Debugw("auto-detected dmesg -- configuring dmesg component")
cfg.Components[dmesg.Name] = dmesg.DefaultConfig()
} else {
log.Logger.Debugw("auto-detected dmesg but running as root -- skipping")
}
}
} else {
log.Logger.Debugw("auto-detect dmesg not supported -- skipping", "os", runtime.GOOS)
if cc, exists := DefaultDmesgComponent(); exists {
cfg.Components[dmesg.Name] = cc
}

if runtime.GOOS == "linux" {
Expand Down Expand Up @@ -351,3 +288,133 @@ func DefaultFifoFile() (string, error) {
}
return filepath.Join(f, "gpud.fifo"), nil
}

// DefaultContainerdComponent auto-detects containerd and returns the default
// containerd pod component configuration when it is found.
//
// Detection runs in two stages:
//  1. pkg_file.LocateExecutable finds a "containerd" executable, or
//  2. as a fallback, the default socket file exists AND the default CRI
//     endpoint accepts a connection.
//
// The boolean result reports whether the component should be configured; when
// it is false the returned config is nil. On non-linux systems the function
// always returns (nil, false).
func DefaultContainerdComponent(ctx context.Context) (any, bool) {
	if runtime.GOOS != "linux" {
		log.Logger.Debugw("ignoring default containerd pod checking since it's not linux", "os", runtime.GOOS)
		return nil, false
	}

	p, err := pkg_file.LocateExecutable("containerd")
	if p != "" && err == nil {
		log.Logger.Debugw("containerd found in PATH", "path", p)
		return containerd_pod.Config{
			Query:    query_config.DefaultConfig(),
			Endpoint: containerd_pod.DefaultContainerRuntimeEndpoint,
		}, true
	}
	log.Logger.Debugw("containerd not found in PATH -- fallback to containerd run checks", "error", err)

	// Both fallback checks must pass, so stop here when the socket file is
	// missing instead of paying for a dial that can block for up to 5 seconds.
	if _, err := stdos.Stat(containerd_pod.DefaultSocketFile); err != nil {
		log.Logger.Debugw("containerd default socket file does not exist, skip containerd check", "file", containerd_pod.DefaultSocketFile, "error", err)
		return nil, false
	}
	log.Logger.Debugw("containerd default socket file exists, containerd installed", "file", containerd_pod.DefaultSocketFile)

	cctx, ccancel := context.WithTimeout(ctx, 5*time.Second)
	defer ccancel()

	_, _, conn, err := containerd_pod.Connect(cctx, containerd_pod.DefaultContainerRuntimeEndpoint)
	if err != nil {
		log.Logger.Debugw("containerd default cri endpoint not open, skip containerd checking", "endpoint", containerd_pod.DefaultContainerRuntimeEndpoint, "error", err)
		return nil, false
	}
	log.Logger.Debugw("containerd default cri endpoint open, containerd running", "endpoint", containerd_pod.DefaultContainerRuntimeEndpoint)
	// The connection is only a liveness probe; a close error does not change
	// the detection result.
	_ = conn.Close()

	log.Logger.Debugw("auto-detected containerd -- configuring containerd pod component")
	return containerd_pod.Config{
		Query:    query_config.DefaultConfig(),
		Endpoint: containerd_pod.DefaultContainerRuntimeEndpoint,
	}, true
}

// DefaultDockerContainerComponent auto-detects docker and returns the default
// docker container component configuration when it is found.
//
// Docker counts as present when pkg_file.LocateExecutable finds a "docker"
// executable or, failing that, when docker_container.IsDockerRunning reports
// true. The boolean result reports whether the component should be configured;
// when it is false the returned config is nil.
func DefaultDockerContainerComponent(ctx context.Context) (any, bool) {
	dockerPath, locateErr := pkg_file.LocateExecutable("docker")

	if locateErr != nil || dockerPath == "" {
		log.Logger.Debugw("docker not found in PATH -- fallback to docker run checks", "error", locateErr)

		if !docker_container.IsDockerRunning() {
			return nil, false
		}
		log.Logger.Debugw("auto-detected docker -- configuring docker container component")
	} else {
		log.Logger.Debugw("docker found in PATH", "path", dockerPath)
	}

	// Either detection path yields the same default configuration.
	return docker_container.Config{
		Query: query_config.DefaultConfig(),
	}, true
}

// DefaultK8sPodComponent auto-detects kubelet and returns the default k8s pod
// component configuration when it is found.
//
// Detection runs in two stages:
//  1. pkg_file.LocateExecutable finds a "kubelet" executable, or
//  2. as a fallback, the default kubelet read-only port accepts a TCP
//     connection on localhost AND k8s_pod.CheckKubeletReadOnlyPort succeeds.
//
// The boolean result reports whether the component should be configured; when
// it is false the returned config is nil. On non-linux systems the function
// always returns (nil, false).
func DefaultK8sPodComponent(ctx context.Context) (any, bool) {
	if runtime.GOOS != "linux" {
		log.Logger.Debugw("ignoring default kubelet checking since it's not linux", "os", runtime.GOOS)
		return nil, false
	}

	p, err := pkg_file.LocateExecutable("kubelet")
	if p != "" && err == nil {
		log.Logger.Debugw("kubelet found in PATH", "path", p)
		return k8s_pod.Config{
			Query: query_config.DefaultConfig(),
			Port:  k8s_pod.DefaultKubeletReadOnlyPort,
		}, true
	}
	log.Logger.Debugw("kubelet not found in PATH -- fallback to kubelet run checks", "error", err)

	// Check if the TCP port is open/used. DialContext keeps the 3-second cap
	// but also aborts as soon as ctx is cancelled.
	dialer := net.Dialer{Timeout: 3 * time.Second}
	conn, err := dialer.DialContext(ctx, "tcp", fmt.Sprintf("localhost:%d", k8s_pod.DefaultKubeletReadOnlyPort))
	if err != nil {
		log.Logger.Debugw("tcp port is not open", "port", k8s_pod.DefaultKubeletReadOnlyPort, "error", err)
		return nil, false
	}
	log.Logger.Debugw("tcp port is open", "port", k8s_pod.DefaultKubeletReadOnlyPort)
	// The connection is only a reachability probe; a close error does not
	// change the detection result.
	_ = conn.Close()

	if kerr := k8s_pod.CheckKubeletReadOnlyPort(ctx, k8s_pod.DefaultKubeletReadOnlyPort); kerr != nil {
		log.Logger.Debugw("kubelet readonly port is not open", "port", k8s_pod.DefaultKubeletReadOnlyPort, "error", kerr)
		return nil, false
	}
	log.Logger.Debugw("auto-detected kubelet readonly port -- configuring k8s pod components", "port", k8s_pod.DefaultKubeletReadOnlyPort)

	// "k8s_pod" requires kubelet read-only port
	// assume if kubelet is running, it opens the most common read-only port 10255
	return k8s_pod.Config{
		Query: query_config.DefaultConfig(),
		Port:  k8s_pod.DefaultKubeletReadOnlyPort,
	}, true
}

// DefaultDmesgComponent returns the default dmesg component configuration when
// the process runs on linux, has an effective UID of 0 (root), and
// dmesg.DmesgExists reports true.
//
// The boolean result reports whether the component should be configured; when
// it is false the returned config is nil.
func DefaultDmesgComponent() (any, bool) {
	if runtime.GOOS != "linux" {
		log.Logger.Debugw("ignoring default dmesg since it's not linux", "os", runtime.GOOS)
		return nil, false
	}

	asRoot := stdos.Geteuid() == 0 // running as root
	if !asRoot {
		// The previous message claimed the process WAS running as root and that
		// dmesg had already been detected; neither is true on this path.
		log.Logger.Debugw("not running as root -- skipping dmesg component")
		return nil, false
	}

	if dmesg.DmesgExists() {
		log.Logger.Debugw("auto-detected dmesg -- configuring dmesg component")
		return dmesg.DefaultConfig(), true
	}

	log.Logger.Debugw("dmesg does not exist -- skipping dmesg component")
	return nil, false
}
11 changes: 8 additions & 3 deletions internal/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

lep_components "github.com/leptonai/gpud/components"
Expand All @@ -17,9 +18,11 @@ import (
)

type globalHandler struct {
cfg *lep_config.Config
components map[string]lep_components.Component
componentNames []string
cfg *lep_config.Config
components map[string]lep_components.Component

componentNamesMu sync.RWMutex
componentNames []string
}

func newGlobalHandler(cfg *lep_config.Config, components map[string]lep_components.Component) *globalHandler {
Expand Down Expand Up @@ -61,6 +64,8 @@ func (g *globalHandler) getReqTime(c *gin.Context) (time.Time, time.Time, error)
func (g *globalHandler) getReqComponents(c *gin.Context) ([]string, error) {
components := c.Query("components")
if components == "" {
g.componentNamesMu.RLock()
defer g.componentNamesMu.RUnlock()
return g.componentNames, nil
}

Expand Down
Loading

0 comments on commit 57c9c86

Please sign in to comment.