-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
111 lines (96 loc) · 2.98 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package event
import (
"reflect"
"sync"
)
// Stream is an event stream that implements Publisher and Subscribable
// for publishing events and subscribing to them.
// Every Handler.HandleEvent method call is started as a separate Go routine,
// meaning that event handling will be run unordered and in parallel.
//
// Use SyncStream if synchronous, ordered handling of events is needed.
//
// Stream is threadsafe.
type Stream struct {
subscribable
}
// NewStream returns a new Stream with optional RepublishHandler
// subscriptions to the passed subscribeTo Subscribable implementations.
func NewStream(subscribeTo ...Subscribable) *Stream {
stream := new(Stream)
for _, source := range subscribeTo {
source.Subscribe(RepublishHandler(stream))
}
return stream
}
// Publish calls Handler.HandleEvent(event) for all subscribed event handlers.
// Every Handler.HandleEvent method call is started as a separate Go routine,
// meaning that event handling will be run unordered and in parallel.
//
// Use SyncStream if synchronous, ordered handling of events is needed.
func (stream *Stream) Publish(event interface{}) error {
stream.handlerMtx.RLock()
defer stream.handlerMtx.RUnlock()
typeHandlers := stream.eventTypeHandlers[reflect.TypeOf(event)]
for _, handler := range typeHandlers {
go safelyHandleEvent(handler, event)
}
for _, handler := range stream.anyEventHandlers {
go safelyHandleEvent(handler, event)
}
return nil
}
// PublishAsync publishes an event asynchronousely
// using one or more go routines.
// Exactly one error or nil will be written to
// the returned channel when the event has been
// handled by the subsribed handlers.
// The error can be a combination of multiple
// errors from multiple event handlers.
func (stream *Stream) PublishAsync(event interface{}) <-chan error {
stream.handlerMtx.RLock()
defer stream.handlerMtx.RUnlock()
typeHandlers := stream.eventTypeHandlers[reflect.TypeOf(event)]
var (
errs []error
errsMtx sync.Mutex
wg sync.WaitGroup
)
wg.Add(len(typeHandlers) + len(stream.anyEventHandlers))
handleEventAsync := func(handler Handler, event interface{}) {
err := safelyHandleEvent(handler, event)
if err != nil {
errsMtx.Lock()
errs = append(errs, err)
errsMtx.Unlock()
}
wg.Done()
}
for _, handler := range typeHandlers {
go handleEventAsync(handler, event)
}
for _, handler := range stream.anyEventHandlers {
go handleEventAsync(handler, event)
}
errChan := make(chan error, 1)
go func() {
wg.Wait()
errChan <- combineErrors(errs)
}()
return errChan
}
// PublishAwait publishes an event and waits
// for all handlers to return an error or nil.
// The error can be a combination of multiple
// errors from multiple event handlers.
func (stream *Stream) PublishAwait(event interface{}) error {
return <-stream.PublishAsync(event)
}
func safelyHandleEvent(handler Handler, event interface{}) (err error) {
defer func() {
if r := recover(); r != nil {
err = asError(r)
}
}()
return handler.HandleEvent(event)
}