Optimize node watcher in daemon
The node watcher is very heavy when it watches
all the nodes instead of only the current one.
Nodes are updated regularly, e.g. by the kubelet.

Watch only the current node all the time, and
watch the other nodes only while holding the drain lock.

Signed-off-by: Alan Kutniewski <[email protected]>
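
For reference, a minimal client-go sketch of the single-node watch this commit switches to. The helper name startCurrentNodeInformer and its kubeClient/nodeName parameters are illustrative stand-ins for the daemon's dn.kubeClient and vars.NodeName, and "metadata.name" is an assumed value for the diff's metadataKey constant; this is a sketch of the pattern, not the committed code.

package watchsketch

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startCurrentNodeInformer watches a single node by pushing a field selector
// into the informer's LIST/WATCH calls, so the API server filters events
// server-side instead of streaming every node update to the daemon.
func startCurrentNodeInformer(kubeClient kubernetes.Interface, nodeName string, stopCh <-chan struct{}) error {
	factory := informers.NewSharedInformerFactoryWithOptions(kubeClient,
		15*time.Second,
		informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
			// "metadata.name" is assumed here; the diff builds this from metadataKey.
			lo.FieldSelector = "metadata.name=" + nodeName
		}),
	)
	nodeInformer := factory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(_, newObj interface{}) {
			node := newObj.(*corev1.Node)
			fmt.Println("current node updated:", node.ResourceVersion)
		},
	})
	go nodeInformer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced) {
		return fmt.Errorf("failed to sync the current-node cache")
	}
	return nil
}

Because the field selector is applied server-side, only events for this one node reach the daemon's handlers, which is what makes the permanent watch cheap.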
alan-kut committed Feb 13, 2024
1 parent 1163ef9 commit 683f40c
Showing 1 changed file with 57 additions and 15 deletions.
72 changes: 57 additions & 15 deletions pkg/daemon/daemon.go
@@ -98,8 +98,6 @@ type Daemon struct {

disableDrain bool

nodeLister listerv1.NodeLister

workqueue workqueue.RateLimitingInterface

mcpName string
@@ -243,14 +241,17 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
})

rand.Seed(time.Now().UnixNano())
nodeInformerFactory := informers.NewSharedInformerFactory(dn.kubeClient,
nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(dn.kubeClient,
time.Second*15,
informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.FieldSelector = metadataKey + "=" + vars.NodeName
}),
)
dn.nodeLister = nodeInformerFactory.Core().V1().Nodes().Lister()
nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dn.nodeAddHandler,
UpdateFunc: dn.nodeUpdateHandler,
AddFunc: dn.currentNodeAddHandler,
UpdateFunc: dn.currentNodeUpdateHandler,
DeleteFunc: dn.currentNodeDeleteHandler,
})
go cfgInformer.Run(dn.stopCh)
go nodeInformer.Run(dn.stopCh)
@@ -353,19 +354,49 @@ func (dn *Daemon) processNextWorkItem() bool {
return true
}

func (dn *Daemon) nodeAddHandler(obj interface{}) {
dn.nodeUpdateHandler(nil, obj)
func (dn *Daemon) currentNodeAddHandler(obj interface{}) {
dn.currentNodeUpdateHandler(nil, obj)
}

func (dn *Daemon) nodeUpdateHandler(old, new interface{}) {
node, err := dn.nodeLister.Get(vars.NodeName)
if errors.IsNotFound(err) {
log.Log.V(2).Info("nodeUpdateHandler(): node has been deleted", "name", vars.NodeName)
return
func (dn *Daemon) currentNodeUpdateHandler(old, new interface{}) {
dn.node = new.(*corev1.Node).DeepCopy()
}

func (dn *Daemon) currentNodeDeleteHandler(obj interface{}) {
log.Log.V(2).Info("nodeUpdateHandler(): node has been deleted", "name", vars.NodeName)
}

func (dn *Daemon) startAllNodesInformer(stopWatchCh chan struct{}) error {
go func() {
for {
select {
case <-dn.stopCh:
close(stopWatchCh)
return
case <-stopWatchCh:
return
}
}
}()
nodeInformerFactory := informers.NewSharedInformerFactory(dn.kubeClient,
time.Minute*5,
)
nodeLister := nodeInformerFactory.Core().V1().Nodes().Lister()
nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) { dn.nodeUpdateHandler(nodeLister) },
UpdateFunc: func(_, _ interface{}) { dn.nodeUpdateHandler(nodeLister) },
})
go nodeInformer.Run(stopWatchCh)
if ok := cache.WaitForCacheSync(stopWatchCh, nodeInformer.HasSynced); !ok {
return fmt.Errorf("failed to wait for node caches to sync")
}
dn.node = node.DeepCopy()
dn.nodeUpdateHandler(nodeLister)
return nil
}

nodes, err := dn.nodeLister.List(labels.Everything())
func (dn *Daemon) nodeUpdateHandler(nodeLister listerv1.NodeLister) {
nodes, err := nodeLister.List(labels.Everything())
if err != nil {
log.Log.Error(err, "nodeUpdateHandler(): failed to list nodes")
return
@@ -903,6 +934,15 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.Log.V(2).Info("getDrainLock(): started leading")
stopAllNodesWatchChan := make(chan struct{})
err = dn.startAllNodesInformer(stopAllNodesWatchChan)
if err != nil {
log.Log.Error(err, "getDrainLock(): Failed to start node informer")
// The context was canceled, stopChannel closed. There is no need to block here
done <- true
return
}
log.Log.V(2).Info("getDrainLock(): started node informer")
for {
time.Sleep(3 * time.Second)
if dn.node.Annotations[annoKey] == annoMcpPaused {
return
}
if dn.drainable {
// Stop node informer
close(stopAllNodesWatchChan)
log.Log.V(2).Info("getDrainLock(): no other node is draining")
err = dn.annotateNode(vars.NodeName, annoDraining)
if err != nil {
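
For completeness, a hedged sketch of the other half of the change: a cluster-wide node informer that is started only once the drain lock is held and is torn down by closing a channel, mirroring the startAllNodesInformer and getDrainLock wiring in the diff above. startAllNodesWatch, reportNodeCount, and the parameter names are illustrative, not the daemon's exact API.

package watchsketch

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// startAllNodesWatch starts a cluster-wide node informer that lives only as
// long as stopWatchCh stays open; closing daemonStopCh tears it down as well,
// so the wide watch never outlives the daemon itself.
func startAllNodesWatch(kubeClient kubernetes.Interface, daemonStopCh <-chan struct{}, stopWatchCh chan struct{}) error {
	go func() {
		select {
		case <-daemonStopCh:
			// Daemon shutdown: stop the wide watch too (assumes nobody else
			// closes stopWatchCh in this case, as in the committed code).
			close(stopWatchCh)
		case <-stopWatchCh:
			// The wide watch was stopped explicitly, e.g. once the drain
			// decision has been made.
		}
	}()

	factory := informers.NewSharedInformerFactory(kubeClient, 5*time.Minute)
	nodeLister := factory.Core().V1().Nodes().Lister()
	nodeInformer := factory.Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(_ interface{}) { reportNodeCount(nodeLister) },
		UpdateFunc: func(_, _ interface{}) { reportNodeCount(nodeLister) },
	})

	go nodeInformer.Run(stopWatchCh)
	if !cache.WaitForCacheSync(stopWatchCh, nodeInformer.HasSynced) {
		return fmt.Errorf("failed to sync the all-nodes cache")
	}
	reportNodeCount(nodeLister) // prime once so state is known before the first event
	return nil
}

// reportNodeCount stands in for the daemon's nodeUpdateHandler, which scans
// all cached nodes to decide whether another node is currently draining.
func reportNodeCount(nodeLister listersv1.NodeLister) {
	nodes, err := nodeLister.List(labels.Everything())
	if err != nil {
		return
	}
	fmt.Println("nodes in cache:", len(nodes))
}

Closing stopWatchCh once the drain decision has been made (as the diff does when dn.drainable is true) drops the wide watch again, so the expensive cluster-wide list/watch only exists for the short window in which the daemon holds the drain lock.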
