-
Notifications
You must be signed in to change notification settings - Fork 5
/
options.go
135 lines (116 loc) · 2.68 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package redisdb
import (
"context"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
)
// Option is a functional option for the queue system: a function that
// receives a pointer to the package's options value and modifies it. Option
// values are produced by the With* constructors in this file and are applied
// in order by newOptions.
type Option func(*options)
// options holds every setting of this package. It is unexported: callers
// change it only through Option values, which newOptions applies on top of
// its defaults.
type options struct {
// runFunc is the run func of the queue (set by WithRunFunc).
runFunc func(context.Context, core.QueuedMessage) error
// logger is the logger in use (set by WithLogger).
logger queue.Logger
// addr is the redis address (set by WithAddr).
addr string
// db is set by WithDB; presumably the redis database number — confirm
// against the client setup code.
db int
// connectionString is the redis connection string (set by
// WithConnectionString).
connectionString string
// password is the redis password (set by WithPassword).
password string
// streamName is the stream name (set by WithStreamName).
streamName string
// cluster is the redis cluster flag (set by WithCluster).
cluster bool
// group is the group name (set by WithGroup).
group string
// consumer is the consumer name (set by WithConsumer).
consumer string
// maxLength is the max length for published messages (set by
// WithMaxLength).
maxLength int64
// blockTime is the block time used when waiting for an entry (set by
// WithBlockTime).
blockTime time.Duration
}
// WithAddr returns an Option that sets the redis address.
func WithAddr(addr string) Option {
	return func(o *options) { o.addr = addr }
}
// WithMaxLength returns an Option that sets the max length for published
// messages.
func WithMaxLength(m int64) Option {
	return func(o *options) { o.maxLength = m }
}
// WithBlockTime returns an Option that sets the block time. The block command
// is used so that, when no entry is found, the call waits until an entry is
// found.
func WithBlockTime(m time.Duration) Option {
	return func(o *options) { o.blockTime = m }
}
// WithDB returns an Option that sets the redis db number.
func WithDB(db int) Option {
	return func(o *options) { o.db = db }
}
// WithCluster returns an Option that sets the redis cluster flag to enable.
func WithCluster(enable bool) Option {
	return func(o *options) { o.cluster = enable }
}
// WithStreamName returns an Option that sets the stream name.
func WithStreamName(name string) Option {
	return func(o *options) { o.streamName = name }
}
// WithGroup returns an Option that sets the group name.
func WithGroup(name string) Option {
	return func(o *options) { o.group = name }
}
// WithConsumer returns an Option that sets the consumer name.
func WithConsumer(name string) Option {
	return func(o *options) { o.consumer = name }
}
// WithPassword returns an Option that sets the redis password.
func WithPassword(passwd string) Option {
	return func(o *options) { o.password = passwd }
}
// WithConnectionString returns an Option that sets the redis connection
// string.
func WithConnectionString(connectionString string) Option {
	return func(o *options) { o.connectionString = connectionString }
}
// WithRunFunc returns an Option that sets the run func of the queue to fn.
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
	return func(o *options) { o.runFunc = fn }
}
// WithLogger returns an Option that installs l as the custom logger.
func WithLogger(l queue.Logger) Option {
	return func(o *options) { o.logger = l }
}
// newOptions builds the effective configuration. It starts from the package
// defaults and then applies opts in the order given, so a later option
// overrides an earlier one that sets the same field. Nil entries in opts are
// skipped instead of being invoked, because calling a nil function value
// panics.
func newOptions(opts ...Option) options {
	cfg := options{
		addr:       "127.0.0.1:6379",
		streamName: "golang-queue",
		group:      "golang-queue",
		consumer:   "golang-queue",
		logger:     queue.NewLogger(),
		// The default run func does nothing and reports success.
		runFunc: func(context.Context, core.QueuedMessage) error {
			return nil
		},
		blockTime: 60 * time.Second,
	}

	// Let each caller-supplied option mutate the defaults in place.
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(&cfg)
	}

	return cfg
}