diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index b7d94f37..f75a0b4a 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -5,6 +5,7 @@ import ( "github.com/layer5io/meshkit/logger" internalconfig "github.com/layer5io/meshsync/internal/config" "github.com/myntra/pipeline" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" ) @@ -29,30 +30,48 @@ var ( } ) -func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}) *pipeline.Pipeline { +func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}, dynamicKube dynamic.Interface) *pipeline.Pipeline { + // TODO: best way to check whether WatchList feature is enabled + watchList := true + // Global discovery gdstage := GlobalDiscoveryStage configs := plConfigs[gdstage.Name] - for _, config := range configs { - gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources + if watchList { + for _, config := range configs { + gdstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources + } + } else { + for _, config := range configs { + gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources + } } // Local discovery ldstage := LocalDiscoveryStage configs = plConfigs[ldstage.Name] - for _, config := range configs { - ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources - } - // Start informers - strtInfmrs := StartInformersStage - strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers + if watchList { + for _, config := range configs { + ldstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources + } + } else { + for _, config := range configs { + ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources + } + } // Create Pipeline clusterPipeline := pipeline.New(Name, 1000) + + // add stages to pipeline clusterPipeline.AddStage(gdstage) clusterPipeline.AddStage(ldstage) - clusterPipeline.AddStage(strtInfmrs) - + if !watchList { + // Start informers + strtInfmrs := StartInformersStage + strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers + clusterPipeline.AddStage(strtInfmrs) + } return clusterPipeline } diff --git a/internal/pipeline/step.go b/internal/pipeline/step.go index a7c74812..23afaa77 100644 --- a/internal/pipeline/step.go +++ b/internal/pipeline/step.go @@ -1,12 +1,20 @@ package pipeline import ( + "context" + "sync" + broker "github.com/layer5io/meshkit/broker" "github.com/layer5io/meshkit/logger" internalconfig "github.com/layer5io/meshsync/internal/config" "github.com/myntra/pipeline" + "github.com/layer5io/meshsync/pkg/model" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" ) @@ -85,3 +93,113 @@ func (si *StartInformers) Cancel() error { si.Status("cancel step") return nil } + +type StartWatcher struct { + pipeline.StepContext + stopChan chan struct{} + dynamicKube dynamic.Interface + log logger.Handler + broker broker.Handler + config internalconfig.PipelineConfig +} + +func newStartWatcherStage(dynamicKube dynamic.Interface, config internalconfig.PipelineConfig, stopChan chan struct{}, log logger.Handler, broker broker.Handler) *StartWatcher { + return &StartWatcher{ + stopChan: stopChan, + dynamicKube: dynamicKube, + config: config, + log: log, + broker: broker, + } +} + +func (w *StartWatcher) Exec(request *pipeline.Request) *pipeline.Result { + b := true + opts := metav1.ListOptions{ + Watch: true, + SendInitialEvents: &b, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + } + // attempts to begin watching the namespaces + // returns a `watch.Interface`, or an error + + gvr, _ := schema.ParseResourceArg(w.config.Name) + watcher, err := w.dynamicKube.Resource(*gvr).Watch(context.TODO(), opts) + if err != nil { + return &pipeline.Result{ + Error: err, + Data: nil, + } + } + + var wg sync.WaitGroup + + // Launch the goroutine and pass the channel as an argument + wg.Add(1) + go w.backgroundWatchProcessor(watcher.ResultChan(), w.stopChan, &wg) + data := make(map[string]cache.Store) + + return &pipeline.Result{ + Error: nil, + Data: data, + } +} + +func (w *StartWatcher) backgroundWatchProcessor(result <-chan watch.Event, stopCh chan struct{}, wg *sync.WaitGroup) { + for { + select { + case <-stopCh: + return + default: + for event := range result { + obj := event.Object + switch event.Type { + // when an event is added... + case watch.Added: + err := w.publishItem(obj.(*unstructured.Unstructured), broker.Add, w.config) + if err != nil { + w.log.Error(err) + } + w.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) + // when an event is modified... + case watch.Modified: + err := w.publishItem(obj.(*unstructured.Unstructured), broker.Update, w.config) + if err != nil { + w.log.Error(err) + } + w.log.Info("Received UPDATE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) + // when an event is deleted... + case watch.Deleted: + var objCasted *unstructured.Unstructured + objCasted = obj.(*unstructured.Unstructured) + err := w.publishItem(objCasted, broker.Delete, w.config) + if err != nil { + w.log.Error(err) + } + w.log.Info("Received DELETE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind) + } + } + } + if len(w.stopChan) > 0 { + break + } + } + wg.Wait() +} +func (w *StartWatcher) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error { + err := w.broker.Publish(config.PublishTo, &broker.Message{ + ObjectType: broker.MeshSync, + EventType: evtype, + Object: model.ParseList(*obj), + }) + if err != nil { + w.log.Error(ErrPublish(config.Name, err)) + return err + } + return nil +} + +func (w *StartWatcher) Cancel() error { + w.Status("cancel step") + return nil +} diff --git a/meshsync/discovery.go b/meshsync/discovery.go index 4eee1cf0..f428201b 100644 --- a/meshsync/discovery.go +++ b/meshsync/discovery.go @@ -14,7 +14,7 @@ func (h *Handler) startDiscovery(pipelineCh chan struct{}) { } h.Log.Info("Pipeline started") - pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh) + pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh, h.dynamicClient) result := pl.Run() h.stores = result.Data.(map[string]cache.Store) if result.Error != nil { diff --git a/meshsync/meshsync.go b/meshsync/meshsync.go index de236c16..ce5a3d27 100644 --- a/meshsync/meshsync.go +++ b/meshsync/meshsync.go @@ -8,6 +8,7 @@ import ( "github.com/layer5io/meshsync/internal/channels" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -21,11 +22,12 @@ type Handler struct { Log logger.Handler Broker broker.Handler - restConfig rest.Config - informer dynamicinformer.DynamicSharedInformerFactory - staticClient *kubernetes.Clientset - channelPool map[string]channels.GenericChannel - stores map[string]cache.Store + restConfig rest.Config + informer dynamicinformer.DynamicSharedInformerFactory + staticClient *kubernetes.Clientset + dynamicClient dynamic.Interface + channelPool map[string]channels.GenericChannel + stores map[string]cache.Store } func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) { @@ -57,12 +59,13 @@ func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[ informer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(kubeClient.DynamicKubeClient, 0, v1.NamespaceAll, listOptionsFunc) return &Handler{ - Config: config, - Log: log, - Broker: br, - informer: informer, - restConfig: kubeClient.RestConfig, - staticClient: kubeClient.KubeClient, - channelPool: pool, + Config: config, + Log: log, + Broker: br, + informer: informer, + restConfig: kubeClient.RestConfig, + staticClient: kubeClient.KubeClient, + channelPool: pool, + dynamicClient: kubeClient.DynamicKubeClient, }, nil }