diff --git a/README.md b/README.md index afc5a6d..1c038bd 100644 --- a/README.md +++ b/README.md @@ -516,6 +516,8 @@ worker: # Default timeout of the task handler. # This value will be used when TaskSpec.timeoutSeconds is not set or 0. defaultTimeout: 30m0s + # Cleanup workspace dir or not when each task handler execution finished. + cleanupWorkspaceDir: false # Task Handler Command # A Worker spawns a process with the command for each received tasks commands: @@ -633,6 +635,8 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv ``` ┌ {workspace direcoty} # pftaskqueue passes the dir name to stdin of task handler process │ # also exported as PFTQ_TASK_HANDLER_WORKSPACE_DIR +│ # Note: this directory will be deleted after task handler finished +│ # when taskHandler.CleanupWorkspaceDir is true in worker configuration │ │ # pftaskqueue prepares whole the contents ├── input diff --git a/cmd/start_worker.go b/cmd/start_worker.go index 85d887e..2899a95 100644 --- a/cmd/start_worker.go +++ b/cmd/start_worker.go @@ -96,6 +96,9 @@ func init() { flag.Duration("default-command-timeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout, "default timeout for executing command for tasks. the value will be used when the taskspec has no timeout spec") viperBindPFlag("Worker.TaskHandler.DefaultTimeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout.String(), flag.Lookup("default-command-timeout")) + flag.Bool("cleanup-workspace-dir", cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir, "cleanup workspace dir or not when each command execution finished") + viperBindPFlag("Worker.TaskHandler.CleanupWorkspaceDir", strconv.FormatBool(cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir), flag.Lookup("cleanup-workspace-dir")) + flag.Bool("exit-on-suspend", cmdOpts.Worker.ExitOnSuspend, "if set, worker exits when queue is suspended") viperBindPFlag("Worker.ExitOnSuspend", strconv.FormatBool(cmdOpts.Worker.ExitOnSuspend), flag.Lookup("exit-on-suspend")) diff --git a/pkg/apis/worker/worker.go b/pkg/apis/worker/worker.go index 20a0b13..438c4de 100644 --- a/pkg/apis/worker/worker.go +++ b/pkg/apis/worker/worker.go @@ -76,6 +76,7 @@ type WorkerSpec struct { type TaskHandlerSpec struct { DefaultCommandTimeout time.Duration `json:"defaultTimeout" yaml:"defaultTimeout" mapstructure:"defaultTimeout" default:"30m" validate:"required"` Commands []string `json:"commands" yaml:"commands" default:"[\"cat\"]" validate:"required"` + CleanupWorkspaceDir bool `json:"cleanupWorkspace" yaml:"cleanupWorkspace" default:"false"` } type HeartBeatSpec struct { diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go index 5d70f52..d8331cd 100644 --- a/pkg/backend/redis/redis_test.go +++ b/pkg/backend/redis/redis_test.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "io/ioutil" + "sort" "strings" "time" @@ -740,11 +741,18 @@ var _ = Describe("Backend", func() { Expect(tasks[0].UID).NotTo(Equal(tasks[1].UID)) pending := mustPendingQueueLength(queue.UID.String(), 2) - Expect(pending[0]).To(Equal(tasks[1].UID)) - Expect(pending[1]).To(Equal(tasks[0].UID)) taskUIDs := mustTasksSetSize(queue.UID.String(), 2) - Expect(taskUIDs[0]).To(Equal(tasks[1].UID)) - Expect(taskUIDs[1]).To(Equal(tasks[0].UID)) + + sort.Strings(pending) + sort.Strings(taskUIDs) + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].UID < tasks[j].UID + }) + + Expect(pending[0]).To(Equal(tasks[0].UID)) + Expect(pending[1]).To(Equal(tasks[1].UID)) + Expect(taskUIDs[0]).To(Equal(tasks[0].UID)) + Expect(taskUIDs[1]).To(Equal(tasks[1].UID)) assertKeyContents(backend.taskKey(queue.UID.String(), tasks[0].UID), tasks[0]) assertKeyContents(backend.taskKey(queue.UID.String(), tasks[1].UID), tasks[1]) }) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index c2594fc..812b09d 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -290,6 +290,22 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul } workspacePath, envvars, err := w.prepareTaskHandlerDirAndEnvvars(t) + if w.config.TaskHandler.CleanupWorkspaceDir { + defer func() { + if workspacePath == "" { + workspacePath = w.workspacePath(t) + } + if _, err := os.Stat(workspacePath); errors.Is(err, os.ErrNotExist) { + logger.Info().Str("workspaceDir", workspacePath).Msg("Skip cleaning up because workspace dir does not exist") + return + } + if err := os.RemoveAll(workspacePath); err != nil { + logger.Error().Err(err).Str("workspaceDir", workspacePath).Msg("Failed to cleanup workspace dir") + return + } + logger.Info().Str("workspaceDir", workspacePath).Msg("Cleaned up workspace dir") + }() + } if err != nil { msg := "Can't prepare workspace dir for task handler process" result := task.TaskResult{ @@ -395,11 +411,11 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul return result, postHooks } -func (w *Worker) thisWorkerWorkDir() string { +func (w *Worker) WorkerWorkDir() string { return filepath.Join(w.config.WorkDir, w.uid.String()) } func (w *Worker) workspacePath(t *task.Task) string { - return filepath.Join(w.thisWorkerWorkDir(), t.Status.CurrentRecord.ProcessUID) + return filepath.Join(w.WorkerWorkDir(), t.Status.CurrentRecord.ProcessUID) } // {task workspace path}/ diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index 6ce02f4..437a2c3 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -90,6 +90,7 @@ var _ = Describe("Worker", func() { Concurrency: 4, TaskHandler: apiworker.TaskHandlerSpec{ DefaultCommandTimeout: 1 * time.Minute, + CleanupWorkspaceDir: true, Commands: []string{ "bash", "-c", @@ -189,6 +190,10 @@ var _ = Describe("Worker", func() { suspend.State = taskqueue.TaskQueueStateSuspend testutil.MustQueueWithState(bcknd, suspend) Eventually(workerDone, 30*time.Second).Should(Receive(BeNil())) + + fileList, err := ioutil.ReadDir(worker.WorkerWorkDir()) + Expect(err).NotTo(HaveOccurred()) + Expect(len(fileList)).To(BeZero()) }) }) })