Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failure to commit leads to inconsistencies between workflow and database #302

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,14 @@ func (e Engine) launchResolution(publicID string, async bool, sm *semaphore.Weig
}
debugLogger.Debugf("Engine: Resolve() %s RECAP BEFORE resolve: state: %s, steps: %s", publicID, res.State, strings.Join(recap, ", "))
e.wg.Add(1)

err = nil
if async {
go resolve(dbp, res, t, sm, e.wg, debugLogger)
} else {
resolve(dbp, res, t, sm, e.wg, debugLogger)
err = resolve(dbp, res, t, sm, e.wg, debugLogger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be replaced with `return res, resolve(...)`, since there is nothing left to do after the if statement; that also makes the `err = nil` assignment above unnecessary.

}
return res, nil
return res, err
}

func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry) (*resolution.Resolution, *task.Task, error) {
Expand Down Expand Up @@ -377,13 +379,17 @@ func initialize(dbp zesty.DBProvider, publicID string, debugLogger *logrus.Entry
return res, t, nil
}

func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm *semaphore.Weighted, wg *sync.WaitGroup, debugLogger *logrus.Entry) {
func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm *semaphore.Weighted, wg *sync.WaitGroup, debugLogger *logrus.Entry) error {
defer wg.Done()
// keep track of steps which get executed during each run, to avoid looping+retrying the same failing step endlessly
executedSteps := map[string]bool{}
stepChan := make(chan *step.Step)

expectedMessages := runAvailableSteps(dbp, map[string]bool{}, res, t, stepChan, executedSteps, []string{}, wg, debugLogger)
expectedMessages, err := runAvailableSteps(dbp, map[string]bool{}, res, stepChan, executedSteps, []string{}, wg, debugLogger)
if err != nil {
debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err)
return err
}

for expectedMessages > 0 {
debugLogger.Debugf("Engine: resolve() %s loop, %d expected steps", res.PublicID, expectedMessages)
Expand Down Expand Up @@ -451,7 +457,13 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
// one less step to go
expectedMessages--
// state change might unlock more steps for execution
expectedMessages += runAvailableSteps(dbp, modifiedSteps, res, t, stepChan, executedSteps, []string{}, wg, debugLogger)
newAvailableSteps, err := runAvailableSteps(dbp, modifiedSteps, res, stepChan, executedSteps, []string{}, wg, debugLogger)
if err != nil {
debugLogger.Debugf("Engine: resolve() %s loop, ERROR WHILE runAvailableSteps: %s", res.PublicID, err)
return err
}

expectedMessages += newAvailableSteps

// attempt to persist all changes in db
if err := commit(dbp, res, t); err != nil {
Expand Down Expand Up @@ -584,6 +596,8 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil {
debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err)
}

return nil
}

func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error {
Expand Down Expand Up @@ -639,15 +653,15 @@ func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) erro
return dbp.Commit()
}

func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, t *task.Task, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) int {
func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res *resolution.Resolution, stepChan chan<- *step.Step, executedSteps map[string]bool, expandedSteps []string, wg *sync.WaitGroup, debugLogger *logrus.Entry) (int, error) {
av := availableSteps(modifiedSteps, res, executedSteps, expandedSteps, debugLogger)
expandedSteps = []string{}
preRunModifiedSteps := map[string]bool{}
expanded := 0

select {
case <-shutdownCtx.Done():
return 0
return 0, nil
default:
for name, s := range av {
// prepare step
Expand Down Expand Up @@ -688,7 +702,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
}
// rebuild step dependency tree to include generated loop steps
res.BuildStepTree()
commit(dbp, res, nil)
if err := commit(dbp, res, nil); err != nil {
return 0, err
}
go func() { stepChan <- s }()
} else { // regular step
s.ResultValidate = jsonschema.Validator(s.Name, s.Schema)
Expand All @@ -699,7 +715,9 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
if s.State != step.StateAfterrunError {
res.SetStepState(s.Name, step.StateRunning)
step.PreRun(s, res.Values, resolutionStateSetter(res, preRunModifiedSteps), executedSteps)
commit(dbp, res, nil)
if err := commit(dbp, res, nil); err != nil {
return 0, err
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with my current implementation is that returning from runAvailableSteps here will exit the main loop in resolve() while some steps might still be running.

Not sure how we should handle this.

}
}

// run
Expand All @@ -714,10 +732,15 @@ func runAvailableSteps(dbp zesty.DBProvider, modifiedSteps map[string]bool, res
// - loop step generated new steps
if len(preRunModifiedSteps) > 0 || expanded > 0 {
pruneSteps(res, preRunModifiedSteps)
return len(av) + runAvailableSteps(dbp, preRunModifiedSteps, res, t, stepChan, executedSteps, expandedSteps, wg, debugLogger)
newAvailableSteps, err := runAvailableSteps(dbp, preRunModifiedSteps, res, stepChan, executedSteps, expandedSteps, wg, debugLogger)
if err != nil {
return 0, err
}

return len(av) + newAvailableSteps, nil
}

return len(av)
return len(av), nil
}

func expandStep(s *step.Step, res *resolution.Resolution) {
Expand Down