Skip to content

Commit

Permalink
network: make GossipNode more independent from wsNetwork implementati…
Browse files Browse the repository at this point in the history
…on (#5634)
  • Loading branch information
cce authored Aug 4, 2023
1 parent 5acab71 commit 72765c1
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 147 deletions.
2 changes: 1 addition & 1 deletion agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
fuzzer: fuzzer,
nodeID: nodeID,
mux: network.MakeMultiplexer(fuzzer.log),
mux: network.MakeMultiplexer(),
clocks: make(map[int]chan time.Time),
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
Expand Down
2 changes: 1 addition & 1 deletion agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func makewhiteholeNetwork(domain *whiteholeDomain) *whiteholeNetwork {
w := &whiteholeNetwork{
peer: atomic.AddUint32(&domain.peerIdx, 1),
lastMsgRead: uint32(len(domain.messages)),
mux: network.MakeMultiplexer(domain.log),
mux: network.MakeMultiplexer(),
domain: domain,
disconnected: make(map[uint32]bool),
}
Expand Down
2 changes: 1 addition & 1 deletion network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte,

func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
c := wsPeerMsgDataConverter{
log: wp.net.log,
log: wp.log,
origin: wp.originAddress,
}

Expand Down
13 changes: 4 additions & 9 deletions network/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@
package network

import (
"fmt"
"sync/atomic"

"github.com/algorand/go-algorand/logging"
)

// Multiplexer is a message handler that sorts incoming messages by Tag and passes
// them along to the relevant message handler for that type of message.
type Multiplexer struct {
msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map.

log logging.Logger
}

// MakeMultiplexer creates an empty Multiplexer
func MakeMultiplexer(log logging.Logger) *Multiplexer {
m := &Multiplexer{
log: log,
}
func MakeMultiplexer() *Multiplexer {
m := &Multiplexer{}
m.ClearHandlers([]Tag{}) // allocate the map
return m
}
Expand Down Expand Up @@ -78,7 +73,7 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) {
}
for _, v := range dispatch {
if _, has := mp[v.Tag]; has {
m.log.Panicf("Already registered a handler for tag %v", v.Tag)
panic(fmt.Sprintf("Already registered a handler for tag %v", v.Tag))
}
mp[v.Tag] = v.MessageHandler
}
Expand Down
7 changes: 3 additions & 4 deletions network/multiplexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
Expand Down Expand Up @@ -57,7 +56,7 @@ func (th *testHandler) SawMsg(msg IncomingMessage) bool {
func TestMultiplexer(t *testing.T) {
partitiontest.PartitionTest(t)

m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
handler := &testHandler{}

// Handler shouldn't be called before it is registered
Expand All @@ -78,7 +77,7 @@ func TestMultiplexer(t *testing.T) {
panicked = true
}
}()
m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}, {protocol.TxnTag, handler}})

}()
Expand All @@ -90,7 +89,7 @@ func TestMultiplexer(t *testing.T) {
panicked = true
}
}()
m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}})
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}})

Expand Down
16 changes: 9 additions & 7 deletions network/netidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (i identityVerificationMessageSigned) Verify(key crypto.PublicKey) bool {
// sender's claimed identity and the challenge that was assigned to it. If the identity is available,
// the peer is loaded into the identity tracker. Otherwise, we ask the network to disconnect the peer.
func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
wn := message.Net.(*WebsocketNetwork)

peer := message.Sender.(*wsPeer)
// avoid doing work (crypto and potentially taking a lock) if the peer is already verified
if atomic.LoadUint32(&peer.identityVerified) == 1 {
Expand All @@ -335,27 +337,27 @@ func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
err := protocol.Decode(message.Data, &msg)
if err != nil {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting")
peer.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
if peer.identityChallenge != msg.Msg.ResponseChallenge {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
if !msg.Verify(peer.identity) {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
atomic.StoreUint32(&peer.identityVerified, 1)
// if the identity could not be claimed by this peer, it means the identity is in use
peer.net.peersLock.Lock()
ok := peer.net.identityTracker.setIdentity(peer)
peer.net.peersLock.Unlock()
wn.peersLock.Lock()
ok := wn.identityTracker.setIdentity(peer)
wn.peersLock.Unlock()
if !ok {
networkPeerIdentityDisconnect.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectDuplicateConnection}
}
return OutgoingMessage{}
Expand Down
3 changes: 2 additions & 1 deletion network/netidentity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,12 @@ func TestIdentityTrackerSetIdentity(t *testing.T) {
}

// Just tests that if a peer is already verified, it just returns OutgoingMessage{}
func TestHandlerGuard(t *testing.T) {
func TestIdentityTrackerHandlerGuard(t *testing.T) {
partitiontest.PartitionTest(t)
p := wsPeer{identityVerified: uint32(1)}
msg := IncomingMessage{
Sender: &p,
Net: &WebsocketNetwork{},
}
require.Equal(t, OutgoingMessage{}, identityVerificationHandler(msg))
}
Loading

0 comments on commit 72765c1

Please sign in to comment.