Skip to content

Commit

Permalink
RAG engine deployment
Browse files Browse the repository at this point in the history
Signed-off-by: Bangqi Zhu <[email protected]>
  • Loading branch information
Bangqi Zhu committed Nov 7, 2024
1 parent 38fae09 commit 2140883
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 0 deletions.
5 changes: 5 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
// WorkspaceConditionTypeInferenceStatus is the state when Inference service has been ready.
WorkspaceConditionTypeInferenceStatus = ConditionType("InferenceReady")

// RAGConditionTypeServiceStatus is the condition type reporting whether the RAG Engine service is ready.
RAGConditionTypeServiceStatus = ConditionType("RAGEngineServiceReady")

// WorkspaceConditionTypeTuningJobStatus is the state when the tuning job starts normally.
WorkspaceConditionTypeTuningJobStatus ConditionType = ConditionType("JobStarted")

Expand All @@ -32,4 +35,6 @@ const (
//For inference, the "True" condition means the inference service is ready to serve requests.
//For fine tuning, the "True" condition means the tuning job completes successfully.
WorkspaceConditionTypeSucceeded ConditionType = ConditionType("WorkspaceSucceeded")

RAGEngineConditionTypeSucceeded ConditionType = ConditionType("RAGEngineSucceeded")
)
100 changes: 100 additions & 0 deletions pkg/ragengine/controllers/preset-rag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package controllers

import (
"context"

"github.com/kaito-project/kaito/pkg/utils"
"github.com/kaito-project/kaito/pkg/utils/consts"

kaitov1alpha1 "github.com/kaito-project/kaito/api/v1alpha1"
"github.com/kaito-project/kaito/pkg/resources"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Network settings shared by the RAG service container and its health probes.
const (
	// ProbePath is the HTTP path queried by the liveness and readiness probes.
	ProbePath = "/health"

	// Port5000 is the container port exposed by the RAG service.
	Port5000 int32 = 5000
)

// ragHealthProbe builds an HTTP GET probe that queries ProbePath on Port5000.
// The first check runs after initialDelaySeconds; subsequent checks run every
// 10 seconds. Each call returns a fresh Probe so callers never share pointers.
func ragHealthProbe(initialDelaySeconds int32) *corev1.Probe {
	return &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			HTTPGet: &corev1.HTTPGetAction{
				Path: ProbePath,
				Port: intstr.FromInt(int(Port5000)),
			},
		},
		InitialDelaySeconds: initialDelaySeconds,
		PeriodSeconds:       10,
	}
}

// Defaults applied to every preset RAG deployment.
var (
	// containerPorts exposes the single RAG service port.
	containerPorts = []corev1.ContainerPort{
		{ContainerPort: Port5000},
	}

	// livenessProbe waits 10 minutes before its first check.
	livenessProbe = ragHealthProbe(600)

	// readinessProbe starts checking after 30 seconds.
	readinessProbe = ragHealthProbe(30)

	// tolerations let the pod schedule onto nodes carrying a NoSchedule taint
	// keyed by the NVIDIA GPU capacity name (any value), or keyed by
	// consts.SKUString with the value consts.GPUString.
	tolerations = []corev1.Toleration{
		{
			Key:      resources.CapacityNvidiaGPU,
			Operator: corev1.TolerationOpExists,
			Effect:   corev1.TaintEffectNoSchedule,
		},
		{
			Key:      consts.SKUString,
			Operator: corev1.TolerationOpEqual,
			Value:    consts.GPUString,
			Effect:   corev1.TaintEffectNoSchedule,
		},
	}
)

