Skip to content

Commit

Permalink
feat(nvidia): persist xid, sxid events
Browse files Browse the repository at this point in the history
feat(nvidia/query/xid-sxid-state): initial commit
feat(server): call create table, purge for xid/sxid events table
feat(pkg/sqlite): add Open function
feat(server): sqlite in server
feat(nvml): persist xid

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 9, 2024
1 parent db5e04b commit d5f8a68
Show file tree
Hide file tree
Showing 75 changed files with 1,696 additions and 291 deletions.
3 changes: 2 additions & 1 deletion cmd/gpud/command/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/leptonai/gpud/config"
"github.com/leptonai/gpud/internal/login"
"github.com/leptonai/gpud/internal/server"
"github.com/leptonai/gpud/pkg/sqlite"

"github.com/urfave/cli"
)
Expand All @@ -30,7 +31,7 @@ func cmdLogin(cliContext *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to get state file: %w", err)
}
db, err := state.Open(stateFile)
db, err := sqlite.Open(stateFile)
if err != nil {
return fmt.Errorf("failed to open state file: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/gpud/command/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (

"github.com/leptonai/gpud/components/state"
"github.com/leptonai/gpud/config"
"github.com/leptonai/gpud/pkg/sqlite"
)

func GetUID(ctx context.Context) (string, error) {
stateFile, err := config.DefaultStateFile()
if err != nil {
return "", fmt.Errorf("failed to get state file: %w", err)
}
db, err := state.Open(stateFile)
db, err := sqlite.Open(stateFile)
if err != nil {
return "", fmt.Errorf("failed to open state file: %w", err)
}
Expand Down
128 changes: 128 additions & 0 deletions components/accelerator/nvidia/error-xid-sxid/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Package errorxidsxid implements NVIDIA GPU driver Xid/SXid error detector.
package errorxidsxid

import (
"context"
"database/sql"
"fmt"
"strconv"
"time"

"github.com/dustin/go-humanize"
"github.com/leptonai/gpud/components"
nvidia_error_xid_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid/id"
nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query"
nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()

// this starts the Xid poller via "nvml.StartDefaultInstance"
cctx, ccancel := context.WithCancel(ctx)
nvidia_query.DefaultPoller.Start(cctx, cfg.Query, nvidia_error_xid_sxid_id.Name)

return &component{
rootCtx: ctx,
cancel: ccancel,
poller: nvidia_query.DefaultPoller,
db: cfg.Query.State.DB,
}
}

var _ components.Component = (*component)(nil)

type component struct {
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
db *sql.DB
}

func (c *component) Name() string { return nvidia_error_xid_sxid_id.Name }

func (c *component) States(ctx context.Context) ([]components.State, error) {
return nil, nil
}

const (
EventNameErroXid = "error_xid"
EventNameErroSXid = "error_sxid"

EventKeyUnixSeconds = "unix_seconds"
EventKeyData = "data"
EventKeyEncoding = "encoding"
EventValueEncodingJSON = "json"
)

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
events, err := nvidia_xid_sxid_state.ReadEvents(ctx, c.db, nvidia_xid_sxid_state.WithSince(since))
if err != nil {
return nil, err
}

convertedEvents := make([]components.Event, 0, len(events))
for _, event := range events {
if xidDetail := event.ToXidDetail(); xidDetail != nil {
msg := fmt.Sprintf("xid %d detected by %s (%s)",
event.EventID,
event.DataSource,
humanize.Time(time.Unix(event.UnixSeconds, 0)),
)
xidBytes, _ := xidDetail.JSON()

convertedEvents = append(convertedEvents, components.Event{
Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0)},
Name: EventNameErroXid,
Message: msg,
ExtraInfo: map[string]string{
EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10),
EventKeyData: string(xidBytes),
EventKeyEncoding: EventValueEncodingJSON,
},
})
continue
}

