Skip to content

Commit

Permalink
Merge pull request #90 from deefreak/policyControllerChanges
Browse files Browse the repository at this point in the history
Added finalizer and handled delete/update operations for policies
  • Loading branch information
deefreak authored Nov 9, 2023
2 parents f843db9 + 7f81af7 commit 14fb7dd
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ func main() {

if err = controller.NewPolicyWatcher(mgr.GetClient(),
mgr.GetScheme(),
triggerHandler.QueueAllForExecution).SetupWithManager(mgr); err != nil {
triggerHandler.QueueAllForExecution,
triggerHandler.QueueForExecution).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Policy")
os.Exit(1)
}
Expand Down
132 changes: 130 additions & 2 deletions pkg/controller/policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,47 @@ package controller

import (
"context"
"reflect"

ottoscaleriov1alpha1 "github.com/flipkart-incubator/ottoscalr/api/v1alpha1"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const PolicyWatcherCtrl = "PolicyWatcher"
const policyFinalizerName = "finalizer.ottoscaler.io"

var policyRefKey = ".spec.policy"

// PolicyWatcher reconciles a Policy object
type PolicyWatcher struct {
Client client.Client
Scheme *runtime.Scheme
requeueAllFunc func()
requeueOneFunc func(types.NamespacedName)
}

func NewPolicyWatcher(client client.Client,
scheme *runtime.Scheme,
requeueAllFunc func(),
requeueOneFunc func(types.NamespacedName),
) *PolicyWatcher {
return &PolicyWatcher{Client: client,
Scheme: scheme,
requeueAllFunc: requeueAllFunc,
requeueOneFunc: requeueOneFunc,
}
}

Expand All @@ -63,8 +79,38 @@ func (r *PolicyWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, err
}

// Add finalizer to the policy if it doesn't have one
policy, err := r.addFinalizer(ctx, policy)

if err != nil {
logger.Error(err, "Error adding finalizer to policy")
return ctrl.Result{}, err
}
//Handle Reconcile
//If it is a delete event or update in the spec
//Requeue all policyRecommendations having the request Policy object as a reference
err = r.handleReconcilation(ctx, policy, logger)

if err != nil {
logger.Error(err, "Error handling reconcilation of policy")
return ctrl.Result{}, err
}

// If the policy is deleted
if !policy.ObjectMeta.DeletionTimestamp.IsZero() {

// Remove finalizer from the policy
policy.ObjectMeta.Finalizers = removeString(policy.ObjectMeta.Finalizers, policyFinalizerName)
if err := r.Client.Update(ctx, &policy); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil

}

// If the policy is marked as default, ensure no other policy is marked as default
if policy.Spec.IsDefault {
// If the policy is marked as default, ensure no other policy is marked as default
var allPolicies ottoscaleriov1alpha1.PolicyList
if err := r.Client.List(ctx, &allPolicies); err != nil {
logger.Error(err, "Error getting allPolicies")
Expand Down Expand Up @@ -95,8 +141,90 @@ func (r *PolicyWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// SetupWithManager sets up the controller with the Manager.
func (r *PolicyWatcher) SetupWithManager(mgr ctrl.Manager) error {

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &ottoscaleriov1alpha1.PolicyRecommendation{}, policyRefKey, func(rawObj client.Object) []string {
policyRecommendation := rawObj.(*ottoscaleriov1alpha1.PolicyRecommendation)
if policyRecommendation.Spec.Policy == "" {
return nil
}
return []string{policyRecommendation.Spec.Policy}
}); err != nil {
return err
}

reconcilePredicate := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj := e.ObjectOld.(*ottoscaleriov1alpha1.Policy)
newObj := e.ObjectNew.(*ottoscaleriov1alpha1.Policy)

return !reflect.DeepEqual(oldObj.Spec, newObj.Spec)
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&ottoscaleriov1alpha1.Policy{}).
Watches(&source.Kind{Type: &ottoscaleriov1alpha1.Policy{}},
&handler.EnqueueRequestForObject{},
builder.WithPredicates(predicate.Or(predicate.GenerationChangedPredicate{}, reconcilePredicate))).
Named(PolicyWatcherCtrl).
Complete(r)
}

func (r *PolicyWatcher) addFinalizer(ctx context.Context, policy ottoscaleriov1alpha1.Policy) (ottoscaleriov1alpha1.Policy, error) {
// Add finalizer to the policy if it doesn't have one
if !containsString(policy.ObjectMeta.Finalizers, policyFinalizerName) {
policy.ObjectMeta.Finalizers = append(policy.ObjectMeta.Finalizers, policyFinalizerName)
if err := r.Client.Update(ctx, &policy); err != nil {
return ottoscaleriov1alpha1.Policy{}, err
}
}

return policy, nil
}

func (r *PolicyWatcher) handleReconcilation(ctx context.Context, policy ottoscaleriov1alpha1.Policy, logger logr.Logger) error {
// Get all PolicyRecommendation objects that reference the Policy object
var policyRecommendations ottoscaleriov1alpha1.PolicyRecommendationList
if err := r.Client.List(ctx, &policyRecommendations, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(policyRefKey, policy.Name)}); err != nil {
return err
}

// Requeue all PolicyRecommendation objects having the Policy object as a reference
for _, policyRecommendation := range policyRecommendations.Items {
logger.Info("Requeueing PolicyRecommendation as some update/delete in seen the policy field", "policyRecommendation", policyRecommendation.Name, "Namespace", policyRecommendation.Namespace,
"policy", policyRecommendation.Spec.Policy)
r.requeueOneFunc(types.NamespacedName{Namespace: policyRecommendation.Namespace,
Name: policyRecommendation.Name})
}

return nil
}

func containsString(slice []string, str string) bool {
for _, s := range slice {
if s == str {
return true
}
}

return false
}

func removeString(slice []string, str string) []string {
var result []string
for _, s := range slice {
if s != str {
result = append(result, s)
}
}

return result
}
147 changes: 139 additions & 8 deletions pkg/controller/policy_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ package controller

import (
"context"
"fmt"
"time"

ottoscaleriov1alpha1 "github.com/flipkart-incubator/ottoscalr/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"time"
)

var _ = Describe("PolicyWatcher controller", func() {
const (
timeout = time.Second * 10
interval = time.Millisecond * 250
timeout = time.Second * 10
interval = time.Millisecond * 250
PolicyRecoName = "test-deployment-afgre"
PolicyRecoNamespace = "default"
)

BeforeEach(func() {
queuedAllRecos = false
})
var policy1, policy2, policy3 ottoscaleriov1alpha1.Policy
Context("When updating default policy", func() {
AfterEach(func() {
Expect(k8sClient.Delete(ctx, &policy1)).Should(Succeed())
Expect(k8sClient.Delete(ctx, &policy2)).Should(Succeed())
Expect(k8sClient.Delete(ctx, &policy3)).Should(Succeed())
})

It("Should mark other policies as non-default and requeue all policy recommendations ", func() {
By("Seeding all policies")
ctx := context.TODO()
Expand Down Expand Up @@ -62,6 +62,9 @@ var _ = Describe("PolicyWatcher controller", func() {
Expect(k8sClient.Create(ctx, &policy2)).Should(Succeed())
Expect(k8sClient.Create(ctx, &policy3)).Should(Succeed())

time.Sleep(2 * time.Second)
policy2 = ottoscaleriov1alpha1.Policy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "policy2"}, &policy2)).Should(Succeed())
policy2.Spec.IsDefault = true
err := k8sClient.Update(ctx, &policy2)
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -97,6 +100,134 @@ var _ = Describe("PolicyWatcher controller", func() {

By("Testing that queuedAllRecos was called")
Eventually(Expect(queuedAllRecos).Should(BeTrue()))

Expect(k8sClient.Delete(ctx, &policy1)).Should(Succeed())
Expect(k8sClient.Delete(ctx, &policy2)).Should(Succeed())
Expect(k8sClient.Delete(ctx, &policy3)).Should(Succeed())
time.Sleep(1 * time.Second)
})
})
Context("When handling Add/Update/Delete on a Policy", func() {
policyreco1 := &ottoscaleriov1alpha1.PolicyRecommendation{}
policyreco2 := &ottoscaleriov1alpha1.PolicyRecommendation{}

It("Should add finalizer if not present and requeue all policyrecommendations having this policy ", func() {
By("Seeding all policies")
ctx := context.TODO()

policy1 = ottoscaleriov1alpha1.Policy{
ObjectMeta: metav1.ObjectMeta{Name: "policy1", Namespace: "defualt"},
Spec: ottoscaleriov1alpha1.PolicySpec{
IsDefault: false,
RiskIndex: 10,
MinReplicaPercentageCut: 100,
TargetUtilization: 15,
},
}
policy2 = ottoscaleriov1alpha1.Policy{
ObjectMeta: metav1.ObjectMeta{Name: "policy2", Namespace: "defualt"},
Spec: ottoscaleriov1alpha1.PolicySpec{
IsDefault: true,
RiskIndex: 20,
MinReplicaPercentageCut: 100,
TargetUtilization: 20,
},
}
policy3 = ottoscaleriov1alpha1.Policy{
ObjectMeta: metav1.ObjectMeta{Name: "policy3", Namespace: "defualt"},
Spec: ottoscaleriov1alpha1.PolicySpec{
IsDefault: false,
RiskIndex: 30,
MinReplicaPercentageCut: 100,
TargetUtilization: 30,
},
}

Expect(k8sClient.Create(ctx, &policy1)).Should(Succeed())
Expect(k8sClient.Create(ctx, &policy2)).Should(Succeed())
Expect(k8sClient.Create(ctx, &policy3)).Should(Succeed())

time.Sleep(2 * time.Second)

//check if finalizer is added
policy1 = ottoscaleriov1alpha1.Policy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "policy1"}, &policy1)).Should(Succeed())
Expect(policy1.Finalizers).ShouldNot(BeNil())
Expect(policy1.Finalizers).Should(ContainElement(policyFinalizerName))

policy2 = ottoscaleriov1alpha1.Policy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "policy2"}, &policy2)).Should(Succeed())
Expect(policy2.Finalizers).ShouldNot(BeNil())
Expect(policy2.Finalizers).Should(ContainElement(policyFinalizerName))

policy3 = ottoscaleriov1alpha1.Policy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "policy3"}, &policy3)).Should(Succeed())
Expect(policy3.Finalizers).ShouldNot(BeNil())
Expect(policy3.Finalizers).Should(ContainElement(policyFinalizerName))

