Skip to content

Commit

Permalink
Merge pull request #40 from laszlocph/pause-queue
Browse files Browse the repository at this point in the history
Pause queue
  • Loading branch information
laszlocph authored Jun 28, 2019
2 parents 8f8276f + a28451d commit 936fc8b
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 1 deletion.
20 changes: 20 additions & 0 deletions cncd/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type fifo struct {
running map[string]*entry
pending *list.List
extension time.Duration
paused bool
}

// New returns a new fifo queue.
Expand All @@ -40,6 +41,7 @@ func New() Queue {
running: map[string]*entry{},
pending: list.New(),
extension: time.Minute * 10,
paused: false,
}
}

Expand Down Expand Up @@ -167,14 +169,32 @@ func (q *fifo) Info(c context.Context) InfoT {
for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item)
}
stats.Paused = q.paused

q.Unlock()
return stats
}

func (q *fifo) Pause() {
q.Lock()
q.paused = true
q.Unlock()
}

func (q *fifo) Resume() {
q.Lock()
q.paused = false
q.Unlock()
go q.process()
}

// helper function that loops through the queue and attempts to
// match the item to a single subscriber.
func (q *fifo) process() {
if q.paused {
return
}

defer func() {
// the risk of panic is low. This code can probably be removed
// once the code has been used in real world installs without issue.
Expand Down
46 changes: 46 additions & 0 deletions cncd/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,52 @@ func TestFifoCancel(t *testing.T) {
}
}

func TestFifoPause(t *testing.T) {
task1 := &Task{
ID: "1",
}

q := New().(*fifo)
var wg sync.WaitGroup
wg.Add(1)
go func() {
_, _ = q.Poll(noContext, func(*Task) bool { return true })
wg.Done()
}()


q.Pause()
t0 := time.Now()
q.Push(noContext, task1)
time.Sleep(20 * time.Millisecond)
q.Resume()

wg.Wait()
t1 := time.Now()

if t1.Sub(t0) < 20 * time.Millisecond {
t.Errorf("Should have waited til resume")
}

q.Pause()
q.Push(noContext, task1)
q.Resume()
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}

func TestFifoPauseResume(t *testing.T) {
task1 := &Task{
ID: "1",
}

q := New().(*fifo)
q.Pause()
q.Push(noContext, task1)
q.Resume()

_, _ = q.Poll(noContext, func(*Task) bool { return true })
}

func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",
Expand Down
7 changes: 7 additions & 0 deletions cncd/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type InfoT struct {
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool
}

// Filter filters tasks in the queue. If the Filter returns false,
Expand Down Expand Up @@ -128,4 +129,10 @@ type Queue interface {

// Info returns internal queue information.
Info(c context.Context) InfoT

// Stops the queue from handing out new work items in Poll
Pause()

// Starts the queue again, Poll returns new items
Resume()
}
16 changes: 16 additions & 0 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl
)
}

queue := e.Group("/api/queue")
{
queue.GET("/pause",
session.MustAdmin(),
server.PauseQueue,
)
queue.GET("/resume",
session.MustAdmin(),
server.ResumeQueue,
)
queue.GET("/norunningbuilds",
session.MustAdmin(),
server.BlockTilQueueHasRunningItem,
)
}

auth := e.Group("/authorize")
{
auth.GET("", server.HandleAuth)
Expand Down
21 changes: 21 additions & 0 deletions server/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"regexp"
"strconv"
"time"
Expand Down Expand Up @@ -51,6 +52,26 @@ func GetQueueInfo(c *gin.Context) {
)
}

func PauseQueue(c *gin.Context) {
Config.Services.Queue.Pause()
c.Status(http.StatusOK)
}

func ResumeQueue(c *gin.Context) {
Config.Services.Queue.Resume()
c.Status(http.StatusOK)
}

func BlockTilQueueHasRunningItem(c *gin.Context) {
for {
info := Config.Services.Queue.Info(c)
if info.Stats.Running == 0 {
break
}
}
c.Status(http.StatusOK)
}

func PostHook(c *gin.Context) {
remote_ := remote.FromContext(c)

Expand Down
1 change: 0 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func EventStreamSSE(c *gin.Context) {
}()

go func() {
// TODO remove this from global config
Config.Services.Pubsub.Subscribe(ctx, "topic/events", func(m pubsub.Message) {
defer func() {
recover() // fix #2480
Expand Down

0 comments on commit 936fc8b

Please sign in to comment.