-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathccall.go
72 lines (62 loc) · 1.51 KB
/
ccall.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package ccall
import (
"context"
"github.com/aperturerobotics/util/broadcast"
)
// CallConcurrentlyFunc is a function passed to CallConcurrently.
//
// The function receives a context derived from the one given to
// CallConcurrently; that derived context is canceled when CallConcurrently
// returns. A non-nil return value is recorded by CallConcurrently as the
// function's error.
type CallConcurrentlyFunc = func(ctx context.Context) error
// CallConcurrently calls multiple functions concurrently and waits for exit or error.
//
// Nil entries in fns are ignored; if there is nothing to call, nil is
// returned. A single function is invoked directly on the calling goroutine and
// its result is returned as-is. Otherwise every non-nil function is started in
// its own goroutine.
//
// All functions receive a context derived from ctx which is canceled when
// CallConcurrently returns. In the concurrent case the call returns:
//   - the error of a function, as soon as any function returns an error other
//     than context.Canceled, without waiting for the remaining functions;
//   - context.Canceled if ctx is done before the functions have finished;
//   - otherwise, once all functions have returned, the recorded error
//     (context.Canceled if that is all any function reported) or nil.
func CallConcurrently(ctx context.Context, fns ...CallConcurrentlyFunc) error {
	if len(fns) == 0 {
		return nil
	}

	subCtx, subCtxCancel := context.WithCancel(ctx)
	defer subCtxCancel()

	if len(fns) == 1 {
		// Skip a nil function, as the concurrent path below does, instead of
		// panicking on a nil call.
		if fns[0] == nil {
			return nil
		}
		return fns[0](subCtx)
	}

	// running and exitErr are only accessed while holding the bcast lock.
	var bcast broadcast.Broadcast
	var running int
	var exitErr error

	callFunc := func(fn CallConcurrentlyFunc) {
		err := fn(subCtx)
		bcast.HoldLock(func(notify func(), _ func() <-chan struct{}) {
			running--
			// Keep the first recorded error, but let any later error take the
			// place of a recorded context.Canceled.
			if err != nil && (exitErr == nil || exitErr == context.Canceled) {
				exitErr = err
			}
			// Wake the waiter so it re-checks the shared state.
			notify()
		})
	}

	var waitCh <-chan struct{}
	var started int
	bcast.HoldLock(func(_ func(), getWaitCh func() <-chan struct{}) {
		// Grab the wait channel before any goroutine can broadcast.
		waitCh = getWaitCh()
		for _, fn := range fns {
			if fn == nil {
				continue
			}
			running++
			started++
			go callFunc(fn)
		}
	})

	// Use the count taken under the lock: reading running here would race
	// with the goroutines and could drop an error if they already finished.
	if started == 0 {
		return nil
	}

	for {
		select {
		case <-ctx.Done():
			// NOTE(review): this reports context.Canceled even when ctx ended
			// for another reason (e.g. a deadline); kept unchanged because
			// callers may compare against this value.
			return context.Canceled
		case <-waitCh:
		}

		var currRunning int
		var currExitErr error
		bcast.HoldLock(func(_ func(), getWaitCh func() <-chan struct{}) {
			currRunning = running
			currExitErr = exitErr
			// The previous channel was consumed by the broadcast; get a new one.
			waitCh = getWaitCh()
		})

		if currRunning == 0 || (currExitErr != nil && currExitErr != context.Canceled) {
			return currExitErr
		}
	}
}