From b7f61fc81d5b9283cca8b0f59281935530cb0499 Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Mon, 22 Jul 2019 12:43:59 +0200 Subject: [PATCH] Queue knows about skipped tasks now to handle transitive deps --- cncd/queue/fifo.go | 24 ++++++--- cncd/queue/fifo_test.go | 114 +++++++++++++++++++++++++++++++++------- cncd/queue/queue.go | 14 ++--- model/queue.go | 2 +- server/hook.go | 2 +- server/rpc.go | 2 +- 6 files changed, 121 insertions(+), 37 deletions(-) diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index d76ee23b8ce..18cf328f530 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -11,6 +11,12 @@ import ( "github.com/Sirupsen/logrus" ) +const ( + StatusSkipped = "skipped" + StatusSuccess = "success" + StatusFailure = "failure" +) + type entry struct { item *Task done chan bool @@ -92,22 +98,26 @@ func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) { } // Done signals that the item is done executing. -func (q *fifo) Done(c context.Context, id string) error { - return q.Error(c, id, nil) +func (q *fifo) Done(c context.Context, id string, exitStatus string) error { + return q.finished(id, exitStatus, nil) } // Error signals that the item is done executing with error. func (q *fifo) Error(c context.Context, id string, err error) error { + return q.finished(id, StatusFailure, err) +} + +func (q *fifo) finished(id string, exitStatus string, err error) error { q.Lock() taskEntry, ok := q.running[id] if ok { - q.updateDepStatusInQueue(id, err == nil) taskEntry.error = err close(taskEntry.done) delete(q.running, id) } else { q.removeFromPending(id) } + q.updateDepStatusInQueue(id, exitStatus) q.Unlock() return nil } @@ -310,14 +320,14 @@ func (q *fifo) depsInQueue(task *Task) bool { return false } -func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { +func (q *fifo) updateDepStatusInQueue(taskID string, status string) { var next *list.Element for e := q.pending.Front(); e != nil; e = next { next = e.Next() pending, ok := e.Value.(*Task) for _, dep := range pending.Dependencies { if ok && taskID == dep { - pending.DepStatus[dep] = success + pending.DepStatus[dep] = status } } } @@ -325,7 +335,7 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { for _, running := range q.running { for _, dep := range running.item.Dependencies { if taskID == dep { - running.item.DepStatus[dep] = success + running.item.DepStatus[dep] = status } } } @@ -336,7 +346,7 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) { waiting, ok := e.Value.(*Task) for _, dep := range waiting.Dependencies { if ok && taskID == dep { - waiting.DepStatus[dep] = success + waiting.DepStatus[dep] = status } } } diff --git a/cncd/queue/fifo_test.go b/cncd/queue/fifo_test.go index 4e308c1ff02..287107c6f2f 100644 --- a/cncd/queue/fifo_test.go +++ b/cncd/queue/fifo_test.go @@ -37,7 +37,7 @@ func TestFifo(t *testing.T) { return } - q.Done(noContext, got.ID) + q.Done(noContext, got.ID, StatusSuccess) info = q.Info(noContext) if len(info.Pending) != 0 { t.Errorf("expect task removed from pending queue") @@ -94,7 +94,7 @@ func TestFifoWait(t *testing.T) { }() <-time.After(time.Millisecond) - q.Done(noContext, got.ID) + q.Done(noContext, got.ID, StatusSuccess) wg.Wait() } @@ -127,7 +127,7 @@ func TestFifoDependencies(t *testing.T) { task2 := &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), } q := New().(*fifo) @@ -139,7 +139,7 @@ func TestFifoDependencies(t *testing.T) { return } - q.Done(noContext, got.ID) + q.Done(noContext, got.ID, StatusSuccess) got, _ = q.Poll(noContext, func(*Task) bool { return true }) if got != task2 { @@ -156,13 +156,13 @@ func TestFifoErrors(t *testing.T) { task2 := &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), } task3 := &Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), RunOn: []string{"success", "failure"}, } @@ -200,6 +200,55 @@ func TestFifoErrors(t *testing.T) { } } +func TestFifoTransitiveErrors(t *testing.T) { + task1 := &Task{ + ID: "1", + } + + task2 := &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: make(map[string]string), + } + + task3 := &Task{ + ID: "3", + Dependencies: []string{"2"}, + DepStatus: make(map[string]string), + } + + q := New().(*fifo) + q.PushAtOnce(noContext, []*Task{task2, task3, task1}) + + got, _ := q.Poll(noContext, func(*Task) bool { return true }) + if got != task1 { + t.Errorf("expect task1 returned from queue as task2 depends on it") + return + } + q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error")) + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task2 { + t.Errorf("expect task2 returned from queue") + return + } + if got.ShouldRun() { + t.Errorf("expect task2 should not run, since task1 failed") + return + } + q.Done(noContext, got.ID, StatusSkipped) + + got, _ = q.Poll(noContext, func(*Task) bool { return true }) + if got != task3 { + t.Errorf("expect task3 returned from queue") + return + } + if got.ShouldRun() { + t.Errorf("expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too") + return + } +} + func TestFifoCancel(t *testing.T) { task1 := &Task{ ID: "1", @@ -208,13 +257,13 @@ func TestFifoCancel(t *testing.T) { task2 := &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), } task3 := &Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), RunOn: []string{"success", "failure"}, } @@ -286,13 +335,13 @@ func TestWaitingVsPending(t *testing.T) { task2 := &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), } task3 := &Task{ ID: "3", Dependencies: []string{"1"}, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), RunOn: []string{"success", "failure"}, } @@ -322,8 +371,8 @@ func TestShouldRun(t *testing.T) { task := &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]bool{ - "1": true, + DepStatus: map[string]string{ + "1": StatusSuccess, }, RunOn: []string{"failure"}, } @@ -335,8 +384,8 @@ func TestShouldRun(t *testing.T) { task = &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]bool{ - "1": true, + DepStatus: map[string]string{ + "1": StatusSuccess, }, RunOn: []string{"failure", "success"}, } @@ -348,8 +397,8 @@ func TestShouldRun(t *testing.T) { task = &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]bool{ - "1": false, + DepStatus: map[string]string{ + "1": StatusFailure, }, } if task.ShouldRun() { @@ -360,8 +409,8 @@ func TestShouldRun(t *testing.T) { task = &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]bool{ - "1": true, + DepStatus: map[string]string{ + "1": StatusSuccess, }, RunOn: []string{"success"}, } @@ -373,8 +422,8 @@ func TestShouldRun(t *testing.T) { task = &Task{ ID: "2", Dependencies: []string{"1"}, - DepStatus: map[string]bool{ - "1": false, + DepStatus: map[string]string{ + "1": StatusFailure, }, RunOn: []string{"failure"}, } @@ -382,4 +431,29 @@ func TestShouldRun(t *testing.T) { t.Errorf("expect task to run") return } + + task = &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: map[string]string{ + "1": StatusSkipped, + }, + } + if task.ShouldRun() { + t.Errorf("Tasked should not run if dependency is skipped") + return + } + + task = &Task{ + ID: "2", + Dependencies: []string{"1"}, + DepStatus: map[string]string{ + "1": StatusSkipped, + }, + RunOn: []string{"failure"}, + } + if !task.ShouldRun() { + t.Errorf("On Failure tasks should run on skipped deps, something failed higher up the chain") + return + } } diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 3154d3d9393..67ffe3ee5cf 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -27,8 +27,8 @@ type Task struct { // Task IDs this task depend Dependencies []string - // If dep finished sucessfully - DepStatus map[string]bool + // Dependency's exit status + DepStatus map[string]string // RunOn failure or success RunOn []string @@ -41,8 +41,8 @@ func (t *Task) ShouldRun() bool { } if !runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) { - for _, success := range t.DepStatus { - if !success { + for _, status := range t.DepStatus { + if StatusSuccess != status { return false } } @@ -50,8 +50,8 @@ func (t *Task) ShouldRun() bool { } if runsOnFailure(t.RunOn) && !runsOnSuccess(t.RunOn) { - for _, success := range t.DepStatus { - if success { + for _, status := range t.DepStatus { + if StatusSuccess == status { return false } } @@ -118,7 +118,7 @@ type Queue interface { Extend(c context.Context, id string) error // Done signals the task is complete. - Done(c context.Context, id string) error + Done(c context.Context, exitStatus string, id string) error // Error signals the task is complete with errors. Error(c context.Context, id string, err error) error diff --git a/model/queue.go b/model/queue.go index 3b595268745..5fa83c7f4f7 100644 --- a/model/queue.go +++ b/model/queue.go @@ -49,7 +49,7 @@ func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue { Labels: task.Labels, Dependencies: task.Dependencies, RunOn: task.RunOn, - DepStatus: make(map[string]bool), + DepStatus: make(map[string]string), }) } q.PushAtOnce(context.Background(), toEnqueue) diff --git a/server/hook.go b/server/hook.go index 5c145455f6d..6fe847d2410 100644 --- a/server/hook.go +++ b/server/hook.go @@ -388,7 +388,7 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) { task.Labels["repo"] = repo.FullName task.Dependencies = taskIds(item.DependsOn, buildItems) task.RunOn = item.RunsOn - task.DepStatus = make(map[string]bool) + task.DepStatus = make(map[string]string) task.Data, _ = json.Marshal(rpc.Pipeline{ ID: fmt.Sprint(item.Proc.ID), diff --git a/server/rpc.go b/server/rpc.go index 2fa6640b00a..438dcd423e6 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -384,7 +384,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error { if proc.Failing() { queueErr = s.queue.Error(c, id, fmt.Errorf("Proc finished with exitcode %d, %s", state.ExitCode, state.Error)) } else { - queueErr = s.queue.Done(c, id) + queueErr = s.queue.Done(c, id, proc.State) } if queueErr != nil { log.Printf("error: done: cannot ack proc_id %d: %s", procID, err)