Skip to content

Commit

Permalink
Merge pull request #32 from laszlocph/multi-pipeline
Browse files Browse the repository at this point in the history
Multi pipeline
  • Loading branch information
laszlocph authored Jun 26, 2019
2 parents e8f6d38 + 8d79f86 commit 245d7be
Show file tree
Hide file tree
Showing 77 changed files with 1,983 additions and 788 deletions.
12 changes: 8 additions & 4 deletions .drone.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
clone:
git:
image: plugins/git:next

workspace:
base: /go
path: src/github.com/laszlocph/drone-oss-08
Expand Down Expand Up @@ -109,7 +113,7 @@ pipeline:
repo: laszlocloud/drone-oss-08-server
dockerfile: Dockerfile.alpine
secrets: [ docker_username, docker_password ]
tag: [ 0.8.96-alpine ]
tag: [ 0.8.96-multi-pipeline-alpine ]
when:
event: tag

Expand All @@ -118,15 +122,15 @@ pipeline:
repo: laszlocloud/drone-oss-08-agent
dockerfile: Dockerfile.agent.alpine
secrets: [ docker_username, docker_password ]
tag: [ 0.8.96-alpine ]
tag: [ 0.8.96-multi-pipeline-alpine ]
when:
event: tag

release_server:
image: plugins/docker
repo: laszlocloud/drone-oss-08-server
secrets: [ docker_username, docker_password ]
tag: [ 0.8.96 ]
tag: [ 0.8.96-multi-pipeline ]
when:
event: tag

Expand All @@ -135,7 +139,7 @@ pipeline:
repo: laszlocloud/drone-oss-08-agent
dockerfile: Dockerfile.agent
secrets: [ docker_username, docker_password ]
tag: [ 0.8.96 ]
tag: [ 0.8.96-multi-pipeline ]
when:
event: tag

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ release/
cli/release/

