Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(components/query): move parse time func, filter to common packages #175

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/accelerator/nvidia/fabric-manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

fabric_manager_log "github.com/leptonai/gpud/components/accelerator/nvidia/query/fabric-manager-log"
query_config "github.com/leptonai/gpud/components/query/config"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_config "github.com/leptonai/gpud/components/query/log/config"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"

"k8s.io/utils/ptr"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ const (
)

var (
filters = []*query_log_filter.Filter{
filters = []*query_log_common.Filter{
{
Name: eventNVSwitchFatailSXid,
Regex: ptr.To(fabric_manager_log.RegexNVSwitchFatalSXidFromLog),
Expand Down
4 changes: 2 additions & 2 deletions components/diagnose/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query"
"github.com/leptonai/gpud/components/dmesg"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/pkg/host"
"github.com/leptonai/gpud/pkg/process"
Expand Down Expand Up @@ -149,7 +149,7 @@ func run(ctx context.Context, dir string, opts ...OpOption) error {
query_log_tail.WithLinesToTail(5000),
query_log_tail.WithSelectFilter(defaultDmesgCfg.Log.SelectFilters...),
query_log_tail.WithParseTime(dmesg.ExtractTimeFromLogLine),
query_log_tail.WithProcessMatched(func(line []byte, time time.Time, matched *query_log_filter.Filter) {
query_log_tail.WithProcessMatched(func(line []byte, time time.Time, matched *query_log_common.Filter) {
o.CheckSummary = append(o.CheckSummary, fmt.Sprintf("dmesg match: %s", string(line)))
}),
)
Expand Down
4 changes: 2 additions & 2 deletions components/diagnose/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
"github.com/leptonai/gpud/components/dmesg"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
Expand Down Expand Up @@ -184,7 +184,7 @@ func Scan(ctx context.Context, opts ...OpOption) error {
query_log_tail.WithLinesToTail(op.lines),
query_log_tail.WithSelectFilter(defaultDmesgCfg.Log.SelectFilters...),
query_log_tail.WithParseTime(dmesg.ExtractTimeFromLogLine),
query_log_tail.WithProcessMatched(func(line []byte, time time.Time, matched *query_log_filter.Filter) {
query_log_tail.WithProcessMatched(func(line []byte, time time.Time, matched *query_log_common.Filter) {
log.Logger.Debugw("matched", "line", string(line))
matchedB, _ := matched.YAML()
fmt.Println(string(matchedB))
Expand Down
6 changes: 3 additions & 3 deletions components/dmesg/component_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/leptonai/gpud/components"
query_log "github.com/leptonai/gpud/components/query/log"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -49,9 +49,9 @@ func ParseEventDmesgMatched(m map[string]string) (query_log.Item, error) {
ev.Time = metav1.Time{Time: time.Unix(unixSeconds, 0)}
ev.Line = m[EventKeyDmesgMatchedLine]

var f *query_log_filter.Filter
var f *query_log_common.Filter
if m[EventKeyDmesgMatchedFilter] != "" {
f, err = query_log_filter.ParseFilterJSON([]byte(m[EventKeyDmesgMatchedFilter]))
f, err = query_log_common.ParseFilterJSON([]byte(m[EventKeyDmesgMatchedFilter]))
if err != nil {
return query_log.Item{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions components/dmesg/component_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/leptonai/gpud/components"
query_log "github.com/leptonai/gpud/components/query/log"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

"github.com/nxadm/tail"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -78,10 +78,10 @@ func ParseStateDmesgTailScanMatched(m map[string]string) (query_log.Item, error)
}
ev.Line = m[StateKeyDmesgTailScanMatchedLine]

var f *query_log_filter.Filter
var f *query_log_common.Filter
if m[StateKeyDmesgTailScanMatchedFilter] != "" {
var err error
f, err = query_log_filter.ParseFilterJSON([]byte(m[StateKeyDmesgTailScanMatchedFilter]))
f, err = query_log_common.ParseFilterJSON([]byte(m[StateKeyDmesgTailScanMatchedFilter]))
if err != nil {
return query_log.Item{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions components/dmesg/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

query_config "github.com/leptonai/gpud/components/query/config"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_config "github.com/leptonai/gpud/components/query/log/config"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
Expand All @@ -31,7 +31,7 @@ func TestComponent(t *testing.T) {
defer cancel()

xidErr := "NVRM: Xid (0000:03:00): 14, Channel 00000001"
filters := []*query_log_filter.Filter{
filters := []*query_log_common.Filter{
{
Name: "xid error check",
Substring: &xidErr,
Expand Down
6 changes: 3 additions & 3 deletions components/dmesg/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

nvidia_query "github.com/leptonai/gpud/components/accelerator/nvidia/query"
"github.com/leptonai/gpud/components/memory"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

"k8s.io/utils/ptr"
)
Expand All @@ -32,8 +32,8 @@ const (
EventOOMCgroupRegex = `Memory cgroup out of memory`
)

func DefaultLogFilters(ctx context.Context) ([]*query_log_filter.Filter, error) {
defaultFilters := []*query_log_filter.Filter{
func DefaultLogFilters(ctx context.Context) ([]*query_log_common.Filter, error) {
defaultFilters := []*query_log_common.Filter{
{
Name: EventOOMKill,
Regex: ptr.To(EventOOMKillRegex),
Expand Down
6 changes: 3 additions & 3 deletions components/dmesg/filters_nvidia.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
nvidia_query_peermem "github.com/leptonai/gpud/components/accelerator/nvidia/query/peermem"
nvidia_query_sxid "github.com/leptonai/gpud/components/accelerator/nvidia/query/sxid"
nvidia_query_xid "github.com/leptonai/gpud/components/accelerator/nvidia/query/xid"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

"k8s.io/utils/ptr"
)
Expand Down Expand Up @@ -45,8 +45,8 @@ const (
EventNvidiaNCCLSegfaultInLibnccl = "nvidia_nccl_segfault_in_libnccl"
)

func DefaultDmesgFiltersForNvidia() []*query_log_filter.Filter {
return []*query_log_filter.Filter{
func DefaultDmesgFiltersForNvidia() []*query_log_common.Filter {
return []*query_log_common.Filter{
{
Name: EventNvidiaNVRMXid,
Regex: ptr.To(nvidia_query_xid.RegexNVRMXidDmesg),
Expand Down
10 changes: 10 additions & 0 deletions components/query/log/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Package common provides the common log components.
package common

import (
"time"
)

type ParseTimeFunc func([]byte) (time.Time, error)

type ProcessMatchedFunc func(line []byte, parsedTime time.Time, filter *Filter)
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Package filter provides the log filter implementation.
package filter
package common

import (
"bytes"
Expand Down
6 changes: 3 additions & 3 deletions components/query/log/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"errors"

query_config "github.com/leptonai/gpud/components/query/config"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"

"github.com/nxadm/tail"
)
Expand All @@ -32,12 +32,12 @@ type Config struct {
// An event is generated if any of the filters match.
// Useful for explicit blacklisting "error" logs
// (e.g., GPU error messages in dmesg).
SelectFilters []*query_log_filter.Filter `json:"select_filters"`
SelectFilters []*query_log_common.Filter `json:"select_filters"`
// "AND" conditions to select logs.
// An event is generated if all of the filters do not match.
// Useful for explicit whitelisting logs and catch all other
// (e.g., good healthy log messages).
RejectFilters []*query_log_filter.Filter `json:"reject_filters"`
RejectFilters []*query_log_common.Filter `json:"reject_filters"`

DB *sql.DB `json:"-"`
SeekInfo *tail.SeekInfo `json:"seek_info,omitempty"`
Expand Down
18 changes: 9 additions & 9 deletions components/query/log/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/leptonai/gpud/components/query"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
query_log_config "github.com/leptonai/gpud/components/query/log/config"
query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/log"

Expand Down Expand Up @@ -51,7 +51,7 @@ type Poller interface {
// Returns all the events for the given "since" time.
// If none, it returns all events that are already filtered
// by the default filters in the configuration.
Find(since time.Time, selectFilters ...*query_log_filter.Filter) ([]Item, error)
Find(since time.Time, selectFilters ...*query_log_common.Filter) ([]Item, error)

// Returns the last seek info.
SeekInfo() tail.SeekInfo
Expand All @@ -65,7 +65,7 @@ type Item struct {
Line string `json:"line"`

// Matched filter that was applied to this item/line.
Matched *query_log_filter.Filter `json:"matched,omitempty"`
Matched *query_log_common.Filter `json:"matched,omitempty"`

Error error `json:"error,omitempty"`
}
Expand All @@ -84,11 +84,11 @@ type poller struct {
bufferedItems []Item
}

func New(ctx context.Context, cfg query_log_config.Config, parseTime query_log_tail.ParseTimeFunc) (Poller, error) {
func New(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc) (Poller, error) {
return newPoller(ctx, cfg, parseTime)
}

func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query_log_tail.ParseTimeFunc) (*poller, error) {
func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query_log_common.ParseTimeFunc) (*poller, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (pl *poller) Commands() [][]string {

func (pl *poller) TailScan(ctx context.Context, opts ...query_log_tail.OpOption) ([]Item, error) {
items := make([]Item, 0)
processMatchedFunc := func(line []byte, time time.Time, matchedFilter *query_log_filter.Filter) {
processMatchedFunc := func(line []byte, time time.Time, matchedFilter *query_log_common.Filter) {
items = append(items, Item{
Time: metav1.Time{Time: time},
Line: string(line),
Expand Down Expand Up @@ -221,7 +221,7 @@ func (pl *poller) TailScan(ctx context.Context, opts ...query_log_tail.OpOption)
return items, nil
}

func (pl *poller) Find(since time.Time, selectFilters ...*query_log_filter.Filter) ([]Item, error) {
func (pl *poller) Find(since time.Time, selectFilters ...*query_log_common.Filter) ([]Item, error) {
// 1. filter the already flushed/in-queue ones
polledItems, err := pl.Poller.All(since)
if err != nil {
Expand All @@ -245,7 +245,7 @@ func (pl *poller) Find(since time.Time, selectFilters ...*query_log_filter.Filte
continue
}

var matchedFilter *query_log_filter.Filter
var matchedFilter *query_log_common.Filter
for _, f := range selectFilters {
matched, err := f.MatchString(item.Line)
if err != nil {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (pl *poller) Find(since time.Time, selectFilters ...*query_log_filter.Filte
continue
}

var matchedFilter *query_log_filter.Filter
var matchedFilter *query_log_common.Filter
for _, f := range selectFilters {
matched, err := f.MatchString(item.Line)
if err != nil {
Expand Down
26 changes: 11 additions & 15 deletions components/query/log/tail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"time"

query_log_filter "github.com/leptonai/gpud/components/query/log/filter"
query_log_common "github.com/leptonai/gpud/components/query/log/common"
)

type Op struct {
Expand All @@ -15,11 +15,11 @@ type Op struct {

perLineFunc func([]byte)

selectFilters []*query_log_filter.Filter
rejectFilters []*query_log_filter.Filter
selectFilters []*query_log_common.Filter
rejectFilters []*query_log_common.Filter

parseTime ParseTimeFunc
processMatched ProcessMatchedFunc
parseTime query_log_common.ParseTimeFunc
processMatched query_log_common.ProcessMatchedFunc
}

type OpOption func(*Op)
Expand Down Expand Up @@ -57,7 +57,7 @@ func (op *Op) applyOpts(opts []OpOption) error {
}
}
if op.processMatched == nil {
op.processMatched = func([]byte, time.Time, *query_log_filter.Filter) {}
op.processMatched = func([]byte, time.Time, *query_log_common.Filter) {}
}

return nil
Expand Down Expand Up @@ -95,7 +95,7 @@ func WithPerLineFunc(f func([]byte)) OpOption {
// The line is sent when any of the filters match.
// Useful for explicit blacklisting "error" logs
// (e.g., GPU error messages in dmesg).
func WithSelectFilter(filters ...*query_log_filter.Filter) OpOption {
func WithSelectFilter(filters ...*query_log_common.Filter) OpOption {
return func(op *Op) {
if len(filters) > 0 {
op.selectFilters = append(op.selectFilters, filters...)
Expand All @@ -108,15 +108,15 @@ func WithSelectFilter(filters ...*query_log_filter.Filter) OpOption {
// The line is sent if and only if all of the filters do not match.
// Useful for explicit whitelisting logs and catch all other
// (e.g., good healthy log messages).
func WithRejectFilter(filters ...*query_log_filter.Filter) OpOption {
func WithRejectFilter(filters ...*query_log_common.Filter) OpOption {
return func(op *Op) {
if len(filters) > 0 {
op.rejectFilters = append(op.rejectFilters, filters...)
}
}
}

func (op *Op) applyFilter(line any) (shouldInclude bool, matchedFilter *query_log_filter.Filter, err error) {
func (op *Op) applyFilter(line any) (shouldInclude bool, matchedFilter *query_log_common.Filter, err error) {
if len(op.selectFilters) == 0 && len(op.rejectFilters) == 0 {
// no filters
return true, nil, nil
Expand Down Expand Up @@ -175,23 +175,19 @@ func (op *Op) applyFilter(line any) (shouldInclude bool, matchedFilter *query_lo
return true, matchedFilter, nil
}

type ParseTimeFunc func([]byte) (time.Time, error)

func WithParseTime(f ParseTimeFunc) OpOption {
func WithParseTime(f query_log_common.ParseTimeFunc) OpOption {
return func(op *Op) {
if f != nil {
op.parseTime = f
}
}
}

type ProcessMatchedFunc func([]byte, time.Time, *query_log_filter.Filter)

// Called if the line is matched.
// If not set, the matched line is no-op.
// Useful to append to a slice or not to return a string slice
// to avoid extra heap allocation.
func WithProcessMatched(f ProcessMatchedFunc) OpOption {
func WithProcessMatched(f query_log_common.ProcessMatchedFunc) OpOption {
return func(op *Op) {
if f != nil {
op.processMatched = f
Expand Down
Loading
Loading