if sxidDetail := event.ToSXidDetail(); sxidDetail != nil {
msg := fmt.Sprintf("sxid %d detected by %s (%s)",
event.EventID,
event.DataSource,
humanize.Time(time.Unix(event.UnixSeconds, 0)),
)
sxidBytes, _ := sxidDetail.JSON()

convertedEvents = append(convertedEvents, components.Event{
Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0)},
Name: EventNameErroSXid,
Message: msg,
ExtraInfo: map[string]string{
EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10),
EventKeyData: string(sxidBytes),
EventKeyEncoding: EventValueEncodingJSON,
},
})
continue
}
}
return convertedEvents, nil
}

func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) {
log.Logger.Debugw("querying metrics", "since", since)

return nil, nil
}

func (c *component) Close() error {
log.Logger.Debugw("closing component")

// safe to call stop multiple times
_ = c.poller.Stop(nvidia_error_xid_sxid_id.Name)

return nil
}
32 changes: 32 additions & 0 deletions components/accelerator/nvidia/error-xid-sxid/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package errorxidsxid

import (
"database/sql"
"encoding/json"

query_config "github.com/leptonai/gpud/components/query/config"
)

type Config struct {
Query query_config.Config `json:"query"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
raw, err := json.Marshal(b)
if err != nil {
return nil, err
}
cfg := new(Config)
err = json.Unmarshal(raw, cfg)
if err != nil {
return nil, err
}
if cfg.Query.State != nil {
cfg.Query.State.DB = db
}
return cfg, nil
}

func (cfg Config) Validate() error {
return nil
}
4 changes: 4 additions & 0 deletions components/accelerator/nvidia/error-xid-sxid/id/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package id is the identifier for the nvidia error xid sxid component.
package id

const Name = "accelerator-nvidia-error-xid-sxid"
24 changes: 13 additions & 11 deletions components/accelerator/nvidia/error/sxid/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@ package sxid

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/leptonai/gpud/components"
nvidia_component_error_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid/id"
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
"github.com/leptonai/gpud/components/dmesg"
"github.com/leptonai/gpud/log"
)

const Name = "accelerator-nvidia-error-sxid"

func New() components.Component {
return &component{}
func New(db *sql.DB) components.Component {
return &component{db: db}
}

var _ components.Component = (*component)(nil)

type component struct{}
type component struct {
db *sql.DB
}

func (c *component) Name() string { return Name }
func (c *component) Name() string { return nvidia_component_error_sxid_id.Name }

func (c *component) States(ctx context.Context) ([]components.State, error) {
return []components.State{{
Expand All @@ -33,10 +35,10 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
}}, nil
}

// fetchOutput fetches the latest output from the dmesg
// tailScan fetches the latest output from the dmesg
// it is ok to call this function multiple times for the following reasons (thus shared with events method)
// 1) dmesg "FetchStateWithTailScanner" is cheap (just tails the last x number of lines)
func (c *component) fetchOutput() (*Output, error) {
// 1) dmesg "TailScan" is cheap (just tails the last x number of lines)
func (c *component) tailScan() (*Output, error) {
dmesgC, err := components.GetComponent(dmesg.Name)
if err != nil {
return nil, err
Expand All @@ -50,7 +52,7 @@ func (c *component) fetchOutput() (*Output, error) {
return nil, fmt.Errorf("expected *dmesg.Component, got %T", dmesgC)
}
}
dmesgTailResults, err := dmesgComponent.FetchStateWithTailScanner()
dmesgTailResults, err := dmesgComponent.TailScan()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -78,7 +80,7 @@ func (c *component) fetchOutput() (*Output, error) {
}

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
o, err := c.fetchOutput()
o, err := c.tailScan()
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions components/accelerator/nvidia/error/sxid/id/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package id provides the nvidia error sxid id component.
package id

const Name = "accelerator-nvidia-error-sxid"
23 changes: 11 additions & 12 deletions components/accelerator/nvidia/error/xid/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@ import (
"time"

"github.com/leptonai/gpud/components"
nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id"
nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
"github.com/leptonai/gpud/components/dmesg"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/log"
)

const Name = "accelerator-nvidia-error-xid"

func New(ctx context.Context, cfg Config) components.Component {
cfg.Query.SetDefaultsIfNotSet()
setDefaultPoller(cfg)

cctx, ccancel := context.WithCancel(ctx)
getDefaultPoller().Start(cctx, cfg.Query, Name)
getDefaultPoller().Start(cctx, cfg.Query, nvidia_component_error_xid_id.Name)

return &component{
rootCtx: ctx,
Expand All @@ -40,7 +39,7 @@ type component struct {
poller query.Poller
}

func (c *component) Name() string { return Name }
func (c *component) Name() string { return nvidia_component_error_xid_id.Name }

// Just checks if the xid poller is working.
func (c *component) States(_ context.Context) ([]components.State, error) {
Expand All @@ -49,7 +48,7 @@ func (c *component) States(_ context.Context) ([]components.State, error) {
// no data yet from realtime xid poller
// just return whatever we got from dmesg
if err == query.ErrNoData {
log.Logger.Debugw("nothing found in last state (no data collected yet)", "component", Name)
log.Logger.Debugw("nothing found in last state (no data collected yet)", "component", nvidia_component_error_xid_id.Name)
return []components.State{
{
Name: StateNameErrorXid,
Expand Down Expand Up @@ -77,11 +76,11 @@ func (c *component) States(_ context.Context) ([]components.State, error) {
}, nil
}

// fetchOutput fetches the latest output from the dmesg and the NVML poller
// tailScan fetches the latest output from the dmesg and the NVML poller
// it is ok to call this function multiple times for the following reasons (thus shared with events method)
// 1) dmesg "FetchStateWithTailScanner" is cheap (just tails the last x number of lines)
// 1) dmesg "TailScan" is cheap (just tails the last x number of lines)
// 2) NVML poller "Last" costs nothing, since we simply read the last state in the queue (no NVML call involved)
func (c *component) fetchOutput() (*Output, error) {
func (c *component) tailScan() (*Output, error) {
dmesgC, err := components.GetComponent(dmesg.Name)
if err != nil {
return nil, err
Expand All @@ -95,7 +94,7 @@ func (c *component) fetchOutput() (*Output, error) {
return nil, fmt.Errorf("expected *dmesg.Component, got %T", dmesgC)
}
}
dmesgTailResults, err := dmesgComponent.FetchStateWithTailScanner()
dmesgTailResults, err := dmesgComponent.TailScan()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,7 +123,7 @@ func (c *component) fetchOutput() (*Output, error) {
// no data yet from realtime xid poller
// just return whatever we got from dmesg
if err == query.ErrNoData {
log.Logger.Debugw("nothing found in last state (no data collected yet)", "component", Name)
log.Logger.Debugw("nothing found in last state (no data collected yet)", "component", nvidia_component_error_xid_id.Name)
return o, nil
}

Expand Down Expand Up @@ -155,7 +154,7 @@ func (c *component) fetchOutput() (*Output, error) {
}

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
o, err := c.fetchOutput()
o, err := c.tailScan()
if err != nil {
return nil, err
}
Expand All @@ -172,7 +171,7 @@ func (c *component) Close() error {
log.Logger.Debugw("closing component")

// safe to call stop multiple times
c.poller.Stop(Name)
c.poller.Stop(nvidia_component_error_xid_id.Name)

return nil
}
9 changes: 5 additions & 4 deletions components/accelerator/nvidia/error/xid/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/leptonai/gpud/components"
nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id"
nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
"github.com/leptonai/gpud/components/common"
Expand Down Expand Up @@ -223,7 +224,7 @@ var (
// only set once since it relies on the kube client and specific port
func setDefaultPoller(cfg Config) {
defaultPollerOnce.Do(func() {
defaultPoller = query.New(Name, cfg.Query, CreateGet())
defaultPoller = query.New(nvidia_component_error_xid_id.Name, cfg.Query, CreateGet())
})
}

Expand All @@ -234,12 +235,12 @@ func getDefaultPoller() query.Poller {
// DO NOT for-loop here
// the query.GetFunc is already called periodically in a loop by the poller
func CreateGet() query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
components_metrics.SetGetFailed(nvidia_component_error_xid_id.Name)
} else {
components_metrics.SetGetSuccess(Name)
components_metrics.SetGetSuccess(nvidia_component_error_xid_id.Name)
}
}()

Expand Down
Loading

0 comments on commit d5f8a68

Please sign in to comment.