diff --git a/api/v1alpha1/workspace_condition_types.go b/api/v1alpha1/workspace_condition_types.go index 762d8dafc..157588f29 100644 --- a/api/v1alpha1/workspace_condition_types.go +++ b/api/v1alpha1/workspace_condition_types.go @@ -10,6 +10,9 @@ const ( // WorkspaceConditionTypeMachineStatus is the state when checking machine status. WorkspaceConditionTypeMachineStatus = ConditionType("MachineReady") + // WorkspaceConditionTypeNodeClaimStatus is the state when checking nodeClaim status. + WorkspaceConditionTypeNodeClaimStatus = ConditionType("NodeClaimReady") + // WorkspaceConditionTypeResourceStatus is the state when Resource has been created. WorkspaceConditionTypeResourceStatus = ConditionType("ResourceReady") diff --git a/pkg/controllers/workspace_controller.go b/pkg/controllers/workspace_controller.go index 4ef8d6871..05d830b59 100644 --- a/pkg/controllers/workspace_controller.go +++ b/pkg/controllers/workspace_controller.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package controllers import ( @@ -9,9 +10,12 @@ import ( "strings" "time" + "github.com/azure/kaito/pkg/featuregates" + "github.com/azure/kaito/pkg/nodeclaim" "github.com/azure/kaito/pkg/tuning" "github.com/azure/kaito/pkg/utils/consts" batchv1 "k8s.io/api/batch/v1" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1" @@ -80,7 +84,8 @@ func (c *WorkspaceReconciler) Reconcile(ctx context.Context, req reconcile.Reque if workspaceObj.Inference != nil && workspaceObj.Inference.Preset != nil { if !plugin.KaitoModelRegister.Has(string(workspaceObj.Inference.Preset.Name)) { - return reconcile.Result{}, fmt.Errorf("The preset model name %s is not registered for workspace %s/%s", string(workspaceObj.Inference.Preset.Name), workspaceObj.Namespace, workspaceObj.Name) + return reconcile.Result{}, fmt.Errorf("the preset model name %s is not registered for workspace %s/%s", + string(workspaceObj.Inference.Preset.Name), workspaceObj.Namespace, workspaceObj.Name) } } @@ -96,8 +101,9 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj)) return reconcile.Result{}, updateErr } - // if error is due to machine instance types unavailability, stop reconcile. - if err.Error() == machine.ErrorInstanceTypesUnavailable { + // if error is due to machine/nodeClaim instance types unavailability, stop reconcile. + if err.Error() == machine.ErrorInstanceTypesUnavailable || + err.Error() == nodeclaim.ErrorInstanceTypesUnavailable { return reconcile.Result{Requeue: false}, err } return reconcile.Result{}, err @@ -139,7 +145,6 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka func (c *WorkspaceReconciler) deleteWorkspace(ctx context.Context, wObj *kaitov1alpha1.Workspace) (reconcile.Result, error) { klog.InfoS("deleteWorkspace", "workspace", klog.KObj(wObj)) - // TODO delete workspace, machine(s), fine_tuning and inference (deployment, service) obj ( ok to delete machines? which will delete nodes??) err := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeDeleting, metav1.ConditionTrue, "workspaceDeleted", "workspace is being deleted") if err != nil { klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj)) @@ -149,7 +154,7 @@ func (c *WorkspaceReconciler) deleteWorkspace(ctx context.Context, wObj *kaitov1 return c.garbageCollectWorkspace(ctx, wObj) } -func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node { +func (c *WorkspaceReconciler) selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node { sort.Slice(qualified, func(i, j int) bool { iPreferred := utils.Contains(preferred, qualified[i].Name) @@ -168,13 +173,20 @@ func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous } else if !iPrevious && jPrevious { return false } else { // either all are previous, or none is previous - _, iCreatedByKaito := qualified[i].Labels["kaito.sh/machine-type"] - _, jCreatedByKaito := qualified[j].Labels["kaito.sh/machine-type"] - - // Choose node created by gpu-provisioner since it is more likely to be empty to use. - if iCreatedByKaito && !jCreatedByKaito { + var iCreatedByGPUProvisioner, jCreatedByGPUProvisioner bool + _, iCreatedByGPUProvisioner = qualified[i].Labels[machine.LabelGPUProvisionerCustom] + _, jCreatedByGPUProvisioner = qualified[j].Labels[machine.LabelGPUProvisionerCustom] + // Choose node created by gpu-provisioner and karpenter since it is more likely to be empty to use. + var iCreatedByKarpenter, jCreatedByKarpenter bool + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + _, iCreatedByKarpenter = qualified[i].Labels[nodeclaim.LabelNodePool] + _, jCreatedByKarpenter = qualified[j].Labels[nodeclaim.LabelNodePool] + } + if (iCreatedByGPUProvisioner && !jCreatedByGPUProvisioner) || + (iCreatedByKarpenter && !jCreatedByKarpenter) { return true - } else if !iCreatedByKaito && jCreatedByKaito { + } else if (!iCreatedByGPUProvisioner && jCreatedByGPUProvisioner) || + (!iCreatedByKarpenter && jCreatedByKarpenter) { return false } else { return qualified[i].Name < qualified[j].Name @@ -185,8 +197,8 @@ func selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous if len(qualified) <= count { return qualified - } + return qualified[0:count] } @@ -198,19 +210,34 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj * return err } - // Find all nodes that match the labelSelector and instanceType, they are not necessarily created by machines. + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + // Wait for pending nodeClaims if any before we decide whether to create new node or not. + if err := nodeclaim.WaitForPendingNodeClaims(ctx, wObj, c.Client); err != nil { + return err + } + } + + // Find all nodes that match the labelSelector and instanceType, they are not necessarily created by machines/nodeClaims. validNodes, err := c.getAllQualifiedNodes(ctx, wObj) if err != nil { return err } - selectedNodes := selectWorkspaceNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count)) + selectedNodes := c.selectWorkspaceNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count)) newNodesCount := lo.FromPtr(wObj.Resource.Count) - len(selectedNodes) if newNodesCount > 0 { klog.InfoS("need to create more nodes", "NodeCount", newNodesCount) - if err := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionUnknown, + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + if err := c.updateStatusConditionIfNotMatch(ctx, wObj, + kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionUnknown, + "CreateNodeClaimPending", fmt.Sprintf("creating %d nodeClaims", newNodesCount)); err != nil { + klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj)) + return err + } + } else if err := c.updateStatusConditionIfNotMatch(ctx, wObj, + kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionUnknown, "CreateMachinePending", fmt.Sprintf("creating %d machines", newNodesCount)); err != nil { klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj)) return err @@ -245,7 +272,15 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj * } } - if err = c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionTrue, + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + if err = c.updateStatusConditionIfNotMatch(ctx, wObj, + kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionTrue, + "installNodePluginsSuccess", "nodeClaim plugins have been installed successfully"); err != nil { + klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj)) + return err + } + } else if err = c.updateStatusConditionIfNotMatch(ctx, wObj, + kaitov1alpha1.WorkspaceConditionTypeMachineStatus, metav1.ConditionTrue, "installNodePluginsSuccess", "machines plugins have been installed successfully"); err != nil { klog.ErrorS(err, "failed to update workspace status", "workspace", klog.KObj(wObj)) return err @@ -314,24 +349,33 @@ func (c *WorkspaceReconciler) validateNodeInstanceType(ctx context.Context, wObj return true } -// createAndValidateNode creates a new machine and validates status. +// createAndValidateNode creates a new node and validates status. func (c *WorkspaceReconciler) createAndValidateNode(ctx context.Context, wObj *kaitov1alpha1.Workspace) (*corev1.Node, error) { - var machineOSDiskSize string + var nodeOSDiskSize string if wObj.Inference != nil && wObj.Inference.Preset != nil && wObj.Inference.Preset.Name != "" { presetName := string(wObj.Inference.Preset.Name) - machineOSDiskSize = plugin.KaitoModelRegister.MustGet(presetName).GetInferenceParameters().DiskStorageRequirement + nodeOSDiskSize = plugin.KaitoModelRegister.MustGet(presetName). + GetInferenceParameters().DiskStorageRequirement + } + if nodeOSDiskSize == "" { + nodeOSDiskSize = "0" // The default OS size is used } - if machineOSDiskSize == "" { - machineOSDiskSize = "0" // The default OS size is used + + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + return c.CreateNodeClaim(ctx, wObj, nodeOSDiskSize) + } else { + return c.CreateMachine(ctx, wObj, nodeOSDiskSize) } +} -Retry_withdifferentname: - newMachine := machine.GenerateMachineManifest(ctx, machineOSDiskSize, wObj) +func (c *WorkspaceReconciler) CreateMachine(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeOSDiskSize string) (*corev1.Node, error) { +RetryWithDifferentName: + newMachine := machine.GenerateMachineManifest(ctx, nodeOSDiskSize, wObj) if err := machine.CreateMachine(ctx, newMachine, c.Client); err != nil { if apierrors.IsAlreadyExists(err) { - klog.InfoS("There exists a machine with the same name, retry with a different name", "machine", klog.KObj(newMachine)) - goto Retry_withdifferentname + klog.InfoS("A machine exists with the same name, retry with a different name", "machine", klog.KObj(newMachine)) + goto RetryWithDifferentName } else { klog.ErrorS(err, "failed to create machine", "machine", newMachine.Name) @@ -359,6 +403,41 @@ Retry_withdifferentname: return resources.GetNode(ctx, newMachine.Status.NodeName, c.Client) } +func (c *WorkspaceReconciler) CreateNodeClaim(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeOSDiskSize string) (*corev1.Node, error) { +RetryWithDifferentName: + newNodeClaim := nodeclaim.GenerateNodeClaimManifest(ctx, nodeOSDiskSize, wObj) + + if err := nodeclaim.CreateNodeClaim(ctx, newNodeClaim, c.Client); err != nil { + if apierrors.IsAlreadyExists(err) { + klog.InfoS("There exists a nodeClaim with the same name, retry with a different name", "nodeClaim", klog.KObj(newNodeClaim)) + goto RetryWithDifferentName + } else { + + klog.ErrorS(err, "failed to create nodeClaim", "nodeClaim", newNodeClaim.Name) + if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionFalse, + "nodeClaimFailedCreation", err.Error()); updateErr != nil { + klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj)) + return nil, updateErr + } + return nil, err + } + } + + // check nodeClaim status until it is ready + err := nodeclaim.CheckNodeClaimStatus(ctx, newNodeClaim, c.Client) + if err != nil { + if updateErr := c.updateStatusConditionIfNotMatch(ctx, wObj, kaitov1alpha1.WorkspaceConditionTypeNodeClaimStatus, metav1.ConditionFalse, + "checkNodeClaimStatusFailed", err.Error()); updateErr != nil { + klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj)) + return nil, updateErr + } + return nil, err + } + + // get the node object from the nodeClaim status nodeName. + return resources.GetNode(ctx, newNodeClaim.Status.NodeName, c.Client) +} + // ensureNodePlugins ensures node plugins are installed. func (c *WorkspaceReconciler) ensureNodePlugins(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeObj *corev1.Node) error { timeClock := clock.RealClock{} @@ -523,7 +602,6 @@ func (c *WorkspaceReconciler) applyInference(ctx context.Context, wObj *kaitov1a return updateErr } else { return err - } } @@ -538,20 +616,26 @@ func (c *WorkspaceReconciler) applyInference(ctx context.Context, wObj *kaitov1a // SetupWithManager sets up the controller with the Manager. func (c *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error { c.Recorder = mgr.GetEventRecorderFor("Workspace") - if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string { - pod := rawObj.(*corev1.Pod) - return []string{pod.Spec.NodeName} - }); err != nil { + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.Pod{}, + "spec.nodeName", func(rawObj client.Object) []string { + pod := rawObj.(*corev1.Pod) + return []string{pod.Spec.NodeName} + }); err != nil { return err } - return ctrl.NewControllerManagedBy(mgr). + builder := ctrl.NewControllerManagedBy(mgr). For(&kaitov1alpha1.Workspace{}). Owns(&appsv1.Deployment{}). Owns(&appsv1.StatefulSet{}). Watches(&v1alpha5.Machine{}, c.watchMachines()). - WithOptions(controller.Options{MaxConcurrentReconciles: 5}). - Complete(c) + WithOptions(controller.Options{MaxConcurrentReconciles: 5}) + + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + builder. + Watches(&v1beta1.NodeClaim{}, c.watchNodeClaims()) // watches for nodeClaim with labels indicating workspace name. + } + return builder.Complete(c) } // watches for machine with labels indicating workspace name. @@ -577,3 +661,27 @@ func (c *WorkspaceReconciler) watchMachines() handler.EventHandler { } }) } + +// watches for nodeClaim with labels indicating workspace name. +func (c *WorkspaceReconciler) watchNodeClaims() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc( + func(ctx context.Context, o client.Object) []reconcile.Request { + nodeClaimObj := o.(*v1beta1.NodeClaim) + name, ok := nodeClaimObj.Labels[kaitov1alpha1.LabelWorkspaceName] + if !ok { + return nil + } + namespace, ok := nodeClaimObj.Labels[kaitov1alpha1.LabelWorkspaceNamespace] + if !ok { + return nil + } + return []reconcile.Request{ + { + NamespacedName: client.ObjectKey{ + Name: name, + Namespace: namespace, + }, + }, + } + }) +} diff --git a/pkg/controllers/workspace_controller_test.go b/pkg/controllers/workspace_controller_test.go index 245ab1d9d..89c1ce868 100644 --- a/pkg/controllers/workspace_controller_test.go +++ b/pkg/controllers/workspace_controller_test.go @@ -1,11 +1,11 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package controllers import ( "context" "errors" - "github.com/azure/kaito/pkg/utils/test" "reflect" "sort" "testing" @@ -13,7 +13,11 @@ import ( "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/azure/kaito/api/v1alpha1" + "github.com/azure/kaito/pkg/featuregates" "github.com/azure/kaito/pkg/machine" + "github.com/azure/kaito/pkg/nodeclaim" + "github.com/azure/kaito/pkg/utils/consts" + "github.com/azure/kaito/pkg/utils/test" "github.com/stretchr/testify/mock" "gotest.tools/assert" appsv1 "k8s.io/api/apps/v1" @@ -22,16 +26,18 @@ import ( "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" ) func TestSelectWorkspaceNodes(t *testing.T) { test.RegisterTestModel() testcases := map[string]struct { - qualified []*corev1.Node - preferred []string - previous []string - count int - expected []string + qualified []*corev1.Node + preferred []string + previous []string + count int + karpenterFeatureGates bool + expected []string }{ "two qualified nodes, need one": { qualified: []*corev1.Node{ @@ -148,7 +154,7 @@ func TestSelectWorkspaceNodes(t *testing.T) { expected: []string{"node3"}, }, - "three qualified nodes, one is created by kaito, need one": { + "three qualified nodes, one is created by gpu-provisioner, need one": { qualified: []*corev1.Node{ { ObjectMeta: v1.ObjectMeta{ @@ -164,7 +170,7 @@ func TestSelectWorkspaceNodes(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - "kaito.sh/machine-type": "gpu", + machine.LabelGPUProvisionerCustom: machine.GPUString, }, }, }, @@ -174,7 +180,7 @@ func TestSelectWorkspaceNodes(t *testing.T) { count: 1, expected: []string{"node3"}, }, - "three qualified nodes, one is created by kaito, one is preferred, one is previous, need two": { + "three qualified nodes, one is created by gpu-provisioner, one is preferred, one is previous, need two": { qualified: []*corev1.Node{ { ObjectMeta: v1.ObjectMeta{ @@ -190,7 +196,7 @@ func TestSelectWorkspaceNodes(t *testing.T) { ObjectMeta: v1.ObjectMeta{ Name: "node3", Labels: map[string]string{ - "kaito.sh/machine-type": "gpu", + machine.LabelGPUProvisionerCustom: machine.GPUString, }, }, }, @@ -200,12 +206,100 @@ func TestSelectWorkspaceNodes(t *testing.T) { count: 2, expected: []string{"node1", "node2"}, }, + "three qualified nodes, one is created by gpu-provisioner, one is preferred, one is previous, need three": { + qualified: []*corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + machine.LabelGPUProvisionerCustom: machine.GPUString, + }, + }, + }, + }, + preferred: []string{"node2"}, + previous: []string{"node1"}, + count: 3, + karpenterFeatureGates: false, + expected: []string{"node1", "node2", "node3"}, + }, + "three qualified nodes, one is created by gpu-provisioner (machine), the other created by karpenter (nodeClaim), one is preferred, need two": { + qualified: []*corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + nodeclaim.LabelNodePool: nodeclaim.KaitoNodePoolName, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + machine.LabelGPUProvisionerCustom: machine.GPUString, + }, + }, + }, + }, + preferred: []string{"node1"}, + previous: []string{}, + count: 2, + karpenterFeatureGates: true, + expected: []string{"node1", "node3"}, + }, + "three qualified nodes, one is created by by karpenter (nodeClaim), two is preferred, need two": { + qualified: []*corev1.Node{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "node1", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node2", + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + nodeclaim.LabelNodePool: nodeclaim.KaitoNodePoolName, + }, + }, + }, + }, + preferred: []string{"node1"}, + previous: []string{}, + count: 2, + karpenterFeatureGates: true, + expected: []string{"node1", "node3"}, + }, } for k, tc := range testcases { t.Run(k, func(t *testing.T) { + reconciler := &WorkspaceReconciler{ + Scheme: test.NewTestScheme(), + } + featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGates - selectedNodes := selectWorkspaceNodes(tc.qualified, tc.preferred, tc.previous, tc.count) + selectedNodes := reconciler.selectWorkspaceNodes(tc.qualified, tc.preferred, tc.previous, tc.count) selectedNodesArray := []string{} @@ -223,13 +317,14 @@ func TestSelectWorkspaceNodes(t *testing.T) { } } -func TestCreateAndValidateNode(t *testing.T) { +func TestCreateAndValidateMachineNode(t *testing.T) { test.RegisterTestModel() testcases := map[string]struct { - callMocks func(c *test.MockClient) - machineConditions apis.Conditions - workspace v1alpha1.Workspace - expectedError error + callMocks func(c *test.MockClient) + objectConditions apis.Conditions + workspace v1alpha1.Workspace + karpenterFeatureGates bool + expectedError error }{ "Node is not created because machine creation fails": { callMocks: func(c *test.MockClient) { @@ -238,7 +333,7 @@ func TestCreateAndValidateNode(t *testing.T) { c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) }, - machineConditions: apis.Conditions{ + objectConditions: apis.Conditions{ { Type: v1alpha5.MachineLaunched, Status: corev1.ConditionFalse, @@ -254,7 +349,7 @@ func TestCreateAndValidateNode(t *testing.T) { c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.Node{}), mock.Anything).Return(nil) }, - machineConditions: apis.Conditions{ + objectConditions: apis.Conditions{ { Type: apis.ConditionReady, Status: corev1.ConditionTrue, @@ -263,21 +358,140 @@ func TestCreateAndValidateNode(t *testing.T) { workspace: *test.MockWorkspaceDistributedModel, expectedError: nil, }, + "A nodeClaim is successfully created": { + callMocks: func(c *test.MockClient) { + c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.Node{}), mock.Anything).Return(nil) + }, + objectConditions: apis.Conditions{ + { + Type: apis.ConditionReady, + Status: corev1.ConditionTrue, + }, + }, + workspace: *test.MockWorkspaceDistributedModel, + karpenterFeatureGates: true, + expectedError: nil, + }, + "Node is not created because nodeClaim creation fails": { + callMocks: func(c *test.MockClient) { + c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + }, + objectConditions: apis.Conditions{ + { + Type: v1beta1.Launched, + Status: corev1.ConditionFalse, + Message: nodeclaim.ErrorInstanceTypesUnavailable, + }, + }, + workspace: *test.MockWorkspaceWithPreset, + karpenterFeatureGates: true, + expectedError: errors.New(nodeclaim.ErrorInstanceTypesUnavailable), + }, } for k, tc := range testcases { t.Run(k, func(t *testing.T) { mockClient := test.NewClient() mockMachine := &v1alpha5.Machine{} + mockNodeClaim := &v1beta1.NodeClaim{} mockClient.UpdateCb = func(key types.NamespacedName) { mockClient.GetObjectFromMap(mockMachine, key) - mockMachine.Status.Conditions = tc.machineConditions + mockMachine.Status.Conditions = tc.objectConditions mockClient.CreateOrUpdateObjectInMap(mockMachine) + + if tc.karpenterFeatureGates { + mockClient.GetObjectFromMap(mockNodeClaim, key) + mockNodeClaim.Status.Conditions = tc.objectConditions + mockClient.CreateOrUpdateObjectInMap(mockNodeClaim) + } } tc.callMocks(mockClient) + reconciler := &WorkspaceReconciler{ + Client: mockClient, + Scheme: test.NewTestScheme(), + } + ctx := context.Background() + featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGates + + node, err := reconciler.createAndValidateNode(ctx, &tc.workspace) + if tc.expectedError == nil { + assert.Check(t, err == nil, "Not expected to return error") + assert.Check(t, node != nil, "Response node should not be nil") + } else { + assert.Equal(t, tc.expectedError.Error(), err.Error()) + } + }) + } +} + +func TestCreateAndValidateNodeClaimNode(t *testing.T) { + test.RegisterTestModel() + testcases := map[string]struct { + callMocks func(c *test.MockClient) + karpenterFeatureGates bool + nodeClaimConditions apis.Conditions + workspace v1alpha1.Workspace + expectedError error + }{ + "Node is not created because nodeClaim creation fails": { + callMocks: func(c *test.MockClient) { + c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + nodeClaimConditions: apis.Conditions{ + { + Type: v1beta1.Launched, + Status: corev1.ConditionFalse, + Message: nodeclaim.ErrorInstanceTypesUnavailable, + }, + }, + workspace: *test.MockWorkspaceWithPreset, + expectedError: errors.New(nodeclaim.ErrorInstanceTypesUnavailable), + }, + "A nodeClaim is successfully created": { + callMocks: func(c *test.MockClient) { + c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&corev1.Node{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + nodeClaimConditions: apis.Conditions{ + { + Type: apis.ConditionReady, + Status: corev1.ConditionTrue, + }, + }, + workspace: *test.MockWorkspaceDistributedModel, + expectedError: nil, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + mockClient := test.NewClient() + mockNodeClaim := &v1beta1.NodeClaim{} + + mockClient.UpdateCb = func(key types.NamespacedName) { + mockClient.GetObjectFromMap(mockNodeClaim, key) + mockNodeClaim.Status.Conditions = tc.nodeClaimConditions + mockClient.CreateOrUpdateObjectInMap(mockNodeClaim) + } + + tc.callMocks(mockClient) + + featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGates + reconciler := &WorkspaceReconciler{ Client: mockClient, Scheme: test.NewTestScheme(), @@ -544,163 +758,106 @@ func TestGetAllQualifiedNodes(t *testing.T) { } } -func TestDeleteWorkspace(t *testing.T) { +func TestApplyWorkspaceResource(t *testing.T) { + test.RegisterTestModel() testcases := map[string]struct { - callMocks func(c *test.MockClient) - expectedError error + callMocks func(c *test.MockClient) + karpenterFeatureGateEnabled bool + expectedError error + workspace v1alpha1.Workspace }{ - "Fails to delete workspace because workspace object cannot be retrieved": { - callMocks: func(c *test.MockClient) { - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(errors.New("Failed to get workspace")) - }, - expectedError: errors.New("Failed to get workspace"), - }, - "Fails to delete workspace because associated machines cannot be retrieved": { + "Fail to apply workspace because associated machines cannot be retrieved": { callMocks: func(c *test.MockClient) { - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - - c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("Failed to list machines")) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("failed to retrieve machines")) }, - expectedError: errors.New("Failed to list machines"), + workspace: *test.MockWorkspaceDistributedModel, + expectedError: errors.New("failed to retrieve machines"), }, - "Fails to delete workspace because associated machines cannot be deleted": { + "Fail to apply workspace because can't get qualified nodes": { callMocks: func(c *test.MockClient) { - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - machineList := test.MockMachineList relevantMap := c.CreateMapWithType(machineList) + c.CreateOrUpdateObjectInMap(&test.MockMachine) + //insert machine objects into the map - for _, obj := range test.MockMachineList.Items { + for _, obj := range machineList.Items { m := obj objKey := client.ObjectKeyFromObject(&m) relevantMap[objKey] = &m } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) - c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("Failed to delete machine")) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&corev1.NodeList{}), mock.Anything).Return(errors.New("failed to list nodes")) }, - expectedError: errors.New("Failed to delete machine"), + workspace: *test.MockWorkspaceDistributedModel, + expectedError: errors.New("failed to list nodes"), }, - "Delete workspace because finalizer cannot be removed from workspace": { + "Fail to apply workspace because associated nodeClaim cannot be retrieved": { callMocks: func(c *test.MockClient) { - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(errors.New("Failed to update workspace")) - - machineList := test.MockMachineList - relevantMap := c.CreateMapWithType(machineList) - //insert machine objects into the map - for _, obj := range test.MockMachineList.Items { - m := obj - objKey := client.ObjectKeyFromObject(&m) - - relevantMap[objKey] = &m - } c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) - c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(errors.New("failed to retrieve nodeClaims")) + }, - expectedError: errors.New("Failed to update workspace"), + karpenterFeatureGateEnabled: true, + workspace: *test.MockWorkspaceDistributedModel, + expectedError: errors.New("failed to retrieve nodeClaims"), }, - "Successfully deletes workspace and removes finalizer associated with workspace": { + "Fail to apply workspace with nodeClaims because can't get qualified nodes": { callMocks: func(c *test.MockClient) { - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) - c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + nodeClaimList := test.MockNodeClaimList + relevantMap := c.CreateMapWithType(nodeClaimList) + c.CreateOrUpdateObjectInMap(&test.MockNodeClaim) - machineList := test.MockMachineList - relevantMap := c.CreateMapWithType(machineList) - //insert machine objects into the map - for _, obj := range test.MockMachineList.Items { + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { m := obj objKey := client.ObjectKeyFromObject(&m) relevantMap[objKey] = &m } c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) - c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) - }, - expectedError: nil, - }, - } - - for k, tc := range testcases { - t.Run(k, func(t *testing.T) { - mockClient := test.NewClient() - tc.callMocks(mockClient) - - reconciler := &WorkspaceReconciler{ - Client: mockClient, - Scheme: test.NewTestScheme(), - } - ctx := context.Background() + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) - _, err := reconciler.deleteWorkspace(ctx, test.MockWorkspaceDistributedModel) - if tc.expectedError == nil { - assert.Check(t, err == nil, "Not expected to return error") - } else { - assert.Equal(t, tc.expectedError.Error(), err.Error()) - } - }) - } -} - -func TestApplyWorkspaceResource(t *testing.T) { - test.RegisterTestModel() - testcases := map[string]struct { - callMocks func(c *test.MockClient) - expectedError error - workspace v1alpha1.Workspace - }{ - "Fail to apply workspace because associated machines cannot be retrieved": { - callMocks: func(c *test.MockClient) { - c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("Failed to retrieve machines")) + c.On("List", mock.IsType(context.Background()), mock.IsType(&corev1.NodeList{}), mock.Anything).Return(errors.New("failed to list nodes")) }, - workspace: *test.MockWorkspaceDistributedModel, - expectedError: errors.New("Failed to retrieve machines"), + karpenterFeatureGateEnabled: true, + workspace: *test.MockWorkspaceDistributedModel, + expectedError: errors.New("failed to list nodes"), }, - "Fail to apply workspace because can't get qualified nodes": { + "Successfully apply workspace resource with machine": { callMocks: func(c *test.MockClient) { - machineList := test.MockMachineList - relevantMap := c.CreateMapWithType(machineList) - c.CreateOrUpdateObjectInMap(&test.MockMachine) - - //insert machine objects into the map - for _, obj := range test.MockMachineList.Items { - m := obj - objKey := client.ObjectKeyFromObject(&m) + nodeList := test.MockNodeList + relevantMap := c.CreateMapWithType(nodeList) + //insert node objects into the map + for _, obj := range test.MockNodeList.Items { + n := obj + objKey := client.ObjectKeyFromObject(&n) - relevantMap[objKey] = &m + relevantMap[objKey] = &n } c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) - c.On("List", mock.IsType(context.Background()), mock.IsType(&corev1.NodeList{}), mock.Anything).Return(errors.New("Failed to list nodes")) + c.On("List", mock.IsType(context.Background()), mock.IsType(&corev1.NodeList{}), mock.Anything).Return(nil) + + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + }, workspace: *test.MockWorkspaceDistributedModel, - expectedError: errors.New("Failed to list nodes"), + expectedError: nil, }, - "Successfully apply workspace resource": { + "Successfully apply workspace resource with nodeClaim": { callMocks: func(c *test.MockClient) { - machineList := test.MockMachineList - relevantMap := c.CreateMapWithType(machineList) - c.CreateOrUpdateObjectInMap(&test.MockMachine) - - //insert machine objects into the map - for _, obj := range test.MockMachineList.Items { - m := obj - objKey := client.ObjectKeyFromObject(&m) - - relevantMap[objKey] = &m - } - nodeList := test.MockNodeList - relevantMap = c.CreateMapWithType(nodeList) + relevantMap := c.CreateMapWithType(nodeList) //insert node objects into the map - for _, obj := range test.MockNodeList.Items { + for _, obj := range nodeList.Items { n := obj objKey := client.ObjectKeyFromObject(&n) @@ -710,14 +867,18 @@ func TestApplyWorkspaceResource(t *testing.T) { c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&corev1.NodeList{}), mock.Anything).Return(nil) c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) }, - workspace: *test.MockWorkspaceDistributedModel, - expectedError: nil, + karpenterFeatureGateEnabled: true, + workspace: *test.MockWorkspaceDistributedModel, + expectedError: nil, }, } @@ -727,6 +888,7 @@ func TestApplyWorkspaceResource(t *testing.T) { tc.callMocks(mockClient) mockMachine := &v1alpha5.Machine{} + mockNodeClaim := &v1beta1.NodeClaim{} mockClient.UpdateCb = func(key types.NamespacedName) { mockClient.GetObjectFromMap(mockMachine, key) @@ -737,12 +899,22 @@ func TestApplyWorkspaceResource(t *testing.T) { }, } mockClient.CreateOrUpdateObjectInMap(mockMachine) + + mockClient.GetObjectFromMap(mockNodeClaim, key) + mockNodeClaim.Status.Conditions = apis.Conditions{ + { + Type: apis.ConditionReady, + Status: corev1.ConditionTrue, + }, + } + mockClient.CreateOrUpdateObjectInMap(mockNodeClaim) } reconciler := &WorkspaceReconciler{ Client: mockClient, Scheme: test.NewTestScheme(), } + featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGateEnabled ctx := context.Background() err := reconciler.applyWorkspaceResource(ctx, &tc.workspace) diff --git a/pkg/controllers/workspace_gc_finalizer.go b/pkg/controllers/workspace_gc_finalizer.go index 6e985ec0c..0eb32df7b 100644 --- a/pkg/controllers/workspace_gc_finalizer.go +++ b/pkg/controllers/workspace_gc_finalizer.go @@ -1,12 +1,15 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package controllers import ( "context" kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1" + "github.com/azure/kaito/pkg/featuregates" "github.com/azure/kaito/pkg/machine" + "github.com/azure/kaito/pkg/nodeclaim" "github.com/azure/kaito/pkg/utils/consts" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -18,6 +21,7 @@ import ( func (c *WorkspaceReconciler) garbageCollectWorkspace(ctx context.Context, wObj *kaitov1alpha1.Workspace) (ctrl.Result, error) { klog.InfoS("garbageCollectWorkspace", "workspace", klog.KObj(wObj)) + // Check if there are any machines associated with this workspace. mList, err := machine.ListMachinesByWorkspace(ctx, wObj, c.Client) if err != nil { return ctrl.Result{}, err @@ -30,6 +34,22 @@ func (c *WorkspaceReconciler) garbageCollectWorkspace(ctx context.Context, wObj } } + if featuregates.FeatureGates[consts.FeatureFlagKarpenter] { + // Check if there are any nodeClaims associated with this workspace. + ncList, err := nodeclaim.ListNodeClaimByWorkspace(ctx, wObj, c.Client) + if err != nil { + return ctrl.Result{}, err + } + + // We should delete all the nodeClaims that are created by this workspace + for i := range ncList.Items { + if deleteErr := c.Delete(ctx, &ncList.Items[i], &client.DeleteOptions{}); deleteErr != nil { + klog.ErrorS(deleteErr, "failed to delete the nodeClaim", "nodeClaim", klog.KObj(&ncList.Items[i])) + return ctrl.Result{}, deleteErr + } + } + } + staleWObj := wObj.DeepCopy() staleWObj.SetFinalizers(nil) if updateErr := c.Update(ctx, staleWObj, &client.UpdateOptions{}); updateErr != nil { diff --git a/pkg/controllers/workspace_gc_finalizer_test.go b/pkg/controllers/workspace_gc_finalizer_test.go new file mode 100644 index 000000000..69c3dcb84 --- /dev/null +++ b/pkg/controllers/workspace_gc_finalizer_test.go @@ -0,0 +1,265 @@ +package controllers + +import ( + "context" + "errors" + "testing" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/azure/kaito/api/v1alpha1" + "github.com/azure/kaito/pkg/featuregates" + "github.com/azure/kaito/pkg/utils/consts" + "github.com/azure/kaito/pkg/utils/test" + "github.com/stretchr/testify/mock" + "gotest.tools/assert" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" +) + +func TestGarbageCollectWorkspace(t *testing.T) { + testcases := map[string]struct { + callMocks func(c *test.MockClient) + karpenterFeatureGates bool + expectedError error + }{ + "Fails to delete workspace because associated machines cannot be retrieved": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("failed to list machines")) + }, + expectedError: errors.New("failed to list machines"), + }, + "Fails to delete workspace because associated machines cannot be deleted": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + machineList := test.MockMachineList + relevantMap := c.CreateMapWithType(machineList) + //insert machine objects into the map + for _, obj := range test.MockMachineList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("failed to delete machine")) + + }, + expectedError: errors.New("failed to delete machine"), + }, + "Fails to delete workspace because associated nodeClaims cannot be retrieved": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(errors.New("failed to list nodeClaims")) + }, + karpenterFeatureGates: true, + expectedError: errors.New("failed to list nodeClaims"), + }, + "Fails to delete workspace because associated nodeClaims cannot be deleted": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + nodeClaimList := test.MockNodeClaimList + relevantMap := c.CreateMapWithType(nodeClaimList) + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(errors.New("failed to delete nodeClaim")) + + }, + karpenterFeatureGates: true, + expectedError: errors.New("failed to delete nodeClaim"), + }, + "Delete workspace with associated machine objects because finalizer cannot be removed from workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(errors.New("failed to update workspace")) + + machineList := test.MockMachineList + relevantMap := c.CreateMapWithType(machineList) + //insert machine objects into the map + for _, obj := range machineList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + }, + expectedError: errors.New("failed to update workspace"), + }, + "Successfully deletes workspace with associated machine objects and removes finalizer associated with workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + machineList := test.MockMachineList + relevantMap := c.CreateMapWithType(machineList) + //insert machine objects into the map + for _, obj := range test.MockMachineList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + }, + expectedError: nil, + }, + "Delete workspace with associated nodeClaim objects because finalizer cannot be removed from workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(errors.New("failed to update workspace")) + + nodeClaimList := test.MockNodeClaimList + relevantMap := c.CreateMapWithType(nodeClaimList) + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + expectedError: errors.New("failed to update workspace"), + }, + "Successfully deletes workspace with associated nodeClaim objects and removes finalizer associated with workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + + nodeClaimList := test.MockNodeClaimList + relevantMap := c.CreateMapWithType(nodeClaimList) + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + expectedError: nil, + }, + "Delete workspace with machine and nodeClaim objects because finalizer cannot be removed from workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(errors.New("failed to update workspace")) + + machineList := test.MockMachineList + relevantMachinesMap := c.CreateMapWithType(machineList) + //insert machine objects into the map + for _, obj := range machineList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMachinesMap[objKey] = &m + } + nodeClaimList := test.MockNodeClaimList + relevantNodeClaimsMap := c.CreateMapWithType(nodeClaimList) + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantNodeClaimsMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + expectedError: errors.New("failed to update workspace"), + }, + "Successfully deletes workspace with machine and nodeClaim objects and removes finalizer associated with workspace": { + callMocks: func(c *test.MockClient) { + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + c.On("Update", mock.IsType(context.Background()), mock.IsType(&v1alpha1.Workspace{}), mock.Anything).Return(nil) + + machineList := test.MockMachineList + relevantMachinesMap := c.CreateMapWithType(machineList) + //insert machine objects into the map + for _, obj := range machineList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantMachinesMap[objKey] = &m + } + nodeClaimList := test.MockNodeClaimList + relevantNodeClaimsMap := c.CreateMapWithType(nodeClaimList) + //insert nodeClaim objects into the map + for _, obj := range nodeClaimList.Items { + m := obj + objKey := client.ObjectKeyFromObject(&m) + + relevantNodeClaimsMap[objKey] = &m + } + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(nil) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaimList{}), mock.Anything).Return(nil) + c.On("Delete", mock.IsType(context.Background()), mock.IsType(&v1beta1.NodeClaim{}), mock.Anything).Return(nil) + }, + karpenterFeatureGates: true, + expectedError: nil, + }, + } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + mockClient := test.NewClient() + tc.callMocks(mockClient) + + reconciler := &WorkspaceReconciler{ + Client: mockClient, + Scheme: test.NewTestScheme(), + } + ctx := context.Background() + + featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGates + + _, err := reconciler.garbageCollectWorkspace(ctx, test.MockWorkspaceDistributedModel) + if tc.expectedError == nil { + assert.Check(t, err == nil, "Not expected to return error") + } else { + assert.Equal(t, tc.expectedError.Error(), err.Error()) + } + }) + } +} diff --git a/pkg/controllers/workspace_status.go b/pkg/controllers/workspace_status.go index 8bd11dcfe..b972ceaa9 100644 --- a/pkg/controllers/workspace_status.go +++ b/pkg/controllers/workspace_status.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package controllers import ( diff --git a/pkg/machine/machine.go b/pkg/machine/machine.go index 65086cf95..2be3f4401 100644 --- a/pkg/machine/machine.go +++ b/pkg/machine/machine.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package machine import ( @@ -162,7 +163,7 @@ func WaitForPendingMachines(ctx context.Context, workspaceObj *kaitov1alpha1.Wor return nil } -// ListMachines list all machine objects in the cluster that are created by the workspace identified by the label. +// ListMachinesByWorkspace list all machine objects in the cluster that are created by the workspace identified by the label. func ListMachinesByWorkspace(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) (*v1alpha5.MachineList, error) { machineList := &v1alpha5.MachineList{} diff --git a/pkg/machine/machine_test.go b/pkg/machine/machine_test.go index 291a23520..030eec07f 100644 --- a/pkg/machine/machine_test.go +++ b/pkg/machine/machine_test.go @@ -5,9 +5,10 @@ package machine import ( "context" "errors" - "github.com/azure/kaito/pkg/utils/test" "testing" + "github.com/azure/kaito/pkg/utils/test" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/stretchr/testify/mock" "gotest.tools/assert" @@ -25,9 +26,9 @@ func TestCreateMachine(t *testing.T) { }{ "Machine creation fails": { callMocks: func(c *test.MockClient) { - c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("Failed to create machine")) + c.On("Create", mock.IsType(context.Background()), mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("failed to create machine")) }, - expectedError: errors.New("Failed to create machine"), + expectedError: errors.New("failed to create machine"), }, "Machine creation fails because SKU is not available": { callMocks: func(c *test.MockClient) { @@ -84,9 +85,9 @@ func TestWaitForPendingMachines(t *testing.T) { }{ "Fail to list machines because associated machines cannot be retrieved": { callMocks: func(c *test.MockClient) { - c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("Failed to retrieve machines")) + c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(errors.New("failed to retrieve machines")) }, - expectedError: errors.New("Failed to retrieve machines"), + expectedError: errors.New("failed to retrieve machines"), }, "Fail to list machines because machine status cannot be retrieved": { callMocks: func(c *test.MockClient) { @@ -103,7 +104,7 @@ func TestWaitForPendingMachines(t *testing.T) { } c.On("List", mock.IsType(context.Background()), mock.IsType(&v1alpha5.MachineList{}), mock.Anything).Return(nil) - c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("Fail to get machine")) + c.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&v1alpha5.Machine{}), mock.Anything).Return(errors.New("fail to get machine")) }, machineConditions: apis.Conditions{ { @@ -111,7 +112,7 @@ func TestWaitForPendingMachines(t *testing.T) { Status: corev1.ConditionFalse, }, }, - expectedError: errors.New("Fail to get machine"), + expectedError: errors.New("fail to get machine"), }, "Successfully waits for all pending machines": { callMocks: func(c *test.MockClient) { @@ -163,7 +164,7 @@ func TestWaitForPendingMachines(t *testing.T) { } } -func TestGenerateMachineManifiest(t *testing.T) { +func TestGenerateMachineManifest(t *testing.T) { t.Run("Should generate a machine object from the given workspace", func(t *testing.T) { mockWorkspace := test.MockWorkspaceWithPreset diff --git a/pkg/nodeclaim/nodeclaim.go b/pkg/nodeclaim/nodeclaim.go index 747e17ad9..6a2706a5e 100644 --- a/pkg/nodeclaim/nodeclaim.go +++ b/pkg/nodeclaim/nodeclaim.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package nodeclaim import ( @@ -24,10 +25,8 @@ import ( ) const ( - NodePoolName = "default" - LabelGPUProvisionerCustom = "kaito.sh/nodeclaim-type" - LabelNodePoolName = "karpenter.sh/nodepool-name" - GPUString = "gpu" + KaitoNodePoolName = "kaito" + LabelNodePool = "karpenter.sh/nodepool" ErrorInstanceTypesUnavailable = "all requested instance types were unavailable during launch" ) @@ -41,7 +40,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w digest := sha256.Sum256([]byte(workspaceObj.Namespace + workspaceObj.Name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the a workspace nodeClaimName := "ws" + hex.EncodeToString(digest[0:])[0:9] nodeClaimLabels := map[string]string{ - LabelNodePoolName: NodePoolName, + LabelNodePool: KaitoNodePoolName, kaitov1alpha1.LabelWorkspaceName: workspaceObj.Name, kaitov1alpha1.LabelWorkspaceNamespace: workspaceObj.Namespace, } @@ -69,17 +68,9 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w }, { NodeSelectorRequirement: v1.NodeSelectorRequirement{ - Key: LabelNodePoolName, - Operator: v1.NodeSelectorOpIn, - Values: []string{NodePoolName}, - }, - MinValues: lo.ToPtr(1), - }, - { - NodeSelectorRequirement: v1.NodeSelectorRequirement{ - Key: LabelGPUProvisionerCustom, + Key: LabelNodePool, Operator: v1.NodeSelectorOpIn, - Values: []string{GPUString}, + Values: []string{KaitoNodePoolName}, }, MinValues: lo.ToPtr(1), }, @@ -103,7 +94,7 @@ func GenerateNodeClaimManifest(ctx context.Context, storageRequirement string, w Taints: []v1.Taint{ { Key: "sku", - Value: GPUString, + Value: "gpu", Effect: v1.TaintEffectNoSchedule, }, }, diff --git a/pkg/resources/manifests.go b/pkg/resources/manifests.go index 48ecb854f..87cf65740 100644 --- a/pkg/resources/manifests.go +++ b/pkg/resources/manifests.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package resources import ( @@ -48,7 +49,7 @@ func GenerateHeadlessServiceManifest(ctx context.Context, workspaceObj *kaitov1a Name: "torchrun", Protocol: corev1.ProtocolTCP, Port: 29500, - TargetPort: intstr.FromInt(29500), + TargetPort: intstr.FromInt32(29500), }, }, PublishNotReadyAddresses: true, @@ -88,14 +89,14 @@ func GenerateServiceManifest(ctx context.Context, workspaceObj *kaitov1alpha1.Wo Name: "http", Protocol: corev1.ProtocolTCP, Port: 80, - TargetPort: intstr.FromInt(5000), + TargetPort: intstr.FromInt32(5000), }, // Torch NCCL Port { Name: "torch", Protocol: corev1.ProtocolTCP, Port: 29500, - TargetPort: intstr.FromInt(29500), + TargetPort: intstr.FromInt32(29500), }, }, Selector: selector, diff --git a/pkg/resources/nodes.go b/pkg/resources/nodes.go index 769bb1662..b73957044 100644 --- a/pkg/resources/nodes.go +++ b/pkg/resources/nodes.go @@ -1,5 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. + package resources import ( diff --git a/pkg/utils/test/testUtils.go b/pkg/utils/test/testUtils.go index 80023ac37..bf766482f 100644 --- a/pkg/utils/test/testUtils.go +++ b/pkg/utils/test/testUtils.go @@ -156,8 +156,8 @@ var ( } nodeClaimLabels = map[string]string{ - "karpenter.sh/nodepool-name": "default", - "kaito.sh/workspace": "none", + "karpenter.sh/nodepool": "kaito", + "kaito.sh/workspace": "none", } )