Skip to content

Commit

Permalink
chore: refactor to move ragengine to a central package
Browse files Browse the repository at this point in the history
  • Loading branch information
Fei-Guo committed Nov 5, 2024
1 parent 1248109 commit d797f07
Show file tree
Hide file tree
Showing 38 changed files with 127 additions and 123 deletions.
2 changes: 1 addition & 1 deletion cmd/ragengine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/kaito-project/kaito/pkg/k8sclient"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/kaito-project/kaito/pkg/controllers"
"github.com/kaito-project/kaito/pkg/ragengine/controllers"
"github.com/kaito-project/kaito/pkg/webhooks"
"k8s.io/api/apps/v1beta1"
"k8s.io/klog/v2"
Expand Down
60 changes: 5 additions & 55 deletions pkg/controllers/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
)

const (
gpuSkuPrefix = "Standard_N"
nodePluginInstallTimeout = 60 * time.Second
WorkspaceHashAnnotation = "workspace.kaito.io/hash"
WorkspaceNameLabel = "workspace.kaito.io/name"
Expand Down Expand Up @@ -135,9 +134,8 @@ func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *ka
klog.ErrorS(updateErr, "failed to update workspace status", "workspace", klog.KObj(wObj))
return reconcile.Result{}, updateErr
}
// if error is due to machine/nodeClaim instance types unavailability, stop reconcile.
if err.Error() == machine.ErrorInstanceTypesUnavailable ||
err.Error() == nodeclaim.ErrorInstanceTypesUnavailable {
// If the error is due to machine/nodeClaim instance types unavailability, stop reconcile.
if err.Error() == consts.ErrorInstanceTypesUnavailable {
return reconcile.Result{Requeue: false}, err
}
return reconcile.Result{}, err
Expand Down Expand Up @@ -323,54 +321,6 @@ func computeHash(w *kaitov1alpha1.Workspace) string {
return hex.EncodeToString(hasher.Sum(nil))
}

// selectNodes orders the qualified nodes by preference and returns at most
// count of them.
//
// Ordering criteria, from highest to lowest priority:
//  1. nodes whose name appears in preferred;
//  2. nodes whose name appears in previous;
//  3. nodes carrying the machine.LabelGPUProvisionerCustom label;
//  4. when the Karpenter feature gate is enabled, nodes carrying the
//     nodeclaim.LabelNodePool label;
//  5. node name, ascending.
//
// Criteria 3 and 4 are compared one after the other rather than OR-ed
// together. With an OR, a node that only has the gpu-provisioner label and a
// node that only has the node-pool label would each be reported as "less"
// than the other, which makes the outcome of the sort depend on the input
// order.
//
// The qualified slice is sorted in place and the returned slice shares its
// backing array.
func selectNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node {
	// The feature gate does not change while sorting, so read it once.
	karpenterEnabled := featuregates.FeatureGates[consts.FeatureFlagKarpenter]

	sort.Slice(qualified, func(i, j int) bool {
		a, b := qualified[i], qualified[j]

		// Preferred nodes come first.
		aPreferred := utils.Contains(preferred, a.Name)
		bPreferred := utils.Contains(preferred, b.Name)
		if aPreferred != bPreferred {
			return aPreferred
		}

		// Then nodes that were already used by this workspace.
		aPrevious := utils.Contains(previous, a.Name)
		bPrevious := utils.Contains(previous, b.Name)
		if aPrevious != bPrevious {
			return aPrevious
		}

		// Then nodes created by gpu-provisioner or karpenter, since they are
		// more likely to be empty to use.
		_, aByGPUProvisioner := a.Labels[machine.LabelGPUProvisionerCustom]
		_, bByGPUProvisioner := b.Labels[machine.LabelGPUProvisionerCustom]
		if aByGPUProvisioner != bByGPUProvisioner {
			return aByGPUProvisioner
		}

		if karpenterEnabled {
			_, aByKarpenter := a.Labels[nodeclaim.LabelNodePool]
			_, bByKarpenter := b.Labels[nodeclaim.LabelNodePool]
			if aByKarpenter != bByKarpenter {
				return aByKarpenter
			}
		}

		// Fall back to the name so the result is deterministic.
		return a.Name < b.Name
	})

	if len(qualified) <= count {
		return qualified
	}

	return qualified[:count]
}

// applyWorkspaceResource applies workspace resource spec.
func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj *kaitov1alpha1.Workspace) error {

Expand All @@ -392,7 +342,7 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj *
return err
}

selectedNodes := selectNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count))
selectedNodes := utils.SelectNodes(validNodes, wObj.Resource.PreferredNodes, wObj.Status.WorkerNodes, lo.FromPtr(wObj.Resource.Count))

newNodesCount := lo.FromPtr(wObj.Resource.Count) - len(selectedNodes)

Expand Down Expand Up @@ -427,7 +377,7 @@ func (c *WorkspaceReconciler) applyWorkspaceResource(ctx context.Context, wObj *
}