//create two policy recommendations for policy2
now := metav1.Now()
policyreco1 = &ottoscaleriov1alpha1.PolicyRecommendation{
ObjectMeta: metav1.ObjectMeta{
Name: PolicyRecoName,
Namespace: PolicyRecoNamespace,
},
Spec: ottoscaleriov1alpha1.PolicyRecommendationSpec{
WorkloadMeta: ottoscaleriov1alpha1.WorkloadMeta{
Name: PolicyRecoName,
},
Policy: policy2.Name,
GeneratedAt: &now,
TransitionedAt: &now,
QueuedForExecution: &falseBool,
},
}
policyreco2 = &ottoscaleriov1alpha1.PolicyRecommendation{
ObjectMeta: metav1.ObjectMeta{
Name: PolicyRecoName + "2",
Namespace: PolicyRecoNamespace,
},
Spec: ottoscaleriov1alpha1.PolicyRecommendationSpec{
WorkloadMeta: ottoscaleriov1alpha1.WorkloadMeta{
Name: PolicyRecoName + "2",
},
Policy: policy2.Name,
GeneratedAt: &now,
TransitionedAt: &now,
QueuedForExecution: &falseBool,
},
}

Expect(k8sClient.Create(ctx, policyreco1)).Should(Succeed())
Expect(k8sClient.Create(ctx, policyreco2)).Should(Succeed())

