Skip to content

Commit

Permalink
Merge pull request #247 from meshery/revert-244-feature/kiptoonkipkur…
Browse files Browse the repository at this point in the history
…ui/221

Revert "[Enhancement] Use Streaming Lists when available"
  • Loading branch information
Mohd Uzair authored Sep 20, 2023
2 parents 3f46529 + 96f0bbf commit 1e21ae5
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 164 deletions.
41 changes: 11 additions & 30 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/layer5io/meshkit/logger"
internalconfig "github.com/layer5io/meshsync/internal/config"
"github.com/myntra/pipeline"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
)

Expand All @@ -30,48 +29,30 @@ var (
}
)

func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}, dynamicKube dynamic.Interface) *pipeline.Pipeline {
// TODO: best way to check whether WatchList feature is enabled
watchList := true

func New(log logger.Handler, informer dynamicinformer.DynamicSharedInformerFactory, broker broker.Handler, plConfigs map[string]internalconfig.PipelineConfigs, stopChan chan struct{}) *pipeline.Pipeline {
// Global discovery
gdstage := GlobalDiscoveryStage
configs := plConfigs[gdstage.Name]
if watchList {
for _, config := range configs {
gdstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources
}
} else {
for _, config := range configs {
gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
}
for _, config := range configs {
gdstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
}

// Local discovery
ldstage := LocalDiscoveryStage
configs = plConfigs[ldstage.Name]

if watchList {
for _, config := range configs {
ldstage.AddStep(newStartWatcherStage(dynamicKube, config, stopChan, log, broker)) // Register the watchers for different resources
}
} else {
for _, config := range configs {
ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
}
for _, config := range configs {
ldstage.AddStep(newRegisterInformerStep(log, informer, config, broker)) // Register the informers for different resources
}

// Start informers
strtInfmrs := StartInformersStage
strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers

// Create Pipeline
clusterPipeline := pipeline.New(Name, 1000)

// add stages to pipeline
clusterPipeline.AddStage(gdstage)
clusterPipeline.AddStage(ldstage)
if !watchList {
// Start informers
strtInfmrs := StartInformersStage
strtInfmrs.AddStep(newStartInformersStep(stopChan, log, informer)) // Start the registered informers
clusterPipeline.AddStage(strtInfmrs)
}
clusterPipeline.AddStage(strtInfmrs)

return clusterPipeline
}
118 changes: 0 additions & 118 deletions internal/pipeline/step.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
package pipeline

import (
"context"
"sync"

broker "github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshkit/logger"
internalconfig "github.com/layer5io/meshsync/internal/config"
"github.com/myntra/pipeline"

"github.com/layer5io/meshsync/pkg/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -93,113 +85,3 @@ func (si *StartInformers) Cancel() error {
si.Status("cancel step")
return nil
}

type StartWatcher struct {
pipeline.StepContext
stopChan chan struct{}
dynamicKube dynamic.Interface
log logger.Handler
broker broker.Handler
config internalconfig.PipelineConfig
}

func newStartWatcherStage(dynamicKube dynamic.Interface, config internalconfig.PipelineConfig, stopChan chan struct{}, log logger.Handler, broker broker.Handler) *StartWatcher {
return &StartWatcher{
stopChan: stopChan,
dynamicKube: dynamicKube,
config: config,
log: log,
broker: broker,
}
}

func (w *StartWatcher) Exec(request *pipeline.Request) *pipeline.Result {
b := true
opts := metav1.ListOptions{
Watch: true,
SendInitialEvents: &b,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
}
// attempts to begin watching the namespaces
// returns a `watch.Interface`, or an error

gvr, _ := schema.ParseResourceArg(w.config.Name)
watcher, err := w.dynamicKube.Resource(*gvr).Watch(context.TODO(), opts)
if err != nil {
return &pipeline.Result{
Error: err,
Data: nil,
}
}

var wg sync.WaitGroup

// Launch the goroutine and pass the channel as an argument
wg.Add(1)
go w.backgroundWatchProcessor(watcher.ResultChan(), w.stopChan, &wg)
data := make(map[string]cache.Store)

return &pipeline.Result{
Error: nil,
Data: data,
}
}

func (w *StartWatcher) backgroundWatchProcessor(result <-chan watch.Event, stopCh chan struct{}, wg *sync.WaitGroup) {
for {
select {
case <-stopCh:
return
default:
for event := range result {
obj := event.Object
switch event.Type {
// when an event is added...
case watch.Added:
err := w.publishItem(obj.(*unstructured.Unstructured), broker.Add, w.config)
if err != nil {
w.log.Error(err)
}
w.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
// when an event is modified...
case watch.Modified:
err := w.publishItem(obj.(*unstructured.Unstructured), broker.Update, w.config)
if err != nil {
w.log.Error(err)
}
w.log.Info("Received UPDATE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
// when an event is deleted...
case watch.Deleted:
var objCasted *unstructured.Unstructured
objCasted = obj.(*unstructured.Unstructured)
err := w.publishItem(objCasted, broker.Delete, w.config)
if err != nil {
w.log.Error(err)
}
w.log.Info("Received DELETE event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
}
}
}
if len(w.stopChan) > 0 {
break
}
}
wg.Wait()
}
func (w *StartWatcher) publishItem(obj *unstructured.Unstructured, evtype broker.EventType, config internalconfig.PipelineConfig) error {
err := w.broker.Publish(config.PublishTo, &broker.Message{
ObjectType: broker.MeshSync,
EventType: evtype,
Object: model.ParseList(*obj),
})
if err != nil {
w.log.Error(ErrPublish(config.Name, err))
return err
}
return nil
}

func (w *StartWatcher) Cancel() error {
w.Status("cancel step")
return nil
}
2 changes: 1 addition & 1 deletion meshsync/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (h *Handler) startDiscovery(pipelineCh chan struct{}) {
}

h.Log.Info("Pipeline started")
pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh, h.dynamicClient)
pl := pipeline.New(h.Log, h.informer, h.Broker, pipelineConfigs, pipelineCh)
result := pl.Run()
h.stores = result.Data.(map[string]cache.Store)
if result.Error != nil {
Expand Down
27 changes: 12 additions & 15 deletions meshsync/meshsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/layer5io/meshsync/internal/channels"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -22,12 +21,11 @@ type Handler struct {
Log logger.Handler
Broker broker.Handler

restConfig rest.Config
informer dynamicinformer.DynamicSharedInformerFactory
staticClient *kubernetes.Clientset
dynamicClient dynamic.Interface
channelPool map[string]channels.GenericChannel
stores map[string]cache.Store
restConfig rest.Config
informer dynamicinformer.DynamicSharedInformerFactory
staticClient *kubernetes.Clientset
channelPool map[string]channels.GenericChannel
stores map[string]cache.Store
}

func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) {
Expand Down Expand Up @@ -59,13 +57,12 @@ func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[
informer := dynamicinformer.NewFilteredDynamicSharedInformerFactory(kubeClient.DynamicKubeClient, 0, v1.NamespaceAll, listOptionsFunc)

return &Handler{
Config: config,
Log: log,
Broker: br,
informer: informer,
restConfig: kubeClient.RestConfig,
staticClient: kubeClient.KubeClient,
channelPool: pool,
dynamicClient: kubeClient.DynamicKubeClient,
Config: config,
Log: log,
Broker: br,
informer: informer,
restConfig: kubeClient.RestConfig,
staticClient: kubeClient.KubeClient,
channelPool: pool,
}, nil
}

0 comments on commit 1e21ae5

Please sign in to comment.