-
Notifications
You must be signed in to change notification settings - Fork 44
/
pubsub_broadcaster.go
87 lines (74 loc) · 2.05 KB
/
pubsub_broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package crdt
import (
"context"
"strings"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
// Compile-time check that *PubSubBroadcaster satisfies the Broadcaster
// interface.
var _ Broadcaster = (*PubSubBroadcaster)(nil)
// PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
// Instances are created with NewPubSubBroadcaster.
type PubSubBroadcaster struct {
// ctx is the context handed to NewPubSubBroadcaster; Next checks it to
// detect that the broadcaster has been shut down.
ctx context.Context
// psub is the PubSub instance on which the topic was joined.
psub *pubsub.PubSub
// topic is the joined topic that Broadcast publishes to.
topic *pubsub.Topic
// subs is the subscription on topic from which Next reads messages.
subs *pubsub.Subscription
}
// NewPubSubBroadcaster returns a new broadcaster using the given PubSub and
// a topic to subscribe/broadcast to. The topic is joined and subscribed to
// before this function returns.
//
// Please register any topic validators before creating the Broadcaster.
//
// The broadcaster can be shut down by cancelling the given context.
// This must be done before Closing the crdt.Datastore, otherwise things
// may hang.
//
// An error is returned when joining the topic or subscribing to it fails.
// When subscribing fails, the topic that was just joined is closed again so
// that its handle is not leaked.
func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string) (*PubSubBroadcaster, error) {
	psubTopic, err := psub.Join(topic)
	if err != nil {
		return nil, err
	}

	subs, err := psubTopic.Subscribe()
	if err != nil {
		// Release the topic joined above: nobody else holds a reference to
		// it, so leaving it open would leak the handle (and, presumably,
		// keep the topic name registered in psub). The Close error is
		// dropped on purpose; the Subscribe error is the one the caller
		// needs to see.
		_ = psubTopic.Close()
		return nil, err
	}

	// Tear-down goroutine: waits for ctx to be cancelled, then releases the
	// subscription and the topic, in that order.
	go func(ctx context.Context, subs *pubsub.Subscription) {
		<-ctx.Done()
		subs.Cancel()
		// subs.Next returns error when subscription closed. Subscription must
		// be closed before psubTopic can be closed. Any message still pending
		// is discarded here.
		var err error
		for err == nil {
			_, err = subs.Next(ctx)
		}
		// Best-effort: there is no caller left to report a Close error to.
		_ = psubTopic.Close()
	}(ctx, subs)

	return &PubSubBroadcaster{
		ctx:   ctx,
		psub:  psub,
		topic: psubTopic,
		subs:  subs,
	}, nil
}
// Broadcast publishes data on the broadcaster's topic, using ctx for the
// publish call. The error from the underlying Publish is returned unchanged.
func (pbc *PubSubBroadcaster) Broadcast(ctx context.Context, data []byte) error {
	publishErr := pbc.topic.Publish(ctx, data)
	return publishErr
}
// Next waits for the next message on the subscription and returns its data.
//
// ErrNoMoreBroadcast is returned when the broadcaster's own context or the
// given ctx is already done, or when the subscription fails with an error
// whose message mentions "subscription cancelled" or "context". Any other
// subscription error is returned as is.
func (pbc *PubSubBroadcaster) Next(ctx context.Context) ([]byte, error) {
	// Non-blocking check: do not touch the subscription at all when either
	// context has already been cancelled.
	select {
	case <-ctx.Done():
		return nil, ErrNoMoreBroadcast
	case <-pbc.ctx.Done():
		return nil, ErrNoMoreBroadcast
	default:
	}

	received, nextErr := pbc.subs.Next(ctx)
	if nextErr == nil {
		return received.GetData(), nil
	}

	// Shutdown-related failures are recognised by their message text and
	// reported as the end of the broadcast stream.
	switch reason := nextErr.Error(); {
	case strings.Contains(reason, "subscription cancelled"),
		strings.Contains(reason, "context"):
		return nil, ErrNoMoreBroadcast
	default:
		return nil, nextErr
	}
}