Skip to content

Commit

Permalink
Merge pull request #48 from laszlocph/pending-vs-waiting
Browse files Browse the repository at this point in the history
Differentiating between waiting on dependencies and workers
  • Loading branch information
laszlocph authored Jul 10, 2019
2 parents 3c50918 + bb941c8 commit ca67aff
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 36 deletions.
73 changes: 58 additions & 15 deletions cncd/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ type worker struct {
type fifo struct {
sync.Mutex

workers map[*worker]struct{}
running map[string]*entry
pending *list.List
extension time.Duration
paused bool
workers map[*worker]struct{}
running map[string]*entry
pending *list.List
waitingOnDeps *list.List
extension time.Duration
paused bool
}

// New returns a new fifo queue.
func New() Queue {
return &fifo{
workers: map[*worker]struct{}{},
running: map[string]*entry{},
pending: list.New(),
extension: time.Minute * 10,
paused: false,
workers: map[*worker]struct{}{},
running: map[string]*entry{},
pending: list.New(),
waitingOnDeps: list.New(),
extension: time.Minute * 10,
paused: false,
}
}

Expand Down Expand Up @@ -161,11 +163,15 @@ func (q *fifo) Info(c context.Context) InfoT {
stats := InfoT{}
stats.Stats.Workers = len(q.workers)
stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running)

for e := q.pending.Front(); e != nil; e = e.Next() {
stats.Pending = append(stats.Pending, e.Value.(*Task))
}
for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() {
stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task))
}
for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item)
}
Expand Down Expand Up @@ -210,7 +216,7 @@ func (q *fifo) process() {
defer q.Unlock()

q.resubmitExpiredBuilds()

q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
delete(q.workers, worker)
Expand All @@ -224,16 +230,41 @@ func (q *fifo) process() {
}
}

func (q *fifo) filterWaiting() {
// resubmits all waiting tasks to pending, deps may have cleared
var nextWaiting *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting {
nextWaiting = e.Next()
task := e.Value.(*Task)
q.pending.PushBack(task)
}

// rebuild waitingDeps
q.waitingOnDeps = list.New()
filtered := []*list.Element{}
var nextPending *list.Element
for e := q.pending.Front(); e != nil; e = nextPending {
nextPending = e.Next()
task := e.Value.(*Task)
if q.depsInQueue(task) {
logrus.Debugf("queue: waiting due to unmet dependencies %v", task.ID)
q.waitingOnDeps.PushBack(task)
filtered = append(filtered, e)
}
}

// filter waiting tasks
for _, f := range filtered {
q.pending.Remove(f)
}
}

func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
if q.depsInQueue(task) {
logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID)
continue
}

for w := range q.workers {
if w.filter(task) {
Expand Down Expand Up @@ -290,13 +321,25 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
}
}
}

for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
running.item.DepStatus[dep] = success
}
}
}

var n *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = n {
next = e.Next()
waiting, ok := e.Value.(*Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
waiting.DepStatus[dep] = success
}
}
}
}

func (q *fifo) removeFromPending(taskID string) {
Expand Down
54 changes: 44 additions & 10 deletions cncd/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ func TestFifoDependencies(t *testing.T) {
}

q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task1})

got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
Expand Down Expand Up @@ -168,9 +167,7 @@ func TestFifoErrors(t *testing.T) {
}

q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
Expand Down Expand Up @@ -222,9 +219,7 @@ func TestFifoCancel(t *testing.T) {
}

q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

_, _ = q.Poll(noContext, func(*Task) bool { return true })
q.Error(noContext, task1.ID, fmt.Errorf("cancelled"))
Expand All @@ -251,7 +246,6 @@ func TestFifoPause(t *testing.T) {
wg.Done()
}()


q.Pause()
t0 := time.Now()
q.Push(noContext, task1)
Expand All @@ -261,7 +255,7 @@ func TestFifoPause(t *testing.T) {
wg.Wait()
t1 := time.Now()

if t1.Sub(t0) < 20 * time.Millisecond {
if t1.Sub(t0) < 20*time.Millisecond {
t.Errorf("Should have waited til resume")
}

Expand All @@ -284,6 +278,46 @@ func TestFifoPauseResume(t *testing.T) {
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}

func TestWaitingVsPending(t *testing.T) {
task1 := &Task{
ID: "1",
}

task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
}

task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
RunOn: []string{"success", "failure"},
}

q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})

got, _ := q.Poll(noContext, func(*Task) bool { return true })

info := q.Info(noContext)
if info.Stats.WaitingOnDeps != 2 {
t.Errorf("2 should wait on deps")
}

q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })

info = q.Info(noContext)
if info.Stats.WaitingOnDeps != 0 {
t.Errorf("0 should wait on deps")
}
if info.Stats.Pending != 1 {
t.Errorf("1 should wait for worker")
}
}

func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",
Expand Down
16 changes: 9 additions & 7 deletions cncd/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ func runsOnSuccess(runsOn []string) bool {

// InfoT provides runtime information.
type InfoT struct {
Pending []*Task `json:"pending"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
Pending []*Task `json:"pending"`
WaitingOnDeps []*Task `json:"waiting_on_deps"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool
}
Expand Down
1 change: 1 addition & 0 deletions drone-go/drone/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func Test_QueueInfo(t *testing.T) {
"stats": {
"worker_count": 3,
"pending_count": 0,
"waiting_on_deps_count": 0,
"running_count": 1,
"completed_count": 0
},
Expand Down
9 changes: 5 additions & 4 deletions drone-go/drone/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ type (
// Info provides queue stats.
Info struct {
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool `json:"paused,omitempty"`
}
Expand Down

0 comments on commit ca67aff

Please sign in to comment.