Skip to content

Commit

Permalink
++
Browse files Browse the repository at this point in the history
  • Loading branch information
yalosev committed Nov 28, 2023
1 parent 0ca59d0 commit c4703fc
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
26 changes: 15 additions & 11 deletions pkg/kube_events_manager/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kube_events_manager

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -31,7 +32,8 @@ type FactoryIndex struct {
type Factory struct {
shared dynamicinformer.DynamicSharedInformerFactory
score uint64
stopCh chan struct{}
ctx context.Context
cancel context.CancelFunc
}

type FactoryStore struct {
Expand All @@ -45,15 +47,17 @@ func NewFactoryStore() *FactoryStore {
}
}

func (c *FactoryStore) add(index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinformer.DynamicSharedInformerFactory) {
cctx, cancel := context.WithCancel(ctx)
c.data[index] = Factory{
shared: f,
score: uint64(1),
stopCh: make(chan struct{}, 1),
ctx: cctx,
cancel: cancel,
}
}

func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory {
func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) Factory {
f, ok := c.data[index]
if ok {
f.score++
Expand All @@ -76,15 +80,15 @@ func (c *FactoryStore) get(client dynamic.Interface, index FactoryIndex) Factory
client, resyncPeriod, index.Namespace, tweakListOptions)
factory.ForResource(index.GVR)

c.add(index, factory)
c.add(ctx, index, factory)
return c.data[index]
}

func (c *FactoryStore) Start(client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error {
func (c *FactoryStore) Start(ctx context.Context, client dynamic.Interface, index FactoryIndex, handler cache.ResourceEventHandler, errorHandler *WatchErrorHandler) error {
c.mu.Lock()
defer c.mu.Unlock()

factory := c.get(client, index)
factory := c.get(ctx, client, index)

informer := factory.shared.ForResource(index.GVR).Informer()
// Add error handler, ignore "already started" error.
Expand All @@ -93,11 +97,11 @@ func (c *FactoryStore) Start(client dynamic.Interface, index FactoryIndex, handl
informer.AddEventHandler(handler)

if !informer.HasSynced() {
go informer.Run(factory.stopCh)
go informer.Run(factory.ctx.Done())

if err := wait.PollImmediateUntil(DefaultSyncTime, func() (bool, error) {
if err := wait.PollUntilContextCancel(ctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
return informer.HasSynced(), nil
}, factory.stopCh); err != nil {
}); err != nil {
return err
}
}
Expand All @@ -116,7 +120,7 @@ func (c *FactoryStore) Stop(index FactoryIndex) {

f.score--
if f.score == 0 {
close(f.stopCh)
f.cancel()
}

delete(c.data, index)
Expand Down
11 changes: 6 additions & 5 deletions pkg/kube_events_manager/namespace_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,18 @@ func (ni *namespaceInformer) start() {
log.Errorf("%s: Possible BUG!!! Start called before createSharedInformer, ShredInformer is nil", ni.Monitor.Metadata.DebugName)
return
}
stopCh := make(chan struct{}, 1)
cctx, cancel := context.WithCancel(ni.ctx)
go func() {
<-ni.ctx.Done()
ni.stopped = true
close(stopCh)
cancel()
}()

go ni.SharedInformer.Run(stopCh)
if err := wait.PollImmediateUntil(DefaultSyncTime, func() (bool, error) {
go ni.SharedInformer.Run(cctx.Done())

if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) {
return ni.SharedInformer.HasSynced(), nil
}, stopCh); err != nil {
}); err != nil {
ni.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ni.Monitor.Metadata.DebugName)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kube_events_manager/resource_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (ei *resourceInformer) start() {

// TODO: separate handler and informer
errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage)
err := DefaultFactoryStore.Start(ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
err := DefaultFactoryStore.Start(ei.ctx, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler)
if err != nil {
ei.Monitor.LogEntry.Errorf("%s: cache is not synced for informer", ei.Monitor.Metadata.DebugName)
return
Expand Down

0 comments on commit c4703fc

Please sign in to comment.