Skip to content

Commit

Permalink
add delete and finalizer to ragengine
Browse files Browse the repository at this point in the history
Signed-off-by: Bangqi Zhu <[email protected]>
  • Loading branch information
Bangqi Zhu committed Oct 23, 2024
1 parent 788c32c commit bc8e411
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 13 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const (
// WorkspaceConditionTypeTuningJobStatus is the state when the tuning job starts normally.
WorkspaceConditionTypeTuningJobStatus ConditionType = ConditionType("JobStarted")

// RAGEngineConditionTypeDeleting is the RAGEngine state when it starts to get deleted.
RAGEngineConditionTypeDeleting = ConditionType("RAGEngineDeleting")

// WorkspaceConditionTypeDeleting is the Workspace state when it starts to get deleted.
WorkspaceConditionTypeDeleting = ConditionType("WorkspaceDeleting")

Expand Down
33 changes: 33 additions & 0 deletions pkg/controllers/ragengine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
Expand Down Expand Up @@ -64,6 +65,15 @@ func (c *RAGEngineReconciler) Reconcile(ctx context.Context, req reconcile.Reque

klog.InfoS("Reconciling", "RAG Engine", req.NamespacedName)

if err := c.ensureFinalizer(ctx, ragEngineObj); err != nil {
return reconcile.Result{}, err
}

// Handle deleting ragengine, garbage collect all the resources.
if !ragEngineObj.DeletionTimestamp.IsZero() {
return c.deleteRAGEngine(ctx, ragEngineObj)
}

result, err := c.addRAGEngine(ctx, ragEngineObj)
if err != nil {
return result, err
Expand All @@ -72,6 +82,18 @@ func (c *RAGEngineReconciler) Reconcile(ctx context.Context, req reconcile.Reque
return result, nil
}

// ensureFinalizer adds consts.RAGEngineFinalizer to ragEngineObj and persists the
// change with a merge patch when the finalizer is not present yet.
//
// Nothing is done when the object already carries the finalizer, or when it is
// already being deleted (non-zero DeletionTimestamp): Kubernetes does not allow
// new finalizers to be added to a terminating object, so attempting the patch
// on the deletion path would only produce an error.
//
// It returns the error from the patch call, if any.
func (c *RAGEngineReconciler) ensureFinalizer(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) error {
	if !ragEngineObj.DeletionTimestamp.IsZero() {
		return nil
	}
	if controllerutil.ContainsFinalizer(ragEngineObj, consts.RAGEngineFinalizer) {
		return nil
	}

	// Snapshot the object before mutating it so the merge patch only carries the finalizer change.
	patch := client.MergeFrom(ragEngineObj.DeepCopy())
	controllerutil.AddFinalizer(ragEngineObj, consts.RAGEngineFinalizer)
	if err := c.Client.Patch(ctx, ragEngineObj, patch); err != nil {
		klog.ErrorS(err, "failed to ensure the finalizer to the ragengine", "ragengine", klog.KObj(ragEngineObj))
		return err
	}
	return nil
}

func (c *RAGEngineReconciler) addRAGEngine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) (reconcile.Result, error) {
err := c.applyRAGEngineResource(ctx, ragEngineObj)
if err != nil {
Expand All @@ -80,6 +102,17 @@ func (c *RAGEngineReconciler) addRAGEngine(ctx context.Context, ragEngineObj *ka
return reconcile.Result{}, nil
}

// deleteRAGEngine handles a RAGEngine that is being deleted: it first sets the
// RAGEngineConditionTypeDeleting status condition to true, then hands the object
// over to garbageCollectRAGEngine and returns that call's result.
func (c *RAGEngineReconciler) deleteRAGEngine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) (reconcile.Result, error) {
	objRef := klog.KObj(ragEngineObj)
	klog.InfoS("deleteRAGEngine", "ragengine", objRef)

	if statusErr := c.updateStatusConditionIfNotMatch(
		ctx,
		ragEngineObj,
		kaitov1alpha1.RAGEngineConditionTypeDeleting,
		metav1.ConditionTrue,
		"ragengineDeleted",
		"ragengine is being deleted",
	); statusErr != nil {
		klog.ErrorS(statusErr, "failed to update ragengine status", "ragengine", objRef)
		return reconcile.Result{}, statusErr
	}

	return c.garbageCollectRAGEngine(ctx, ragEngineObj)
}

// applyRAGEngineResource applies RAGEngine resource spec.
func (c *RAGEngineReconciler) applyRAGEngineResource(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) error {

Expand Down
64 changes: 64 additions & 0 deletions pkg/controllers/ragengine_gc_finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package controllers

import (
"context"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/azure/kaito/pkg/featuregates"
"github.com/azure/kaito/pkg/machine"
"github.com/azure/kaito/pkg/nodeclaim"
"github.com/azure/kaito/pkg/utils/consts"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// garbageCollectRAGEngine deletes the machines (and, when the Karpenter feature
// gate is enabled, the nodeClaims) listed for the given ragengine, then removes
// the ragengine finalizer from the object.
//
// Only consts.RAGEngineFinalizer is removed; finalizers set by other controllers
// are left in place so that their cleanup is not bypassed. Delete calls that
// fail with NotFound are treated as success because the resource is already gone.
//
// Any other list, delete or update error is logged (for delete/update) and
// returned to the caller together with an empty result.
func (c *RAGEngineReconciler) garbageCollectRAGEngine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) (ctrl.Result, error) {
	klog.InfoS("garbageCollectRAGEngine", "ragengine", klog.KObj(ragEngineObj))

	// Check if there are any machines associated with this ragengine.
	mList, err := machine.ListMachines(ctx, ragEngineObj, c.Client)
	if err != nil {
		return ctrl.Result{}, err
	}
	// We should delete all the machines that are created by this ragengine.
	for i := range mList.Items {
		deleteErr := client.IgnoreNotFound(c.Delete(ctx, &mList.Items[i], &client.DeleteOptions{}))
		if deleteErr != nil {
			klog.ErrorS(deleteErr, "failed to delete the machine", "machine", klog.KObj(&mList.Items[i]))
			return ctrl.Result{}, deleteErr
		}
	}

	if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
		// Check if there are any nodeClaims associated with this ragengine.
		ncList, err := nodeclaim.ListNodeClaim(ctx, ragEngineObj, c.Client)
		if err != nil {
			return ctrl.Result{}, err
		}

		// We should delete all the nodeClaims that are created by this ragengine.
		for i := range ncList.Items {
			deleteErr := client.IgnoreNotFound(c.Delete(ctx, &ncList.Items[i], &client.DeleteOptions{}))
			if deleteErr != nil {
				klog.ErrorS(deleteErr, "failed to delete the nodeClaim", "nodeClaim", klog.KObj(&ncList.Items[i]))
				return ctrl.Result{}, deleteErr
			}
		}
	}

	// Nothing to persist when our finalizer is already absent.
	if !controllerutil.ContainsFinalizer(ragEngineObj, consts.RAGEngineFinalizer) {
		return ctrl.Result{}, nil
	}

	// Work on a copy so the caller's object is only modified once the update succeeded.
	updatedObj := ragEngineObj.DeepCopy()
	controllerutil.RemoveFinalizer(updatedObj, consts.RAGEngineFinalizer)
	if updateErr := c.Update(ctx, updatedObj, &client.UpdateOptions{}); updateErr != nil {
		klog.ErrorS(updateErr, "failed to remove the finalizer from the ragengine",
			"ragengine", klog.KObj(ragEngineObj))
		return ctrl.Result{}, updateErr
	}
	klog.InfoS("successfully removed the ragengine finalizers",
		"ragengine", klog.KObj(ragEngineObj))
	// Keep the in-memory object consistent with what was just written.
	controllerutil.RemoveFinalizer(ragEngineObj, consts.RAGEngineFinalizer)
	return ctrl.Result{}, nil
}
10 changes: 5 additions & 5 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj interface{}) *v1alpha5.Machine {

// Determine the type of the input object and extract relevant fields
instanceType, namespace, name, labelSelector, err := resources.ExtractObjFields(obj)
instanceType, namespace, name, labelSelector, nameLabel, namespaceLabel, err := resources.ExtractObjFields(obj)
if err != nil {
klog.Error(err)
return nil
Expand All @@ -51,9 +51,9 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj
digest := sha256.Sum256([]byte(namespace + name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the object
machineName := "ws" + hex.EncodeToString(digest[0:])[0:9]
machineLabels := map[string]string{
LabelProvisionerName: ProvisionerName,
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
LabelProvisionerName: ProvisionerName,
nameLabel: name,
namespaceLabel: namespace,
}

if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
Expand Down Expand Up @@ -146,7 +146,7 @@ func WaitForPendingMachines(ctx context.Context, obj interface{}, kubeClient cli
var instanceType string

// Determine the type of the input object and retrieve the InstanceType
instanceType, _, _, _, err := resources.ExtractObjFields(obj)
instanceType, _, _, _, _, _, err := resources.ExtractObjFields(obj)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/nodeclaim/nodeclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, o
klog.InfoS("GenerateNodeClaimManifest", "object", obj)

// Determine the type of the input object and extract relevant fields
instanceType, namespace, name, labelSelector, err := resources.ExtractObjFields(obj)
instanceType, namespace, name, labelSelector, nameLabel, namespaceLabel, err := resources.ExtractObjFields(obj)
if err != nil {
klog.Error(err)
return nil
Expand All @@ -56,9 +56,9 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, o
nodeClaimName := GenerateNodeClaimName(obj)

nodeClaimLabels := map[string]string{
LabelNodePool: KaitoNodePoolName, // Fake nodepool name to prevent Karpenter from scaling up.
kaitov1alpha1.LabelWorkspaceName: name,
kaitov1alpha1.LabelWorkspaceNamespace: namespace,
LabelNodePool: KaitoNodePoolName, // Fake nodepool name to prevent Karpenter from scaling up.
nameLabel: name,
namespaceLabel: namespace,
}
if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
nodeClaimLabels = lo.Assign(nodeClaimLabels, labelSelector.MatchLabels)
Expand Down Expand Up @@ -143,7 +143,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, o
// GenerateNodeClaimName generates a nodeClaim name from the given workspace or RAGEngine.
func GenerateNodeClaimName(obj interface{}) string {
// Determine the type of the input object and extract relevant fields
_, namespace, name, _, err := resources.ExtractObjFields(obj)
_, namespace, name, _, _, _, err := resources.ExtractObjFields(obj)
if err != nil {
return ""
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func CreateKarpenterNodeClass(ctx context.Context, kubeClient client.Client) err
func WaitForPendingNodeClaims(ctx context.Context, obj interface{}, kubeClient client.Client) error {

// Determine the type of the input object and retrieve the InstanceType
instanceType, _, _, _, err := resources.ExtractObjFields(obj)
instanceType, _, _, _, _, _, err := resources.ExtractObjFields(obj)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/resources/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,23 @@ func CheckNvidiaPlugin(ctx context.Context, nodeObj *corev1.Node) bool {
return false
}

func ExtractObjFields(obj interface{}) (instanceType, namespace, name string, labelSelector *metav1.LabelSelector, err error) {
func ExtractObjFields(obj interface{}) (instanceType, namespace, name string, labelSelector *metav1.LabelSelector,
nameLabel, namespaceLabel string, err error) {
switch o := obj.(type) {
case *kaitov1alpha1.Workspace:
instanceType = o.Resource.InstanceType
namespace = o.Namespace
name = o.Name
labelSelector = o.Resource.LabelSelector
nameLabel = kaitov1alpha1.LabelWorkspaceName
namespaceLabel = kaitov1alpha1.LabelWorkspaceNamespace
case *kaitov1alpha1.RAGEngine:
instanceType = o.Spec.Compute.InstanceType
namespace = o.Namespace
name = o.Name
labelSelector = o.Spec.Compute.LabelSelector
nameLabel = kaitov1alpha1.LabelRAGEngineName
namespaceLabel = kaitov1alpha1.LabelRAGEngineNamespace
default:
err = fmt.Errorf("unsupported object type: %T", obj)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/utils/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package consts

const (
// WorkspaceFinalizer is used to make sure that workspace controller handles garbage collection.
WorkspaceFinalizer = "workspace.finalizer.kaito.sh"
WorkspaceFinalizer = "workspace.finalizer.kaito.sh"
// RAGEngineFinalizer is used to make sure that ragengine controller handles garbage collection.
RAGEngineFinalizer = "ragengine.finalizer.kaito.sh"
DefaultReleaseNamespaceEnvVar = "RELEASE_NAMESPACE"
FeatureFlagKarpenter = "Karpenter"
AzureCloudName = "azure"
Expand Down

0 comments on commit bc8e411

Please sign in to comment.