diff --git a/cmd/subcommands/enqueue/file.go b/cmd/subcommands/enqueue/file.go index f44a61e1..ca841764 100644 --- a/cmd/subcommands/enqueue/file.go +++ b/cmd/subcommands/enqueue/file.go @@ -2,6 +2,7 @@ package enqueue import ( "fmt" + "os" "github.com/spf13/cobra" @@ -19,7 +20,7 @@ func NewEnqueueFileCmd() *cobra.Command { var wait bool var verbose bool - cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Wait for the task to be completed") + cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Wait for the task to be completed, and exit with the exit code of that task") cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output") cmd.RunE = func(cmd *cobra.Command, args []string) error { @@ -29,7 +30,11 @@ func NewEnqueueFileCmd() *cobra.Command { return fmt.Errorf("TODO") } - return queue.EnqueueFile(args[0], queue.EnqueueFileOptions{Wait: wait, Verbose: verbose}) + exitcode, err := queue.EnqueueFile(args[0], queue.EnqueueFileOptions{Wait: wait, Verbose: verbose}) + if exitcode != 0 { + os.Exit(exitcode) + } + return err } return cmd diff --git a/pkg/runtime/queue/enqueue.go b/pkg/runtime/queue/enqueue.go index c990eb87..84cd6a11 100644 --- a/pkg/runtime/queue/enqueue.go +++ b/pkg/runtime/queue/enqueue.go @@ -19,25 +19,25 @@ type EnqueueFileOptions struct { Verbose bool } -func EnqueueFile(task string, opts EnqueueFileOptions) error { +func EnqueueFile(task string, opts EnqueueFileOptions) (int, error) { c, err := NewS3Client() if err != nil { - return err + return 0, err } if err := c.Mkdirp(c.Paths.Bucket); err != nil { - return err + return 0, err } if err := c.Upload(c.Paths.Bucket, task, filepath.Join(c.Paths.PoolPrefix, c.Paths.Inbox, filepath.Base(task))); err != nil { - return err + return 0, err } if opts.Wait { return c.WaitForCompletion(filepath.Base(task), opts.Verbose) } - return nil + return 0, nil } func EnqueueFromS3(fullpath, endpoint, accessKeyId, secretAccessKey string, repeat int) error { diff --git a/pkg/runtime/queue/s3.go b/pkg/runtime/queue/s3.go index 0fc4c801..59a35c2d 100644 --- a/pkg/runtime/queue/s3.go +++ b/pkg/runtime/queue/s3.go @@ -4,6 +4,7 @@ package queue // can share this with the runtime/worker/s3.go import ( + "bytes" "context" "fmt" "io" @@ -167,6 +168,16 @@ func (s3 S3Client) ListObjects(bucket, filePath string, recursive bool) <-chan m }) } +func (s3 S3Client) Get(bucket, filePath string) (string, error) { + var content bytes.Buffer + s, err := s3.client.GetObject(context.Background(), bucket, filePath, minio.GetObjectOptions{}) + if err != nil { + return "", err + } + io.Copy(io.Writer(&content), s) + return content.String(), nil +} + func (s3 S3Client) Cat(bucket, filePath string) error { s, err := s3.client.GetObject(context.Background(), bucket, filePath, minio.GetObjectOptions{}) if err != nil { diff --git a/pkg/runtime/queue/wait.go b/pkg/runtime/queue/wait.go index a273771c..b75cc47d 100644 --- a/pkg/runtime/queue/wait.go +++ b/pkg/runtime/queue/wait.go @@ -5,14 +5,15 @@ import ( "os" "path/filepath" "slices" + "strconv" "time" ) -func (c S3Client) WaitForCompletion(task string, verbose bool) error { +func (c S3Client) WaitForCompletion(task string, verbose bool) (int, error) { for { doneTasks, err := c.Lsf(c.Paths.Bucket, filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox)) if err != nil { - return err + return 0, err } if idx := slices.IndexFunc(doneTasks, func(otask string) bool { return otask == task }); idx >= 0 { @@ -29,5 +30,18 @@ func (c S3Client) WaitForCompletion(task string, verbose bool) error { fmt.Fprintf(os.Stderr, "Task completed %s\n", task) } - return nil + codeFile := filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox, task+".code") + if code, err := c.Get(c.Paths.Bucket, codeFile); err != nil { + return 0, err + } else { + if verbose { + fmt.Fprintf(os.Stderr, "Task completed %s with return code %s\n", task, code) + } + + exitcode, err := strconv.Atoi(code) + if err != nil { + return 0, err + } + return exitcode, nil + } } diff --git a/pkg/runtime/worker/watcher.go b/pkg/runtime/worker/watcher.go index 618f11d2..5895fdfa 100644 --- a/pkg/runtime/worker/watcher.go +++ b/pkg/runtime/worker/watcher.go @@ -125,7 +125,6 @@ func startWatch(handler []string, client queue.S3Client) error { err = handlercmd.Run() if err != nil { fmt.Println("Internal Error running the handler:", err) - continue } EC := handlercmd.ProcessState.ExitCode() @@ -151,7 +150,6 @@ func startWatch(handler []string, client queue.S3Client) error { if err != nil { fmt.Println("Internal Error creating succeeded marker:", err) } - // fmt.Println("handler success: " + in) } else { err = client.Touch(bucket, failed) if err != nil { diff --git a/tests/bin/helpers.sh b/tests/bin/helpers.sh index 8ede4e9b..054f86b6 100644 --- a/tests/bin/helpers.sh +++ b/tests/bin/helpers.sh @@ -114,7 +114,15 @@ function waitForIt { lunchpail qcat outbox/${output}.code) if [[ $code = 0 ]] || [[ $code = -1 ]] || [[ $code = 143 ]] || [[ $code = 137 ]] then echo "✅ PASS run-controller test=$name output=$output code=0" - else echo "❌ FAIL run-controller non-zero exit code test=$name output=$output code=$code" && return 1 + else + if [[ -n "$expectTaskFailure" ]] + then + if [[ ! "$code" =~ $expectTaskFailure ]] + then echo "Missing expected task failure output from code=$code" && return 1 + fi + echo "✅ PASS run-controller got expected non-zero exit code test=$name output=$output code=$code" + else echo "❌ FAIL run-controller non-zero exit code test=$name output=$output code=$code" && return 1 + fi fi stdout=$(kubectl exec $(kubectl get pod -n $ns -l app.kubernetes.io/component=$S3C -o name) -n $ns -- \ diff --git a/tests/tests/test7-wait-withfail/README.md b/tests/tests/test7-wait-withfail/README.md new file mode 100644 index 00000000..b9833ac8 --- /dev/null +++ b/tests/tests/test7-wait-withfail/README.md @@ -0,0 +1,3 @@ +# test7-wait-withfail + +Same as test7, except this test uses the ParameterSweep in "wait" mode with one failing task. diff --git a/tests/tests/test7-wait-withfail/add-data-to-queue.sh b/tests/tests/test7-wait-withfail/add-data-to-queue.sh new file mode 100755 index 00000000..b715d2fb --- /dev/null +++ b/tests/tests/test7-wait-withfail/add-data-to-queue.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +SCRIPTDIR=$(cd $(dirname "$0") && pwd) + +export NAMESPACE=$1 + +# number of task +N=${2-10} + +# name of s3 bucket in which to store the tasks +BUCKET=${3-test7} +RUN_NAME=$BUCKET + +B=$(mktemp -d)/$BUCKET # bucket path +D=$B/$BUCKET # data path; in this case the bucket name and the folder name are both the run name +mkdir -p $D +echo "Staging to $D" 1>&2 + +for idx in $(seq 1 $N) # for each iteration +do + # if we are doing a test, then make sure to use a + # repeatable name for the task files, so that we know what + # to look for when confirming that the tasks were + # processed by the workers + if [[ -n "$CI" ]] || [[ -n "$RUNNING_CODEFLARE_TESTS" ]]; then + id=$idx + else + # otherwise, use a more random name, so that we can + # inject multiple batches of tasks across executions + # of this script + id=$(uuidgen) + fi + + echo "this is task idx=$idx" > $D/task.$id.txt +done + +"$SCRIPTDIR"/../../../tests/bin/add-data.sh $B diff --git a/tests/tests/test7-wait-withfail/init.sh b/tests/tests/test7-wait-withfail/init.sh new file mode 100755 index 00000000..de511c45 --- /dev/null +++ b/tests/tests/test7-wait-withfail/init.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +SCRIPTDIR=$(cd $(dirname "$0") && pwd) + +# make sure these values are compatible with the values in ./settings.sh +NUM_TASKS=6 + +# $1: namespace + +"$SCRIPTDIR"/add-data-to-queue.sh \ + $1 \ + $NUM_TASKS \ + ${TEST_NAME-test7} diff --git a/tests/tests/test7-wait-withfail/pail/app.yaml b/tests/tests/test7-wait-withfail/pail/app.yaml new file mode 100644 index 00000000..314c2168 --- /dev/null +++ b/tests/tests/test7-wait-withfail/pail/app.yaml @@ -0,0 +1,47 @@ +apiVersion: lunchpail.io/v1alpha1 +kind: Application +metadata: + name: test7-wait-withfail +spec: + role: worker + code: + - name: main.sh + source: | + #!/usr/bin/env sh + + # $1 input filepath + # $2 output filepath + in="$1" + out="$2" + + dataset_name=test # match with below! + bucket_name=test7-wait-withfail + folder_name=test7-wait-withfail + N=$(ls $dataset_name/$bucket_name/$folder_name | wc -l | xargs) + + echo "Processing $N $(basename $in)" + sleep 5 + + if [ $(basename $in) = "task.3.txt" ] + then + echo "Error!" 1>&2 + exit 64 + fi + + echo "Done with $(basename $in)" + + command: ./main.sh + minSize: auto + securityContext: + runAsUser: 2000 # lunchpail, same as is specified Dockerfile + runAsGroup: 0 # root, ibid + containerSecurityContext: + runAsUser: 2000 # lunchpail, same as is specified Dockerfile + runAsGroup: 0 # root, ibid + + datasets: + - name: test + s3: + secret: test7data + copyIn: + path: "test7-wait-withfail/" diff --git a/tests/tests/test7-wait-withfail/pail/dataset-secret.yaml b/tests/tests/test7-wait-withfail/pail/dataset-secret.yaml new file mode 100644 index 00000000..36555490 --- /dev/null +++ b/tests/tests/test7-wait-withfail/pail/dataset-secret.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: Secret +metadata: + name: test7data +type: Opaque +stringData: + endpoint: {{ .Values.global.s3Endpoint }} + accessKeyID: {{ .Values.global.s3AccessKey }} + secretAccessKey: {{ .Values.global.s3SecretKey }} diff --git a/tests/tests/test7-wait-withfail/pail/dispatcher.yaml b/tests/tests/test7-wait-withfail/pail/dispatcher.yaml new file mode 100644 index 00000000..9b2cc40b --- /dev/null +++ b/tests/tests/test7-wait-withfail/pail/dispatcher.yaml @@ -0,0 +1,11 @@ +apiVersion: lunchpail.io/v1alpha1 +kind: ParameterSweep +metadata: + name: test7-wait-withfail-workdispatcher +spec: + min: 1 + max: {{ .Values.nTasks | default 5 }} + step: 1 + interval: {{ .Values.every | default 5 }} + wait: true + verbose: true diff --git a/tests/tests/test7-wait-withfail/pail/pool1.yaml b/tests/tests/test7-wait-withfail/pail/pool1.yaml new file mode 100644 index 00000000..74bca5db --- /dev/null +++ b/tests/tests/test7-wait-withfail/pail/pool1.yaml @@ -0,0 +1,8 @@ +apiVersion: lunchpail.io/v1alpha1 +kind: WorkerPool +metadata: + name: test7-wait-withfail-pool1 +spec: + workers: + count: 2 + size: auto diff --git a/tests/tests/test7-wait-withfail/pail/version b/tests/tests/test7-wait-withfail/pail/version new file mode 100644 index 00000000..28cec4ac --- /dev/null +++ b/tests/tests/test7-wait-withfail/pail/version @@ -0,0 +1 @@ +3.1.4.1 \ No newline at end of file diff --git a/tests/tests/test7-wait-withfail/settings.sh b/tests/tests/test7-wait-withfail/settings.sh new file mode 100644 index 00000000..327a999b --- /dev/null +++ b/tests/tests/test7-wait-withfail/settings.sh @@ -0,0 +1,8 @@ +api=workqueue +taskqueue=test7-wait-withfail + +# /queue/0,1 <-- 2 workers +# task.1,task.3,task.5 <-- 3 tasks per iter + +expected=("Processing 6 task.1.txt" "Task completed task.1.txt" "Task completed task.1.txt with return code 0" "Task completed task.3.txt" "Task completed task.3.txt with return code 64") +expectTaskFailure=64 diff --git a/tests/tests/test7-wait/settings.sh b/tests/tests/test7-wait/settings.sh index 822ce215..1deb8266 100644 --- a/tests/tests/test7-wait/settings.sh +++ b/tests/tests/test7-wait/settings.sh @@ -4,4 +4,4 @@ taskqueue=test7-wait # /queue/0,1 <-- 2 workers # task.1,task.3,task.5 <-- 3 tasks per iter -expected=("Processing 6 task.1.txt" "Task completed task.1.txt" "Task completed task.3.txt") +expected=("Processing 6 task.1.txt" "Task completed task.1.txt" "Task completed task.1.txt with return code 0" "Task completed task.3.txt" "Task completed task.3.txt with return code 0")