Skip to content

Commit

Permalink
feat: Support Karpenter NodeClaim API in workspace (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
helayoty authored May 7, 2024
1 parent 2c42bdd commit cb2d964
Show file tree
Hide file tree
Showing 12 changed files with 759 additions and 195 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/workspace_condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const (
// WorkspaceConditionTypeMachineStatus is the state when checking machine status.
WorkspaceConditionTypeMachineStatus = ConditionType("MachineReady")

// WorkspaceConditionTypeNodeClaimStatus is the state when checking nodeClaim status.
WorkspaceConditionTypeNodeClaimStatus = ConditionType("NodeClaimReady")

// WorkspaceConditionTypeResourceStatus is the state when Resource has been created.
WorkspaceConditionTypeResourceStatus = ConditionType("ResourceReady")

Expand Down
174 changes: 141 additions & 33 deletions pkg/controllers/workspace_controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package controllers

import (
Expand All @@ -9,9 +10,12 @@ import (
"strings"
"time"

"github.com/azure/kaito/pkg/featuregates"
"github.com/azure/kaito/pkg/nodeclaim"
"github.com/azure/kaito/pkg/tuning"
"github.com/azure/kaito/pkg/utils/consts"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
Expand Down Expand Up @@ -80,7 +84,8 @@ func (c *WorkspaceReconciler) Reconcile(ctx context.Context, req reconcile.Reque

if workspaceObj.Inference != nil && workspaceObj.Inference.Preset != nil {
if !plugin.KaitoModelRegister.Has(string(workspaceObj.Inference.Preset.Name)) {
return reconcile.Result{}, fmt.Errorf("The preset model name %s is not registered for workspace %s/%s", string(workspaceObj.Inference.Preset.Name), workspaceObj.Namespace, workspaceObj.Name)
return reconcile.Result{}, fmt.Errorf("the preset model name %s is not registered for workspace %s/%s",
string(workspaceObj.Inference.Preset.Name), workspaceObj.Namespace, workspaceObj.Name)
}
}

Expand All @@ -96,8 +101,9 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
}
// if error is due to machine instance types unavailability, stop reconcile.
if err.Error() == machine.ErrorInstanceTypesUnavailable {
// if error is due to machine/nodeClaim instance types unavailability, stop reconcile.
if err.Error() == machine.ErrorInstanceTypesUnavailable ||
err.Error() == nodeclaim.ErrorInstanceTypesUnavailable {
return reconcile.Result{Requeue: false}, err
}
return reconcile.Result{}, err
Expand Down Expand Up @@ -139,7 +145,6 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka

func (c *WorkspaceReconciler) deleteWorkspace(ctx context.Context, wObj *kaitov1alpha1.Workspace) (reconcile.Result, error) {
klog.InfoS("deleteWorkspace", "workspace", klog.KObj(wObj))
// TODO delete workspace, machine(s), fine_tuning and inference (deployment, service) obj ( ok to delete machines? which will delete nodes??)
err := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeDeleting, metav1.ConditionTrue, "workspaceDeleted", "workspace is being deleted")
if err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
Expand All @@ -149,7 +154,7 @@ func (c *WorkspaceReconciler) deleteWorkspace(ctx context.Context, wObj *kaitov1
return c.garbageCollectWorkspace(ctx, wObj)
}

func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node {
func (c *WorkspaceReconciler) selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node {

sort.Slice(qualified, func(i, j int) bool {
iPreferred := utils.Contains(preferred, qualified[i].Name)
Expand All @@ -168,13 +173,20 @@ func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous
} else if !iPrevious && jPrevious {
return false
} else { // either all are previous, or none is previous
_, iCreatedByKaito := qualified[i].Labels["kaito.sh/machine-type"]
_, jCreatedByKaito := qualified[j].Labels["kaito.sh/machine-type"]

// Choose node created by gpu-provisioner since it is more likely to be empty to use.
if iCreatedByKaito && !jCreatedByKaito {
var iCreatedByGPUProvisioner, jCreatedByGPUProvisioner bool
_, iCreatedByGPUProvisioner = qualified[i].Labels[machine.LabelGPUProvisionerCustom]
_, jCreatedByGPUProvisioner = qualified[j].Labels[machine.LabelGPUProvisionerCustom]
// Choose node created by gpu-provisioner and karpenter since it is more likely to be empty to use.
var iCreatedByKarpenter, jCreatedByKarpenter bool
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
_, iCreatedByKarpenter = qualified[i].Labels[nodeclaim.LabelNodePool]
_, jCreatedByKarpenter = qualified[j].Labels[nodeclaim.LabelNodePool]
}
if (iCreatedByGPUProvisioner && !jCreatedByGPUProvisioner) ||
(iCreatedByKarpenter && !jCreatedByKarpenter) {
return true
} else if !iCreatedByKaito && jCreatedByKaito {
} else if (!iCreatedByGPUProvisioner && jCreatedByGPUProvisioner) ||
(!iCreatedByKarpenter && jCreatedByKarpenter) {
return false
} else {
return qualified[i].Name < qualified[j].Name
Expand All @@ -185,8 +197,8 @@ func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous

if len(qualified) <= count {
return qualified

}

return qualified[0:count]
}

Expand All @@ -198,19 +210,34 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj *
return err
}

// Find all nodes that match the labelSelector and instanceType, they are not necessarily created by machines.
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
// Wait for pending nodeClaims if any before we decide whether to create new node or not.
if err := nodeclaim.WaitForPendingNodeClaims(ctx, wObj, c.Client); err != nil {
return err
}
}

// Find all nodes that match the labelSelector and instanceType, they are not necessarily created by machines/nodeClaims.
validNodes, err := c.getAllQualifiedNodes(ctx, wObj)
if err != nil {
return err
}

selectedNodes := selectWorkspaceNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count))
selectedNodes := c.selectWorkspaceNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count))

newNodesCount := lo.FromPtr(wObj.Resource.Count) - len(selectedNodes)

if newNodesCount > 0 {
klog.InfoS("need to create more nodes", "NodeCount", newNodesCount)
if err := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionUnknown,
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
if err := c.updateStatusConditionIfNotMatch(ctx, wObj,
kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionUnknown,
"CreateNodeClaimPending", fmt.Sprintf("creating %d nodeClaims", newNodesCount)); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return err
}
} else if err := c.updateStatusConditionIfNotMatch(ctx, wObj,
kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionUnknown,
"CreateMachinePending", fmt.Sprintf("creating %d machines", newNodesCount)); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return err
Expand Down Expand Up @@ -245,7 +272,15 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj *
}
}

