Skip to content

Commit

Permalink
Merge pull request #53 from everpeace/cleanup-workspace-dir
Browse files Browse the repository at this point in the history
Add `--cleanup-workspace-dir` in start-worker subcommand
  • Loading branch information
superbrothers authored Nov 15, 2023
2 parents d393f61 + e7bf93b commit 25b1b72
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ worker:
# Default timeout of the task handler.
# This value will be used when TaskSpec.timeoutSeconds is not set or 0.
defaultTimeout: 30m0s
# Cleanup workspace dir or not when each task handler execution finished.
cleanupWorkspaceDir: false
# Task Handler Command
# A Worker spawns a process with the command for each received tasks
commands:
Expand Down Expand Up @@ -633,6 +635,8 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
```
┌ {workspace direcoty} # pftaskqueue passes the dir name to stdin of task handler process
│ # also exported as PFTQ_TASK_HANDLER_WORKSPACE_DIR
│ # Note: this directory will be deleted after task handler finished
│ # when taskHandler.CleanupWorkspaceDir is true in worker configuration
│ # pftaskqueue prepares whole the contents
├── input
Expand Down
3 changes: 3 additions & 0 deletions cmd/start_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func init() {
flag.Duration("default-command-timeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout, "default timeout for executing command for tasks. the value will be used when the taskspec has no timeout spec")
viperBindPFlag("Worker.TaskHandler.DefaultTimeout", cmdOpts.Worker.TaskHandler.DefaultCommandTimeout.String(), flag.Lookup("default-command-timeout"))

flag.Bool("cleanup-workspace-dir", cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir, "cleanup workspace dir or not when each command execution finished")
viperBindPFlag("Worker.TaskHandler.CleanupWorkspaceDir", strconv.FormatBool(cmdOpts.Worker.TaskHandler.CleanupWorkspaceDir), flag.Lookup("cleanup-workspace-dir"))

flag.Bool("exit-on-suspend", cmdOpts.Worker.ExitOnSuspend, "if set, worker exits when queue is suspended")
viperBindPFlag("Worker.ExitOnSuspend", strconv.FormatBool(cmdOpts.Worker.ExitOnSuspend), flag.Lookup("exit-on-suspend"))

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type WorkerSpec struct {
type TaskHandlerSpec struct {
DefaultCommandTimeout time.Duration `json:"defaultTimeout" yaml:"defaultTimeout" mapstructure:"defaultTimeout" default:"30m" validate:"required"`
Commands []string `json:"commands" yaml:"commands" default:"[\"cat\"]" validate:"required"`
CleanupWorkspaceDir bool `json:"cleanupWorkspace" yaml:"cleanupWorkspace" default:"false"`
}

type HeartBeatSpec struct {
Expand Down
16 changes: 12 additions & 4 deletions pkg/backend/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"io/ioutil"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -740,11 +741,18 @@ var _ = Describe("Backend", func() {
Expect(tasks[0].UID).NotTo(Equal(tasks[1].UID))

pending := mustPendingQueueLength(queue.UID.String(), 2)
Expect(pending[0]).To(Equal(tasks[1].UID))
Expect(pending[1]).To(Equal(tasks[0].UID))
taskUIDs := mustTasksSetSize(queue.UID.String(), 2)
Expect(taskUIDs[0]).To(Equal(tasks[1].UID))
Expect(taskUIDs[1]).To(Equal(tasks[0].UID))

sort.Strings(pending)
sort.Strings(taskUIDs)
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].UID < tasks[j].UID
})

Expect(pending[0]).To(Equal(tasks[0].UID))
Expect(pending[1]).To(Equal(tasks[1].UID))
Expect(taskUIDs[0]).To(Equal(tasks[0].UID))
Expect(taskUIDs[1]).To(Equal(tasks[1].UID))
assertKeyContents(backend.taskKey(queue.UID.String(), tasks[0].UID), tasks[0])
assertKeyContents(backend.taskKey(queue.UID.String(), tasks[1].UID), tasks[1])
})
Expand Down
20 changes: 18 additions & 2 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul
}

workspacePath, envvars, err := w.prepareTaskHandlerDirAndEnvvars(t)
if w.config.TaskHandler.CleanupWorkspaceDir {
defer func() {
if workspacePath == "" {
workspacePath = w.workspacePath(t)
}
if _, err := os.Stat(workspacePath); errors.Is(err, os.ErrNotExist) {
logger.Info().Str("workspaceDir", workspacePath).Msg("Skip cleaning up because workspace dir does not exist")
return
}
if err := os.RemoveAll(workspacePath); err != nil {
logger.Error().Err(err).Str("workspaceDir", workspacePath).Msg("Failed to cleanup workspace dir")
return
}
logger.Info().Str("workspaceDir", workspacePath).Msg("Cleaned up workspace dir")
}()
}
if err != nil {
msg := "Can't prepare workspace dir for task handler process"
result := task.TaskResult{
Expand Down Expand Up @@ -395,11 +411,11 @@ func (w *Worker) runCommand(logger zerolog.Logger, t *task.Task) (task.TaskResul
return result, postHooks
}

func (w *Worker) thisWorkerWorkDir() string {
func (w *Worker) WorkerWorkDir() string {
return filepath.Join(w.config.WorkDir, w.uid.String())
}
func (w *Worker) workspacePath(t *task.Task) string {
return filepath.Join(w.thisWorkerWorkDir(), t.Status.CurrentRecord.ProcessUID)
return filepath.Join(w.WorkerWorkDir(), t.Status.CurrentRecord.ProcessUID)
}

// {task workspace path}/
Expand Down
5 changes: 5 additions & 0 deletions pkg/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("Worker", func() {
Concurrency: 4,
TaskHandler: apiworker.TaskHandlerSpec{
DefaultCommandTimeout: 1 * time.Minute,
CleanupWorkspaceDir: true,
Commands: []string{
"bash",
"-c",
Expand Down Expand Up @@ -189,6 +190,10 @@ var _ = Describe("Worker", func() {
suspend.State = taskqueue.TaskQueueStateSuspend
testutil.MustQueueWithState(bcknd, suspend)
Eventually(workerDone, 30*time.Second).Should(Receive(BeNil()))

fileList, err := ioutil.ReadDir(worker.WorkerWorkDir())
Expect(err).NotTo(HaveOccurred())
Expect(len(fileList)).To(BeZero())
})
})
})

0 comments on commit 25b1b72

Please sign in to comment.