diff --git a/go.mod b/go.mod index 918861d5..1a04fdf8 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ toolchain go1.21.1 require ( github.com/buger/jsonparser v1.1.1 github.com/google/uuid v1.4.0 - github.com/layer5io/meshkit v0.6.88 + github.com/layer5io/meshkit v0.7.2 github.com/myntra/pipeline v0.0.0-20180618182531-2babf4864ce8 github.com/spf13/viper v1.17.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 @@ -45,7 +45,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/docker/cli v24.0.6+incompatible // indirect github.com/docker/distribution v2.8.2+incompatible // indirect - github.com/docker/docker v24.0.6+incompatible // indirect + github.com/docker/docker v24.0.7+incompatible // indirect github.com/docker/docker-credential-helpers v0.7.0 // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect @@ -162,7 +162,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/postgres v1.5.3 // indirect gorm.io/driver/sqlite v1.5.4 // indirect - helm.sh/helm/v3 v3.13.0 // indirect + helm.sh/helm/v3 v3.13.2 // indirect k8s.io/apiextensions-apiserver v0.28.3 // indirect k8s.io/apiserver v0.28.3 // indirect k8s.io/cli-runtime v0.28.3 // indirect diff --git a/go.sum b/go.sum index 617fd8a9..6818989f 100644 --- a/go.sum +++ b/go.sum @@ -121,8 +121,8 @@ github.com/docker/cli v24.0.6+incompatible h1:fF+XCQCgJjjQNIMjzaSmiKJSCcfcXb3TWT github.com/docker/cli v24.0.6+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.6+incompatible h1:hceabKCtUgDqPu+qm0NgsaXf28Ljf4/pWFL7xjWWDgE= -github.com/docker/docker v24.0.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.7.0 h1:xtCHsjxogADNZcdv1pKUHXryefjlVRqWqIhk/uXJp0A= github.com/docker/docker-credential-helpers v0.7.0/go.mod h1:rETQfLdHNT3foU5kuNkFR1R1V12OJRRO5lzt2D1b5X0= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= @@ -349,8 +349,8 @@ github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= -github.com/layer5io/meshkit v0.6.88 h1:JvL/lzIorXxGR5a3BKo1nSzHmtssh/HstAY4yYc9aqM= -github.com/layer5io/meshkit v0.6.88/go.mod h1:8dujy2ZSjnmtI7bFhMhnBUwFSlkPDyOowp6iLvxMslk= +github.com/layer5io/meshkit v0.7.2 h1:MseAfNKbGJPR/cD0cdySyV8qOSsyYgLYKoNmGEoXEjU= +github.com/layer5io/meshkit v0.7.2/go.mod h1:E5Zmn+CeBiGsnM2vsAsXKQtS39JDB9E91V9KRJMuShU= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= @@ -915,8 +915,8 @@ gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls= gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= -helm.sh/helm/v3 v3.13.0 h1:XPJKIU30K4JTQ6VX/6e0hFAmEIonYa8E7wx5aqv4xOc= -helm.sh/helm/v3 v3.13.0/go.mod h1:2PBEKsMWKLVZTojUOqMS3Eadv5mP43FBWrRgLNkNm9Y= +helm.sh/helm/v3 v3.13.2 h1:IcO9NgmmpetJODLZhR3f3q+6zzyXVKlRizKFwbi7K8w= +helm.sh/helm/v3 v3.13.2/go.mod h1:GIHDwZggaTGbedevTlrQ6DB++LBN6yuQdeGj0HNaDx0= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/channels/system.go b/internal/channels/system.go index 274677a8..feac3052 100644 --- a/internal/channels/system.go +++ b/internal/channels/system.go @@ -37,3 +37,7 @@ type ReSyncChannel chan struct{} func (ch ReSyncChannel) Stop() { <-ch } + +func (ch ReSyncChannel) ReSyncInformer() { + ch <- struct{}{} +} diff --git a/internal/config/default_config.go b/internal/config/default_config.go index 3a093a3a..a81f08a7 100644 --- a/internal/config/default_config.go +++ b/internal/config/default_config.go @@ -12,12 +12,14 @@ var ( "startedat": time.Now().String(), } + DefaultPublishingSubject = "meshery.meshsync.core" + Pipelines = map[string]PipelineConfigs{ GlobalResourceKey: []PipelineConfig{ // Core Resources { Name: "namespaces.v1.", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "configmaps.v1.", @@ -39,99 +41,78 @@ var ( Name: "persistentvolumeclaims.v1.", PublishTo: "meshery.meshsync.core", }, - { - Name: "prometheuses.v1.monitoring.coreos.com", - PublishTo: "meshery.meshsync.core", - }, - { - Name: "grafanas.v1beta1.grafana.integreatly.org", - PublishTo: "meshery.meshsync.core", - }, }, LocalResourceKey: []PipelineConfig{ // Core Resources { Name: "replicasets.v1.apps", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "pods.v1.", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "services.v1.", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "deployments.v1.apps", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "statefulsets.v1.apps", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, { Name: "daemonsets.v1.apps", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added Ingress support { Name: "ingresses.v1.networking.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, - //Added endpoint support + // Added endpoint support { Name: "endpoints.v1.", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added endpointslice support { Name: "endpointslices.v1.discovery.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, - //Added cronJob support + // Added cronJob support { Name: "cronjobs.v1.batch", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added ReplicationController support { Name: "replicationcontrollers.v1.", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added storageClass support { Name: "storageclasses.v1.storage.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added ClusterRole support { Name: "clusterroles.v1.rbac.authorization.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added VolumeAttachment support { Name: "volumeattachments.v1.storage.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, //Added apiservice support { Name: "apiservices.v1.apiregistration.k8s.io", - PublishTo: "meshery.meshsync.core", + PublishTo: DefaultPublishingSubject, }, - // Istio Resources - // { - // Name: "virtualservices.v1beta1.networking.istio.io", - // PublishTo: "meshery.meshsync.istio", - // }, - // { - // Name: "gateways.v1beta1.networking.istio.io", - // PublishTo: "meshery.meshsync.istio", - // }, - // { - // Name: "destinationrules.v1beta1.networking.istio.io", - // PublishTo: "meshery.meshsync.istio", - // }, }, } diff --git a/internal/config/types.go b/internal/config/types.go index d7971395..6f31980c 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -1,5 +1,7 @@ package config +import "golang.org/x/exp/slices" + const ( ServerKey = "server-config" PipelineNameKey = "meshsync-pipeline" @@ -19,6 +21,20 @@ const ( type PipelineConfigs []PipelineConfig +func(p PipelineConfigs) Add(pc PipelineConfig) PipelineConfigs { + p = append(p, pc) + return p +} + +func(p PipelineConfigs) Delete(pc PipelineConfig) PipelineConfigs { + for index, pipelineConfig := range p { + if pipelineConfig.Name == pc.Name { + p = slices.Delete[PipelineConfigs](p, index, index + 1) + break + } + } + return p +} type PipelineConfig struct { Name string `json:"name" yaml:"name"` PublishTo string `json:"publish-to" yaml:"publish-to"` diff --git a/internal/pipeline/handlers.go b/internal/pipeline/handlers.go index aad8f21a..77e384f9 100644 --- a/internal/pipeline/handlers.go +++ b/internal/pipeline/handlers.go @@ -12,8 +12,8 @@ import ( "k8s.io/client-go/tools/cache" ) -func (ri *RegisterInformer) registerHandlers(s cache.SharedIndexInformer) { - handlers := cache.ResourceEventHandlerFuncs{ +func(ri *RegisterInformer) GetEventHandlers() cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { err := ri.publishItem(obj.(*unstructured.Unstructured), broker.Add, ri.config) if err != nil { @@ -66,7 +66,10 @@ func (ri *RegisterInformer) registerHandlers(s cache.SharedIndexInformer) { ri.log.Info("Received DELETE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) }, } - s.AddEventHandler(handlers) // nolint +} + +func (ri *RegisterInformer) registerHandlers(s cache.SharedIndexInformer) { + s.AddEventHandler(ri.GetEventHandlers()) // nolint } func (ri *RegisterInformer) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error { diff --git a/main.go b/main.go index f73bfd99..9ade424e 100644 --- a/main.go +++ b/main.go @@ -116,6 +116,8 @@ func main() { os.Exit(1) } + go meshsyncHandler.WatchCRDs() + go meshsyncHandler.Run() go meshsyncHandler.ListenToRequests() diff --git a/meshsync/handlers.go b/meshsync/handlers.go index d894a09b..ecaf3927 100644 --- a/meshsync/handlers.go +++ b/meshsync/handlers.go @@ -1,35 +1,72 @@ package meshsync import ( + "fmt" "time" "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/utils" + "github.com/layer5io/meshkit/utils/kubernetes" "github.com/layer5io/meshsync/internal/channels" "github.com/layer5io/meshsync/internal/config" "github.com/layer5io/meshsync/pkg/model" + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" ) +func debounce(d time.Duration, f func(ch chan struct{})) func(ch chan struct{}) { + timer := time.NewTimer(d) + return func(pipelineCh chan struct{}) { + timer.Stop() + timer = time.NewTimer(d) + <-timer.C + f(pipelineCh) + timer.Reset(d) + timer.Stop() + } +} + func (h *Handler) Run() { pipelineCh := make(chan struct{}) go h.startDiscovery(pipelineCh) - for range h.channelPool[channels.ReSync].(channels.ReSyncChannel) { - go func(ch chan struct{}) { - for { - h.Log.Info("stopping previous instance") - if _, ok := <-ch; ok { - ch <- struct{}{} - } - } - }(pipelineCh) - h.Log.Info("starting over") + + debouncedStartDiscovery := debounce(time.Second*5, func(pipelinechannel chan struct{}) { + if !utils.IsClosed[struct{}](pipelinechannel) { + h.Log.Info("closing previous instance ") + close(pipelinechannel) + } pipelineCh = make(chan struct{}) - go h.startDiscovery(pipelineCh) - time.Sleep(5 * time.Second) + + err := h.UpdateInformer() + if err != nil { + h.Log.Error(err) + } + h.Log.Info("starting over") + h.startDiscovery(pipelineCh) + + }) + for range h.channelPool[channels.ReSync].(channels.ReSyncChannel) { + go debouncedStartDiscovery(pipelineCh) } } +func(h *Handler) UpdateInformer() error { + dynamicClient, err := dynamic.NewForConfig(&h.restConfig) + if err != nil { + return ErrNewInformer(err) + } + listOptionsFunc, err := GetListOptionsFunc(h.Config) + if err != nil { + return err + } + h.informer = GetDynamicInformer(h.Config, dynamicClient, listOptionsFunc) + return nil +} + func (h *Handler) ListenToRequests() { listenerConfigs := make(map[string]config.ListenerConfig, 10) err := h.Config.GetObject(config.ListenersKey, &listenerConfigs) @@ -130,6 +167,78 @@ func (h *Handler) listStoreObjects() []model.KubernetesResource { return parsedObjects } +func (h *Handler) WatchCRDs() { + kubeclient, err := kubernetes.New(nil) + if err != nil { + h.Log.Error(err) + return + } + + crdWatcher, err := kubeclient.DynamicKubeClient.Resource(schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }).Watch(context.Background(), metav1.ListOptions{}) + + if err != nil { + h.Log.Error(err) + return + } + + for event := range crdWatcher.ResultChan() { + + crd := &kubernetes.CRDItem{} + byt, err := utils.Marshal(event.Object) + if err != nil { + h.Log.Error(err) + continue + } + + err = utils.Unmarshal(byt, crd) + if err != nil { + h.Log.Error(err) + continue + } + + gvr := kubernetes.GetGVRForCustomResources(crd) + + existingPipelines := config.Pipelines + err = h.Config.GetObject(config.ResourcesKey, existingPipelines) + if err != nil { + h.Log.Error(err) + continue + } + + existingPipelineConfigs := existingPipelines[config.GlobalResourceKey] + + configName := fmt.Sprintf("%s.%s.%s", gvr.Resource, gvr.Version, gvr.Group) + updatedPipelineConfigs := existingPipelineConfigs + + switch event.Type { + case watch.Added: + // No need to verify if config is already added because If the config already exists then it indicates the informer has already synced that resource. + // Any subsequent updates will have event type as "modified" + updatedPipelineConfigs = existingPipelineConfigs.Add(config.PipelineConfig{ + Name: configName, + PublishTo: config.DefaultPublishingSubject, + Events: []string{"ADDED", "MODIFIED", "DELETED"}, + }) + case watch.Deleted: + updatedPipelineConfigs = existingPipelineConfigs.Delete(config.PipelineConfig{ + Name: configName, + }) + } + existingPipelines[config.GlobalResourceKey] = updatedPipelineConfigs + err = h.Config.SetObject(config.ResourcesKey, existingPipelines) + if err != nil { + h.Log.Error(err) + h.Log.Info("skipping informer resync") + return + } + h.channelPool[channels.ReSync].(channels.ReSyncChannel).ReSyncInformer() + } +} + // TODO: move this to meshkit // given [1,2,3,4,5,6,7,5,4,4] and 3 as its arguments, it would // return [[1,2,3], [4,5,6], [7,5,4], [4]] diff --git a/meshsync/meshsync.go b/meshsync/meshsync.go index de236c16..6b8534fe 100644 --- a/meshsync/meshsync.go +++ b/meshsync/meshsync.go @@ -8,6 +8,7 @@ import ( "github.com/layer5io/meshsync/internal/channels" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -28,20 +29,14 @@ type Handler struct { stores map[string]cache.Store } -func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) { - // Initialize Kubeconfig - kubeClient, err := mesherykube.New(nil) - if err != nil { - return nil, ErrKubeConfig(err) - } - +func GetListOptionsFunc(config config.Handler) (func(*v1.ListOptions), error) { var blacklist []string - err = config.GetObject("spec.informer_config", blacklist) + err := config.GetObject("spec.informer_config", blacklist) if err != nil { return nil, err } - listOptionsFunc := func(lo *v1.ListOptions) { + return func(lo *v1.ListOptions) { // Create a label selector to include all objects labelSelector := &v1.LabelSelector{} @@ -52,9 +47,21 @@ func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[ Values: blacklist, } labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, labelSelectorReq) + }, nil +} + +func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) { + // Initialize Kubeconfig + kubeClient, err := mesherykube.New(nil) + if err != nil { + return nil, ErrKubeConfig(err) + } + listOptionsFunc, err := GetListOptionsFunc(config) + if err != nil { + return nil, err } - informer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(kubeClient.DynamicKubeClient, 0, v1.NamespaceAll, listOptionsFunc) + informer := GetDynamicInformer(config, kubeClient.DynamicKubeClient, listOptionsFunc) return &Handler{ Config: config, @@ -66,3 +73,7 @@ func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[ channelPool: pool, }, nil } + +func GetDynamicInformer(config config.Handler, dynamicKubeClient dynamic.Interface, listOptionsFunc func(*v1.ListOptions)) dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicKubeClient, 0, v1.NamespaceAll, listOptionsFunc) +}