diff --git a/hack/pipeline-demo.sh b/hack/pipeline-demo.sh index 4b711d7c..baabf980 100755 --- a/hack/pipeline-demo.sh +++ b/hack/pipeline-demo.sh @@ -22,7 +22,7 @@ if [ ! -e $stepo ] then ./lunchpail build --create-namespace -e 'echo "hi from step $LUNCHPAIL_STEP"; sleep 2' -o $stepo fi -step="$stepo up --verbose=${VERBOSE:-false} --workers 3" +step="$stepo up --verbose=${VERBOSE:-false} --workers 3 --queue rclone://cfp/lunchpail" echo "Launching pipeline" $step <(echo in1) <(echo in2) <(echo in3) <(echo in4) <(echo in5) <(echo in6) <(echo in7) <(echo in8) <(echo in9) <(echo in10) <(echo in11) <(echo in12) <(echo in13) <(echo in14) <(echo in15) <(echo in16) \ diff --git a/pkg/be/local/shell/job.go b/pkg/be/local/shell/job.go index 092fa7aa..9da08894 100644 --- a/pkg/be/local/shell/job.go +++ b/pkg/be/local/shell/job.go @@ -20,7 +20,7 @@ func SpawnJob(ctx context.Context, c llir.ShellComponent, ir llir.LLIR, logdir s for workerIdx := range c.Sizing.Workers { group.Go(func() error { - return Spawn(jobCtx, c.WithInstanceNameSuffix(fmt.Sprintf("-w%d", workerIdx)), ir, logdir, opts) + return Spawn(jobCtx, c.WithInstanceName(fmt.Sprintf("w%d", workerIdx)), ir, logdir, opts) }) } diff --git a/pkg/fe/builder/overlay/eval.go b/pkg/fe/builder/overlay/eval.go index 356968b3..e912f938 100644 --- a/pkg/fe/builder/overlay/eval.go +++ b/pkg/fe/builder/overlay/eval.go @@ -46,7 +46,7 @@ func evalApp(eval string, opts Options) hlir.HLIR { return hlir.HLIR{ Applications: []hlir.Application{app}, - WorkerPools: []hlir.WorkerPool{hlir.NewPool("default", opts.BuildOptions.Workers)}, + WorkerPools: []hlir.WorkerPool{hlir.NewPool("p1", opts.BuildOptions.Workers)}, } } diff --git a/pkg/ir/llir/shell.go b/pkg/ir/llir/shell.go index ae6932e5..65a39df9 100644 --- a/pkg/ir/llir/shell.go +++ b/pkg/ir/llir/shell.go @@ -39,7 +39,11 @@ func (c ShellComponent) SetWorkers(w int) Component { return c // FIXME } -func (c ShellComponent) WithInstanceNameSuffix(suffix string) ShellComponent { - c.InstanceName = c.InstanceName + suffix +func (c ShellComponent) WithInstanceName(name string) ShellComponent { + c.InstanceName = name return c } + +func (c ShellComponent) WithInstanceNameSuffix(suffix string) ShellComponent { + return c.WithInstanceName(c.InstanceName + suffix) +} diff --git a/pkg/runtime/builtins/cat.go b/pkg/runtime/builtins/cat.go index 7a3c40d8..a54440eb 100644 --- a/pkg/runtime/builtins/cat.go +++ b/pkg/runtime/builtins/cat.go @@ -48,6 +48,6 @@ mv $1 $2`}, return hlir.HLIR{ Applications: []hlir.Application{app}, - WorkerPools: []hlir.WorkerPool{hlir.NewPool("default", 1)}, + WorkerPools: []hlir.WorkerPool{hlir.NewPool("p1", 1)}, } } diff --git a/pkg/runtime/queue/ls.go b/pkg/runtime/queue/ls.go index 09e5f38f..fb1345c6 100644 --- a/pkg/runtime/queue/ls.go +++ b/pkg/runtime/queue/ls.go @@ -2,6 +2,7 @@ package queue import ( "context" + "regexp" "strings" "lunchpail.io/pkg/be" @@ -35,9 +36,11 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri case "blobs": prefix = wildcard.AsFile(queue.Blobs) default: - prefix = wildcard.ListenPrefix() + prefix = wildcard.ListenPrefixForAnyStep(true) } + nonqueue := regexp.MustCompile("dead|succeeded|dispatcherdone|alive|killfile") + files := make(chan string) errors := make(chan error) go func() { @@ -49,7 +52,8 @@ func Ls(ctx context.Context, backend be.Backend, run queue.RunContext, path stri errors <- o.Err } else { f := strings.Replace(o.Key, prefix+"/", "", 1) - if f != "" { + if f != "" && path != "" || !nonqueue.MatchString(f) { + // this means: we want the default (path=="") behavior to match only queue-related objects files <- f } }