// CreatePresetRAG generates the Deployment manifest for ragEngineObj and
// creates it through kubeClient.
//
// An "already exists" error from the create call is not treated as a failure;
// in that case the freshly generated manifest (not the object stored in the
// cluster) is returned. Any other create error is returned with a nil object.
func CreatePresetRAG(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine, kubeClient client.Client) (client.Object, error) {
	// TODO: use real data instead of dummy ones
	nodeCount := *ragEngineObj.Spec.Compute.Count

	// Attach the shared-memory volume and mount only when the helper
	// actually returns named ones.
	var (
		volumes      []corev1.Volume
		volumeMounts []corev1.VolumeMount
	)
	shmVol, shmMount := utils.ConfigSHMVolume(nodeCount)
	if shmVol.Name != "" {
		volumes = append(volumes, shmVol)
	}
	if shmMount.Name != "" {
		volumeMounts = append(volumeMounts, shmMount)
	}

	// Placeholder GPU sizing: request and limit are both fixed at 2.
	gpuResource := corev1.ResourceName(resources.CapacityNvidiaGPU)
	resourceReq := corev1.ResourceRequirements{
		Requests: corev1.ResourceList{gpuResource: resource.MustParse("2")},
		Limits:   corev1.ResourceList{gpuResource: resource.MustParse("2")},
	}

	const image = "mcr.microsoft.com/aks/kaito/kaito-rag-service:0.0.1"
	imagePullSecretRefs := []corev1.LocalObjectReference{}
	commands := utils.ShellCmd("python3 main.py")

	deployment := resources.GenerateRAGDeploymentManifest(ctx, ragEngineObj, image, imagePullSecretRefs, nodeCount, commands,
		containerPorts, livenessProbe, readinessProbe, resourceReq, tolerations, volumes, volumeMounts)

	if err := resources.CreateResource(ctx, deployment, kubeClient); client.IgnoreAlreadyExists(err) != nil {
		return nil, err
	}
	return deployment, nil
}
148 changes: 148 additions & 0 deletions pkg/ragengine/controllers/ragengine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kaito-project/kaito/pkg/utils"
"github.com/kaito-project/kaito/pkg/utils/consts"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -98,11 +99,158 @@ func (c *RAGEngineReconciler) ensureFinalizer(ctx context.Context, ragEngineObj
// addRAGEngine runs applyRAGEngineResource followed by applyRAG for
// ragEngineObj and records the outcome in the RAGEngineConditionTypeSucceeded
// status condition.
//
// On failure of either step the condition is set to False with the error
// message and the step's error is returned; if the status update itself fails,
// that update error is returned instead. On success the condition is set to
// True.
func (c *RAGEngineReconciler) addRAGEngine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) (reconcile.Result, error) {
	err := c.applyRAGEngineResource(ctx, ragEngineObj)
	if err != nil {
		if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGEngineConditionTypeSucceeded, metav1.ConditionFalse,
			"ragengineFailed", err.Error()); updateErr != nil {
			klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
			return reconcile.Result{}, updateErr
		}
		// If the error is due to instance types being unavailable, stop reconcile.
		// NOTE(review): a non-nil error is returned alongside Requeue=false;
		// confirm whether the controller still retries in that case.
		if err.Error() == consts.ErrorInstanceTypesUnavailable {
			return reconcile.Result{Requeue: false}, err
		}
		return reconcile.Result{}, err
	}

	if err = c.applyRAG(ctx, ragEngineObj); err != nil {
		// TODO: revision
		if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGEngineConditionTypeSucceeded, metav1.ConditionFalse,
			"ragengineFailed", err.Error()); updateErr != nil {
			klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
			return reconcile.Result{}, updateErr
		}
		return reconcile.Result{}, err
	}

	if err = c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGEngineConditionTypeSucceeded, metav1.ConditionTrue,
		"ragengineSucceeded", "ragengine succeeds"); err != nil {
		klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}

// applyRAG makes sure the Deployment backing ragEngineObj exists and carries
// the environment and volumes derived from the RAGEngine spec, then records
// the outcome in the RAGConditionTypeServiceStatus status condition.
//
// On failure the condition is set to False with the error message and the
// original error is returned; if the status update itself fails, that update
// error is returned instead. On success the condition is set to True.
func (c *RAGEngineReconciler) applyRAG(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) error {
	if err := c.ensureRAGDeployment(ctx, ragEngineObj); err != nil {
		if updateErr := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGConditionTypeServiceStatus, metav1.ConditionFalse,
			"RAGEngineServiceStatusFailed", err.Error()); updateErr != nil {
			klog.ErrorS(updateErr, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
			return updateErr
		}
		return err
	}

	if err := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGConditionTypeServiceStatus, metav1.ConditionTrue,
		"RAGEngineServiceStatusSuccess", "Service has been deployed successfully"); err != nil {
		klog.ErrorS(err, "failed to update ragengine status", "ragengine", klog.KObj(ragEngineObj))
		return err
	}
	return nil
}

