-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathclient_options.go
315 lines (270 loc) · 8.54 KB
/
client_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0
package asyncjobs
import (
"crypto/ed25519"
"encoding/hex"
"fmt"
"time"
"github.com/nats-io/jsm.go/natscontext"
"github.com/nats-io/nats.go"
)
// ClientOpts configures the client
type ClientOpts struct {
concurrency int
replicas int
queue *Queue
taskRetention time.Duration
retryPolicy RetryPolicyProvider
memoryStore bool
statsPort int
logger Logger
skipPrepare bool
discard []TaskState
privateKey ed25519.PrivateKey
seedFile string
publicKey ed25519.PublicKey
publicKeyFile string
optionalTaskSignatures bool
nc *nats.Conn
}
// ClientOpt configures the client
type ClientOpt func(opts *ClientOpts) error
func (c *ClientOpts) validate() error {
if c.privateKey != nil && c.seedFile != "" {
return fmt.Errorf("cannot set both private key and seed file")
}
if c.publicKey != nil && c.publicKeyFile != "" {
return fmt.Errorf("cannot set both public key and public key file")
}
if c.seedFile != "" && (c.publicKeyFile != "" || c.publicKey != nil) {
return fmt.Errorf("cannot set a seedfile and public key information")
}
return nil
}
// DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
func DiscardTaskStates(states ...TaskState) ClientOpt {
return func(opts *ClientOpts) error {
for _, s := range states {
if s != TaskStateCompleted && s != TaskStateExpired && s != TaskStateTerminated {
return fmt.Errorf("only states completed, expired or terminated can be discarded")
}
}
opts.discard = append(opts.discard, states...)
return nil
}
}
// DiscardTaskStatesByName configures the client to discard Tasks that reach a final state in the list of supplied TaskState
func DiscardTaskStatesByName(states ...string) ClientOpt {
return func(opts *ClientOpts) error {
for _, s := range states {
state, ok := nameToTaskState[s]
if !ok {
return fmt.Errorf("%w: %s", ErrUnknownDiscardPolicy, s)
}
err := DiscardTaskStates(state)(opts)
if err != nil {
return err
}
}
return nil
}
}
// NoStorageInit skips setting up any queues or task stores when creating a client
func NoStorageInit() ClientOpt {
return func(opts *ClientOpts) error {
opts.skipPrepare = true
return nil
}
}
// CustomLogger sets a custom logger to use for all logging
func CustomLogger(log Logger) ClientOpt {
return func(opts *ClientOpts) error {
opts.logger = log
return nil
}
}
// NatsConn sets an already connected NATS connection as communications channel
func NatsConn(nc *nats.Conn) ClientOpt {
return func(opts *ClientOpts) error {
if !nc.Opts.UseOldRequestStyle {
return fmt.Errorf("connection with UseOldRequestStyle() is required")
}
opts.nc = nc
return nil
}
}
// PrometheusListenPort enables prometheus listening on a specific port
func PrometheusListenPort(port int) ClientOpt {
return func(copts *ClientOpts) error {
copts.statsPort = port
return nil
}
}
// NatsContext attempts to connect to the NATS client context c
func NatsContext(c string, opts ...nats.Option) ClientOpt {
return func(copts *ClientOpts) error {
nopts := []nats.Option{
nats.MaxReconnects(-1),
nats.CustomReconnectDelay(RetryLinearOneMinute.Duration),
nats.UseOldRequestStyle(),
nats.Name("Choria Asynchronous Jobs Client"),
nats.ReconnectHandler(func(nc *nats.Conn) {
copts.logger.Infof("Reconnected to NATS server %s", nc.ConnectedUrl())
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
copts.logger.Errorf("Disconnected from server: %v", err)
}),
nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
url := nc.ConnectedUrl()
if url == "" {
copts.logger.Errorf("Unexpected NATS error: %s", err)
} else {
copts.logger.Errorf("Unexpected NATS error from server %s: %s", url, err)
}
}),
nats.CustomReconnectDelay(func(n int) time.Duration {
d := RetryLinearOneMinute.Duration(n)
copts.logger.Warnf("Sleeping %v till the next reconnection attempt after %d attempts", d, n)
return d
}),
}
nc, err := natscontext.Connect(c, append(nopts, opts...)...)
if err != nil {
return err
}
copts.nc = nc
return nil
}
}
// MemoryStorage enables storing tasks and work queue in memory in JetStream
func MemoryStorage() ClientOpt {
return func(opts *ClientOpts) error {
opts.memoryStore = true
return nil
}
}
// RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt {
return func(opts *ClientOpts) error {
opts.retryPolicy = p
return nil
}
}
// RetryBackoffPolicyName uses the policy named to schedule job retries by using RetryPolicyLookup(name)
func RetryBackoffPolicyName(name string) ClientOpt {
return func(opts *ClientOpts) error {
p, err := RetryPolicyLookup(name)
if err != nil {
return err
}
return RetryBackoffPolicy(p)(opts)
}
}
// ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling.
// This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a
// queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor
// would be below that. This allows for horizontal and vertical scaling but without unbounded growth - the queue
// MaxConcurrent is the absolute upper limit for in-flight jobs for 1 specific queue.
func ClientConcurrency(c int) ClientOpt {
return func(opts *ClientOpts) error {
opts.concurrency = c
return nil
}
}
// StoreReplicas sets the replica level to keep for the tasks store and work queue
//
// Used only when initially creating the underlying streams.
func StoreReplicas(r uint) ClientOpt {
return func(opts *ClientOpts) error {
if r < 1 || r > 5 {
return fmt.Errorf("replicas must be between 1 and 5")
}
opts.replicas = int(r)
return nil
}
}
// WorkQueue configures the client to consume messages from a specific queue
//
// When not set the "DEFAULT" queue will be used.
func WorkQueue(queue *Queue) ClientOpt {
return func(opts *ClientOpts) error {
if opts.queue != nil {
return fmt.Errorf("a queue has already been defined")
}
opts.queue = queue
return nil
}
}
// BindWorkQueue binds the client to a work queue that should already exist
func BindWorkQueue(queue string) ClientOpt {
return func(opts *ClientOpts) error {
if queue == "" {
return fmt.Errorf("a queue name is required")
}
if opts.queue != nil {
return fmt.Errorf("a queue has already been defined")
}
opts.queue = &Queue{Name: queue, NoCreate: true}
return nil
}
}
// TaskRetention is the time tasks will be kept for in the task storage
//
// Used only when initially creating the underlying streams.
func TaskRetention(r time.Duration) ClientOpt {
return func(opts *ClientOpts) error {
opts.taskRetention = r
return nil
}
}
// TaskSigningKey sets a key used to sign tasks, will be kept in memory for the duration
func TaskSigningKey(pk ed25519.PrivateKey) ClientOpt {
return func(opts *ClientOpts) error {
opts.privateKey = pk
return nil
}
}
// TaskSigningSeedFile sets the path to a file holding a ed25519 seed, will be used for signing and verification and wiped between uses
func TaskSigningSeedFile(sf string) ClientOpt {
return func(opts *ClientOpts) error {
opts.seedFile = sf
return nil
}
}
// TaskVerificationKeyHexEncoded sets a public key used to verify tasks, hex encoded string
func TaskVerificationKeyHexEncoded(pks string) ClientOpt {
return func(opts *ClientOpts) error {
if pks == "" {
return nil
}
pk, err := hex.DecodeString(pks)
if err != nil {
return err
}
opts.publicKey = pk
return nil
}
}
// TaskVerificationKey sets a public key used to verify tasks
func TaskVerificationKey(pk ed25519.PublicKey) ClientOpt {
return func(opts *ClientOpts) error {
opts.publicKey = pk
return nil
}
}
// TaskVerificationKeyFile sets the path to a file holding a ed25519 public key, will be used for verification of tasks
func TaskVerificationKeyFile(sf string) ClientOpt {
return func(opts *ClientOpts) error {
opts.publicKeyFile = sf
return nil
}
}
// TaskSignaturesOptional indicates that only signed tasks can be loaded
func TaskSignaturesOptional() ClientOpt {
return func(opts *ClientOpts) error {
opts.optionalTaskSignatures = true
return nil
}
}