Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: advanced watcher usernames management #174

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,17 @@ func resolve(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task, sm
t.Tags[k] = v
}
}
// Merge task's watcher usernames with the usernames returned in the step
// ignoring duplicate usernames already present.
loop:
for _, u := range s.WatcherUsernames {
for _, e := range t.WatcherUsernames {
if e == u {
continue loop
}
}
t.WatcherUsernames = append(t.WatcherUsernames, u)
}

// "commit" step back into resolution
res.SetStep(s.Name, s)
Expand Down
4 changes: 2 additions & 2 deletions engine/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func extractArguments(path string, v reflect.Value) ([]string, error) {

// Exec is the implementation of the runner.Exec function but does nothing: function runners
// are just place holders to resolve to actual plugin/builtin.
func (f *Function) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error) {
return nil, nil, nil, errors.New("functions cannot be executed")
func (f *Function) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error) {
return nil, nil, nil, nil, errors.New("functions cannot be executed")
}

// ValidConfig insure that the given configuration resolves all the input needed by the function.
Expand Down
15 changes: 8 additions & 7 deletions engine/step/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ type Step struct {

Resources []string `json:"resources"` // resource limits to enforce

Tags map[string]string `json:"tags"`
Tags map[string]string `json:"-"`
WatcherUsernames []string `json:"-"`
}

