diff --git a/cmd/discover.go b/cmd/discover.go index 4045cb5cb..087515e80 100644 --- a/cmd/discover.go +++ b/cmd/discover.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "os" + "strings" "sync" "github.com/choria-io/go-choria/internal/fs" @@ -23,6 +24,8 @@ type discoverCommand struct { verbose bool silent bool fo *discovery.StandardOptions + + federations string } func (d *discoverCommand) Setup() error { @@ -35,6 +38,8 @@ func (d *discoverCommand) Setup() error { d.fo.AddSelectionFlags(d.cmd) d.fo.AddFlatFileFlags(d.cmd) + d.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&d.federations) + d.cmd.Flag("verbose", "Log verbosely").Default("false").Short('v').UnNegatableBoolVar(&d.verbose) d.cmd.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&d.jsonFormat) d.cmd.Flag("silent", "Produce as little logging as possible").Hidden().UnNegatableBoolVar(&d.silent) @@ -44,6 +49,12 @@ func (d *discoverCommand) Setup() error { func (d *discoverCommand) Configure() error { err = commonConfigure() + + // If list of federations is specified on the CLI, mutate the configuration directly + if len(d.federations) > 0 { + cfg.Choria.FederationCollectives = strings.Split(d.federations, ",") + } + if err != nil { return err } diff --git a/cmd/ping.go b/cmd/ping.go index e5ca0d53c..d14881a59 100644 --- a/cmd/ping.go +++ b/cmd/ping.go @@ -11,6 +11,7 @@ import ( "fmt" "math" "sort" + "strings" "sync" "time" @@ -34,6 +35,8 @@ type pingCommand struct { fo *discovery.StandardOptions + federations string + namesOnly bool start time.Time @@ -54,6 +57,8 @@ func (p *pingCommand) Setup() (err error) { p.fo = discovery.NewStandardOptions() p.fo.AddFilterFlags(p.cmd) + p.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&p.federations) + p.cmd.Flag("expect", "Wait until this many replies were received or timeout").IntVar(&p.waitfor) p.cmd.Flag("timeout", "How long to wait for responses").IntVar(&p.timeout) p.cmd.Flag("graph", "Produce a graph of the result times").UnNegatableBoolVar(&p.graph) @@ -195,7 +200,14 @@ func (p *pingCommand) createMessage(filter *protocol.Filter) (inter.Message, err func (p *pingCommand) Configure() error { protocol.ClientStrictValidation = false - return commonConfigure() + err := commonConfigure() + + // If list of federations is specified on the CLI, mutate the configuration directly + if len(p.federations) > 0 { + cfg.Choria.FederationCollectives = strings.Split(p.federations, ",") + } + + return err } // chart takes all the received time stamps and put them diff --git a/cmd/req.go b/cmd/req.go index 9d5af11b1..777fc935a 100644 --- a/cmd/req.go +++ b/cmd/req.go @@ -61,6 +61,8 @@ type reqCommand struct { outputWriter *bufio.Writer outputFileHandle *os.File + + federations string } func (r *reqCommand) Setup() (err error) { @@ -99,6 +101,8 @@ that match the filter. r.fo.AddFlatFileFlags(r.cmd) r.fo.AddSelectionFlags(r.cmd) + r.cmd.Flag("federations", "List of federations to search for collectives in, comma separated").StringVar(&r.federations) + r.cmd.Flag("limit", "Limits request to a set of targets eg 10 or 10%").StringVar(&r.limit) r.cmd.Flag("limit-seed", "Seed value for deterministic random limits").PlaceHolder("SEED").Int64Var(&r.limitSeed) r.cmd.Flag("batch", "Do requests in batches").PlaceHolder("SIZE").IntVar(&r.batch) @@ -462,6 +466,11 @@ func (r *reqCommand) Configure() error { return err } + // If list of federations is specified on the CLI, mutate the configuration directly + if len(r.federations) > 0 { + cfg.Choria.FederationCollectives = strings.Split(r.federations, ",") + } + // we try not to spam things to stderr in these structured output formats if (r.jsonLinesOnly || r.jsonOnly) && cfg.LogLevel != "debug" { cfg.LogLevel = "fatal" diff --git a/inter/imocks/util.go b/inter/imocks/util.go index 404cc1ba6..9280ccdf8 100644 --- a/inter/imocks/util.go +++ b/inter/imocks/util.go @@ -76,6 +76,12 @@ func WithDDLFiles(kind string, plugin string, path string) fwMockOption { } } +func WithFederations(federations []string) fwMockOption { + return func(o *fwMockOpts) { + o.cfg.Choria.FederationCollectives = federations + } +} + func NewFrameworkForTests(ctrl *gomock.Controller, logWriter io.Writer, opts ...fwMockOption) (*MockFramework, *config.Config) { mopts := &fwMockOpts{ cfg: config.NewConfigForTests(), @@ -96,6 +102,16 @@ func NewFrameworkForTests(ctrl *gomock.Controller, logWriter io.Writer, opts ... fw.EXPECT().Configuration().Return(mopts.cfg).AnyTimes() fw.EXPECT().Logger(gomock.AssignableToTypeOf("")).Return(logrus.NewEntry(logger)).AnyTimes() fw.EXPECT().NewRequestID().Return(util.RandomHexString(), nil).AnyTimes() + fw.EXPECT().FederationCollectives().DoAndReturn( + func() []string { + if len(fw.Configuration().Choria.FederationCollectives) == 0 { + retval := strings.Split(os.Getenv("CHORIA_FED_COLLECTIVE"), ",") + if retval[0] == "" { + return []string{} + } + } + return fw.Configuration().Choria.FederationCollectives + }).AnyTimes() fw.EXPECT().HasCollective(gomock.AssignableToTypeOf("")).DoAndReturn(func(c string) bool { for _, collective := range fw.Configuration().Collectives { if c == collective { diff --git a/providers/discovery/external/external.go b/providers/discovery/external/external.go index 2d200276d..934fda132 100644 --- a/providers/discovery/external/external.go +++ b/providers/discovery/external/external.go @@ -40,12 +40,13 @@ type Response struct { // Request is the request sent to the external script on its STDIN type Request struct { - Protocol string `json:"protocol"` - Collective string `json:"collective"` - Filter *protocol.Filter `json:"filter"` - Options map[string]string `json:"options"` - Schema string `json:"$schema"` - Timeout float64 `json:"timeout"` + Protocol string `json:"protocol"` + Collective string `json:"collective"` + Filter *protocol.Filter `json:"filter"` + Federations []string `json:"federations"` + Options map[string]string `json:"options"` + Schema string `json:"$schema"` + Timeout float64 `json:"timeout"` } const ( @@ -67,10 +68,11 @@ func New(fw inter.Framework) *External { func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []string, err error) { dopts := &dOpts{ - collective: e.fw.Configuration().MainCollective, - timeout: e.timeout, - command: e.fw.Configuration().Choria.ExternalDiscoveryCommand, - do: make(map[string]string), + collective: e.fw.Configuration().MainCollective, + timeout: e.timeout, + command: e.fw.Configuration().Choria.ExternalDiscoveryCommand, + federations: e.fw.FederationCollectives(), + do: make(map[string]string), } for _, opt := range opts { @@ -100,12 +102,13 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st defer cancel() idat := &Request{ - Schema: RequestSchema, - Protocol: RequestProtocol, - Timeout: dopts.timeout.Seconds(), - Collective: dopts.collective, - Filter: dopts.filter, - Options: dopts.do, + Schema: RequestSchema, + Protocol: RequestProtocol, + Timeout: dopts.timeout.Seconds(), + Collective: dopts.collective, + Federations: dopts.federations, + Filter: dopts.filter, + Options: dopts.do, } req, err := json.Marshal(idat) @@ -145,6 +148,7 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st cmd := exec.CommandContext(timeoutCtx, command, args[1:]...) cmd.Dir = os.TempDir() + cmd.Env = []string{ "CHORIA_EXTERNAL_REQUEST=" + reqfile.Name(), "CHORIA_EXTERNAL_REPLY=" + repfile.Name(), diff --git a/providers/discovery/external/external_test.go b/providers/discovery/external/external_test.go index 758430dc4..5c8301818 100644 --- a/providers/discovery/external/external_test.go +++ b/providers/discovery/external/external_test.go @@ -34,64 +34,111 @@ var _ = Describe("External", func() { e *External ) - BeforeEach(func() { - mockctl = gomock.NewController(GinkgoT()) - fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter) - cfg.Collectives = []string{"mcollective", "test"} + Context("command without federation", func() { + BeforeEach(func() { + mockctl = gomock.NewController(GinkgoT()) + fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter) + cfg.Collectives = []string{"mcollective", "test"} - e = New(fw) - }) - - AfterEach(func() { - mockctl.Finish() - }) - - Describe("New", func() { - It("Should initialize timeout to default", func() { - Expect(e.timeout).To(Equal(2 * time.Second)) - cfg.DiscoveryTimeout = 100 e = New(fw) - Expect(e.timeout).To(Equal(100 * time.Second)) }) - }) - Describe("Discover", func() { - It("Should request and return discovered nodes", func() { - if runtime.GOOS == "windows" { - Skip("not tested on windows") - } + AfterEach(func() { + mockctl.Finish() + }) - f := protocol.NewFilter() - f.AddAgentFilter("rpcutil") - f.AddFactFilter("country", "==", "mt") + Describe("New", func() { + It("Should initialize timeout to default", func() { + Expect(e.timeout).To(Equal(2 * time.Second)) + cfg.DiscoveryTimeout = 100 + e = New(fw) + Expect(e.timeout).To(Equal(100 * time.Second)) + }) + }) + Describe("Discover", func() { wd, _ := os.Getwd() - cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb") - nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"})) - Expect(err).ToNot(HaveOccurred()) - Expect(nodes).To(Equal([]string{"one", "two"})) - - cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test" - nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"})) - Expect(err).ToNot(HaveOccurred()) - Expect(nodes).To(Equal([]string{"one", "two"})) + var f *protocol.Filter + BeforeEach(func() { + if runtime.GOOS == "windows" { + Skip("not tested on windows") + } + + f = protocol.NewFilter() + f.AddAgentFilter("rpcutil") + err := f.AddFactFilter("country", "==", "mt") + Expect(err).ToNot(HaveOccurred()) + }) + It("Should request and return discovered nodes", func() { + cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb") + nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"})) + Expect(err).ToNot(HaveOccurred()) + Expect(nodes).To(Equal([]string{"one", "two"})) + + cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test" + nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"})) + Expect(err).ToNot(HaveOccurred()) + Expect(nodes).To(Equal([]string{"one", "two"})) + }) + + It("Should support command overrides via options", func() { + if runtime.GOOS == "windows" { + Skip("not tested on windows") + } + + cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb") + cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test" + nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"})) + Expect(err).ToNot(HaveOccurred()) + Expect(nodes).To(Equal([]string{"one", "two"})) + }) }) + }) + Context("With federation", func() { + BeforeEach(func() { + mockctl = gomock.NewController(GinkgoT()) + fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter, imock.WithFederations([]string{"alpha", "beta"})) + cfg.Collectives = []string{"mcollective", "test"} - It("Should support command overrides via options", func() { - if runtime.GOOS == "windows" { - Skip("not tested on windows") - } + e = New(fw) + }) - f := protocol.NewFilter() - f.AddAgentFilter("rpcutil") - f.AddFactFilter("country", "==", "mt") + AfterEach(func() { + mockctl.Finish() + }) + Describe("New", func() { + It("Should initialize timeout to default", func() { + Expect(e.timeout).To(Equal(2 * time.Second)) + cfg.DiscoveryTimeout = 100 + e = New(fw) + Expect(e.timeout).To(Equal(100 * time.Second)) + }) + }) + + Describe("Discover", func() { wd, _ := os.Getwd() - cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb") - cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test" - nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"})) - Expect(err).ToNot(HaveOccurred()) - Expect(nodes).To(Equal([]string{"one", "two"})) + var f *protocol.Filter + var err error + BeforeEach(func() { + if runtime.GOOS == "windows" { + Skip("not tested on windows") + } + // err := os.Setenv("CHORIA_FED_COLLECTIVE", "alpha,beta") + // Expect(err).ToNot(HaveOccurred()) + + f = protocol.NewFilter() + f.AddAgentFilter("rpcutil") + err = f.AddFactFilter("country", "==", "mt") + Expect(err).ToNot(HaveOccurred()) + }) + It("Should request and return discovered nodes", func() { + cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_federation.rb") + nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"})) + Expect(err).ToNot(HaveOccurred()) + Expect(nodes).To(Equal([]string{"one", "two"})) + }) }) + }) }) diff --git a/providers/discovery/external/options.go b/providers/discovery/external/options.go index b3317fedb..975999722 100644 --- a/providers/discovery/external/options.go +++ b/providers/discovery/external/options.go @@ -11,11 +11,12 @@ import ( ) type dOpts struct { - filter *protocol.Filter - collective string - timeout time.Duration - command string - do map[string]string + filter *protocol.Filter + collective string + federations []string + timeout time.Duration + command string + do map[string]string } // DiscoverOption configures the broadcast discovery method @@ -35,6 +36,13 @@ func Collective(c string) DiscoverOption { } } +// Federations sets the list of federated collectives to discover in +func Federations(f []string) DiscoverOption { + return func(o *dOpts) { + o.federations = f + } +} + // Timeout sets the discovery timeout, else the configured default is used func Timeout(t time.Duration) DiscoverOption { return func(o *dOpts) { diff --git a/providers/discovery/external/testdata/good.rb b/providers/discovery/external/testdata/good.rb index d346059e0..63ec4e8cc 100755 --- a/providers/discovery/external/testdata/good.rb +++ b/providers/discovery/external/testdata/good.rb @@ -30,6 +30,7 @@ def write_output(output) "identity" => [] }, "collective" => "ginkgo", + "federations" => [], "timeout" => 2, } diff --git a/providers/discovery/external/testdata/good_with_federation.rb b/providers/discovery/external/testdata/good_with_federation.rb new file mode 100755 index 000000000..84d73f723 --- /dev/null +++ b/providers/discovery/external/testdata/good_with_federation.rb @@ -0,0 +1,43 @@ +#!/usr/bin/env ruby + +require "json" +require "pp" + +def write_output(output) + File.open(ENV["CHORIA_EXTERNAL_REPLY"], "w") {|f| + f.puts(output.to_json) + } + exit +end + +if ENV["CHORIA_EXTERNAL_PROTOCOL"] != "io.choria.choria.discovery.v1.external_request" + write_output({"error" => "invalid protocol"}) + exit +end + +request = JSON.parse(File.read(ENV["CHORIA_EXTERNAL_REQUEST"])) +expected = { + "$schema" => "https://choria.io/schemas/choria/discovery/v1/external_request.json", + "options" => { + "foo" => "bar" + }, + "protocol" => "io.choria.choria.discovery.v1.external_request", + "filter" => { + "fact" => [{"fact" => "country", "operator"=>"==","value"=>"mt"}], + "cf_class"=>[], + "agent" => ["rpcutil"], + "compound" => [], + "identity" => [] + }, + "collective" => "ginkgo", + "federations" => ["alpha", "beta"], + "timeout" => 2, +} + +if request != expected + write_output({"error" => "invalid filter received: " + (request.to_a - expected.to_a).pretty_inspect}) + + # write_output({"error" => "invalid filter received: "+request.pretty_inspect}) +else + write_output({"protocol" => "io.choria.choria.discovery.v1.external_reply", "nodes" => ["one","two"]}) +end