From 96f0bbf52f42eafb86b734eb18d31527cefb26d6 Mon Sep 17 00:00:00 2001 From: Mohd Uzair Date: Thu, 21 Sep 2023 00:34:39 +0530 Subject: [PATCH] Revert "[Enhancement] Use Streaming Lists when available" --- internal/pipeline/pipeline.go | 41 ++++-------- internal/pipeline/step.go | 118 ---------------------------------- meshsync/discovery.go | 2 +- meshsync/meshsync.go | 27 ++++---- 4 files changed, 24 insertions(+), 164 deletions(-) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index f75a0b4a..b7d94f37 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -5,7 +5,6 @@ import ( "github.com/layer5io/meshkit/logger" internalconfig "github.com/layer5io/meshsync/internal/config" "github.com/myntra/pipeline" - "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" ) @@ -30,48 +29,30 @@ var ( } ) -func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}, dynamicKube dynamic.Interface) *pipeline.Pipeline { - // TODO: best way to check whether WatchList feature is enabled - watchList := true - +func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}) *pipeline.Pipeline { // Global discovery gdstage := GlobalDiscoveryStage configs := plConfigs[gdstage.Name] - if watchList { - for _, config := range configs { - gdstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources - } - } else { - for _, config := range configs { - gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources - } + for _, config := range configs { + gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources } // Local discovery ldstage := LocalDiscoveryStage configs = plConfigs[ldstage.Name] - - if watchList { - for _, config := range configs { - ldstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources - } - } else { - for _, config := range configs { - ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources - } + for _, config := range configs { + ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources } + // Start informers + strtInfmrs := StartInformersStage + strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers + // Create Pipeline clusterPipeline := pipeline.New(Name, 1000) - - // add stages to pipeline clusterPipeline.AddStage(gdstage) clusterPipeline.AddStage(ldstage) - if !watchList { - // Start informers - strtInfmrs := StartInformersStage - strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers - clusterPipeline.AddStage(strtInfmrs) - } + clusterPipeline.AddStage(strtInfmrs) + return clusterPipeline } diff --git a/internal/pipeline/step.go b/internal/pipeline/step.go index 23afaa77..a7c74812 100644 --- a/internal/pipeline/step.go +++ b/internal/pipeline/step.go @@ -1,20 +1,12 @@ package pipeline import ( - "context" - "sync" - broker "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/logger" internalconfig "github.com/layer5io/meshsync/internal/config" "github.com/myntra/pipeline" - "github.com/layer5io/meshsync/pkg/model" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" ) @@ -93,113 +85,3 @@ func (si *StartInformers) Cancel() error { si.Status("cancel step") return nil } - -type StartWatcher struct { - pipeline.StepContext - stopChan chan struct{} - dynamicKube dynamic.Interface - log logger.Handler - broker broker.Handler - config internalconfig.PipelineConfig -} - -func newStartWatcherStage(dynamicKube dynamic.Interface, config internalconfig.PipelineConfig, stopChan chan struct{}, log logger.Handler, broker broker.Handler) *StartWatcher { - return &StartWatcher{ - stopChan: stopChan, - dynamicKube: dynamicKube, - config: config, - log: log, - broker: broker, - } -} - -func (w *StartWatcher) Exec(request *pipeline.Request) *pipeline.Result { - b := true - opts := metav1.ListOptions{ - Watch: true, - SendInitialEvents: &b, - ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, - } - // attempts to begin watching the namespaces - // returns a `watch.Interface`, or an error - - gvr, _ := schema.ParseResourceArg(w.config.Name) - watcher, err := w.dynamicKube.Resource(*gvr).Watch(context.TODO(), opts) - if err != nil { - return &pipeline.Result{ - Error: err, - Data: nil, - } - } - - var wg sync.WaitGroup - - // Launch the goroutine and pass the channel as an argument - wg.Add(1) - go w.backgroundWatchProcessor(watcher.ResultChan(), w.stopChan, &wg) - data := make(map[string]cache.Store) - - return &pipeline.Result{ - Error: nil, - Data: data, - } -} - -func (w *StartWatcher) backgroundWatchProcessor(result <-chan watch.Event, stopCh chan struct{}, wg *sync.WaitGroup) { - for { - select { - case <-stopCh: - return - default: - for event := range result { - obj := event.Object - switch event.Type { - // when an event is added... - case watch.Added: - err := w.publishItem(obj.(*unstructured.Unstructured), broker.Add, w.config) - if err != nil { - w.log.Error(err) - } - w.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) - // when an event is modified... - case watch.Modified: - err := w.publishItem(obj.(*unstructured.Unstructured), broker.Update, w.config) - if err != nil { - w.log.Error(err) - } - w.log.Info("Received UPDATE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) - // when an event is deleted... - case watch.Deleted: - var objCasted *unstructured.Unstructured - objCasted = obj.(*unstructured.Unstructured) - err := w.publishItem(objCasted, broker.Delete, w.config) - if err != nil { - w.log.Error(err) - } - w.log.Info("Received DELETE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) - } - } - } - if len(w.stopChan) > 0 { - break - } - } - wg.Wait() -} -func (w *StartWatcher) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error { - err := w.broker.Publish(config.PublishTo, &broker.Message{ - ObjectType: broker.MeshSync, - EventType: evtype, - Object: model.ParseList(*obj), - }) - if err != nil { - w.log.Error(ErrPublish(config.Name, err)) - return err - } - return nil -} - -func (w *StartWatcher) Cancel() error { - w.Status("cancel step") - return nil -} diff --git a/meshsync/discovery.go b/meshsync/discovery.go index f428201b..4eee1cf0 100644 --- a/meshsync/discovery.go +++ b/meshsync/discovery.go @@ -14,7 +14,7 @@ func (h *Handler) startDiscovery(pipelineCh chan struct{}) { } h.Log.Info("Pipeline started") - pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh, h.dynamicClient) + pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh) result := pl.Run() h.stores = result.Data.(map[string]cache.Store) if result.Error != nil { diff --git a/meshsync/meshsync.go b/meshsync/meshsync.go index ce5a3d27..de236c16 100644 --- a/meshsync/meshsync.go +++ b/meshsync/meshsync.go @@ -8,7 +8,6 @@ import ( "github.com/layer5io/meshsync/internal/channels" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -22,12 +21,11 @@ type Handler struct { Log logger.Handler Broker broker.Handler - restConfig rest.Config - informer dynamicinformer.DynamicSharedInformerFactory - staticClient *kubernetes.Clientset - dynamicClient dynamic.Interface - channelPool map[string]channels.GenericChannel - stores map[string]cache.Store + restConfig rest.Config + informer dynamicinformer.DynamicSharedInformerFactory + staticClient *kubernetes.Clientset + channelPool map[string]channels.GenericChannel + stores map[string]cache.Store } func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) { @@ -59,13 +57,12 @@ func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[ informer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(kubeClient.DynamicKubeClient, 0, v1.NamespaceAll, listOptionsFunc) return &Handler{ - Config: config, - Log: log, - Broker: br, - informer: informer, - restConfig: kubeClient.RestConfig, - staticClient: kubeClient.KubeClient, - channelPool: pool, - dynamicClient: kubeClient.DynamicKubeClient, + Config: config, + Log: log, + Broker: br, + informer: informer, + restConfig: kubeClient.RestConfig, + staticClient: kubeClient.KubeClient, + channelPool: pool, }, nil }