Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: make GossipNode more independent from wsNetwork implementation #5634

Merged
merged 5 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
fuzzer: fuzzer,
nodeID: nodeID,
mux: network.MakeMultiplexer(fuzzer.log),
mux: network.MakeMultiplexer(),
clocks: make(map[int]chan time.Time),
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
Expand Down
2 changes: 1 addition & 1 deletion agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func makewhiteholeNetwork(domain *whiteholeDomain) *whiteholeNetwork {
w := &whiteholeNetwork{
peer: atomic.AddUint32(&domain.peerIdx, 1),
lastMsgRead: uint32(len(domain.messages)),
mux: network.MakeMultiplexer(domain.log),
mux: network.MakeMultiplexer(),
domain: domain,
disconnected: make(map[uint32]bool),
}
Expand Down
2 changes: 1 addition & 1 deletion network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte,

func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
c := wsPeerMsgDataConverter{
log: wp.net.log,
log: wp.log,
origin: wp.originAddress,
}

Expand Down
13 changes: 4 additions & 9 deletions network/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@
package network

import (
"fmt"
"sync/atomic"

"github.com/algorand/go-algorand/logging"
)

// Multiplexer is a message handler that sorts incoming messages by Tag and passes
// them along to the relevant message handler for that type of message.
type Multiplexer struct {
msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map.

log logging.Logger
}

// MakeMultiplexer creates an empty Multiplexer
func MakeMultiplexer(log logging.Logger) *Multiplexer {
m := &Multiplexer{
log: log,
}
func MakeMultiplexer() *Multiplexer {
m := &Multiplexer{}
m.ClearHandlers([]Tag{}) // allocate the map
return m
}
Expand Down Expand Up @@ -78,7 +73,7 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) {
}
for _, v := range dispatch {
if _, has := mp[v.Tag]; has {
m.log.Panicf("Already registered a handler for tag %v", v.Tag)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
panic(fmt.Sprintf("Already registered a handler for tag %v", v.Tag))
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
}
mp[v.Tag] = v.MessageHandler
}
Expand Down
7 changes: 3 additions & 4 deletions network/multiplexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
Expand Down Expand Up @@ -57,7 +56,7 @@ func (th *testHandler) SawMsg(msg IncomingMessage) bool {
func TestMultiplexer(t *testing.T) {
partitiontest.PartitionTest(t)

m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
handler := &testHandler{}

// Handler shouldn't be called before it is registered
Expand All @@ -78,7 +77,7 @@ func TestMultiplexer(t *testing.T) {
panicked = true
}
}()
m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}, {protocol.TxnTag, handler}})

}()
Expand All @@ -90,7 +89,7 @@ func TestMultiplexer(t *testing.T) {
panicked = true
}
}()
m := MakeMultiplexer(logging.TestingLog(t))
m := MakeMultiplexer()
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}})
m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}})

Expand Down
16 changes: 9 additions & 7 deletions network/netidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ func (i identityVerificationMessageSigned) Verify(key crypto.PublicKey) bool {
// sender's claimed identity and the challenge that was assigned to it. If the identity is available,
// the peer is loaded into the identity tracker. Otherwise, we ask the network to disconnect the peer.
func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
wn := message.Net.(*WebsocketNetwork)

peer := message.Sender.(*wsPeer)
// avoid doing work (crypto and potentially taking a lock) if the peer is already verified
if atomic.LoadUint32(&peer.identityVerified) == 1 {
Expand All @@ -335,27 +337,27 @@ func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
err := protocol.Decode(message.Data, &msg)
if err != nil {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting")
peer.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
if peer.identityChallenge != msg.Msg.ResponseChallenge {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
if !msg.Verify(peer.identity) {
networkPeerIdentityError.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
}
atomic.StoreUint32(&peer.identityVerified, 1)
// if the identity could not be claimed by this peer, it means the identity is in use
peer.net.peersLock.Lock()
ok := peer.net.identityTracker.setIdentity(peer)
peer.net.peersLock.Unlock()
wn.peersLock.Lock()
ok := wn.identityTracker.setIdentity(peer)
wn.peersLock.Unlock()
if !ok {
networkPeerIdentityDisconnect.Inc(nil)
peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting")
peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting")
return OutgoingMessage{Action: Disconnect, reason: disconnectDuplicateConnection}
}
return OutgoingMessage{}
Expand Down
3 changes: 2 additions & 1 deletion network/netidentity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,12 @@ func TestIdentityTrackerSetIdentity(t *testing.T) {
}

// Just tests that if a peer is already verified, it just returns OutgoingMessage{}
func TestHandlerGuard(t *testing.T) {
func TestIdentityTrackerHandlerGuard(t *testing.T) {
partitiontest.PartitionTest(t)
p := wsPeer{identityVerified: uint32(1)}
msg := IncomingMessage{
Sender: &p,
Net: &WebsocketNetwork{},
}
require.Equal(t, OutgoingMessage{}, identityVerificationHandler(msg))
}
Loading