Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT - feat: Limit cache to watch only relevant objects #994

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions automation/e2e-upgrade-functests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,9 @@ oc apply -n $NAMESPACE -f "https://github.com/kubevirt/ssp-operator/releases/dow
oc wait --for=condition=Available --timeout=600s -n ${NAMESPACE} deployments/ssp-operator

SSP_NAME="ssp-test"
SSP_NAMESPACE="ssp-operator-functests"
SSP_NAMESPACE="kubevirt"
SSP_TEMPLATES_NAMESPACE="ssp-operator-functests-templates"

oc apply -f - <<EOF
apiVersion: v1
kind: Namespace
metadata:
name: ${SSP_NAMESPACE}
labels:
openshift.io/cluster-monitoring: "true"
EOF

oc apply -f - <<EOF
apiVersion: v1
kind: Namespace
Expand All @@ -66,7 +57,6 @@ export VALIDATOR_IMG=${CI_VALIDATOR_IMG}
export IMG=${CI_OPERATOR_IMG}
export SKIP_CLEANUP_AFTER_TESTS="true"
export TEST_EXISTING_CR_NAME="${SSP_NAME}"
export TEST_EXISTING_CR_NAMESPACE="${SSP_NAMESPACE}"
export IS_UPGRADE_LANE="true"

make deploy functest
2 changes: 2 additions & 0 deletions internal/common/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

