Skip to content

Commit

Permalink
[MM-49202] Implement retention policy of failed jobs (#43)
Browse files Browse the repository at this point in the history
* Implement retention policy of failed jobs

* Update service/docker/service.go

Co-authored-by: Christopher Poile <[email protected]>

* Add custom validation to support days

* Skip job if retention time is zero

* Avoid custom type

---------

Co-authored-by: Christopher Poile <[email protected]>
  • Loading branch information
streamer45 and cpoile authored Oct 5, 2023
1 parent d196e46 commit ab0e524
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 17 deletions.
5 changes: 5 additions & 0 deletions config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ data_source = "/tmp/calls-offloader-db"
api_type = "docker"
# Maximum number of jobs allowed to be running at one time.
max_concurrent_jobs = 2
# The time to retain failed jobs before automatically deleting them and their
# resources (i.e. volumes containing recordings). Succeeded jobs are automatically deleted upon
# completion. A zero value means keeping failed jobs indefinitely.
# The supported units of time are "m" (minutes), "h" (hours) and "d" (days).
failed_jobs_retention_time = "30d"

[logger]
# A boolean controlling whether to log to the console.
Expand Down
2 changes: 2 additions & 0 deletions docs/kubernetes_development.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ spec:
value: "kubernetes"
- name: JOBS_MAXCONCURRENTJOBS
value: "1"
- name: JOBS_FAILEDJOBSRETENTIONTIME
value: "7d"
- name: API_SECURITY_ALLOWSELFREGISTRATION # This should only be set to true if running the service inside a private network.
value: "true"
- name: LOGGER_CONSOLELEVEL
Expand Down
82 changes: 80 additions & 2 deletions service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ package service

import (
"fmt"
"regexp"
"strconv"
"time"

"github.com/mattermost/calls-offloader/logger"
"github.com/mattermost/calls-offloader/service/api"
"github.com/mattermost/calls-offloader/service/auth"
)

var (
retentionTimeRE = regexp.MustCompile(`^(\d+)([mhd])$`)
)

type SecurityConfig struct {
// Whether or not to enable admin API access.
EnableAdmin bool `toml:"enable_admin"`
Expand Down Expand Up @@ -69,8 +76,71 @@ const (
)

type JobsConfig struct {
APIType JobAPIType `toml:"api_type"`
MaxConcurrentJobs int `toml:"max_concurrent_jobs"`
APIType JobAPIType `toml:"api_type"`
MaxConcurrentJobs int `toml:"max_concurrent_jobs"`
FailedJobsRetentionTime time.Duration `toml:"failed_jobs_retention_time"`
}

// parseRetentionTime converts a retention time string into a time.Duration.
// We need some custom parsing since duration doesn't support days. Accepted
// values are one or more digits followed by a single unit: "m", "h" or "d".
func parseRetentionTime(val string) (time.Duration, error) {
	// The value must be at least one digit plus the unit character.
	if len(val) < 2 {
		return 0, fmt.Errorf("invalid retention time format")
	}

	digits, unit := val[:len(val)-1], val[len(val)-1]
	if unit != 'm' && unit != 'h' && unit != 'd' {
		return 0, fmt.Errorf("invalid retention time format")
	}
	for i := 0; i < len(digits); i++ {
		if digits[i] < '0' || digits[i] > '9' {
			return 0, fmt.Errorf("invalid retention time format")
		}
	}

	// Days are handled here since time.ParseDuration has no "d" unit.
	if unit == 'd' {
		numDays, err := strconv.Atoi(digits)
		if err != nil {
			return 0, err
		}
		return time.Hour * 24 * time.Duration(numDays), nil
	}

	// Minutes and hours are natively supported by the standard parser.
	return time.ParseDuration(val)
}

// UnmarshalTOML implements custom TOML decoding for JobsConfig so that the
// failed_jobs_retention_time value can go through parseRetentionTime (which
// supports a "d" unit that time.Duration alone does not).
func (c *JobsConfig) UnmarshalTOML(data interface{}) error {
	if c == nil {
		return fmt.Errorf("invalid nil pointer")
	}

	fields, ok := data.(map[string]any)
	if !ok {
		return fmt.Errorf("invalid data type")
	}

	rawAPIType, ok := fields["api_type"].(string)
	if !ok {
		return fmt.Errorf("invalid api_type type")
	}
	c.APIType = JobAPIType(rawAPIType)

	// TOML integers decode as int64.
	rawMaxJobs, ok := fields["max_concurrent_jobs"].(int64)
	if !ok {
		return fmt.Errorf("invalid max_concurrent_jobs type")
	}
	c.MaxConcurrentJobs = int(rawMaxJobs)

	// The retention time is optional; absence leaves the zero value.
	rawRetention, present := fields["failed_jobs_retention_time"]
	if !present {
		return nil
	}

	retentionStr, ok := rawRetention.(string)
	if !ok {
		return fmt.Errorf("invalid failed_jobs_retention_time type")
	}

	d, err := parseRetentionTime(retentionStr)
	c.FailedJobsRetentionTime = d
	return err
}

func (c JobsConfig) IsValid() error {
Expand All @@ -82,6 +152,14 @@ func (c JobsConfig) IsValid() error {
return fmt.Errorf("invalid MaxConcurrentJobs value: should be greater than zero")
}

if c.FailedJobsRetentionTime < 0 {
return fmt.Errorf("invalid FailedJobsRetentionTime value: should be a positive duration")
}

if c.FailedJobsRetentionTime > 0 && c.FailedJobsRetentionTime < time.Minute {
return fmt.Errorf("invalid FailedJobsRetentionTime value: should be at least one minute")
}

return nil
}

Expand Down
69 changes: 69 additions & 0 deletions service/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2022-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package service

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestParseRetentionTime(t *testing.T) {
	// Inputs that don't match the expected "<number><unit>" format: a doubled
	// unit, mixed units, and an unsupported unit ("s").
	for _, input := range []string{"10dd", "10h10m", "45s"} {
		d, err := parseRetentionTime(input)
		require.EqualError(t, err, "invalid retention time format", "input %q", input)
		require.Zero(t, d)
	}

	// Valid inputs, one per supported unit.
	valid := map[string]time.Duration{
		"45m": time.Minute * 45,
		"24h": time.Hour * 24,
		"10d": time.Hour * 24 * 10,
	}
	for input, want := range valid {
		d, err := parseRetentionTime(input)
		require.NoError(t, err, "input %q", input)
		require.Equal(t, want, d)
	}
}
123 changes: 114 additions & 9 deletions service/docker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/mount"
docker "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
Expand All @@ -33,18 +34,22 @@ const (
)

var (
dockerStopTimeout = 5 * time.Minute
dockerStopTimeout = 5 * time.Minute
dockerRetentionJobInterval = time.Minute
)

type JobServiceConfig struct {
MaxConcurrentJobs int
MaxConcurrentJobs int
FailedJobsRetentionTime time.Duration
}

type JobService struct {
cfg JobServiceConfig
log mlog.LoggerIFace

client *docker.Client
client *docker.Client
stopCh chan struct{}
retentionJobDoneCh chan struct{}
}

func NewJobService(log mlog.LoggerIFace, cfg JobServiceConfig) (*JobService, error) {
Expand All @@ -65,15 +70,106 @@ func NewJobService(log mlog.LoggerIFace, cfg JobServiceConfig) (*JobService, err
mlog.String("api_version", version.APIVersion),
)

return &JobService{
cfg: cfg,
log: log,
client: client,
}, nil
s := &JobService{
cfg: cfg,
log: log,
client: client,
stopCh: make(chan struct{}),
retentionJobDoneCh: make(chan struct{}),
}

if s.cfg.FailedJobsRetentionTime > 0 {
go s.retentionJob()
} else {
s.log.Info("skipping retention job", mlog.Any("retention_time", s.cfg.FailedJobsRetentionTime))
close(s.retentionJobDoneCh)
}

return s, nil
}

// retentionJob is a background loop that enforces the failed-jobs retention
// policy: every dockerRetentionJobInterval it lists exited containers created
// by this service and deletes any whose finish time is older than
// cfg.FailedJobsRetentionTime. It runs until stopCh is closed and closes
// retentionJobDoneCh on exit so Shutdown can wait for it.
func (s *JobService) retentionJob() {
	s.log.Info("retention job is starting",
		mlog.Any("retention_time", s.cfg.FailedJobsRetentionTime),
	)
	defer func() {
		s.log.Info("exiting retention job")
		// Signal Shutdown that this goroutine has fully stopped.
		close(s.retentionJobDoneCh)
	}()

	ticker := time.NewTicker(dockerRetentionJobInterval)
	defer ticker.Stop()
	for {
		select {
		case <-s.stopCh:
			// Service is shutting down.
			return
		case <-ticker.C:
			// List only stopped ("exited") containers carrying the "app"
			// label that CreateJob attaches to every job container, so we
			// never touch containers we didn't create.
			ctx, cancel := context.WithTimeout(context.Background(), dockerRequestTimeout)
			containers, err := s.client.ContainerList(ctx, types.ContainerListOptions{
				All: true,
				Filters: filters.NewArgs(filters.KeyValuePair{
					Key:   "status",
					Value: "exited",
				}, filters.KeyValuePair{
					Key:   "label",
					Value: "app=mattermost-calls-offloader",
				}),
			})
			cancel()
			if err != nil {
				// Transient failures are retried on the next tick.
				s.log.Error("failed to list containers", mlog.Err(err))
				continue
			}

			if len(containers) == 0 {
				// nothing to do
				continue
			}

			for _, cnt := range containers {
				// Inspect each container to get its finish timestamp, which
				// the list response does not include.
				ctx, cancel = context.WithTimeout(context.Background(), dockerRequestTimeout)
				c, err := s.client.ContainerInspect(ctx, cnt.ID)
				cancel()
				if err != nil {
					s.log.Error("failed to get container", mlog.Err(err))
					continue
				}

				if c.State == nil {
					s.log.Error("container state is missing", mlog.String("id", cnt.ID))
					continue
				}

				finishedAt, err := time.Parse(time.RFC3339, c.State.FinishedAt)
				if err != nil {
					s.log.Error("failed to parse finish time", mlog.Err(err))
					continue
				}

				// Delete the job once the retention window has elapsed.
				// Deletion failures are logged and retried on a later tick.
				if since := time.Since(finishedAt); since > s.cfg.FailedJobsRetentionTime {
					s.log.Info("configured retention time has elapsed since the container finished, deleting",
						mlog.String("id", cnt.ID),
						mlog.Any("retention_time", s.cfg.FailedJobsRetentionTime),
						mlog.Any("finish_at", finishedAt),
						mlog.Any("since", since),
					)

					if err := s.DeleteJob(cnt.ID); err != nil {
						s.log.Error("failed to delete job", mlog.Err(err), mlog.String("jobID", cnt.ID))
						continue
					}
				}
			}
		}
	}
}

// Shutdown stops the retention job (if one was started), waits for it to
// exit, and closes the underlying Docker client.
func (s *JobService) Shutdown() error {
	s.log.Info("docker job service shutting down")

	// Closing stopCh tells retentionJob to return. retentionJobDoneCh is
	// closed either by retentionJob on exit or by NewJobService when the
	// retention job was never started, so this receive cannot block forever.
	close(s.stopCh)
	<-s.retentionJobDoneCh

	return s.client.Close()
}

Expand Down Expand Up @@ -135,7 +231,12 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er

// We fetch the list of running containers to check against it in order to
// ensure we don't exceed the configured MaxConcurrentJobs limit.
containers, err := s.client.ContainerList(ctx, types.ContainerListOptions{})
containers, err := s.client.ContainerList(ctx, types.ContainerListOptions{
Filters: filters.NewArgs(filters.KeyValuePair{
Key: "label",
Value: "app=mattermost-calls-offloader",
}),
})
if err != nil {
return job.Job{}, fmt.Errorf("failed to list containers: %w", err)
}
Expand Down Expand Up @@ -191,6 +292,10 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
Tty: false,
Env: env,
Volumes: map[string]struct{}{volumeID + ":/recs": {}},
Labels: map[string]string{
// app label helps with identifying jobs.
"app": "mattermost-calls-offloader",
},
}, &container.HostConfig{
NetworkMode: networkMode,
Mounts: []mount.Mount{
Expand Down
Loading

0 comments on commit ab0e524

Please sign in to comment.