-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathasyncjobs.go
87 lines (78 loc) · 3.54 KB
/
asyncjobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package asyncjobs
import (
"context"
"regexp"
"time"
"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
)
// Package-wide defaults and limits.
const (
// ShortedScheduledDeadline is the shortest deadline a scheduled task may have (30 seconds).
// NOTE(review): the name looks like a typo of "Shortest"; it is exported, so it is left unchanged here.
ShortedScheduledDeadline = 30 * time.Second
// DefaultJobRunTime is the run-time handlers will get when none is configured for a queue (one hour).
DefaultJobRunTime = time.Hour
// DefaultMaxTries is the number of tries a task will get when none is configured for it.
DefaultMaxTries = 10
// DefaultQueueMaxConcurrent is the concurrency setting used when none is configured for a queue.
DefaultQueueMaxConcurrent = 100
)
// StorageAdmin is a set of helpers mainly intended to support the CLI. It leaks a number of
// JetStream details through its signatures (for example *jsm.Manager, *jsm.Stream and
// nats.KeyValue), but that is acceptable: there is no real intention to change the storage
// backend or to support additional ones.
type StorageAdmin interface {
// Queue inspection and management.
Queues() ([]*QueueInfo, error)
QueueNames() ([]string, error)
QueueInfo(name string) (*QueueInfo, error)
PurgeQueue(name string) error
DeleteQueue(name string) error
PrepareQueue(q *Queue, replicas int, memory bool) error
// Configuration store; its status is reported as a NATS key-value bucket status.
ConfigurationInfo() (*nats.KeyValueBucketStatus, error)
PrepareConfigurationStore(memory bool, replicas int) error
// Task store inspection and management.
// NOTE(review): memory/replicas/retention presumably map onto the settings of the
// underlying JetStream stream - confirm against the implementation.
PrepareTasks(memory bool, replicas int, retention time.Duration) error
DeleteTaskByID(id string) error
TasksInfo() (*TasksInfo, error)
// Tasks delivers tasks on the returned channel.
// NOTE(review): limit presumably caps how many tasks are delivered - confirm against the implementation.
Tasks(ctx context.Context, limit int32) (chan *Task, error)
// TasksStore exposes the JetStream manager and stream directly.
TasksStore() (*jsm.Manager, *jsm.Stream, error)
// ElectionStorage returns a NATS key-value store; presumably the one backing leader elections.
ElectionStorage() (nats.KeyValue, error)
}
// ScheduledTaskStorage is the storage interface covering scheduled tasks: saving, loading,
// deleting, listing and watching them, plus enqueueing tasks, accessing election storage
// and publishing leader-elected events.
// NOTE(review): presumably the subset of storage needed by the component that runs
// scheduled tasks - confirm against its callers.
type ScheduledTaskStorage interface {
// Scheduled task persistence, looked up and deleted by name.
// NOTE(review): the update flag presumably allows overwriting an existing entry - confirm.
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
// ScheduledTasksWatch delivers schedule watch entries on the returned channel.
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
// EnqueueTask places a task on the given queue.
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
// Leader election support.
ElectionStorage() (nats.KeyValue, error)
PublishLeaderElectedEvent(ctx context.Context, name string, component string) error
}
// Storage is the interface through which the backend is accessed. It covers task state and
// lifecycle, handling of polled work items, preparation of the underlying stores and
// scheduled task persistence.
type Storage interface {
// Task state and lifecycle.
// NOTE(review): notify presumably controls whether a state change event is published - confirm.
SaveTaskState(ctx context.Context, task *Task, notify bool) error
EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
RetryTaskByID(ctx context.Context, queue *Queue, id string) error
LoadTaskByID(id string) (*Task, error)
DeleteTaskByID(id string) error
PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
// Acknowledgement handling for work items obtained via PollQueue.
// NOTE(review): Ack/Nak/Terminate presumably map onto the JetStream message
// acknowledgement types of the same names - confirm against the implementation.
AckItem(ctx context.Context, item *ProcessItem) error
NakBlockedItem(ctx context.Context, item *ProcessItem) error
NakItem(ctx context.Context, item *ProcessItem) error
TerminateItem(ctx context.Context, item *ProcessItem) error
PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
// Preparation of queues, the task store and the configuration store.
PrepareQueue(q *Queue, replicas int, memory bool) error
PrepareTasks(memory bool, replicas int, retention time.Duration) error
PrepareConfigurationStore(memory bool, replicas int) error
// Scheduled task persistence, looked up and deleted by name.
SaveScheduledTask(st *ScheduledTask, update bool) error
LoadScheduledTaskByName(name string) (*ScheduledTask, error)
DeleteScheduledTaskByName(name string) error
ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error)
ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error)
}
// validNameMatcher accepts non-empty strings made up solely of ASCII letters, digits,
// underscores, colons and dashes. It is compiled once at package initialization.
var validNameMatcher = regexp.MustCompile(`^[a-zA-Z0-9_:-]+$`)

// IsValidName reports whether name passes the generic strict name validation applied to
// user-supplied names - task names and similar values that turn into subjects.
func IsValidName(name string) bool {
	matched := validNameMatcher.MatchString(name)

	return matched
}