Skip to content

Commit

Permalink
Merge pull request #56 from laszlocph/depending-on-skipped-pipe
Browse files Browse the repository at this point in the history
Handle transitive deps: queue knows about skipped tasks now
  • Loading branch information
laszlocph authored Jul 22, 2019
2 parents 46deb56 + b7f61fc commit a949750
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 37 deletions.
24 changes: 17 additions & 7 deletions cncd/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/Sirupsen/logrus"
)

const (
StatusSkipped = "skipped"
StatusSuccess = "success"
StatusFailure = "failure"
)

type entry struct {
item *Task
done chan bool
Expand Down Expand Up @@ -92,22 +98,26 @@ func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
}

// Done signals that the item is done executing.
func (q *fifo) Done(c context.Context, id string) error {
return q.Error(c, id, nil)
func (q *fifo) Done(c context.Context, id string, exitStatus string) error {
return q.finished(id, exitStatus, nil)
}

// Error signals that the item is done executing with error.
func (q *fifo) Error(c context.Context, id string, err error) error {
return q.finished(id, StatusFailure, err)
}

func (q *fifo) finished(id string, exitStatus string, err error) error {
q.Lock()
taskEntry, ok := q.running[id]
if ok {
q.updateDepStatusInQueue(id, err == nil)
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
} else {
q.removeFromPending(id)
}
q.updateDepStatusInQueue(id, exitStatus)
q.Unlock()
return nil
}
Expand Down Expand Up @@ -310,22 +320,22 @@ func (q *fifo) depsInQueue(task *Task) bool {
return false
}

func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
func (q *fifo) updateDepStatusInQueue(taskID string, status string) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
pending, ok := e.Value.(*Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
pending.DepStatus[dep] = success
pending.DepStatus[dep] = status
}
}
}

for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
running.item.DepStatus[dep] = success
running.item.DepStatus[dep] = status
}
}
}
Expand All @@ -336,7 +346,7 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
waiting, ok := e.Value.(*Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
waiting.DepStatus[dep] = success
waiting.DepStatus[dep] = status
}
}
}
Expand Down
114 changes: 94 additions & 20 deletions cncd/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestFifo(t *testing.T) {
return
}

q.Done(noContext, got.ID)
q.Done(noContext, got.ID, StatusSuccess)
info = q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("expect task removed from pending queue")
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestFifoWait(t *testing.T) {
}()

<-time.After(time.Millisecond)
q.Done(noContext, got.ID)
q.Done(noContext, got.ID, StatusSuccess)
wg.Wait()
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestFifoDependencies(t *testing.T) {
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
}

q := New().(*fifo)
Expand All @@ -139,7 +139,7 @@ func TestFifoDependencies(t *testing.T) {
return
}

q.Done(noContext, got.ID)
q.Done(noContext, got.ID, StatusSuccess)

got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
Expand All @@ -156,13 +156,13 @@ func TestFifoErrors(t *testing.T) {
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}

Expand Down Expand Up @@ -200,6 +200,55 @@ func TestFifoErrors(t *testing.T) {
}
}

func TestFifoTransitiveErrors(t *testing.T) {
task1 := &Task{
ID: "1",
}

task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"2"},
DepStatus: make(map[string]string),
}

q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))

got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task2 should not run, since task1 failed")
return
}
q.Done(noContext, got.ID, StatusSkipped)

got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 failed, thus task2 was skipped, task3 should be skipped too")
return
}
}

func TestFifoCancel(t *testing.T) {
task1 := &Task{
ID: "1",
Expand All @@ -208,13 +257,13 @@ func TestFifoCancel(t *testing.T) {
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}

Expand Down Expand Up @@ -286,13 +335,13 @@ func TestWaitingVsPending(t *testing.T) {
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
RunOn: []string{"success", "failure"},
}

Expand Down Expand Up @@ -322,8 +371,8 @@ func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]bool{
"1": true,
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"failure"},
}
Expand All @@ -335,8 +384,8 @@ func TestShouldRun(t *testing.T) {
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]bool{
"1": true,
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"failure", "success"},
}
Expand All @@ -348,8 +397,8 @@ func TestShouldRun(t *testing.T) {
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]bool{
"1": false,
DepStatus: map[string]string{
"1": StatusFailure,
},
}
if task.ShouldRun() {
Expand All @@ -360,8 +409,8 @@ func TestShouldRun(t *testing.T) {
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]bool{
"1": true,
DepStatus: map[string]string{
"1": StatusSuccess,
},
RunOn: []string{"success"},
}
Expand All @@ -373,13 +422,38 @@ func TestShouldRun(t *testing.T) {
task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]bool{
"1": false,
DepStatus: map[string]string{
"1": StatusFailure,
},
RunOn: []string{"failure"},
}
if !task.ShouldRun() {
t.Errorf("expect task to run")
return
}

task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSkipped,
},
}
if task.ShouldRun() {
t.Errorf("Tasked should not run if dependency is skipped")
return
}

task = &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: map[string]string{
"1": StatusSkipped,
},
RunOn: []string{"failure"},
}
if !task.ShouldRun() {
t.Errorf("On Failure tasks should run on skipped deps, something failed higher up the chain")
return
}
}
14 changes: 7 additions & 7 deletions cncd/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Task struct {
// Task IDs this task depend
Dependencies []string

// If dep finished sucessfully
DepStatus map[string]bool
// Dependency's exit status
DepStatus map[string]string

// RunOn failure or success
RunOn []string
Expand All @@ -41,17 +41,17 @@ func (t *Task) ShouldRun() bool {
}

if !runsOnFailure(t.RunOn) && runsOnSuccess(t.RunOn) {
for _, success := range t.DepStatus {
if !success {
for _, status := range t.DepStatus {
if StatusSuccess != status {
return false
}
}
return true
}

if runsOnFailure(t.RunOn) && !runsOnSuccess(t.RunOn) {
for _, success := range t.DepStatus {
if success {
for _, status := range t.DepStatus {
if StatusSuccess == status {
return false
}
}
Expand Down Expand Up @@ -118,7 +118,7 @@ type Queue interface {
Extend(c context.Context, id string) error

// Done signals the task is complete.
Done(c context.Context, id string) error
Done(c context.Context, exitStatus string, id string) error

// Error signals the task is complete with errors.
Error(c context.Context, id string, err error) error
Expand Down
2 changes: 1 addition & 1 deletion model/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue {
Labels: task.Labels,
Dependencies: task.Dependencies,
RunOn: task.RunOn,
DepStatus: make(map[string]bool),
DepStatus: make(map[string]string),
})
}
q.PushAtOnce(context.Background(), toEnqueue)
Expand Down
2 changes: 1 addition & 1 deletion server/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func queueBuild(build *model.Build, repo *model.Repo, buildItems []*buildItem) {
task.Labels["repo"] = repo.FullName
task.Dependencies = taskIds(item.DependsOn, buildItems)
task.RunOn = item.RunsOn
task.DepStatus = make(map[string]bool)
task.DepStatus = make(map[string]string)

task.Data, _ = json.Marshal(rpc.Pipeline{
ID: fmt.Sprint(item.Proc.ID),
Expand Down
2 changes: 1 addition & 1 deletion server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {
if proc.Failing() {
queueErr = s.queue.Error(c, id, fmt.Errorf("Proc finished with exitcode %d, %s", state.ExitCode, state.Error))
} else {
queueErr = s.queue.Done(c, id)
queueErr = s.queue.Done(c, id, proc.State)
}
if queueErr != nil {
log.Printf("error: done: cannot ack proc_id %d: %s", procID, err)
Expand Down

0 comments on commit a949750

Please sign in to comment.