Skip to content

Commit

Permalink
Merge pull request #201 from laszlocph/non-blocking-errors
Browse files Browse the repository at this point in the history
Non blocking errors
  • Loading branch information
laszlocph authored May 25, 2021
2 parents b6ba737 + 3cb8d06 commit 5604ff2
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cncd/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func (q *fifo) updateDepStatusInQueue(taskID string, status string) {
}
}

var n *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = n {
next = nil
for e := q.waitingOnDeps.Front(); e != nil; e = next {
next = e.Next()
waiting, ok := e.Value.(*Task)
for _, dep := range waiting.Dependencies {
Expand Down
139 changes: 139 additions & 0 deletions cncd/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,145 @@ func TestFifoErrors(t *testing.T) {
}
}

func TestFifoErrors2(t *testing.T) {
task1 := &Task{
ID: "1",
}

task2 := &Task{
ID: "2",
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1", "2"},
DepStatus: make(map[string]string),
}

q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

for i := 0; i < 2; i++ {
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 && got != task2{
t.Errorf("expect task1 or task2 returned from queue as task3 depends on them")
return
}

if got != task1 {
q.Done(noContext, got.ID, StatusSuccess)
}
if got != task2 {
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
}
}

got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}

if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 succeeded but task2 failed")
return
}
}

func TestFifoErrorsMultiThread(t *testing.T) {
//logrus.SetLevel(logrus.DebugLevel)
task1 := &Task{
ID: "1",
}

task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]string),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1", "2"},
DepStatus: make(map[string]string),
}

q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

obtainedWorkCh := make(chan *Task)

for i := 0; i < 10; i++ {
go func(i int) {
for {
fmt.Printf("Worker %d started\n", i)
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}(i)
}

task1Processed := false
task2Processed := false

for {
select {
case got := <-obtainedWorkCh:
fmt.Println(got.ID)

if !task1Processed {
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 and task3 depends on it")
return
} else {
task1Processed = true
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}()
}
} else if !task2Processed {
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
} else {
task2Processed = true
q.Done(noContext, got.ID, StatusSuccess)
go func() {
for {
fmt.Printf("Worker spawned\n")
got, _ := q.Poll(noContext, func(*Task) bool { return true })
obtainedWorkCh <- got
}
}()
}
} else {
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}

if got.ShouldRun() {
t.Errorf("expect task3 should not run, task1 succeeded but task2 failed")
return
} else {
return
}
}

case <-time.After(5 * time.Second):
info := q.Info(noContext)
fmt.Println(info.String())
t.Errorf("test timed out")
return
}
}
}

func TestFifoTransitiveErrors(t *testing.T) {
task1 := &Task{
ID: "1",
Expand Down
26 changes: 26 additions & 0 deletions cncd/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package queue
import (
"context"
"errors"
"fmt"
"strings"
)

var (
Expand Down Expand Up @@ -61,6 +63,12 @@ func (t *Task) ShouldRun() bool {
return false
}

func (t *Task) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s (%s) - %s", t.ID, t.Dependencies, t.DepStatus))
return sb.String()
}

func runsOnFailure(runsOn []string) bool {
for _, status := range runsOn {
if status == "failure" {
Expand Down Expand Up @@ -98,6 +106,24 @@ type InfoT struct {
Paused bool
}

func (t *InfoT) String() string {
var sb strings.Builder

for _, task := range t.Pending {
sb.WriteString("\t" + task.String())
}

for _, task := range t.Running {
sb.WriteString("\t" + task.String())
}

for _, task := range t.WaitingOnDeps {
sb.WriteString("\t" + task.String())
}

return sb.String()
}

// Filter filters tasks in the queue. If the Filter returns false,
// the Task is skipped and not returned to the subscriber.
type Filter func(*Task) bool
Expand Down

0 comments on commit 5604ff2

Please sign in to comment.