if err = c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionTrue,
if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
if err = c.updateStatusConditionIfNotMatch(ctx, wObj,
kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionTrue,
"installNodePluginsSuccess", "nodeClaim plugins have been installed successfully"); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return err
}
} else if err = c.updateStatusConditionIfNotMatch(ctx, wObj,
kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionTrue,
"installNodePluginsSuccess", "machines plugins have been installed successfully"); err != nil {
klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj))
return err
Expand Down Expand Up @@ -314,24 +349,33 @@ func (c *WorkspaceReconciler) validateNodeInstanceType(ctx context.Context, wObj
return true
}

// createAndValidateNode creates a new machine and validates status.
// createAndValidateNode creates a new node and validates status.
func (c *WorkspaceReconciler) createAndValidateNode(ctx context.Context, wObj *kaitov1alpha1.Workspace) (*corev1.Node, error) {
var machineOSDiskSize string
var nodeOSDiskSize string
if wObj.Inference != nil && wObj.Inference.Preset != nil && wObj.Inference.Preset.Name != "" {
presetName := string(wObj.Inference.Preset.Name)
machineOSDiskSize = plugin.KaitoModelRegister.MustGet(presetName).GetInferenceParameters().DiskStorageRequirement
nodeOSDiskSize = plugin.KaitoModelRegister.MustGet(presetName).
GetInferenceParameters().DiskStorageRequirement
}
if nodeOSDiskSize == "" {
nodeOSDiskSize = "0" // The default OS size is used
}
if machineOSDiskSize == "" {
machineOSDiskSize = "0" // The default OS size is used

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
return c.CreateNodeClaim(ctx, wObj, nodeOSDiskSize)
} else {
return c.CreateMachine(ctx, wObj, nodeOSDiskSize)
}
}