// ensureRAGDeployment creates the RAG Deployment when it does not exist, or
// updates the env, volume mounts and volumes of the existing one, and then
// waits up to 10 minutes for the workload status check to pass.
func (c *RAGEngineReconciler) ensureRAGDeployment(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) error {
	const readyTimeout = 10 * time.Minute

	// A typed, non-nil object is required so the lookup has something to
	// decode into; a nil client.Object interface cannot be used here.
	deployment := &appsv1.Deployment{}
	err := resources.GetResource(ctx, ragEngineObj.Name, ragEngineObj.Namespace, c.Client, deployment)
	if apierrors.IsNotFound(err) {
		// Need to create a new workload.
		workloadObj, createErr := CreatePresetRAG(ctx, ragEngineObj, c.Client)
		if createErr != nil {
			return createErr
		}
		return resources.CheckResourceStatus(workloadObj, c.Client, readyTimeout)
	}
	if err != nil {
		return err
	}

	klog.InfoS("An inference workload already exists for ragengine", "ragengine", klog.KObj(ragEngineObj))

	var volumes []corev1.Volume
	var volumeMounts []corev1.VolumeMount
	shmVolume, shmVolumeMount := utils.ConfigSHMVolume(*ragEngineObj.Spec.Compute.Count)
	if shmVolume.Name != "" {
		volumes = append(volumes, shmVolume)
	}
	if shmVolumeMount.Name != "" {
		volumeMounts = append(volumeMounts, shmVolumeMount)
	}

	// Only the first container is reconfigured.
	// NOTE(review): assumes the Deployment has at least one container — confirm.
	podSpec := &deployment.Spec.Template.Spec
	podSpec.Containers[0].Env = ragEngineEnvVars(ragEngineObj)
	podSpec.Containers[0].VolumeMounts = volumeMounts
	podSpec.Volumes = volumes

	// Propagate the update error; it must not be shadowed and dropped,
	// otherwise a failed update would be reported as success.
	if err := c.Update(ctx, deployment); err != nil {
		return err
	}
	return resources.CheckResourceStatus(deployment, c.Client, readyTimeout)
}

// ragEngineEnvVars derives the container environment from the RAGEngine spec.
//
// Variables are emitted in this order: MODEL_ID and ACCESS_SECRET (local
// embedding only, each only when non-empty), EMBEDDING_TYPE, VECTOR_DB_TYPE,
// INFERENCE_URL, then INFERENCE_ACCESS_SECRET and INDEX_SERVICE_NAME when set.
func ragEngineEnvVars(ragEngineObj *kaitov1alpha1.RAGEngine) []corev1.EnvVar {
	var envs []corev1.EnvVar

	// Remote keeps precedence over Local. Local fields are only read when
	// Local is non-nil, so a remote-only spec cannot dereference a nil pointer.
	var embeddingType string
	embedding := ragEngineObj.Spec.Embedding
	switch {
	case embedding.Remote != nil:
		embeddingType = "remote"
	case embedding.Local != nil:
		embeddingType = "local"
		if embedding.Local.ModelID != "" {
			envs = append(envs, corev1.EnvVar{Name: "MODEL_ID", Value: embedding.Local.ModelID})
		}
		if embedding.Local.ModelAccessSecret != "" {
			envs = append(envs, corev1.EnvVar{Name: "ACCESS_SECRET", Value: embedding.Local.ModelAccessSecret})
		}
	}

	envs = append(envs,
		corev1.EnvVar{Name: "EMBEDDING_TYPE", Value: embeddingType},
		corev1.EnvVar{Name: "VECTOR_DB_TYPE", Value: "faiss"}, // TODO: get storage done
		corev1.EnvVar{Name: "INFERENCE_URL", Value: ragEngineObj.Spec.InferenceService.URL},
	)

	if secret := ragEngineObj.Spec.InferenceService.AccessSecret; secret != "" {
		envs = append(envs, corev1.EnvVar{Name: "INFERENCE_ACCESS_SECRET", Value: secret})
	}
	if indexService := ragEngineObj.Spec.IndexServiceName; indexService != "" {
		envs = append(envs, corev1.EnvVar{Name: "INDEX_SERVICE_NAME", Value: indexService})
	}
	return envs
}
func (c *RAGEngineReconciler) deleteRAGEngine(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine) (reconcile.Result, error) {
klog.InfoS("deleteRAGEngine", "ragengine", klog.KObj(ragEngineObj))
err := c.updateStatusConditionIfNotMatch(ctx, ragEngineObj, kaitov1alpha1.RAGEngineConditionTypeDeleting, metav1.ConditionTrue, "ragengineDeleted", "ragengine is being deleted")
Expand Down
92 changes: 92 additions & 0 deletions pkg/resources/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,98 @@ func GenerateDeploymentManifest(ctx context.Context, workspaceObj *kaitov1alpha1
}
}

