diff --git a/components/accelerator/nvidia/bad-envs/component.go b/components/accelerator/nvidia/bad-envs/component.go index d8a28cf8..b81b3e7f 100644 --- a/components/accelerator/nvidia/bad-envs/component.go +++ b/components/accelerator/nvidia/bad-envs/component.go @@ -17,12 +17,12 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, bad_envs_id.Name) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, bad_envs_id.Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/clock-speed/component.go b/components/accelerator/nvidia/clock-speed/component.go index ec93f162..70013e57 100644 --- a/components/accelerator/nvidia/clock-speed/component.go +++ b/components/accelerator/nvidia/clock-speed/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/clock/component.go b/components/accelerator/nvidia/clock/component.go index e2817a12..d3070684 100644 --- a/components/accelerator/nvidia/clock/component.go +++ b/components/accelerator/nvidia/clock/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/ecc/component.go b/components/accelerator/nvidia/ecc/component.go index 154418e8..0a97546b 100644 --- a/components/accelerator/nvidia/ecc/component.go +++ b/components/accelerator/nvidia/ecc/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/error-xid-sxid/component.go b/components/accelerator/nvidia/error-xid-sxid/component.go new file mode 100644 index 00000000..5e7118b6 --- /dev/null +++ b/components/accelerator/nvidia/error-xid-sxid/component.go @@ -0,0 +1,128 @@ +// Package errorxidsxid implements NVIDIA GPU driver Xid/SXid error detector. 
+package errorxidsxid + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + "github.com/dustin/go-humanize" + "github.com/leptonai/gpud/components" + nvidia_error_xid_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid/id" + nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query" + nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" + "github.com/leptonai/gpud/components/query" + "github.com/leptonai/gpud/log" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func New(ctx context.Context, cfg Config) components.Component { + cfg.Query.SetDefaultsIfNotSet() + + // this starts the Xid poller via "nvml.StartDefaultInstance" + cctx, ccancel := context.WithCancel(ctx) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_error_xid_sxid_id.Name) + + return &component{ + rootCtx: ctx, + cancel: ccancel, + poller: nvidia_query.GetDefaultPoller(), + db: cfg.Query.State.DB, + } +} + +var _ components.Component = (*component)(nil) + +type component struct { + rootCtx context.Context + cancel context.CancelFunc + poller query.Poller + db *sql.DB +} + +func (c *component) Name() string { return nvidia_error_xid_sxid_id.Name } + +func (c *component) States(ctx context.Context) ([]components.State, error) { + return nil, nil +} + +const ( + EventNameErroXid = "error_xid" + EventNameErroSXid = "error_sxid" + + EventKeyUnixSeconds = "unix_seconds" + EventKeyData = "data" + EventKeyEncoding = "encoding" + EventValueEncodingJSON = "json" +) + +func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) { + events, err := nvidia_xid_sxid_state.ReadEvents(ctx, c.db, nvidia_xid_sxid_state.WithSince(since)) + if err != nil { + return nil, err + } + + convertedEvents := make([]components.Event, 0, len(events)) + for _, event := range events { + if xidDetail := event.ToXidDetail(); xidDetail != nil { + msg := fmt.Sprintf("xid %d detected by %s (%s)", + event.EventID, + event.DataSource, + humanize.Time(time.Unix(event.UnixSeconds, 0)), + ) + xidBytes, _ := xidDetail.JSON() + + convertedEvents = append(convertedEvents, components.Event{ + Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0)}, + Name: EventNameErroXid, + Message: msg, + ExtraInfo: map[string]string{ + EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10), + EventKeyData: string(xidBytes), + EventKeyEncoding: EventValueEncodingJSON, + }, + }) + continue + } + + if sxidDetail := event.ToSXidDetail(); sxidDetail != nil { + msg := fmt.Sprintf("sxid %d detected by %s (%s)", + event.EventID, + event.DataSource, + humanize.Time(time.Unix(event.UnixSeconds, 0)), + ) + sxidBytes, _ := sxidDetail.JSON() + + convertedEvents = append(convertedEvents, components.Event{ + Time: metav1.Time{Time: time.Unix(event.UnixSeconds, 0)}, + Name: EventNameErroSXid, + Message: msg, + ExtraInfo: map[string]string{ + EventKeyUnixSeconds: strconv.FormatInt(event.UnixSeconds, 10), + EventKeyData: string(sxidBytes), + EventKeyEncoding: EventValueEncodingJSON, + }, + }) + continue + } + } + return convertedEvents, nil +} + +func (c *component) Metrics(ctx context.Context, since time.Time) ([]components.Metric, error) { + log.Logger.Debugw("querying metrics", "since", since) + + return nil, nil +} + +func (c *component) Close() error { + log.Logger.Debugw("closing component") + + // safe to call stop multiple times + _ = c.poller.Stop(nvidia_error_xid_sxid_id.Name) + + return nil +} diff --git 
a/components/accelerator/nvidia/error-xid-sxid/config.go b/components/accelerator/nvidia/error-xid-sxid/config.go new file mode 100644 index 00000000..df6f4660 --- /dev/null +++ b/components/accelerator/nvidia/error-xid-sxid/config.go @@ -0,0 +1,32 @@ +package errorxidsxid + +import ( + "database/sql" + "encoding/json" + + query_config "github.com/leptonai/gpud/components/query/config" +) + +type Config struct { + Query query_config.Config `json:"query"` +} + +func ParseConfig(b any, db *sql.DB) (*Config, error) { + raw, err := json.Marshal(b) + if err != nil { + return nil, err + } + cfg := new(Config) + err = json.Unmarshal(raw, cfg) + if err != nil { + return nil, err + } + if cfg.Query.State != nil { + cfg.Query.State.DB = db + } + return cfg, nil +} + +func (cfg Config) Validate() error { + return nil +} diff --git a/components/accelerator/nvidia/error-xid-sxid/id/id.go b/components/accelerator/nvidia/error-xid-sxid/id/id.go new file mode 100644 index 00000000..1417bbd7 --- /dev/null +++ b/components/accelerator/nvidia/error-xid-sxid/id/id.go @@ -0,0 +1,4 @@ +// Package id is the identifier for the nvidia error xid sxid component. +package id + +const Name = "accelerator-nvidia-error-xid-sxid" diff --git a/components/accelerator/nvidia/error/component.go b/components/accelerator/nvidia/error/component.go index 28227ff4..047c4fde 100644 --- a/components/accelerator/nvidia/error/component.go +++ b/components/accelerator/nvidia/error/component.go @@ -18,12 +18,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/fabric-manager/component.go b/components/accelerator/nvidia/fabric-manager/component.go index 8f50eacf..2ed04c92 100644 --- a/components/accelerator/nvidia/fabric-manager/component.go +++ b/components/accelerator/nvidia/fabric-manager/component.go @@ -21,7 +21,8 @@ func New(ctx context.Context, cfg Config) (components.Component, error) { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Log.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) if err := cfg.Log.Validate(); err != nil { ccancel() @@ -38,7 +39,7 @@ func New(ctx context.Context, cfg Config) (components.Component, error) { return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), logPoller: fabric_manager_log.GetDefaultPoller(), }, nil } diff --git a/components/accelerator/nvidia/fabric-manager/component_test.go b/components/accelerator/nvidia/fabric-manager/component_test.go index 1012dabf..ce8be8fe 100644 --- a/components/accelerator/nvidia/fabric-manager/component_test.go +++ b/components/accelerator/nvidia/fabric-manager/component_test.go @@ -6,10 +6,11 @@ import ( "testing" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - query_config "github.com/leptonai/gpud/components/query/config" query_log_config "github.com/leptonai/gpud/components/query/log/config" + "github.com/leptonai/gpud/pkg/sqlite" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func 
TestComponentLog(t *testing.T) { @@ -28,6 +29,12 @@ func TestComponentLog(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, err := sqlite.Open(":memory:") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + pollInterval := 3 * time.Second component, err := New( ctx, @@ -35,6 +42,9 @@ func TestComponentLog(t *testing.T) { Log: query_log_config.Config{ Query: query_config.Config{ Interval: metav1.Duration{Duration: pollInterval}, + State: &query_config.State{ + DB: db, + }, }, BufferSize: query_log_config.DefaultBufferSize, File: f.Name(), diff --git a/components/accelerator/nvidia/gsp-firmware-mode/component.go b/components/accelerator/nvidia/gsp-firmware-mode/component.go index aef00aa6..30482dcc 100644 --- a/components/accelerator/nvidia/gsp-firmware-mode/component.go +++ b/components/accelerator/nvidia/gsp-firmware-mode/component.go @@ -17,12 +17,12 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, nvidia_gsp_firmware_mode_id.Name) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_gsp_firmware_mode_id.Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/infiniband/component.go b/components/accelerator/nvidia/infiniband/component.go index bde1ca43..26bb9a63 100644 --- a/components/accelerator/nvidia/infiniband/component.go +++ b/components/accelerator/nvidia/infiniband/component.go @@ -19,12 +19,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/info/component.go b/components/accelerator/nvidia/info/component.go index c9bb42e1..8781675d 100644 --- a/components/accelerator/nvidia/info/component.go +++ b/components/accelerator/nvidia/info/component.go @@ -18,12 +18,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/memory/component.go b/components/accelerator/nvidia/memory/component.go index 61bf08d5..97327647 100644 --- a/components/accelerator/nvidia/memory/component.go +++ b/components/accelerator/nvidia/memory/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: 
nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/nccl/component.go b/components/accelerator/nvidia/nccl/component.go index fc4e914e..7c3dbd23 100644 --- a/components/accelerator/nvidia/nccl/component.go +++ b/components/accelerator/nvidia/nccl/component.go @@ -20,12 +20,12 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, nvidia_nccl_id.Name) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_nccl_id.Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/nvlink/component.go b/components/accelerator/nvidia/nvlink/component.go index 86ae14b6..244f52f8 100644 --- a/components/accelerator/nvidia/nvlink/component.go +++ b/components/accelerator/nvidia/nvlink/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/peermem/component.go b/components/accelerator/nvidia/peermem/component.go index bfada62e..55c097b9 100644 --- a/components/accelerator/nvidia/peermem/component.go +++ b/components/accelerator/nvidia/peermem/component.go @@ -20,12 +20,12 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, nvidia_peermem_id.Name) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_peermem_id.Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/persistence-mode/component.go b/components/accelerator/nvidia/persistence-mode/component.go index ab8f020e..8ffd5aab 100644 --- a/components/accelerator/nvidia/persistence-mode/component.go +++ b/components/accelerator/nvidia/persistence-mode/component.go @@ -17,12 +17,12 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, nvidia_persistence_mode_id.Name) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, nvidia_persistence_mode_id.Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/power/component.go b/components/accelerator/nvidia/power/component.go index 4f5ef601..a059f862 100644 --- a/components/accelerator/nvidia/power/component.go +++ b/components/accelerator/nvidia/power/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: 
ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/processes/component.go b/components/accelerator/nvidia/processes/component.go index 00695a40..6b29f560 100644 --- a/components/accelerator/nvidia/processes/component.go +++ b/components/accelerator/nvidia/processes/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/query/fabric-manager-log/poller.go b/components/accelerator/nvidia/query/fabric-manager-log/poller.go index 766a6efe..984ad0a5 100644 --- a/components/accelerator/nvidia/query/fabric-manager-log/poller.go +++ b/components/accelerator/nvidia/query/fabric-manager-log/poller.go @@ -24,6 +24,7 @@ func CreateDefaultPoller(ctx context.Context, cfg query_log_config.Config) error ctx, cfg, ExtractTimeFromLogLine, + nil, ) if err != nil { panic(err) diff --git a/components/accelerator/nvidia/query/nvml/nvml.go b/components/accelerator/nvidia/query/nvml/nvml.go index 2baa81a2..2284a964 100644 --- a/components/accelerator/nvidia/query/nvml/nvml.go +++ b/components/accelerator/nvidia/query/nvml/nvml.go @@ -4,12 +4,14 @@ package nvml import ( "context" + "database/sql" "errors" "fmt" "sort" "sync" "time" + components_nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" "github.com/leptonai/gpud/log" "github.com/NVIDIA/go-nvlib/pkg/nvlib/device" @@ -59,6 +61,8 @@ type instance struct { // maps from uuid to device info devices map[string]*DeviceInfo + db *sql.DB + xidErrorSupported bool xidEventMask uint64 xidEventSet nvml.EventSet @@ -159,6 +163,7 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { if err := op.applyOpts(opts); err != nil { return nil, err } + gpmMetricsIDs := make([]nvml.GpmMetricId, 0, len(op.gpmMetricsIDs)) for id := range op.gpmMetricsIDs { gpmMetricsIDs = append(gpmMetricsIDs, id) @@ -216,6 +221,8 @@ func NewInstance(ctx context.Context, opts ...OpOption) (Instance, error) { nvmlExists: nvmlExists, nvmlExistsMsg: nvmlExistsMsg, + db: op.db, + xidErrorSupported: false, xidEventSet: xidEventSet, xidEventMask: defaultXidEventMask, @@ -244,6 +251,12 @@ func (inst *instance) Start() error { inst.mu.Lock() defer inst.mu.Unlock() + ctx, cancel := context.WithTimeout(inst.rootCtx, 10*time.Second) + defer cancel() + if err := components_nvidia_xid_sxid_state.CreateTableXidSXidEventHistory(ctx, inst.db); err != nil { + return err + } + devices, err := inst.deviceLib.GetDevices() if err != nil { return err diff --git a/components/accelerator/nvidia/query/nvml/options.go b/components/accelerator/nvidia/query/nvml/options.go index 18b4169c..e4ee9518 100644 --- a/components/accelerator/nvidia/query/nvml/options.go +++ b/components/accelerator/nvidia/query/nvml/options.go @@ -1,10 +1,14 @@ package nvml import ( + "database/sql" + "github.com/NVIDIA/go-nvml/pkg/nvml" + "github.com/leptonai/gpud/pkg/sqlite" ) type Op struct { + db *sql.DB gpmMetricsIDs map[nvml.GpmMetricId]struct{} } @@ -14,9 +18,25 @@ func (op *Op) applyOpts(opts []OpOption) error { for 
_, opt := range opts { opt(op) } + if op.db == nil { + var err error + op.db, err = sqlite.Open(":memory:") + if err != nil { + return err + } + } return nil } +// Specifies the database instance to persist nvidia components data +// (e.g., xid/sxid events). +// If not specified, a new in-memory database is created. +func WithDB(db *sql.DB) OpOption { + return func(op *Op) { + op.db = db + } +} + func WithGPMMetricsID(ids ...nvml.GpmMetricId) OpOption { return func(op *Op) { if op.gpmMetricsIDs == nil { diff --git a/components/accelerator/nvidia/query/nvml/xid.go b/components/accelerator/nvidia/query/nvml/xid.go index 7a68fb86..909cc4a0 100644 --- a/components/accelerator/nvidia/query/nvml/xid.go +++ b/components/accelerator/nvidia/query/nvml/xid.go @@ -1,10 +1,12 @@ package nvml import ( + "context" "fmt" "time" nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid" + components_nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state" "github.com/leptonai/gpud/log" "github.com/NVIDIA/go-nvml/pkg/nvml" @@ -164,6 +166,21 @@ func (inst *instance) pollXidEvents() { } log.Logger.Warnw("detected xid event", "xid", xid, "event", event) + + // no need to check duplicate entries, assuming nvml event poller does not return old events + ctx, cancel := context.WithTimeout(inst.rootCtx, 10*time.Second) + werr := components_nvidia_xid_sxid_state.InsertEvent(ctx, inst.db, components_nvidia_xid_sxid_state.Event{ + UnixSeconds: event.Time.Unix(), + DataSource: "nvml", + EventType: "xid", + EventID: int64(event.Xid), + EventDetails: "", + }) + cancel() + if werr != nil { + log.Logger.Errorw("failed to insert xid event into database", "error", werr) + } + select { case <-inst.rootCtx.Done(): return diff --git a/components/accelerator/nvidia/query/query.go b/components/accelerator/nvidia/query/query.go index 105be7c9..65c9f464 100644 --- a/components/accelerator/nvidia/query/query.go +++ b/components/accelerator/nvidia/query/query.go @@ -4,6 +4,7 @@ package query import ( "context" + "database/sql" "fmt" "os" "sync" @@ -30,18 +31,32 @@ import ( "sigs.k8s.io/yaml" ) -var DefaultPoller = query.New( - "shared-nvidia-poller", - query_config.Config{ - Interval: metav1.Duration{Duration: query_config.DefaultPollInterval}, - QueueSize: query_config.DefaultQueueSize, - State: &query_config.State{ - Retention: metav1.Duration{Duration: query_config.DefaultStateRetention}, - }, - }, - Get, +var ( + defaultPollerOnce sync.Once + defaultPoller query.Poller ) +// only set once since it relies on the kube client and specific port +func SetDefaultPoller(db *sql.DB) { + defaultPollerOnce.Do(func() { + defaultPoller = query.New( + "shared-nvidia-poller", + query_config.Config{ + Interval: metav1.Duration{Duration: query_config.DefaultPollInterval}, + QueueSize: query_config.DefaultQueueSize, + State: &query_config.State{ + Retention: metav1.Duration{Duration: query_config.DefaultStateRetention}, + }, + }, + CreateGet(db), + ) + }) +} + +func GetDefaultPoller() query.Poller { + return defaultPoller +} + var ( getSuccessOnceCloseOnce sync.Once getSuccessOnce = make(chan any) @@ -51,10 +66,17 @@ func GetSuccessOnce() <-chan any { return getSuccessOnce } +func CreateGet(db *sql.DB) query.GetFunc { + return func(ctx context.Context) (_ any, e error) { + return Get(ctx, db) + } +} + // Get all nvidia component queries. 
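Note: the shared NVIDIA poller above is now created lazily. SetDefaultPoller(db) builds it exactly once behind a sync.Once, with its GetFunc bound to the given database, and GetDefaultPoller() hands that same instance to every component. A minimal sketch of the constructor pattern the component diffs in this change follow (Config, component, and Name stand in for each component's own types and are not part of this diff):

// set the shared poller once with the component's DB, then start it under this
// component's name so poller.Stop(Name) can reference-count it correctly
func New(ctx context.Context, cfg Config) components.Component {
	cfg.Query.SetDefaultsIfNotSet()

	cctx, ccancel := context.WithCancel(ctx)
	nvidia_query.SetDefaultPoller(cfg.Query.State.DB) // no-op after the first call
	nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name)

	return &component{
		rootCtx: ctx,
		cancel:  ccancel,
		poller:  nvidia_query.GetDefaultPoller(),
	}
}
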
-func Get(ctx context.Context) (output any, err error) { +func Get(ctx context.Context, db *sql.DB) (output any, err error) { if err := nvml.StartDefaultInstance( ctx, + nvml.WithDB(db), nvml.WithGPMMetricsID( go_nvml.GPM_METRIC_SM_OCCUPANCY, go_nvml.GPM_METRIC_INTEGER_UTIL, diff --git a/components/accelerator/nvidia/query/sxid/sxid.go b/components/accelerator/nvidia/query/sxid/sxid.go index 7c134b1b..2949a1a8 100644 --- a/components/accelerator/nvidia/query/sxid/sxid.go +++ b/components/accelerator/nvidia/query/sxid/sxid.go @@ -1,7 +1,11 @@ // Package sxid provides the NVIDIA SXID error details. package sxid -import "github.com/leptonai/gpud/components/common" +import ( + "encoding/json" + + "github.com/leptonai/gpud/components/common" +) // Defines the SXid error information that is static. // ref. https://docs.nvidia.com/datacenter/tesla/pdf/fabric-manager-user-guide.pdf @@ -25,6 +29,10 @@ type Detail struct { OtherImpact string `json:"other_impact"` } +func (d Detail) JSON() ([]byte, error) { + return json.Marshal(d) +} + // Returns the error if found. // Otherwise, returns false. func GetDetail(id int) (*Detail, bool) { diff --git a/components/accelerator/nvidia/query/xid-sxid-state/options.go b/components/accelerator/nvidia/query/xid-sxid-state/options.go new file mode 100644 index 00000000..50627fdd --- /dev/null +++ b/components/accelerator/nvidia/query/xid-sxid-state/options.go @@ -0,0 +1,67 @@ +package xidsxidstate + +import ( + "errors" + "time" +) + +var ErrInvalidLimit = errors.New("limit must be greater than or equal to 0") + +type Op struct { + sinceUnixSeconds int64 + beforeUnixSeconds int64 + sortUnixSecondsAscOrder bool + limit int +} + +type OpOption func(*Op) + +func (op *Op) applyOpts(opts []OpOption) error { + for _, opt := range opts { + opt(op) + } + + if op.limit < 0 { + return ErrInvalidLimit + } + + return nil +} + +// WithSince sets the since timestamp for the select queries. +// If not specified, it returns all events. +func WithSince(t time.Time) OpOption { + return func(op *Op) { + op.sinceUnixSeconds = t.UTC().Unix() + } +} + +// WithBefore sets the before timestamp for the delete queries. +// If not specified, it deletes all events. +func WithBefore(t time.Time) OpOption { + return func(op *Op) { + op.beforeUnixSeconds = t.UTC().Unix() + } +} + +// WithSortUnixSecondsAscendingOrder sorts the events by unix_seconds in ascending order, +// meaning its read query returns the oldest events first. +func WithSortUnixSecondsAscendingOrder() OpOption { + return func(op *Op) { + op.sortUnixSecondsAscOrder = true + } +} + +// WithSortUnixSecondsDescendingOrder sorts the events by unix_seconds in descending order, +// meaning its read query returns the newest events first. +func WithSortUnixSecondsDescendingOrder() OpOption { + return func(op *Op) { + op.sortUnixSecondsAscOrder = false + } +} + +func WithLimit(limit int) OpOption { + return func(op *Op) { + op.limit = limit + } +} diff --git a/components/accelerator/nvidia/query/xid-sxid-state/state.go b/components/accelerator/nvidia/query/xid-sxid-state/state.go new file mode 100644 index 00000000..c4957225 --- /dev/null +++ b/components/accelerator/nvidia/query/xid-sxid-state/state.go @@ -0,0 +1,269 @@ +// Package xidsxidstate provides the persistent storage layer for the nvidia query results. 
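Note: the xidsxidstate package introduced below stores one row per observed Xid/SXid event and exposes WithSince, WithBefore, and WithLimit options for reads and deletes. This diff does not wire up any scheduled deletion; a purely hypothetical retention loop built on Purge and WithBefore might look like the following sketch (the hourly ticker, the 3-day window, and the surrounding goroutine are assumptions, not part of this change):

// hypothetical retention loop, not part of this diff: periodically drop
// xid/sxid events older than the retention window
go func() {
	ticker := time.NewTicker(time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
		purged, err := nvidia_xid_sxid_state.Purge(cctx, db,
			nvidia_xid_sxid_state.WithBefore(time.Now().Add(-3*24*time.Hour)),
		)
		cancel()
		if err != nil {
			log.Logger.Errorw("failed to purge xid/sxid events", "error", err)
			continue
		}
		log.Logger.Debugw("purged old xid/sxid events", "purged", purged)
	}
}()
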
+package xidsxidstate + +import ( + "context" + "database/sql" + "errors" + "fmt" + + nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid" + nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid" + + _ "github.com/mattn/go-sqlite3" +) + +const TableNameXidSXidEventHistory = "components_accelerator_nvidia_query_xid_sxid_event_history" + +const ( + // unix timestamp in seconds when the event was observed + ColumnUnixSeconds = "unix_seconds" + + // either "nvml" or "dmesg" + ColumnDataSource = "data_source" + + // either "xid" or "sxid" + ColumnEventType = "event_type" + + // event id; xid or sxid + ColumnEventID = "event_id" + + // event details; dmesg log line + ColumnEventDetails = "event_details" +) + +type Event struct { + UnixSeconds int64 + DataSource string + EventType string + EventID int64 + EventDetails string +} + +func (e Event) ToXidDetail() *nvidia_query_xid.Detail { + if e.EventType != "xid" { + return nil + } + d, ok := nvidia_query_xid.GetDetail(int(e.EventID)) + if !ok { + return nil + } + return d +} + +func (e Event) ToSXidDetail() *nvidia_query_sxid.Detail { + if e.EventType != "sxid" { + return nil + } + d, ok := nvidia_query_sxid.GetDetail(int(e.EventID)) + if !ok { + return nil + } + return d +} + +func CreateTableXidSXidEventHistory(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, fmt.Sprintf(` +CREATE TABLE IF NOT EXISTS %s ( + %s INTEGER NOT NULL, + %s TEXT NOT NULL, + %s TEXT NOT NULL, + %s INTEGER NOT NULL, + %s TEXT +);`, TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + )) + return err +} + +func InsertEvent(ctx context.Context, db *sql.DB, event Event) error { + insertStatement := fmt.Sprintf(` +INSERT OR REPLACE INTO %s (%s, %s, %s, %s, %s) VALUES (?, ?, ?, ?, NULLIF(?, '')); +`, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + ) + _, err := db.ExecContext( + ctx, + insertStatement, + event.UnixSeconds, + event.DataSource, + event.EventType, + event.EventID, + event.EventDetails, + ) + return err +} + +func FindEvent(ctx context.Context, db *sql.DB, event Event) (bool, error) { + selectStatement := fmt.Sprintf(` +SELECT %s, %s, %s, %s, %s FROM %s WHERE %s = ? AND %s = ? AND %s = ? AND %s = ?; +`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ) + + var foundEvent Event + if err := db.QueryRowContext( + ctx, + selectStatement, + event.UnixSeconds, + event.DataSource, + event.EventType, + event.EventID, + ).Scan( + &foundEvent.UnixSeconds, + &foundEvent.DataSource, + &foundEvent.EventType, + &foundEvent.EventID, + &foundEvent.EventDetails, + ); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, err + } + + // event at the same time but with different details + if foundEvent.EventDetails != "" && foundEvent.EventDetails != event.EventDetails { + return false, nil + } + + // found event + // e.g., same messages in dmesg + return true, nil +} + +// Returns nil if no event is found. +func ReadEvents(ctx context.Context, db *sql.DB, opts ...OpOption) ([]Event, error) { + selectStatement, args, err := createSelectStatementAndArgs(opts...) 
+ if err != nil { + return nil, err + } + + rows, err := db.QueryContext(ctx, selectStatement, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + events := []Event{} + for rows.Next() { + var event Event + if err := rows.Scan( + &event.UnixSeconds, + &event.DataSource, + &event.EventType, + &event.EventID, + &event.EventDetails, + ); err != nil { + return nil, err + } + events = append(events, event) + } + if err := rows.Err(); err != nil { + return nil, err + } + if len(events) == 0 { + return nil, nil + } + + return events, nil +} + +func createSelectStatementAndArgs(opts ...OpOption) (string, []any, error) { + op := &Op{} + if err := op.applyOpts(opts); err != nil { + return "", nil, err + } + + selectStatement := fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ) + + args := []any{} + + if op.sinceUnixSeconds > 0 { + selectStatement += "\nWHERE " + selectStatement += fmt.Sprintf("%s >= ?", ColumnUnixSeconds) + args = append(args, op.sinceUnixSeconds) + } + + if op.sortUnixSecondsAscOrder { + selectStatement += "\nORDER BY " + ColumnUnixSeconds + " ASC" + } else { + selectStatement += "\nORDER BY " + ColumnUnixSeconds + " DESC" + } + + if op.limit > 0 { + selectStatement += fmt.Sprintf("\nLIMIT %d", op.limit) + } + + if len(args) == 0 { + return selectStatement, nil, nil + } + return selectStatement, args, nil +} + +func Purge(ctx context.Context, db *sql.DB, opts ...OpOption) (int, error) { + deleteStatement, args, err := createDeleteStatementAndArgs(opts...) + if err != nil { + return 0, err + } + rs, err := db.ExecContext(ctx, deleteStatement, args...) + if err != nil { + return 0, err + } + affected, err := rs.RowsAffected() + if err != nil { + return 0, err + } + return int(affected), nil +} + +// ignores order by and limit +func createDeleteStatementAndArgs(opts ...OpOption) (string, []any, error) { + op := &Op{} + if err := op.applyOpts(opts); err != nil { + return "", nil, err + } + + deleteStatement := fmt.Sprintf(`DELETE FROM %s`, + TableNameXidSXidEventHistory, + ) + + args := []any{} + + if op.beforeUnixSeconds > 0 { + deleteStatement += " WHERE " + deleteStatement += fmt.Sprintf("%s < ?", ColumnUnixSeconds) + args = append(args, op.beforeUnixSeconds) + } + + if len(args) == 0 { + return deleteStatement, nil, nil + } + return deleteStatement, args, nil +} diff --git a/components/accelerator/nvidia/query/xid-sxid-state/state_test.go b/components/accelerator/nvidia/query/xid-sxid-state/state_test.go new file mode 100644 index 00000000..218c69a6 --- /dev/null +++ b/components/accelerator/nvidia/query/xid-sxid-state/state_test.go @@ -0,0 +1,507 @@ +package xidsxidstate + +import ( + "context" + "database/sql" + "fmt" + "os" + "reflect" + "testing" + "time" + + "github.com/leptonai/gpud/pkg/sqlite" +) + +func TestCreateSelectStatement(t *testing.T) { + tests := []struct { + name string + opts []OpOption + want string + wantArgs []any + wantErr bool + }{ + { + name: "no options", + opts: nil, + want: fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s +ORDER BY %s DESC`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ), + wantArgs: nil, + wantErr: false, + }, + { + name: "with since unix seconds", + opts: []OpOption{WithSince(time.Unix(1234, 0))}, + want: fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s +WHERE %s >= ? 
+ORDER BY %s DESC`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ColumnUnixSeconds, + ), + wantArgs: []any{int64(1234)}, + wantErr: false, + }, + { + name: "with ascending order", + opts: []OpOption{WithSortUnixSecondsAscendingOrder()}, + want: fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s +ORDER BY %s ASC`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ), + wantArgs: nil, + wantErr: false, + }, + { + name: "with limit", + opts: []OpOption{WithLimit(10)}, + want: fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s +ORDER BY %s DESC +LIMIT 10`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ), + wantArgs: nil, + wantErr: false, + }, + { + name: "with all options", + opts: []OpOption{ + WithSince(time.Unix(1234, 0)), + WithSortUnixSecondsAscendingOrder(), + WithLimit(10), + }, + want: fmt.Sprintf(`SELECT %s, %s, %s, %s, %s +FROM %s +WHERE %s >= ? +ORDER BY %s ASC +LIMIT 10`, + ColumnUnixSeconds, + ColumnDataSource, + ColumnEventType, + ColumnEventID, + ColumnEventDetails, + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ColumnUnixSeconds, + ), + wantArgs: []any{int64(1234)}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, args, err := createSelectStatementAndArgs(tt.opts...) + if (err != nil) != tt.wantErr { + t.Errorf("createSelectStatement() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("createSelectStatement() = %v, want %v", got, tt.want) + } + if !reflect.DeepEqual(args, tt.wantArgs) { + t.Errorf("createSelectStatement() args = %v, want %v", args, tt.wantArgs) + } + }) + } +} + +func TestOpenMemory(t *testing.T) { + t.Parallel() + + db, err := sqlite.Open(":memory:") + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := CreateTableXidSXidEventHistory(ctx, db); err != nil { + t.Fatal("failed to create table:", err) + } +} + +func TestInsertAndFindEvent(t *testing.T) { + t.Parallel() + + db, cleanup := setupTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + event := Event{ + UnixSeconds: time.Now().Unix(), + DataSource: "nvml", + EventType: "xid", + EventID: 31, + EventDetails: "GPU has fallen off the bus", + } + + // Test insertion + err := InsertEvent(ctx, db, event) + if err != nil { + t.Errorf("InsertEvent failed: %v", err) + } + + // Test finding the event + found, err := FindEvent(ctx, db, event) + if err != nil { + t.Errorf("FindEvent failed: %v", err) + } + if !found { + t.Error("expected to find event, but it wasn't found") + } + + // Test finding event with different details + eventDiffDetails := event + eventDiffDetails.EventDetails = "Different details" + found, err = FindEvent(ctx, db, eventDiffDetails) + if err != nil { + t.Errorf("FindEvent with different details failed: %v", err) + } + if found { + t.Error("expected not to find event with different details") + } +} + +func TestReadEvents_NoRows(t *testing.T) { + t.Parallel() + + db, cleanup := setupTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 
30*time.Second) + defer cancel() + + // test ReadEvents with empty table + events, err := ReadEvents(ctx, db) + if err != nil { + t.Errorf("expected no error for empty table, got: %v", err) + } + if events != nil { + t.Errorf("expected nil events for empty table, got: %v", events) + } +} + +func TestReadEvents(t *testing.T) { + t.Parallel() + + db, cleanup := setupTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + baseTime := time.Now().Unix() + + // Insert test events + testEvents := []Event{ + { + UnixSeconds: baseTime, + DataSource: "nvml", + EventType: "xid", + EventID: 31, + EventDetails: "First event", + }, + { + UnixSeconds: baseTime + 1, + DataSource: "dmesg", + EventType: "sxid", + EventID: 32, + EventDetails: "Second event", + }, + { + UnixSeconds: baseTime + 2, + DataSource: "nvml", + EventType: "xid", + EventID: 33, + EventDetails: "Third event", + }, + } + + for _, event := range testEvents { + if err := InsertEvent(ctx, db, event); err != nil { + t.Fatalf("failed to insert test event: %v", err) + } + } + + // test reading all events + events, err := ReadEvents(ctx, db) + if err != nil { + t.Errorf("ReadEvents failed: %v", err) + } + if len(events) != len(testEvents) { + t.Errorf("expected %d events, got %d", len(testEvents), len(events)) + } + + // test reading events with limit + events, err = ReadEvents(ctx, db, WithLimit(2)) + if err != nil { + t.Errorf("ReadEvents with limit failed: %v", err) + } + if len(events) != 2 { + t.Errorf("expected 2 events with limit, got %d", len(events)) + } + + // test reading events since specific time + events, err = ReadEvents(ctx, db, WithSince(time.Unix(baseTime+1, 0))) + if err != nil { + t.Errorf("ReadEvents with since time failed: %v", err) + } + + t.Logf("searching for events since: %d", baseTime+1) + for _, e := range events { + t.Logf("Found event with timestamp: %d", e.UnixSeconds) + } + + if len(events) != 2 { + t.Errorf("expected 2 events since baseTime+1, got %d", len(events)) + } + + // Test reading events with ascending order + events, err = ReadEvents(ctx, db, WithSortUnixSecondsAscendingOrder()) + if err != nil { + t.Errorf("ReadEvents with ascending order failed: %v", err) + } + if len(events) != 3 || events[0].UnixSeconds > events[len(events)-1].UnixSeconds { + t.Error("Events not properly ordered in ascending order") + } +} + +func TestCreateDeleteStatementAndArgs(t *testing.T) { + tests := []struct { + name string + opts []OpOption + wantStatement string + wantArgs []any + wantErr bool + }{ + { + name: "no options", + opts: []OpOption{}, + wantStatement: fmt.Sprintf("DELETE FROM %s", + TableNameXidSXidEventHistory, + ), + wantArgs: nil, + wantErr: false, + }, + { + name: "with before unix seconds and limit", + opts: []OpOption{ + WithBefore(time.Unix(1234, 0)), + WithLimit(10), + }, + wantStatement: fmt.Sprintf("DELETE FROM %s WHERE %s < ?", + TableNameXidSXidEventHistory, + ColumnUnixSeconds, + ), + wantArgs: []any{int64(1234)}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStatement, gotArgs, err := createDeleteStatementAndArgs(tt.opts...) 
+ + if tt.wantErr { + if err == nil { + t.Error("createDeleteStatementAndArgs() error = nil, wantErr = true") + } + return + } + + if err != nil { + t.Errorf("createDeleteStatementAndArgs() error = %v, wantErr = false", err) + return + } + + if gotStatement != tt.wantStatement { + t.Errorf("createDeleteStatementAndArgs() statement = %v, want %v", gotStatement, tt.wantStatement) + } + + if !reflect.DeepEqual(gotArgs, tt.wantArgs) { + t.Errorf("createDeleteStatementAndArgs() args = %v, want %v", gotArgs, tt.wantArgs) + } + }) + } +} + +func TestPurge(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + setup []Event + opts []OpOption + wantErr bool + wantPurged int + wantCount int + validate func(*testing.T, []Event) + }{ + { + name: "delete events before timestamp", + setup: []Event{ + {UnixSeconds: 1000, DataSource: "nvml", EventType: "xid", EventID: 1, EventDetails: "detail1"}, + {UnixSeconds: 2000, DataSource: "nvml", EventType: "xid", EventID: 2, EventDetails: "detail2"}, + {UnixSeconds: 3000, DataSource: "nvml", EventType: "xid", EventID: 3, EventDetails: "detail3"}, + }, + opts: []OpOption{WithBefore(time.Unix(2500, 0))}, + wantPurged: 2, + wantCount: 1, + validate: func(t *testing.T, events []Event) { + if len(events) == 0 || events[0].UnixSeconds != 3000 { + t.Errorf("expected event with timestamp 3000, got %+v", events) + } + }, + }, + { + name: "delete all events", + setup: []Event{ + {UnixSeconds: 1000, DataSource: "nvml", EventType: "xid", EventID: 1, EventDetails: "detail1"}, + {UnixSeconds: 2000, DataSource: "nvml", EventType: "xid", EventID: 2, EventDetails: "detail2"}, + }, + opts: []OpOption{}, + wantPurged: 2, + wantCount: 0, + }, + { + name: "delete events with large dataset", + setup: func() []Event { + events := make([]Event, 100) + baseTime := time.Now().Unix() + for i := 0; i < 100; i++ { + events[i] = Event{ + UnixSeconds: baseTime + int64(i*60), // Events 1 minute apart + DataSource: "nvml", + EventType: "xid", + EventID: int64(i + 1), + EventDetails: fmt.Sprintf("detail%d", i+1), + } + } + return events + }(), + opts: []OpOption{WithBefore(time.Now().Add(30 * time.Minute))}, + wantPurged: 30, + wantCount: 70, + validate: func(t *testing.T, events []Event) { + if len(events) != 70 { + t.Errorf("expected 70 events, got %d", len(events)) + } + cutoff := time.Now().Add(30 * time.Minute).Unix() + for _, e := range events { + if e.UnixSeconds < cutoff { + t.Errorf("found event with timestamp %d, which is before cutoff %d", + e.UnixSeconds, cutoff) + } + } + }, + }, + } + + for _, tt := range tests { + tt := tt // capture range variable + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // setup fresh database for each test + db, cleanup := setupTestDB(t) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Insert test data + for _, event := range tt.setup { + if err := InsertEvent(ctx, db, event); err != nil { + t.Fatalf("failed to insert test event: %v", err) + } + } + + // perform deletion + purged, err := Purge(ctx, db, tt.opts...) 
+ if (err != nil) != tt.wantErr { + t.Errorf("DeleteEvents() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if purged != tt.wantPurged { + t.Errorf("DeleteEvents() purged = %v, want %v", purged, tt.wantPurged) + } + + // verify results + events, err := ReadEvents(ctx, db) + if err != nil { + t.Fatalf("failed to read events: %v", err) + } + + if len(events) != tt.wantCount { + t.Errorf("expected %d events, got %d", tt.wantCount, len(events)) + } + + if tt.validate != nil { + tt.validate(t, events) + } + }) + } +} + +func setupTestDB(t *testing.T) (*sql.DB, func()) { + tmpfile, err := os.CreateTemp("", "test-nvidia-*.db") + if err != nil { + t.Fatalf("failed to create temp file: %v", err) + } + tmpfile.Close() + + db, err := sqlite.Open(tmpfile.Name()) + if err != nil { + os.Remove(tmpfile.Name()) + t.Fatalf("failed to open database: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := CreateTableXidSXidEventHistory(ctx, db); err != nil { + db.Close() + os.Remove(tmpfile.Name()) + t.Fatalf("failed to create table: %v", err) + } + + cleanup := func() { + db.Close() + os.Remove(tmpfile.Name()) + } + return db, cleanup +} diff --git a/components/accelerator/nvidia/query/xid/xid.go b/components/accelerator/nvidia/query/xid/xid.go index 268295b6..1e733b5a 100644 --- a/components/accelerator/nvidia/query/xid/xid.go +++ b/components/accelerator/nvidia/query/xid/xid.go @@ -2,6 +2,8 @@ package xid import ( + "encoding/json" + "github.com/leptonai/gpud/components/common" ) @@ -48,6 +50,10 @@ type Detail struct { PotentialFBCorruption bool `json:"potential_fb_corruption"` } +func (d Detail) JSON() ([]byte, error) { + return json.Marshal(d) +} + // if nvidia says only possible reason is hw, then we do hard inspections directly func (d Detail) IsOnlyHWError() bool { if !d.PotentialHWError { diff --git a/components/accelerator/nvidia/remapped-rows/component.go b/components/accelerator/nvidia/remapped-rows/component.go index 133ebf5d..990b552f 100644 --- a/components/accelerator/nvidia/remapped-rows/component.go +++ b/components/accelerator/nvidia/remapped-rows/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/temperature/component.go b/components/accelerator/nvidia/temperature/component.go index 283ce12a..a3b361c6 100644 --- a/components/accelerator/nvidia/temperature/component.go +++ b/components/accelerator/nvidia/temperature/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/accelerator/nvidia/utilization/component.go b/components/accelerator/nvidia/utilization/component.go index 7617108b..3718a414 100644 --- 
a/components/accelerator/nvidia/utilization/component.go +++ b/components/accelerator/nvidia/utilization/component.go @@ -22,12 +22,13 @@ func New(ctx context.Context, cfg Config) components.Component { cfg.Query.SetDefaultsIfNotSet() cctx, ccancel := context.WithCancel(ctx) - nvidia_query.DefaultPoller.Start(cctx, cfg.Query, Name) + nvidia_query.SetDefaultPoller(cfg.Query.State.DB) + nvidia_query.GetDefaultPoller().Start(cctx, cfg.Query, Name) return &component{ rootCtx: ctx, cancel: ccancel, - poller: nvidia_query.DefaultPoller, + poller: nvidia_query.GetDefaultPoller(), } } diff --git a/components/diagnose/scan.go b/components/diagnose/scan.go index 1f5037bc..69b38cb2 100644 --- a/components/diagnose/scan.go +++ b/components/diagnose/scan.go @@ -19,6 +19,7 @@ import ( "github.com/leptonai/gpud/pkg/file" latency_edge "github.com/leptonai/gpud/pkg/latency/edge" "github.com/leptonai/gpud/pkg/process" + "github.com/leptonai/gpud/pkg/sqlite" ) const ( @@ -87,7 +88,13 @@ func Scan(ctx context.Context, opts ...OpOption) error { } } - outputRaw, err := nvidia_query.Get(ctx) + db, err := sqlite.Open(":memory:") + if err != nil { + log.Logger.Fatalw("failed to open database", "error", err) + } + defer db.Close() + + outputRaw, err := nvidia_query.Get(ctx, db) if err != nil { log.Logger.Warnw("error getting nvidia info", "error", err) } else { diff --git a/components/dmesg/component.go b/components/dmesg/component.go index 8c363e4c..46683ad1 100644 --- a/components/dmesg/component.go +++ b/components/dmesg/component.go @@ -8,30 +8,31 @@ import ( "github.com/leptonai/gpud/components" query_log "github.com/leptonai/gpud/components/query/log" + query_log_common "github.com/leptonai/gpud/components/query/log/common" query_log_tail "github.com/leptonai/gpud/components/query/log/tail" "github.com/leptonai/gpud/log" ) const Name = "dmesg" -func New(ctx context.Context, cfg Config) (components.Component, error) { +func New(ctx context.Context, cfg Config, processMatched query_log_common.ProcessMatchedFunc) (components.Component, error) { if err := cfg.Log.Validate(); err != nil { return nil, err } cfg.Log.SetDefaultsIfNotSet() - if err := createDefaultLogPoller(ctx, cfg); err != nil { + if err := createDefaultLogPoller(ctx, cfg, processMatched); err != nil { return nil, err } cctx, ccancel := context.WithCancel(ctx) - GetDefaultLogPoller().Start(cctx, cfg.Log.Query, Name) + defaultLogPoller.Start(cctx, cfg.Log.Query, Name) return &Component{ cfg: &cfg, rootCtx: ctx, cancel: ccancel, - logPoller: GetDefaultLogPoller(), + logPoller: defaultLogPoller, }, nil } diff --git a/components/dmesg/component_test.go b/components/dmesg/component_test.go index 540c0a32..9c901f19 100644 --- a/components/dmesg/component_test.go +++ b/components/dmesg/component_test.go @@ -58,6 +58,7 @@ func TestComponent(t *testing.T) { SelectFilters: filters, }, }, + nil, ) if err != nil { t.Fatalf("failed to create component: %v", err) diff --git a/components/dmesg/config.go b/components/dmesg/config.go index 28c9c9b9..55217e4a 100644 --- a/components/dmesg/config.go +++ b/components/dmesg/config.go @@ -29,7 +29,6 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) { if cfg.Log.Query.State != nil { cfg.Log.Query.State.DB = db } - cfg.Log.DB = db return cfg, nil } diff --git a/components/dmesg/poller.go b/components/dmesg/poller.go index 2234e626..18322837 100644 --- a/components/dmesg/poller.go +++ b/components/dmesg/poller.go @@ -5,6 +5,7 @@ import ( "sync" query_log "github.com/leptonai/gpud/components/query/log" + 
query_log_common "github.com/leptonai/gpud/components/query/log/common" pkg_dmesg "github.com/leptonai/gpud/pkg/dmesg" ) @@ -14,13 +15,14 @@ var ( ) // only set once since it relies on the kube client and specific port -func createDefaultLogPoller(ctx context.Context, cfg Config) error { +func createDefaultLogPoller(ctx context.Context, cfg Config, processMatched query_log_common.ProcessMatchedFunc) error { var err error defaultLogPollerOnce.Do(func() { defaultLogPoller, err = query_log.New( ctx, cfg.Log, pkg_dmesg.ParseCtimeWithError, + processMatched, ) if err != nil { panic(err) diff --git a/components/query/log/config/config.go b/components/query/log/config/config.go index dcb5b077..12c8fe4a 100644 --- a/components/query/log/config/config.go +++ b/components/query/log/config/config.go @@ -3,7 +3,6 @@ package config import ( "context" - "database/sql" "encoding/json" "errors" @@ -39,7 +38,6 @@ type Config struct { // (e.g., good healthy log messages). RejectFilters []*query_log_common.Filter `json:"reject_filters"` - DB *sql.DB `json:"-"` SeekInfo *tail.SeekInfo `json:"seek_info,omitempty"` // Used to commit the last seek info to disk. diff --git a/components/query/log/poller.go b/components/query/log/poller.go index 27ebc255..604d6e8f 100644 --- a/components/query/log/poller.go +++ b/components/query/log/poller.go @@ -76,20 +76,21 @@ type poller struct { cfg query_log_config.Config - tailLogger query_log_tail.Streamer + tailLogger query_log_tail.Streamer + tailFileSeekInfoMu sync.RWMutex tailFileSeekInfo tail.SeekInfo - tailFileSeekInfoSyncer func(ctx context.Context, file string, seekInfo tail.SeekInfo) `json:"-"` + tailFileSeekInfoSyncer func(ctx context.Context, file string, seekInfo tail.SeekInfo) bufferedItemsMu sync.RWMutex bufferedItems []Item } -func New(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc) (Poller, error) { - return newPoller(ctx, cfg, parseTime) +func New(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc, processMatched query_log_common.ProcessMatchedFunc) (Poller, error) { + return newPoller(ctx, cfg, parseTime, processMatched) } -func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc) (*poller, error) { +func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc, processMatched query_log_common.ProcessMatchedFunc) (*poller, error) { if err := cfg.Validate(); err != nil { return nil, err } @@ -100,6 +101,7 @@ func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query query_log_tail.WithSelectFilter(cfg.SelectFilters...), query_log_tail.WithRejectFilter(cfg.RejectFilters...), query_log_tail.WithParseTime(parseTime), + query_log_tail.WithProcessMatched(processMatched), } var tailLogger query_log_tail.Streamer @@ -149,6 +151,9 @@ func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query return pl, nil } +// pollSync polls the log tail from the specified file or long-running commands +// and syncs the items to the buffered items. +// This only catches the realtime/latest and all the future logs. 
func (pl *poller) pollSync(ctx context.Context) { for line := range pl.tailLogger.Line() { item := Item{ @@ -157,6 +162,7 @@ func (pl *poller) pollSync(ctx context.Context) { Matched: line.MatchedFilter, Error: line.Err, } + pl.bufferedItemsMu.Lock() pl.bufferedItems = append(pl.bufferedItems, item) pl.bufferedItemsMu.Unlock() @@ -182,47 +188,7 @@ func (pl *poller) Commands() [][]string { return pl.tailLogger.Commands() } -func (pl *poller) TailScan(ctx context.Context, opts ...query_log_tail.OpOption) ([]Item, error) { - items := make([]Item, 0) - processMatchedFunc := func(line []byte, time time.Time, matchedFilter *query_log_common.Filter) { - items = append(items, Item{ - Time: metav1.Time{Time: time}, - Line: string(line), - Matched: matchedFilter, - }) - } - - options := []query_log_tail.OpOption{ - query_log_tail.WithProcessMatched(processMatchedFunc), - } - if pl.cfg.File != "" { - options = append(options, query_log_tail.WithFile(pl.cfg.File)) - } - if len(pl.cfg.Commands) > 0 { - options = append(options, query_log_tail.WithCommands(pl.cfg.Commands)) - } - if pl.cfg.Scan != nil && pl.cfg.Scan.File != "" { - options = append(options, query_log_tail.WithFile(pl.cfg.Scan.File)) - } - if pl.cfg.Scan != nil && len(pl.cfg.Scan.Commands) > 0 { - options = append(options, query_log_tail.WithCommands(pl.cfg.Scan.Commands)) - } - if len(pl.cfg.SelectFilters) > 0 { - options = append(options, query_log_tail.WithSelectFilter(pl.cfg.SelectFilters...)) - } - if _, err := query_log_tail.Scan( - ctx, - append(options, opts...)..., - ); err != nil { - return nil, err - } - if len(items) == 0 { - return nil, nil - } - - return items, nil -} - +// This only catches the realtime/latest and all the future logs. // Returns `github.com/leptonai/gpud/components/query.ErrNoData` if there is no event found. func (pl *poller) Find(since time.Time, selectFilters ...*query_log_common.Filter) ([]Item, error) { // 1. 
filter the already flushed/in-queue ones diff --git a/components/query/log/poller_test.go b/components/query/log/poller_test.go index 5a508bd0..e028b073 100644 --- a/components/query/log/poller_test.go +++ b/components/query/log/poller_test.go @@ -25,7 +25,7 @@ func TestPoller(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - poller, err := newPoller(ctx, cfg, nil) + poller, err := newPoller(ctx, cfg, nil, nil) if err != nil { t.Fatalf("failed to create log poller: %v", err) } @@ -87,7 +87,7 @@ func TestPollerTail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - poller, err := newPoller(ctx, cfg, nil) + poller, err := newPoller(ctx, cfg, nil, nil) if err != nil { t.Fatalf("failed to create log poller: %v", err) } diff --git a/components/query/log/tail/scan_test.go b/components/query/log/tail/scan_test.go index f687a3b3..8b403786 100644 --- a/components/query/log/tail/scan_test.go +++ b/components/query/log/tail/scan_test.go @@ -11,6 +11,7 @@ import ( "time" query_log_common "github.com/leptonai/gpud/components/query/log/common" + "k8s.io/utils/ptr" ) diff --git a/components/query/log/tail/streamer.go b/components/query/log/tail/streamer.go index 78fced09..debb38c6 100644 --- a/components/query/log/tail/streamer.go +++ b/components/query/log/tail/streamer.go @@ -4,6 +4,7 @@ import ( "sync" query_log_common "github.com/leptonai/gpud/components/query/log/common" + "github.com/nxadm/tail" ) diff --git a/components/query/log/tail/streamer_command.go b/components/query/log/tail/streamer_command.go index 789daf60..07effb4d 100644 --- a/components/query/log/tail/streamer_command.go +++ b/components/query/log/tail/streamer_command.go @@ -112,7 +112,7 @@ func (sr *commandStreamer) pollLoops(scanner *bufio.Scanner) { sr.dedup.mu.Unlock() } - ts, err = sr.op.parseTime([]byte(s)) + ts, err = sr.op.parseTime(scanner.Bytes()) if err != nil { log.Logger.Warnw("error parsing time", "error", err) continue @@ -130,6 +130,10 @@ func (sr *commandStreamer) pollLoops(scanner *bufio.Scanner) { continue } + if sr.op.processMatched != nil { + sr.op.processMatched(scanner.Bytes(), ts, matchedFilter) + } + lineToSend := Line{ Line: &tail.Line{ Text: s, diff --git a/components/query/log/tail/streamer_file.go b/components/query/log/tail/streamer_file.go index 5252bb6e..0d62e40e 100644 --- a/components/query/log/tail/streamer_file.go +++ b/components/query/log/tail/streamer_file.go @@ -103,6 +103,10 @@ func (sr *fileStreamer) pollLoops() { line.Time = time.Now().UTC() } + if sr.op.processMatched != nil { + sr.op.processMatched([]byte(line.Text), line.Time, matchedFilter) + } + lineToSend := Line{ Line: line, MatchedFilter: matchedFilter, diff --git a/components/query/log/tail_scan.go b/components/query/log/tail_scan.go new file mode 100644 index 00000000..605c9f5a --- /dev/null +++ b/components/query/log/tail_scan.go @@ -0,0 +1,54 @@ +package log + +import ( + "context" + "time" + + query_log_common "github.com/leptonai/gpud/components/query/log/common" + query_log_tail "github.com/leptonai/gpud/components/query/log/tail" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TailScan tails the last N lines without polling, just by reading the file. +// This only catches the old logs, not the future ones. 
diff --git a/components/query/log/tail_scan.go b/components/query/log/tail_scan.go
new file mode 100644
index 00000000..605c9f5a
--- /dev/null
+++ b/components/query/log/tail_scan.go
@@ -0,0 +1,54 @@
+package log
+
+import (
+	"context"
+	"time"
+
+	query_log_common "github.com/leptonai/gpud/components/query/log/common"
+	query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// TailScan tails the last N lines without polling, just by reading the file.
+// This only catches the old logs, not the future ones.
+func (pl *poller) TailScan(ctx context.Context, opts ...query_log_tail.OpOption) ([]Item, error) {
+	items := make([]Item, 0)
+	processMatchedFunc := func(line []byte, time time.Time, matchedFilter *query_log_common.Filter) {
+		items = append(items, Item{
+			Time:    metav1.Time{Time: time},
+			Line:    string(line),
+			Matched: matchedFilter,
+		})
+	}
+
+	options := []query_log_tail.OpOption{
+		query_log_tail.WithProcessMatched(processMatchedFunc),
+	}
+	if pl.cfg.File != "" {
+		options = append(options, query_log_tail.WithFile(pl.cfg.File))
+	}
+	if len(pl.cfg.Commands) > 0 {
+		options = append(options, query_log_tail.WithCommands(pl.cfg.Commands))
+	}
+	if pl.cfg.Scan != nil && pl.cfg.Scan.File != "" {
+		options = append(options, query_log_tail.WithFile(pl.cfg.Scan.File))
+	}
+	if pl.cfg.Scan != nil && len(pl.cfg.Scan.Commands) > 0 {
+		options = append(options, query_log_tail.WithCommands(pl.cfg.Scan.Commands))
+	}
+	if len(pl.cfg.SelectFilters) > 0 {
+		options = append(options, query_log_tail.WithSelectFilter(pl.cfg.SelectFilters...))
+	}
+	if _, err := query_log_tail.Scan(
+		ctx,
+		append(options, opts...)...,
+	); err != nil {
+		return nil, err
+	}
+	if len(items) == 0 {
+		return nil, nil
+	}
+
+	return items, nil
+}
diff --git a/components/query/poller.go b/components/query/poller.go
index b3e793af..f7eef1fd 100644
--- a/components/query/poller.go
+++ b/components/query/poller.go
@@ -12,6 +12,8 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

+var ErrNoData = errors.New("no data collected yet in the poller")
+
// Defines the common query/poller interface.
// It polls the data source (rather than watch) in order
// to share the same data source with multiple components (consumer).
@@ -21,13 +23,14 @@ type Poller interface {
	// Returns the poller ID.
	ID() string

+	// Starts the poller routine.
+	// Redundant calls will be skipped if there's an existing poller.
+	Start(ctx context.Context, cfg query_config.Config, componentName string)
+
	// Config returns the config used to start the poller.
	// This is useful for debugging and logging.
	Config() query_config.Config

-	// Starts the poller routine.
-	// Redundant calls will be skipped if there's an existing poller.
-	Start(ctx context.Context, cfg query_config.Config, componentName string)
	// Stops the poller routine.
	// Safe to call multiple times.
	// Returns "true" if the poller was stopped with its reference count being zero.
@@ -230,6 +233,10 @@ func (pl *poller) processItem(item Item) {
		return
	}

+	pl.insertItemToInMemoryQueue(item)
+}
+
+func (pl *poller) insertItemToInMemoryQueue(item Item) {
	queueN := pl.Config().QueueSize

	pl.lastItemsMu.Lock()
@@ -241,9 +248,13 @@ func (pl *poller) processItem(item Item) {
	pl.lastItems = append(pl.lastItems, item)
}

-var ErrNoData = errors.New("no data collected yet in the poller")
-
+// Last returns the last item in the queue.
+// It returns ErrNoData if no item is collected yet.
func (pl *poller) Last() (*Item, error) {
+	return pl.readLastItemFromInMemoryQueue()
+}
+
+func (pl *poller) readLastItemFromInMemoryQueue() (*Item, error) {
	pl.lastItemsMu.RLock()
	defer pl.lastItemsMu.RUnlock()
@@ -254,7 +265,13 @@
	return &pl.lastItems[len(pl.lastItems)-1], nil
}
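
Last and All now share the ErrNoData contract documented in these hunks, so callers can distinguish "nothing collected yet" from a real failure. A minimal consumer sketch (the lastGetter interface and latestItem helper are illustrative; Last's signature and query.ErrNoData come from this package):

package example

import (
	"errors"

	"github.com/leptonai/gpud/components/query"
)

// lastGetter captures just the Last method of the query poller; the interface
// is hypothetical and exists only for this sketch.
type lastGetter interface {
	Last() (*query.Item, error)
}

// latestItem returns the most recent polled item, or nil when the poller has
// not collected anything yet (query.ErrNoData is not treated as a failure).
func latestItem(p lastGetter) (*query.Item, error) {
	item, err := p.Last()
	if errors.Is(err, query.ErrNoData) {
		return nil, nil
	}
	return item, err
}
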
+// All returns all results in the queue since the given time.
+// It returns ErrNoData if no item is collected yet.
func (pl *poller) All(since time.Time) ([]Item, error) {
+	return pl.readAllItemsFromInMemoryQueue(since)
+}
+
+func (pl *poller) readAllItemsFromInMemoryQueue(since time.Time) ([]Item, error) {
	pl.lastItemsMu.RLock()
	defer pl.lastItemsMu.RUnlock()
@@ -270,5 +287,10 @@ func (pl *poller) All(since time.Time) ([]Item, error) {
		}
		items = append(items, item)
	}
+
+	if len(items) == 0 {
+		return nil, ErrNoData
+	}
+
	return items, nil
}
diff --git a/components/query/poller_test.go b/components/query/poller_test.go
index e9f601f1..98c30adb 100644
--- a/components/query/poller_test.go
+++ b/components/query/poller_test.go
@@ -11,6 +11,46 @@ import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

+func TestPoller_ReadLastItemFromInMemoryQueue(t *testing.T) {
+	pl := &poller{
+		lastItems: []Item{},
+	}
+
+	// Test empty queue
+	item, err := pl.readLastItemFromInMemoryQueue()
+	if err != ErrNoData {
+		t.Errorf("expected ErrNoData, got %v", err)
+	}
+	if item != nil {
+		t.Errorf("expected nil item, got %v", item)
+	}
+}
+
+func TestPoller_ReadAllItemsFromInMemoryQueue(t *testing.T) {
+	pl := &poller{
+		lastItems: []Item{},
+	}
+
+	// Test empty queue
+	items, err := pl.readAllItemsFromInMemoryQueue(time.Time{})
+	if err != ErrNoData {
+		t.Errorf("expected ErrNoData, got %v", err)
+	}
+	if items != nil {
+		t.Errorf("expected nil items, got %v", items)
+	}
+
+	// Test with since time but empty queue
+	since := time.Now()
+	items, err = pl.readAllItemsFromInMemoryQueue(since)
+	if err != ErrNoData {
+		t.Errorf("expected ErrNoData, got %v", err)
+	}
+	if items != nil {
+		t.Errorf("expected nil items, got %v", items)
+	}
+}
+
func TestPoller_processResult(t *testing.T) {
	now := time.Now()
diff --git a/config/default.go b/config/default.go
index b90adf57..242fe511 100644
--- a/config/default.go
+++ b/config/default.go
@@ -13,6 +13,7 @@ import (
	nvidia_clockspeed "github.com/leptonai/gpud/components/accelerator/nvidia/clock-speed"
	nvidia_ecc "github.com/leptonai/gpud/components/accelerator/nvidia/ecc"
	nvidia_error "github.com/leptonai/gpud/components/accelerator/nvidia/error"
+	nvidia_component_error_xid_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid/id"
	nvidia_component_error_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid/id"
	nvidia_component_error_xid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid/id"
	nvidia_fabric_manager "github.com/leptonai/gpud/components/accelerator/nvidia/fabric-manager"
@@ -238,6 +239,7 @@ func DefaultConfig(ctx context.Context, opts ...OpOption) (*Config, error) {
	if _, ok := cfg.Components[dmesg.Name]; ok {
		cfg.Components[nvidia_component_error_xid_id.Name] = nil
		cfg.Components[nvidia_component_error_sxid_id.Name] = nil
+		cfg.Components[nvidia_component_error_xid_sxid_id.Name] = nil
	}

	cfg.Components[nvidia_info.Name] = nil
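
With this registration, DefaultConfig enables the combined xid/sxid component whenever the dmesg component itself is enabled. A hedged sketch of checking that wiring from calling code (hasCombinedXidSXid is hypothetical; DefaultConfig and the Components map are from config/default.go, and the dmesg and lepconfig import paths are assumed from the rest of the repository):

package example

import (
	"context"

	nvidia_component_error_xid_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid/id"
	"github.com/leptonai/gpud/components/dmesg"
	lepconfig "github.com/leptonai/gpud/config"
)

// hasCombinedXidSXid reports whether DefaultConfig enabled the combined
// xid/sxid component, which happens only when dmesg is enabled.
func hasCombinedXidSXid(ctx context.Context) (bool, error) {
	cfg, err := lepconfig.DefaultConfig(ctx)
	if err != nil {
		return false, err
	}
	if _, ok := cfg.Components[dmesg.Name]; !ok {
		return false, nil
	}
	_, ok := cfg.Components[nvidia_component_error_xid_sxid_id.Name]
	return ok, nil
}
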
diff --git a/docs/COMPONENTS.md b/docs/COMPONENTS.md
index 6a4969ff..35dc8b84 100644
--- a/docs/COMPONENTS.md
+++ b/docs/COMPONENTS.md
@@ -9,6 +9,7 @@
- [**`accelerator-nvidia-error`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/error): Tracks NVIDIA GPU errors real-time in the SMI queries -- likely requires host restarts.
- [**`accelerator-nvidia-error-sxid`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid): Tracks the NVIDIA GPU SXid errors scanning the dmesg -- see [fabric manager documentation](https://docs.nvidia.com/datacenter/tesla/pdf/fabric-manager-user-guide.pdf).
- [**`accelerator-nvidia-error-xid`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/error/xid): Tracks the NVIDIA GPU Xid errors scanning the dmesg and using the NVIDIA Management Library (NVML) -- see [Xid messages](https://docs.nvidia.com/deploy/gpu-debug-guidelines/index.html#xid-messages).
+- [**`accelerator-nvidia-error-xid-sxid`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid): Tracks the NVIDIA GPU Xid and SXid errors scanning the dmesg and using the NVIDIA Management Library (NVML) -- see [Xid messages](https://docs.nvidia.com/deploy/gpu-debug-guidelines/index.html#xid-messages).
- [**`accelerator-nvidia-fabric-manager`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/fabric-manager): Tracks the fabric manager version and its activeness.
- [**`accelerator-nvidia-gsp-firmware`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/fabric-manager): Tracks the GSP firmware mode.
- [**`accelerator-nvidia-infiniband`**](https://pkg.go.dev/github.com/leptonai/gpud/components/accelerator/nvidia/infiniband): Monitors the infiniband status of the system. Optional, enabled if the host has NVIDIA GPUs.
diff --git a/internal/server/server.go b/internal/server/server.go
index 89f514a5..d4287110 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -22,7 +22,6 @@ import (
	"time"

	"github.com/gin-contrib/gzip"
-	"github.com/gin-gonic/gin"
	"github.com/nxadm/tail"
	"github.com/prometheus/client_golang/prometheus"
@@ -37,6 +36,8 @@ import (
	nvidia_clockspeed "github.com/leptonai/gpud/components/accelerator/nvidia/clock-speed"
	nvidia_ecc "github.com/leptonai/gpud/components/accelerator/nvidia/ecc"
	nvidia_error "github.com/leptonai/gpud/components/accelerator/nvidia/error"
+	nvidia_component_error_xid_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid"
+	nvidia_component_error_xid_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error-xid-sxid/id"
	nvidia_error_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid"
	nvidia_component_error_sxid_id "github.com/leptonai/gpud/components/accelerator/nvidia/error/sxid/id"
	nvidia_error_xid "github.com/leptonai/gpud/components/accelerator/nvidia/error/xid"
@@ -59,6 +60,9 @@ import (
	nvidia_processes "github.com/leptonai/gpud/components/accelerator/nvidia/processes"
	nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query"
	nvidia_query_nvml "github.com/leptonai/gpud/components/accelerator/nvidia/query/nvml"
+	nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
+	nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
+	components_nvidia_xid_sxid_state "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid-sxid-state"
	nvidia_remapped_rows "github.com/leptonai/gpud/components/accelerator/nvidia/remapped-rows"
	nvidia_temperature "github.com/leptonai/gpud/components/accelerator/nvidia/temperature"
	nvidia_utilization "github.com/leptonai/gpud/components/accelerator/nvidia/utilization"
@@ -79,6 +83,7 @@ import (
	"github.com/leptonai/gpud/components/os"
	power_supply "github.com/leptonai/gpud/components/power-supply"
	query_config "github.com/leptonai/gpud/components/query/config"
+	query_log_common "github.com/leptonai/gpud/components/query/log/common"
	query_log_config "github.com/leptonai/gpud/components/query/log/config"
	query_log_state "github.com/leptonai/gpud/components/query/log/state"
"github.com/leptonai/gpud/components/query/log/state" "github.com/leptonai/gpud/components/state" @@ -155,13 +160,13 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID return nil, fmt.Errorf("api version mismatch: %s (only supports v1)", ver) } - if err := components_metrics_state.CreateTableMetrics(ctx, db, components_metrics_state.DefaultTableName); err != nil { - return nil, fmt.Errorf("failed to create metrics table: %w", err) - } if err := query_log_state.CreateTableLogFileSeekInfo(ctx, db); err != nil { return nil, fmt.Errorf("failed to create query log state table: %w", err) } + if err := components_metrics_state.CreateTableMetrics(ctx, db, components_metrics_state.DefaultTableName); err != nil { + return nil, fmt.Errorf("failed to create metrics table: %w", err) + } go func() { dur := config.RetentionPeriod.Duration for { @@ -181,11 +186,117 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID } }() + // create nvidia-specific table regardless of whether nvidia components are enabled + if err := components_nvidia_xid_sxid_state.CreateTableXidSXidEventHistory(ctx, db); err != nil { + return nil, fmt.Errorf("failed to create nvidia xid/sxid state table: %w", err) + } + go func() { + dur := config.RetentionPeriod.Duration + for { + select { + case <-ctx.Done(): + return + case <-time.After(dur): + now := time.Now().UTC() + before := now.Add(-dur) + + purged, err := components_nvidia_xid_sxid_state.Purge(ctx, db, components_nvidia_xid_sxid_state.WithBefore(before)) + if err != nil { + log.Logger.Warnw("failed to delete nvidia xid/sxid events", "error", err) + } else { + log.Logger.Debugw("deleted nvidia xid/sxid events", "before", before, "purged", purged) + } + } + } + }() + + dmesgProcessMatched := func(line []byte, ts time.Time, matchedFilter *query_log_common.Filter) { + if len(line) == 0 { + return + } + if ts.IsZero() { + return + } + if matchedFilter == nil { + return + } + + cctx, ccancel := context.WithTimeout(ctx, 10*time.Second) + defer ccancel() + + for _, ref := range matchedFilter.OwnerReferences { + switch ref { + case nvidia_component_error_xid_id.Name: + ev, err := nvidia_query_xid.ParseDmesgLogLine(string(line)) + if err != nil { + log.Logger.Errorw("failed to parse xid dmesg line", "line", string(line), "error", err) + continue + } + if ev.Detail == nil { + log.Logger.Errorw("failed to parse xid dmesg line", "line", string(line), "error", "no detail") + continue + } + + eventToInsert := components_nvidia_xid_sxid_state.Event{ + UnixSeconds: ts.Unix(), + DataSource: "dmesg", + EventType: "xid", + EventID: int64(ev.Detail.Xid), + EventDetails: ev.LogItem.Line, + } + + found, err := components_nvidia_xid_sxid_state.FindEvent(cctx, db, eventToInsert) + if err != nil { + log.Logger.Errorw("failed to find xid event in database", "error", err) + continue + } + if found { + continue + } + if werr := components_nvidia_xid_sxid_state.InsertEvent(cctx, db, eventToInsert); werr != nil { + log.Logger.Errorw("failed to insert xid event into database", "error", werr) + continue + } + + case nvidia_component_error_sxid_id.Name: + ev, err := nvidia_query_sxid.ParseDmesgLogLine(string(line)) + if err != nil { + log.Logger.Errorw("failed to parse sxid dmesg line", "line", string(line), "error", err) + continue + } + if ev.Detail == nil { + log.Logger.Errorw("failed to parse sxid dmesg line", "line", string(line), "error", "no detail") + continue + } + + eventToInsert := components_nvidia_xid_sxid_state.Event{ + UnixSeconds: 
	defaultStateCfg := query_config.State{DB: db}
	defaultQueryCfg := query_config.Config{State: &defaultStateCfg}
	defaultLogCfg := query_log_config.Config{
		Query: defaultQueryCfg,
-		DB:    db,
		SeekInfoSyncer: func(ctx context.Context, file string, seekInfo tail.SeekInfo) {
			if err := query_log_state.InsertLogFileSeekInfo(ctx, db, file, seekInfo.Offset, int64(seekInfo.Whence)); err != nil {
				log.Logger.Errorw("failed to sync seek info", "error", err)
@@ -233,21 +344,30 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID
		allComponents = append(allComponents, disk.New(ctx, cfg))

	case dmesg.Name:
+		// "defaultQueryCfg" here has the db object to write/insert xid/sxid events (write-only, reads are done in individual components)
		cfg := dmesg.Config{Log: defaultLogCfg}
		if configValue != nil {
			parsed, err := dmesg.ParseConfig(configValue, db)
			if err != nil {
				return nil, fmt.Errorf("failed to parse component %s config: %w", k, err)
			}
+
+			parsed.Log.SeekInfoSyncer = func(ctx context.Context, file string, seekInfo tail.SeekInfo) {
+				if err := query_log_state.InsertLogFileSeekInfo(ctx, db, file, seekInfo.Offset, int64(seekInfo.Whence)); err != nil {
+					log.Logger.Errorw("failed to sync seek info", "error", err)
+				}
+			}
+
			cfg = *parsed
		}
		if err := cfg.Validate(); err != nil {
			return nil, fmt.Errorf("failed to validate component %s config: %w", k, err)
		}

-		// nvidia_error_xid cannot be used without dmesg
-		nvrmXidFilterFound := false
		if _, ok := config.Components[nvidia_component_error_xid_id.Name]; ok {
+			// nvidia_error_xid cannot be used without dmesg
+			nvrmXidFilterFound := false
+
			for _, f := range cfg.Log.SelectFilters {
				if f.Name == dmesg.EventNvidiaNVRMXid {
					nvrmXidFilterFound = true
@@ -258,10 +378,25 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID
				return nil, fmt.Errorf("%q enabled but dmesg config missing %q filter", nvidia_component_error_xid_id.Name, dmesg.EventNvidiaNVRMXid)
			}
		}
+		if _, ok := config.Components[nvidia_component_error_xid_sxid_id.Name]; ok {
+			// nvidia_error_xid_sxid cannot be used without dmesg
+			nvrmXidFilterFound := false
+
+			for _, f := range cfg.Log.SelectFilters {
+				if f.Name == dmesg.EventNvidiaNVRMXid {
+					nvrmXidFilterFound = true
+					break
+				}
+			}
+			if !nvrmXidFilterFound {
+				return nil, fmt.Errorf("%q enabled but dmesg config missing %q filter", nvidia_component_error_xid_sxid_id.Name, dmesg.EventNvidiaNVRMXid)
+			}
+		}

-		// nvidia_error_sxid cannot be used without dmesg
-		nvswitchSXidFilterFound := false
		if _, ok := config.Components[nvidia_component_error_sxid_id.Name]; ok {
+			// nvidia_error_sxid cannot be used without dmesg
+			nvswitchSXidFilterFound := false
+
			for _, f := range cfg.Log.SelectFilters {
				if f.Name == dmesg.EventNvidiaNVSwitchSXid {
					nvswitchSXidFilterFound = true
					break
				}
			}
			if !nvswitchSXidFilterFound {
				return nil, fmt.Errorf("%q enabled but dmesg config missing %q filter", nvidia_component_error_sxid_id.Name, dmesg.EventNvidiaNVSwitchSXid)
			}
		}
+		if _, ok := config.Components[nvidia_component_error_xid_sxid_id.Name]; ok {
+			// nvidia_error_xid_sxid cannot be used without dmesg
+			nvswitchSXidFilterFound := false
+
+			for _, f := range cfg.Log.SelectFilters {
+				if f.Name == dmesg.EventNvidiaNVSwitchSXid {
+					nvswitchSXidFilterFound = true
+					break
+				}
+			}
+			if !nvswitchSXidFilterFound {
+				return nil, fmt.Errorf("%q enabled but dmesg config missing %q filter", nvidia_component_error_xid_sxid_id.Name, dmesg.EventNvidiaNVSwitchSXid)
+			}
+		}

-		c, err := dmesg.New(ctx, cfg)
+		c, err := dmesg.New(ctx, cfg, dmesgProcessMatched)
		if err != nil {
			return nil, fmt.Errorf("failed to create component %s: %w", k, err)
		}
@@ -428,6 +577,7 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID
		allComponents = append(allComponents, nvidia_error.New(ctx, cfg))

	case nvidia_component_error_xid_id.Name:
+		// "defaultQueryCfg" here has the db object to read xid events (read-only, writes are done in poller)
		cfg := nvidia_error_xid.Config{Query: defaultQueryCfg}
		if configValue != nil {
			parsed, err := nvidia_error_xid.ParseConfig(configValue, db)
@@ -442,8 +592,23 @@ func New(ctx context.Context, config *lepconfig.Config, endpoint string, cliUID
		allComponents = append(allComponents, nvidia_error_xid.New(ctx, cfg))

	case nvidia_component_error_sxid_id.Name:
+		// db object to read sxid events (read-only, writes are done in poller)
		allComponents = append(allComponents, nvidia_error_sxid.New())

+	case nvidia_component_error_xid_sxid_id.Name:
+		cfg := nvidia_component_error_xid_sxid.Config{Query: defaultQueryCfg}
+		if configValue != nil {
+			parsed, err := nvidia_component_error_xid_sxid.ParseConfig(configValue, db)
+			if err != nil {
+				return nil, fmt.Errorf("failed to parse component %s config: %w", k, err)
+			}
+			cfg = *parsed
+		}
+		if err := cfg.Validate(); err != nil {
+			return nil, fmt.Errorf("failed to validate component %s config: %w", k, err)
+		}
+		allComponents = append(allComponents, nvidia_component_error_xid_sxid.New(ctx, cfg))
+
	case nvidia_clock.Name:
		cfg := nvidia_clock.Config{Query: defaultQueryCfg}
		if configValue != nil {