Skip to content

Commit

Permalink
parse job events to extract failure messages
Browse files Browse the repository at this point in the history
Signed-off-by: Dasha Komsa <[email protected]>
  • Loading branch information
d-honeybadger committed Feb 15, 2024
1 parent fbd8487 commit 29261a9
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 73 deletions.
130 changes: 114 additions & 16 deletions internal/ansible/ansible.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os/exec"
"os/user"
"path/filepath"
"strings"
"time"

"github.com/apenella/go-ansible/pkg/stdoutcallback/results"
Expand All @@ -35,7 +36,9 @@ import (
"github.com/crossplane-contrib/provider-ansible/pkg/runnerutil"
"github.com/crossplane/crossplane-runtime/pkg/meta"
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
Expand Down Expand Up @@ -130,13 +133,6 @@ func withBehaviorVars(behaviorVars map[string]string) runnerOption {
}
}

// withAnsibleEnvDir returns a runnerOption that stores dir as the Runner's
// AnsibleEnvDir, i.e. the directory in which the extravars file lives.
func withAnsibleEnvDir(dir string) runnerOption {
	applyEnvDir := func(runner *Runner) {
		runner.AnsibleEnvDir = dir
	}

	return applyEnvDir
}

// withAnsibleRunPolicy sets the runner Policy to execute against.
func withAnsibleRunPolicy(p *RunPolicy) runnerOption {
return func(r *Runner) {
Expand Down Expand Up @@ -301,21 +297,23 @@ func (p Parameters) Init(ctx context.Context, cr *v1alpha1.AnsibleRun, behaviorV
return nil, err
}

return new(withPath(path),
r := new(withPath(path),
withCmdFunc(cmdFunc),
withBehaviorVars(behaviorVars),
withAnsibleRunPolicy(rPolicy),
// TODO should be moved to connect() func
withAnsibleEnvDir(ansibleEnvDir),
), nil
)

r.workDir = p.WorkingDirPath

return r, nil
}

// Runner holds the configuration needed to build and run the command
// returned by cmdFunc.
type Runner struct {
Path string // absolute path on disk to a playbook or role depending on what cmdFunc expects
// behaviorVars is handed to cmdFunc when the command is built.
behaviorVars map[string]string
cmdFunc cmdFuncType // returns a Cmd that runs ansible-runner
// AnsibleEnvDir is the env directory configured through withAnsibleEnvDir.
AnsibleEnvDir string
// workDir is the working directory; the "env" and "artifacts" directories
// are resolved beneath it.
workDir string
// checkMode is handed to cmdFunc when the command is built.
checkMode bool
// AnsibleRunPolicy is the policy reported by GetAnsibleRunPolicy.
AnsibleRunPolicy *RunPolicy
}
Expand All @@ -337,14 +335,21 @@ func (r *Runner) GetAnsibleRunPolicy() *RunPolicy {
return r.AnsibleRunPolicy
}

// ansibleEnvDir returns the cleaned path of the "env" directory located
// directly beneath the runner's working directory.
func (r *Runner) ansibleEnvDir() string {
	envDir := filepath.Join(r.workDir, "env")

	return filepath.Clean(envDir)
}

// Run executes the appropriate cmdFunc
func (r *Runner) Run() (*exec.Cmd, io.Reader, error) {
func (r *Runner) Run(ctx context.Context) (io.Reader, error) {
var (
stdoutBuf bytes.Buffer
stdoutWriter, stderrWriter io.Writer
)

id := uuid.New().String()

dc := r.cmdFunc(r.behaviorVars, r.checkMode)
dc.Args = append(dc.Args, "-i", id)
if !r.checkMode {
// for disabled checkMode dc.Stdout and dc.Stderr are respectively
// written to os.Stdout and os.Stdout for debugging purposes
Expand All @@ -369,10 +374,103 @@ func (r *Runner) Run() (*exec.Cmd, io.Reader, error) {

err := dc.Start()
if err != nil {
return nil, nil, err
return nil, err
}

// Wait for the command to finish. On failure, try to enrich the returned
// error with the failure messages ansible-runner recorded as job events
// under <workDir>/artifacts/<id>/job_events.
if err := dc.Wait(); err != nil {
	jobEventsDir := filepath.Clean(filepath.Join(r.workDir, "artifacts", id, "job_events"))
	failureReason, reasonErr := extractFailureReason(jobEventsDir)
	if reasonErr != nil {
		// Log the extraction error itself: the run error is returned to the
		// caller below, whereas reasonErr would otherwise be lost.
		log.FromContext(ctx).Error(reasonErr, "extracting ansible failure message")
	}

	// Without an extracted reason, return the run error unchanged rather
	// than an error message ending in a dangling ": ".
	if failureReason == "" {
		return nil, err
	}

	return nil, fmt.Errorf("%w: %s", err, failureReason)
}

// TODO: need to start removing artifacts directories - important w/ retries
// cause failed run dirs will be accumulating

return &stdoutBuf, nil
}

// Event type names matched against jobEvent.Event when collecting failure
// messages from a run's job events.
const (
// eventTypeRunnerFailed marks events reported with the "Failed" reason.
eventTypeRunnerFailed = "runner_on_failed"
// eventTypeRunnerUnreachable marks events reported with the "Unreachable" reason.
eventTypeRunnerUnreachable = "runner_on_unreachable"
)

// extractFailureReason parses the job events stored in eventsDir and builds
// a single summary of the failures they describe.
//
// Only "runner_on_failed" and "runner_on_unreachable" events contribute a
// message; every other event type is ignored. The per-event messages are
// joined with "; " in the order the events were parsed, so the result is the
// empty string when no such event exists.
//
// An error is returned when the events cannot be parsed or when the data of
// a matching event cannot be decoded; no partial summary is returned then.
func extractFailureReason(eventsDir string) (string, error) {
	events, err := parseEvents(eventsDir)
	if err != nil {
		return "", fmt.Errorf("parsing job events: %w", err)
	}

	// Reason label to report for each event type of interest.
	reasonByEvent := map[string]string{
		eventTypeRunnerFailed:      "Failed",
		eventTypeRunnerUnreachable: "Unreachable",
	}

	var failures []string
	for _, event := range events {
		reason, relevant := reasonByEvent[event.Event]
		if !relevant {
			continue
		}

		message, err := runnerEventMessage(event, reason)
		if err != nil {
			return "", err
		}
		failures = append(failures, message)
	}

	return strings.Join(failures, "; "), nil
}

// parseEvents reads the files in dir and decodes each one as a JSON-encoded
// jobEvent.
//
// Events are returned in the order os.ReadDir lists the entries, which is
// sorted by file name. Sub-directories are skipped, since they cannot hold an
// event and reading one as a file would abort parsing of every other event.
// A file that cannot be read or decoded aborts parsing and is reported in an
// error naming that file.
//
// NOTE(review): file-name order is lexical; if event files carry a numeric
// counter prefix, "10-..." sorts before "2-..." — confirm whether callers
// need chronological order.
func parseEvents(dir string) ([]jobEvent, error) {
	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("reading job events directory: %w", err)
	}

	var evts []jobEvent
	for _, file := range files {
		if file.IsDir() {
			continue
		}

		evtBytes, err := os.ReadFile(filepath.Join(dir, file.Name()))
		if err != nil {
			return nil, fmt.Errorf("reading job event file %q: %w", file.Name(), err)
		}

		var evt jobEvent
		if err := json.Unmarshal(evtBytes, &evt); err != nil {
			return nil, fmt.Errorf("unmarshaling job event from file %q: %w", file.Name(), err)
		}
		evts = append(evts, evt)
	}

	return evts, nil
}

// reunmarshal converts the generic map data into the typed value that result
// points to by encoding the map as JSON and decoding that JSON into result.
//
// A failure to encode data is returned wrapped with the "marshaling" prefix;
// a failure to decode into result is returned as reported by json.Unmarshal.
func reunmarshal(data map[string]any, result any) error {
	encoded, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return fmt.Errorf("marshaling: %w", marshalErr)
	}

	decodeErr := json.Unmarshal(encoded, result)

	return decodeErr
}

// runnerEventMessage renders a one-line description of a runner event.
//
// The event's generic EventData is decoded into runnerEventData, and the
// play, task, host and result message found there are formatted behind the
// given reason label. An error naming the event's UUID is returned when the
// event data cannot be decoded.
func runnerEventMessage(evt jobEvent, reason string) (string, error) {
	details := runnerEventData{}

	err := reunmarshal(evt.EventData, &details)
	if err != nil {
		return "", fmt.Errorf("unmarshaling job event %s as runner event: %w", evt.UUID, err)
	}

	const layout = "%s on play %q, task %q, host %q: %s"
	message := fmt.Sprintf(layout, reason, details.Play, details.Task, details.Host, details.Result.Msg)

	return message, nil
}

// selectRolePath determines the role path
Expand Down Expand Up @@ -421,7 +519,7 @@ func addFile(path string, content []byte) error {
// WriteExtraVar writes extra vars to env/extravars under the working directory;
// it creates the env/extravars file if it does not exist
func (r *Runner) WriteExtraVar(extraVar map[string]interface{}) error {
extraVarsPath := filepath.Join(r.AnsibleEnvDir, "extravars")
extraVarsPath := filepath.Join(r.ansibleEnvDir(), "extravars")
contentVars := make(map[string]interface{})
data, err := os.ReadFile(filepath.Clean(extraVarsPath))
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions internal/ansible/jobEvent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ansible

// jobEvent represents a single ansible-runner job event as decoded from its
// JSON form; see
// https://ansible.readthedocs.io/projects/runner/en/stable/intro/#artifactevents
type jobEvent struct {
// UUID identifies the event; it is included in decode error messages.
UUID string `json:"uuid"`
// Stdout is decoded from the event's "stdout" key.
Stdout string `json:"stdout"`

// Event is the event type name, e.g. "runner_on_failed".
// https://github.com/ansible/awx/blob/devel/docs/job_events.md#job-event-relationships
// outlines the various event types and the relationships between them.
Event string `json:"event"`
// EventData is the event's payload, kept generic so that it can be decoded
// into a concrete struct once the event type is known.
EventData map[string]any `json:"event_data"`
}

// runnerEventData is the subset of a job event's event_data that is used to
// describe a failed or unreachable runner event.
type runnerEventData struct {
// Play, Task and Host are decoded from the "play", "task" and "host" keys
// and locate where the event occurred.
Play string `json:"play"`
Task string `json:"task"`
Host string `json:"host"`
// Result is decoded from the "res" key.
Result runnerResult `json:"res"`
}

// runnerResult is the subset of a runner event's "res" object that is
// reported in failure messages.
type runnerResult struct {
// Msg is decoded from the "msg" key.
// NOTE(review): assumes "msg" is always a JSON string; a list or object
// here would make decoding the whole event fail — confirm against
// ansible's output.
Msg string `json:"msg"`
}
27 changes: 8 additions & 19 deletions internal/controller/ansibleRun/ansibleRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -87,7 +86,7 @@ type ansibleRunner interface {
GetAnsibleRunPolicy() *ansible.RunPolicy
WriteExtraVar(extraVar map[string]interface{}) error
EnableCheckMode(checkMode bool)
Run() (*exec.Cmd, io.Reader, error)
Run(ctx context.Context) (io.Reader, error)
}

// Setup adds a controller that reconciles AnsibleRun managed resources.
Expand Down Expand Up @@ -353,13 +352,10 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
return managed.ExternalObservation{}, err
}
c.runner.EnableCheckMode(true)
dc, stdoutBuf, err := c.runner.Run()
stdoutBuf, err := c.runner.Run(ctx)
if err != nil {
return managed.ExternalObservation{}, err
}
if err = dc.Wait(); err != nil {
return managed.ExternalObservation{}, err
}
res, err := results.ParseJSONResultsStream(stdoutBuf)
if err != nil {
return managed.ExternalObservation{}, err
Expand Down Expand Up @@ -395,15 +391,15 @@ func (c *external) Update(ctx context.Context, mg resource.Managed) (managed.Ext

// disable checkMode for real action
c.runner.EnableCheckMode(false)
if err := c.runAnsible(cr); err != nil {
if err := c.runAnsible(ctx, cr); err != nil {
return managed.ExternalUpdate{}, fmt.Errorf("running ansible: %w", err)
}

// TODO handle ConnectionDetails https://github.com/multicloudlab/crossplane-provider-ansible/pull/74#discussion_r888467991
return managed.ExternalUpdate{ConnectionDetails: nil}, nil
}

func (c *external) Delete(_ context.Context, mg resource.Managed) error {
func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
cr, ok := mg.(*v1alpha1.AnsibleRun)
if !ok {
return errors.New(errNotAnsibleRun)
Expand All @@ -418,13 +414,10 @@ func (c *external) Delete(_ context.Context, mg resource.Managed) error {
if err := c.runner.WriteExtraVar(nestedMap); err != nil {
return err
}
dc, _, err := c.runner.Run()
_, err := c.runner.Run(ctx)
if err != nil {
return err
}
if err = dc.Wait(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -472,7 +465,7 @@ func (c *external) handleLastApplied(ctx context.Context, lastParameters *v1alph
return managed.ExternalObservation{}, err
}

if err := c.runAnsible(desired); err != nil {
if err := c.runAnsible(ctx, desired); err != nil {
return managed.ExternalObservation{}, fmt.Errorf("running ansible: %w", err)
}

Expand All @@ -483,13 +476,9 @@ func (c *external) handleLastApplied(ctx context.Context, lastParameters *v1alph
return managed.ExternalObservation{ResourceExists: true, ResourceUpToDate: true}, nil
}

func (c *external) runAnsible(cr *v1alpha1.AnsibleRun) error {
dc, _, err := c.runner.Run()
func (c *external) runAnsible(ctx context.Context, cr *v1alpha1.AnsibleRun) error {
_, err := c.runner.Run(ctx)
if err != nil {
return err
}

if err = dc.Wait(); err != nil {
cond := xpv1.Unavailable()
cond.Message = err.Error()
cr.SetConditions(cond)
Expand Down
Loading

0 comments on commit 29261a9

Please sign in to comment.