diff --git a/engine/engine.go b/engine/engine.go index 5e014937..a4277144 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -264,12 +264,14 @@ func (e Engine) launchResolution(publicID string, async bool, sm *semaphore.Weig } debugLogger.Debugf("Engine: Resolve() %s RECAP BEFORE resolve: state: %s, steps: %s", publicID, res.State, strings.Join(recap, ", ")) e.wg.Add(1) + + err = nil if async { go resolve(dbp, res, t, sm, e.wg, debugLogger) } else { - resolve(dbp, res, t, sm, e.wg, debugLogger) + err = resolve(dbp, res, t, sm, e.wg, debugLogger) } - return res, nil + return res, err } func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry) (*resolution.Resolution, *task.Task, error) { @@ -377,13 +379,17 @@ func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry return res, t, nil } -func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm *semaphore.Weighted, wg *sync.WaitGroup, debugLogger *logrus.Entry) { +func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm *semaphore.Weighted, wg *sync.WaitGroup, debugLogger *logrus.Entry) error { defer wg.Done() // keep track of steps which get executed during each run, to avoid looping+retrying the same failing step endlessly executedSteps := map[string]bool{} stepChan := make(chan *step.Step) - expectedMessages := runAvailableSteps(dbp, map[string]bool{}, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + expectedMessages, err := runAvailableSteps(dbp, map[string]bool{}, res, stepChan, executedSteps, []string{}, wg, debugLogger) + if err != nil { + debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err) + return err + } for expectedMessages > 0 { debugLogger.Debugf("Engine: resolve() %s loop, %d expected steps", res.PublicID, expectedMessages) @@ -451,7 +457,13 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm // one less step to go expectedMessages-- // state change might unlock more steps for execution - expectedMessages += runAvailableSteps(dbp, modifiedSteps, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, modifiedSteps, res, stepChan, executedSteps, []string{}, wg, debugLogger) + if err != nil { + debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err) + return err + } + + expectedMessages += newAvailableSteps // attempt to persist all changes in db if err := commit(dbp, res, t); err != nil { @@ -584,6 +596,8 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil { debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err) } + + return nil } func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error { @@ -639,7 +653,7 @@ func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) erro return dbp.Commit() } -func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, t *task.Task, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) int { +func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) (int, error) { av := availableSteps(modifiedSteps, res, executedSteps, expandedSteps, debugLogger) expandedSteps = []string{} preRunModifiedSteps := map[string]bool{} @@ -647,7 +661,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res select { case <-shutdownCtx.Done(): - return 0 + return 0, nil default: for name, s := range av { // prepare step @@ -688,7 +702,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res } // rebuild step dependency tree to include generated loop steps res.BuildStepTree() - commit(dbp, res, nil) + if err := commit(dbp, res, nil); err != nil { + return 0, err + } go func() { stepChan <- s }() } else { // regular step s.ResultValidate = jsonschema.Validator(s.Name, s.Schema) @@ -699,7 +715,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res if s.State != step.StateAfterrunError { res.SetStepState(s.Name, step.StateRunning) step.PreRun(s, res.Values, resolutionStateSetter(res, preRunModifiedSteps), executedSteps) - commit(dbp, res, nil) + if err := commit(dbp, res, nil); err != nil { + return 0, err + } } // run @@ -714,10 +732,15 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res // - loop step generated new steps if len(preRunModifiedSteps) > 0 || expanded > 0 { pruneSteps(res, preRunModifiedSteps) - return len(av) + runAvailableSteps(dbp, preRunModifiedSteps, res, t, stepChan, executedSteps, expandedSteps, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, preRunModifiedSteps, res, stepChan, executedSteps, expandedSteps, wg, debugLogger) + if err != nil { + return 0, err + } + + return len(av) + newAvailableSteps, nil } - return len(av) + return len(av), nil } func expandStep(s *step.Step, res *resolution.Resolution) {