From 638914b6fcfe45223df345687d78cdbd08b3088d Mon Sep 17 00:00:00 2001 From: Aldo Lacuku Date: Mon, 27 Nov 2023 15:46:15 +0100 Subject: [PATCH] new(cmd): introduce the run and version commands The run commnand starts the k8s-metacollector. The version command prints the version and exits. Signed-off-by: Aldo Lacuku --- cmd/collector/doc.go | 17 ++ cmd/collector/root.go | 40 +++ cmd/collector/run/doc.go | 17 ++ cmd/collector/run/run.go | 404 +++++++++++++++++++++++++++++++ cmd/collector/version/doc.go | 17 ++ cmd/collector/version/version.go | 37 +++ go.mod | 4 +- go.sum | 6 + main.go | 345 +------------------------- 9 files changed, 546 insertions(+), 341 deletions(-) create mode 100644 cmd/collector/doc.go create mode 100644 cmd/collector/root.go create mode 100644 cmd/collector/run/doc.go create mode 100644 cmd/collector/run/run.go create mode 100644 cmd/collector/version/doc.go create mode 100644 cmd/collector/version/version.go diff --git a/cmd/collector/doc.go b/cmd/collector/doc.go new file mode 100644 index 0000000..2da8ff7 --- /dev/null +++ b/cmd/collector/doc.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector implements the command line for the metacollector. +package collector diff --git a/cmd/collector/root.go b/cmd/collector/root.go new file mode 100644 index 0000000..ee8d7af --- /dev/null +++ b/cmd/collector/root.go @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/falcosecurity/k8s-metacollector/cmd/collector/run" + "github.com/falcosecurity/k8s-metacollector/cmd/collector/version" + "github.com/go-logr/logr" + "github.com/spf13/cobra" +) + +// New returns the root command. +func New(ctx context.Context, logger *logr.Logger) *cobra.Command { + cmd := &cobra.Command{ + Use: "k8s-metacollector", + Short: "Fetches the metadata from kubernetes API server and dispatches them to Falco instances", + SilenceErrors: true, + SilenceUsage: true, + TraverseChildren: true, + } + + cmd.AddCommand(run.New(ctx, logger)) + cmd.AddCommand(version.New()) + return cmd +} diff --git a/cmd/collector/run/doc.go b/cmd/collector/run/doc.go new file mode 100644 index 0000000..7a68933 --- /dev/null +++ b/cmd/collector/run/doc.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package run implement the run command. +package run diff --git a/cmd/collector/run/run.go b/cmd/collector/run/run.go new file mode 100644 index 0000000..5febc66 --- /dev/null +++ b/cmd/collector/run/run.go @@ -0,0 +1,404 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "context" + "flag" + "os" + + "github.com/falcosecurity/k8s-metacollector/broker" + "github.com/falcosecurity/k8s-metacollector/collectors" + "github.com/falcosecurity/k8s-metacollector/pkg/events" + "github.com/falcosecurity/k8s-metacollector/pkg/resource" + "github.com/falcosecurity/k8s-metacollector/pkg/subscriber" + "github.com/go-logr/logr" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) +} + +type flags struct { + metricsAddr string + probeAddr string + brokerAddr string + certFilePath string + keyFilePath string +} + +func (fl *flags) add(flags *pflag.FlagSet) { + flags.StringVar(&fl.metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to") + flags.StringVar(&fl.probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to") + flags.StringVar(&fl.brokerAddr, "broker-bind-address", ":45000", "The address the broker endpoint binds to") + flags.StringVar(&fl.certFilePath, "broker-server-cert", "", "Cert file path for grpc server") + flags.StringVar(&fl.keyFilePath, "broker-server-key", "", "Key file path for grpc server") +} + +type options struct { + flags + logger *logr.Logger + loggerOpts *zap.Options +} + +// New returns a new run command. +func New(ctx context.Context, logger *logr.Logger) *cobra.Command { + opts := options{ + flags: flags{}, + } + + cmd := &cobra.Command{ + Use: "run [flags]", + Short: "Runs the metacollector", + Long: "Runs the metacollector", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + opts.Run(ctx) + }, + } + + // If the logger is not set, then create logger options and bind the flags. + if logger == nil { + logOpts := zap.Options{ + Development: false, + } + logOpts.BindFlags(flag.CommandLine) + opts.loggerOpts = &logOpts + } else { + opts.logger = logger + } + + // Add the go flags to the cobra flagset. It registers the flags for the kubeconfig path + // and the logger. + cmd.Flags().AddGoFlagSet(flag.CommandLine) + opts.flags.add(cmd.Flags()) + + return cmd +} + +// Run starts the metacollector. +func (opts *options) Run(ctx context.Context) { + // Set the logger. + if opts.logger != nil { + ctrl.SetLogger(*opts.logger) + } else { + ctrl.SetLogger(zap.New(zap.UseFlagOptions(opts.loggerOpts))) + } + + setupLog := ctrl.Log.WithName("setup") + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme, + Metrics: server.Options{ + BindAddress: opts.metricsAddr, + }, + HealthProbeBindAddress: opts.probeAddr, + Cache: cache.Options{ + DefaultUnsafeDisableDeepCopy: ptr.To(true), + ByObject: map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Transform: collectors.PodTransformer(setupLog), + }, + &corev1.Service{}: { + Transform: collectors.ServiceTransformer(setupLog), + }, + &corev1.Namespace{}: { + Transform: collectors.PartialObjectTransformer(setupLog), + }, + &corev1.ReplicationController{}: { + Transform: collectors.PartialObjectTransformer(setupLog), + }, + &v1.Deployment{}: { + Transform: collectors.PartialObjectTransformer(setupLog), + }, + &v1.ReplicaSet{}: { + Transform: collectors.PartialObjectTransformer(setupLog), + }, + &v1.DaemonSet{}: { + Transform: collectors.PartialObjectTransformer(setupLog), + }, + &discoveryv1.EndpointSlice{}: { + Transform: collectors.EndpointsliceTransformer(setupLog), + }, + }, + }, + }) + if err != nil { + setupLog.Error(err, "creating manager") + os.Exit(1) + } + if err = collectors.IndexPodByPrefixName(ctx, mgr.GetFieldIndexer()); err != nil { + setupLog.Error(err, "unable to add indexer by prefix name for pods") + os.Exit(1) + } + + if err = collectors.IndexPodByNode(ctx, mgr.GetFieldIndexer()); err != nil { + setupLog.Error(err, "unable to add indexer by node name for pods") + os.Exit(1) + } + + // Create source for deployments. + dpl := make(chan event.GenericEvent, 1) + deploymentSource := &source.Channel{Source: dpl} + + // Create source for replicasets. + rs := make(chan event.GenericEvent, 1) + replicasetSource := &source.Channel{Source: rs} + + // Create source for namespaces. + ns := make(chan event.GenericEvent, 1) + namespaceSource := &source.Channel{Source: ns} + + // Create source for daemonsets. + ds := make(chan event.GenericEvent, 1) + daemonsetSource := &source.Channel{Source: ds} + + // Create source for replicationcontrollers. + rc := make(chan event.GenericEvent, 1) + rcSource := &source.Channel{Source: rc} + + externalSrc := make(map[string]chan<- event.GenericEvent) + externalSrc[resource.Deployment] = dpl + externalSrc[resource.ReplicaSet] = rs + externalSrc[resource.Namespace] = ns + externalSrc[resource.Daemonset] = rc + + // Create source for pods. + pd := make(chan event.GenericEvent, 1) + podSource := &source.Channel{Source: pd} + + // Create source for services. + svc := make(chan event.GenericEvent, 1) + serviceSource := &source.Channel{Source: svc} + + podChanTrig := make(subscriber.SubsChan) + + queue := broker.NewBlockingChannel(1) + + podCollector := collectors.NewPodCollector(mgr.GetClient(), queue, events.NewCache(), "pod-collector", + collectors.WithOwnerSources(externalSrc), + collectors.WithSubscribersChan(podChanTrig), + collectors.WithExternalSource(podSource)) + + if err = podCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.Pod) + os.Exit(1) + } + + dplChanTrig := make(subscriber.SubsChan) + dplCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), + collectors.NewPartialObjectMetadata(resource.Deployment, nil), "deployment-collector", + collectors.WithSubscribersChan(dplChanTrig), + collectors.WithExternalSource(deploymentSource), + collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { + return &client.MatchingFields{ + "metadata.generateName": meta.Name, + } + })) + + if err = dplCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.Deployment) + os.Exit(1) + } + + rsChanTrig := make(subscriber.SubsChan) + rsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), + collectors.NewPartialObjectMetadata(resource.ReplicaSet, nil), "replicaset-collector", + collectors.WithSubscribersChan(rsChanTrig), + collectors.WithExternalSource(replicasetSource), + collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { + return &client.MatchingFields{ + "metadata.generateName": meta.Name + "-", + } + })) + + if err = rsCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.ReplicaSet) + os.Exit(1) + } + + nsChanTrig := make(subscriber.SubsChan) + nsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), + collectors.NewPartialObjectMetadata(resource.Namespace, nil), "namespace-collector", + collectors.WithSubscribersChan(nsChanTrig), + collectors.WithExternalSource(namespaceSource)) + + if err = nsCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.Namespace) + os.Exit(1) + } + + dsChanTrig := make(subscriber.SubsChan) + dsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), + collectors.NewPartialObjectMetadata(resource.Daemonset, nil), "daemonset-collector", + collectors.WithSubscribersChan(dsChanTrig), + collectors.WithExternalSource(daemonsetSource), + collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { + return &client.MatchingFields{ + "metadata.generateName": meta.Name + "-", + } + })) + + if err = dsCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.Daemonset) + os.Exit(1) + } + + rcChanTrig := make(subscriber.SubsChan) + rcCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), + collectors.NewPartialObjectMetadata(resource.ReplicationController, nil), "replicationcontroller-collector", + collectors.WithSubscribersChan(rcChanTrig), + collectors.WithExternalSource(rcSource), + collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { + return &client.MatchingFields{ + "metadata.generateName": meta.Name + "-", + } + })) + + if err = rcCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.ReplicationController) + os.Exit(1) + } + + svcChanTrig := make(subscriber.SubsChan) + + svcCollector := collectors.NewServiceCollector(mgr.GetClient(), queue, events.NewCache(), "service-collector", + collectors.WithExternalSource(serviceSource), + collectors.WithSubscribersChan(svcChanTrig)) + + if err = svcCollector.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create collector for", "resource kind", resource.Service) + os.Exit(1) + } + + if err = (&collectors.EndpointsDispatcher{ + Client: mgr.GetClient(), + Name: "endpoint-dispatcher", + ServiceCollectorSource: svc, + PodCollectorSource: pd, + Pods: make(map[string]map[string]struct{}), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create dispatcher for", "resource kind", resource.EndpointSlice) + os.Exit(1) + } + + if err = (&collectors.EndpointslicesDispatcher{ + Client: mgr.GetClient(), + Name: "endpointslices-dispatcher", + ServiceCollectorSource: svc, + PodCollectorSource: pd, + Pods: make(map[string]map[string]struct{}), + ServicesName: make(map[string]string), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create dispatcher for", "resource kind", resource.EndpointSlice) + os.Exit(1) + } + + br, err := broker.New(ctrl.Log.WithName("broker"), queue, map[string]subscriber.SubsChan{ + resource.Pod: podChanTrig, + resource.Deployment: dplChanTrig, + resource.ReplicaSet: rsChanTrig, + resource.Daemonset: dsChanTrig, + resource.Service: svcChanTrig, + resource.Namespace: nsChanTrig, + resource.ReplicationController: rcChanTrig, + }, + broker.WithAddress(opts.brokerAddr), + broker.WithTLS(opts.certFilePath, opts.keyFilePath)) + + if err != nil { + setupLog.Error(err, "unable to create the broker") + os.Exit(1) + } + + if err = mgr.Add(br); err != nil { + setupLog.Error(err, "unable to add broker to the manager") + os.Exit(1) + } + + if err = mgr.Add(podCollector); err != nil { + setupLog.Error(err, "unable to add pod collector to the manager as a runnable") + os.Exit(1) + } + + if err = mgr.Add(dsCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", dsCollector.GetName()) + os.Exit(1) + } + + if err = mgr.Add(nsCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", nsCollector.GetName()) + os.Exit(1) + } + + if err = mgr.Add(dplCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", dplCollector.GetName()) + os.Exit(1) + } + + if err = mgr.Add(rsCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", rsCollector.GetName()) + os.Exit(1) + } + + if err = mgr.Add(rcCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", rcCollector.GetName()) + os.Exit(1) + } + + if err = mgr.Add(svcCollector); err != nil { + setupLog.Error(err, "unable to add %s collector to the manager as a runnable", svcCollector.GetName()) + os.Exit(1) + } + + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up health check") + os.Exit(1) + } + + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + setupLog.Error(err, "unable to set up ready check") + os.Exit(1) + } + + setupLog.Info("starting manager") + if err := mgr.Start(ctx); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } +} diff --git a/cmd/collector/version/doc.go b/cmd/collector/version/doc.go new file mode 100644 index 0000000..db4ed81 --- /dev/null +++ b/cmd/collector/version/doc.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package version implements the logic for the version command. +package version diff --git a/cmd/collector/version/version.go b/cmd/collector/version/version.go new file mode 100644 index 0000000..8b2b03d --- /dev/null +++ b/cmd/collector/version/version.go @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2023 The Falco Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "fmt" + + "github.com/falcosecurity/k8s-metacollector/pkg/version" + "github.com/spf13/cobra" +) + +// New returns the version command. +func New() *cobra.Command { + cmd := &cobra.Command{ + Use: "version", + Short: "Print version", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println(version.Version()) + }, + } + + return cmd +} diff --git a/go.mod b/go.mod index b99b066..e51ceac 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 github.com/prometheus/client_golang v1.17.0 + github.com/spf13/cobra v1.7.0 + github.com/spf13/pflag v1.0.5 google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 k8s.io/api v0.28.2 @@ -39,6 +41,7 @@ require ( github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect github.com/google/uuid v1.3.1 // indirect github.com/imdario/mergo v0.3.16 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -50,7 +53,6 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect diff --git a/go.sum b/go.sum index 2ee7af2..bfb819c 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,7 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -57,6 +58,8 @@ github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -103,6 +106,9 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index f1784d1..0236261 100644 --- a/main.go +++ b/main.go @@ -16,353 +16,18 @@ package main import ( - "context" - "flag" "fmt" "os" - "github.com/falcosecurity/k8s-metacollector/broker" - "github.com/falcosecurity/k8s-metacollector/collectors" - "github.com/falcosecurity/k8s-metacollector/pkg/events" - "github.com/falcosecurity/k8s-metacollector/pkg/resource" - "github.com/falcosecurity/k8s-metacollector/pkg/subscriber" - "github.com/falcosecurity/k8s-metacollector/pkg/version" - v1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - discoveryv1 "k8s.io/api/discovery/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/utils/ptr" + "github.com/falcosecurity/k8s-metacollector/cmd/collector" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/source" ) -var ( - scheme = runtime.NewScheme() -) - -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - - //+kubebuilder:scaffold:scheme -} - func main() { - var metricsAddr string - var probeAddr string - var brokerAddr string - var certFilePath string - var keyFilePath string - var versionFlag bool - - flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") - flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") - flag.StringVar(&brokerAddr, "broker-bind-address", ":45000", "The address the broker endpoint binds to.") - flag.StringVar(&certFilePath, "broker-server-cert", "", "Cert file path for grpc server.") - flag.StringVar(&keyFilePath, "broker-server-key", "", "Key file path for grpc server.") - flag.BoolVar(&versionFlag, "version", false, "Print version.") - - opts := zap.Options{ - Development: false, - } - opts.BindFlags(flag.CommandLine) - flag.Parse() - - if versionFlag { - fmt.Println(version.Version()) - os.Exit(0) - } - - ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - - setupLog := ctrl.Log.WithName("setup") - - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - Scheme: scheme, - Metrics: server.Options{ - BindAddress: metricsAddr, - }, - HealthProbeBindAddress: probeAddr, - Cache: cache.Options{ - DefaultUnsafeDisableDeepCopy: ptr.To(true), - ByObject: map[client.Object]cache.ByObject{ - &corev1.Pod{}: { - Transform: collectors.PodTransformer(setupLog), - }, - &corev1.Service{}: { - Transform: collectors.ServiceTransformer(setupLog), - }, - &corev1.Namespace{}: { - Transform: collectors.PartialObjectTransformer(setupLog), - }, - &corev1.ReplicationController{}: { - Transform: collectors.PartialObjectTransformer(setupLog), - }, - &v1.Deployment{}: { - Transform: collectors.PartialObjectTransformer(setupLog), - }, - &v1.ReplicaSet{}: { - Transform: collectors.PartialObjectTransformer(setupLog), - }, - &v1.DaemonSet{}: { - Transform: collectors.PartialObjectTransformer(setupLog), - }, - &discoveryv1.EndpointSlice{}: { - Transform: collectors.EndpointsliceTransformer(setupLog), - }, - }, - }, - }) - if err != nil { - setupLog.Error(err, "creating manager") - os.Exit(1) - } - if err = collectors.IndexPodByPrefixName(context.Background(), mgr.GetFieldIndexer()); err != nil { - setupLog.Error(err, "unable to add indexer by prefix name for pods") - os.Exit(1) - } - - if err = collectors.IndexPodByNode(context.Background(), mgr.GetFieldIndexer()); err != nil { - setupLog.Error(err, "unable to add indexer by node name for pods") - os.Exit(1) - } - - // Create source for deployments. - dpl := make(chan event.GenericEvent, 1) - deploymentSource := &source.Channel{Source: dpl} - - // Create source for replicasets. - rs := make(chan event.GenericEvent, 1) - replicasetSource := &source.Channel{Source: rs} - - // Create source for namespaces. - ns := make(chan event.GenericEvent, 1) - namespaceSource := &source.Channel{Source: ns} - - // Create source for daemonsets. - ds := make(chan event.GenericEvent, 1) - daemonsetSource := &source.Channel{Source: ds} - - // Create source for replicationcontrollers. - rc := make(chan event.GenericEvent, 1) - rcSource := &source.Channel{Source: rc} - - externalSrc := make(map[string]chan<- event.GenericEvent) - externalSrc[resource.Deployment] = dpl - externalSrc[resource.ReplicaSet] = rs - externalSrc[resource.Namespace] = ns - externalSrc[resource.Daemonset] = rc - - // Create source for pods. - pd := make(chan event.GenericEvent, 1) - podSource := &source.Channel{Source: pd} - - // Create source for services. - svc := make(chan event.GenericEvent, 1) - serviceSource := &source.Channel{Source: svc} - - podChanTrig := make(subscriber.SubsChan) - - queue := broker.NewBlockingChannel(1) - - podCollector := collectors.NewPodCollector(mgr.GetClient(), queue, events.NewCache(), "pod-collector", - collectors.WithOwnerSources(externalSrc), - collectors.WithSubscribersChan(podChanTrig), - collectors.WithExternalSource(podSource)) - - if err = podCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.Pod) - os.Exit(1) - } - - dplChanTrig := make(subscriber.SubsChan) - dplCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), - collectors.NewPartialObjectMetadata(resource.Deployment, nil), "deployment-collector", - collectors.WithSubscribersChan(dplChanTrig), - collectors.WithExternalSource(deploymentSource), - collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { - return &client.MatchingFields{ - "metadata.generateName": meta.Name, - } - })) - - if err = dplCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.Deployment) - os.Exit(1) - } - - rsChanTrig := make(subscriber.SubsChan) - rsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), - collectors.NewPartialObjectMetadata(resource.ReplicaSet, nil), "replicaset-collector", - collectors.WithSubscribersChan(rsChanTrig), - collectors.WithExternalSource(replicasetSource), - collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { - return &client.MatchingFields{ - "metadata.generateName": meta.Name + "-", - } - })) - - if err = rsCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.ReplicaSet) - os.Exit(1) - } - - nsChanTrig := make(subscriber.SubsChan) - nsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), - collectors.NewPartialObjectMetadata(resource.Namespace, nil), "namespace-collector", - collectors.WithSubscribersChan(nsChanTrig), - collectors.WithExternalSource(namespaceSource)) - - if err = nsCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.Namespace) - os.Exit(1) - } - - dsChanTrig := make(subscriber.SubsChan) - dsCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), - collectors.NewPartialObjectMetadata(resource.Daemonset, nil), "daemonset-collector", - collectors.WithSubscribersChan(dsChanTrig), - collectors.WithExternalSource(daemonsetSource), - collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { - return &client.MatchingFields{ - "metadata.generateName": meta.Name + "-", - } - })) - - if err = dsCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.Daemonset) - os.Exit(1) - } - - rcChanTrig := make(subscriber.SubsChan) - rcCollector := collectors.NewObjectMetaCollector(mgr.GetClient(), queue, events.NewCache(), - collectors.NewPartialObjectMetadata(resource.ReplicationController, nil), "replicationcontroller-collector", - collectors.WithSubscribersChan(rcChanTrig), - collectors.WithExternalSource(rcSource), - collectors.WithPodMatchingFields(func(meta *metav1.ObjectMeta) client.ListOption { - return &client.MatchingFields{ - "metadata.generateName": meta.Name + "-", - } - })) - - if err = rcCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.ReplicationController) - os.Exit(1) - } - - svcChanTrig := make(subscriber.SubsChan) - - svcCollector := collectors.NewServiceCollector(mgr.GetClient(), queue, events.NewCache(), "service-collector", - collectors.WithExternalSource(serviceSource), - collectors.WithSubscribersChan(svcChanTrig)) - - if err = svcCollector.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create collector for", "resource kind", resource.Service) - os.Exit(1) - } - - if err = (&collectors.EndpointsDispatcher{ - Client: mgr.GetClient(), - Name: "endpoint-dispatcher", - ServiceCollectorSource: svc, - PodCollectorSource: pd, - Pods: make(map[string]map[string]struct{}), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create dispatcher for", "resource kind", resource.EndpointSlice) - os.Exit(1) - } - - if err = (&collectors.EndpointslicesDispatcher{ - Client: mgr.GetClient(), - Name: "endpointslices-dispatcher", - ServiceCollectorSource: svc, - PodCollectorSource: pd, - Pods: make(map[string]map[string]struct{}), - ServicesName: make(map[string]string), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create dispatcher for", "resource kind", resource.EndpointSlice) - os.Exit(1) - } - - br, err := broker.New(ctrl.Log.WithName("broker"), queue, map[string]subscriber.SubsChan{ - resource.Pod: podChanTrig, - resource.Deployment: dplChanTrig, - resource.ReplicaSet: rsChanTrig, - resource.Daemonset: dsChanTrig, - resource.Service: svcChanTrig, - resource.Namespace: nsChanTrig, - resource.ReplicationController: rcChanTrig, - }, - broker.WithAddress(brokerAddr), - broker.WithTLS(certFilePath, keyFilePath)) - - if err != nil { - setupLog.Error(err, "unable to create the broker") - os.Exit(1) - } - - if err = mgr.Add(br); err != nil { - setupLog.Error(err, "unable to add broker to the manager") - os.Exit(1) - } - - if err = mgr.Add(podCollector); err != nil { - setupLog.Error(err, "unable to add pod collector to the manager as a runnable") - os.Exit(1) - } - - if err = mgr.Add(dsCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", dsCollector.GetName()) - os.Exit(1) - } - - if err = mgr.Add(nsCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", nsCollector.GetName()) - os.Exit(1) - } - - if err = mgr.Add(dplCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", dplCollector.GetName()) - os.Exit(1) - } - - if err = mgr.Add(rsCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", rsCollector.GetName()) - os.Exit(1) - } - - if err = mgr.Add(rcCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", rcCollector.GetName()) - os.Exit(1) - } - - if err = mgr.Add(svcCollector); err != nil { - setupLog.Error(err, "unable to add %s collector to the manager as a runnable", svcCollector.GetName()) - os.Exit(1) - } - //+kubebuilder:scaffold:builder - - if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up health check") - os.Exit(1) - } - - if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - setupLog.Error(err, "unable to set up ready check") - os.Exit(1) - } - - setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { - setupLog.Error(err, "problem running manager") + // new cmd. + cmd := collector.New(ctrl.SetupSignalHandler(), nil) + if err := cmd.Execute(); err != nil { + fmt.Println(err) os.Exit(1) } }