Skip to content

Commit

Permalink
Merge pull request #2170 from vjanelle/federation_external_agents
Browse files Browse the repository at this point in the history
(#2169) Pass choria federations to external discovery agent
  • Loading branch information
vjanelle authored May 21, 2024
2 parents acca014 + 039f1bd commit 4ca21a3
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 68 deletions.
11 changes: 11 additions & 0 deletions cmd/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"

"github.com/choria-io/go-choria/internal/fs"
Expand All @@ -23,6 +24,8 @@ type discoverCommand struct {
verbose bool
silent bool
fo *discovery.StandardOptions

federations string
}

func (d *discoverCommand) Setup() error {
Expand All @@ -35,6 +38,8 @@ func (d *discoverCommand) Setup() error {
d.fo.AddSelectionFlags(d.cmd)
d.fo.AddFlatFileFlags(d.cmd)

d.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&d.federations)

d.cmd.Flag("verbose", "Log verbosely").Default("false").Short('v').UnNegatableBoolVar(&d.verbose)
d.cmd.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&d.jsonFormat)
d.cmd.Flag("silent", "Produce as little logging as possible").Hidden().UnNegatableBoolVar(&d.silent)
Expand All @@ -44,6 +49,12 @@ func (d *discoverCommand) Setup() error {

func (d *discoverCommand) Configure() error {
err = commonConfigure()

// If list of federations is specified on the CLI, mutate the configuration directly
if len(d.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(d.federations, ",")
}

if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion cmd/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"sync"
"time"

Expand All @@ -34,6 +35,8 @@ type pingCommand struct {

fo *discovery.StandardOptions

federations string

namesOnly bool

start time.Time
Expand All @@ -54,6 +57,8 @@ func (p *pingCommand) Setup() (err error) {
p.fo = discovery.NewStandardOptions()
p.fo.AddFilterFlags(p.cmd)

p.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&p.federations)

p.cmd.Flag("expect", "Wait until this many replies were received or timeout").IntVar(&p.waitfor)
p.cmd.Flag("timeout", "How long to wait for responses").IntVar(&p.timeout)
p.cmd.Flag("graph", "Produce a graph of the result times").UnNegatableBoolVar(&p.graph)
Expand Down Expand Up @@ -195,7 +200,14 @@ func (p *pingCommand) createMessage(filter *protocol.Filter) (inter.Message, err
func (p *pingCommand) Configure() error {
protocol.ClientStrictValidation = false

return commonConfigure()
err := commonConfigure()

// If list of federations is specified on the CLI, mutate the configuration directly
if len(p.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(p.federations, ",")
}

return err
}

// chart takes all the received time stamps and put them
Expand Down
9 changes: 9 additions & 0 deletions cmd/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type reqCommand struct {

outputWriter *bufio.Writer
outputFileHandle *os.File

federations string
}

func (r *reqCommand) Setup() (err error) {
Expand Down Expand Up @@ -99,6 +101,8 @@ that match the filter.
r.fo.AddFlatFileFlags(r.cmd)
r.fo.AddSelectionFlags(r.cmd)

r.cmd.Flag("federations", "List of federations to search for collectives in, comma separated").StringVar(&r.federations)

r.cmd.Flag("limit", "Limits request to a set of targets eg 10 or 10%").StringVar(&r.limit)
r.cmd.Flag("limit-seed", "Seed value for deterministic random limits").PlaceHolder("SEED").Int64Var(&r.limitSeed)
r.cmd.Flag("batch", "Do requests in batches").PlaceHolder("SIZE").IntVar(&r.batch)
Expand Down Expand Up @@ -462,6 +466,11 @@ func (r *reqCommand) Configure() error {
return err
}

// If list of federations is specified on the CLI, mutate the configuration directly
if len(r.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(r.federations, ",")
}

// we try not to spam things to stderr in these structured output formats
if (r.jsonLinesOnly || r.jsonOnly) && cfg.LogLevel != "debug" {
cfg.LogLevel = "fatal"
Expand Down
16 changes: 16 additions & 0 deletions inter/imocks/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func WithDDLFiles(kind string, plugin string, path string) fwMockOption {
}
}

func WithFederations(federations []string) fwMockOption {
return func(o *fwMockOpts) {
o.cfg.Choria.FederationCollectives = federations
}
}

func NewFrameworkForTests(ctrl *gomock.Controller, logWriter io.Writer, opts ...fwMockOption) (*MockFramework, *config.Config) {
mopts := &fwMockOpts{
cfg: config.NewConfigForTests(),
Expand All @@ -96,6 +102,16 @@ func NewFrameworkForTests(ctrl *gomock.Controller, logWriter io.Writer, opts ...
fw.EXPECT().Configuration().Return(mopts.cfg).AnyTimes()
fw.EXPECT().Logger(gomock.AssignableToTypeOf("")).Return(logrus.NewEntry(logger)).AnyTimes()
fw.EXPECT().NewRequestID().Return(util.RandomHexString(), nil).AnyTimes()
fw.EXPECT().FederationCollectives().DoAndReturn(
func() []string {
if len(fw.Configuration().Choria.FederationCollectives) == 0 {
retval := strings.Split(os.Getenv("CHORIA_FED_COLLECTIVE"), ",")
if retval[0] == "" {
return []string{}
}
}
return fw.Configuration().Choria.FederationCollectives
}).AnyTimes()
fw.EXPECT().HasCollective(gomock.AssignableToTypeOf("")).DoAndReturn(func(c string) bool {
for _, collective := range fw.Configuration().Collectives {
if c == collective {
Expand Down
36 changes: 20 additions & 16 deletions providers/discovery/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ type Response struct {

// Request is the request sent to the external script on its STDIN
type Request struct {
Protocol string `json:"protocol"`
Collective string `json:"collective"`
Filter *protocol.Filter `json:"filter"`
Options map[string]string `json:"options"`
Schema string `json:"$schema"`
Timeout float64 `json:"timeout"`
Protocol string `json:"protocol"`
Collective string `json:"collective"`
Filter *protocol.Filter `json:"filter"`
Federations []string `json:"federations"`
Options map[string]string `json:"options"`
Schema string `json:"$schema"`
Timeout float64 `json:"timeout"`
}

const (
Expand All @@ -67,10 +68,11 @@ func New(fw inter.Framework) *External {

func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []string, err error) {
dopts := &dOpts{
collective: e.fw.Configuration().MainCollective,
timeout: e.timeout,
command: e.fw.Configuration().Choria.ExternalDiscoveryCommand,
do: make(map[string]string),
collective: e.fw.Configuration().MainCollective,
timeout: e.timeout,
command: e.fw.Configuration().Choria.ExternalDiscoveryCommand,
federations: e.fw.FederationCollectives(),
do: make(map[string]string),
}

for _, opt := range opts {
Expand Down Expand Up @@ -100,12 +102,13 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st
defer cancel()

idat := &Request{
Schema: RequestSchema,
Protocol: RequestProtocol,
Timeout: dopts.timeout.Seconds(),
Collective: dopts.collective,
Filter: dopts.filter,
Options: dopts.do,
Schema: RequestSchema,
Protocol: RequestProtocol,
Timeout: dopts.timeout.Seconds(),
Collective: dopts.collective,
Federations: dopts.federations,
Filter: dopts.filter,
Options: dopts.do,
}

req, err := json.Marshal(idat)
Expand Down Expand Up @@ -145,6 +148,7 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st

cmd := exec.CommandContext(timeoutCtx, command, args[1:]...)
cmd.Dir = os.TempDir()

cmd.Env = []string{
"CHORIA_EXTERNAL_REQUEST=" + reqfile.Name(),
"CHORIA_EXTERNAL_REPLY=" + repfile.Name(),
Expand Down
139 changes: 93 additions & 46 deletions providers/discovery/external/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,64 +34,111 @@ var _ = Describe("External", func() {
e *External
)

BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter)
cfg.Collectives = []string{"mcollective", "test"}
Context("command without federation", func() {
BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter)
cfg.Collectives = []string{"mcollective", "test"}

e = New(fw)
})

AfterEach(func() {
mockctl.Finish()
})

Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
It("Should request and return discovered nodes", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
AfterEach(func() {
mockctl.Finish()
})

f := protocol.NewFilter()
f.AddAgentFilter("rpcutil")
f.AddFactFilter("country", "==", "mt")
Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
wd, _ := os.Getwd()
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
var f *protocol.Filter
BeforeEach(func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}

f = protocol.NewFilter()
f.AddAgentFilter("rpcutil")
err := f.AddFactFilter("country", "==", "mt")
Expect(err).ToNot(HaveOccurred())
})
It("Should request and return discovered nodes", func() {
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})

It("Should support command overrides via options", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb")
cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})
})
})
Context("With federation", func() {
BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter, imock.WithFederations([]string{"alpha", "beta"}))
cfg.Collectives = []string{"mcollective", "test"}

It("Should support command overrides via options", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
e = New(fw)
})

f := protocol.NewFilter()
f.AddAgentFilter("rpcutil")
f.AddFactFilter("country", "==", "mt")
AfterEach(func() {
mockctl.Finish()
})

Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
wd, _ := os.Getwd()
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb")
cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
var f *protocol.Filter
var err error
BeforeEach(func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
// err := os.Setenv("CHORIA_FED_COLLECTIVE", "alpha,beta")
// Expect(err).ToNot(HaveOccurred())

f = protocol.NewFilter()
f.AddAgentFilter("rpcutil")
err = f.AddFactFilter("country", "==", "mt")
Expect(err).ToNot(HaveOccurred())
})
It("Should request and return discovered nodes", func() {
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_federation.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})
})

})
})
18 changes: 13 additions & 5 deletions providers/discovery/external/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
)

type dOpts struct {
filter *protocol.Filter
collective string
timeout time.Duration
command string
do map[string]string
filter *protocol.Filter
collective string
federations []string
timeout time.Duration
command string
do map[string]string
}

// DiscoverOption configures the broadcast discovery method
Expand All @@ -35,6 +36,13 @@ func Collective(c string) DiscoverOption {
}
}

// Federations sets the list of federated collectives to discover in
func Federations(f []string) DiscoverOption {
return func(o *dOpts) {
o.federations = f
}
}

// Timeout sets the discovery timeout, else the configured default is used
func Timeout(t time.Duration) DiscoverOption {
return func(o *dOpts) {
Expand Down
Loading

0 comments on commit 4ca21a3

Please sign in to comment.