-
Notifications
You must be signed in to change notification settings - Fork 11
/
subscriber.go
55 lines (45 loc) · 2.1 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package courier
import (
	"context"
	"errors"
)
// Subscriber defines behaviour of an MQTT subscriber that can create
// subscriptions, either for a single topic or for several topics in one call.
type Subscriber interface {
// Subscribe allows to subscribe to messages from an MQTT broker on one topic.
// It takes the topic name, the MessageHandler to use as callback and any
// number of Options, and returns a non-nil error on failure.
Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
// SubscribeMultiple allows to subscribe to messages on multiple topics from an
// MQTT broker. topicsWithQos is keyed by topic with a QOSLevel value per
// topic, and a single callback is supplied for the whole set. Unlike
// Subscribe, it accepts no Options.
SubscribeMultiple(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
}
// subscribeMiddleware is the unexported interface satisfied by anything that
// can take a Subscriber and hand back a Subscriber; SubscriberMiddlewareFunc
// implements it.
type subscribeMiddleware interface {
// Middleware helps chain Subscriber(s): it receives a Subscriber and returns
// a Subscriber.
Middleware(subscriber Subscriber) Subscriber
}
// SubscriberMiddlewareFunc is the function form of a subscriber middleware: a
// closure that is given a Subscriber and returns a Subscriber, which lets it
// intercept the calls made through the Subscriber it received.
type SubscriberMiddlewareFunc func(next Subscriber) Subscriber
// Middleware makes SubscriberMiddlewareFunc satisfy the subscribeMiddleware
// interface. It calls the function itself with subscriber and returns whatever
// Subscriber that call produces.
func (smw SubscriberMiddlewareFunc) Middleware(subscriber Subscriber) Subscriber {
	wrapped := smw(subscriber)

	return wrapped
}
// SubscriberFuncs adapts a pair of plain functions to the Subscriber
// interface: its Subscribe and SubscribeMultiple methods forward to the
// functions stored here. Values are built with NewSubscriberFuncs; the zero
// value holds nil functions.
type SubscriberFuncs struct {
	// subscribe backs the Subscribe method.
	subscribe func(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error
	// subscribeMultiple backs the SubscribeMultiple method.
	subscribeMultiple func(ctx context.Context, topicsWithQos map[string]QOSLevel, callback MessageHandler) error
}
// NewSubscriberFuncs builds a SubscriberFuncs from the two given functions.
// subscribeFunc is stored as the function behind Subscribe and
// subscribeMultipleFunc as the function behind SubscribeMultiple; neither is
// validated or wrapped.
func NewSubscriberFuncs(
	subscribeFunc func(context.Context, string, MessageHandler, ...Option) error,
	subscribeMultipleFunc func(context.Context, map[string]QOSLevel, MessageHandler) error,
) SubscriberFuncs {
	var funcs SubscriberFuncs
	funcs.subscribe = subscribeFunc
	funcs.subscribeMultiple = subscribeMultipleFunc

	return funcs
}
// Subscribe implements the Subscriber interface on SubscriberFuncs by passing
// ctx, topic, callback and opts unchanged to the stored subscribe function and
// returning its result.
//
// If no subscribe function is stored (the zero value of SubscriberFuncs, or a
// nil function given to NewSubscriberFuncs), Subscribe returns an error
// instead of panicking on a nil function call.
func (s SubscriberFuncs) Subscribe(ctx context.Context, topic string, callback MessageHandler, opts ...Option) error {
	if s.subscribe == nil {
		return errors.New("courier: SubscriberFuncs has no subscribe function")
	}

	return s.subscribe(ctx, topic, callback, opts...)
}
// SubscribeMultiple implements the Subscriber interface on SubscriberFuncs by
// passing ctx, topicsWithQos and callback unchanged to the stored
// subscribeMultiple function and returning its result.
//
// If no subscribeMultiple function is stored (the zero value of
// SubscriberFuncs, or a nil function given to NewSubscriberFuncs),
// SubscribeMultiple returns an error instead of panicking on a nil function
// call.
func (s SubscriberFuncs) SubscribeMultiple(
	ctx context.Context,
	topicsWithQos map[string]QOSLevel,
	callback MessageHandler,
) error {
	if s.subscribeMultiple == nil {
		return errors.New("courier: SubscriberFuncs has no subscribeMultiple function")
	}

	return s.subscribeMultiple(ctx, topicsWithQos, callback)
}