-
Notifications
You must be signed in to change notification settings - Fork 4
/
goInterval.go
206 lines (162 loc) · 4.12 KB
/
goInterval.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package gointerlock
import (
"context"
"errors"
"log"
"time"
"github.com/go-redis/redis/v8"
)
// locker is the package-level lock backend. Run assigns it according to the
// configured LockVendor, and isNotLockThenLock and UnLock call it for every
// vendor other than SingleApp.
//
// NOTE(review): this is a single package-level variable, so every GoInterval
// in the process shares it and a later Run call with a distributed vendor
// replaces the backend used by earlier ones — confirm that this is intended.
var locker Lock
// LockVendor selects the backend that provides the distributed lock for a
// GoInterval.
type LockVendor int32

// Supported lock backends. The numeric values are part of the public API and
// must stay stable.
const (
	// SingleApp means no distributed lock is used.
	SingleApp LockVendor = iota
	// RedisLock selects the Redis-backed lock.
	RedisLock
	// AwsDynamoDbLock selects the AWS DynamoDB-backed lock.
	AwsDynamoDbLock
	// PostgresLock selects the Postgres-backed lock.
	PostgresLock
)
// GoInterval describes a periodic job: the function to call, how often to
// call it, and which lock backend (if any) Run should set up before the job
// starts ticking.
type GoInterval struct {
	// Name is a unique job/task name. It is required for the distributed
	// lock: Run passes it to the lock backend, and a non-empty Name with
	// LockVendor left at its zero value makes Run default to RedisLock.
	// Leave it empty for local, single-process use.
	Name string

	// Arg is the function that is called on every interval tick. Run returns
	// an error if it is nil.
	Arg func()

	// Interval is the timer period. Run returns an error if it is zero.
	Interval time.Duration

	// LockVendor selects the lock backend that Run sets up.
	LockVendor LockVendor

	// Redis connection settings.

	// RedisConnector is an existing Redis client, for applications that
	// already have a Redis connection configured.
	RedisConnector *redis.Client

	// RedisHost is the Redis host; the default value is "localhost:6379".
	RedisHost string

	// RedisPassword is the Redis password (AUTH). It can be blank if Redis
	// requires no authentication.
	RedisPassword string

	// RedisDB is the Redis database, from 0 to 15, with 0 as the default
	// (not applicable to Redis Cluster).
	RedisDB string

	// DynamoDB connection settings.

	// AwsDynamoDbRegion: leave empty to read it from ~/.aws/credentials
	// (if AwsDynamoDbEndpoint is not provided).
	AwsDynamoDbRegion string

	// AwsDynamoDbEndpoint: leave empty to read it from ~/.aws/credentials.
	AwsDynamoDbEndpoint string

	// AwsDynamoDbAccessKeyID is a static credential: leave empty to read it
	// from ~/.aws/credentials (if AwsDynamoDbEndpoint is not provided).
	AwsDynamoDbAccessKeyID string

	// AwsDynamoDbSecretAccessKey is a static credential: leave empty to read
	// it from ~/.aws/credentials (if AwsDynamoDbEndpoint is not provided).
	AwsDynamoDbSecretAccessKey string

	// AwsDynamoDbSessionToken is a static credential: leave empty to read it
	// from ~/.aws/credentials (if AwsDynamoDbEndpoint is not provided).
	AwsDynamoDbSessionToken string

	// Postgres connection settings.

	// PostgresConnStr is the connection string handed to the Postgres locker.
	PostgresConnStr string

	// timer is for internal use and must not be modified by callers; it is
	// created and re-armed by updateTimer.
	timer *time.Timer
}
// Run starts the interval timer and blocks, calling t.Arg once per Interval
// until ctx is cancelled or acquiring the lock fails.
//
// A nil ctx is treated as context.Background(). Run returns an error when
// Interval is not positive, when Arg is nil, when the selected lock backend
// cannot be initialised, or when acquiring the lock reports an error. It
// returns nil when ctx is cancelled.
//
// For every LockVendor other than SingleApp, Run stores the initialised
// backend in the package-level locker variable.
func (t *GoInterval) Run(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}

	// A non-positive interval would make the timer fire immediately on every
	// iteration, so it is rejected together with the unset (zero) value.
	if t.Interval <= 0 {
		return errors.New("`Time Interval is missing!`")
	}
	if t.Arg == nil {
		return errors.New("`What this timer should to run?`")
	}

	// A job name switches on distributed mode. Versions up to v1.0.3 had no
	// LockVendor field and always used Redis, so Redis stays the default.
	if t.Name != "" && t.LockVendor == SingleApp {
		t.LockVendor = RedisLock
	}

	switch t.LockVendor {
	case RedisLock:
		r := &RedisLocker{
			redisConnector: t.RedisConnector,
			Name:           t.Name,
			RedisHost:      t.RedisHost,
			RedisPassword:  t.RedisPassword,
			RedisDB:        t.RedisDB,
		}
		if err := r.SetClient(); err != nil {
			return err
		}
		locker = r
	case AwsDynamoDbLock:
		d := &DynamoDbLocker{
			AwsDynamoDbRegion:          t.AwsDynamoDbRegion,
			AwsDynamoDbEndpoint:        t.AwsDynamoDbEndpoint,
			AwsDynamoDbAccessKeyID:     t.AwsDynamoDbAccessKeyID,
			AwsDynamoDbSecretAccessKey: t.AwsDynamoDbSecretAccessKey,
			AwsDynamoDbSessionToken:    t.AwsDynamoDbSessionToken,
		}
		if err := d.SetClient(); err != nil {
			return err
		}
		locker = d
	case PostgresLock:
		p := &PostgresLocker{
			Name:            t.Name,
			PostgresConnStr: t.PostgresConnStr,
		}
		if err := p.SetClient(); err != nil {
			return err
		}
		locker = p
	}

	t.updateTimer()
	// Leave the timer stopped and its channel empty on exit, so nothing
	// stale is delivered if Run is called again on the same value.
	defer func() {
		if !t.timer.Stop() {
			select {
			case <-t.timer.C:
			default:
			}
		}
	}()

	for {
		// Wait for the tick and for cancellation at the same time; blocking
		// on the timer alone would delay shutdown by up to a full Interval.
		select {
		case <-ctx.Done():
			log.Printf("Job %s terminated!", t.Name)
			return nil
		case <-t.timer.C:
		}

		locked, err := t.isNotLockThenLock(ctx)
		if err != nil {
			// Hand the failure to the caller instead of exiting the process.
			return err
		}
		if locked {
			// This instance holds the lock for this tick: run the task.
			t.Arg()
			t.UnLock(ctx)
		}
		t.updateTimer()
	}
}
// isNotLockThenLock reports whether this instance may run the task for the
// current tick.
//
// With SingleApp there is no distributed lock and it always returns true.
// Otherwise it asks the package-level locker to take the lock named t.Name
// for t.Interval and returns the backend's answer. A backend error, or a
// locker that was never initialised, is returned to the caller as
// (false, err) rather than terminating the process.
func (t *GoInterval) isNotLockThenLock(ctx context.Context) (bool, error) {
	if t.LockVendor == SingleApp {
		return true, nil
	}

	// An unrecognised LockVendor leaves locker nil; report that as an error
	// instead of panicking on a nil interface call.
	if locker == nil {
		return false, errors.New("lock backend is not initialised")
	}

	locked, err := locker.Lock(ctx, t.Name, t.Interval)
	if err != nil {
		return false, err
	}
	return locked, nil
}
// UnLock releases the distributed lock named t.Name through the package-level
// locker. It does nothing for SingleApp, or when no lock backend has been
// initialised. Releasing is best-effort: a backend error is logged rather
// than returned, because the method has no error result.
func (t *GoInterval) UnLock(ctx context.Context) {
	if t.LockVendor == SingleApp || locker == nil {
		return
	}
	if err := locker.UnLock(ctx, t.Name); err != nil {
		// Surface the failure instead of dropping it silently.
		log.Printf("Job %s failed to release lock: %v", t.Name, err)
	}
}
// updateTimer arms t.timer to fire once, t.Interval from now. The first call
// creates the timer; later calls reuse it.
//
// The previous implementation derived the delay from three time.Now() calls
// and a comparison that was always true, which amounted to "Interval minus a
// few nanoseconds"; the interval is now used directly.
func (t *GoInterval) updateTimer() {
	if t.timer == nil {
		t.timer = time.NewTimer(t.Interval)
		return
	}

	// Stop the timer and discard any undelivered tick before re-arming, so a
	// stale value cannot be mistaken for the next tick even when the timer
	// had not been received from yet.
	if !t.timer.Stop() {
		select {
		case <-t.timer.C:
		default:
		}
	}
	t.timer.Reset(t.Interval)
}