Skip to content

Commit

Permalink
feat(query): support get func persistency, read persistency
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 5, 2024
1 parent 7abbcde commit bcf59c1
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 33 deletions.
5 changes: 4 additions & 1 deletion components/accelerator/nvidia/error/xid/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func getDefaultPoller() query.Poller {
// DO NOT for-loop here
// the query.GetFunc is already called periodically in a loop by the poller
func CreateGet() query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand All @@ -330,6 +330,9 @@ func CreateGet() query.GetFunc {
return nil, ctx.Err()

case ev := <-nvidia_query_nvml.DefaultInstance().RecvXidEvents():
// TODO
// persist on db if configured

return ev, nil

default:
Expand Down
2 changes: 1 addition & 1 deletion components/accelerator/nvidia/gpm/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func getDefaultPoller() query.Poller {
// DO NOT for-loop here
// the query.GetFunc is already called periodically in a loop by the poller
func CreateGet() query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/accelerator/nvidia/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetSuccessOnce() <-chan any {
}

// Get all nvidia component queries.
func Get(ctx context.Context) (output any, err error) {
func Get(ctx context.Context, _ ...query.OpOption) (output any, err error) {
if err := nvml.StartDefaultInstance(ctx); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion components/containerd/pod/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func getDefaultPoller() query.Poller {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/cpu/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func getPrevTimeStat() *cpu.TimesStat {
return prev
}

func Get(ctx context.Context) (_ any, e error) {
func Get(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/disk/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func getDefaultPoller() query.Poller {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/docker/container/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func getDefaultPoller() query.Poller {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/fd/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func getDefaultPoller() query.Poller {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/k8s/pod/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func DefaultPollerReady() <-chan any {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/memory/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func getDefaultPoller() query.Poller {
return defaultPoller
}

func Get(ctx context.Context) (_ any, e error) {
func Get(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/network/latency/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func createGetFunc(cfg Config) query.GetFunc {
timeout = 15 * time.Second
}

return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/os/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func getDefaultPoller() query.Poller {
return defaultPoller
}

func Get(ctx context.Context) (_ any, e error) {
func Get(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/power-supply/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func getDefaultPoller() query.Poller {
return defaultPoller
}

func Get(ctx context.Context) (_ any, e error) {
func Get(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/query/log/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newPoller(ctx context.Context, cfg query_log_config.Config, parseTime query
}
go pl.pollSync(ctx)

flushFunc := func(ctx context.Context) (any, error) {
flushFunc := func(ctx context.Context, _ ...query.OpOption) (any, error) {
pl.bufferedItemsMu.Lock()
defer pl.bufferedItemsMu.Unlock()
copied := make([]Item, len(pl.bufferedItems))
Expand Down
25 changes: 25 additions & 0 deletions components/query/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package query

import (
query_config "github.com/leptonai/gpud/components/query/config"
)

type Op struct {
State *query_config.State
}

type OpOption func(*Op)

func (op *Op) ApplyOpts(opts []OpOption) error {
for _, opt := range opts {
opt(op)
}

return nil
}

func WithState(state *query_config.State) OpOption {
return func(op *Op) {
op.State = state
}
}
60 changes: 43 additions & 17 deletions components/query/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var ErrNoData = errors.New("no data collected yet in the poller")

// Defines the common query/poller interface.
// It polls the data source (rather than watch) in order
// to share the same data source with multiple components (consumer).
Expand All @@ -22,23 +24,24 @@ type Poller interface {
// Returns the poller ID.
ID() string

// Starts the poller routine.
// Redundant calls will be skipped if there's an existing poller.
Start(ctx context.Context, cfg query_config.Config, componentName string)

// Config returns the config used to start the poller.
// This is useful for debugging and logging.
Config() query_config.Config

// Starts the poller routine.
// Redundant calls will be skipped if there's an existing poller.
Start(ctx context.Context, cfg query_config.Config, componentName string)
// Stops the poller routine.
// Safe to call multiple times.
// Returns "true" if the poller was stopped with its reference count being zero.
Stop(componentName string) bool

// Last returns the last result.
// Useful for constructing the state.
// Last returns the last item in the queue.
// It returns ErrNoData if no item is collected yet.
Last() (*Item, error)
// All returns all results.
// Useful for constructing the events.
// All returns all results in the queue since the given time.
// It returns ErrNoData if no item is collected yet.
All(since time.Time) ([]Item, error)
}

Expand All @@ -56,7 +59,7 @@ type Item struct {

// Queries the component data from the host.
// Each get output is persisted to the storage if enabled.
type GetFunc func(context.Context) (any, error)
type GetFunc func(context.Context, ...OpOption) (any, error)

func New(id string, cfg query_config.Config, getFunc GetFunc) Poller {
return &poller{
Expand Down Expand Up @@ -160,12 +163,6 @@ func (pl *poller) ID() string {
return pl.id
}

func (pl *poller) Config() query_config.Config {
pl.cfgMu.RLock()
defer pl.cfgMu.RUnlock()
return pl.cfg
}

func GetTableName(componentName string) string {
return "poll_results_" + state.ConvertToTableName(componentName)
}
Expand Down Expand Up @@ -196,6 +193,12 @@ func (pl *poller) Start(ctx context.Context, cfg query_config.Config, componentN
log.Logger.Debugw("started poller", "caller", componentName, "inflightComponents", len(pl.inflightComponents))
}

func (pl *poller) Config() query_config.Config {
pl.cfgMu.RLock()
defer pl.cfgMu.RUnlock()
return pl.cfg
}

func (pl *poller) Stop(componentName string) bool {
pl.ctxMu.Lock()
defer pl.ctxMu.Unlock()
Expand Down Expand Up @@ -237,6 +240,10 @@ func (pl *poller) processItem(item Item) {
return
}

pl.insertItemToInMemoryQueue(item)
}

func (pl *poller) insertItemToInMemoryQueue(item Item) {
queueN := pl.Config().QueueSize

pl.lastItemsMu.Lock()
Expand All @@ -248,9 +255,15 @@ func (pl *poller) processItem(item Item) {
pl.lastItems = append(pl.lastItems, item)
}

var ErrNoData = errors.New("no data collected yet in the poller")

// Last returns the last item in the queue.
// It returns ErrNoData if no item is collected yet.
func (pl *poller) Last() (*Item, error) {
return pl.readLastItemFromInMemoryQueue()
}

func (pl *poller) readLastItemFromInMemoryQueue() (*Item, error) {
// TODO: read from db

pl.lastItemsMu.RLock()
defer pl.lastItemsMu.RUnlock()

Expand All @@ -261,14 +274,22 @@ func (pl *poller) Last() (*Item, error) {
return &pl.lastItems[len(pl.lastItems)-1], nil
}

// All returns all results in the queue since the given time.
// It returns ErrNoData if no item is collected yet.
func (pl *poller) All(since time.Time) ([]Item, error) {
return pl.readAllItemsFromInMemoryQueue(since)
}

func (pl *poller) readAllItemsFromInMemoryQueue(since time.Time) ([]Item, error) {
// TODO: read from db

pl.lastItemsMu.RLock()
defer pl.lastItemsMu.RUnlock()

// nothing in memory (e.g., process restart)
// we removed db support to simplify the code
if len(pl.lastItems) == 0 {
return nil, nil
return nil, ErrNoData
}

items := make([]Item, 0)
Expand All @@ -278,5 +299,10 @@ func (pl *poller) All(since time.Time) ([]Item, error) {
}
items = append(items, item)
}

if len(items) == 0 {
return nil, ErrNoData
}

return items, nil
}
40 changes: 40 additions & 0 deletions components/query/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,46 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPoller_ReadLastItemFromInMemoryQueue(t *testing.T) {
pl := &poller{
lastItems: []Item{},
}

// Test empty queue
item, err := pl.readLastItemFromInMemoryQueue()
if err != ErrNoData {
t.Errorf("expected ErrNoData, got %v", err)
}
if item != nil {
t.Errorf("expected nil item, got %v", item)
}
}

func TestPoller_ReadAllItemsFromInMemoryQueue(t *testing.T) {
pl := &poller{
lastItems: []Item{},
}

// Test empty queue
items, err := pl.readAllItemsFromInMemoryQueue(time.Time{})
if err != ErrNoData {
t.Errorf("expected ErrNoData, got %v", err)
}
if items != nil {
t.Errorf("expected nil items, got %v", items)
}

// Test with since time but empty queue
since := time.Now()
items, err = pl.readAllItemsFromInMemoryQueue(since)
if err != ErrNoData {
t.Errorf("expected ErrNoData, got %v", err)
}
if items != nil {
t.Errorf("expected nil items, got %v", items)
}
}

func TestPoller_processResult(t *testing.T) {
now := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion components/systemd/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func getDefaultPoller() query.Poller {
}

func CreateGet(cfg Config) query.GetFunc {
return func(ctx context.Context) (_ any, e error) {
return func(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down
2 changes: 1 addition & 1 deletion components/tailscale/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getDefaultPoller() query.Poller {
return defaultPoller
}

func Get(ctx context.Context) (_ any, e error) {
func Get(ctx context.Context, _ ...query.OpOption) (_ any, e error) {
defer func() {
if e != nil {
components_metrics.SetGetFailed(Name)
Expand Down

0 comments on commit bcf59c1

Please sign in to comment.