diff --git a/cmd/server/grpc_server.go b/cmd/server/grpc_server.go index f983bb15852..06bdd4cb2de 100644 --- a/cmd/server/grpc_server.go +++ b/cmd/server/grpc_server.go @@ -49,6 +49,7 @@ func runGrpcServer(ctx context.Context, c *cli.Command, _store store.Store) erro ) woodpeckerServer := woodpeckerGrpcServer.NewWoodpeckerServer( + ctx, server.Config.Services.Queue, server.Config.Services.Logs, server.Config.Services.Pubsub, diff --git a/server/grpc/filter.go b/server/grpc/filter.go index 521aeb91226..16e955172c2 100644 --- a/server/grpc/filter.go +++ b/server/grpc/filter.go @@ -22,6 +22,11 @@ import ( func createFilterFunc(agentFilter rpc.Filter) queue.FilterFn { return func(task *model.Task) (bool, int) { + // don't return tasks who are not ready jet + if !task.ShouldRun() { + return false, 0 + } + score := 0 for taskLabel, taskLabelValue := range task.Labels { // if a task label is empty it will be ignored diff --git a/server/grpc/filter_test.go b/server/grpc/filter_test.go index e074f797f7d..47b249627eb 100644 --- a/server/grpc/filter_test.go +++ b/server/grpc/filter_test.go @@ -62,7 +62,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"org-id": "123", "platform": "windows"}, }, wantMatched: false, - wantScore: 0, }, { name: "No match", @@ -73,7 +72,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"org-id": "123", "platform": "windows"}, }, wantMatched: false, - wantScore: 0, }, { name: "Missing label", @@ -84,7 +82,6 @@ func TestCreateFilterFunc(t *testing.T) { Labels: map[string]string{"needed": "some"}, }, wantMatched: false, - wantScore: 0, }, { name: "Empty task labels", @@ -119,6 +116,15 @@ func TestCreateFilterFunc(t *testing.T) { wantMatched: true, wantScore: 2, }, + { + name: "dont match task not ready to run", + agentFilter: rpc.Filter{}, + task: &model.Task{ + Labels: map[string]string{"org-id": "123", "platform": "linux"}, + RunOn: []string{"success"}, + }, + wantMatched: false, + }, } for _, tt := range tests { diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 9297f350230..df8519ec099 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -43,6 +43,7 @@ import ( const updateAgentLastWorkDelay = time.Minute type RPC struct { + ctx context.Context queue queue.Queue pubsub *pubsub.Publisher logger logging.Log @@ -81,24 +82,14 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, er filterFn := createFilterFunc(agentFilter) - for { - // poll blocks until a task is available or the context is canceled / worker is kicked - task, err := s.queue.Poll(c, agent.ID, filterFn) - if err != nil || task == nil { - return nil, err - } - - if task.ShouldRun() { - workflow := new(rpc.Workflow) - err = json.Unmarshal(task.Data, workflow) - return workflow, err - } - - // task should not run, so mark it as done - if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil { - log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) - } + // poll blocks until a task is available or the context is canceled / worker is kicked + task, err := s.queue.Poll(c, agent.ID, filterFn) + if err != nil || task == nil { + return nil, err } + + workflow := new(rpc.Workflow) + return workflow, json.Unmarshal(task.Data, workflow) } // Wait blocks until the workflow with the given ID is done. diff --git a/server/grpc/server.go b/server/grpc/server.go index 2cfa8548a19..cc4c015f5fe 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -25,6 +25,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto" "go.woodpecker-ci.org/woodpecker/v2/server/logging" + "go.woodpecker-ci.org/woodpecker/v2/server/model" "go.woodpecker-ci.org/woodpecker/v2/server/pubsub" "go.woodpecker-ci.org/woodpecker/v2/server/queue" "go.woodpecker-ci.org/woodpecker/v2/server/store" @@ -37,7 +38,7 @@ type WoodpeckerServer struct { peer RPC } -func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { +func NewWoodpeckerServer(ctx context.Context, queue queue.Queue, logger logging.Log, pubsub *pubsub.Publisher, store store.Store) proto.WoodpeckerServer { pipelineTime := prometheus_auto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "woodpecker", Name: "pipeline_time", @@ -49,6 +50,7 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P Help: "Pipeline count.", }, []string{"repo", "branch", "status", "pipeline"}) peer := RPC{ + ctx: ctx, store: store, queue: queue, pubsub: pubsub, @@ -56,7 +58,32 @@ func NewWoodpeckerServer(queue queue.Queue, logger logging.Log, pubsub *pubsub.P pipelineTime: pipelineTime, pipelineCount: pipelineCount, } - return &WoodpeckerServer{peer: peer} + rpcServer := &WoodpeckerServer{peer: peer} + go rpcServer.markSkippedDone() + return rpcServer +} + +// mark skipped tasks done, based on dependencies. +// TODO: find better place for this background service +func (s *WoodpeckerServer) markSkippedDone() { + for { + task, err := s.peer.queue.Poll(s.peer.ctx, queue.InternalWorkerID, func(t *model.Task) (bool, int) { + return !t.ShouldRun(), 0 + }) + if err != nil { + log.Error().Err(err).Msg("got error while polling for tasks that should be skipped") + continue + } + if task == nil { + log.Error().Msg("queue poll returned nil task") + continue + } + + log.Trace().Msgf("mark skipped task '%s' as done", task.String()) + if err := s.peer.Done(s.peer.ctx, task.ID, rpc.WorkflowState{}); err != nil { + log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID) + } + } } func (s *WoodpeckerServer) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { diff --git a/server/pipeline/stepbuilder/stepBuilder.go b/server/pipeline/stepbuilder/stepBuilder.go index 8cf43760689..4fbde9e0875 100644 --- a/server/pipeline/stepbuilder/stepBuilder.go +++ b/server/pipeline/stepbuilder/stepBuilder.go @@ -240,12 +240,13 @@ func (b *StepBuilder) environmentVariables(metadata metadata.Metadata, axis matr return environ } -func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) { - var secrets []compiler.Secret - for _, sec := range b.Secs { - var events []string - for _, event := range sec.Events { - events = append(events, string(event)) +func toCompilerSecrets(in []*model.Secret) []compiler.Secret { + secrets := make([]compiler.Secret, 0, len(in)) + + for _, sec := range in { + events := make([]string, len(sec.Events)) + for i, event := range sec.Events { + events[i] = string(event) } secrets = append(secrets, compiler.Secret{ @@ -256,14 +257,24 @@ func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, envi }) } - var registries []compiler.Registry - for _, reg := range b.Regs { + return secrets +} + +func toCompilerRegistries(in []*model.Registry) []compiler.Registry { + registries := make([]compiler.Registry, 0, len(in)) + for _, reg := range in { registries = append(registries, compiler.Registry{ Hostname: reg.Address, Username: reg.Username, Password: reg.Password, }) } + return registries +} + +func (b *StepBuilder) toInternalRepresentation(parsed *yaml_types.Workflow, environ map[string]string, metadata metadata.Metadata, workflowID int64) (*backend_types.Config, error) { + secrets := toCompilerSecrets(b.Secs) + registries := toCompilerRegistries(b.Regs) return compiler.New( compiler.WithEnviron(environ), diff --git a/server/queue/fifo.go b/server/queue/fifo.go index f924055ad5b..9054b65a3a7 100644 --- a/server/queue/fifo.go +++ b/server/queue/fifo.go @@ -27,6 +27,9 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/shared/constant" ) +// InternalWorkerID is pseudo agent ID for internal routines using the queue. +const InternalWorkerID = -2 + type entry struct { item *model.Task done chan bool @@ -213,7 +216,13 @@ func (q *fifo) Extend(_ context.Context, agentID int64, taskID string) error { func (q *fifo) Info(_ context.Context) InfoT { q.Lock() stats := InfoT{} - stats.Stats.Workers = len(q.workers) + workerCount := 0 + for w := range q.workers { + if w.agentID != InternalWorkerID { // ignore internal workers + workerCount++ + } + } + stats.Stats.Workers = workerCount stats.Stats.Pending = q.pending.Len() stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len() stats.Stats.Running = len(q.running) @@ -251,6 +260,10 @@ func (q *fifo) Resume() { // KickAgentWorkers kicks all workers for a given agent. func (q *fifo) KickAgentWorkers(agentID int64) { + if agentID == InternalWorkerID { + return + } + q.Lock() defer q.Unlock() diff --git a/server/queue/queue.go b/server/queue/queue.go index 682d0e0ac5a..ac97a83f8eb 100644 --- a/server/queue/queue.go +++ b/server/queue/queue.go @@ -80,6 +80,7 @@ type Queue interface { PushAtOnce(c context.Context, tasks []*model.Task) error // Poll retrieves and removes a task head of this queue. + // blocks until a task is available or the context is canceled Poll(c context.Context, agentID int64, f FilterFn) (*model.Task, error) // Extend extends the deadline for a task.