From b1cd82576315158f207be73761c1525c0b45f6d4 Mon Sep 17 00:00:00 2001 From: joey Date: Thu, 3 Oct 2024 20:01:34 +0800 Subject: [PATCH] fix namespaced kruise-controller panic due to `multiNamespaceCache` Signed-off-by: joey --- main.go | 18 +++++++++++------ pkg/controller/controllers.go | 19 ++++++++++++++++-- .../daemonset/daemonset_controller.go | 10 +++++----- .../statefulset/statefulset_controller.go | 12 +++++------ pkg/controller/util/cache.go | 20 +++++++++++++++++++ 5 files changed, 60 insertions(+), 19 deletions(-) create mode 100644 pkg/controller/util/cache.go diff --git a/main.go b/main.go index 56fea93e0e..4f7454f6f2 100644 --- a/main.go +++ b/main.go @@ -247,19 +247,25 @@ func main() { setupLog.Error(err, "unable to wait webhook ready") os.Exit(1) } - - setupLog.Info("setup controllers") - if err = controller.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to setup controllers") - os.Exit(1) - } }() + setupLog.Info("setup controllers") + if err = controller.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup controllers") + os.Exit(1) + } + setupLog.Info("starting manager") if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } + + setupLog.Info("setup controllers that need manager started") + if err = controller.SetupAfterStart(mgr); err != nil { + setupLog.Error(err, "unable to setup controllers after manager start") + os.Exit(1) + } } func setRestConfig(c *rest.Config) { diff --git a/pkg/controller/controllers.go b/pkg/controller/controllers.go index 621136cc6f..1452841671 100644 --- a/pkg/controller/controllers.go +++ b/pkg/controller/controllers.go @@ -44,6 +44,7 @@ import ( ) var controllerAddFuncs []func(manager.Manager) error +var controllerAddAfterStarts []func(manager.Manager) error func init() { controllerAddFuncs = append(controllerAddFuncs, advancedcronjob.Add) @@ -58,15 +59,16 @@ func init() { controllerAddFuncs = append(controllerAddFuncs, statefulset.Add) controllerAddFuncs = append(controllerAddFuncs, uniteddeployment.Add) controllerAddFuncs = append(controllerAddFuncs, podunavailablebudget.Add) - controllerAddFuncs = append(controllerAddFuncs, workloadspread.Add) controllerAddFuncs = append(controllerAddFuncs, resourcedistribution.Add) controllerAddFuncs = append(controllerAddFuncs, ephemeraljob.Add) controllerAddFuncs = append(controllerAddFuncs, containerlauchpriority.Add) - controllerAddFuncs = append(controllerAddFuncs, persistentpodstate.Add) controllerAddFuncs = append(controllerAddFuncs, sidecarterminator.Add) controllerAddFuncs = append(controllerAddFuncs, podprobemarker.Add) controllerAddFuncs = append(controllerAddFuncs, nodepodprobe.Add) controllerAddFuncs = append(controllerAddFuncs, imagelistpulljob.Add) + + controllerAddAfterStarts = append(controllerAddAfterStarts, workloadspread.Add) + controllerAddAfterStarts = append(controllerAddAfterStarts, persistentpodstate.Add) } func SetupWithManager(m manager.Manager) error { @@ -81,3 +83,16 @@ func SetupWithManager(m manager.Manager) error { } return nil } + +func SetupAfterStart(m manager.Manager) error { + for _, f := range controllerAddAfterStarts { + if err := f(m); err != nil { + if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { + klog.InfoS("CRD is not installed, its controller will perform noops!", "CRD", kindMatchErr.GroupKind) + continue + } + return err + } + } + return nil +} diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index 86f2333f8d..2244a7ecdf 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -40,7 +40,6 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/retry" @@ -66,6 +65,7 @@ import ( kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" "github.com/openkruise/kruise/pkg/client/clientset/versioned/scheme" kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1" + controllerutil "github.com/openkruise/kruise/pkg/controller/util" "github.com/openkruise/kruise/pkg/features" kruiseutil "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" @@ -172,10 +172,10 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { return nil, err } - dsLister := kruiseappslisters.NewDaemonSetLister(dsInformer.(cache.SharedIndexInformer).GetIndexer()) - historyLister := appslisters.NewControllerRevisionLister(revInformer.(cache.SharedIndexInformer).GetIndexer()) - podLister := corelisters.NewPodLister(podInformer.(cache.SharedIndexInformer).GetIndexer()) - nodeLister := corelisters.NewNodeLister(nodeInformer.(cache.SharedIndexInformer).GetIndexer()) + dsLister := kruiseappslisters.NewDaemonSetLister(controllerutil.GetCacheIndexer(dsInformer)) + historyLister := appslisters.NewControllerRevisionLister(controllerutil.GetCacheIndexer(revInformer)) + podLister := corelisters.NewPodLister(controllerutil.GetCacheIndexer(podInformer)) + nodeLister := corelisters.NewNodeLister(controllerutil.GetCacheIndexer(nodeInformer)) failedPodsBackoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute) revisionAdapter := revisionadapter.NewDefaultImpl() diff --git a/pkg/controller/statefulset/statefulset_controller.go b/pkg/controller/statefulset/statefulset_controller.go index 704b6678f4..5c33853af2 100644 --- a/pkg/controller/statefulset/statefulset_controller.go +++ b/pkg/controller/statefulset/statefulset_controller.go @@ -35,7 +35,6 @@ import ( appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" - toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" kubecontroller "k8s.io/kubernetes/pkg/controller" @@ -54,6 +53,7 @@ import ( "github.com/openkruise/kruise/pkg/client" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" kruiseappslisters "github.com/openkruise/kruise/pkg/client/listers/apps/v1beta1" + controllerutil "github.com/openkruise/kruise/pkg/controller/util" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" @@ -133,10 +133,10 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { return nil, err } - statefulSetLister := kruiseappslisters.NewStatefulSetLister(statefulSetInformer.(toolscache.SharedIndexInformer).GetIndexer()) - podLister := corelisters.NewPodLister(podInformer.(toolscache.SharedIndexInformer).GetIndexer()) - pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcInformer.(toolscache.SharedIndexInformer).GetIndexer()) - scLister := storagelisters.NewStorageClassLister(scInformer.(toolscache.SharedIndexInformer).GetIndexer()) + statefulSetLister := kruiseappslisters.NewStatefulSetLister(controllerutil.GetCacheIndexer(statefulSetInformer)) + podLister := corelisters.NewPodLister(controllerutil.GetCacheIndexer(podInformer)) + pvcLister := corelisters.NewPersistentVolumeClaimLister(controllerutil.GetCacheIndexer(pvcInformer)) + scLister := storagelisters.NewStorageClassLister(controllerutil.GetCacheIndexer(scInformer)) genericClient := client.GetGenericClientWithName("statefulset-controller") eventBroadcaster := record.NewBroadcaster() @@ -159,7 +159,7 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { inplaceupdate.New(utilclient.NewClientFromManager(mgr, "statefulset-controller"), revisionadapter.NewDefaultImpl()), lifecycle.New(utilclient.NewClientFromManager(mgr, "statefulset-controller")), NewRealStatefulSetStatusUpdater(genericClient.KruiseClient, statefulSetLister), - history.NewHistory(genericClient.KubeClient, appslisters.NewControllerRevisionLister(revInformer.(toolscache.SharedIndexInformer).GetIndexer())), + history.NewHistory(genericClient.KubeClient, appslisters.NewControllerRevisionLister(controllerutil.GetCacheIndexer(revInformer))), recorder, ), podControl: kubecontroller.RealPodControl{KubeClient: genericClient.KubeClient, Recorder: recorder}, diff --git a/pkg/controller/util/cache.go b/pkg/controller/util/cache.go new file mode 100644 index 0000000000..f84d30c9d7 --- /dev/null +++ b/pkg/controller/util/cache.go @@ -0,0 +1,20 @@ +package util + +import ( + "k8s.io/client-go/tools/cache" + toolscache "sigs.k8s.io/controller-runtime/pkg/cache" +) + +// GetCacheIndexer helps to get the cache indexer from the informer +func GetCacheIndexer(informer toolscache.Informer) cache.Indexer { + shardIndexInformer, ok := informer.(cache.SharedIndexInformer) + if ok { + return shardIndexInformer.GetIndexer() + } + indexers := cache.Indexers{} + err := informer.(toolscache.Informer).AddIndexers(indexers) + if err != nil { + panic(err) + } + return cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers) +}