diff --git a/internal/agent/transport/file.go b/internal/agent/transport/file.go new file mode 100644 index 000000000..f5369d445 --- /dev/null +++ b/internal/agent/transport/file.go @@ -0,0 +1,106 @@ +package transport + +import ( + "context" + "os" + "path/filepath" + "strings" + "time" + + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" + "gopkg.in/yaml.v2" +) + +type File struct { + // Log is a logger for debugging. + Log logr.Logger + + // Dir is the directory to be monitored. + Dir string + + // Tick defines the duration to wait before inspecting the directory for new workflow files. + // It defaulst to 5 seconds. + Tick time.Duration + + // cache is used to track handled workflows. + cache map[string]struct{} +} + +// Start begins watching f.Dir for files. When it finds a file it hasn't handled before, it +// attempts to parse it and offload to the handler. It will run workflows once where a workflow +// is determined by its file name. +func (f *File) Start(ctx context.Context, _ string, handler WorkflowHandler) error { + if f.Tick == 0 { + f.Tick = 5 * time.Second + } + + f.cache = map[string]struct{}{} + + ticker := time.NewTicker(f.Tick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + + case <-ticker.C: + entries, err := os.ReadDir(f.Dir) + if err != nil { + return err + } + + // When we experience an error with a particular file we don't want to return an + // error because we want to process as much as possible. Instead, we ignore the problem + // and hope the user fixes the issue in a future tick. + for _, e := range entries { + if e.IsDir() || !hasYAMLExt(e.Name()) { + continue + } + + // Ensure we haven't handled this file before. + if _, handled := f.cache[e.Name()]; handled { + continue + } + + f.Log.Info("Found new workflow file", "file", e.Name()) + + fh, err := os.Open(filepath.Join(f.Dir, e.Name())) + if err != nil { + f.Log.Error(err, "Could not open file", "file", e.Name()) + continue + } + + var wrkflow workflow.Workflow + if err := yaml.NewDecoder(fh).Decode(&wrkflow); err != nil { + f.Log.Error(err, "Invalid workflow YAML", "file", e.Name()) + continue + } + + // Add the file to the cache so we don't reprocess it on the next iteration. + f.cache[e.Name()] = struct{}{} + + handler.HandleWorkflow(ctx, wrkflow, f) + } + } + } +} + +func hasYAMLExt(path string) bool { + ext := strings.Trim(filepath.Ext(path), ".") + for _, v := range []string{"yml", "yaml"} { + if v == ext { + return true + } + } + return false +} + +func (f *File) RecordEvent(_ context.Context, e event.Event) error { + // Noop because we don't particularly care about events for File based transports. Maybe + // we'll record this in a dedicated file one day. + f.Log.Info("Recording event", "event", e.GetName()) + return nil +} diff --git a/internal/agent/transport/file_test.go b/internal/agent/transport/file_test.go new file mode 100644 index 000000000..87bb9138f --- /dev/null +++ b/internal/agent/transport/file_test.go @@ -0,0 +1,97 @@ +package transport_test + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/zerologr" + "github.com/google/go-cmp/cmp" + "github.com/rs/zerolog" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/transport" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +func TestFile(t *testing.T) { + logger := zerolog.New(zerolog.NewConsoleWriter()) + + expect := expectWorkflows{ + Workflows: []workflow.Workflow{ + { + ID: "test-workflow-id", + Actions: []workflow.Action{ + { + ID: "test-action-1", + Name: "my test action", + Image: "docker.io/hub/alpine", + Cmd: "sh -c", + Args: []string{"echo", "action 1"}, + Env: map[string]string{"foo": "bar"}, + Volumes: []string{"mount:/foo/bar:ro"}, + NetworkNamespace: "custom-namespace", + }, + { + ID: "test-action-2", + Name: "my test action", + Image: "docker.io/hub/alpine", + Cmd: "sh -c", + Args: []string{"echo", "action 2"}, + Env: map[string]string{"foo": "bar"}, + Volumes: []string{"mount:/foo/bar:ro"}, + NetworkNamespace: "custom-namespace", + }, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + handler := &transport.WorkflowHandlerMock{ + HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) { + if expect.End() { + t.Fatalf("Received unexpected workflow:\nid=%v", workflow.ID) + } + + next := expect.Next() + if !cmp.Equal(*next, workflow) { + t.Fatalf("Workflow diff:\n%v", cmp.Diff(next, workflow)) + } + }, + } + + f := transport.File{ + Log: zerologr.New(&logger), + Dir: "./testdata/workflows", + Tick: 1 * time.Second, + } + + err := f.Start(ctx, "agent_id", handler) + if err != nil { + t.Fatal(err) + } +} + +type expectWorkflows struct { + Workflows []workflow.Workflow + idx int +} + +func (e *expectWorkflows) Next() *workflow.Workflow { + if len(e.Workflows) == 0 { + return nil + } + + if len(e.Workflows) < e.idx { + return nil + } + + defer func() { e.idx++ }() + return &e.Workflows[e.idx] +} + +func (e *expectWorkflows) End() bool { + return len(e.Workflows) < e.idx +} diff --git a/internal/agent/transport/grpc_test.go b/internal/agent/transport/grpc_test.go index 42200ee45..80660a6cb 100644 --- a/internal/agent/transport/grpc_test.go +++ b/internal/agent/transport/grpc_test.go @@ -2,13 +2,11 @@ package transport_test import ( "context" - "fmt" "io" "sync" "testing" "github.com/go-logr/zerologr" - "github.com/kr/pretty" "github.com/rs/zerolog" "github.com/tinkerbell/tink/internal/agent/event" "github.com/tinkerbell/tink/internal/agent/transport" @@ -58,7 +56,6 @@ func TestGRPC(t *testing.T) { handler := &transport.WorkflowHandlerMock{ HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) { defer wg.Done() - fmt.Println("handling") close(responses) }, } @@ -71,6 +68,4 @@ func TestGRPC(t *testing.T) { } wg.Wait() - - pretty.Println(handler.HandleWorkflowCalls()) } diff --git a/internal/agent/transport/testdata/workflows/workflow.yml b/internal/agent/transport/testdata/workflows/workflow.yml new file mode 100644 index 000000000..2099ba986 --- /dev/null +++ b/internal/agent/transport/testdata/workflows/workflow.yml @@ -0,0 +1,23 @@ +id: "test-workflow-id" +actions: +- id: "test-action-1" + name: "my test action" + image: "docker.io/hub/alpine" + cmd: "sh -c" + args: ["echo", "action 1"] + env: + foo: bar + volumes: + - mount:/foo/bar:ro + networkNamespace: "custom-namespace" +- id: "test-action-2" + name: "my test action" + image: "docker.io/hub/alpine" + cmd: "sh -c" + args: ["echo", "action 2"] + env: + foo: bar + volumes: + - mount:/foo/bar:ro + networkNamespace: "custom-namespace" + diff --git a/internal/agent/workflow/workflow.go b/internal/agent/workflow/workflow.go index 3082962a0..38408084e 100644 --- a/internal/agent/workflow/workflow.go +++ b/internal/agent/workflow/workflow.go @@ -5,8 +5,8 @@ package workflow // Workflow represents a runnable workflow for the Handler. type Workflow struct { // Do we need a workflow name? Does that even come down in the proto definition? - ID string - Actions []Action + ID string `yaml:"id"` + Actions []Action `yaml:"actions"` } func (w Workflow) String() string { @@ -15,14 +15,14 @@ func (w Workflow) String() string { // Action represents an individually runnable action. type Action struct { - ID string - Name string - Image string - Cmd string - Args []string - Env map[string]string - Volumes []string - NetworkNamespace string + ID string `yaml:"id"` + Name string `yaml:"name"` + Image string `yaml:"image"` + Cmd string `yaml:"cmd"` + Args []string `yaml:"args"` + Env map[string]string `yaml:"env"` + Volumes []string `yaml:"volumes"` + NetworkNamespace string `yaml:"networkNamespace"` } func (a Action) String() string {