// Ensure all gpu plugins are running successfully.
if strings.Contains(wObj.Resource.InstanceType, gpuSkuPrefix) { // GPU skus
if strings.Contains(wObj.Resource.InstanceType, consts.GpuSkuPrefix) { // GPU skus
for i := range selectedNodes {
err = c.ensureNodePlugins(ctx, wObj, selectedNodes[i])
if err != nil {
Expand Down Expand Up @@ -612,7 +562,7 @@ RetryWithDifferentName:
// ensureNodePlugins ensures node plugins are installed.
func (c *WorkspaceReconciler) ensureNodePlugins(ctx context.Context, wObj *kaitov1alpha1.Workspace, nodeObj *corev1.Node) error {
timeClock := clock.RealClock{}
tick := timeClock.NewTicker(nodePluginInstallTimeout)
tick := timeClock.NewTicker(consts.NodePluginInstallTimeout)
defer tick.Stop()

for {
Expand Down
29 changes: 14 additions & 15 deletions pkg/controllers/workspace_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"github.com/kaito-project/kaito/api/v1alpha1"
kaitov1alpha1 "github.com/kaito-project/kaito/api/v1alpha1"
"github.com/kaito-project/kaito/pkg/featuregates"
"github.com/kaito-project/kaito/pkg/machine"
"github.com/kaito-project/kaito/pkg/nodeclaim"
"github.com/kaito-project/kaito/pkg/utils"
"github.com/kaito-project/kaito/pkg/utils/consts"
"github.com/kaito-project/kaito/pkg/utils/test"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -178,7 +177,7 @@ func TestSelectWorkspaceNodes(t *testing.T) {
ObjectMeta: v1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
machine.LabelGPUProvisionerCustom: consts.GPUString,
consts.LabelGPUProvisionerCustom: consts.GPUString,
},
},
},
Expand All @@ -204,7 +203,7 @@ func TestSelectWorkspaceNodes(t *testing.T) {
ObjectMeta: v1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
machine.LabelGPUProvisionerCustom: consts.GPUString,
consts.LabelGPUProvisionerCustom: consts.GPUString,
},
},
},
Expand All @@ -230,7 +229,7 @@ func TestSelectWorkspaceNodes(t *testing.T) {
ObjectMeta: v1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
machine.LabelGPUProvisionerCustom: consts.GPUString,
consts.LabelGPUProvisionerCustom: consts.GPUString,
},
},
},
Expand All @@ -252,15 +251,15 @@ func TestSelectWorkspaceNodes(t *testing.T) {
ObjectMeta: v1.ObjectMeta{
Name: "node2",
Labels: map[string]string{
nodeclaim.LabelNodePool: nodeclaim.KaitoNodePoolName,
consts.LabelNodePool: consts.KaitoNodePoolName,
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
machine.LabelGPUProvisionerCustom: consts.GPUString,
consts.LabelGPUProvisionerCustom: consts.GPUString,
},
},
},
Expand All @@ -287,7 +286,7 @@ func TestSelectWorkspaceNodes(t *testing.T) {
ObjectMeta: v1.ObjectMeta{
Name: "node3",
Labels: map[string]string{
nodeclaim.LabelNodePool: nodeclaim.KaitoNodePoolName,
consts.LabelNodePool: consts.KaitoNodePoolName,
},
},
},
Expand All @@ -304,7 +303,7 @@ func TestSelectWorkspaceNodes(t *testing.T) {
t.Run(k, func(t *testing.T) {
featuregates.FeatureGates[consts.FeatureFlagKarpenter] = tc.karpenterFeatureGates

selectedNodes := selectNodes(tc.qualified, tc.preferred, tc.previous, tc.count)
selectedNodes := utils.SelectNodes(tc.qualified, tc.preferred, tc.previous, tc.count)

selectedNodesArray := []string{}

Expand Down Expand Up @@ -342,11 +341,11 @@ func TestCreateAndValidateMachineNode(t *testing.T) {
{
Type: v1alpha5.MachineLaunched,
Status: corev1.ConditionFalse,
Message: machine.ErrorInstanceTypesUnavailable,
Message: consts.ErrorInstanceTypesUnavailable,
},
},
workspace: *test.MockWorkspaceWithPreset,
expectedError: errors.New(machine.ErrorInstanceTypesUnavailable),
expectedError: errors.New(consts.ErrorInstanceTypesUnavailable),
},
"A machine is successfully created": {
callMocks: func(c *test.MockClient) {
Expand Down Expand Up @@ -415,12 +414,12 @@ func TestCreateAndValidateMachineNode(t *testing.T) {
{
Type: v1beta1.Launched,
Status: corev1.ConditionFalse,
Message: nodeclaim.ErrorInstanceTypesUnavailable,
Message: consts.ErrorInstanceTypesUnavailable,
},
},
workspace: *test.MockWorkspaceWithPreset,
karpenterFeatureGates: true,
expectedError: errors.New(nodeclaim.ErrorInstanceTypesUnavailable),
expectedError: errors.New(consts.ErrorInstanceTypesUnavailable),
},
}

Expand Down Expand Up @@ -486,11 +485,11 @@ func TestCreateAndValidateNodeClaimNode(t *testing.T) {
{
Type: v1beta1.Launched,
Status: corev1.ConditionFalse,
Message: nodeclaim.ErrorInstanceTypesUnavailable,
Message: consts.ErrorInstanceTypesUnavailable,
},
},
workspace: *test.MockWorkspaceWithPreset,
expectedError: errors.New(nodeclaim.ErrorInstanceTypesUnavailable),
expectedError: errors.New(consts.ErrorInstanceTypesUnavailable),
},
"A nodeClaim is successfully created": {
callMocks: func(c *test.MockClient) {
Expand Down
27 changes: 10 additions & 17 deletions pkg/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
ProvisionerName = "default"
LabelGPUProvisionerCustom = "kaito.sh/machine-type"
LabelProvisionerName = "karpenter.sh/provisioner-name"
ErrorInstanceTypesUnavailable = "all requested instance types were unavailable during launch"
)

var (
// machineStatusTimeoutInterval is the interval to check the machine status.
machineStatusTimeoutInterval = 240 * time.Second
Expand All @@ -51,9 +44,9 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj
digest := sha256.Sum256([]byte(namespace + name + time.Now().Format("2006-01-02 15:04:05.000000000"))) // We make sure the nodeClaim name is not fixed to the object
machineName := "ws" + hex.EncodeToString(digest[0:])[0:9]
machineLabels := map[string]string{
LabelProvisionerName: ProvisionerName,
nameLabel: name,
namespaceLabel: namespace,
consts.LabelProvisionerName: consts.ProvisionerName,
nameLabel: name,
namespaceLabel: namespace,
}

if labelSelector != nil && len(labelSelector.MatchLabels) != 0 {
Expand All @@ -77,12 +70,12 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj
Values: []string{instanceType},
},
{
Key: LabelProvisionerName,
Key: consts.LabelProvisionerName,
Operator: v1.NodeSelectorOpIn,
Values: []string{ProvisionerName},
Values: []string{consts.ProvisionerName},
},
{
Key: LabelGPUProvisionerCustom,
Key: consts.LabelGPUProvisionerCustom,
Operator: v1.NodeSelectorOpIn,
Values: []string{consts.GPUString},
},
Expand Down Expand Up @@ -117,7 +110,7 @@ func GenerateMachineManifest(ctx context.Context, storageRequirement string, obj
func CreateMachine(ctx context.Context, machineObj *v1alpha5.Machine, kubeClient client.Client) error {
klog.InfoS("CreateMachine", "machine", klog.KObj(machineObj))
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
return err.Error() != ErrorInstanceTypesUnavailable
return err.Error() != consts.ErrorInstanceTypesUnavailable
}, func() error {
err := kubeClient.Create(ctx, machineObj, &client.CreateOptions{})
if err != nil {
Expand All @@ -131,11 +124,11 @@ func CreateMachine(ctx context.Context, machineObj *v1alpha5.Machine, kubeClient
// if SKU is not available, then exit.
_, conditionFound := lo.Find(updatedObj.GetConditions(), func(condition apis.Condition) bool {
return condition.Type == v1alpha5.MachineLaunched &&
condition.Status == v1.ConditionFalse && condition.Message == ErrorInstanceTypesUnavailable
condition.Status == v1.ConditionFalse && condition.Message == consts.ErrorInstanceTypesUnavailable
})
if conditionFound {
klog.Error(ErrorInstanceTypesUnavailable, "reconcile will not continue")
return fmt.Errorf(ErrorInstanceTypesUnavailable)
klog.Error(consts.ErrorInstanceTypesUnavailable, "reconcile will not continue")
return fmt.Errorf(consts.ErrorInstanceTypesUnavailable)
}
return err
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/machine/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"testing"

"github.com/kaito-project/kaito/pkg/utils/consts"
"github.com/kaito-project/kaito/pkg/utils/test"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
Expand Down Expand Up @@ -39,10 +40,10 @@ func TestCreateMachine(t *testing.T) {
{
Type: v1alpha5.MachineLaunched,
Status: corev1.ConditionFalse,
Message: ErrorInstanceTypesUnavailable,
Message: consts.ErrorInstanceTypesUnavailable,
},
},
expectedError: errors.New(ErrorInstanceTypesUnavailable),
expectedError: errors.New(consts.ErrorInstanceTypesUnavailable),
},
"A machine is successfully created": {
callMocks: func(c *test.MockClient) {
Expand Down
Loading

0 comments on commit d797f07

Please sign in to comment.