From 491d79c22e9ac65670e48877c1bec330683a54e7 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Fri, 23 Jun 2023 10:05:08 -0600 Subject: [PATCH] Enforce MaxConcurrentJobs limit --- service/kubernetes/service.go | 25 ++++++++++++++++++++++--- service/kubernetes/utils.go | 12 ++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/service/kubernetes/service.go b/service/kubernetes/service.go index b085b1b..dd8e3e2 100644 --- a/service/kubernetes/service.go +++ b/service/kubernetes/service.go @@ -105,7 +105,27 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er var env []corev1.EnvVar var hostNetwork bool - if os.Getenv("DEV_MODE") == "true" { + devMode := os.Getenv("DEV_MODE") == "true" + + client := s.cs.BatchV1().Jobs(s.namespace) + ctx, cancel := context.WithTimeout(context.Background(), k8sRequestTimeout) + defer cancel() + + // We fetch the list of jobs to check against it in order to + // ensure we don't exceed the configured MaxConcurrentJobs limit. + jobList, err := client.List(ctx, metav1.ListOptions{}) + if err != nil { + return job.Job{}, fmt.Errorf("failed to list jobs: %w", err) + } + if activeJobs := getActiveJobs(jobList.Items); activeJobs >= s.cfg.MaxConcurrentJobs { + if !devMode { + return job.Job{}, fmt.Errorf("max concurrent jobs reached") + } + s.log.Warn("max concurrent jobs reached", mlog.Int("number of active jobs", activeJobs), + mlog.Int("cfg.MaxConcurrentJobs", s.cfg.MaxConcurrentJobs)) + } + + if devMode { s.log.Info("DEV_MODE enabled, enabling host networking", mlog.String("hostIP", os.Getenv("HOST_IP"))) // Forward DEV_MODE to recorder process. @@ -195,8 +215,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er }, } - client := s.cs.BatchV1().Jobs(s.namespace) - ctx, cancel := context.WithTimeout(context.Background(), k8sRequestTimeout) + ctx, cancel = context.WithTimeout(context.Background(), k8sRequestTimeout) defer cancel() if _, err := client.Create(ctx, spec, metav1.CreateOptions{}); err != nil { diff --git a/service/kubernetes/utils.go b/service/kubernetes/utils.go index 88769cb..56f3bf0 100644 --- a/service/kubernetes/utils.go +++ b/service/kubernetes/utils.go @@ -11,6 +11,7 @@ import ( recorder "github.com/mattermost/calls-recorder/cmd/recorder/config" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/yaml" ) @@ -85,3 +86,14 @@ func getJobPodTolerations() ([]corev1.Toleration, error) { return defaultTolerations, nil } + +func getActiveJobs(jobs []batchv1.Job) int { + var activeJobs int + for _, jb := range jobs { + if jb.Status.Failed > 0 || jb.Status.Succeeded > 0 { + continue + } + activeJobs++ + } + return activeJobs +}