From 8037a2e60f69092674d67a7a0cc3f4959750ed74 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 7 Jun 2019 08:31:14 -0400 Subject: [PATCH 01/12] Add the ability to handle newly subscribed peers --- floodsub_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ pubsub.go | 19 ++++++++++++++++++- subscription.go | 23 +++++++++++++++++++---- 3 files changed, 79 insertions(+), 5 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 4e227d5d..c15c57d8 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1063,3 +1063,45 @@ func TestImproperlySignedMessageRejected(t *testing.T) { ) } } + +func TestSubscriptionNotification(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 20 + hosts := getNetHosts(t, ctx, numHosts) + + psubs := getPubsubs(ctx, hosts) + + msgs := make([]*Subscription, numHosts) + subPeersFound := make([]map[peer.ID]struct{}, numHosts) + for i, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs[i] = subch + peersFound := make(map[peer.ID]struct{}) + subPeersFound[i] = peersFound + go func(peersFound map[peer.ID]struct{}) { + for i := 0; i < numHosts-1; i++ { + pid, err := subch.NextSubscribedPeer(ctx) + if err != nil { + t.Fatal(err) + } + peersFound[pid] = struct{}{} + } + }(peersFound) + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + for _, peersFound := range subPeersFound { + if len(peersFound) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + } +} diff --git a/pubsub.go b/pubsub.go index 6df169d8..2be2da79 100644 --- a/pubsub.go +++ b/pubsub.go @@ -78,6 +78,9 @@ type PubSub struct { // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} + // a set of notification channels for newly subscribed peers + newSubs map[string]chan peer.ID + // sendMsg handles messages that have been validated sendMsg chan *sendReq @@ -418,6 +421,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") close(sub.ch) + close(sub.inboundSubs) delete(subs, sub) if len(subs) == 0 { @@ -447,6 +451,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { subs = p.myTopics[sub.topic] } + sub.inboundSubs = make(chan peer.ID, 32) sub.ch = make(chan *Message, 32) sub.cancelCh = p.cancelCh @@ -570,7 +575,19 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { p.topics[t] = tmap } - tmap[rpc.from] = struct{}{} + if _, ok = tmap[rpc.from]; !ok { + tmap[rpc.from] = struct{}{} + if subs, ok := p.myTopics[t]; ok { + inboundPeer := rpc.from + for s := range subs { + select { + case s.inboundSubs <- inboundPeer: + default: + log.Infof("Can't deliver event to subscription for topic %s; subscriber too slow", t) + } + } + } + } } else { tmap, ok := p.topics[t] if !ok { diff --git a/subscription.go b/subscription.go index 66a9e513..ad70778e 100644 --- a/subscription.go +++ b/subscription.go @@ -2,13 +2,15 @@ package pubsub import ( "context" + "github.com/libp2p/go-libp2p-core/peer" ) type Subscription struct { - topic string - ch chan *Message - cancelCh chan<- *Subscription - err error + topic string + ch chan *Message + cancelCh chan<- *Subscription + inboundSubs chan peer.ID + err error } func (sub *Subscription) Topic() string { @@ -31,3 +33,16 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) { func (sub *Subscription) Cancel() { sub.cancelCh <- sub } + +func (sub *Subscription) NextSubscribedPeer(ctx context.Context) (peer.ID, error) { + select { + case newPeer, ok := <-sub.inboundSubs: + if !ok { + return newPeer, sub.err + } + + return newPeer, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} From cc791f28b9886c28fe5235220468cafafaab1a48 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 7 Jun 2019 09:21:17 -0400 Subject: [PATCH 02/12] Made TestSubscriptionNotification thread safe --- floodsub_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/floodsub_test.go b/floodsub_test.go index c15c57d8..47df0f53 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1075,6 +1075,8 @@ func TestSubscriptionNotification(t *testing.T) { msgs := make([]*Subscription, numHosts) subPeersFound := make([]map[peer.ID]struct{}, numHosts) + + wg := sync.WaitGroup{} for i, ps := range psubs { subch, err := ps.Subscribe("foobar") if err != nil { @@ -1084,7 +1086,9 @@ func TestSubscriptionNotification(t *testing.T) { msgs[i] = subch peersFound := make(map[peer.ID]struct{}) subPeersFound[i] = peersFound + wg.Add(1) go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() for i := 0; i < numHosts-1; i++ { pid, err := subch.NextSubscribedPeer(ctx) if err != nil { @@ -1099,6 +1103,7 @@ func TestSubscriptionNotification(t *testing.T) { time.Sleep(time.Millisecond * 100) + wg.Wait() for _, peersFound := range subPeersFound { if len(peersFound) != numHosts-1 { t.Fatal("incorrect number of peers found") From 817651a6d1021babc0cb8f2620a7dd31aa3f68b6 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 11 Jun 2019 17:49:28 -0400 Subject: [PATCH 03/12] Subscription Join events now fire even for peers connected to us before we subscribe. Added a Subscription Leave event --- floodsub_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++++-- pubsub.go | 39 +++++++++++++++--- subscription.go | 16 +++++++- 3 files changed, 146 insertions(+), 11 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 47df0f53..9cb8dfcd 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1064,10 +1064,11 @@ func TestImproperlySignedMessageRejected(t *testing.T) { } } -func TestSubscriptionNotification(t *testing.T) { +func TestSubscriptionJoinNotification(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + const numLateSubscribers = 10 const numHosts = 20 hosts := getNetHosts(t, ctx, numHosts) @@ -1076,21 +1077,42 @@ func TestSubscriptionNotification(t *testing.T) { msgs := make([]*Subscription, numHosts) subPeersFound := make([]map[peer.ID]struct{}, numHosts) - wg := sync.WaitGroup{} - for i, ps := range psubs { + // Have some peers subscribe earlier than other peers. + // This exercises whether we get subscription notifications from + // existing peers. + for i, ps := range psubs[numLateSubscribers:] { subch, err := ps.Subscribe("foobar") if err != nil { t.Fatal(err) } msgs[i] = subch + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + // Have the rest subscribe + for i, ps := range psubs[:numLateSubscribers] { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs[i+numLateSubscribers] = subch + } + + wg := sync.WaitGroup{} + for i := 0; i < numHosts; i++ { peersFound := make(map[peer.ID]struct{}) subPeersFound[i] = peersFound + sub := msgs[i] wg.Add(1) go func(peersFound map[peer.ID]struct{}) { defer wg.Done() for i := 0; i < numHosts-1; i++ { - pid, err := subch.NextSubscribedPeer(ctx) + pid, err := sub.NextPeerJoin(ctx) if err != nil { t.Fatal(err) } @@ -1099,14 +1121,86 @@ func TestSubscriptionNotification(t *testing.T) { }(peersFound) } + wg.Wait() + for _, peersFound := range subPeersFound { + if len(peersFound) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + } +} + +func TestSubscriptionLeaveNotification(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 20 + hosts := getNetHosts(t, ctx, numHosts) + + psubs := getPubsubs(ctx, hosts) + + msgs := make([]*Subscription, numHosts) + subPeersFound := make([]map[peer.ID]struct{}, numHosts) + + // Subscribe all peers and wait until they've all been found + for i, ps := range psubs { + subch, err := ps.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + msgs[i] = subch + } + connectAll(t, hosts) time.Sleep(time.Millisecond * 100) + wg := sync.WaitGroup{} + for i := 0; i < numHosts; i++ { + peersFound := make(map[peer.ID]struct{}) + subPeersFound[i] = peersFound + sub := msgs[i] + wg.Add(1) + go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() + for i := 0; i < numHosts-1; i++ { + pid, err := sub.NextPeerJoin(ctx) + if err != nil { + t.Fatal(err) + } + peersFound[pid] = struct{}{} + } + }(peersFound) + } + wg.Wait() for _, peersFound := range subPeersFound { if len(peersFound) != numHosts-1 { t.Fatal("incorrect number of peers found") } } + + // Test removing peers and verifying that they cause events + msgs[1].Cancel() + hosts[2].Close() + psubs[0].BlacklistPeer(hosts[3].ID()) + + leavingPeers := make(map[peer.ID]struct{}) + for i := 0; i < 3; i++ { + pid, err := msgs[0].NextPeerLeave(ctx) + if err != nil { + t.Fatal(err) + } + leavingPeers[pid] = struct{}{} + } + + if _, ok := leavingPeers[hosts[1].ID()]; !ok { + t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event")) + } + if _, ok := leavingPeers[hosts[2].ID()]; !ok { + t.Fatal(fmt.Errorf("closing host did not cause a leave event")) + } + if _, ok := leavingPeers[hosts[3].ID()]; !ok { + t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event")) + } } diff --git a/pubsub.go b/pubsub.go index 2be2da79..dd82c435 100644 --- a/pubsub.go +++ b/pubsub.go @@ -336,8 +336,9 @@ func (p *PubSub) processLoop(ctx context.Context) { } delete(p.peers, pid) - for _, t := range p.topics { - delete(t, pid) + for t, tmap := range p.topics { + delete(tmap, pid) + p.notifySubscriberLeft(t, pid) } p.rt.RemovePeer(pid) @@ -395,8 +396,9 @@ func (p *PubSub) processLoop(ctx context.Context) { if ok { close(ch) delete(p.peers, pid) - for _, t := range p.topics { - delete(t, pid) + for t, tmap := range p.topics { + delete(tmap, pid) + p.notifySubscriberLeft(t, pid) } p.rt.RemovePeer(pid) } @@ -422,6 +424,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") close(sub.ch) close(sub.inboundSubs) + close(sub.leavingSubs) delete(subs, sub) if len(subs) == 0 { @@ -451,10 +454,21 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { subs = p.myTopics[sub.topic] } - sub.inboundSubs = make(chan peer.ID, 32) + tmap := p.topics[sub.topic] + inboundBufSize := len(tmap) + if inboundBufSize < 32 { + inboundBufSize = 32 + } + sub.ch = make(chan *Message, 32) + sub.inboundSubs = make(chan peer.ID, inboundBufSize) + sub.leavingSubs = make(chan peer.ID, 32) sub.cancelCh = p.cancelCh + for pid := range tmap { + sub.inboundSubs <- pid + } + p.myTopics[sub.topic][sub] = struct{}{} req.resp <- sub @@ -565,6 +579,18 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } +func (p *PubSub) notifySubscriberLeft(topic string, pid peer.ID) { + if subs, ok := p.myTopics[topic]; ok { + for s := range subs { + select { + case s.leavingSubs <- pid: + default: + log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic) + } + } + } +} + func (p *PubSub) handleIncomingRPC(rpc *RPC) { for _, subopt := range rpc.GetSubscriptions() { t := subopt.GetTopicid() @@ -583,7 +609,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { select { case s.inboundSubs <- inboundPeer: default: - log.Infof("Can't deliver event to subscription for topic %s; subscriber too slow", t) + log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) } } } @@ -594,6 +620,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { continue } delete(tmap, rpc.from) + p.notifySubscriberLeft(t, rpc.from) } } diff --git a/subscription.go b/subscription.go index ad70778e..61f6e419 100644 --- a/subscription.go +++ b/subscription.go @@ -10,6 +10,7 @@ type Subscription struct { ch chan *Message cancelCh chan<- *Subscription inboundSubs chan peer.ID + leavingSubs chan peer.ID err error } @@ -34,7 +35,7 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } -func (sub *Subscription) NextSubscribedPeer(ctx context.Context) (peer.ID, error) { +func (sub *Subscription) NextPeerJoin(ctx context.Context) (peer.ID, error) { select { case newPeer, ok := <-sub.inboundSubs: if !ok { @@ -46,3 +47,16 @@ func (sub *Subscription) NextSubscribedPeer(ctx context.Context) (peer.ID, error return "", ctx.Err() } } + +func (sub *Subscription) NextPeerLeave(ctx context.Context) (peer.ID, error) { + select { + case leavingPeer, ok := <-sub.leavingSubs: + if !ok { + return leavingPeer, sub.err + } + + return leavingPeer, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} From be69856a1d49973c505cb2e8d3043c9f2087fe0b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 Jun 2019 10:06:16 -0400 Subject: [PATCH 04/12] Fixed some unnecessary Leave notifications. Combined Join and Leave events into a single API with a struct that specifies whether the event is a Join or a Leave. --- floodsub_test.go | 24 +++++++++++++-------- pubsub.go | 35 ++++++++++++++++++------------- subscription.go | 54 +++++++++++++++++++++++++++++------------------- 3 files changed, 69 insertions(+), 44 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 9cb8dfcd..dfd4e7ee 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1111,12 +1111,14 @@ func TestSubscriptionJoinNotification(t *testing.T) { wg.Add(1) go func(peersFound map[peer.ID]struct{}) { defer wg.Done() - for i := 0; i < numHosts-1; i++ { - pid, err := sub.NextPeerJoin(ctx) + for len(peersFound) < numHosts-1 { + event, err := sub.NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - peersFound[pid] = struct{}{} + if event.Type == PEER_JOIN { + peersFound[event.Peer] = struct{}{} + } } }(peersFound) } @@ -1163,12 +1165,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) { wg.Add(1) go func(peersFound map[peer.ID]struct{}) { defer wg.Done() - for i := 0; i < numHosts-1; i++ { - pid, err := sub.NextPeerJoin(ctx) + for len(peersFound) < numHosts-1 { + event, err := sub.NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - peersFound[pid] = struct{}{} + if event.Type == PEER_JOIN { + peersFound[event.Peer] = struct{}{} + } } }(peersFound) } @@ -1186,12 +1190,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) { psubs[0].BlacklistPeer(hosts[3].ID()) leavingPeers := make(map[peer.ID]struct{}) - for i := 0; i < 3; i++ { - pid, err := msgs[0].NextPeerLeave(ctx) + for len(leavingPeers) < 3 { + event, err := msgs[0].NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - leavingPeers[pid] = struct{}{} + if event.Type == PEER_LEAVE { + leavingPeers[event.Peer] = struct{}{} + } } if _, ok := leavingPeers[hosts[1].ID()]; !ok { diff --git a/pubsub.go b/pubsub.go index dd82c435..7faded5a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -337,8 +337,10 @@ func (p *PubSub) processLoop(ctx context.Context) { delete(p.peers, pid) for t, tmap := range p.topics { - delete(tmap, pid) - p.notifySubscriberLeft(t, pid) + if _, ok := tmap[pid]; ok { + delete(tmap, pid) + p.notifyLeave(t, pid) + } } p.rt.RemovePeer(pid) @@ -397,8 +399,10 @@ func (p *PubSub) processLoop(ctx context.Context) { close(ch) delete(p.peers, pid) for t, tmap := range p.topics { - delete(tmap, pid) - p.notifySubscriberLeft(t, pid) + if _, ok := tmap[pid]; ok { + delete(tmap, pid) + p.notifyLeave(t, pid) + } } p.rt.RemovePeer(pid) } @@ -423,8 +427,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") close(sub.ch) - close(sub.inboundSubs) - close(sub.leavingSubs) + close(sub.joinCh) + close(sub.leaveCh) delete(subs, sub) if len(subs) == 0 { @@ -461,12 +465,12 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { } sub.ch = make(chan *Message, 32) - sub.inboundSubs = make(chan peer.ID, inboundBufSize) - sub.leavingSubs = make(chan peer.ID, 32) + sub.joinCh = make(chan peer.ID, inboundBufSize) + sub.leaveCh = make(chan peer.ID, 32) sub.cancelCh = p.cancelCh for pid := range tmap { - sub.inboundSubs <- pid + sub.joinCh <- pid } p.myTopics[sub.topic][sub] = struct{}{} @@ -579,11 +583,11 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } -func (p *PubSub) notifySubscriberLeft(topic string, pid peer.ID) { +func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if subs, ok := p.myTopics[topic]; ok { for s := range subs { select { - case s.leavingSubs <- pid: + case s.leaveCh <- pid: default: log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic) } @@ -607,7 +611,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { inboundPeer := rpc.from for s := range subs { select { - case s.inboundSubs <- inboundPeer: + case s.joinCh <- inboundPeer: default: log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) } @@ -619,8 +623,11 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if !ok { continue } - delete(tmap, rpc.from) - p.notifySubscriberLeft(t, rpc.from) + + if _, ok := tmap[rpc.from]; ok { + delete(tmap, rpc.from) + p.notifyLeave(t, rpc.from) + } } } diff --git a/subscription.go b/subscription.go index 61f6e419..00908046 100644 --- a/subscription.go +++ b/subscription.go @@ -5,19 +5,33 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +type EventType uint8 + +const ( + UNKNOWN EventType = iota + PEER_JOIN + PEER_LEAVE +) + type Subscription struct { - topic string - ch chan *Message - cancelCh chan<- *Subscription - inboundSubs chan peer.ID - leavingSubs chan peer.ID - err error + topic string + ch chan *Message + cancelCh chan<- *Subscription + joinCh chan peer.ID + leaveCh chan peer.ID + err error +} + +type PeerEvent struct { + Type EventType + Peer peer.ID } func (sub *Subscription) Topic() string { return sub.topic } +// Next returns the next message in our subscription func (sub *Subscription) Next(ctx context.Context) (*Message, error) { select { case msg, ok := <-sub.ch: @@ -35,28 +49,26 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } -func (sub *Subscription) NextPeerJoin(ctx context.Context) (peer.ID, error) { +// NextPeerEvent returns the next event regarding subscribed peers +// Note: There is no guarantee that the Peer Join event will fire before +// the related Peer Leave event for a given peer +func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { select { - case newPeer, ok := <-sub.inboundSubs: + case newPeer, ok := <-sub.joinCh: + event := PeerEvent{Type: PEER_JOIN, Peer: newPeer} if !ok { - return newPeer, sub.err + return event, sub.err } - return newPeer, nil - case <-ctx.Done(): - return "", ctx.Err() - } -} - -func (sub *Subscription) NextPeerLeave(ctx context.Context) (peer.ID, error) { - select { - case leavingPeer, ok := <-sub.leavingSubs: + return event, nil + case leavingPeer, ok := <-sub.leaveCh: + event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer} if !ok { - return leavingPeer, sub.err + return event, sub.err } - return leavingPeer, nil + return event, nil case <-ctx.Done(): - return "", ctx.Err() + return PeerEvent{}, ctx.Err() } } From e26e489bdd8e23050f766f0a25a53d951760facd Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 21 Jun 2019 08:46:41 +0200 Subject: [PATCH 05/12] Small code cleanup + refactor --- pubsub.go | 11 +++-------- subscription.go | 8 +++++++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pubsub.go b/pubsub.go index 7faded5a..580d4fa3 100644 --- a/pubsub.go +++ b/pubsub.go @@ -78,9 +78,6 @@ type PubSub struct { // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} - // a set of notification channels for newly subscribed peers - newSubs map[string]chan peer.ID - // sendMsg handles messages that have been validated sendMsg chan *sendReq @@ -426,9 +423,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { } sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") - close(sub.ch) - close(sub.joinCh) - close(sub.leaveCh) + sub.close() delete(subs, sub) if len(subs) == 0 { @@ -608,10 +603,10 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if _, ok = tmap[rpc.from]; !ok { tmap[rpc.from] = struct{}{} if subs, ok := p.myTopics[t]; ok { - inboundPeer := rpc.from + peer := rpc.from for s := range subs { select { - case s.joinCh <- inboundPeer: + case s.joinCh <- peer: default: log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) } diff --git a/subscription.go b/subscription.go index 00908046..56128af3 100644 --- a/subscription.go +++ b/subscription.go @@ -5,7 +5,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -type EventType uint8 +type EventType int8 const ( UNKNOWN EventType = iota @@ -49,6 +49,12 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } +func (sub *Subscription) close(){ + close(sub.ch) + close(sub.joinCh) + close(sub.leaveCh) +} + // NextPeerEvent returns the next event regarding subscribed peers // Note: There is no guarantee that the Peer Join event will fire before // the related Peer Leave event for a given peer From ae667299a8ddd054f8c92606e30818d00445f268 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 1 Jul 2019 17:43:49 +0200 Subject: [PATCH 06/12] go fmt --- subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscription.go b/subscription.go index 56128af3..5db1cb8b 100644 --- a/subscription.go +++ b/subscription.go @@ -49,7 +49,7 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } -func (sub *Subscription) close(){ +func (sub *Subscription) close() { close(sub.ch) close(sub.joinCh) close(sub.leaveCh) From e4a65bcf1cb9bf4d22849c25f08824c92a1d3c1b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 22 Jul 2019 10:02:58 -0400 Subject: [PATCH 07/12] Made discovery enum use int and removed the UNKNOWN zero value from the enum --- subscription.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/subscription.go b/subscription.go index 5db1cb8b..7c4bdbe5 100644 --- a/subscription.go +++ b/subscription.go @@ -5,11 +5,10 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -type EventType int8 +type EventType int const ( - UNKNOWN EventType = iota - PEER_JOIN + PEER_JOIN EventType = iota PEER_LEAVE ) From 934b813b9aed7239ff051090ee33474f598a8983 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 1 Aug 2019 16:57:05 -0400 Subject: [PATCH 08/12] stronger notification deliveries backed by unbounded buffer --- floodsub_test.go | 163 +++++++++++++++++++++++++++++++++++++++++++++++ pubsub.go | 31 +++------ subscription.go | 86 ++++++++++++++++++++----- 3 files changed, 242 insertions(+), 38 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index dfd4e7ee..65416385 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1210,3 +1210,166 @@ func TestSubscriptionLeaveNotification(t *testing.T) { t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event")) } } + +func TestSubscriptionNotificationOverflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + + psubs := getPubsubs(ctx, hosts) + + msgs := make([]*Subscription, numHosts) + subPeersFound := make([]map[peer.ID]struct{}, numHosts) + + // Subscribe all peers except one and wait until they've all been found + for i := 1; i < numHosts; i++ { + subch, err := psubs[i].Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + msgs[i] = subch + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + wg := sync.WaitGroup{} + for i := 1; i < numHosts; i++ { + peersFound := make(map[peer.ID]struct{}) + subPeersFound[i] = peersFound + sub := msgs[i] + wg.Add(1) + go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() + for len(peersFound) < numHosts-2 { + event, err := sub.NextPeerEvent(ctx) + if err != nil { + t.Fatal(err) + } + if event.Type == PEER_JOIN { + peersFound[event.Peer] = struct{}{} + } + } + }(peersFound) + } + + wg.Wait() + for _, peersFound := range subPeersFound[1:] { + if len(peersFound) != numHosts-2 { + t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2) + } + } + + // Wait for remaining peer to find other peers + for len(psubs[0].ListPeers(topic)) < numHosts-1 { + time.Sleep(time.Millisecond * 100) + } + + // Subscribe the remaining peer and check that all the events came through + sub, err := psubs[0].Subscribe(topic) + if err != nil { + t.Fatal(err) + } + + msgs[0] = sub + + peerState := readAllQueuedEvents(ctx, t, sub) + + if len(peerState) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + + for _, e := range peerState { + if e != PEER_JOIN { + t.Fatal("non JOIN event occurred") + } + } + + // Unsubscribe all peers except one and check that all the events came through + for i := 1; i < numHosts; i++ { + msgs[i].Cancel() + } + + // Wait for remaining peer to find other peers + for len(psubs[0].ListPeers(topic)) != 0 { + time.Sleep(time.Millisecond * 100) + } + + peerState = readAllQueuedEvents(ctx, t, sub) + + if len(peerState) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + + for _, e := range peerState { + if e != PEER_LEAVE { + t.Fatal("non LEAVE event occurred") + } + } + + // Resubscribe and Unsubscribe a peers and check the state for consistency + notifSubThenUnSub(ctx, t, topic, psubs, msgs, 10) + notifSubThenUnSub(ctx, t, topic, psubs, msgs, numHosts-1) +} + +func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string, + psubs []*PubSub, msgs []*Subscription, checkSize int) { + + ps := psubs[0] + sub := msgs[0] + + var err error + + for i := 1; i < checkSize+1; i++ { + msgs[i], err = psubs[i].Subscribe(topic) + if err != nil { + t.Fatal(err) + } + } + + for len(ps.ListPeers(topic)) < checkSize { + time.Sleep(time.Millisecond * 100) + } + + for i := 1; i < checkSize+1; i++ { + msgs[i].Cancel() + } + + // Wait for subscriptions to register + for len(ps.ListPeers(topic)) < 0 { + time.Sleep(time.Millisecond * 100) + } + + peerState := readAllQueuedEvents(ctx, t, sub) + + if len(peerState) != 0 { + t.Fatal("Received incorrect events") + } +} + +func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType { + peerState := make(map[peer.ID]EventType) + for { + ctx, _ := context.WithTimeout(ctx, time.Millisecond*100) + event, err := sub.NextPeerEvent(ctx) + if err == context.DeadlineExceeded { + break + } else if err != nil { + t.Fatal(err) + } + + e, ok := peerState[event.Peer] + if !ok { + peerState[event.Peer] = event.Type + } else if e != event.Type { + delete(peerState, event.Peer) + } + } + return peerState +} diff --git a/pubsub.go b/pubsub.go index 580d4fa3..37391cd4 100644 --- a/pubsub.go +++ b/pubsub.go @@ -454,19 +454,11 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { } tmap := p.topics[sub.topic] - inboundBufSize := len(tmap) - if inboundBufSize < 32 { - inboundBufSize = 32 - } - - sub.ch = make(chan *Message, 32) - sub.joinCh = make(chan peer.ID, inboundBufSize) - sub.leaveCh = make(chan peer.ID, 32) - sub.cancelCh = p.cancelCh - for pid := range tmap { - sub.joinCh <- pid + for p := range tmap { + sub.evtBacklog[p] = PEER_JOIN } + sub.cancelCh = p.cancelCh p.myTopics[sub.topic][sub] = struct{}{} @@ -581,11 +573,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if subs, ok := p.myTopics[topic]; ok { for s := range subs { - select { - case s.leaveCh <- pid: - default: - log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic) - } + s.sendNotification(PeerEvent{PEER_LEAVE, pid}) } } } @@ -605,11 +593,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if subs, ok := p.myTopics[t]; ok { peer := rpc.from for s := range subs { - select { - case s.joinCh <- peer: - default: - log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) - } + s.sendNotification(PeerEvent{PEER_JOIN, peer}) } } } @@ -712,6 +696,11 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO sub := &Subscription{ topic: td.GetName(), + + ch: make(chan *Message, 32), + peerEvtCh: make(chan PeerEvent, 32), + evtBacklog: make(map[peer.ID]EventType), + backlogCh: make(chan PeerEvent, 1), } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index 7c4bdbe5..ae33cdef 100644 --- a/subscription.go +++ b/subscription.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "github.com/libp2p/go-libp2p-core/peer" + "sync" ) type EventType int @@ -16,9 +17,12 @@ type Subscription struct { topic string ch chan *Message cancelCh chan<- *Subscription - joinCh chan peer.ID - leaveCh chan peer.ID err error + + peerEvtCh chan PeerEvent + eventMx sync.Mutex + evtBacklog map[peer.ID]EventType + backlogCh chan PeerEvent } type PeerEvent struct { @@ -50,29 +54,77 @@ func (sub *Subscription) Cancel() { func (sub *Subscription) close() { close(sub.ch) - close(sub.joinCh) - close(sub.leaveCh) } -// NextPeerEvent returns the next event regarding subscribed peers -// Note: There is no guarantee that the Peer Join event will fire before -// the related Peer Leave event for a given peer -func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { +func (sub *Subscription) sendNotification(evt PeerEvent) { + sub.eventMx.Lock() + defer sub.eventMx.Unlock() + + e, ok := sub.evtBacklog[evt.Peer] + if ok && e != evt.Type { + delete(sub.evtBacklog, evt.Peer) + } + select { - case newPeer, ok := <-sub.joinCh: - event := PeerEvent{Type: PEER_JOIN, Peer: newPeer} - if !ok { - return event, sub.err + case sub.peerEvtCh <- evt: + default: + // Empty event queue into backlog + emptyqueue: + for { + select { + case e := <-sub.peerEvtCh: + sub.addToBacklog(e) + default: + break emptyqueue + } + } + sub.addToBacklog(evt) + if e, ok := sub.pullFromBacklog(); ok { + sub.peerEvtCh <- e } + } +} + +// addToBacklog assumes a lock has been taken to protect the backlog +func (sub *Subscription) addToBacklog(evt PeerEvent) { + e, ok := sub.evtBacklog[evt.Peer] + if !ok { + sub.evtBacklog[evt.Peer] = evt.Type + } else if e != evt.Type { + delete(sub.evtBacklog, evt.Peer) + } +} + +// pullFromBacklog assumes a lock has been taken to protect the backlog +func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { + for k, v := range sub.evtBacklog { + evt := PeerEvent{Peer: k, Type: v} + delete(sub.evtBacklog, k) + return evt, true + } + return PeerEvent{}, false +} - return event, nil - case leavingPeer, ok := <-sub.leaveCh: - event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer} +// NextPeerEvent returns the next event regarding subscribed peers +// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. +// Unless a peer both Joins and Leaves before NextPeerEvent emits either event +// all events will eventually be received from NextPeerEvent. +func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { + sub.eventMx.Lock() + evt, ok := sub.pullFromBacklog() + sub.eventMx.Unlock() + + if ok { + return evt, nil + } + + select { + case evt, ok := <-sub.peerEvtCh: if !ok { - return event, sub.err + return PeerEvent{}, sub.err } - return event, nil + return evt, nil case <-ctx.Done(): return PeerEvent{}, ctx.Err() } From 48c9847240e04eff525b653ec630fce3e7cd1ca4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 2 Aug 2019 00:46:49 -0400 Subject: [PATCH 09/12] oops forgot a return. separated out and added more comments to the new tests. --- floodsub_test.go | 66 ++++++++++++++++++++++++++++++++++++++++-------- subscription.go | 8 +++--- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 65416385..70265b0f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1211,7 +1211,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { } } -func TestSubscriptionNotificationOverflow(t *testing.T) { +func TestSubscriptionNotificationOverflowSimple(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1296,7 +1296,7 @@ func TestSubscriptionNotificationOverflow(t *testing.T) { msgs[i].Cancel() } - // Wait for remaining peer to find other peers + // Wait for remaining peer to disconnect from the other peers for len(psubs[0].ListPeers(topic)) != 0 { time.Sleep(time.Millisecond * 100) } @@ -1312,44 +1312,88 @@ func TestSubscriptionNotificationOverflow(t *testing.T) { t.Fatal("non LEAVE event occurred") } } +} +func TestSubscriptionNotificationSubUnSub(t *testing.T) { // Resubscribe and Unsubscribe a peers and check the state for consistency - notifSubThenUnSub(ctx, t, topic, psubs, msgs, 10) - notifSubThenUnSub(ctx, t, topic, psubs, msgs, numHosts-1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + psubs := getPubsubs(ctx, hosts) + + for i := 1; i < numHosts; i++ { + connect(t, hosts[0], hosts[i]) + } + time.Sleep(time.Millisecond * 100) + + notifSubThenUnSub(ctx, t, topic, psubs[:11]) +} + +func TestSubscriptionNotificationOverflowSubUnSub(t *testing.T) { + // Resubscribe and Unsubscribe a peers and check the state for consistency + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + psubs := getPubsubs(ctx, hosts) + + for i := 1; i < numHosts; i++ { + connect(t, hosts[0], hosts[i]) + } + time.Sleep(time.Millisecond * 100) + + notifSubThenUnSub(ctx, t, topic, psubs) } func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string, - psubs []*PubSub, msgs []*Subscription, checkSize int) { + psubs []*PubSub) { ps := psubs[0] - sub := msgs[0] + msgs := make([]*Subscription, len(psubs)) + checkSize := len(psubs) - 1 + // Subscribe all peers to the topic var err error - - for i := 1; i < checkSize+1; i++ { - msgs[i], err = psubs[i].Subscribe(topic) + for i, ps := range psubs { + msgs[i], err = ps.Subscribe(topic) if err != nil { t.Fatal(err) } } + sub := msgs[0] + + // Wait for the primary peer to be connected to the other peers for len(ps.ListPeers(topic)) < checkSize { time.Sleep(time.Millisecond * 100) } + // Unsubscribe all peers except the primary for i := 1; i < checkSize+1; i++ { msgs[i].Cancel() } - // Wait for subscriptions to register + // Wait for the unsubscribe messages to reach the primary peer for len(ps.ListPeers(topic)) < 0 { time.Sleep(time.Millisecond * 100) } + // read all available events and verify that there are no events to process + // this is because every peer that joined also left peerState := readAllQueuedEvents(ctx, t, sub) if len(peerState) != 0 { - t.Fatal("Received incorrect events") + for p, s := range peerState { + fmt.Println(p, s) + } + t.Fatalf("Received incorrect events. %d extra events", len(peerState)) } } diff --git a/subscription.go b/subscription.go index ae33cdef..5ed17784 100644 --- a/subscription.go +++ b/subscription.go @@ -60,9 +60,11 @@ func (sub *Subscription) sendNotification(evt PeerEvent) { sub.eventMx.Lock() defer sub.eventMx.Unlock() - e, ok := sub.evtBacklog[evt.Peer] - if ok && e != evt.Type { - delete(sub.evtBacklog, evt.Peer) + if e, ok := sub.evtBacklog[evt.Peer]; ok { + if e != evt.Type { + delete(sub.evtBacklog, evt.Peer) + } + return } select { From 57f2c1efdd44ec2d2527cdd93ed1029f40a79b17 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 2 Aug 2019 12:25:57 -0400 Subject: [PATCH 10/12] Refactored events to be camel case. --- floodsub_test.go | 16 ++++++++-------- pubsub.go | 6 +++--- subscription.go | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 70265b0f..547ea41f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1116,7 +1116,7 @@ func TestSubscriptionJoinNotification(t *testing.T) { if err != nil { t.Fatal(err) } - if event.Type == PEER_JOIN { + if event.Type == PeerJoin { peersFound[event.Peer] = struct{}{} } } @@ -1170,7 +1170,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { if err != nil { t.Fatal(err) } - if event.Type == PEER_JOIN { + if event.Type == PeerJoin { peersFound[event.Peer] = struct{}{} } } @@ -1195,7 +1195,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { if err != nil { t.Fatal(err) } - if event.Type == PEER_LEAVE { + if event.Type == PeerLeave { leavingPeers[event.Peer] = struct{}{} } } @@ -1252,7 +1252,7 @@ func TestSubscriptionNotificationOverflowSimple(t *testing.T) { if err != nil { t.Fatal(err) } - if event.Type == PEER_JOIN { + if event.Type == PeerJoin { peersFound[event.Peer] = struct{}{} } } @@ -1286,8 +1286,8 @@ func TestSubscriptionNotificationOverflowSimple(t *testing.T) { } for _, e := range peerState { - if e != PEER_JOIN { - t.Fatal("non JOIN event occurred") + if e != PeerJoin { + t.Fatal("non Join event occurred") } } @@ -1308,8 +1308,8 @@ func TestSubscriptionNotificationOverflowSimple(t *testing.T) { } for _, e := range peerState { - if e != PEER_LEAVE { - t.Fatal("non LEAVE event occurred") + if e != PeerLeave { + t.Fatal("non Leave event occurred") } } } diff --git a/pubsub.go b/pubsub.go index 37391cd4..a648b0e6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -456,7 +456,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { tmap := p.topics[sub.topic] for p := range tmap { - sub.evtBacklog[p] = PEER_JOIN + sub.evtBacklog[p] = PeerJoin } sub.cancelCh = p.cancelCh @@ -573,7 +573,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if subs, ok := p.myTopics[topic]; ok { for s := range subs { - s.sendNotification(PeerEvent{PEER_LEAVE, pid}) + s.sendNotification(PeerEvent{PeerLeave, pid}) } } } @@ -593,7 +593,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if subs, ok := p.myTopics[t]; ok { peer := rpc.from for s := range subs { - s.sendNotification(PeerEvent{PEER_JOIN, peer}) + s.sendNotification(PeerEvent{PeerJoin, peer}) } } } diff --git a/subscription.go b/subscription.go index 5ed17784..edc603e6 100644 --- a/subscription.go +++ b/subscription.go @@ -9,8 +9,8 @@ import ( type EventType int const ( - PEER_JOIN EventType = iota - PEER_LEAVE + PeerJoin EventType = iota + PeerLeave ) type Subscription struct { From 65825ce63ab551d95aa998f86719e6102afb507c Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 5 Aug 2019 15:48:31 -0400 Subject: [PATCH 11/12] fix race --- pubsub.go | 4 ++-- subscription.go | 63 ++++++++++++++++++------------------------------- 2 files changed, 25 insertions(+), 42 deletions(-) diff --git a/pubsub.go b/pubsub.go index a648b0e6..5f28d382 100644 --- a/pubsub.go +++ b/pubsub.go @@ -698,9 +698,9 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO topic: td.GetName(), ch: make(chan *Message, 32), - peerEvtCh: make(chan PeerEvent, 32), + peerEvtCh: make(chan PeerEvent, 1), evtBacklog: make(map[peer.ID]EventType), - backlogCh: make(chan PeerEvent, 1), + backlogCh: make(chan struct{}, 1), } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index edc603e6..55591bde 100644 --- a/subscription.go +++ b/subscription.go @@ -19,10 +19,11 @@ type Subscription struct { cancelCh chan<- *Subscription err error - peerEvtCh chan PeerEvent - eventMx sync.Mutex - evtBacklog map[peer.ID]EventType - backlogCh chan PeerEvent + peerEvtCh chan PeerEvent + backlogMx sync.Mutex + evtBacklog map[peer.ID]EventType + backlogCh chan struct{} + nextEventMx sync.Mutex } type PeerEvent struct { @@ -57,33 +58,14 @@ func (sub *Subscription) close() { } func (sub *Subscription) sendNotification(evt PeerEvent) { - sub.eventMx.Lock() - defer sub.eventMx.Unlock() + sub.backlogMx.Lock() + defer sub.backlogMx.Unlock() - if e, ok := sub.evtBacklog[evt.Peer]; ok { - if e != evt.Type { - delete(sub.evtBacklog, evt.Peer) - } - return - } + sub.addToBacklog(evt) select { - case sub.peerEvtCh <- evt: + case sub.backlogCh <- struct{}{}: default: - // Empty event queue into backlog - emptyqueue: - for { - select { - case e := <-sub.peerEvtCh: - sub.addToBacklog(e) - default: - break emptyqueue - } - } - sub.addToBacklog(evt) - if e, ok := sub.pullFromBacklog(); ok { - sub.peerEvtCh <- e - } } } @@ -112,22 +94,23 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { // Unless a peer both Joins and Leaves before NextPeerEvent emits either event // all events will eventually be received from NextPeerEvent. func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { - sub.eventMx.Lock() - evt, ok := sub.pullFromBacklog() - sub.eventMx.Unlock() + sub.nextEventMx.Lock() + defer sub.nextEventMx.Unlock() - if ok { - return evt, nil - } + for { + sub.backlogMx.Lock() + evt, ok := sub.pullFromBacklog() + sub.backlogMx.Unlock() - select { - case evt, ok := <-sub.peerEvtCh: - if !ok { - return PeerEvent{}, sub.err + if ok { + return evt, nil } - return evt, nil - case <-ctx.Done(): - return PeerEvent{}, ctx.Err() + select { + case <-sub.backlogCh: + continue + case <-ctx.Done(): + return PeerEvent{}, ctx.Err() + } } } From 97e63e477ee7ac22e92d7e8232230586d0d60504 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 6 Aug 2019 00:26:40 -0400 Subject: [PATCH 12/12] better context respect when waiting for new peer events. refactored backlog into eventLog. removed test that was no longer useful. --- floodsub_test.go | 21 +--------------- pubsub.go | 10 ++++---- subscription.go | 63 +++++++++++++++++++++++++----------------------- 3 files changed, 39 insertions(+), 55 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 547ea41f..6e845e70 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1211,7 +1211,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { } } -func TestSubscriptionNotificationOverflowSimple(t *testing.T) { +func TestSubscriptionManyNotifications(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1330,25 +1330,6 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) { } time.Sleep(time.Millisecond * 100) - notifSubThenUnSub(ctx, t, topic, psubs[:11]) -} - -func TestSubscriptionNotificationOverflowSubUnSub(t *testing.T) { - // Resubscribe and Unsubscribe a peers and check the state for consistency - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - psubs := getPubsubs(ctx, hosts) - - for i := 1; i < numHosts; i++ { - connect(t, hosts[0], hosts[i]) - } - time.Sleep(time.Millisecond * 100) - notifSubThenUnSub(ctx, t, topic, psubs) } diff --git a/pubsub.go b/pubsub.go index 5f28d382..5902c00a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -456,7 +456,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { tmap := p.topics[sub.topic] for p := range tmap { - sub.evtBacklog[p] = PeerJoin + sub.evtLog[p] = PeerJoin } sub.cancelCh = p.cancelCh @@ -697,10 +697,10 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO sub := &Subscription{ topic: td.GetName(), - ch: make(chan *Message, 32), - peerEvtCh: make(chan PeerEvent, 1), - evtBacklog: make(map[peer.ID]EventType), - backlogCh: make(chan struct{}, 1), + ch: make(chan *Message, 32), + peerEvtCh: make(chan PeerEvent, 1), + evtLog: make(map[peer.ID]EventType), + evtLogCh: make(chan struct{}, 1), } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index 55591bde..45d957ec 100644 --- a/subscription.go +++ b/subscription.go @@ -19,11 +19,10 @@ type Subscription struct { cancelCh chan<- *Subscription err error - peerEvtCh chan PeerEvent - backlogMx sync.Mutex - evtBacklog map[peer.ID]EventType - backlogCh chan struct{} - nextEventMx sync.Mutex + peerEvtCh chan PeerEvent + evtLogMx sync.Mutex + evtLog map[peer.ID]EventType + evtLogCh chan struct{} } type PeerEvent struct { @@ -58,32 +57,32 @@ func (sub *Subscription) close() { } func (sub *Subscription) sendNotification(evt PeerEvent) { - sub.backlogMx.Lock() - defer sub.backlogMx.Unlock() + sub.evtLogMx.Lock() + defer sub.evtLogMx.Unlock() - sub.addToBacklog(evt) - - select { - case sub.backlogCh <- struct{}{}: - default: - } + sub.addToEventLog(evt) } -// addToBacklog assumes a lock has been taken to protect the backlog -func (sub *Subscription) addToBacklog(evt PeerEvent) { - e, ok := sub.evtBacklog[evt.Peer] +// addToEventLog assumes a lock has been taken to protect the event log +func (sub *Subscription) addToEventLog(evt PeerEvent) { + e, ok := sub.evtLog[evt.Peer] if !ok { - sub.evtBacklog[evt.Peer] = evt.Type + sub.evtLog[evt.Peer] = evt.Type + // send signal that an event has been added to the event log + select { + case sub.evtLogCh <- struct{}{}: + default: + } } else if e != evt.Type { - delete(sub.evtBacklog, evt.Peer) + delete(sub.evtLog, evt.Peer) } } -// pullFromBacklog assumes a lock has been taken to protect the backlog -func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { - for k, v := range sub.evtBacklog { +// pullFromEventLog assumes a lock has been taken to protect the event log +func (sub *Subscription) pullFromEventLog() (PeerEvent, bool) { + for k, v := range sub.evtLog { evt := PeerEvent{Peer: k, Type: v} - delete(sub.evtBacklog, k) + delete(sub.evtLog, k) return evt, true } return PeerEvent{}, false @@ -94,20 +93,24 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { // Unless a peer both Joins and Leaves before NextPeerEvent emits either event // all events will eventually be received from NextPeerEvent. func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { - sub.nextEventMx.Lock() - defer sub.nextEventMx.Unlock() - for { - sub.backlogMx.Lock() - evt, ok := sub.pullFromBacklog() - sub.backlogMx.Unlock() - + sub.evtLogMx.Lock() + evt, ok := sub.pullFromEventLog() if ok { + // make sure an event log signal is available if there are events in the event log + if len(sub.evtLog) > 0 { + select { + case sub.evtLogCh <- struct{}{}: + default: + } + } + sub.evtLogMx.Unlock() return evt, nil } + sub.evtLogMx.Unlock() select { - case <-sub.backlogCh: + case <-sub.evtLogCh: continue case <-ctx.Done(): return PeerEvent{}, ctx.Err()