From 3112413ca33f7fc23b3894e4f5ac77bcbfc11748 Mon Sep 17 00:00:00 2001 From: Romain Beuque <556072+rbeuque74@users.noreply.github.com> Date: Tue, 30 Nov 2021 12:56:22 +0100 Subject: [PATCH 1/2] fix: failure to commit leads to inconsistencies between workflow and database MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a µTask worker failed to commit its current state to the database, it sometimes did not stop the execution of its resolution. This led to inconsistencies: the step was actually RUNNING, while the database state still indicated TODO. As a result, if the worker crashed, a new worker would restart with wrong information, causing workflow issues. Signed-off-by: Romain Beuque <556072+rbeuque74@users.noreply.github.com> --- engine/engine.go | 45 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 5e014937..4ec8c678 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -264,12 +264,14 @@ func (e Engine) launchResolution(publicID string, async bool, sm *semaphore.Weig } debugLogger.Debugf("Engine: Resolve() %s RECAP BEFORE resolve: state: %s, steps: %s", publicID, res.State, strings.Join(recap, ", ")) e.wg.Add(1) + + err = nil if async { go resolve(dbp, res, t, sm, e.wg, debugLogger) } else { - resolve(dbp, res, t, sm, e.wg, debugLogger) + err = resolve(dbp, res, t, sm, e.wg, debugLogger) } - return res, nil + return res, err } func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry) (*resolution.Resolution, *task.Task, error) { @@ -377,13 +379,17 @@ func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry return res, t, nil } -func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm 
*semaphore.Weighted, wg *sync.WaitGroup, debugLogger *logrus.Entry) error { defer wg.Done() // keep track of steps which get executed during each run, to avoid looping+retrying the same failing step endlessly executedSteps := map[string]bool{} stepChan := make(chan *step.Step) - expectedMessages := runAvailableSteps(dbp, map[string]bool{}, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + expectedMessages, err := runAvailableSteps(dbp, map[string]bool{}, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + if err != nil { + debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err) + return err + } for expectedMessages > 0 { debugLogger.Debugf("Engine: resolve() %s loop, %d expected steps", res.PublicID, expectedMessages) @@ -451,7 +457,13 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm // one less step to go expectedMessages-- // state change might unlock more steps for execution - expectedMessages += runAvailableSteps(dbp, modifiedSteps, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, modifiedSteps, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + if err != nil { + debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err) + return err + } + + expectedMessages += newAvailableSteps // attempt to persist all changes in db if err := commit(dbp, res, t); err != nil { @@ -584,6 +596,8 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil { debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err) } + + return nil } func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error { @@ -639,7 +653,7 @@ func commit(dbp zesty.DBProvider, res 
*resolution.Resolution, t *task.Task) erro return dbp.Commit() } -func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, t *task.Task, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) int { +func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, t *task.Task, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) (int, error) { av := availableSteps(modifiedSteps, res, executedSteps, expandedSteps, debugLogger) expandedSteps = []string{} preRunModifiedSteps := map[string]bool{} @@ -647,7 +661,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res select { case <-shutdownCtx.Done(): - return 0 + return 0, nil default: for name, s := range av { // prepare step @@ -688,7 +702,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res } // rebuild step dependency tree to include generated loop steps res.BuildStepTree() - commit(dbp, res, nil) + if err := commit(dbp, res, nil); err != nil { + return 0, err + } go func() { stepChan <- s }() } else { // regular step s.ResultValidate = jsonschema.Validator(s.Name, s.Schema) @@ -699,7 +715,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res if s.State != step.StateAfterrunError { res.SetStepState(s.Name, step.StateRunning) step.PreRun(s, res.Values, resolutionStateSetter(res, preRunModifiedSteps), executedSteps) - commit(dbp, res, nil) + if err := commit(dbp, res, nil); err != nil { + return 0, err + } } // run @@ -714,10 +732,15 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res // - loop step generated new steps if len(preRunModifiedSteps) > 0 || expanded > 0 { pruneSteps(res, preRunModifiedSteps) - return len(av) + runAvailableSteps(dbp, 
preRunModifiedSteps, res, t, stepChan, executedSteps, expandedSteps, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, preRunModifiedSteps, res, t, stepChan, executedSteps, expandedSteps, wg, debugLogger) + if err != nil { + return 0, err + } + + return len(av) + newAvailableSteps, nil } - return len(av) + return len(av), nil } func expandStep(s *step.Step, res *resolution.Resolution) { From 28d9455b8edfc306034c5acedc0c2ee704e32593 Mon Sep 17 00:00:00 2001 From: Romain Beuque <556072+rbeuque74@users.noreply.github.com> Date: Tue, 30 Nov 2021 13:05:57 +0100 Subject: [PATCH 2/2] fix: remove unused runAvailableSteps arguments Signed-off-by: Romain Beuque <556072+rbeuque74@users.noreply.github.com> --- engine/engine.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 4ec8c678..a4277144 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -385,7 +385,7 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm executedSteps := map[string]bool{} stepChan := make(chan *step.Step) - expectedMessages, err := runAvailableSteps(dbp, map[string]bool{}, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + expectedMessages, err := runAvailableSteps(dbp, map[string]bool{}, res, stepChan, executedSteps, []string{}, wg, debugLogger) if err != nil { debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err) return err @@ -457,7 +457,7 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm // one less step to go expectedMessages-- // state change might unlock more steps for execution - newAvailableSteps, err := runAvailableSteps(dbp, modifiedSteps, res, t, stepChan, executedSteps, []string{}, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, modifiedSteps, res, stepChan, executedSteps, []string{}, wg, debugLogger) if err != nil { debugLogger.Debugf("Engine: resolve() %s loop, 
ERROR WHILE runAvailableSteps: %s", res.PublicID, err) return err @@ -653,7 +653,7 @@ func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) erro return dbp.Commit() } -func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, t *task.Task, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) (int, error) { +func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) (int, error) { av := availableSteps(modifiedSteps, res, executedSteps, expandedSteps, debugLogger) expandedSteps = []string{} preRunModifiedSteps := map[string]bool{} @@ -732,7 +732,7 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res // - loop step generated new steps if len(preRunModifiedSteps) > 0 || expanded > 0 { pruneSteps(res, preRunModifiedSteps) - newAvailableSteps, err := runAvailableSteps(dbp, preRunModifiedSteps, res, t, stepChan, executedSteps, expandedSteps, wg, debugLogger) + newAvailableSteps, err := runAvailableSteps(dbp, preRunModifiedSteps, res, stepChan, executedSteps, expandedSteps, wg, debugLogger) if err != nil { return 0, err }