time.Sleep(2 * time.Second)

//update policy2 now
policy2.Spec.RiskIndex = 4
queuedOneReco = nil
err := k8sClient.Update(ctx, &policy2)
Expect(err).ToNot(HaveOccurred())

time.Sleep(2 * time.Second)
//check if policyreco1 and policyreco2 are queued for execution
Expect(len(queuedOneReco)).Should(Equal(2))
Expect(queuedOneReco[0]).Should(Equal(true))
Expect(queuedOneReco[1]).Should(Equal(true))

//delete policy2 now, again check if policyreco1 and policyreco2 are queued for execution
queuedOneReco = nil
policy2 := ottoscaleriov1alpha1.Policy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: "policy2"}, &policy2)).Should(Succeed())
fmt.Println(policy2)
Expect(k8sClient.Delete(ctx, &policy2)).Should(Succeed())
time.Sleep(5 * time.Second)
Expect(len(queuedOneReco)).Should(Equal(2))
Expect(queuedOneReco[0]).Should(Equal(true))
Expect(queuedOneReco[1]).Should(Equal(true))

Expect(k8sClient.Delete(ctx, &policy1)).Should(Succeed())
Expect(k8sClient.Delete(ctx, &policy3)).Should(Succeed())
})
})
})
Loading

0 comments on commit 14fb7dd

Please sign in to comment.