diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go
index 4137161cc..f4a28acec 100644
--- a/cmd/redisoperator/main.go
+++ b/cmd/redisoperator/main.go
@@ -79,7 +79,7 @@ func (m *Main) Run() error {
 	}
 
 	// Create kubernetes service.
-	k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder)
+	k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.UseCache, m.flags.EnableObjectHashing)
 
 	// Create the redis clients
 	redisClient := redis.New(metricsRecorder)
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 578fe0e62..9d692a341 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -19,6 +19,8 @@ type CMDFlags struct {
 	K8sQueriesBurstable int
 	Concurrency         int
 	LogLevel            string
+	UseCache            bool
+	EnableObjectHashing bool
 }
 
 // Init initializes and parse the flags
@@ -35,6 +37,8 @@ func (c *CMDFlags) Init() {
 	// reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89
 	flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events")
 	flag.StringVar(&c.LogLevel, "log-level", "info", "set log level")
+	flag.BoolVar(&c.UseCache, "use-cache", false, "use cache stores to get k8s objects")
+	flag.BoolVar(&c.EnableObjectHashing, "enable-hash", false, "Add hashed annotations to k8s objects; apply changes only when there's a diff.")
 	// Parse flags
 	flag.Parse()
 }
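Reviewer sketch (not part of the patch): how the two new switches behave in isolation. The flag names and defaults are the ones registered above; the demo `main` is hypothetical.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Same names and defaults as CMDFlags.Init above.
	useCache := flag.Bool("use-cache", false, "use cache stores to get k8s objects")
	enableHash := flag.Bool("enable-hash", false, "add hash annotations to k8s objects; update only when there's a diff")
	flag.Parse()

	// In the operator these values are threaded into k8s.New(...), which flips
	// the package-level useCache / objectHashingEnabled switches in service/k8s.
	fmt.Printf("use-cache=%v enable-hash=%v\n", *useCache, *enableHash)
}
```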
diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go
index 1b6fc1424..b8b2b0b28 100644
--- a/service/k8s/configmap.go
+++ b/service/k8s/configmap.go
@@ -2,14 +2,17 @@ package k8s
 
 import (
 	"context"
+	"fmt"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
+	"k8s.io/client-go/tools/cache"
 )
 
 // ConfigMap the ServiceAccount service that knows how to interact with k8s to manage them
@@ -26,26 +29,51 @@ type ConfigMap interface {
 type ConfigMapService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewConfigMapService returns a new ConfigMap KubeService.
 func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService {
 	logger = logger.With("service", "k8s.configMap")
+	var err error
+	rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
+	var cmCacheStore *cache.Store
+	if ShouldUseCache() {
+		cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
 	return &ConfigMapService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cmCacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 func (p *ConfigMapService) GetConfigMap(namespace string, name string) (*corev1.ConfigMap, error) {
-	configMap, err := p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var cm *corev1.ConfigMap
+	var err error
+	var exists bool
+	if p.cacheStore != nil {
+		c := *p.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			cm = item.(*corev1.ConfigMap)
+		}
+		if !exists {
+			err = fmt.Errorf("configmap %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		cm, err = p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
-	return configMap, err
+
+	recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)
+
+	return cm, err
 }
 
 func (p *ConfigMapService) CreateConfigMap(namespace string, configMap *corev1.ConfigMap) error {
@@ -76,6 +104,14 @@ func (p *ConfigMapService) CreateOrUpdateConfigMap(namespace string, configMap *
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(configMap, storedConfigMap) {
+			p.logger.Debugf("%v/%v configmap is up to date, no need to apply changes...", configMap.Namespace, configMap.Name)
+			return nil
+		}
+		p.logger.Debugf("%v/%v configmap has a different resource hash, updating the object...", configMap.Namespace, configMap.Name)
+		addHashAnnotation(configMap)
+	}
 	// Already exists, need to Update.
 	// Set the correct resource version to ensure we are on the latest version. This way the only valid
 	// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go
index 46d63bb93..7cc97f22e 100644
--- a/service/k8s/deployment.go
+++ b/service/k8s/deployment.go
@@ -10,6 +10,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
@@ -30,26 +32,49 @@ type Deployment interface {
 type DeploymentService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewDeploymentService returns a new Deployment KubeService.
 func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService {
 	logger = logger.With("service", "k8s.deployment")
+	rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient)
+	var cacheStore *cache.Store
+	var err error
+	if ShouldUseCache() {
+		cacheStore, err = DeploymentCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
 	return &DeploymentService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 // GetDeployment will retrieve the requested deployment based on namespace and name
 func (d *DeploymentService) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
-	deployment, err := d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var deployment *appsv1.Deployment
+	var err error
+	var exists bool
+	if d.cacheStore != nil {
+		c := *d.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			deployment = item.(*appsv1.Deployment)
+		}
+		if !exists {
+			err = fmt.Errorf("deployment %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		deployment, err = d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
+	recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
 	return deployment, err
 }
 
@@ -101,6 +126,15 @@ func (d *DeploymentService) CreateOrUpdateDeployment(namespace string, deploymen
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(deployment, storedDeployment) {
+			d.logger.Debugf("%v/%v deployment is up to date, no need to apply changes...", deployment.Namespace, deployment.Name)
+			return nil
+		}
+		d.logger.Debugf("%v/%v deployment has a different resource hash, updating the object...", deployment.Namespace, deployment.Name)
+		addHashAnnotation(deployment)
+	}
+
 	// Already exists, need to Update.
 	// Set the correct resource version to ensure we are on the latest version. This way the only valid
 	// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
diff --git a/service/k8s/hash_annotations.go b/service/k8s/hash_annotations.go
new file mode 100644
index 000000000..b3449d7ec
--- /dev/null
+++ b/service/k8s/hash_annotations.go
@@ -0,0 +1,66 @@
+package k8s
+
+import (
+	"crypto/sha256"
+	"encoding/base64"
+	"hash"
+
+	"github.com/davecgh/go-spew/spew"
+)
+
+// taken from https://github.com/k8ssandra/cass-operator/blob/master/pkg/utils/hash_annotation.go
+
+type Annotated interface {
+	GetAnnotations() map[string]string
+	SetAnnotations(annotations map[string]string)
+	GetName() string
+}
+
+const resourceHashAnnotationKey = "databases.spotahome.com/resource-hash"
+
+// addHashAnnotation computes a hash of the given object and stores it in the
+// object's annotations.
+func addHashAnnotation(r Annotated) {
+	hash := deepHashString(r)
+	m := r.GetAnnotations()
+	if m == nil {
+		m = map[string]string{}
+	}
+	m[resourceHashAnnotationKey] = hash
+	r.SetAnnotations(m)
+}
+
+func deepHashString(obj interface{}) string {
+	hasher := sha256.New()
+	deepHashObject(hasher, obj)
+	hashBytes := hasher.Sum([]byte{})
+	b64Hash := base64.StdEncoding.EncodeToString(hashBytes)
+	return b64Hash
+}
+
+// deepHashObject writes the specified object to the hash using the spew library,
+// which follows pointers and prints actual values of the nested objects,
+// ensuring the hash does not change when a pointer changes.
+func deepHashObject(hasher hash.Hash, objectToWrite interface{}) {
+	hasher.Reset()
+	printer := spew.ConfigState{
+		Indent:         " ",
+		SortKeys:       true,
+		DisableMethods: true,
+		SpewKeys:       true,
+	}
+	printer.Fprintf(hasher, "%#v", objectToWrite)
+}
+
+// shouldUpdate reports whether the desired object's hash differs from the one
+// recorded on the stored object; a stored object without the annotation always
+// needs an update.
+func shouldUpdate(desired Annotated, stored Annotated) bool {
+	storedHash, exists := stored.GetAnnotations()[resourceHashAnnotationKey]
+	if !exists {
+		return true
+	}
+	desiredHash := deepHashString(desired)
+
+	return desiredHash != storedHash
+}
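A minimal sketch of how the helpers in hash_annotations.go compose (illustrative only, e.g. as a package example test in service/k8s; the ConfigMap literal is an assumption, any Annotated object works the same way):

```go
package k8s

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Example_hashAnnotationFlow walks the create-or-update decision the services
// in this package make when -enable-hash is set.
func Example_hashAnnotationFlow() {
	desired := &corev1.ConfigMap{Data: map[string]string{"maxmemory": "100mb"}}

	// Simulate what is persisted in the cluster: the last applied object,
	// stamped with its content hash.
	stored := desired.DeepCopy()
	fmt.Println(shouldUpdate(desired, stored)) // no hash annotation yet -> update
	addHashAnnotation(stored)

	fmt.Println(shouldUpdate(desired, stored)) // same content -> skip the update

	desired.Data["maxmemory"] = "200mb"
	fmt.Println(shouldUpdate(desired, stored)) // content drifted -> update again

	// Output:
	// true
	// false
	// true
}
```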
diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go
index b6e68ae44..bc00e7bb5 100644
--- a/service/k8s/k8s.go
+++ b/service/k8s/k8s.go
@@ -9,6 +9,14 @@ import (
 	"github.com/spotahome/redis-operator/metrics"
 )
 
+var (
+	useCache bool
+)
+
+func ShouldUseCache() bool {
+	return useCache
+}
+
 // Service is the K8s service entrypoint.
 type Services interface {
 	ConfigMap
@@ -22,6 +30,14 @@ type Services interface {
 	StatefulSet
 }
 
+var (
+	objectHashingEnabled bool
+)
+
+func hashingEnabled() bool {
+	return objectHashingEnabled
+}
+
 type services struct {
 	ConfigMap
 	Secret
@@ -35,7 +51,10 @@ type services struct {
 }
 
 // New returns a new Kubernetes service.
-func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services {
+func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, cacheEnabled bool, enableHashing bool) Services {
+	useCache = cacheEnabled
+	objectHashingEnabled = enableHashing
 	return &services{
 		ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder),
 		Secret:    NewSecretService(kubecli, logger, metricsRecorder),
diff --git a/service/k8s/pod.go b/service/k8s/pod.go
index 0583302ce..97e3b1672 100644
--- a/service/k8s/pod.go
+++ b/service/k8s/pod.go
@@ -3,16 +3,18 @@ package k8s
 
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 
 	"k8s.io/apimachinery/pkg/types"
 
+	"github.com/spotahome/redis-operator/log"
+	"github.com/spotahome/redis-operator/metrics"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
-
-	"github.com/spotahome/redis-operator/log"
-	"github.com/spotahome/redis-operator/metrics"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 // Pod the ServiceAccount service that knows how to interact with k8s to manage them
@@ -30,25 +32,52 @@ type Pod interface {
 type PodService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewPodService returns a new Pod KubeService.
 func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService {
 	logger = logger.With("service", "k8s.pod")
+	rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
+	var podCacheStore *cache.Store
+	var err error
+	if ShouldUseCache() {
+		podCacheStore, err = PodCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
+
 	return &PodService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      podCacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 func (p *PodService) GetPod(namespace string, name string) (*corev1.Pod, error) {
-	pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var pod *corev1.Pod
+	var err error
+	var exists bool
+	if p.cacheStore != nil {
+		c := *p.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			pod = item.(*corev1.Pod)
+		}
+		if !exists {
+			err = fmt.Errorf("pod %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		pod, err = p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
+	recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
 	return pod, err
 }
diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go
index 48350bc43..3ba8ba2a0 100644
--- a/service/k8s/poddisruptionbudget.go
+++ b/service/k8s/poddisruptionbudget.go
@@ -2,11 +2,14 @@ package k8s
 
 import (
 	"context"
+	"fmt"
 
 	policyv1 "k8s.io/api/policy/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
@@ -25,26 +28,54 @@ type PodDisruptionBudget interface {
 type PodDisruptionBudgetService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewPodDisruptionBudgetService returns a new PodDisruptionBudget KubeService.
 func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodDisruptionBudgetService {
 	logger = logger.With("service", "k8s.podDisruptionBudget")
+
+	rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient)
+	var cacheStore *cache.Store
+	var err error
+	if ShouldUseCache() {
+		cacheStore, err = PodDisruptionBudgetCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
+
 	return &PodDisruptionBudgetService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 func (p *PodDisruptionBudgetService) GetPodDisruptionBudget(namespace string, name string) (*policyv1.PodDisruptionBudget, error) {
-	podDisruptionBudget, err := p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var podDisruptionBudget *policyv1.PodDisruptionBudget
+	var err error
+	var exists bool
+
+	if p.cacheStore != nil {
+		c := *p.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			podDisruptionBudget = item.(*policyv1.PodDisruptionBudget)
+		}
+		if !exists {
+			err = fmt.Errorf("podDisruptionBudget %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		podDisruptionBudget, err = p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
-	return podDisruptionBudget, nil
+	recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder)
+
+	return podDisruptionBudget, err
 }
 
 func (p *PodDisruptionBudgetService) CreatePodDisruptionBudget(namespace string, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
@@ -77,6 +108,15 @@ func (p *PodDisruptionBudgetService) CreateOrUpdatePodDisruptionBudget(namespace
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(podDisruptionBudget, storedPodDisruptionBudget) {
+			p.logger.Debugf("%v/%v pdb is up to date, no need to apply changes...", podDisruptionBudget.Namespace, podDisruptionBudget.Name)
+			return nil
+		}
+		p.logger.Debugf("%v/%v pdb has a different resource hash, updating the object...", podDisruptionBudget.Namespace, podDisruptionBudget.Name)
+		addHashAnnotation(podDisruptionBudget)
+	}
+
 	// Already exists, need to Update.
 	// Set the correct resource version to ensure we are on the latest version. This way the only valid
 	// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go
index a5534b445..786cedd94 100644
--- a/service/k8s/rbac.go
+++ b/service/k8s/rbac.go
@@ -2,11 +2,14 @@ package k8s
 
 import (
 	"context"
+	"fmt"
 
 	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
@@ -27,37 +30,117 @@ type RBAC interface {
 // NamespaceService is the Namespace service implementation using API calls to kubernetes.
 type RBACService struct {
-	kubeClient      kubernetes.Interface
-	logger          log.Logger
-	metricsRecorder metrics.Recorder
+	kubeClient            kubernetes.Interface
+	logger                log.Logger
+	roleCacheStore        *cache.Store
+	roleBindingCacheStore *cache.Store
+	clusterRoleCacheStore *cache.Store
+	metricsRecorder       metrics.Recorder
 }
 
 // NewRBACService returns a new RBAC KubeService.
 func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *RBACService {
 	logger = logger.With("service", "k8s.rbac")
+
+	rc := kubeClient.RbacV1().RESTClient().(*rest.RESTClient)
+
+	var roleCacheStore *cache.Store
+	var roleBindingCacheStore *cache.Store
+	var clusterRoleCacheStore *cache.Store
+	var err error
+
+	if ShouldUseCache() {
+		roleCacheStore, err = RoleCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache for roles: %v", err)
+		}
+		roleBindingCacheStore, err = RoleBindingCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache for rolebindings: %v", err)
+		}
+		clusterRoleCacheStore, err = ClusterRoleCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache for cluster roles: %v", err)
+		}
+	}
+
 	return &RBACService{
-		kubeClient:      kubeClient,
-		logger:          logger,
-		metricsRecorder: metricsRecorder,
+		kubeClient:            kubeClient,
+		logger:                logger,
+		roleCacheStore:        roleCacheStore,
+		roleBindingCacheStore: roleBindingCacheStore,
+		clusterRoleCacheStore: clusterRoleCacheStore,
+		metricsRecorder:       metricsRecorder,
 	}
 }
 
 func (r *RBACService) GetClusterRole(name string) (*rbacv1.ClusterRole, error) {
-	clusterRole, err := r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{})
+	var clusterRole *rbacv1.ClusterRole
+	var err error
+	var exists bool
+	if r.clusterRoleCacheStore != nil {
+		c := *r.clusterRoleCacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(name)
+		if exists && err == nil {
+			clusterRole = item.(*rbacv1.ClusterRole)
+		}
+		if !exists {
+			err = fmt.Errorf("clusterRole %v not found", name)
+		}
+	} else {
+		clusterRole, err = r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{})
+	}
 	recordMetrics(metrics.NOT_APPLICABLE, "ClusterRole", name, "GET", err, r.metricsRecorder)
 	return clusterRole, err
 }
 
 func (r *RBACService) GetRole(namespace, name string) (*rbacv1.Role, error) {
-	role, err := r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	var role *rbacv1.Role
+	var err error
+	var exists bool
+	if r.roleCacheStore != nil {
+		c := *r.roleCacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			role = item.(*rbacv1.Role)
+		}
+		if !exists {
+			err = fmt.Errorf("role %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		role, err = r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	}
+
 	recordMetrics(namespace, "Role", name, "GET", err, r.metricsRecorder)
 	return role, err
 }
 
 func (r *RBACService) GetRoleBinding(namespace, name string) (*rbacv1.RoleBinding, error) {
-	rolbinding, err := r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	var roleBinding *rbacv1.RoleBinding
+	var err error
+	var exists bool
+	if r.roleBindingCacheStore != nil {
+		c := *r.roleBindingCacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			roleBinding = item.(*rbacv1.RoleBinding)
+		}
+		if !exists {
+			err = fmt.Errorf("role binding %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		roleBinding, err = r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	}
 	recordMetrics(namespace, "RoleBinding", name, "GET", err, r.metricsRecorder)
-	return rolbinding, err
+	return roleBinding, err
 }
 
 func (r *RBACService) DeleteRole(namespace, name string) error {
@@ -100,6 +183,14 @@ func (r *RBACService) CreateOrUpdateRole(namespace string, role *rbacv1.Role) er
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(role, storedRole) {
+			r.logger.Debugf("%v/%v role is up to date, no need to apply changes...", role.Namespace, role.Name)
+			return nil
+		}
+		r.logger.Debugf("%v/%v role has a different resource hash, updating the object...", role.Namespace, role.Name)
+		addHashAnnotation(role)
+	}
 	// Already exists, need to Update.
 	// Set the correct resource version to ensure we are on the latest version. This way the only valid
 	// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
@@ -148,6 +239,15 @@ func (r *RBACService) CreateOrUpdateRoleBinding(namespace string, binding *rbacv
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(binding, storedBinding) {
+			r.logger.Debugf("%v/%v rolebinding is up to date, no need to apply changes...", binding.Namespace, binding.Name)
+			return nil
+		}
+		r.logger.Debugf("%v/%v rolebinding has a different resource hash, updating the object...", binding.Namespace, binding.Name)
+		addHashAnnotation(binding)
+	}
+
 	// Check if the role ref has changed, roleref updates are not allowed, if changed then delete and create again the role binding.
 	// https://github.com/kubernetes/kubernetes/blob/0f0a5223dfc75337d03c9b80ae552ae8ef138eeb/pkg/apis/rbac/validation/validation.go#L157-L159
 	if storedBinding.RoleRef != binding.RoleRef {
diff --git a/service/k8s/secret.go b/service/k8s/secret.go
index 0edd23dea..6b540a51f 100644
--- a/service/k8s/secret.go
+++ b/service/k8s/secret.go
@@ -2,12 +2,15 @@ package k8s
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 // Secret interacts with k8s to get secrets
@@ -19,26 +22,49 @@ type Secret interface {
 type SecretService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *SecretService {
 	logger = logger.With("service", "k8s.secret")
+	rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
+	var cacheStore *cache.Store
+	var err error
+	if ShouldUseCache() {
+		cacheStore, err = SecretCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
 	return &SecretService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 func (s *SecretService) GetSecret(namespace, name string) (*corev1.Secret, error) {
+	var secret *corev1.Secret
+	var err error
+	var exists bool
+	if s.cacheStore != nil {
+		c := *s.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			secret = item.(*corev1.Secret)
+		}
+		if !exists {
+			err = fmt.Errorf("secret %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		secret, err = s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	}
 
-	secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	recordMetrics(namespace, "Secret", name, "GET", err, s.metricsRecorder)
-	if err != nil {
-		return nil, err
-	}
 	return secret, err
 }
diff --git a/service/k8s/service.go b/service/k8s/service.go
index 712cc4c0d..1388ddc7f 100644
--- a/service/k8s/service.go
+++ b/service/k8s/service.go
@@ -2,11 +2,14 @@ package k8s
 
 import (
 	"context"
+	"fmt"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
@@ -27,25 +30,51 @@ type Service interface {
 type ServiceService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewServiceService returns a new Service KubeService.
 func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ServiceService {
 	logger = logger.With("service", "k8s.service")
+
+	rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
+	var cacheStore *cache.Store
+	var err error
+
+	if ShouldUseCache() {
+		cacheStore, err = ServiceCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
+
 	return &ServiceService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 func (s *ServiceService) GetService(namespace string, name string) (*corev1.Service, error) {
-	service, err := s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var service *corev1.Service
+	var err error
+	var exists bool
+	if s.cacheStore != nil {
+		c := *s.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			service = item.(*corev1.Service)
+		}
+		if !exists {
+			err = fmt.Errorf("svc %v/%v not found", namespace, name)
+		}
+	} else {
+		service, err = s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
+	recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder)
 	return service, err
 }
 
@@ -90,6 +119,15 @@ func (s *ServiceService) CreateOrUpdateService(namespace string, service *corev1
 		return err
 	}
 
+	if hashingEnabled() {
+		if !shouldUpdate(service, storedService) {
+			s.logger.Debugf("%v/%v service is up to date, no need to apply changes...", service.Namespace, service.Name)
+			return nil
+		}
+		s.logger.Debugf("%v/%v service has a different resource hash, updating the object...", service.Namespace, service.Name)
+		addHashAnnotation(service)
+	}
+
 	// Already exists, need to Update.
 	// Set the correct resource version to ensure we are on the latest version. This way the only valid
 	// namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency),
diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go
index 38cc95ff2..6245f2c8a 100644
--- a/service/k8s/statefulset.go
+++ b/service/k8s/statefulset.go
@@ -15,6 +15,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 
 	"github.com/spotahome/redis-operator/log"
 	"github.com/spotahome/redis-operator/metrics"
@@ -35,27 +37,52 @@ type StatefulSet interface {
 type StatefulSetService struct {
 	kubeClient      kubernetes.Interface
 	logger          log.Logger
+	cacheStore      *cache.Store
 	metricsRecorder metrics.Recorder
 }
 
 // NewStatefulSetService returns a new StatefulSet KubeService.
 func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *StatefulSetService {
 	logger = logger.With("service", "k8s.statefulSet")
+
+	rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient)
+	var cacheStore *cache.Store
+	var err error
+	if ShouldUseCache() {
+		cacheStore, err = StatefulSetCacheStoreFromKubeClient(rc)
+		if err != nil {
+			logger.Errorf("unable to initialize cache: %v", err)
+		}
+	}
 	return &StatefulSetService{
 		kubeClient:      kubeClient,
 		logger:          logger,
+		cacheStore:      cacheStore,
 		metricsRecorder: metricsRecorder,
 	}
 }
 
 // GetStatefulSet will retrieve the requested statefulset based on namespace and name
 func (s *StatefulSetService) GetStatefulSet(namespace, name string) (*appsv1.StatefulSet, error) {
-	statefulSet, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
-	recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder)
-	if err != nil {
-		return nil, err
+	var ss *appsv1.StatefulSet
+	var err error
+	var exists bool
+	if s.cacheStore != nil {
+		c := *s.cacheStore
+		var item interface{}
+		item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
+		if exists && err == nil {
+			ss = item.(*appsv1.StatefulSet)
+		}
+		if !exists {
+			err = fmt.Errorf("statefulset %v not found in namespace %v", name, namespace)
+		}
+	} else {
+		ss, err = s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 	}
-	return statefulSet, err
+
+	recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder)
+	return ss, err
 }
 
 // GetStatefulSetPods will give a list of pods that are managed by the statefulset
@@ -97,6 +124,7 @@ func (s *StatefulSetService) UpdateStatefulSet(namespace string, statefulSet *ap
 // CreateOrUpdateStatefulSet will update the statefulset or create it if does not exist
 func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefulSet *appsv1.StatefulSet) error {
 	storedStatefulSet, err := s.GetStatefulSet(namespace, statefulSet.Name)
+
 	if err != nil {
 		// If no resource we need to create.
 		if errors.IsNotFound(err) {
@@ -171,6 +199,16 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu
 	// set stored.volumeClaimTemplates
 	statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates
 	statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations)
+
+	if hashingEnabled() {
+		if !shouldUpdate(statefulSet, storedStatefulSet) {
+			s.logger.Debugf("%v/%v statefulset is up to date, no need to apply changes...", statefulSet.Namespace, statefulSet.Name)
+			return nil
+		}
+		s.logger.Debugf("%v/%v statefulset has a different resource hash, updating the object...", statefulSet.Namespace, statefulSet.Name)
+		addHashAnnotation(statefulSet)
+	}
+
 	return s.UpdateStatefulSet(namespace, statefulSet)
 }
diff --git a/service/k8s/util.go b/service/k8s/util.go
index 2cd9bbd73..397301c8a 100644
--- a/service/k8s/util.go
+++ b/service/k8s/util.go
@@ -1,11 +1,23 @@
 package k8s
 
 import (
+	"context"
 	"fmt"
+	"time"
 
 	redisfailoverv1 "github.com/spotahome/redis-operator/api/redisfailover/v1"
 	"github.com/spotahome/redis-operator/metrics"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 // GetRedisPassword retreives password from kubernetes secret or, if
@@ -42,3 +54,428 @@ func recordMetrics(namespace string, kind string, object string, operation strin
 		metricsRecorder.RecordK8sOperation(namespace, kind, object, operation, metrics.FAIL, metrics.K8S_MISC)
 	}
 }
+
+// TODO: update the *CacheStoreFromKubeClient constructors to be implemented via generics
+
+func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := corev1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("pods").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*corev1.PodList, error) {
+		result := corev1.PodList{}
+		err := rc.Get().Resource("pods").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	podCacheStore, podCacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&corev1.Pod{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go podCacheController.Run(wait.NeverStop)
+
+	return &podCacheStore, nil
+}
+
+func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := corev1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("services").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*corev1.ServiceList, error) {
+		result := corev1.ServiceList{}
+		err := rc.Get().Resource("services").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&corev1.Service{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := corev1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("configmaps").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*corev1.ConfigMapList, error) {
+		result := corev1.ConfigMapList{}
+		err := rc.Get().Resource("configmaps").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&corev1.ConfigMap{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := appsv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("deployments").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*appsv1.DeploymentList, error) {
+		result := appsv1.DeploymentList{}
+		err := rc.Get().Resource("deployments").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&appsv1.Deployment{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := policyv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("poddisruptionbudgets").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*policyv1.PodDisruptionBudgetList, error) {
+		result := policyv1.PodDisruptionBudgetList{}
+		err := rc.Get().Resource("poddisruptionbudgets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&policyv1.PodDisruptionBudget{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("roles").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleList, error) {
+		result := rbacv1.RoleList{}
+		err := rc.Get().Resource("roles").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.Role{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("clusterroles").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.ClusterRoleList, error) {
+		result := rbacv1.ClusterRoleList{}
+		err := rc.Get().Resource("clusterroles").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.ClusterRole{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("rolebindings").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleBindingList, error) {
+		result := rbacv1.RoleBindingList{}
+		err := rc.Get().Resource("rolebindings").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.RoleBinding{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := corev1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("secrets").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*corev1.SecretList, error) {
+		result := corev1.SecretList{}
+		err := rc.Get().Resource("secrets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&corev1.Secret{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := appsv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("statefulsets").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*appsv1.StatefulSetList, error) {
+		result := appsv1.StatefulSetList{}
+		err := rc.Get().Resource("statefulsets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&appsv1.StatefulSet{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
diff --git a/test/integration/redisfailover/creation_test.go b/test/integration/redisfailover/creation_test.go
index 5458634ba..c63af3f64 100644
--- a/test/integration/redisfailover/creation_test.go
+++ b/test/integration/redisfailover/creation_test.go
@@ -94,7 +94,7 @@ func TestRedisFailover(t *testing.T) {
 	}
 
 	// Create kubernetes service.
-	k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy)
+	k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy, true, false)
 
 	// Prepare namespace
 	prepErr := clients.prepareNS()
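On the TODO at the top of util.go: the ten constructors repeat the same ListWatch + NewInformer + Run scaffolding, so a first step that needs no generics could be a shared helper along these lines (hypothetical, not part of the PR; each constructor would keep its own list/watch closures):

```go
package k8s

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// cacheStoreFor is a hypothetical helper factoring out the informer plumbing
// repeated by every *CacheStoreFromKubeClient constructor above.
func cacheStoreFor(objType runtime.Object, listFn cache.ListFunc, watchFn cache.WatchFunc) *cache.Store {
	store, controller := cache.NewInformer(
		&cache.ListWatch{ListFunc: listFn, WatchFunc: watchFn},
		objType,
		0, // no periodic resync, matching the constructors above
		cache.ResourceEventHandlerFuncs{},
	)
	go controller.Run(wait.NeverStop)
	return &store
}
```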
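Finally, a condensed view of the read path every Get* method in this patch now shares: build the "namespace/name" key, try the store, fall back to the API server. This is a hypothetical helper shown for pods only; the real methods also record metrics, and note that a cache miss surfaces as a plain fmt.Errorf, which callers relying on errors.IsNotFound will not match:

```go
package k8s

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// getPodCachedOrLive mirrors the cache-first pattern of the Get* methods above.
func getPodCachedOrLive(store *cache.Store, cli kubernetes.Interface, namespace, name string) (*corev1.Pod, error) {
	if store == nil {
		// Cache disabled (-use-cache not set): live API call.
		return cli.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	}
	item, exists, err := (*store).GetByKey(namespace + "/" + name)
	if err != nil {
		return nil, err
	}
	if !exists {
		// A plain error, not an apierrors.StatusError.
		return nil, fmt.Errorf("pod %v not found in namespace %v", name, namespace)
	}
	return item.(*corev1.Pod), nil
}
```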