const (
WatchedObjectLabel = "ssp.kubevirt.io/watched"

AppKubernetesNameLabel = "app.kubernetes.io/name"
AppKubernetesPartOfLabel = "app.kubernetes.io/part-of"
AppKubernetesVersionLabel = "app.kubernetes.io/version"
Expand Down
33 changes: 31 additions & 2 deletions internal/common/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (r *reconcileBuilder) Reconcile() (ReconcileResult, error) {
found := newEmptyResource(r.resource)
found.SetName(r.resource.GetName())
found.SetNamespace(r.resource.GetNamespace())
// TODO -- consider refactoring this mutateFn out.
mutateFn := func() error {
if !found.GetDeletionTimestamp().IsZero() {
// Skip update, because the resource is being deleted
Expand All @@ -177,11 +178,22 @@ func (r *reconcileBuilder) Reconcile() (ReconcileResult, error) {

UpdateLabels(r.resource, found)
updateAnnotations(r.resource, found)
// TODO -- consider how the watch label relates to version cache
if r.options.AlwaysCallUpdateFunc || !r.request.VersionCache.Contains(found) {
// The generation was updated by other cluster components,
// operator needs to update the resource
r.updateFunc(r.resource, found)
}

// TODO -- move to a different function, because this label is needed
// by all resources

// Adding this label after, so that updateFunc cannot remove it.
if found.GetLabels() == nil {
found.SetLabels(map[string]string{})
}
found.GetLabels()[WatchedObjectLabel] = "true"

return nil
}

Expand Down Expand Up @@ -287,10 +299,27 @@ func (r *reconcileBuilder) createOrUpdateWithImmutableSpec(obj client.Object, f
if err := mutate(f, key, obj); err != nil {
return OperationResultNone, nil, err
}
if err := r.request.Client.Create(r.request.Context, obj); err != nil {

err := r.request.Client.Create(r.request.Context, obj)
if err == nil {
return OperationResultCreated, nil, nil
}
if !errors.IsAlreadyExists(err) {
return OperationResultNone, nil, err
}

// If the get operation above does not find the object and
// create returns an error, because the object already exists,
// then it means that cache is stale. This can happen
// if the cache only watches objects with a certain label,
// and the object was created without the label.
//
// Using uncached client to read the object again.

// TODO -- validate that this is correct.
if err = r.request.UncachedReader.Get(r.request.Context, key, obj); err != nil {
return OperationResultNone, nil, err
}
return OperationResultCreated, nil, nil
}

existing := obj.DeepCopyObject().(client.Object)
Expand Down
7 changes: 7 additions & 0 deletions internal/common/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
name = "test-ssp"
)

// TODO -- add unit tests for the watch label

var _ = Describe("Resource", func() {
var (
request Request
Expand Down Expand Up @@ -340,5 +342,10 @@ func expectEqualResourceExists(resource client.Object, request *Request) {
resource.SetResourceVersion(found.GetResourceVersion())
resource.SetOwnerReferences(found.GetOwnerReferences())

if resource.GetLabels() == nil {
resource.SetLabels(map[string]string{})
}
resource.GetLabels()[WatchedObjectLabel] = "true"

ExpectWithOffset(1, found).To(Equal(resource))
}
19 changes: 18 additions & 1 deletion internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ package controllers

import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

crd_watch "kubevirt.io/ssp-operator/internal/crd-watch"
)

type Controller interface {
Name() string
AddToManager(mgr ctrl.Manager, crdList crd_watch.CrdList) error
RequiredCrds() []string
GetWatchObjects() []WatchObject
}

type WatchObject struct {
// Object is the object that this option applies to.
Object client.Object

// CrdName is the name of the CRD that defines this object.
CrdName string

// WatchOnlyOperatorNamespace sets if the cache should only watch the object type
// in the same namespace where the operator is defined.
WatchOnlyOperatorNamespace bool

// WatchOnlyObjectsWithLabel sets if the cache should only watch objets
/// with label "ssp.kubevirt.io/watched".
WatchOnlyObjectsWithLabel bool
}
6 changes: 4 additions & 2 deletions internal/controllers/services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ func (s *serviceReconciler) AddToManager(mgr ctrl.Manager, _ crd_watch.CrdList)
}))
}

func (s *serviceReconciler) RequiredCrds() []string {
return nil
func (s *serviceReconciler) GetWatchObjects() []WatchObject {
return []WatchObject{{
Object: &v1.Service{},
}}
}

func (s *serviceReconciler) setServiceOwnerReference(service *v1.Service) error {
Expand Down
8 changes: 6 additions & 2 deletions internal/controllers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

// Need to watch CRDs
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch

func StartControllers(ctx context.Context, mgr controllerruntime.Manager, controllers []Controller) error {
mgrCtx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -105,7 +105,11 @@ func CreateControllers(ctx context.Context, apiReader client.Reader) ([]Controll
func setupManager(ctx context.Context, cancel context.CancelFunc, mgr controllerruntime.Manager, controllers []Controller) error {
var requiredCrds []string
for _, controller := range controllers {
requiredCrds = append(requiredCrds, controller.RequiredCrds()...)
for _, watchObject := range controller.GetWatchObjects() {
if watchObject.CrdName != "" {
requiredCrds = append(requiredCrds, watchObject.CrdName)
}
}
}

crdWatch := crd_watch.New(mgr.GetCache(), requiredCrds...)
Expand Down
45 changes: 26 additions & 19 deletions internal/controllers/ssp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ var _ Controller = &sspController{}

var _ reconcile.Reconciler = &sspController{}

// +kubebuilder:rbac:groups=ssp.kubevirt.io,resources=ssps,verbs=list;watch;update
// +kubebuilder:rbac:groups=ssp.kubevirt.io,resources=ssps,verbs=get;list;watch;update
// +kubebuilder:rbac:groups=ssp.kubevirt.io,resources=ssps/status,verbs=update
// +kubebuilder:rbac:groups=ssp.kubevirt.io,resources=ssps/finalizers,verbs=update
// +kubebuilder:rbac:groups=config.openshift.io,resources=infrastructures;clusterversions,verbs=get
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list

func (s *sspController) Name() string {
return "ssp-controller"
Expand Down Expand Up @@ -131,27 +131,34 @@ func (s *sspController) AddToManager(mgr ctrl.Manager, crdList crd_watch.CrdList
return builder.Complete(s)
}

func (r *sspController) RequiredCrds() []string {
var result []string
for _, operand := range r.operands {
result = append(result, getRequiredCrds(operand)...)
}
return result
}
func (s *sspController) GetWatchObjects() []WatchObject {
var results []WatchObject

results = append(results, WatchObject{
Object: &ssp.SSP{},
CrdName: "ssps.ssp.kubevirt.io",
WatchOnlyOperatorNamespace: true,
})

func getRequiredCrds(operand operands.Operand) []string {
var result []string
for _, watchType := range operand.WatchTypes() {
if watchType.Crd != "" {
result = append(result, watchType.Crd)
for _, operand := range s.operands {
for _, watchType := range operand.WatchTypes() {
results = append(results, WatchObject{
Object: watchType.Object,
CrdName: watchType.Crd,
WatchOnlyOperatorNamespace: true,
WatchOnlyObjectsWithLabel: watchType.WatchOnlyWithLabel,
})
}
}
for _, watchType := range operand.WatchClusterTypes() {
if watchType.Crd != "" {
result = append(result, watchType.Crd)
for _, watchType := range operand.WatchClusterTypes() {
results = append(results, WatchObject{
Object: watchType.Object,
CrdName: watchType.Crd,
WatchOnlyObjectsWithLabel: watchType.WatchOnlyWithLabel,
})
}
}
return result

return results
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
11 changes: 9 additions & 2 deletions internal/controllers/vm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ func (v *vmController) AddToManager(mgr ctrl.Manager, crdList crd_watch.CrdList)
Complete(v)
}

func (v *vmController) RequiredCrds() []string {
return []string{getVmCrd()}
func (v *vmController) GetWatchObjects() []WatchObject {
return []WatchObject{{
Object: &kubevirtv1.VirtualMachine{},
CrdName: getVmCrd(),
}, {
Object: &corev1.PersistentVolumeClaim{},
}, {
Object: &corev1.PersistentVolume{},
}}
}

func (v *vmController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
6 changes: 4 additions & 2 deletions internal/controllers/webhook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func (w *webhookCtrl) AddToManager(mgr ctrl.Manager, _ crd_watch.CrdList) error
Complete(w)
}

func (w *webhookCtrl) RequiredCrds() []string {
return nil
func (w *webhookCtrl) GetWatchObjects() []WatchObject {
return []WatchObject{{
Object: &admissionv1.ValidatingWebhookConfiguration{},
}}
}

func (w *webhookCtrl) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand Down
19 changes: 13 additions & 6 deletions internal/operands/common-instancetypes/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
)

// Define RBAC rules needed by this operand:
// +kubebuilder:rbac:groups=instancetype.kubevirt.io,resources=virtualmachineclusterinstancetypes,verbs=list;watch;create;update;delete
// +kubebuilder:rbac:groups=instancetype.kubevirt.io,resources=virtualmachineclusterpreferences,verbs=list;watch;create;update;delete
// +kubebuilder:rbac:groups=instancetype.kubevirt.io,resources=virtualmachineclusterinstancetypes,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=instancetype.kubevirt.io,resources=virtualmachineclusterpreferences,verbs=get;list;watch;create;update;delete

const (
operandName = "common-instancetypes"
Expand Down Expand Up @@ -57,10 +57,17 @@ func (c *CommonInstancetypes) Name() string {
}

func WatchClusterTypes() []operands.WatchType {
return []operands.WatchType{
{Object: &instancetypev1beta1.VirtualMachineClusterInstancetype{}, Crd: virtualMachineClusterInstancetypeCrd, WatchFullObject: true},
{Object: &instancetypev1beta1.VirtualMachineClusterPreference{}, Crd: virtualMachineClusterPreferenceCrd, WatchFullObject: true},
}
return []operands.WatchType{{
Object: &instancetypev1beta1.VirtualMachineClusterInstancetype{},
Crd: virtualMachineClusterInstancetypeCrd,
WatchFullObject: true,
WatchOnlyWithLabel: true,
}, {
Object: &instancetypev1beta1.VirtualMachineClusterPreference{},
Crd: virtualMachineClusterPreferenceCrd,
WatchFullObject: true,
WatchOnlyWithLabel: true,
}}
}

func (c *CommonInstancetypes) WatchClusterTypes() []operands.WatchType {
Expand Down
8 changes: 5 additions & 3 deletions internal/operands/common-templates/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ func init() {
}

func WatchClusterTypes() []operands.WatchType {
return []operands.WatchType{
{Object: &templatev1.Template{}},
}
return []operands.WatchType{{
Object: &templatev1.Template{},
// TODO -- consider setting to "true" in the future
WatchOnlyWithLabel: false,
}}
}

type commonTemplates struct {
Expand Down
36 changes: 27 additions & 9 deletions internal/operands/data-sources/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// Define RBAC rules needed by this operand:
// Define RBAC rules needed by this operand:
// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;roles;rolebindings,verbs=list;watch;create;update;delete
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;roles;rolebindings,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=cdi.kubevirt.io,resources=datasources,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=cdi.kubevirt.io,resources=dataimportcrons,verbs=get;list;watch;create;update;patch;delete

Expand Down Expand Up @@ -47,15 +47,29 @@ func init() {
}

func WatchClusterTypes() []operands.WatchType {
return []operands.WatchType{
{Object: &rbac.ClusterRole{}},
{Object: &rbac.Role{}},
{Object: &rbac.RoleBinding{}},
{Object: &core.Namespace{}},
return []operands.WatchType{{
Object: &rbac.ClusterRole{},
WatchOnlyWithLabel: true,
}, {
Object: &rbac.Role{},
WatchOnlyWithLabel: true,
}, {
Object: &rbac.RoleBinding{},
WatchOnlyWithLabel: true,
}, {
Object: &core.Namespace{},
WatchOnlyWithLabel: true,
}, {
Object: &cdiv1beta1.DataSource{},
Crd: dataSourceCrd,
// Need to watch status of DataSource to notice if referenced PVC was deleted.
{Object: &cdiv1beta1.DataSource{}, Crd: dataSourceCrd, WatchFullObject: true},
{Object: &cdiv1beta1.DataImportCron{}, Crd: dataImportCronCrd},
}
WatchFullObject: true,
WatchOnlyWithLabel: true,
}, {
Object: &cdiv1beta1.DataImportCron{},
Crd: dataImportCronCrd,
WatchOnlyWithLabel: true,
}}
}

type dataSources struct {
Expand Down Expand Up @@ -384,6 +398,10 @@ func reconcileDataSource(dsInfo dataSourceInfo, request *common.Request) (common
ClusterResource(dsInfo.dataSource).
Options(common.ReconcileOptions{AlwaysCallUpdateFunc: true}).
UpdateFunc(func(newRes, foundRes client.Object) {
// TODO -- think about data sources and the watched label
// - if DS is created by CDI, it should not have the watched label
// - how does this method work with stale cache?

if dsInfo.autoUpdateEnabled {
if foundRes.GetLabels() == nil {
foundRes.SetLabels(make(map[string]string))
Expand Down
Loading
Loading