From d18513d4b94251131c5188852aaa995e8d233f70 Mon Sep 17 00:00:00 2001 From: Dasha Komsa Date: Tue, 13 Feb 2024 23:38:48 +0000 Subject: [PATCH] parse job events to extract failure messages Signed-off-by: Dasha Komsa --- go.mod | 2 +- internal/ansible/ansible.go | 121 ++++++++++++++++-- internal/ansible/jobEvent.go | 27 ++++ internal/controller/ansibleRun/ansibleRun.go | 27 ++-- .../controller/ansibleRun/ansibleRun_test.go | 89 +++++++------ 5 files changed, 196 insertions(+), 70 deletions(-) create mode 100644 internal/ansible/jobEvent.go diff --git a/go.mod b/go.mod index 7c59ab0..915e3fa 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/crossplane/crossplane-runtime v0.19.2 github.com/crossplane/crossplane-tools v0.0.0-20220310165030-1f43fc12793e github.com/google/go-cmp v0.5.9 + github.com/google/uuid v1.3.0 github.com/spf13/afero v1.9.5 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.4.0 @@ -40,7 +41,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/internal/ansible/ansible.go b/internal/ansible/ansible.go index cb9fbc3..4bce9d8 100644 --- a/internal/ansible/ansible.go +++ b/internal/ansible/ansible.go @@ -27,6 +27,7 @@ import ( "os/exec" "os/user" "path/filepath" + "strings" "time" "github.com/apenella/go-ansible/pkg/stdoutcallback/results" @@ -35,7 +36,9 @@ import ( "github.com/crossplane-contrib/provider-ansible/pkg/runnerutil" "github.com/crossplane/crossplane-runtime/pkg/meta" "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/log" ) const ( @@ -130,10 +133,10 @@ func withBehaviorVars(behaviorVars map[string]string) runnerOption { } } -// withAnsibleEnvDir set the runner env/extravars dir. -func withAnsibleEnvDir(dir string) runnerOption { +// withWorkDir set the runner working dir. +func withWorkDir(dir string) runnerOption { return func(r *Runner) { - r.AnsibleEnvDir = dir + r.workDir = dir } } @@ -301,13 +304,14 @@ func (p Parameters) Init(ctx context.Context, cr *v1alpha1.AnsibleRun, behaviorV return nil, err } - return new(withPath(path), + r := new(withPath(path), withCmdFunc(cmdFunc), withBehaviorVars(behaviorVars), withAnsibleRunPolicy(rPolicy), - // TODO should be moved to connect() func - withAnsibleEnvDir(ansibleEnvDir), - ), nil + withWorkDir(p.WorkingDirPath), + ) + + return r, nil } // Runner struct holds the configuration to run the cmdFunc @@ -315,7 +319,7 @@ type Runner struct { Path string // absolute path on disk to a playbook or role depending on what cmdFunc expects behaviorVars map[string]string cmdFunc cmdFuncType // returns a Cmd that runs ansible-runner - AnsibleEnvDir string + workDir string checkMode bool AnsibleRunPolicy *RunPolicy } @@ -337,14 +341,22 @@ func (r *Runner) GetAnsibleRunPolicy() *RunPolicy { return r.AnsibleRunPolicy } +func (r *Runner) ansibleEnvDir() string { + return filepath.Clean(filepath.Join(r.workDir, "env")) +} + // Run execute the appropriate cmdFunc -func (r *Runner) Run() (*exec.Cmd, io.Reader, error) { +func (r *Runner) Run(ctx context.Context) (io.Reader, error) { var ( stdoutBuf bytes.Buffer stdoutWriter, stderrWriter io.Writer ) dc := r.cmdFunc(r.behaviorVars, r.checkMode) + + id := uuid.New().String() + dc.Args = append(dc.Args, "--ident", id) + if !r.checkMode { // for disabled checkMode dc.Stdout and dc.Stderr are respectfully // written to os.Stdout and os.Stdout for debugging purpose @@ -369,10 +381,95 @@ func (r *Runner) Run() (*exec.Cmd, io.Reader, error) { err := dc.Start() if err != nil { - return nil, nil, err + return nil, err + } + + if err := dc.Wait(); err != nil { + jobEventsDir := filepath.Clean(filepath.Join(r.workDir, "artifacts", id, "job_events")) + failureReason, reasonErr := extractFailureReason(jobEventsDir) + if reasonErr != nil { + log.FromContext(ctx).Error(err, "extracting ansible failure message") + } + + return nil, fmt.Errorf("%w: %s", err, failureReason) + } + + return &stdoutBuf, nil +} + +func extractFailureReason(eventsDir string) (string, error) { + evts, err := parseEvents(eventsDir) + if err != nil { + return "", fmt.Errorf("parsing job events: %w", err) + } + + var msgs []string + for _, evt := range evts { + switch evt.Event { + case eventTypeRunnerFailed: + m, err := runnerEventMessage(evt, "Failed") + if err != nil { + return "", err + } + msgs = append(msgs, m) + case eventTypeRunnerUnreachable: + m, err := runnerEventMessage(evt, "Unreachable") + if err != nil { + return "", err + } + msgs = append(msgs, m) + default: + } + } + + return strings.Join(msgs, "; "), nil +} + +func parseEvents(dir string) ([]jobEvent, error) { + files, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("reading job events directory: %w", err) + } + + var evts []jobEvent + for _, file := range files { + evtBytes, err := os.ReadFile(filepath.Join(dir, file.Name())) + if err != nil { + return nil, fmt.Errorf("reading job event file %q: %w", file.Name(), err) + } + + var evt jobEvent + if err := json.Unmarshal(evtBytes, &evt); err != nil { + return nil, fmt.Errorf("unmarshaling job event from file %q: %w", file.Name(), err) + } + evts = append(evts, evt) } - return dc, &stdoutBuf, nil + return evts, nil +} + +func reunmarshal(data map[string]any, result any) error { + b, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("marshaling: %w", err) + } + + return json.Unmarshal(b, result) +} + +func runnerEventMessage(evt jobEvent, reason string) (string, error) { + var evtData runnerEventData + if err := reunmarshal(evt.EventData, &evtData); err != nil { + return "", fmt.Errorf("unmarshaling job event %s as runner event: %w", evt.UUID, err) + } + + return fmt.Sprintf("%s on play %q, task %q, host %q: %s", + reason, + evtData.Play, + evtData.Task, + evtData.Host, + evtData.Result.Msg), nil + } // selectRolePath will determines the role path @@ -421,7 +518,7 @@ func addFile(path string, content []byte) error { // WriteExtraVar write extra var to env/extravars under working directory // it creates a non-existent env/extravars file func (r *Runner) WriteExtraVar(extraVar map[string]interface{}) error { - extraVarsPath := filepath.Join(r.AnsibleEnvDir, "extravars") + extraVarsPath := filepath.Join(r.ansibleEnvDir(), "extravars") contentVars := make(map[string]interface{}) data, err := os.ReadFile(filepath.Clean(extraVarsPath)) if err != nil { diff --git a/internal/ansible/jobEvent.go b/internal/ansible/jobEvent.go new file mode 100644 index 0000000..3dfa655 --- /dev/null +++ b/internal/ansible/jobEvent.go @@ -0,0 +1,27 @@ +package ansible + +const ( + // https://github.com/ansible/awx/blob/devel/docs/job_events.md#job-event-relationships + // outlines various event types and the relationships between them + eventTypeRunnerFailed = "runner_on_failed" + eventTypeRunnerUnreachable = "runner_on_unreachable" +) + +// jobEvent represents [ansible-runner's job events](https://ansible.readthedocs.io/projects/runner/en/stable/intro/#artifactevents) +type jobEvent struct { + UUID string `json:"uuid"` + Stdout string `json:"stdout"` + Event string `json:"event"` + EventData map[string]any `json:"event_data"` +} + +type runnerEventData struct { + Play string `json:"play"` + Task string `json:"task"` + Host string `json:"host"` + Result runnerResult `json:"res"` +} + +type runnerResult struct { + Msg string `json:"msg"` +} diff --git a/internal/controller/ansibleRun/ansibleRun.go b/internal/controller/ansibleRun/ansibleRun.go index 3bfb6fa..7a5ea43 100644 --- a/internal/controller/ansibleRun/ansibleRun.go +++ b/internal/controller/ansibleRun/ansibleRun.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "os" - "os/exec" "path/filepath" "strings" "time" @@ -87,7 +86,7 @@ type ansibleRunner interface { GetAnsibleRunPolicy() *ansible.RunPolicy WriteExtraVar(extraVar map[string]interface{}) error EnableCheckMode(checkMode bool) - Run() (*exec.Cmd, io.Reader, error) + Run(ctx context.Context) (io.Reader, error) } // Setup adds a controller that reconciles AnsibleRun managed resources. @@ -353,13 +352,10 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex return managed.ExternalObservation{}, err } c.runner.EnableCheckMode(true) - dc, stdoutBuf, err := c.runner.Run() + stdoutBuf, err := c.runner.Run(ctx) if err != nil { return managed.ExternalObservation{}, err } - if err = dc.Wait(); err != nil { - return managed.ExternalObservation{}, err - } res, err := results.ParseJSONResultsStream(stdoutBuf) if err != nil { return managed.ExternalObservation{}, err @@ -395,7 +391,7 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext // disable checkMode for real action c.runner.EnableCheckMode(false) - if err := c.runAnsible(cr); err != nil { + if err := c.runAnsible(ctx, cr); err != nil { return managed.ExternalUpdate{}, fmt.Errorf("running ansible: %w", err) } @@ -403,7 +399,7 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext return managed.ExternalUpdate{ConnectionDetails: nil}, nil } -func (c *external) Delete(_ context.Context, mg resource.Managed) error { +func (c *external) Delete(ctx context.Context, mg resource.Managed) error { cr, ok := mg.(*v1alpha1.AnsibleRun) if !ok { return errors.New(errNotAnsibleRun) @@ -418,13 +414,10 @@ func (c *external) Delete(_ context.Context, mg resource.Managed) error { if err := c.runner.WriteExtraVar(nestedMap); err != nil { return err } - dc, _, err := c.runner.Run() + _, err := c.runner.Run(ctx) if err != nil { return err } - if err = dc.Wait(); err != nil { - return err - } return nil } @@ -472,7 +465,7 @@ func (c *external) handleLastApplied(ctx context.Context, lastParameters *v1alph return managed.ExternalObservation{}, err } - if err := c.runAnsible(desired); err != nil { + if err := c.runAnsible(ctx, desired); err != nil { return managed.ExternalObservation{}, fmt.Errorf("running ansible: %w", err) } @@ -483,13 +476,9 @@ func (c *external) handleLastApplied(ctx context.Context, lastParameters *v1alph return managed.ExternalObservation{ResourceExists: true, ResourceUpToDate: true}, nil } -func (c *external) runAnsible(cr *v1alpha1.AnsibleRun) error { - dc, _, err := c.runner.Run() +func (c *external) runAnsible(ctx context.Context, cr *v1alpha1.AnsibleRun) error { + _, err := c.runner.Run(ctx) if err != nil { - return err - } - - if err = dc.Wait(); err != nil { cond := xpv1.Unavailable() cond.Message = err.Error() cr.SetConditions(cond) diff --git a/internal/controller/ansibleRun/ansibleRun_test.go b/internal/controller/ansibleRun/ansibleRun_test.go index 25964f4..3e2a3f9 100644 --- a/internal/controller/ansibleRun/ansibleRun_test.go +++ b/internal/controller/ansibleRun/ansibleRun_test.go @@ -96,14 +96,15 @@ func (ps MockPs) AddFile(path string, content []byte) error { } type MockRunner struct { - MockRun func() (*exec.Cmd, io.Reader, error) + MockRun func(ctx context.Context) (io.Reader, error) MockWriteExtraVar func(extraVar map[string]interface{}) error MockAnsibleRunPolicy func() *ansible.RunPolicy MockEnableCheckMode func(checkMode bool) + MockFailureReason func() (string, error) } -func (r MockRunner) Run() (*exec.Cmd, io.Reader, error) { - return r.MockRun() +func (r MockRunner) Run(ctx context.Context) (io.Reader, error) { + return r.MockRun(ctx) } func (r MockRunner) WriteExtraVar(extraVar map[string]interface{}) error { @@ -118,6 +119,10 @@ func (r MockRunner) EnableCheckMode(checkMode bool) { r.MockEnableCheckMode(checkMode) } +func (r MockRunner) FailureReason() (string, error) { + return r.MockFailureReason() +} + func TestConnect(t *testing.T) { errBoom := errors.New("boom") pbCreds := "credentials" @@ -623,8 +628,8 @@ func TestObserve(t *testing.T) { MockWriteExtraVar: func(extraVar map[string]interface{}) error { return nil }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, fmt.Errorf("run should not have been called") + MockRun: func(ctx context.Context) (io.Reader, error) { + return nil, fmt.Errorf("run should not have been called") }, }, }, @@ -651,9 +656,10 @@ func TestObserve(t *testing.T) { MockWriteExtraVar: func(extraVar map[string]interface{}) error { return nil }, - MockRun: func() (*exec.Cmd, io.Reader, error) { + MockRun: func(ctx context.Context) (io.Reader, error) { cmd := exec.Command("ls") - return cmd, nil, cmd.Start() + cmd.Start() + return nil, cmd.Wait() }, }, }, @@ -676,8 +682,8 @@ func TestObserve(t *testing.T) { MockWriteExtraVar: func(extraVar map[string]interface{}) error { return nil }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, errBoom + MockRun: func(context.Context) (io.Reader, error) { + return nil, errBoom }, MockEnableCheckMode: func(checkMode bool) { @@ -709,6 +715,8 @@ func TestObserve(t *testing.T) { func TestCreateOrUpdate(t *testing.T) { errBoom := errors.New("boom") + unavaliableCond := xpv1.Unavailable() + unavaliableCond.Message = errBoom.Error() type fields struct { kube client.Client @@ -754,19 +762,21 @@ func TestCreateOrUpdate(t *testing.T) { } }, MockEnableCheckMode: func(checkMode bool) {}, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, errBoom + MockRun: func(context.Context) (io.Reader, error) { + return nil, errBoom }, }, }, want: want{ - err: fmt.Errorf("running ansible: %w", errBoom), + err: fmt.Errorf("running ansible: %w", errBoom), + conditions: []xpv1.Condition{unavaliableCond}, }, }, "SuccessObserveAndDelete": { reason: "We should not return an error when we successfully delete the AnsibleRun resource", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -776,11 +786,10 @@ func TestCreateOrUpdate(t *testing.T) { } }, MockEnableCheckMode: func(checkMode bool) {}, - MockRun: func() (*exec.Cmd, io.Reader, error) { - ctx := context.Background() + MockRun: func(ctx context.Context) (io.Reader, error) { cmd := exec.CommandContext(ctx, "ls") cmd.Start() - return cmd, nil, nil + return nil, cmd.Wait() }, }, }, @@ -791,7 +800,8 @@ func TestCreateOrUpdate(t *testing.T) { "RunErrorWithCheckWhenObservePolicy": { reason: "We should return any error we encounter when running the runner", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -801,19 +811,21 @@ func TestCreateOrUpdate(t *testing.T) { } }, MockEnableCheckMode: func(checkMode bool) {}, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, errBoom + MockRun: func(context.Context) (io.Reader, error) { + return nil, errBoom }, }, }, want: want{ - err: fmt.Errorf("running ansible: %w", errBoom), + err: fmt.Errorf("running ansible: %w", errBoom), + conditions: []xpv1.Condition{unavaliableCond}, }, }, "SuccessCheckWhenObserve": { reason: "We should not return an error when we successfully delete the AnsibleRun resource", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -823,11 +835,10 @@ func TestCreateOrUpdate(t *testing.T) { } }, MockEnableCheckMode: func(checkMode bool) {}, - MockRun: func() (*exec.Cmd, io.Reader, error) { - ctx := context.Background() + MockRun: func(ctx context.Context) (io.Reader, error) { cmd := exec.CommandContext(ctx, "ls") cmd.Start() - return cmd, nil, nil + return nil, cmd.Wait() }, }, }, @@ -911,7 +922,8 @@ func TestDelete(t *testing.T) { "RunErrorWithObserveAndDeletePolicy": { reason: "We should return any error we encounter when running the runner", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -923,8 +935,8 @@ func TestDelete(t *testing.T) { Name: "ObserveAndDelete", } }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, errBoom + MockRun: func(context.Context) (io.Reader, error) { + return nil, errBoom }, }, }, @@ -933,7 +945,8 @@ func TestDelete(t *testing.T) { "SuccessObserveAndDelete": { reason: "We should not return an error when we successfully delete the AnsibleRun resource", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -945,11 +958,10 @@ func TestDelete(t *testing.T) { Name: "ObserveAndDelete", } }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - ctx := context.Background() + MockRun: func(ctx context.Context) (io.Reader, error) { cmd := exec.CommandContext(ctx, "ls") cmd.Start() - return cmd, nil, nil + return nil, cmd.Wait() }, }, }, @@ -958,7 +970,8 @@ func TestDelete(t *testing.T) { "RunErrorWithCheckWhenObservePolicy": { reason: "We should return any error we encounter when running the runner", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -970,8 +983,8 @@ func TestDelete(t *testing.T) { Name: "CheckWhenObserve", } }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - return nil, nil, errBoom + MockRun: func(context.Context) (io.Reader, error) { + return nil, errBoom }, }, }, @@ -980,7 +993,8 @@ func TestDelete(t *testing.T) { "SuccessCheckWhenObserve": { reason: "We should not return an error when we successfully delete the AnsibleRun resource", args: args{ - mg: &v1alpha1.AnsibleRun{}, + ctx: context.Background(), + mg: &v1alpha1.AnsibleRun{}, }, fields: fields{ runner: &MockRunner{ @@ -992,11 +1006,10 @@ func TestDelete(t *testing.T) { Name: "CheckWhenObserve", } }, - MockRun: func() (*exec.Cmd, io.Reader, error) { - ctx := context.Background() + MockRun: func(ctx context.Context) (io.Reader, error) { cmd := exec.CommandContext(ctx, "ls") cmd.Start() - return cmd, nil, nil + return nil, cmd.Wait() }, }, },