server/swagger/files/*.json
server/swagger/swagger_gen.go
.idea/
17 changes: 17 additions & 0 deletions BUILDING
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,20 @@

go install github.com/laszlocph/drone-oss-08/cmd/drone-agent
go install github.com/laszlocph/drone-oss-08/cmd/drone-server

---

0. To generate SQL files

go get github.com/vektra/mockery/.../

export download_url=$(curl -s https://api.github.com/repos/go-swagger/go-swagger/releases/latest | \
jq -r '.assets[] | select(.name | contains("'"$(uname | tr '[:upper:]' '[:lower:]')"'_amd64")) | .browser_download_url')
curl -o swagger -L'#' "$download_url"
chmod +x swagger
sudo mv swagger /usr/local/bin

go get github.com/laszlocph/togo

go generate

4 changes: 2 additions & 2 deletions cncd/logging/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestLogging(t *testing.T) {
logger.Tail(ctx, testPath, func(entry ...*Entry) { wg.Done() })
}()

<-time.After(time.Millisecond)
<-time.After(500 * time.Millisecond)

wg.Add(4)
go func() {
Expand All @@ -45,7 +45,7 @@ func TestLogging(t *testing.T) {
logger.Tail(ctx, testPath, func(entry ...*Entry) { wg.Done() })
}()

<-time.After(time.Millisecond)
<-time.After(500 * time.Millisecond)

wg.Wait()
cancel()
Expand Down
7 changes: 7 additions & 0 deletions cncd/pipeline/pipeline/frontend/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,10 @@ func (m *Metadata) EnvironDrone() map[string]string {
}

var pullRegexp = regexp.MustCompile("\\d+")

func (m *Metadata) SetPlatform(platform string) {
if platform == "" {
platform = "linux/amd64"
}
m.Sys.Arch = platform
}
4 changes: 2 additions & 2 deletions cncd/pipeline/pipeline/frontend/yaml/compiler/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *Compiler) Compile(conf *yaml.Config) *backend.Config {
}

// add default clone step
if c.local == false && len(conf.Clone.Containers) == 0 {
if c.local == false && len(conf.Clone.Containers) == 0 && !conf.SkipClone {
container := &yaml.Container{
Name: "clone",
Image: "plugins/git:latest",
Expand All @@ -118,7 +118,7 @@ func (c *Compiler) Compile(conf *yaml.Config) *backend.Config {
stage.Steps = append(stage.Steps, step)

config.Stages = append(config.Stages, stage)
} else if c.local == false {
} else if c.local == false && !conf.SkipClone {
for i, container := range conf.Clone.Containers {
if !container.Constraints.Match(c.metadata) {
continue
Expand Down
3 changes: 3 additions & 0 deletions cncd/pipeline/pipeline/frontend/yaml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type (
Networks Networks
Volumes Volumes
Labels libcompose.SliceorMap
DependsOn []string `yaml:"depends_on,omitempty"`
RunsOn []string `yaml:"runs_on,omitempty"`
SkipClone bool `yaml:"skip_clone"`
}

// Workspace defines a pipeline workspace.
Expand Down
15 changes: 13 additions & 2 deletions cncd/pipeline/pipeline/frontend/yaml/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/franela/goblin"
)

func xTestParse(t *testing.T) {
func TestParse(t *testing.T) {
g := goblin.Goblin(t)

g.Describe("Parser", func() {
Expand Down Expand Up @@ -35,9 +35,14 @@ func xTestParse(t *testing.T) {
g.Assert(out.Pipeline.Containers[1].Commands).Equal(yaml.Stringorslice{"go build"})
g.Assert(out.Pipeline.Containers[2].Name).Equal("notify")
g.Assert(out.Pipeline.Containers[2].Image).Equal("slack")
g.Assert(out.Pipeline.Containers[2].NetworkMode).Equal("container:name")
// g.Assert(out.Pipeline.Containers[2].NetworkMode).Equal("container:name")
g.Assert(out.Labels["com.example.team"]).Equal("frontend")
g.Assert(out.Labels["com.example.type"]).Equal("build")
g.Assert(out.DependsOn[0]).Equal("lint")
g.Assert(out.DependsOn[1]).Equal("test")
g.Assert(out.RunsOn[0]).Equal("success")
g.Assert(out.RunsOn[1]).Equal("failure")
g.Assert(out.SkipClone).Equal(false)
})
// Check to make sure variable expansion works in yaml.MapSlice
// g.It("Should unmarshal variables", func() {
Expand Down Expand Up @@ -94,6 +99,12 @@ volumes:
labels:
com.example.type: "build"
com.example.team: "frontend"
depends_on:
- lint
- test
runs_on:
- success
- failure
`

var sampleVarYaml = `
Expand Down
3 changes: 1 addition & 2 deletions cncd/pipeline/pipeline/frontend/yaml/matrix/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func Parse(data []byte) ([]Axis, error) {
return nil, err
}

// if not a matrix build return an array with just the single axis.
if len(matrix) == 0 {
return nil, nil
return []Axis{}, nil
}

return calc(matrix), nil
Expand Down
4 changes: 2 additions & 2 deletions cncd/pipeline/pipeline/frontend/yaml/matrix/matrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func TestMatrix(t *testing.T) {
g.Assert(len(set)).Equal(24)
})

g.It("Should return nil if no matrix", func() {
g.It("Should return empty array if no matrix", func() {
axis, err := ParseString("")
g.Assert(err == nil).IsTrue()
g.Assert(axis == nil).IsTrue()
g.Assert(len(axis) == 0).IsTrue()
})

g.It("Should return included axis", func() {
Expand Down
4 changes: 2 additions & 2 deletions cncd/pubsub/pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestPubsub(t *testing.T) {
broker.Subscribe(ctx, testTopic, func(message Message) { wg.Done() })
}()

<-time.After(time.Millisecond)
<-time.After(500 * time.Millisecond)

if _, ok := broker.(*publisher).topics[testTopic]; !ok {
t.Errorf("Expect topic registered with publisher")
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestSubscriptionClosed(t *testing.T) {
wg.Done()
}()

<-time.After(time.Millisecond)
<-time.After(500 * time.Millisecond)

if _, ok := broker.(*publisher).topics[testTopic]; !ok {
t.Errorf("Expect topic registered with publisher")
Expand Down
127 changes: 107 additions & 20 deletions cncd/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"runtime"
"sync"
"time"

"github.com/Sirupsen/logrus"
)

type entry struct {
Expand Down Expand Up @@ -50,6 +52,17 @@ func (q *fifo) Push(c context.Context, task *Task) error {
return nil
}

// Push pushes an item to the tail of this queue.
func (q *fifo) PushAtOnce(c context.Context, tasks []*Task) error {
q.Lock()
for _, task := range tasks {
q.pending.PushBack(task)
}
q.Unlock()
go q.process()
return nil
}

// Poll retrieves and removes the head of this queue.
func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) {
q.Lock()
Expand Down Expand Up @@ -82,11 +95,14 @@ func (q *fifo) Done(c context.Context, id string) error {
// Error signals that the item is done executing with error.
func (q *fifo) Error(c context.Context, id string, err error) error {
q.Lock()
state, ok := q.running[id]
taskEntry, ok := q.running[id]
if ok {
state.error = err
close(state.done)
q.updateDepStatusInQueue(id, err == nil)
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
} else {
q.removeFromPending(id)
}
q.Unlock()
return nil
Expand Down Expand Up @@ -173,35 +189,106 @@ func (q *fifo) process() {
q.Lock()
defer q.Unlock()

// TODO(bradrydzewski) move this to a helper function
// push items to the front of the queue if the item expires.
q.resubmitExpiredBuilds()

for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
delete(q.workers, worker)
q.pending.Remove(pending)
q.running[task.ID] = &entry{
item: task,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}
worker.channel <- task
}
}

func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
if q.depsInQueue(task) {
logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID)
continue
}

for w := range q.workers {
if w.filter(task) {
logrus.Debugf("queue: assigned task: %v with deps %v", task.ID, task.Dependencies)
return e, w
}
}
}

return nil, nil
}

func (q *fifo) resubmitExpiredBuilds() {
for id, state := range q.running {
if time.Now().After(state.deadline) {
q.pending.PushFront(state.item)
delete(q.running, id)
close(state.done)
}
}
}

func (q *fifo) depsInQueue(task *Task) bool {
var next *list.Element
loop:
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
item := e.Value.(*Task)
for w := range q.workers {
if w.filter(item) {
delete(q.workers, w)
q.pending.Remove(e)

q.running[item.ID] = &entry{
item: item,
done: make(chan bool),
deadline: time.Now().Add(q.extension),
}

w.channel <- item
break loop
possibleDep, ok := e.Value.(*Task)
logrus.Debugf("queue: pending right now: %v", possibleDep.ID)
for _, dep := range task.Dependencies {
if ok && possibleDep.ID == dep {
return true
}
}
}
for possibleDepID := range q.running {
logrus.Debugf("queue: running right now: %v", possibleDepID)
for _, dep := range task.Dependencies {
if possibleDepID == dep {
return true
}
}
}
return false
}

func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
pending, ok := e.Value.(*Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
pending.DepStatus[dep] = success
}
}
}
for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
running.item.DepStatus[dep] = success
}
}
}
}

func (q *fifo) removeFromPending(taskID string) {
logrus.Debugf("queue: trying to remove %s", taskID)
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
if task.ID == taskID {
logrus.Debugf("queue: %s is removed from pending", taskID)
q.pending.Remove(e)
return
}
}
}
Loading

0 comments on commit 245d7be

Please sign in to comment.