Skip to content

Commit

Permalink
optimize webhook by parsing managedJobsNamespaceSelector at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss committed Dec 18, 2024
1 parent 607dfef commit 6f3e60b
Showing 1 changed file with 57 additions and 36 deletions.
93 changes: 57 additions & 36 deletions internal/webhook/appwrapper_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
authv1 "k8s.io/api/authorization/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand Down Expand Up @@ -54,36 +55,42 @@ const (
QueueNameLabel = "kueue.x-k8s.io/queue-name"
)

type AppWrapperWebhook struct {
Config *config.AppWrapperConfig
SubjectAccessReviewer authClientv1.SubjectAccessReviewInterface
client client.Client
DiscoveryClient *discovery.DiscoveryClient
type rbacACSupport struct {
discoveryClient *discovery.DiscoveryClient
subjectAccessReviewer authClientv1.SubjectAccessReviewInterface
kindToResourceCache map[string]string
}

type appWrapperWebhook struct {
client client.Client
defaultQueueName string
enableKueueIntegrations bool
manageJobsWithoutQueueName bool
managedJobsNamespaceSelector labels.Selector
userRBACAdmissionCheck bool

// support for userRBACAdmissionCheck; will be nil if it is not enabled
rbacACSupport *rbacACSupport
}

//+kubebuilder:webhook:path=/mutate-workload-codeflare-dev-v1beta2-appwrapper,mutating=true,failurePolicy=fail,sideEffects=None,groups=workload.codeflare.dev,resources=appwrappers,verbs=create,versions=v1beta2,name=mappwrapper.kb.io,admissionReviewVersions=v1

var _ webhook.CustomDefaulter = &AppWrapperWebhook{}
var _ webhook.CustomDefaulter = &appWrapperWebhook{}

// Default fills in default values when an AppWrapper is created:
// 1. Inject default queue name
// 2. Ensure Suspend is set appropriately
// 3. Add labels with the user name and id
func (w *AppWrapperWebhook) Default(ctx context.Context, obj runtime.Object) error {
func (w *appWrapperWebhook) Default(ctx context.Context, obj runtime.Object) error {
aw := obj.(*workloadv1beta2.AppWrapper)
log.FromContext(ctx).V(2).Info("Applying defaults", "job", aw)

// Queue name and Suspend
if w.Config.EnableKueueIntegrations {
if w.Config.DefaultQueueName != "" {
aw.Labels = utilmaps.MergeKeepFirst(aw.Labels, map[string]string{QueueNameLabel: w.Config.DefaultQueueName})
if w.enableKueueIntegrations {
if w.defaultQueueName != "" {
aw.Labels = utilmaps.MergeKeepFirst(aw.Labels, map[string]string{QueueNameLabel: w.defaultQueueName})
}
nsSelector, err := metav1.LabelSelectorAsSelector(w.Config.KueueJobReconciller.ManageJobsNamespaceSelector)
if err != nil {
return err
}
err = jobframework.ApplyDefaultForSuspend(ctx, (*wlc.AppWrapper)(aw), w.client, w.Config.KueueJobReconciller.ManageJobsWithoutQueueName, nsSelector)
err := jobframework.ApplyDefaultForSuspend(ctx, (*wlc.AppWrapper)(aw), w.client, w.manageJobsWithoutQueueName, w.managedJobsNamespaceSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -111,33 +118,33 @@ func (w *AppWrapperWebhook) Default(ctx context.Context, obj runtime.Object) err

//+kubebuilder:webhook:path=/validate-workload-codeflare-dev-v1beta2-appwrapper,mutating=false,failurePolicy=fail,sideEffects=None,groups=workload.codeflare.dev,resources=appwrappers,verbs=create;update,versions=v1beta2,name=vappwrapper.kb.io,admissionReviewVersions=v1

var _ webhook.CustomValidator = &AppWrapperWebhook{}
var _ webhook.CustomValidator = &appWrapperWebhook{}

// ValidateCreate validates invariants when an AppWrapper is created
func (w *AppWrapperWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
func (w *appWrapperWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
aw := obj.(*workloadv1beta2.AppWrapper)
log.FromContext(ctx).V(2).Info("Validating create", "job", aw)
allErrors := w.validateAppWrapperCreate(ctx, aw)
if w.Config.EnableKueueIntegrations {
if w.enableKueueIntegrations {
allErrors = append(allErrors, jobframework.ValidateJobOnCreate((*wlc.AppWrapper)(aw))...)
}
return nil, allErrors.ToAggregate()
}

// ValidateUpdate validates invariants when an AppWrapper is updated
func (w *AppWrapperWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
func (w *appWrapperWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
oldAW := oldObj.(*workloadv1beta2.AppWrapper)
newAW := newObj.(*workloadv1beta2.AppWrapper)
log.FromContext(ctx).V(2).Info("Validating update", "job", newAW)
allErrors := w.validateAppWrapperUpdate(oldAW, newAW)
if w.Config.EnableKueueIntegrations {
if w.enableKueueIntegrations {
allErrors = append(allErrors, jobframework.ValidateJobOnUpdate((*wlc.AppWrapper)(oldAW), (*wlc.AppWrapper)(newAW))...)
}
return nil, allErrors.ToAggregate()
}

// ValidateDelete is a noop for us, but is required to implement the CustomValidator interface
func (w *AppWrapperWebhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) {
func (w *appWrapperWebhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) {
return nil, nil
}

Expand All @@ -151,7 +158,7 @@ func (w *AppWrapperWebhook) ValidateDelete(context.Context, runtime.Object) (adm
// 3. AppWrappers must not contain any resources that the user could not create directly
// 4. Every PodSet must be well-formed: the Path must exist and must be parseable as a PodSpecTemplate
// 5. AppWrappers must contain between 1 and 8 PodSets (Kueue invariant)
func (w *AppWrapperWebhook) validateAppWrapperCreate(ctx context.Context, aw *workloadv1beta2.AppWrapper) field.ErrorList {
func (w *appWrapperWebhook) validateAppWrapperCreate(ctx context.Context, aw *workloadv1beta2.AppWrapper) field.ErrorList {
allErrors := field.ErrorList{}
components := aw.Spec.Components
componentsPath := field.NewPath("spec").Child("components")
Expand Down Expand Up @@ -182,7 +189,7 @@ func (w *AppWrapperWebhook) validateAppWrapperCreate(ctx context.Context, aw *wo
}

// 3. RBAC check: Perform SubjectAccessReview to verify user is entitled to create component
if w.Config.UserRBACAdmissionCheck {
if w.userRBACAdmissionCheck {
ra := authv1.ResourceAttributes{
Namespace: aw.Namespace,
Verb: "create",
Expand All @@ -203,7 +210,7 @@ func (w *AppWrapperWebhook) validateAppWrapperCreate(ctx context.Context, aw *wo
sar.Spec.Extra[k] = authv1.ExtraValue(v)
}
}
sar, err = w.SubjectAccessReviewer.Create(ctx, sar, metav1.CreateOptions{})
sar, err = w.rbacACSupport.subjectAccessReviewer.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
allErrors = append(allErrors, field.InternalError(compPath.Child("template"), err))
} else {
Expand Down Expand Up @@ -253,7 +260,7 @@ func (w *AppWrapperWebhook) validateAppWrapperCreate(ctx context.Context, aw *wo
}

// validateAppWrapperUpdate enforces deep immutablity of all fields that were validated by validateAppWrapperCreate
func (w *AppWrapperWebhook) validateAppWrapperUpdate(old *workloadv1beta2.AppWrapper, new *workloadv1beta2.AppWrapper) field.ErrorList {
func (w *appWrapperWebhook) validateAppWrapperUpdate(old *workloadv1beta2.AppWrapper, new *workloadv1beta2.AppWrapper) field.ErrorList {
allErrors := field.ErrorList{}
msg := "attempt to change immutable field"
componentsPath := field.NewPath("spec").Child("components")
Expand Down Expand Up @@ -297,34 +304,48 @@ func (w *AppWrapperWebhook) validateAppWrapperUpdate(old *workloadv1beta2.AppWra
return allErrors
}

func (w *AppWrapperWebhook) lookupResource(gvk *schema.GroupVersionKind) string {
if known, ok := w.kindToResourceCache[gvk.String()]; ok {
func (w *appWrapperWebhook) lookupResource(gvk *schema.GroupVersionKind) string {
if known, ok := w.rbacACSupport.kindToResourceCache[gvk.String()]; ok {
return known
}
resources, err := w.DiscoveryClient.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
resources, err := w.rbacACSupport.discoveryClient.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return "*"
}
for _, r := range resources.APIResources {
if r.Kind == gvk.Kind {
w.kindToResourceCache[gvk.String()] = r.Name
w.rbacACSupport.kindToResourceCache[gvk.String()] = r.Name
return r.Name
}
}
return "*"
}

func SetupAppWrapperWebhook(mgr ctrl.Manager, awConfig *config.AppWrapperConfig) error {
kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
nsSelector, err := metav1.LabelSelectorAsSelector(awConfig.KueueJobReconciller.ManageJobsNamespaceSelector)
if err != nil {
return err
}
wh := &AppWrapperWebhook{
Config: awConfig,
client: mgr.GetClient(),
DiscoveryClient: kubeClient.DiscoveryClient,
SubjectAccessReviewer: kubeClient.AuthorizationV1().SubjectAccessReviews(),
kindToResourceCache: make(map[string]string),
wh := &appWrapperWebhook{
client: mgr.GetClient(),
defaultQueueName: awConfig.DefaultQueueName,
enableKueueIntegrations: awConfig.EnableKueueIntegrations,
manageJobsWithoutQueueName: awConfig.KueueJobReconciller.ManageJobsWithoutQueueName,
managedJobsNamespaceSelector: nsSelector,
userRBACAdmissionCheck: awConfig.UserRBACAdmissionCheck,
}

if awConfig.UserRBACAdmissionCheck {
kubeClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return err
}
wh.rbacACSupport = &rbacACSupport{
discoveryClient: kubeClient.DiscoveryClient,
subjectAccessReviewer: kubeClient.AuthorizationV1().SubjectAccessReviews(),
kindToResourceCache: make(map[string]string),
}

}

return ctrl.NewWebhookManagedBy(mgr).
Expand Down

0 comments on commit 6f3e60b

Please sign in to comment.