// GenerateRAGDeploymentManifest builds the Deployment for a RAGEngine.
//
// The Deployment takes the RAGEngine's name and namespace and lists the
// RAGEngine as an owner reference. Pods are labelled with
// kaitov1alpha1.LabelRAGEngineName so the Deployment selector matches them,
// and every entry of the RAGEngine's compute label selector becomes a
// required node-affinity "In" expression. The single container runs imageName
// with the supplied command, ports, probes, resources and volume mounts; its
// env list and the pod's init-container list start out empty.
func GenerateRAGDeploymentManifest(ctx context.Context, ragEngineObj *kaitov1alpha1.RAGEngine, imageName string,
	imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort,
	livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements,
	tolerations []corev1.Toleration, volumes []corev1.Volume, volumeMount []corev1.VolumeMount) *appsv1.Deployment {

	// Translate each requested node label into a required affinity expression.
	matchLabels := ragEngineObj.Spec.Compute.LabelSelector.MatchLabels
	nodeRequirements := make([]corev1.NodeSelectorRequirement, 0, len(matchLabels))
	for labelKey, labelValue := range matchLabels {
		requirement := corev1.NodeSelectorRequirement{
			Key:      labelKey,
			Operator: corev1.NodeSelectorOpIn,
			Values:   []string{labelValue},
		}
		nodeRequirements = append(nodeRequirements, requirement)
	}

	podLabels := map[string]string{
		kaitov1alpha1.LabelRAGEngineName: ragEngineObj.Name,
	}

	ragContainer := corev1.Container{
		Name:           ragEngineObj.Name,
		Image:          imageName,
		Command:        commands,
		Ports:          containerPorts,
		Env:            []corev1.EnvVar{},
		Resources:      resourceRequirements,
		VolumeMounts:   volumeMount,
		LivenessProbe:  livenessProbe,
		ReadinessProbe: readinessProbe,
	}

	podSpec := corev1.PodSpec{
		ImagePullSecrets: imagePullSecretRefs,
		Affinity: &corev1.Affinity{
			NodeAffinity: &corev1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
					NodeSelectorTerms: []corev1.NodeSelectorTerm{
						{MatchExpressions: nodeRequirements},
					},
				},
			},
		},
		InitContainers: []corev1.Container{},
		Containers:     []corev1.Container{ragContainer},
		Tolerations:    tolerations,
		Volumes:        volumes,
	}

	// Rolling update configuration: no extra pods are allowed during the
	// update and at most one pod may be unavailable at a time.
	maxSurge := intstr.IntOrString{Type: intstr.Int, IntVal: 0}
	maxUnavailable := intstr.IntOrString{Type: intstr.Int, IntVal: 1}
	rollout := appsv1.DeploymentStrategy{
		Type: appsv1.RollingUpdateDeploymentStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDeployment{
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
		},
	}

	owner := v1.OwnerReference{
		APIVersion: kaitov1alpha1.GroupVersion.String(),
		Kind:       "RAGEngine",
		UID:        ragEngineObj.UID,
		Name:       ragEngineObj.Name,
		Controller: &controller,
	}

	return &appsv1.Deployment{
		ObjectMeta: v1.ObjectMeta{
			Name:            ragEngineObj.Name,
			Namespace:       ragEngineObj.Namespace,
			OwnerReferences: []v1.OwnerReference{owner},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: lo.ToPtr(int32(replicas)),
			Strategy: rollout,
			Selector: &v1.LabelSelector{MatchLabels: podLabels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: v1.ObjectMeta{Labels: podLabels},
				Spec:       podSpec,
			},
		},
	}
}

func GenerateInitContainers(wObj *kaitov1alpha1.Workspace, volumeMount []corev1.VolumeMount) ([]corev1.Container, []corev1.EnvVar) {
var initContainers []corev1.Container
var envs []corev1.EnvVar
Expand Down

0 comments on commit 2140883

Please sign in to comment.