Skip to content

Commit

Permalink
fix: ObjectWrapper using ClientSet to avoid reading cache (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
junfengP authored Nov 24, 2023
1 parent aff4595 commit 4059dbd
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 47 deletions.
5 changes: 4 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/nacos-group/nacos-controller/pkg/nacos"
"github.com/nacos-group/nacos-controller/pkg/nacos/auth"
"github.com/nacos-group/nacos-controller/pkg/nacos/client/impl"
"k8s.io/client-go/kubernetes"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -94,7 +95,9 @@ func main() {
os.Exit(1)
}

if err = controller.NewDynamicConfigurationReconciler(mgr.GetClient(), mgr.GetScheme(), nacos.SyncConfigOptions{
clientSet := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())

if err = controller.NewDynamicConfigurationReconciler(mgr.GetClient(), clientSet, nacos.SyncConfigOptions{
ConfigClient: impl.NewDefaultNacosConfigClient(auth.NewDefaultNacosAuthProvider(mgr.GetClient())),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "DynamicConfiguration")
Expand Down
8 changes: 3 additions & 5 deletions pkg/controller/dynamicconfiguration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
runtimehandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -48,15 +48,13 @@ const (
// DynamicConfigurationReconciler reconciles a DynamicConfiguration object
type DynamicConfigurationReconciler struct {
client.Client
Scheme *runtime.Scheme
controller *nacos.SyncConfigurationController
}

func NewDynamicConfigurationReconciler(c client.Client, s *runtime.Scheme, opt nacos.SyncConfigOptions) *DynamicConfigurationReconciler {
func NewDynamicConfigurationReconciler(c client.Client, cs *kubernetes.Clientset, opt nacos.SyncConfigOptions) *DynamicConfigurationReconciler {
return &DynamicConfigurationReconciler{
Client: c,
Scheme: s,
controller: nacos.NewSyncConfigurationController(c, opt),
controller: nacos.NewSyncConfigurationController(c, cs, opt),
}
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package controller
import (
"context"
"github.com/nacos-group/nacos-controller/pkg/nacos"
"github.com/nacos-group/nacos-controller/pkg/nacos/auth"
"github.com/nacos-group/nacos-controller/pkg/nacos/client/impl"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"os"
"path/filepath"
Expand Down Expand Up @@ -82,7 +85,11 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

err = NewDynamicConfigurationReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), nacos.SyncConfigOptions{}).SetupWithManager(k8sManager)
clientSet := kubernetes.NewForConfigOrDie(ctrl.GetConfigOrDie())

err = NewDynamicConfigurationReconciler(k8sManager.GetClient(), clientSet, nacos.SyncConfigOptions{
ConfigClient: impl.NewDefaultNacosConfigClient(auth.NewDefaultNacosAuthProvider(k8sManager.GetClient())),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

//+kubebuilder:scaffold:scheme
Expand Down
13 changes: 7 additions & 6 deletions pkg/nacos/nacos_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"strings"
)

type SyncConfigurationController struct {
client.Client
cs *kubernetes.Clientset
mappings *DataId2DCMappings
locks *LockManager
configClient nacosclient.NacosConfigClient
Expand All @@ -28,7 +29,7 @@ type SyncConfigOptions struct {
Locks *LockManager
}

func NewSyncConfigurationController(c client.Client, opt SyncConfigOptions) *SyncConfigurationController {
func NewSyncConfigurationController(c client.Client, cs *kubernetes.Clientset, opt SyncConfigOptions) *SyncConfigurationController {
if opt.ConfigClient == nil {
opt.ConfigClient = nacosclient.GetDefaultNacosClient()
}
Expand All @@ -39,10 +40,10 @@ func NewSyncConfigurationController(c client.Client, opt SyncConfigOptions) *Syn
opt.Locks = NewLockManager()
}
if opt.Callback == nil {
opt.Callback = NewDefaultServer2ClusterCallback(c, opt.Mappings, opt.Locks)
opt.Callback = NewDefaultServer2ClusterCallback(c, cs, opt.Mappings, opt.Locks)
}
return &SyncConfigurationController{
Client: c,
cs: cs,
mappings: opt.Mappings,
locks: opt.Locks,
configClient: opt.ConfigClient,
Expand Down Expand Up @@ -125,7 +126,7 @@ func (scc *SyncConfigurationController) syncCluster2Server(ctx context.Context,
APIVersion: dc.Spec.ObjectRef.APIVersion,
Kind: dc.Spec.ObjectRef.Kind,
}
objWrapper, err := NewObjectReferenceWrapper(scc.Client, dc, &objRef)
objWrapper, err := NewObjectReferenceWrapper(scc.cs, dc, &objRef)
if err != nil {
l.Error(err, "create object wrapper error", "obj", objRef)
return err
Expand Down Expand Up @@ -243,7 +244,7 @@ func (scc *SyncConfigurationController) syncServer2Cluster(ctx context.Context,
}
dc.Status.ObjectRef = &objectRef

objWrapper, err := NewObjectReferenceWrapper(scc.Client, dc, &objectRef)
objWrapper, err := NewObjectReferenceWrapper(scc.cs, dc, &objectRef)
if err != nil {
l.Error(err, "create object wrapper error")
return err
Expand Down
73 changes: 41 additions & 32 deletions pkg/nacos/object_reference_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"github.com/nacos-group/nacos-controller/pkg"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

func init() {
// Using ConfigMapWrapper as default ConfigMap resource wrapper
RegisterObjectWrapperIfAbsent(ConfigMapGVK.String(), func(c client.Client, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) {
RegisterObjectWrapperIfAbsent(ConfigMapGVK.String(), func(cs *kubernetes.Clientset, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) {
return &ConfigMapWrapper{
Client: c,
cs: cs,
ObjectRef: objRef,
owner: owner,
}, nil
Expand All @@ -41,7 +42,7 @@ type ObjectReferenceWrapper interface {
Reload() error
}

type NewObjectWrapperFn func(client.Client, client.Object, *v1.ObjectReference) (ObjectReferenceWrapper, error)
type NewObjectWrapperFn func(*kubernetes.Clientset, client.Object, *v1.ObjectReference) (ObjectReferenceWrapper, error)

var objectWrapperMap = map[string]NewObjectWrapperFn{}

Expand All @@ -57,20 +58,20 @@ func RegisterObjectWrapper(targetGVK string, fn NewObjectWrapperFn) {
objectWrapperMap[targetGVK] = fn
}

func NewObjectReferenceWrapper(c client.Client, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) {
func NewObjectReferenceWrapper(cs *kubernetes.Clientset, owner client.Object, objRef *v1.ObjectReference) (ObjectReferenceWrapper, error) {
targetGVK := objRef.GroupVersionKind().String()
fn, exist := objectWrapperMap[targetGVK]
if !exist {
return nil, fmt.Errorf("unsupport object reference type: %s", targetGVK)
}
return fn(c, owner, objRef)
return fn(cs, owner, objRef)
}

type ConfigMapWrapper struct {
ObjectRef *v1.ObjectReference
cm *v1.ConfigMap
owner client.Object
client.Client
cs *kubernetes.Clientset
}

func (cmw *ConfigMapWrapper) GetContent(dataId string) (string, bool, error) {
Expand Down Expand Up @@ -149,7 +150,13 @@ func (cmw *ConfigMapWrapper) Flush() error {
if cmw.cm == nil {
return nil
}
return cmw.Update(context.TODO(), cmw.cm)
var err error
var cm *v1.ConfigMap
if cm, err = cmw.cs.CoreV1().ConfigMaps(cmw.cm.Namespace).Update(context.TODO(), cmw.cm, metav1.UpdateOptions{}); err != nil {
return err
}
cmw.cm = cm
return nil
}

func (cmw *ConfigMapWrapper) InjectLabels(labels map[string]string) {
Expand All @@ -170,33 +177,35 @@ func (cmw *ConfigMapWrapper) InjectLabels(labels map[string]string) {
}

func (cmw *ConfigMapWrapper) Reload() error {
cm := v1.ConfigMap{}
if err := cmw.Get(context.TODO(), types.NamespacedName{Namespace: cmw.ObjectRef.Namespace, Name: cmw.ObjectRef.Name}, &cm); err != nil {
if errors.IsNotFound(err) {
// if not found, try to create object reference
cm.Namespace = cmw.ObjectRef.Namespace
cm.Name = cmw.ObjectRef.Name

owner := cmw.owner
apiVersion, kind := owner.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
cm.SetOwnerReferences([]v12.OwnerReference{
{
APIVersion: apiVersion,
Kind: kind,
Name: owner.GetName(),
UID: owner.GetUID(),
Controller: pointer.Bool(true),
BlockOwnerDeletion: pointer.Bool(true),
},
})
if err := cmw.Create(context.TODO(), &cm); err != nil {
return err
}
} else {
var cm *v1.ConfigMap
var err error

if cm, err = cmw.cs.CoreV1().ConfigMaps(cmw.ObjectRef.Namespace).Get(context.TODO(), cmw.ObjectRef.Name, metav1.GetOptions{}); err != nil {
if !errors.IsNotFound(err) {
return err
}
// if not found, try to create object reference
cm = &v1.ConfigMap{}
cm.Namespace = cmw.ObjectRef.Namespace
cm.Name = cmw.ObjectRef.Name

owner := cmw.owner
apiVersion, kind := owner.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
cm.SetOwnerReferences([]v12.OwnerReference{
{
APIVersion: apiVersion,
Kind: kind,
Name: owner.GetName(),
UID: owner.GetUID(),
Controller: pointer.Bool(true),
BlockOwnerDeletion: pointer.Bool(true),
},
})
if cm, err = cmw.cs.CoreV1().ConfigMaps(cm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
return err
}
}
cmw.cm = &cm
cmw.cm = cm
return cmw.ensureOwnerLabel()
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/nacos/server2cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -20,16 +21,18 @@ type Server2ClusterCallback interface {
CallbackWithContext(ctx context.Context, namespace, group, dataId, content string)
}

func NewDefaultServer2ClusterCallback(c client.Client, mappings *DataId2DCMappings, locks *LockManager) Server2ClusterCallback {
func NewDefaultServer2ClusterCallback(c client.Client, cs *kubernetes.Clientset, mappings *DataId2DCMappings, locks *LockManager) Server2ClusterCallback {
return &DefaultServer2ClusterCallback{
Client: c,
cs: cs,
mappings: mappings,
locks: locks,
}
}

type DefaultServer2ClusterCallback struct {
client.Client
cs *kubernetes.Clientset
mappings *DataId2DCMappings
locks *LockManager
}
Expand Down Expand Up @@ -99,7 +102,7 @@ func (cb *DefaultServer2ClusterCallback) server2ClusterCallbackOneDC(ctx context

objRef := dc.Status.ObjectRef.DeepCopy()
objRef.Namespace = dc.Namespace
objWrapper, err := NewObjectReferenceWrapper(cb.Client, &dc, objRef)
objWrapper, err := NewObjectReferenceWrapper(cb.cs, &dc, objRef)
if err != nil {
l.Error(err, "create object wrapper error", "objRef", objRef)
return err
Expand Down

0 comments on commit 4059dbd

Please sign in to comment.