Retry_withdifferentname:
newMachine := machine.GenerateMachineManifest(ctx, machineOSDiskSize, wObj)
func (c *WorkspaceReconciler) CreateMachine(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeOSDiskSize string) (*corev1.Node, error) {
RetryWithDifferentName:
newMachine := machine.GenerateMachineManifest(ctx, nodeOSDiskSize, wObj)

if err := machine.CreateMachine(ctx, newMachine, c.Client); err != nil {
if apierrors.IsAlreadyExists(err) {
klog.InfoS("There exists a machine with the same name, retry with a different name", "machine", klog.KObj(newMachine))
goto Retry_withdifferentname
klog.InfoS("A machine exists with the same name, retry with a different name", "machine", klog.KObj(newMachine))
goto RetryWithDifferentName
} else {

klog.ErrorS(err, "failed to create machine", "machine", newMachine.Name)
Expand Down Expand Up @@ -359,6 +403,41 @@ Retry_withdifferentname:
return resources.GetNode(ctx, newMachine.Status.NodeName, c.Client)
}

// CreateNodeClaim generates a NodeClaim manifest for the workspace, creates it,
// runs nodeclaim.CheckNodeClaimStatus on it, and finally returns the node whose
// name is recorded in the NodeClaim's status.
//
// If creation fails with an AlreadyExists error, a fresh manifest is generated
// and creation is attempted again. Any other creation error, and any error from
// the status check, is written to the workspace's NodeClaim condition
// (ConditionFalse) before being returned; if that status write itself fails,
// the status-write error is returned instead.
func (c *WorkspaceReconciler) CreateNodeClaim(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeOSDiskSize string) (*corev1.Node, error) {
	claim := nodeclaim.GenerateNodeClaimManifest(ctx, nodeOSDiskSize, wObj)

	for {
		createErr := nodeclaim.CreateNodeClaim(ctx, claim, c.Client)
		if createErr == nil {
			break
		}

		if !apierrors.IsAlreadyExists(createErr) {
			klog.ErrorS(createErr, "failed to create nodeClaim", "nodeClaim", claim.Name)
			statusErr := c.updateStatusConditionIfNotMatch(ctx, wObj,
				kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionFalse,
				"nodeClaimFailedCreation", createErr.Error())
			if statusErr != nil {
				klog.ErrorS(statusErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
				return nil, statusErr
			}
			return nil, createErr
		}

		// Name collision: regenerate the manifest and try again.
		// NOTE(review): presumably each generated manifest gets a different
		// name; if not, this retry never terminates — confirm in nodeclaim pkg.
		klog.InfoS("There exists a nodeClaim with the same name, retry with a different name", "nodeClaim", klog.KObj(claim))
		claim = nodeclaim.GenerateNodeClaimManifest(ctx, nodeOSDiskSize, wObj)
	}

	// Check the nodeClaim status until it is ready.
	if checkErr := nodeclaim.CheckNodeClaimStatus(ctx, claim, c.Client); checkErr != nil {
		statusErr := c.updateStatusConditionIfNotMatch(ctx, wObj,
			kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionFalse,
			"checkNodeClaimStatusFailed", checkErr.Error())
		if statusErr != nil {
			klog.ErrorS(statusErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
			return nil, statusErr
		}
		return nil, checkErr
	}

	// Resolve the node object from the node name stored in the nodeClaim status.
	return resources.GetNode(ctx, claim.Status.NodeName, c.Client)
}

// ensureNodePlugins ensures node plugins are installed.
func (c *WorkspaceReconciler) ensureNodePlugins(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeObj *corev1.Node) error {
timeClock := clock.RealClock{}
Expand Down Expand Up @@ -523,7 +602,6 @@ func (c *WorkspaceReconciler) applyInference(ctx context.Context, wObj *kaitov1a
return updateErr
} else {
return err

}
}

Expand All @@ -538,20 +616,26 @@ func (c *WorkspaceReconciler) applyInference(ctx context.Context, wObj *kaitov1a
// SetupWithManager sets up the controller with the Manager.
func (c *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {
c.Recorder = mgr.GetEventRecorderFor("Workspace")
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}); err != nil {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{},
"spec.nodeName", func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
builder := ctrl.NewControllerManagedBy(mgr).
For(&kaitov1alpha1.Workspace{}).
Owns(&appsv1.Deployment{}).
Owns(&appsv1.StatefulSet{}).
Watches(&v1alpha5.Machine{}, c.watchMachines()).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
Complete(c)
WithOptions(controller.Options{MaxConcurrentReconciles: 5})

if featuregates.FeatureGates[consts.FeatureFlagKarpenter] {
builder.
Watches(&v1beta1.NodeClaim{}, c.watchNodeClaims()) // watches for nodeClaim with labels indicating workspace name.
}
return builder.Complete(c)
}

// watches for machine with labels indicating workspace name.
Expand All @@ -577,3 +661,27 @@ func (c *WorkspaceReconciler) watchMachines() handler.EventHandler {
}
})
}

// watchNodeClaims returns an event handler that maps a NodeClaim event to a
// reconcile request for the workspace identified by the NodeClaim's workspace
// name and namespace labels. NodeClaims missing either label enqueue nothing.
func (c *WorkspaceReconciler) watchNodeClaims() handler.EventHandler {
	toWorkspaceRequest := func(_ context.Context, obj client.Object) []reconcile.Request {
		claimLabels := obj.(*v1beta1.NodeClaim).Labels

		wsName, hasName := claimLabels[kaitov1alpha1.LabelWorkspaceName]
		wsNamespace, hasNamespace := claimLabels[kaitov1alpha1.LabelWorkspaceNamespace]
		if !hasName || !hasNamespace {
			// Not associated with a workspace; nothing to reconcile.
			return nil
		}

		target := client.ObjectKey{Name: wsName, Namespace: wsNamespace}
		return []reconcile.Request{{NamespacedName: target}}
	}

	return handler.EnqueueRequestsFromMapFunc(toWorkspaceRequest)
}
Loading

0 comments on commit cb2d964

Please sign in to comment.