-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathswimy.go
117 lines (103 loc) · 2.32 KB
/
swimy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package swimy
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"sync"
"sync/atomic"
"time"
)
type Membership struct {
cfg *Config
observer *defaultObserver
membersMu sync.RWMutex
me Member
others map[net.Addr]Member
wg sync.WaitGroup
cancel func()
}
func New(cfg *Config) (*Membership, error) {
setDefaults(&cfg)
setUpSlog(os.Stdout)
ms := Membership{
cfg: cfg,
others: make(map[net.Addr]Member),
}
nTCP, err := newNetTCP(cfg.Port, ms.stream)
if err != nil {
return nil, fmt.Errorf("new tcp listener: %w", err)
}
me := Member{
addr: nTCP.ln.Addr(),
state: statusAlive,
}
ms.me = me
ms.observer = &defaultObserver{
me: me.Addr(),
onJoinCallback: ms.cfg.OnJoin,
onLeaveCallback: ms.cfg.OnLeave,
}
ctx, cancel := context.WithCancel(context.Background())
ms.cancel = cancel
ms.wg.Add(2)
go func() {
defer ms.wg.Done()
if err := ms.schedule(ctx, cfg.GossipInterval, ms.gossip); err != nil {
slog.ErrorContext(ctx, err.Error())
}
}()
go func() {
defer ms.wg.Done()
if err := nTCP.listen(ctx); err != nil {
slog.ErrorContext(ctx, err.Error())
}
}()
return &ms, nil
}
func (ms *Membership) Join(ctx context.Context, existing ...string) error {
for _, exist := range existing {
addr, err := net.ResolveTCPAddr("tcp", exist)
if err != nil {
return fmt.Errorf("resolve tcp addr: %w", err)
}
if err = ms.joinReq(ctx, addr); err != nil {
return fmt.Errorf("join req: %w", err)
}
m := Member{
addr: addr,
state: statusAlive,
since: time.Now().UTC(),
}
ms.becomeMembers(m)
}
return nil
}
func (ms *Membership) Leave(ctx context.Context) error {
req := leaveReq{sender: ms.me.Addr()}
if err := ms.broadCastToLives(ctx, req.encode()); err != nil {
return fmt.Errorf("broadcast leave-req :%w", err)
}
return nil
}
func (ms *Membership) Stop() {
ms.cancel()
ms.observer.onStop()
ms.wg.Wait()
}
func (ms *Membership) Members() []Member {
lives := ms.alives()
lives = append(lives, ms.me)
return lives
}
func (ms *Membership) Me() Member {
return ms.me
}
func (ms *Membership) Metrics() Metrics {
return Metrics{
ActiveMembers: atomic.LoadUint32(&ms.observer.metrics.ActiveMembers),
SentNum: atomic.LoadUint32(&ms.observer.metrics.SentNum),
ReceivedNum: atomic.LoadUint32(&ms.observer.metrics.ReceivedNum),
}
}