diff --git a/pkg/backends/k8s.go b/pkg/backends/k8s.go index ca578026..47e7e72d 100644 --- a/pkg/backends/k8s.go +++ b/pkg/backends/k8s.go @@ -24,6 +24,7 @@ import ( "github.com/goccy/go-yaml" "github.com/grycap/oscar/v2/pkg/imagepuller" "github.com/grycap/oscar/v2/pkg/types" + "github.com/grycap/oscar/v2/pkg/utils" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -112,7 +113,23 @@ func (k *KubeBackend) CreateService(service types.Service) error { } return err } - + //Create an expose service + if service.ExposeOptions.MaxReplicas != 0 { + exposeConf := utils.Expose{ + Name: service.Name, + NameSpace: k.namespace, + Variables: service.Environment.Vars, + Image: service.Image, + MaxReplicas: service.ExposeOptions.MaxReplicas, + } + if service.ExposeOptions.Port != 0 { + exposeConf.Port = service.ExposeOptions.Port + } + if service.ExposeOptions.TopCPU != 0 { + exposeConf.TopCPU = service.ExposeOptions.TopCPU + } + utils.CreateExpose(exposeConf, k.kubeClientset, *k.config) + } //Create deaemonset to cache the service image on all the nodes if service.ImagePrefetch { err = imagepuller.CreateDaemonset(k.config, service, k.kubeClientset) @@ -187,6 +204,24 @@ func (k *KubeBackend) UpdateService(service types.Service) error { return err } + //Update an expose service + if service.ExposeOptions.MaxReplicas != 0 { + exposeConf := utils.Expose{ + Name: service.Name, + NameSpace: k.namespace, + Variables: service.Environment.Vars, + Image: service.Image, + MaxReplicas: service.ExposeOptions.MaxReplicas, + } + if service.ExposeOptions.Port != 0 { + exposeConf.Port = service.ExposeOptions.Port + } + if service.ExposeOptions.TopCPU != 0 { + exposeConf.TopCPU = service.ExposeOptions.TopCPU + } + utils.UpdateExpose(exposeConf, k.kubeClientset) + } + return nil } @@ -205,7 +240,13 @@ func (k *KubeBackend) DeleteService(name string) error { if err := deleteServiceJobs(name, k.namespace, k.kubeClientset); err != nil { log.Printf("Error deleting associated jobs for service \"%s\": %v\n", name, err) } - + exposeConf := utils.Expose{ + Name: name, + NameSpace: k.namespace, + } + if err2 := utils.DeleteExpose(exposeConf, k.kubeClientset); err2 != nil { + log.Printf("Error deleting all associated kubernetes component of an exposed service \"%s\": %v\n", name, err2) + } return nil } diff --git a/pkg/backends/knative.go b/pkg/backends/knative.go index d445e47f..6366d016 100644 --- a/pkg/backends/knative.go +++ b/pkg/backends/knative.go @@ -25,6 +25,7 @@ import ( "github.com/grycap/oscar/v2/pkg/imagepuller" "github.com/grycap/oscar/v2/pkg/types" + "github.com/grycap/oscar/v2/pkg/utils" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -119,6 +120,24 @@ func (kn *KnativeBackend) CreateService(service types.Service) error { return err } + //Create an expose service + if service.ExposeOptions.MaxReplicas != 0 { + exposeConf := utils.Expose{ + Name: service.Name, + NameSpace: kn.namespace, + Variables: service.Environment.Vars, + Image: service.Image, + MaxReplicas: service.ExposeOptions.MaxReplicas, + } + if service.ExposeOptions.Port != 0 { + exposeConf.Port = service.ExposeOptions.Port + } + if service.ExposeOptions.TopCPU != 0 { + exposeConf.TopCPU = service.ExposeOptions.TopCPU + } + utils.CreateExpose(exposeConf, kn.kubeClientset, *kn.config) + + } //Create deaemonset to cache the service image on all the nodes if service.ImagePrefetch { err = imagepuller.CreateDaemonset(kn.config, service, kn.kubeClientset) @@ -195,6 +214,24 @@ func (kn *KnativeBackend) UpdateService(service types.Service) error { return err } + //Update an expose service + if service.ExposeOptions.MaxReplicas != 0 { + exposeConf := utils.Expose{ + Name: service.Name, + NameSpace: kn.namespace, + Variables: service.Environment.Vars, + Image: service.Image, + MaxReplicas: service.ExposeOptions.MaxReplicas, + } + if service.ExposeOptions.Port != 0 { + exposeConf.Port = service.ExposeOptions.Port + } + if service.ExposeOptions.TopCPU != 0 { + exposeConf.TopCPU = service.ExposeOptions.TopCPU + } + utils.UpdateExpose(exposeConf, kn.kubeClientset) + } + return nil } @@ -213,6 +250,14 @@ func (kn *KnativeBackend) DeleteService(name string) error { if err := deleteServiceJobs(name, kn.namespace, kn.kubeClientset); err != nil { log.Printf("Error deleting associated jobs for service \"%s\": %v\n", name, err) } + exposeConf := utils.Expose{ + Name: name, + NameSpace: kn.namespace, + Image: "service.Image", + } + if err2 := utils.DeleteExpose(exposeConf, kn.kubeClientset); err2 != nil { + log.Printf("Error deleting all associated kubernetes component of an exposed service \"%s\": %v\n", name, err2) + } return nil } diff --git a/pkg/imagepuller/daemonset.go b/pkg/imagepuller/daemonset.go index 386e8b24..7ce79888 100644 --- a/pkg/imagepuller/daemonset.go +++ b/pkg/imagepuller/daemonset.go @@ -137,7 +137,6 @@ func watchPods(kubeClientset kubernetes.Interface, cfg *types.Config) { sharedInformerOp := informers.WithTweakListOptions(optionsFunc) factory := informers.NewSharedInformerFactoryWithOptions(kubeClientset, 2*time.Second, informers.WithNamespace(cfg.ServicesNamespace), sharedInformerOp) - //factory := informers.NewSharedInformerFactory(kubeClientset, 2*time.Second) podInformer := factory.Core().V1().Pods().Informer() factory.Start(stopper) diff --git a/pkg/types/config.go b/pkg/types/config.go index 536b3a79..fd225255 100644 --- a/pkg/types/config.go +++ b/pkg/types/config.go @@ -175,6 +175,9 @@ type Config struct { // Groups defined in the "eduperson_entitlement" OIDC scope, // as described here: https://docs.egi.eu/providers/check-in/sp/#10-groups OIDCGroups []string `json:"-"` + + // + IngressHost string `json:"-"` } var configVars = []configVar{ @@ -219,6 +222,7 @@ var configVars = []configVar{ {"OIDCIssuer", "OIDC_ISSUER", false, stringType, "https://aai.egi.eu/oidc/"}, {"OIDCSubject", "OIDC_SUBJECT", false, stringType, ""}, {"OIDCGroups", "OIDC_GROUPS", false, stringSliceType, ""}, + {"IngressHost", "INGRESS_HOST", false, stringType, ""}, } func readConfigVar(cfgVar configVar) (string, error) { diff --git a/pkg/types/service.go b/pkg/types/service.go index 6f9a7fb0..bff07942 100644 --- a/pkg/types/service.go +++ b/pkg/types/service.go @@ -202,6 +202,12 @@ type Service struct { // Optional ImagePullSecrets []string `json:"image_pull_secrets,omitempty"` + ExposeOptions struct { + MaxReplicas int `json:"max_replicas" ` + Port int `json:"port" ` + TopCPU int32 `json:"top_cpu" ` + } `json:"expose_options"` + // The user-defined environment variables assigned to the service // Optional Environment struct { @@ -240,7 +246,7 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) { { Name: ContainerName, Image: service.Image, - Env: convertEnvVars(service.Environment.Vars), + Env: ConvertEnvVars(service.Environment.Vars), VolumeMounts: []v1.VolumeMount{ { Name: VolumeName, @@ -299,7 +305,7 @@ func (service *Service) GetMinIOWebhookARN() string { return fmt.Sprintf("arn:minio:sqs:%s:%s:webhook", service.StorageProviders.MinIO[DefaultProvider].Region, service.Name) } -func convertEnvVars(vars map[string]string) []v1.EnvVar { +func ConvertEnvVars(vars map[string]string) []v1.EnvVar { envVars := []v1.EnvVar{} for k, v := range vars { envVars = append(envVars, v1.EnvVar{ diff --git a/pkg/types/service_test.go b/pkg/types/service_test.go index 8f7770b2..39156f4b 100644 --- a/pkg/types/service_test.go +++ b/pkg/types/service_test.go @@ -187,7 +187,7 @@ func TestConvertEnvVars(t *testing.T) { {Name: "TEST", Value: "test"}, } - res := convertEnvVars(vars) + res := ConvertEnvVars(vars) if res[0].Name != expected[0].Name && res[0].Value != expected[0].Value { t.Errorf("invalid conversion of environment variables. Expected: %v, got %v", expected, res) @@ -240,6 +240,10 @@ script: testscript image_pull_secrets: - testcred1 - testcred2 +expose_options: + max_replicas: 0 + port: 0 + top_cpu: 0 environment: Variables: TEST_VAR: testvalue @@ -345,7 +349,7 @@ func checkEnvVars(cfg *Config, podSpec *v1.PodSpec) error { case "max_inflight": expected = strconv.Itoa(cfg.WatchdogMaxInflight) if envVar.Value != expected { - return fmt.Errorf("the max_inflight environment variable has not the correct value. Expected: %s, got: %s", expected, envVar.Value) + return fmt.Errorf("componenteax_inflight environment variable has not the correct value. Expected: %s, got: %s", expected, envVar.Value) } case "write_debug": expected = strconv.FormatBool(cfg.WatchdogWriteDebug) diff --git a/pkg/utils/expose.go b/pkg/utils/expose.go new file mode 100644 index 00000000..26c0cf89 --- /dev/null +++ b/pkg/utils/expose.go @@ -0,0 +1,469 @@ +/* +Copyright (C) GRyCAP - I3M - UPV + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "fmt" + "log" + + "github.com/grycap/oscar/v2/pkg/types" + apps "k8s.io/api/apps/v1" + autos "k8s.io/api/autoscaling/v1" + v1 "k8s.io/api/core/v1" + net "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" +) + +type Expose struct { + Name string `json:"name" binding:"required"` + NameSpace string `json:"namespace" binding:"required"` + Image string `json:"image" ` + Variables map[string]string + MaxReplicas int `json:"maxreplicas" default:"10"` + Port int `json:"port" default:"80"` + TopCPU int32 `json:"top_cpu" default:"80"` +} + +// / Main function that creates all the kubernetes components +func CreateExpose(expose Expose, kubeClientset kubernetes.Interface, cfg types.Config) error { + + err := createDeployment(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + err = createService(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + err = createIngress(expose, kubeClientset, cfg) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + return nil +} + +// /Main function that deletes all the kubernetes components +func DeleteExpose(expose Expose, kubeClientset kubernetes.Interface) error { + err := deleteDeployment(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + err = deleteService(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + err = deleteIngress(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + return nil +} + +// /Main function that updates all the kubernetes components +func UpdateExpose(expose Expose, kubeClientset kubernetes.Interface) error { + err := updateDeployment(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + err2 := updateService(expose, kubeClientset) + if err2 != nil { + log.Printf("WARNING: %v\n", err2) + return err2 + } + return nil +} + +// /Main function that list all the kubernetes components + +func ListExpose(expose Expose, kubeClientset kubernetes.Interface) error { + deploy, hpa, err := listDeployments(expose, kubeClientset) + + services, err2 := listServices(expose, kubeClientset) + ingress, err3 := listIngress(expose, kubeClientset) + if err != nil { + log.Printf("WARNING: %v\n", err) + return err + } + if err2 != nil { + log.Printf("WARNING: %v\n", err2) + return err + } + if err3 != nil { + log.Printf("WARNING: %v\n", err3) + return err + } + fmt.Println(deploy, hpa, services, ingress) + return nil + +} + +//////////// Deployment + +/// Create deployment and horizontal autoscale + +func createDeployment(e Expose, client kubernetes.Interface) error { + deployment := getDeployment(e) + _, err := client.AppsV1().Deployments(e.NameSpace).Create(context.TODO(), deployment, metav1.CreateOptions{}) + if err != nil { + return err + } + + hpa := getHortizontalAutoScale(e) + _, err = client.AutoscalingV1().HorizontalPodAutoscalers(e.NameSpace).Create(context.TODO(), hpa, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +} + +// Return the component deployment, ready to create or update +func getDeployment(e Expose) *apps.Deployment { + name_deployment := getNameDeployment(e.Name) + var replicas int32 = 1 + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name_deployment, + Namespace: e.NameSpace, + }, + Spec: apps.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "oscar-svc-exp-" + e.Name, + }, + }, + Template: getPodTemplateSpec(e), + }, + Status: apps.DeploymentStatus{}, + } + return deployment +} + +// Return the component HorizontalAutoScale, ready to create or update +func getHortizontalAutoScale(e Expose) *autos.HorizontalPodAutoscaler { + name_hpa := getNameHPA(e.Name) + name_deployment := getNameDeployment(e.Name) + var minReplicas int32 = 1 + hpa := &autos.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: name_hpa, + Namespace: e.NameSpace, + }, + Spec: autos.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autos.CrossVersionObjectReference{ + Kind: "Deployment", + Name: name_deployment, + APIVersion: "apps/v1", + }, + MinReplicas: &minReplicas, + MaxReplicas: int32(e.MaxReplicas), + TargetCPUUtilizationPercentage: &e.TopCPU, + }, + Status: autos.HorizontalPodAutoscalerStatus{}, + } + return hpa +} + +// Return the Pod spec inside of deployment, ready to create or update + +func getPodTemplateSpec(e Expose) v1.PodTemplateSpec { + var ports v1.ContainerPort = v1.ContainerPort{ + Name: "port", + ContainerPort: int32(e.Port), + } + cores := resource.NewMilliQuantity(500, resource.DecimalSI) + var container v1.Container = v1.Container{ + Name: e.Name, + Image: e.Image, + Env: types.ConvertEnvVars(e.Variables), + Ports: []v1.ContainerPort{ports}, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "cpu": *cores, + }, + }, + } + template := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: e.Name, + Namespace: e.Image, + Labels: map[string]string{ + "app": "oscar-svc-exp-" + e.Name, + }, + }, + Spec: v1.PodSpec{ + InitContainers: []v1.Container{}, + Containers: []v1.Container{container}, + }, + } + return template +} + +// / List deployment and the horizontal auto scale +func listDeployments(e Expose, client kubernetes.Interface) (*apps.DeploymentList, *autos.HorizontalPodAutoscalerList, error) { + deployment, err := client.AppsV1().Deployments(e.NameSpace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, nil, err + } + + hpa, err2 := client.AutoscalingV1().HorizontalPodAutoscalers(e.NameSpace).List(context.TODO(), metav1.ListOptions{}) + if err2 != nil { + return nil, nil, err2 + } + return deployment, hpa, nil +} + +// Delete Deployment and HPA +func deleteDeployment(e Expose, client kubernetes.Interface) error { + name_hpa := getNameHPA(e.Name) + err := client.AutoscalingV1().HorizontalPodAutoscalers(e.NameSpace).Delete(context.TODO(), name_hpa, metav1.DeleteOptions{}) + if err != nil { + return err + } + deployment := getNameDeployment(e.Name) + err = client.AppsV1().Deployments(e.NameSpace).Delete(context.TODO(), deployment, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +///Update Deployment and HPA + +func updateDeployment(e Expose, client kubernetes.Interface) error { + _, err := client.AppsV1().Deployments(e.NameSpace).Get(context.TODO(), getNameDeployment(e.Name), metav1.GetOptions{}) + if err != nil { + return err + } + + deployment := getDeployment(e) + _, err = client.AppsV1().Deployments(e.NameSpace).Update(context.TODO(), deployment, metav1.UpdateOptions{}) + if err != nil { + return err + } + + client.AutoscalingV1().HorizontalPodAutoscalers(e.NameSpace).Get(context.TODO(), getNameHPA(e.Name), metav1.GetOptions{}) + hpa := getHortizontalAutoScale(e) + _, err = client.AutoscalingV1().HorizontalPodAutoscalers(e.NameSpace).Update(context.TODO(), hpa, metav1.UpdateOptions{}) + if err != nil { + return err + } + return nil +} + +/////////// Service + +// Create a kubernetes service component +func createService(e Expose, client kubernetes.Interface) error { + service := getService(e) + _, err := client.CoreV1().Services(e.NameSpace).Create(context.TODO(), service, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +} + +// Return a kubernetes service component, ready to deploy or update +func getService(e Expose) *v1.Service { + name_service := getNameService(e.Name) + var port v1.ServicePort = v1.ServicePort{ + Name: "", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: 0, + IntVal: int32(e.Port), + }, + } + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name_service, + Namespace: e.NameSpace, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{port}, + Selector: map[string]string{ + "app": "oscar-svc-exp-" + e.Name, + }, + }, + Status: v1.ServiceStatus{}, + } + return service +} + +/// List services in a certain namespace + +func listServices(e Expose, client kubernetes.Interface) (*v1.ServiceList, error) { + services, err := client.CoreV1().Services(e.NameSpace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + return services, nil +} + +// / Update a kubernete service +func updateService(e Expose, client kubernetes.Interface) error { + service := getService(e) + _, err := client.CoreV1().Services(e.NameSpace).Update(context.TODO(), service, metav1.UpdateOptions{}) + if err != nil { + return err + } + return nil +} + +/// Delete kubernetes service + +func deleteService(e Expose, client kubernetes.Interface) error { + service := getNameService(e.Name) + err := client.CoreV1().Services(e.NameSpace).Delete(context.TODO(), service, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +/////////// Ingress + +// / Create an ingress component +func createIngress(e Expose, client kubernetes.Interface, cfg types.Config) error { + + ingress := getIngress(e, client, cfg) + _, err := client.NetworkingV1().Ingresses(e.NameSpace).Create(context.TODO(), ingress, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +} + +// Return a kubernetes ingress component, ready to deploy or update +func getIngress(e Expose, client kubernetes.Interface, cfg types.Config) *net.Ingress { + name_ingress := getNameIngress(e.Name) + pathofapi := getPathAPI(e.Name) + name_service := getNameService(e.Name) + var ptype net.PathType = "Prefix" + var ingresspath net.HTTPIngressPath = net.HTTPIngressPath{ + Path: pathofapi, + PathType: &ptype, + Backend: net.IngressBackend{ + Service: &net.IngressServiceBackend{ + Name: name_service, + Port: net.ServiceBackendPort{ + Number: 80, + }, + }, + }, + } + var ingresssrulevalue net.HTTPIngressRuleValue = net.HTTPIngressRuleValue{ + Paths: []net.HTTPIngressPath{ingresspath}, + } + var rule net.IngressRule + var tls net.IngressTLS + var specification net.IngressSpec + + var host string = cfg.IngressHost + + if host == "" { + rule = net.IngressRule{ + IngressRuleValue: net.IngressRuleValue{HTTP: &ingresssrulevalue}, + } + specification = net.IngressSpec{ + TLS: []net.IngressTLS{}, + Rules: []net.IngressRule{rule}, //IngressClassName: + } + } else { + rule = net.IngressRule{ + Host: host, + IngressRuleValue: net.IngressRuleValue{HTTP: &ingresssrulevalue}, + } + tls = net.IngressTLS{ + Hosts: []string{host}, + SecretName: host, + } + specification = net.IngressSpec{ + TLS: []net.IngressTLS{tls}, + Rules: []net.IngressRule{rule}, //IngressClassName: + } + } + + ////// + ingress := &net.Ingress{ + ObjectMeta: metav1.ObjectMeta{ + Name: name_ingress, + Namespace: e.NameSpace, + Annotations: map[string]string{ + "nginx.ingress.kubernetes.io/rewrite-target": "/$1", + "kubernetes.io/ingress.class": "nginx", + "nginx.ingress.kubernetes.io/use-regex": "true", + }, + }, + Spec: specification, + Status: net.IngressStatus{}, + } + return ingress +} + +/// List the kuberntes ingress + +func listIngress(e Expose, client kubernetes.Interface) (*net.IngressList, error) { + ingress, err := client.NetworkingV1().Ingresses(e.NameSpace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + return ingress, nil +} + +// Delete a kubernetes ingress +func deleteIngress(e Expose, client kubernetes.Interface) error { + ingress := getNameIngress(e.Name) + err := client.NetworkingV1().Ingresses(e.NameSpace).Delete(context.TODO(), ingress, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +/// These are auxiliary functions + +func getNameService(name_container string) string { + return name_container + "-svc" +} + +func getNameIngress(name_container string) string { + return name_container + "-ing" +} + +func getPathAPI(name_container string) string { + return "/system/services/" + name_container + "/exposed/?(.*)" +} + +func getNameDeployment(name_container string) string { + return name_container + "-dlp" +} + +func getNameHPA(name_container string) string { + return name_container + "-hpa" +}