From e898ad9c1f5e03ac77a29bf3e64f33aedda45431 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 19:55:14 +0100 Subject: [PATCH 01/10] well todos ... --- server/grpc/rpc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 9297f35023..b65dba2bef 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -88,6 +88,8 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er return nil, err } + // TODO: evaluate if a task should not run and mark it as done, currently require a running agent + // who trigger a pull. this should move into it's own go routine. if task.ShouldRun() { workflow := new(rpc.Workflow) err = json.Unmarshal(task.Data, workflow) From 5f8aad0c0d1c1a0b38cb3d9717fdcd928a358fa3 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 20:30:43 +0100 Subject: [PATCH 02/10] refactor: move logic into a code block that can be moved --- server/grpc/rpc.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index b65dba2bef..0f5df59e46 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -90,16 +90,17 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er // TODO: evaluate if a task should not run and mark it as done, currently require a running agent // who trigger a pull. this should move into it's own go routine. 
- if task.ShouldRun() { - workflow := new(rpc.Workflow) - err = json.Unmarshal(task.Data, workflow) - return workflow, err + if !task.ShouldRun() { + // task should not run, so mark it as done + if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil { + log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) + } + continue } - // task should not run, so mark it as done - if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil { - log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) - } + workflow := new(rpc.Workflow) + err = json.Unmarshal(task.Data, workflow) + return workflow, err } } From 20e02257bc3bc5632bfcc19e41c5ff235673768d Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 21:26:56 +0100 Subject: [PATCH 03/10] do conversion in own func --- server/grpc/rpc.go | 1 + server/pipeline/stepbuilder/stepBuilder.go | 27 +++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 0f5df59e46..196c76d976 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -100,6 +100,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er workflow := new(rpc.Workflow) err = json.Unmarshal(task.Data, workflow) + return workflow, err } } diff --git a/server/pipeline/stepbuilder/stepBuilder.go b/server/pipeline/stepbuilder/stepBuilder.go index 8cf4376068..4fbde9e087 100644 --- a/server/pipeline/stepbuilder/stepBuilder.go +++ b/server/pipeline/stepbuilder/stepBuilder.go @@ -240,12 +240,13 @@ func (b *StepBuilder) environmentVariables(metadata metadata.Metadata, axis matr return environ } -func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) { - var secrets []compiler.Secret - for _, sec := range b.Secs { - var events []string - for _, event := range sec.Events { - 
events = append(events, string(event)) +func toCompilerSecrets(in []*model.Secret) []compiler.Secret { + secrets := make([]compiler.Secret, 0, len(in)) + + for _, sec := range in { + events := make([]string, len(sec.Events)) + for i, event := range sec.Events { + events[i] = string(event) } secrets = append(secrets, compiler.Secret{ @@ -256,14 +257,24 @@ func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, envi }) } - var registries []compiler.Registry - for _, reg := range b.Regs { + return secrets +} + +func toCompilerRegistries(in []*model.Registry) []compiler.Registry { + registries := make([]compiler.Registry, 0, len(in)) + for _, reg := range in { registries = append(registries, compiler.Registry{ Hostname: reg.Address, Username: reg.Username, Password: reg.Password, }) } + return registries +} + +func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) { + secrets := toCompilerSecrets(b.Secs) + registries := toCompilerRegistries(b.Regs) return compiler.New( compiler.WithEnviron(environ), From 44359e0e75997db7bde1eaf3456563cf20dd0a04 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 22:36:47 +0100 Subject: [PATCH 04/10] Don't require agent polling to mark workflows that should be skipped as done --- cmd/server/grpc_server.go | 1 + server/grpc/filter.go | 5 +++++ server/grpc/filter_test.go | 12 +++++++++--- server/grpc/rpc.go | 11 +---------- server/grpc/server.go | 38 ++++++++++++++++++++++++++++++++++++-- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/cmd/server/grpc_server.go b/cmd/server/grpc_server.go index f983bb1585..06bdd4cb2d 100644 --- a/cmd/server/grpc_server.go +++ b/cmd/server/grpc_server.go @@ -49,6 +49,7 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro ) woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer( + ctx, 
server.Config.Services.Queue, server.Config.Services.Logs, server.Config.Services.Pubsub, diff --git a/server/grpc/filter.go b/server/grpc/filter.go index 521aeb9122..16e955172c 100644 --- a/server/grpc/filter.go +++ b/server/grpc/filter.go @@ -22,6 +22,11 @@ import ( func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn { return func(task *model.Task) (bool, int) { + // don't return tasks that are not ready yet + if !task.ShouldRun() { + return false, 0 + } + score := 0 for taskLabel, taskLabelValue := range task.Labels { // if a task label is empty it will be ignored diff --git a/server/grpc/filter_test.go b/server/grpc/filter_test.go index e074f797f7..47b249627e 100644 --- a/server/grpc/filter_test.go +++ b/server/grpc/filter_test.go @@ -62,7 +62,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"org-id": "123", "platform": "windows"}, }, wantMatched: false, - wantScore: 0, }, { name: "No match", @@ -73,7 +72,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"org-id": "123", "platform": "windows"}, }, wantMatched: false, - wantScore: 0, }, { name: "Missing label", @@ -84,7 +82,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"needed": "some"}, }, wantMatched: false, - wantScore: 0, }, { name: "Empty task labels", @@ -119,6 +116,15 @@ func TestCreateFilterFunc(t *testing.T) { wantMatched: true, wantScore: 2, }, + { + name: "dont match task not ready to run", + agentFilter: rpc.Filter{}, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, + RunOn: []string{"success"}, + }, + wantMatched: false, + }, } for _, tt := range tests { diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 196c76d976..c618d566ac 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -43,6 +43,7 @@ import ( const updateAgentLastWorkDelay = time.Minute type RPC struct { + ctx context.Context queue queue.Queue pubsub *pubsub.Publisher logger logging.Log @@ -88,16 +89,6 @@ 
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er return nil, err } - // TODO: evaluate if a task should not run and mark it as done, currently require a running agent - // who trigger a pull. this should move into it's own go routine. - if !task.ShouldRun() { - // task should not run, so mark it as done - if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil { - log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) - } - continue - } - workflow := new(rpc.Workflow) err = json.Unmarshal(task.Data, workflow) diff --git a/server/grpc/server.go b/server/grpc/server.go index 2cfa8548a1..59986e3331 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -17,6 +17,7 @@ package grpc import ( "context" "encoding/json" + "time" "github.com/prometheus/client_golang/prometheus" prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto" @@ -25,19 +26,23 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" "go.woodpecker-ci.org/woodpecker/v2/server/logging" + "go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/pubsub" "go.woodpecker-ci.org/woodpecker/v2/server/queue" "go.woodpecker-ci.org/woodpecker/v2/server/store" "go.woodpecker-ci.org/woodpecker/v2/version" ) +// markSkippedDoneTimeInterval is the time interval we search in the queue for workflows that can be marked as done +const markSkippedDoneTimeInterval = 3 * time.Second + // WoodpeckerServer is a grpc server implementation. 
type WoodpeckerServer struct { proto.UnimplementedWoodpeckerServer peer RPC } -func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { +func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { pipelineTime := prometheus_auto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "woodpecker", Name: "pipeline_time", @@ -49,6 +54,7 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P Help: "Pipeline count.", }, []string{"repo", "branch", "status", "pipeline"}) peer := RPC{ + ctx: ctx, store: store, queue: queue, pubsub: pubsub, @@ -56,7 +62,35 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P pipelineTime: pipelineTime, pipelineCount: pipelineCount, } - return &WoodpeckerServer{peer: peer} + rpcServer := &WoodpeckerServer{peer: peer} + go rpcServer.markSkippedDone() + return rpcServer +} + +// mark skipped tasks done, based on dependencies. 
+// TODO: find better place for this background service +func (ws *WoodpeckerServer) markSkippedDone() { + for { + select { + case <-time.After(markSkippedDoneTimeInterval): + case <-ws.peer.ctx.Done(): + return + } + + task, err := ws.peer.queue.Poll(ws.peer.ctx, -1, func(t *model.Task) (bool, int) { + if !t.ShouldRun() { + return true, 0 + } + return false, 0 + }) + if err != nil { + log.Error().Err(err).Msg("got error while polling for tasks that should be skipped") + continue + } + if err := ws.peer.Done(ws.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil { + log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) + } + } } func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { From 2830ff34933b3f26e567b3665a67a0932782e478 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 22:41:26 +0100 Subject: [PATCH 05/10] compact --- server/grpc/rpc.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index c618d566ac..c7041be156 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -90,9 +90,7 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er } workflow := new(rpc.Workflow) - err = json.Unmarshal(task.Data, workflow) - - return workflow, err + return workflow, json.Unmarshal(task.Data, workflow) } } From 4318c81d2f6c8e42ab8f2843b0f4ef98a7e2e960 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 22:45:31 +0100 Subject: [PATCH 06/10] compact --- server/grpc/server.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/grpc/server.go b/server/grpc/server.go index 59986e3331..abb9e78f15 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -78,10 +78,7 @@ func (ws *WoodpeckerServer) markSkippedDone() { } task, err := ws.peer.queue.Poll(ws.peer.ctx, -1, func(t *model.Task) (bool, int) { - if !t.ShouldRun() { - return 
true, 0 - } - return false, 0 + return !t.ShouldRun(), 0 }) if err != nil { log.Error().Err(err).Msg("got error while polling for tasks that should be skipped") From d1e8bbe229a3e7ce71fef876435a5bd79825888f Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 22:53:36 +0100 Subject: [PATCH 07/10] queue poll is blocking so use it --- server/grpc/server.go | 16 ++++++---------- server/queue/queue.go | 1 + 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/grpc/server.go b/server/grpc/server.go index abb9e78f15..6789036329 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -17,7 +17,6 @@ package grpc import ( "context" "encoding/json" - "time" "github.com/prometheus/client_golang/prometheus" prometheus_auto "github.com/prometheus/client_golang/prometheus/promauto" @@ -33,9 +32,6 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/version" ) -// markSkippedDoneTimeInterval is the time interval we search in the queue for workflows that can be marked as done -const markSkippedDoneTimeInterval = 3 * time.Second - // WoodpeckerServer is a grpc server implementation. type WoodpeckerServer struct { proto.UnimplementedWoodpeckerServer @@ -71,12 +67,6 @@ func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging. 
// TODO: find better place for this background service func (ws *WoodpeckerServer) markSkippedDone() { for { - select { - case <-time.After(markSkippedDoneTimeInterval): - case <-ws.peer.ctx.Done(): - return - } - task, err := ws.peer.queue.Poll(ws.peer.ctx, -1, func(t *model.Task) (bool, int) { return !t.ShouldRun(), 0 }) @@ -84,6 +74,12 @@ func (ws *WoodpeckerServer) markSkippedDone() { log.Error().Err(err).Msg("got error while polling for tasks that should be skipped") continue } + if task == nil { + log.Error().Msg("queue poll returned nil task") + continue + } + + log.Trace().Msgf("mark skipped task '%s' as done", task.String()) if err := ws.peer.Done(ws.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil { log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) } diff --git a/server/queue/queue.go b/server/queue/queue.go index 682d0e0ac5..ac97a83f8e 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -80,6 +80,7 @@ type Queue interface { PushAtOnce(c context.Context, tasks []*model.Task) error // Poll retrieves and removes a task head of this queue. + // blocks until a task is available or the context is canceled Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) // Extend extends the deadline for a task. From 016c50fe6c18ab4e84e28781aa559edc7edbd03d Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 23:04:04 +0100 Subject: [PATCH 08/10] fix worker count --- server/grpc/server.go | 2 +- server/queue/fifo.go | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/server/grpc/server.go b/server/grpc/server.go index 6789036329..4ad9c36c36 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -67,7 +67,7 @@ func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging. 
// TODO: find better place for this background service func (ws *WoodpeckerServer) markSkippedDone() { for { - task, err := ws.peer.queue.Poll(ws.peer.ctx, -1, func(t *model.Task) (bool, int) { + task, err := ws.peer.queue.Poll(ws.peer.ctx, queue.InternalWorkerID, func(t *model.Task) (bool, int) { return !t.ShouldRun(), 0 }) if err != nil { diff --git a/server/queue/fifo.go b/server/queue/fifo.go index f924055ad5..9054b65a3a 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -27,6 +27,9 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/shared/constant" ) +// InternalWorkerID is pseudo agent ID for internal routines using the queue. +const InternalWorkerID = -2 + type entry struct { item *model.Task done chan bool @@ -213,7 +216,13 @@ func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error { func (q *fifo) Info(_ context.Context) InfoT { q.Lock() stats := InfoT{} - stats.Stats.Workers = len(q.workers) + workerCount := 0 + for w := range q.workers { + if w.agentID != InternalWorkerID { // ignore internal workers + workerCount++ + } + } + stats.Stats.Workers = workerCount stats.Stats.Pending = q.pending.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) @@ -251,6 +260,10 @@ func (q *fifo) Resume() { // KickAgentWorkers kicks all workers for a given agent. 
func (q *fifo) KickAgentWorkers(agentID int64) { + if agentID == InternalWorkerID { + return + } + q.Lock() defer q.Unlock() From 83d044e07acade0c4c1c9551fea8c8f08402f5e6 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 23:16:11 +0100 Subject: [PATCH 09/10] no loop needed --- server/grpc/rpc.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index c7041be156..df8519ec09 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -82,16 +82,14 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er filterFn := createFilterFunc(agentFilter) - for { - // poll blocks until a task is available or the context is canceled / worker is kicked - task, err := s.queue.Poll(c, agent.ID, filterFn) - if err != nil || task == nil { - return nil, err - } - - workflow := new(rpc.Workflow) - return workflow, json.Unmarshal(task.Data, workflow) + // poll blocks until a task is available or the context is canceled / worker is kicked + task, err := s.queue.Poll(c, agent.ID, filterFn) + if err != nil || task == nil { + return nil, err } + + workflow := new(rpc.Workflow) + return workflow, json.Unmarshal(task.Data, workflow) } // Wait blocks until the workflow with the given ID is done. From 17fc595adb40efc13cdc3af3bcf351b973a2b65a Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Mon, 28 Oct 2024 23:17:09 +0100 Subject: [PATCH 10/10] fix lint --- server/grpc/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/grpc/server.go b/server/grpc/server.go index 4ad9c36c36..cc4c015f5f 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -65,9 +65,9 @@ func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging. // mark skipped tasks done, based on dependencies. 
// TODO: find better place for this background service -func (ws *WoodpeckerServer) markSkippedDone() { +func (s *WoodpeckerServer) markSkippedDone() { for { - task, err := ws.peer.queue.Poll(ws.peer.ctx, queue.InternalWorkerID, func(t *model.Task) (bool, int) { + task, err := s.peer.queue.Poll(s.peer.ctx, queue.InternalWorkerID, func(t *model.Task) (bool, int) { return !t.ShouldRun(), 0 }) if err != nil { @@ -80,7 +80,7 @@ func (ws *WoodpeckerServer) markSkippedDone() { } log.Trace().Msgf("mark skipped task '%s' as done", task.String()) - if err := ws.peer.Done(ws.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil { + if err := s.peer.Done(s.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil { log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) } }