forked from vgarvardt/gue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob.go
172 lines (141 loc) · 4.85 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package gue
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/oklog/ulid/v2"
	"github.com/vgarvardt/gue/v5/adapter"
)
// JobPriority is the wrapper type for Job.Priority.
// It is backed by int16, so valid priorities span -32768..32767.
type JobPriority int16

// Shortcut values for JobPriority. Any value of the type is a valid priority,
// but these are expected to cover the most common needs.
const (
	// JobPriorityHighest is the smallest representable value.
	JobPriorityHighest = JobPriority(-32768)
	// JobPriorityHigh sits halfway between the highest and the default priority.
	JobPriorityHigh = JobPriority(-16384)
	// JobPriorityDefault is the zero value of the type.
	JobPriorityDefault = JobPriority(0)
	// JobPriorityLow sits halfway between the default and the lowest priority.
	JobPriorityLow = JobPriority(16384)
	// JobPriorityLowest is the largest representable value.
	JobPriorityLowest = JobPriority(32767)
)
// Job is a single unit of work for Gue to perform.
//
// The exported fields describe the job itself; the unexported ones carry the
// runtime state (transaction, deletion flag, retry policy, logger) used by the
// Job methods.
type Job struct {
	// ID is the unique database ID of the Job. It is ignored on job creation.
	ID ulid.ULID
	// Queue is the name of the queue. It defaults to the empty queue "".
	Queue string
	// Priority is the priority of the Job. The default priority is 0, and a
	// lower number means a higher priority.
	//
	// The highest priority is JobPriorityHighest, the lowest one is JobPriorityLowest.
	Priority JobPriority
	// RunAt is the time that this job should be executed. It defaults to now(),
	// meaning the job will execute immediately. Set it to a value in the future
	// to delay a job's execution.
	RunAt time.Time
	// Type maps job to a worker func.
	Type string
	// Args for the job.
	Args []byte
	// ErrorCount is the number of times this job has attempted to run, but failed with an error.
	// It is ignored on job creation.
	// This field is initialised only when the Job is being retrieved from the DB and is not
	// updated when the current Job handler errors.
	ErrorCount int32
	// LastError is the error message or stack trace from the last time the job failed. It is ignored on job creation.
	// This field is initialised only when the Job is being retrieved from the DB and is not
	// updated when the current Job run errors. It is meant to be used mostly for debugging.
	LastError sql.NullString
	// mu serialises Delete and Done, which read and update deleted and tx.
	mu sync.Mutex
	// deleted is set once Delete has removed the job row, turning further Delete calls into no-ops.
	deleted bool
	// tx is the DB transaction the job is locked to; Done resets it to nil after a successful commit.
	tx adapter.Tx
	// backoff maps the error count to the delay before the next attempt;
	// a negative delay makes Error discard the job instead of rescheduling it.
	backoff Backoff
	// logger is used by Error to report jobs that are being discarded.
	logger adapter.Logger
}
// Tx returns the DB transaction that this job is locked to. You may use
// it as you please until you call Done(). At that point, this transaction
// will be committed. This function will return nil if the Job's
// transaction was closed with Done().
//
// The transaction is read while holding the job mutex - the same one Done()
// holds while committing and clearing it - so calling Tx() concurrently with
// Done() is not a data race.
func (j *Job) Tx() adapter.Tx {
	j.mu.Lock()
	defer j.mu.Unlock()

	return j.tx
}
// Delete marks this job as complete by deleting it from the database.
//
// You must also later call Done() to return this job's database connection to
// the pool. If you got the job from the worker - it will take care of cleaning up the job and resources,
// no need to do this manually in a WorkFunc.
//
// Delete is idempotent: once the job row has been removed, further calls return nil
// without touching the database. If the job has not been deleted yet but its
// transaction was already closed with Done(), an error is returned because the
// delete can no longer be executed.
func (j *Job) Delete(ctx context.Context) error {
	j.mu.Lock()
	defer j.mu.Unlock()

	if j.deleted {
		return nil
	}

	// Done() resets tx to nil; calling Exec on it would panic instead of failing gracefully.
	if j.tx == nil {
		return errors.New("job transaction is already closed")
	}

	if _, err := j.tx.Exec(ctx, `DELETE FROM gue_jobs WHERE job_id = $1`, j.ID.String()); err != nil {
		return err
	}

	j.deleted = true
	return nil
}
// Done commits the transaction the job is locked to, which marks the job as done.
// Calling it again after a successful commit is a no-op that returns nil.
// If you got the job from the worker - it will take care of cleaning up the job
// and resources, no need to do this manually in a WorkFunc.
func (j *Job) Done(ctx context.Context) error {
	j.mu.Lock()
	defer j.mu.Unlock()

	// A nil transaction means the job was already marked as done.
	if j.tx != nil {
		commitErr := j.tx.Commit(ctx)
		if commitErr != nil {
			// Keep the transaction reference so the failure stays visible to the caller.
			return commitErr
		}

		j.tx = nil
	}

	return nil
}
// Error marks the job as failed and schedules it to be reworked. The message of
// jErr is saved on the job as the last error, and the error count is increased.
// A nil jErr is tolerated and stored as an empty last error message.
//
// When the calculated next run time is the zero time, the job is not
// rescheduled - it is logged and deleted instead.
//
// This call marks job as done and releases (commits) transaction,
// so calling Done() is not required, although calling it will not cause any issues.
// If you got the job from the worker - it will take care of cleaning up the job and resources,
// no need to do this manually in a WorkFunc.
//
// An error is returned if the job transaction was already closed with Done(),
// if the update/delete statement fails, or if the final commit fails.
func (j *Job) Error(ctx context.Context, jErr error) (err error) {
	// Always release the transaction; a commit failure takes over the returned
	// error while keeping the original one in the message.
	defer func() {
		if doneErr := j.Done(ctx); doneErr != nil {
			err = fmt.Errorf("failed to mark job as done (original error: %v): %w", err, doneErr)
		}
	}()

	// Snapshot the transaction under the mutex: Done() resets it to nil and
	// calling Exec on a nil transaction would panic.
	j.mu.Lock()
	tx := j.tx
	j.mu.Unlock()
	if tx == nil {
		return errors.New("job transaction is already closed")
	}

	var lastError string
	if jErr != nil {
		lastError = jErr.Error()
	}

	errorCount := j.ErrorCount + 1
	now := time.Now().UTC()

	newRunAt := j.calculateErrorRunAt(jErr, now, errorCount)
	if newRunAt.IsZero() {
		j.logger.Info(
			"Got empty new run at for the errored job, discarding it",
			adapter.F("job-type", j.Type),
			adapter.F("job-queue", j.Queue),
			adapter.F("job-errors", errorCount),
			adapter.Err(jErr),
		)
		return j.Delete(ctx)
	}

	_, err = tx.Exec(
		ctx,
		`UPDATE gue_jobs SET error_count = $1, run_at = $2, last_error = $3, updated_at = $4 WHERE job_id = $5`,
		errorCount, newRunAt, lastError, now, j.ID.String(),
	)
	return err
}
// calculateErrorRunAt returns the time at which a failed job should run again.
//
// If err is, or wraps, an ErrJobReschedule, the time requested by that error is
// used as is. Otherwise the job backoff is asked for a delay based on
// errorCount and the result is now plus that delay. A negative delay yields the
// zero time, which the caller treats as "discard the job".
func (j *Job) calculateErrorRunAt(err error, now time.Time, errorCount int32) time.Time {
	// errors.As also walks the wrap chain, so a reschedule request survives
	// being wrapped with additional context by the job handler.
	var errReschedule ErrJobReschedule
	if errors.As(err, &errReschedule) {
		return errReschedule.rescheduleJobAt()
	}

	delay := j.backoff(int(errorCount))
	if delay < 0 {
		return time.Time{}
	}

	return now.Add(delay)
}