diff --git a/cmd/main.go b/cmd/main.go index 0615508..2b25c7d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -21,6 +21,7 @@ import ( "github.com/nacos-group/nacos-controller/pkg/nacos" "github.com/nacos-group/nacos-controller/pkg/nacos/auth" "github.com/nacos-group/nacos-controller/pkg/nacos/client/impl" + "k8s.io/client-go/kubernetes" "os" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) @@ -94,7 +95,9 @@ func main() { os.Exit(1) } - if err = controller.NewDynamicConfigurationReconciler(mgr.GetClient(), mgr.GetScheme(), nacos.SyncConfigOptions{ + clientSet := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie()) + + if err = controller.NewDynamicConfigurationReconciler(mgr.GetClient(), clientSet, nacos.SyncConfigOptions{ ConfigClient: impl.NewDefaultNacosConfigClient(auth.NewDefaultNacosAuthProvider(mgr.GetClient())), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DynamicConfiguration") diff --git a/pkg/controller/dynamicconfiguration_controller.go b/pkg/controller/dynamicconfiguration_controller.go index 3f453db..69c3ab1 100644 --- a/pkg/controller/dynamicconfiguration_controller.go +++ b/pkg/controller/dynamicconfiguration_controller.go @@ -24,11 +24,11 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" runtimehandler "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" "strings" - "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -48,15 +48,13 @@ const ( // DynamicConfigurationReconciler reconciles a DynamicConfiguration object type DynamicConfigurationReconciler struct { client.Client - Scheme *runtime.Scheme controller *nacos.SyncConfigurationController } -func NewDynamicConfigurationReconciler(c client.Client, s *runtime.Scheme, opt nacos.SyncConfigOptions) *DynamicConfigurationReconciler { +func NewDynamicConfigurationReconciler(c client.Client, cs *kubernetes.Clientset, opt nacos.SyncConfigOptions) *DynamicConfigurationReconciler { return &DynamicConfigurationReconciler{ Client: c, - Scheme: s, - controller: nacos.NewSyncConfigurationController(c, opt), + controller: nacos.NewSyncConfigurationController(c, cs, opt), } } diff --git a/pkg/controller/suite_test.go b/pkg/controller/suite_test.go index fd58a6a..797ccfc 100644 --- a/pkg/controller/suite_test.go +++ b/pkg/controller/suite_test.go @@ -19,6 +19,9 @@ package controller import ( "context" "github.com/nacos-group/nacos-controller/pkg/nacos" + "github.com/nacos-group/nacos-controller/pkg/nacos/auth" + "github.com/nacos-group/nacos-controller/pkg/nacos/client/impl" + "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" "os" "path/filepath" @@ -82,7 +85,11 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - err = NewDynamicConfigurationReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), nacos.SyncConfigOptions{}).SetupWithManager(k8sManager) + clientSet := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie()) + + err = NewDynamicConfigurationReconciler(k8sManager.GetClient(), clientSet, nacos.SyncConfigOptions{ + ConfigClient: impl.NewDefaultNacosConfigClient(auth.NewDefaultNacosAuthProvider(k8sManager.GetClient())), + }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) //+kubebuilder:scaffold:scheme diff --git a/pkg/nacos/nacos_controller.go b/pkg/nacos/nacos_controller.go index c5dd8cc..f774447 100644 --- a/pkg/nacos/nacos_controller.go +++ b/pkg/nacos/nacos_controller.go @@ -8,13 +8,14 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "strings" ) type SyncConfigurationController struct { - client.Client + cs *kubernetes.Clientset mappings *DataId2DCMappings locks *LockManager configClient nacosclient.NacosConfigClient @@ -28,7 +29,7 @@ type SyncConfigOptions struct { Locks *LockManager } -func NewSyncConfigurationController(c client.Client, opt SyncConfigOptions) *SyncConfigurationController { +func NewSyncConfigurationController(c client.Client, cs *kubernetes.Clientset, opt SyncConfigOptions) *SyncConfigurationController { if opt.ConfigClient == nil { opt.ConfigClient = nacosclient.GetDefaultNacosClient() } @@ -39,10 +40,10 @@ func NewSyncConfigurationController(c client.Client, opt SyncConfigOptions) *Syn opt.Locks = NewLockManager() } if opt.Callback == nil { - opt.Callback = NewDefaultServer2ClusterCallback(c, opt.Mappings, opt.Locks) + opt.Callback = NewDefaultServer2ClusterCallback(c, cs, opt.Mappings, opt.Locks) } return &SyncConfigurationController{ - Client: c, + cs: cs, mappings: opt.Mappings, locks: opt.Locks, configClient: opt.ConfigClient, @@ -125,7 +126,7 @@ func (scc *SyncConfigurationController) syncCluster2Server(ctx context.Context, APIVersion: dc.Spec.ObjectRef.APIVersion, Kind: dc.Spec.ObjectRef.Kind, } - objWrapper, err := NewObjectReferenceWrapper(scc.Client, dc, &objRef) + objWrapper, err := NewObjectReferenceWrapper(scc.cs, dc, &objRef) if err != nil { l.Error(err, "create object wrapper error", "obj", objRef) return err @@ -243,7 +244,7 @@ func (scc *SyncConfigurationController) syncServer2Cluster(ctx context.Context, } dc.Status.ObjectRef = &objectRef - objWrapper, err := NewObjectReferenceWrapper(scc.Client, dc, &objectRef) + objWrapper, err := NewObjectReferenceWrapper(scc.cs, dc, &objectRef) if err != nil { l.Error(err, "create object wrapper error") return err diff --git a/pkg/nacos/object_reference_wrapper.go b/pkg/nacos/object_reference_wrapper.go index e3e5bff..c5a8c03 100644 --- a/pkg/nacos/object_reference_wrapper.go +++ b/pkg/nacos/object_reference_wrapper.go @@ -6,8 +6,9 @@ import ( "github.com/nacos-group/nacos-controller/pkg" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "strings" @@ -15,9 +16,9 @@ import ( func init() { // Using ConfigMapWrapper as default ConfigMap resource wrapper - RegisterObjectWrapperIfAbsent(ConfigMapGVK.String(), func(c client.Client, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) { + RegisterObjectWrapperIfAbsent(ConfigMapGVK.String(), func(cs *kubernetes.Clientset, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) { return &ConfigMapWrapper{ - Client: c, + cs: cs, ObjectRef: objRef, owner: owner, }, nil @@ -41,7 +42,7 @@ type ObjectReferenceWrapper interface { Reload() error } -type NewObjectWrapperFn func(client.Client, client.Object, *v1.ObjectReference) (ObjectReferenceWrapper, error) +type NewObjectWrapperFn func(*kubernetes.Clientset, client.Object, *v1.ObjectReference) (ObjectReferenceWrapper, error) var objectWrapperMap = map[string]NewObjectWrapperFn{} @@ -57,20 +58,20 @@ func RegisterObjectWrapper(targetGVK string, fn NewObjectWrapperFn) { objectWrapperMap[targetGVK] = fn } -func NewObjectReferenceWrapper(c client.Client, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) { +func NewObjectReferenceWrapper(cs *kubernetes.Clientset, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) { targetGVK := objRef.GroupVersionKind().String() fn, exist := objectWrapperMap[targetGVK] if !exist { return nil, fmt.Errorf("unsupport object reference type: %s", targetGVK) } - return fn(c, owner, objRef) + return fn(cs, owner, objRef) } type ConfigMapWrapper struct { ObjectRef *v1.ObjectReference cm *v1.ConfigMap owner client.Object - client.Client + cs *kubernetes.Clientset } func (cmw *ConfigMapWrapper) GetContent(dataId string) (string, bool, error) { @@ -149,7 +150,13 @@ func (cmw *ConfigMapWrapper) Flush() error { if cmw.cm == nil { return nil } - return cmw.Update(context.TODO(), cmw.cm) + var err error + var cm *v1.ConfigMap + if cm, err = cmw.cs.CoreV1().ConfigMaps(cmw.cm.Namespace).Update(context.TODO(), cmw.cm, metav1.UpdateOptions{}); err != nil { + return err + } + cmw.cm = cm + return nil } func (cmw *ConfigMapWrapper) InjectLabels(labels map[string]string) { @@ -170,33 +177,35 @@ func (cmw *ConfigMapWrapper) InjectLabels(labels map[string]string) { } func (cmw *ConfigMapWrapper) Reload() error { - cm := v1.ConfigMap{} - if err := cmw.Get(context.TODO(), types.NamespacedName{Namespace: cmw.ObjectRef.Namespace, Name: cmw.ObjectRef.Name}, &cm); err != nil { - if errors.IsNotFound(err) { - // if not found, try to create object reference - cm.Namespace = cmw.ObjectRef.Namespace - cm.Name = cmw.ObjectRef.Name - - owner := cmw.owner - apiVersion, kind := owner.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind() - cm.SetOwnerReferences([]v12.OwnerReference{ - { - APIVersion: apiVersion, - Kind: kind, - Name: owner.GetName(), - UID: owner.GetUID(), - Controller: pointer.Bool(true), - BlockOwnerDeletion: pointer.Bool(true), - }, - }) - if err := cmw.Create(context.TODO(), &cm); err != nil { - return err - } - } else { + var cm *v1.ConfigMap + var err error + + if cm, err = cmw.cs.CoreV1().ConfigMaps(cmw.ObjectRef.Namespace).Get(context.TODO(), cmw.ObjectRef.Name, metav1.GetOptions{}); err != nil { + if !errors.IsNotFound(err) { + return err + } + // if not found, try to create object reference + cm = &v1.ConfigMap{} + cm.Namespace = cmw.ObjectRef.Namespace + cm.Name = cmw.ObjectRef.Name + + owner := cmw.owner + apiVersion, kind := owner.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind() + cm.SetOwnerReferences([]v12.OwnerReference{ + { + APIVersion: apiVersion, + Kind: kind, + Name: owner.GetName(), + UID: owner.GetUID(), + Controller: pointer.Bool(true), + BlockOwnerDeletion: pointer.Bool(true), + }, + }) + if cm, err = cmw.cs.CoreV1().ConfigMaps(cm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil { return err } } - cmw.cm = &cm + cmw.cm = cm return cmw.ensureOwnerLabel() } diff --git a/pkg/nacos/server2cluster.go b/pkg/nacos/server2cluster.go index 66bc30b..a3c187e 100644 --- a/pkg/nacos/server2cluster.go +++ b/pkg/nacos/server2cluster.go @@ -8,6 +8,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -20,9 +21,10 @@ type Server2ClusterCallback interface { CallbackWithContext(ctx context.Context, namespace, group, dataId, content string) } -func NewDefaultServer2ClusterCallback(c client.Client, mappings *DataId2DCMappings, locks *LockManager) Server2ClusterCallback { +func NewDefaultServer2ClusterCallback(c client.Client, cs *kubernetes.Clientset, mappings *DataId2DCMappings, locks *LockManager) Server2ClusterCallback { return &DefaultServer2ClusterCallback{ Client: c, + cs: cs, mappings: mappings, locks: locks, } @@ -30,6 +32,7 @@ func NewDefaultServer2ClusterCallback(c client.Client, mappings *DataId2DCMappin type DefaultServer2ClusterCallback struct { client.Client + cs *kubernetes.Clientset mappings *DataId2DCMappings locks *LockManager } @@ -99,7 +102,7 @@ func (cb *DefaultServer2ClusterCallback) server2ClusterCallbackOneDC(ctx context objRef := dc.Status.ObjectRef.DeepCopy() objRef.Namespace = dc.Namespace - objWrapper, err := NewObjectReferenceWrapper(cb.Client, &dc, objRef) + objWrapper, err := NewObjectReferenceWrapper(cb.cs, &dc, objRef) if err != nil { l.Error(err, "create object wrapper error", "objRef", objRef) return err