Skip to content

Commit

Permalink
Merge pull request #190 from aschmahmann/feat/addOnSubscribeNotification
Browse files Browse the repository at this point in the history
Add the ability to handle newly subscribed peers
  • Loading branch information
vyzo authored Aug 7, 2019
2 parents 4221a39 + 97e63e4 commit 9f04364
Show file tree
Hide file tree
Showing 3 changed files with 464 additions and 8 deletions.
335 changes: 335 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,338 @@ func TestImproperlySignedMessageRejected(t *testing.T) {
)
}
}

func TestSubscriptionJoinNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numLateSubscribers = 10
const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Have some peers subscribe earlier than other peers.
// This exercises whether we get subscription notifications from
// existing peers.
for i, ps := range psubs[numLateSubscribers:] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

// Have the rest subscribe
for i, ps := range psubs[:numLateSubscribers] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i+numLateSubscribers] = subch
}

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}
}

func TestSubscriptionLeaveNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Subscribe all peers and wait until they've all been found
for i, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}

// Test removing peers and verifying that they cause events
msgs[1].Cancel()
hosts[2].Close()
psubs[0].BlacklistPeer(hosts[3].ID())

leavingPeers := make(map[peer.ID]struct{})
for len(leavingPeers) < 3 {
event, err := msgs[0].NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerLeave {
leavingPeers[event.Peer] = struct{}{}
}
}

if _, ok := leavingPeers[hosts[1].ID()]; !ok {
t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[2].ID()]; !ok {
t.Fatal(fmt.Errorf("closing host did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[3].ID()]; !ok {
t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
}
}

func TestSubscriptionManyNotifications(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "foobar"

const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Subscribe all peers except one and wait until they've all been found
for i := 1; i < numHosts; i++ {
subch, err := psubs[i].Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

wg := sync.WaitGroup{}
for i := 1; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-2 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound[1:] {
if len(peersFound) != numHosts-2 {
t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2)
}
}

// Wait for remaining peer to find other peers
for len(psubs[0].ListPeers(topic)) < numHosts-1 {
time.Sleep(time.Millisecond * 100)
}

// Subscribe the remaining peer and check that all the events came through
sub, err := psubs[0].Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs[0] = sub

peerState := readAllQueuedEvents(ctx, t, sub)

if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}

for _, e := range peerState {
if e != PeerJoin {
t.Fatal("non Join event occurred")
}
}

// Unsubscribe all peers except one and check that all the events came through
for i := 1; i < numHosts; i++ {
msgs[i].Cancel()
}

// Wait for remaining peer to disconnect from the other peers
for len(psubs[0].ListPeers(topic)) != 0 {
time.Sleep(time.Millisecond * 100)
}

peerState = readAllQueuedEvents(ctx, t, sub)

if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}

for _, e := range peerState {
if e != PeerLeave {
t.Fatal("non Leave event occurred")
}
}
}

func TestSubscriptionNotificationSubUnSub(t *testing.T) {
// Resubscribe and Unsubscribe a peers and check the state for consistency
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "foobar"

const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)
psubs := getPubsubs(ctx, hosts)

for i := 1; i < numHosts; i++ {
connect(t, hosts[0], hosts[i])
}
time.Sleep(time.Millisecond * 100)

notifSubThenUnSub(ctx, t, topic, psubs)
}

func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string,
psubs []*PubSub) {

ps := psubs[0]
msgs := make([]*Subscription, len(psubs))
checkSize := len(psubs) - 1

// Subscribe all peers to the topic
var err error
for i, ps := range psubs {
msgs[i], err = ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

sub := msgs[0]

// Wait for the primary peer to be connected to the other peers
for len(ps.ListPeers(topic)) < checkSize {
time.Sleep(time.Millisecond * 100)
}

// Unsubscribe all peers except the primary
for i := 1; i < checkSize+1; i++ {
msgs[i].Cancel()
}

// Wait for the unsubscribe messages to reach the primary peer
for len(ps.ListPeers(topic)) < 0 {
time.Sleep(time.Millisecond * 100)
}

// read all available events and verify that there are no events to process
// this is because every peer that joined also left
peerState := readAllQueuedEvents(ctx, t, sub)

if len(peerState) != 0 {
for p, s := range peerState {
fmt.Println(p, s)
}
t.Fatalf("Received incorrect events. %d extra events", len(peerState))
}
}

func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType {
peerState := make(map[peer.ID]EventType)
for {
ctx, _ := context.WithTimeout(ctx, time.Millisecond*100)
event, err := sub.NextPeerEvent(ctx)
if err == context.DeadlineExceeded {
break
} else if err != nil {
t.Fatal(err)
}

e, ok := peerState[event.Peer]
if !ok {
peerState[event.Peer] = event.Type
} else if e != event.Type {
delete(peerState, event.Peer)
}
}
return peerState
}
Loading

0 comments on commit 9f04364

Please sign in to comment.