// Context provides a step with extra metadata about the task
Expand Down Expand Up @@ -287,7 +288,7 @@ func (st *Step) generateExecution(action executor.Executor, baseConfig map[strin
return &ret, nil
}

func (st *Step) execute(execution *execution, callback func(interface{}, interface{}, map[string]string, error)) {
func (st *Step) execute(execution *execution, callback func(interface{}, interface{}, map[string]string, []string, error)) {

select {
case <-execution.stopRunningSteps:
Expand All @@ -302,8 +303,8 @@ func (st *Step) execute(execution *execution, callback func(interface{}, interfa
utask.AcquireResources(limits)
defer utask.ReleaseResources(limits)

output, metadata, tags, err := execution.runner.Exec(st.Name, execution.baseCfgRaw, execution.config, execution.ctx)
callback(output, metadata, tags, err)
output, metadata, tags, watchers, err := execution.runner.Exec(st.Name, execution.baseCfgRaw, execution.config, execution.ctx)
callback(output, metadata, tags, watchers, err)
}

// Run carries out the action defined by a Step, by providing values to its configuration
Expand Down Expand Up @@ -356,7 +357,7 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
go func() {
defer preHookWg.Done()

st.execute(preHookExecution, func(output interface{}, metadata interface{}, tags map[string]string, err error) {
st.execute(preHookExecution, func(output interface{}, metadata interface{}, tags map[string]string, watchers []string, err error) {
if err != nil {
st.State = StateFatalError
st.Error = fmt.Sprintf("prehook: %s", err)
Expand Down Expand Up @@ -384,8 +385,8 @@ func Run(st *Step, baseConfig map[string]json.RawMessage, stepValues *values.Val
return
}

st.execute(execution, func(output interface{}, metadata interface{}, tags map[string]string, err error) {
st.Output, st.Metadata, st.Tags = output, metadata, tags
st.execute(execution, func(output interface{}, metadata interface{}, tags map[string]string, watchers []string, err error) {
st.Output, st.Metadata, st.Tags, st.WatcherUsernames = output, metadata, tags, watchers

execution.generateOutput(st, preHookValues)

Expand Down
2 changes: 1 addition & 1 deletion engine/step/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// Runner represents a component capable of executing a specific action,
// provided a configuration and a context
type Runner interface {
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error)
Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error)
ValidConfig(baseConfig json.RawMessage, config json.RawMessage) error
Context(stepName string) interface{}
Resources(baseConfig json.RawMessage, config json.RawMessage) []string
Expand Down
17 changes: 16 additions & 1 deletion models/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func Create(dbp zesty.DBProvider, tt *tasktemplate.TaskTemplate, reqUsername str
PublicID: uuid.Must(uuid.NewV4()).String(),
TemplateID: tt.ID,
RequesterUsername: reqUsername,
WatcherUsernames: watcherUsernames,
WatcherUsernames: mergeStringSlicesWithoutDuplicates(tt.AllowedWatcherUsernames, watcherUsernames),
ResolverUsernames: resolverUsernames,
Created: now.Get(),
LastActivity: now.Get(),
Expand Down Expand Up @@ -607,3 +607,18 @@ func (t *Task) notifyState(potentialResolvers []string) {
notify.ListActions().TaskStateAction,
)
}

func mergeStringSlicesWithoutDuplicates(a, b []string) []string {
m := make(map[string]struct{}, len(a)+len(b))
for _, v := range a {
m[v] = struct{}{}
}
for _, v := range b {
m[v] = struct{}{}
}
out := make([]string, 0, len(m))
for k := range m {
out = append(out, k)
}
return out
}
3 changes: 3 additions & 0 deletions models/tasktemplate/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type TaskTemplate struct {

AllowedResolverUsernames []string `json:"allowed_resolver_usernames" db:"allowed_resolver_usernames"`
AllowAllResolverUsernames bool `json:"allow_all_resolver_usernames" db:"allow_all_resolver_usernames"`
AllowedWatcherUsernames []string `json:"allowed_watcher_usernames,omitempty" db:"allowed_watcher_usernames"`
AutoRunnable bool `json:"auto_runnable" db:"auto_runnable"`
Blocked bool `json:"blocked" db:"blocked"`
Hidden bool `json:"hidden" db:"hidden"`
Expand All @@ -54,6 +55,7 @@ func Create(dbp zesty.DBProvider,
inputs, resolverInputs []input.Input,
allowedResolverUsernames []string,
allowAllResolverUsernames, autoRunnable bool,
allowedWatcherUsernames []string,
steps map[string]*step.Step,
variables []values.Variable,
tags map[string]string,
Expand All @@ -76,6 +78,7 @@ func Create(dbp zesty.DBProvider,
Tags: tags,
AllowedResolverUsernames: allowedResolverUsernames,
AllowAllResolverUsernames: allowAllResolverUsernames,
AllowedWatcherUsernames: allowedWatcherUsernames,
AutoRunnable: autoRunnable,
Blocked: false,
Hidden: false,
Expand Down
2 changes: 2 additions & 0 deletions pkg/plugins/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
pluginssh "github.com/ovh/utask/pkg/plugins/builtin/ssh"
pluginsubtask "github.com/ovh/utask/pkg/plugins/builtin/subtask"
plugintag "github.com/ovh/utask/pkg/plugins/builtin/tag"
pluginwatcher "github.com/ovh/utask/pkg/plugins/builtin/watcher"
"github.com/ovh/utask/pkg/plugins/taskplugin"
)

Expand All @@ -28,6 +29,7 @@ func Register() error {
pluginping.Plugin,
pluginscript.Plugin,
plugintag.Plugin,
pluginwatcher.Plugin,
} {
if err := step.RegisterRunner(p.PluginName(), p); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/builtin/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func Test_exec(t *testing.T) {
cfgJSON, err := json.Marshal(cfg)
assert.NoError(t, err)

output, metadata, _, err := Plugin.Exec("test", json.RawMessage(""), json.RawMessage(cfgJSON), nil)
output, metadata, _, _, err := Plugin.Exec("test", json.RawMessage(""), json.RawMessage(cfgJSON), nil)
require.NoError(t, err)

assert.NoError(t, err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/plugins/builtin/tag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ This plugin updates the tags of the current task. Existing tags are overwritten

## Configuration

|Fields|Description
| --- | --- |
| Fields | Description |
| ------ | --------------- |
| `tags` | key/values tags |

Expand Down
22 changes: 22 additions & 0 deletions pkg/plugins/builtin/watcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# `watcher` Plugin

This plugin updates the watcher usernames of the current task. New usernames are added to the list of existing one, ignoring any duplicate.

## Configuration

| Fields | Description |
| ----------- | ------------------ |
| `usernames` | an array of string |

## Example

An action of type `watcher` requires only one field, the list of watcher usernames to add to the current task.

```yaml
action:
type: watcher
configuration:
usernames:
- foo
- bar
```
45 changes: 45 additions & 0 deletions pkg/plugins/builtin/watcher/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package pluginwatcher

import (
"fmt"
"strings"

"github.com/ovh/utask/pkg/plugins/taskplugin"
)

// The watcher plugin allow to update the allowed watcher usernames of a task.
var (
Plugin = taskplugin.New("watcher", "0.1", exec,
taskplugin.WithConfig(validConfig, Config{}),
taskplugin.WithWatchers(watchers),
)
)

// Config represents the configuration of the plugin.
type Config struct {
Usernames []string `json:"usernames"`
}

func validConfig(config interface{}) error {
cfg := config.(*Config)

for i, v := range cfg.Usernames {
if strings.TrimSpace(v) == "" {
return fmt.Errorf("invalid watcher username at position %d", i)
}
}
return nil
}

func exec(stepName string, config interface{}, ctx interface{}) (interface{}, interface{}, error) {
return nil, nil, nil
}

func watchers(config, _, _, _ interface{}, _ error) []string {
if config == nil {
return nil
}
cfg := config.(*Config)

return cfg.Usernames
}
23 changes: 19 additions & 4 deletions pkg/plugins/taskplugin/taskplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type PluginExecutor struct {
contextFactory func(string) interface{}
metadataSchema json.RawMessage
tagsFunc tagsFunc
watchersFunc watchersFunc
}

// Context generates a context payload to pass to Exec()
Expand Down Expand Up @@ -84,20 +85,20 @@ func (r PluginExecutor) ValidConfig(baseConfig json.RawMessage, config json.RawM
}

// Exec performs the action implemented by the executor
func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, error) {
func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config json.RawMessage, ctx interface{}) (interface{}, interface{}, map[string]string, []string, error) {
var cfg interface{}

if r.configFactory != nil {
cfg = r.configFactory()
if len(baseConfig) > 0 {
err := utils.JSONnumberUnmarshal(bytes.NewReader(baseConfig), cfg)
if err != nil {
return nil, nil, nil, errors.Annotate(err, "failed to unmarshal base configuration")
return nil, nil, nil, nil, errors.Annotate(err, "failed to unmarshal base configuration")
}
}
err := utils.JSONnumberUnmarshal(bytes.NewReader(config), cfg)
if err != nil {
return nil, nil, nil, errors.Annotate(err, "failed to unmarshal configuration")
return nil, nil, nil, nil, errors.Annotate(err, "failed to unmarshal configuration")
}
}
output, metadata, err := r.execfunc(stepName, cfg, ctx)
Expand All @@ -106,7 +107,11 @@ func (r PluginExecutor) Exec(stepName string, baseConfig json.RawMessage, config
if r.tagsFunc != nil {
tags = r.tagsFunc(cfg, ctx, output, metadata, err)
}
return output, metadata, tags, err
var watchers []string
if r.watchersFunc != nil {
watchers = r.watchersFunc(cfg, ctx, output, metadata, err)
}
return output, metadata, tags, watchers, err
}

// PluginName returns a plugin's name
Expand All @@ -125,6 +130,7 @@ func (r PluginExecutor) MetadataSchema() json.RawMessage {
}

type tagsFunc func(config, ctx, output, metadata interface{}, err error) map[string]string
type watchersFunc func(config, ctx, output, metadata interface{}, err error) []string

// PluginOpt is a helper struct to customize an action executor
type PluginOpt struct {
Expand All @@ -135,6 +141,7 @@ type PluginOpt struct {
resourcesFunc func(interface{}) []string
metadataFunc func() string
tagsFunc tagsFunc
watchersFunc watchersFunc
}

// WithConfig defines the configuration struct and validation function
Expand Down Expand Up @@ -174,6 +181,13 @@ func WithTags(fn tagsFunc) func(*PluginOpt) {
}
}

// WithWatchers defines a function to manipulate the watcher usernames of a task.
func WithWatchers(fn watchersFunc) func(*PluginOpt) {
return func(o *PluginOpt) {
o.watchersFunc = fn
}
}

// WithResources defines a function indicating what resources will be needed by the plugin
func WithResources(resourcesFunc func(interface{}) []string) func(*PluginOpt) {
return func(o *PluginOpt) {
Expand Down Expand Up @@ -255,5 +269,6 @@ func New(pluginName string, pluginVersion string, execfunc ExecFunc, opts ...fun
contextFactory: contextFactory,
metadataSchema: schema,
tagsFunc: pOpt.tagsFunc,
watchersFunc: pOpt.watchersFunc,
}
}
7 changes: 7 additions & 0 deletions sql/migrations/005_template_watcher_usernames.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up

ALTER TABLE "task_template" ADD COLUMN "allowed_watcher_usernames" JSONB NOT NULL DEFAULT '[]';

-- +migrate Down

ALTER TABLE "task_template" DROP COLUMN "allowed_watcher_usernames";
1 change: 1 addition & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ CREATE TABLE "task_template" (
variables JSONB NOT NULL DEFAULT 'null',
allowed_resolver_usernames JSONB NOT NULL DEFAULT '[]',
allow_all_resolver_usernames BOOL NOT NULL DEFAULT false,
allowed_watcher_usernames JSONB NOT NULL DEFAULT '[]',
auto_runnable BOOL NOT NULL DEFAULT false,
blocked BOOL NOT NULL DEFAULT false,
hidden BOOL NOT NULL DEFAULT false,
Expand Down