Skip to content

Commit

Permalink
fix: be/local was not streaming out logs for workers across all steps
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Nov 8, 2024
1 parent 94c0938 commit 17efd59
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 14 deletions.
13 changes: 11 additions & 2 deletions pkg/be/local/files/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,23 @@ func RunsDir() (string, error) {
return filepath.Join(dir, "runs"), nil
}

func runDir(run queue.RunContext) (string, error) {
func StepsDir(run queue.RunContext) (string, error) {
dir, err := RunsDir()
if err != nil {
return "", err
}

return filepath.Join(dir, run.RunName, "step"), nil
}

func runDir(run queue.RunContext) (string, error) {
dir, err := StepsDir(run)
if err != nil {
return "", err
}

//strings.Replace(runname, build.Name()+"-", "", 1),
return filepath.Join(dir, run.RunName, "step", fmt.Sprintf("%d", run.Step)), nil
return filepath.Join(dir, fmt.Sprintf("%d", run.Step)), nil
}

func LogDir(run queue.RunContext, mkdir bool) (string, error) {
Expand Down
59 changes: 47 additions & 12 deletions pkg/be/local/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -81,7 +82,42 @@ func (s localStreamer) Utilization(c chan utilization.Model, intervalSeconds int
}
}

func (s localStreamer) watchForWorkerPools(logdir string, opts streamer.LogOptions) error {
func ScanLogFiles(run queue.RunContext) ([]string, error) {
dir, err := files.StepsDir(run)
if err != nil {
return nil, err
}

steps, err := os.ReadDir(dir)
if err != nil {
return nil, err
}

logfiles := []string{}
for _, s := range steps {
step, err := strconv.Atoi(s.Name())
if err != nil {
return nil, err
}
logdir, err := files.LogDir(run.ForStep(step), false)
if err != nil {
return nil, err
}

fs, err := os.ReadDir(logdir)
if err != nil {
return nil, err
}

for _, f := range fs {
logfiles = append(logfiles, filepath.Join(logdir, f.Name()))
}
}

return logfiles, nil
}

func (s localStreamer) watchForWorkerPools(opts streamer.LogOptions) error {
watching := make(map[string]bool)
group, _ := errgroup.WithContext(s.Context)

Expand All @@ -93,19 +129,18 @@ func (s localStreamer) watchForWorkerPools(logdir string, opts streamer.LogOptio
default:
}

fs, err := os.ReadDir(logdir)
fs, err := ScanLogFiles(s.run)
if err != nil {
return err
}

for _, f := range fs {
file := f.Name()
if strings.HasPrefix(file, lunchpail.ComponentShortName(lunchpail.WorkersComponent)) {
for _, file := range fs {
if strings.HasPrefix(filepath.Base(file), lunchpail.ComponentShortName(lunchpail.WorkersComponent)) {
alreadyWatching, exists := watching[file]
if !alreadyWatching || !exists {
watching[file] = true
group.Go(func() error {
return s.tailf(filepath.Join(logdir, file), opts)
return s.tailf(file, opts)
})
}
}
Expand All @@ -131,16 +166,16 @@ func (s localStreamer) watchForWorkerPools(logdir string, opts streamer.LogOptio

// Stream logs from a given Component to os.Stdout
func (s localStreamer) ComponentLogs(c lunchpail.Component, opts streamer.LogOptions) error {
logdir, err := files.LogDir(s.run, true)
if err != nil {
return err
}

switch c {
case lunchpail.WorkersComponent:
return s.watchForWorkerPools(logdir, opts)
return s.watchForWorkerPools(opts)

default:
logdir, err := files.LogDir(s.run, true)
if err != nil {
return err
}

// TODO allow caller to select stderr versus stdout
group, _ := errgroup.WithContext(s.Context)
group.Go(func() error { return s.tailf(filepath.Join(logdir, files.LogFileForComponent(c)+".out"), opts) })
Expand Down

0 comments on commit 17efd59

Please sign in to comment.