Skip to content

Commit

Permalink
Enforce MaxConcurrentJobs limit
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Jun 23, 2023
1 parent d94cee0 commit 491d79c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
25 changes: 22 additions & 3 deletions service/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,27 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
var env []corev1.EnvVar
var hostNetwork bool

if os.Getenv("DEV_MODE") == "true" {
devMode := os.Getenv("DEV_MODE") == "true"

client := s.cs.BatchV1().Jobs(s.namespace)
ctx, cancel := context.WithTimeout(context.Background(), k8sRequestTimeout)
defer cancel()

// We fetch the list of jobs to check against it in order to
// ensure we don't exceed the configured MaxConcurrentJobs limit.
jobList, err := client.List(ctx, metav1.ListOptions{})
if err != nil {
return job.Job{}, fmt.Errorf("failed to list jobs: %w", err)
}
if activeJobs := getActiveJobs(jobList.Items); activeJobs >= s.cfg.MaxConcurrentJobs {
if !devMode {
return job.Job{}, fmt.Errorf("max concurrent jobs reached")
}
s.log.Warn("max concurrent jobs reached", mlog.Int("number of active jobs", activeJobs),
mlog.Int("cfg.MaxConcurrentJobs", s.cfg.MaxConcurrentJobs))
}

if devMode {
s.log.Info("DEV_MODE enabled, enabling host networking", mlog.String("hostIP", os.Getenv("HOST_IP")))

// Forward DEV_MODE to recorder process.
Expand Down Expand Up @@ -195,8 +215,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
},
}

client := s.cs.BatchV1().Jobs(s.namespace)
ctx, cancel := context.WithTimeout(context.Background(), k8sRequestTimeout)
ctx, cancel = context.WithTimeout(context.Background(), k8sRequestTimeout)
defer cancel()

if _, err := client.Create(ctx, spec, metav1.CreateOptions{}); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions service/kubernetes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

recorder "github.com/mattermost/calls-recorder/cmd/recorder/config"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/yaml"
)
Expand Down Expand Up @@ -85,3 +86,14 @@ func getJobPodTolerations() ([]corev1.Toleration, error) {

return defaultTolerations, nil
}

func getActiveJobs(jobs []batchv1.Job) int {
var activeJobs int
for _, jb := range jobs {
if jb.Status.Failed > 0 || jb.Status.Succeeded > 0 {
continue
}
activeJobs++
}
return activeJobs
}

0 comments on commit 491d79c

Please sign in to comment.