diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index d00ec4e7c1..6db2d88c2e 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -77,7 +77,7 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade { n := &NetworkFacade{ fuzzer: fuzzer, nodeID: nodeID, - mux: network.MakeMultiplexer(fuzzer.log), + mux: network.MakeMultiplexer(), clocks: make(map[int]chan time.Time), eventsQueues: make(map[string]int), eventsQueuesCh: make(chan int, 1000), diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index bd4f4baf38..8584dc8d26 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -323,7 +323,7 @@ func makewhiteholeNetwork(domain *whiteholeDomain) *whiteholeNetwork { w := &whiteholeNetwork{ peer: atomic.AddUint32(&domain.peerIdx, 1), lastMsgRead: uint32(len(domain.messages)), - mux: network.MakeMultiplexer(domain.log), + mux: network.MakeMultiplexer(), domain: domain, disconnected: make(map[uint32]bool), } diff --git a/network/msgCompressor.go b/network/msgCompressor.go index a46f37f2c2..28f835832f 100644 --- a/network/msgCompressor.go +++ b/network/msgCompressor.go @@ -144,7 +144,7 @@ func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter { c := wsPeerMsgDataConverter{ - log: wp.net.log, + log: wp.log, origin: wp.originAddress, } diff --git a/network/multiplexer.go b/network/multiplexer.go index fe5b3dcf42..ddcf1845d8 100644 --- a/network/multiplexer.go +++ b/network/multiplexer.go @@ -17,24 +17,19 @@ package network import ( + "fmt" "sync/atomic" - - "github.com/algorand/go-algorand/logging" ) // Multiplexer is a message handler that sorts incoming messages by Tag and passes // them along to the relevant message handler for that type of message. type Multiplexer struct { msgHandlers atomic.Value // stores map[Tag]MessageHandler, an immutable map. - - log logging.Logger } // MakeMultiplexer creates an empty Multiplexer -func MakeMultiplexer(log logging.Logger) *Multiplexer { - m := &Multiplexer{ - log: log, - } +func MakeMultiplexer() *Multiplexer { + m := &Multiplexer{} m.ClearHandlers([]Tag{}) // allocate the map return m } @@ -78,7 +73,7 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) { } for _, v := range dispatch { if _, has := mp[v.Tag]; has { - m.log.Panicf("Already registered a handler for tag %v", v.Tag) + panic(fmt.Sprintf("Already registered a handler for tag %v", v.Tag)) } mp[v.Tag] = v.MessageHandler } diff --git a/network/multiplexer_test.go b/network/multiplexer_test.go index 1d0215b909..c9d17dacbc 100644 --- a/network/multiplexer_test.go +++ b/network/multiplexer_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -57,7 +56,7 @@ func (th *testHandler) SawMsg(msg IncomingMessage) bool { func TestMultiplexer(t *testing.T) { partitiontest.PartitionTest(t) - m := MakeMultiplexer(logging.TestingLog(t)) + m := MakeMultiplexer() handler := &testHandler{} // Handler shouldn't be called before it is registered @@ -78,7 +77,7 @@ func TestMultiplexer(t *testing.T) { panicked = true } }() - m := MakeMultiplexer(logging.TestingLog(t)) + m := MakeMultiplexer() m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}, {protocol.TxnTag, handler}}) }() @@ -90,7 +89,7 @@ func TestMultiplexer(t *testing.T) { panicked = true } }() - m := MakeMultiplexer(logging.TestingLog(t)) + m := MakeMultiplexer() m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}}) m.RegisterHandlers([]TaggedMessageHandler{{protocol.TxnTag, handler}}) diff --git a/network/netidentity.go b/network/netidentity.go index 9fce21fcb0..6414c5e897 100644 --- a/network/netidentity.go +++ b/network/netidentity.go @@ -325,6 +325,8 @@ func (i identityVerificationMessageSigned) Verify(key crypto.PublicKey) bool { // sender's claimed identity and the challenge that was assigned to it. If the identity is available, // the peer is loaded into the identity tracker. Otherwise, we ask the network to disconnect the peer. func identityVerificationHandler(message IncomingMessage) OutgoingMessage { + wn := message.Net.(*WebsocketNetwork) + peer := message.Sender.(*wsPeer) // avoid doing work (crypto and potentially taking a lock) if the peer is already verified if atomic.LoadUint32(&peer.identityVerified) == 1 { @@ -335,27 +337,27 @@ func identityVerificationHandler(message IncomingMessage) OutgoingMessage { err := protocol.Decode(message.Data, &msg) if err != nil { networkPeerIdentityError.Inc(nil) - peer.net.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting") + peer.log.With("err", err).With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification could not be decoded, disconnecting") return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData} } if peer.identityChallenge != msg.Msg.ResponseChallenge { networkPeerIdentityError.Inc(nil) - peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting") + peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification challenge does not match, disconnecting") return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData} } if !msg.Verify(peer.identity) { networkPeerIdentityError.Inc(nil) - peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting") + peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting") return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData} } atomic.StoreUint32(&peer.identityVerified, 1) // if the identity could not be claimed by this peer, it means the identity is in use - peer.net.peersLock.Lock() - ok := peer.net.identityTracker.setIdentity(peer) - peer.net.peersLock.Unlock() + wn.peersLock.Lock() + ok := wn.identityTracker.setIdentity(peer) + wn.peersLock.Unlock() if !ok { networkPeerIdentityDisconnect.Inc(nil) - peer.net.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting") + peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity already in use, disconnecting") return OutgoingMessage{Action: Disconnect, reason: disconnectDuplicateConnection} } return OutgoingMessage{} diff --git a/network/netidentity_test.go b/network/netidentity_test.go index f3c72e3e8c..13731aaaeb 100644 --- a/network/netidentity_test.go +++ b/network/netidentity_test.go @@ -356,11 +356,12 @@ func TestIdentityTrackerSetIdentity(t *testing.T) { } // Just tests that if a peer is already verified, it just returns OutgoingMessage{} -func TestHandlerGuard(t *testing.T) { +func TestIdentityTrackerHandlerGuard(t *testing.T) { partitiontest.PartitionTest(t) p := wsPeer{identityVerified: uint32(1)} msg := IncomingMessage{ Sender: &p, + Net: &WebsocketNetwork{}, } require.Equal(t, OutgoingMessage{}, identityVerificationHandler(msg)) } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 56314f14b9..07902bde79 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -173,12 +173,9 @@ const ( type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error Disconnect(badnode Peer) - DisconnectPeers() - Ready() chan struct{} + DisconnectPeers() // only used by testing // RegisterHTTPHandler path accepts gorilla/mux path annotations RegisterHTTPHandler(path string, handler http.Handler) @@ -226,12 +223,8 @@ type GossipNode interface { // SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID. SubstituteGenesisID(rawURL string) string - // GetPeerData returns a value stored by SetPeerData - GetPeerData(peer Peer, key string) interface{} - - // SetPeerData attaches a piece of data to a peer. - // Other services inside go-algorand may attach data to a peer that gets garbage collected when the peer is closed. - SetPeerData(peer Peer, key string, value interface{}) + // called from wsPeer to report that it has closed + peerRemoteClose(peer *wsPeer, reason disconnectReason) } // IncomingMessage represents a message arriving from some peer in our p2p network @@ -354,12 +347,8 @@ type WebsocketNetwork struct { log logging.Logger - readBuffer chan IncomingMessage - wg sync.WaitGroup - handlers Multiplexer - ctx context.Context ctxCancel context.CancelFunc @@ -367,8 +356,8 @@ type WebsocketNetwork struct { peers []*wsPeer peersChangeCounter int32 // peersChangeCounter is an atomic variable that increases on each change to the peers. It helps avoiding taking the peersLock when checking if the peers list was modified. - broadcastQueueHighPrio chan broadcastRequest - broadcastQueueBulk chan broadcastRequest + broadcaster msgBroadcaster + handler msgHandler phonebook Phonebook @@ -408,9 +397,6 @@ type WebsocketNetwork struct { // wsMaxHeaderBytes is the maximum accepted size of the header prior to upgrading to websocket connection. wsMaxHeaderBytes int64 - // slowWritingPeerMonitorInterval defines the interval between two consecutive tests for slow peer writing - slowWritingPeerMonitorInterval time.Duration - requestsTracker *RequestTracker requestsLogger *RequestLogger @@ -491,6 +477,43 @@ type broadcastRequest struct { ctx context.Context } +// msgBroadcaster contains the logic for preparing data for broadcast, managing broadcast priorities +// and queues. It provides a goroutine (broadcastThread) for reading from those queues and scheduling +// broadcasts to peers managed by networkPeerManager. +type msgBroadcaster struct { + ctx context.Context + log logging.Logger + config config.Local + broadcastQueueHighPrio chan broadcastRequest + broadcastQueueBulk chan broadcastRequest + // slowWritingPeerMonitorInterval defines the interval between two consecutive tests for slow peer writing + slowWritingPeerMonitorInterval time.Duration +} + +// msgHandler contains the logic for handling incoming messages and managing a readBuffer. It provides +// a goroutine (messageHandlerThread) for reading incoming messages and calling handlers. +type msgHandler struct { + ctx context.Context + log logging.Logger + config config.Local + readBuffer chan IncomingMessage + Multiplexer +} + +// networkPeerManager provides the network functionality needed by msgBroadcaster and msgHandler for managing +// peer connectivity, and also sending messages. +type networkPeerManager interface { + // used by msgBroadcaster + peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) + checkSlowWritingPeers() + getPeersChangeCounter() int32 + + // used by msgHandler + Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error + disconnectThread(badnode Peer, reason disconnectReason) + checkPeersConnectivity() +} + // Address returns a string and whether that is a 'final' address or guessed. // Part of GossipNode interface func (wn *WebsocketNetwork) Address() (string, bool) { @@ -528,18 +551,17 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat dataArray[0] = data tagArray := make([]protocol.Tag, 1, 1) tagArray[0] = tag - return wn.BroadcastArray(ctx, tagArray, dataArray, wait, except) + return wn.broadcaster.BroadcastArray(ctx, tagArray, dataArray, wait, except) } // BroadcastArray sends an array of messages. // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. // TODO: add `priority` argument so that we don't have to guess it based on tag -func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { +func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { if wn.config.DisableNetworking { return nil } - if len(tags) != len(data) { return errBcastInvalidArray } @@ -598,7 +620,7 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] // RelayArray relays array of messages func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { if wn.relayMessages { - return wn.BroadcastArray(ctx, tags, data, wait, except) + return wn.broadcaster.BroadcastArray(ctx, tags, data, wait, except) } return nil } @@ -691,14 +713,14 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivalNodes: var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryRelayRole) for _, addr := range addrs { - peerCore := makePeerCore(wn, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersPhonebookArchivers: @@ -706,7 +728,7 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000, PhoneBookEntryArchiverRole) for _, addr := range addrs { - peerCore := makePeerCore(wn, addr, wn.GetRoundTripper(), "" /*origin address*/) + peerCore := makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /*origin address*/) outPeers = append(outPeers, &peerCore) } case PeersConnectedIn: @@ -784,16 +806,21 @@ func (wn *WebsocketNetwork) setup() { wn.identityTracker = NewIdentityTracker() - wn.broadcastQueueHighPrio = make(chan broadcastRequest, wn.outgoingMessagesBufferSize) - wn.broadcastQueueBulk = make(chan broadcastRequest, 100) + wn.broadcaster = msgBroadcaster{ + ctx: wn.ctx, + log: wn.log, + config: wn.config, + broadcastQueueHighPrio: make(chan broadcastRequest, wn.outgoingMessagesBufferSize), + broadcastQueueBulk: make(chan broadcastRequest, 100), + } + if wn.broadcaster.slowWritingPeerMonitorInterval == 0 { + wn.broadcaster.slowWritingPeerMonitorInterval = slowWritingPeerMonitorInterval + } wn.meshUpdateRequests = make(chan meshRequest, 5) wn.readyChan = make(chan struct{}) wn.tryConnectAddrs = make(map[string]int64) wn.eventualReadyDelay = time.Minute wn.prioTracker = newPrioTracker(wn) - if wn.slowWritingPeerMonitorInterval == 0 { - wn.slowWritingPeerMonitorInterval = slowWritingPeerMonitorInterval - } readBufferLen := wn.config.IncomingConnectionsLimit + wn.config.GossipFanout if readBufferLen < 100 { @@ -802,7 +829,12 @@ func (wn *WebsocketNetwork) setup() { if readBufferLen > 10000 { readBufferLen = 10000 } - wn.readBuffer = make(chan IncomingMessage, readBufferLen) + wn.handler = msgHandler{ + ctx: wn.ctx, + log: wn.log, + config: wn.config, + readBuffer: make(chan IncomingMessage, readBufferLen), + } var rbytes [10]byte crypto.RandBytes(rbytes[:]) @@ -813,7 +845,6 @@ func (wn *WebsocketNetwork) setup() { } wn.connPerfMonitor = makeConnectionPerformanceMonitor([]Tag{protocol.AgreementVoteTag, protocol.TxnTag}) wn.lastNetworkAdvance = time.Now().UTC() - wn.handlers.log = wn.log // set our supported versions if wn.config.NetworkProtocolVersion != "" { @@ -904,10 +935,10 @@ func (wn *WebsocketNetwork) Start() { for i := 0; i < incomingThreads; i++ { wn.wg.Add(1) // We pass the peersConnectivityCheckTicker.C here so that we don't need to syncronize the access to the ticker's data structure. - go wn.messageHandlerThread(wn.peersConnectivityCheckTicker.C) + go wn.handler.messageHandlerThread(&wn.wg, wn.peersConnectivityCheckTicker.C, wn) } wn.wg.Add(1) - go wn.broadcastThread() + go wn.broadcaster.broadcastThread(&wn.wg, wn) if wn.prioScheme != nil { wn.wg.Add(1) go wn.prioWeightRefresh() @@ -950,7 +981,7 @@ func (wn *WebsocketNetwork) innerStop() { // Stop closes network connections and stops threads. // Stop blocks until all activity on this node is done. func (wn *WebsocketNetwork) Stop() { - wn.handlers.ClearHandlers([]Tag{}) + wn.handler.ClearHandlers([]Tag{}) // if we have a working ticker, just stop it and clear it out. The access to this variable is safe since the Start()/Stop() are synced by the // caller, and the WebsocketNetwork doesn't access wn.peersConnectivityCheckTicker directly. @@ -987,13 +1018,13 @@ func (wn *WebsocketNetwork) Stop() { // RegisterHandlers registers the set of given message handlers. func (wn *WebsocketNetwork) RegisterHandlers(dispatch []TaggedMessageHandler) { - wn.handlers.RegisterHandlers(dispatch) + wn.handler.RegisterHandlers(dispatch) } // ClearHandlers deregisters all the existing message handlers. func (wn *WebsocketNetwork) ClearHandlers() { // exclude the internal handlers. These would get cleared out when Stop is called. - wn.handlers.ClearHandlers([]Tag{protocol.PingTag, protocol.PingReplyTag, protocol.NetPrioResponseTag}) + wn.handler.ClearHandlers([]Tag{protocol.PingTag, protocol.PingReplyTag, protocol.NetPrioResponseTag}) } func (wn *WebsocketNetwork) setHeaders(header http.Header) { @@ -1233,8 +1264,8 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost), - conn: conn, + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost), + conn: wsPeerWebsocketConnImpl{conn}, outgoing: false, InstanceName: trackedRequest.otherInstanceName, incomingMsgFilter: wn.incomingMsgFilter, @@ -1281,8 +1312,8 @@ func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOf } } -func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan time.Time) { - defer wn.wg.Done() +func (wn *msgHandler) messageHandlerThread(wg *sync.WaitGroup, peersConnectivityCheckCh <-chan time.Time, net networkPeerManager) { + defer wg.Done() for { select { @@ -1298,13 +1329,13 @@ func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan } } if wn.config.EnableOutgoingNetworkMessageFiltering && len(msg.Data) >= messageFilterSize { - wn.sendFilterMessage(msg) + wn.sendFilterMessage(msg, net) } //wn.log.Debugf("msg handling %#v [%d]byte", msg.Tag, len(msg.Data)) start := time.Now() // now, send to global handlers - outmsg := wn.handlers.Handle(msg) + outmsg := wn.Handle(msg) handled := time.Now() bufferNanos := start.UnixNano() - msg.Received networkIncomingBufferMicros.AddUint64(uint64(bufferNanos/1000), nil) @@ -1312,14 +1343,14 @@ func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan networkHandleMicros.AddUint64(uint64(handleTime.Nanoseconds()/1000), nil) switch outmsg.Action { case Disconnect: - wn.wg.Add(1) + wg.Add(1) reason := disconnectBadData if outmsg.reason != disconnectReasonNone { reason = outmsg.reason } - go wn.disconnectThread(msg.Sender, reason) + go net.disconnectThread(msg.Sender, reason) case Broadcast: - err := wn.Broadcast(wn.ctx, msg.Tag, msg.Data, false, msg.Sender) + err := net.Broadcast(wn.ctx, msg.Tag, msg.Data, false, msg.Sender) if err != nil && err != errBcastQFull { wn.log.Warnf("WebsocketNetwork.messageHandlerThread: WebsocketNetwork.Broadcast returned unexpected error %v", err) } @@ -1332,7 +1363,7 @@ func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan } case <-peersConnectivityCheckCh: // go over the peers and ensure we have some type of communication going on. - wn.checkPeersConnectivity() + net.checkPeersConnectivity() } } } @@ -1372,25 +1403,25 @@ func (wn *WebsocketNetwork) checkSlowWritingPeers() { } } -func (wn *WebsocketNetwork) sendFilterMessage(msg IncomingMessage) { +func (wn *msgHandler) sendFilterMessage(msg IncomingMessage, net networkPeerManager) { digest := generateMessageDigest(msg.Tag, msg.Data) //wn.log.Debugf("send filter %s(%d) %v", msg.Tag, len(msg.Data), digest) - err := wn.Broadcast(context.Background(), protocol.MsgDigestSkipTag, digest[:], false, msg.Sender) + err := net.Broadcast(context.Background(), protocol.MsgDigestSkipTag, digest[:], false, msg.Sender) if err != nil && err != errBcastQFull { wn.log.Warnf("WebsocketNetwork.sendFilterMessage: WebsocketNetwork.Broadcast returned unexpected error %v", err) } } -func (wn *WebsocketNetwork) broadcastThread() { - defer wn.wg.Done() +func (wn *msgBroadcaster) broadcastThread(wg *sync.WaitGroup, net networkPeerManager) { + defer wg.Done() slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval) defer slowWritingPeerCheckTicker.Stop() - peers, lastPeersChangeCounter := wn.peerSnapshot([]*wsPeer{}) + peers, lastPeersChangeCounter := net.peerSnapshot([]*wsPeer{}) // updatePeers update the peers list if their peer change counter has changed. updatePeers := func() { - if curPeersChangeCounter := atomic.LoadInt32(&wn.peersChangeCounter); curPeersChangeCounter != lastPeersChangeCounter { - peers, lastPeersChangeCounter = wn.peerSnapshot(peers) + if curPeersChangeCounter := net.getPeersChangeCounter(); curPeersChangeCounter != lastPeersChangeCounter { + peers, lastPeersChangeCounter = net.peerSnapshot(peers) } } @@ -1481,7 +1512,7 @@ func (wn *WebsocketNetwork) broadcastThread() { } wn.innerBroadcast(request, true, peers) case <-slowWritingPeerCheckTicker.C: - wn.checkSlowWritingPeers() + net.checkSlowWritingPeers() continue case request := <-wn.broadcastQueueBulk: // check if peers need to be updated, since we've been waiting a while. @@ -1516,13 +1547,16 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) { dest = make([]*wsPeer, len(wn.peers)) } copy(dest, wn.peers) - peerChangeCounter := atomic.LoadInt32(&wn.peersChangeCounter) - return dest, peerChangeCounter + return dest, wn.getPeersChangeCounter() +} + +func (wn *WebsocketNetwork) getPeersChangeCounter() int32 { + return atomic.LoadInt32(&wn.peersChangeCounter) } // preparePeerData prepares batches of data for sending. // It performs optional zstd compression for proposal massages -func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { +func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { // determine if there is a payload proposal and peers supporting compressed payloads wantCompression := false containsPrioPPTag := false @@ -1572,7 +1606,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, } // prio is set if the broadcast is a high-priority broadcast. -func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, peers []*wsPeer) { +func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, peers []*wsPeer) { if request.done != nil { defer close(request.done) } @@ -1946,7 +1980,7 @@ func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, pee connDetail.TCP = *tcpInfo } if peer.outgoing { - connDetail.Address = justHost(peer.conn.RemoteAddr().String()) + connDetail.Address = justHost(peer.conn.RemoteAddrString()) connDetail.Endpoint = peer.GetAddress() connDetail.MessageDelay = peer.peerMessageDelay connectionDetails.OutgoingPeers = append(connectionDetails.OutgoingPeers, connDetail) @@ -2357,8 +2391,8 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { } peer := &wsPeer{ - wsPeerCore: makePeerCore(wn, addr, wn.GetRoundTripper(), "" /* origin */), - conn: conn, + wsPeerCore: makePeerCore(wn.ctx, wn, wn.log, wn.handler.readBuffer, addr, wn.GetRoundTripper(), "" /* origin */), + conn: wsPeerWebsocketConnImpl{conn}, outgoing: true, incomingMsgFilter: wn.incomingMsgFilter, createTime: time.Now(), @@ -2491,9 +2525,9 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) { peerAddr := peer.OriginAddress() // we might be able to get addr out of conn, or it might be closed if peerAddr == "" && peer.conn != nil { - paddr := peer.conn.RemoteAddr() - if paddr != nil { - peerAddr = justHost(paddr.String()) + paddr := peer.conn.RemoteAddrString() + if paddr != "" { + peerAddr = justHost(paddr) } } if peerAddr == "" { @@ -2550,7 +2584,7 @@ func (wn *WebsocketNetwork) addPeer(peer *wsPeer) { // guard against peers which are closed or closing if atomic.LoadInt32(&peer.didSignalClose) == 1 { networkPeerAlreadyClosed.Inc(nil) - wn.log.Debugf("peer closing %s", peer.conn.RemoteAddr().String()) + wn.log.Debugf("peer closing %s", peer.conn.RemoteAddrString()) return } // simple duplicate *pointer* check. should never trigger given the callers to addPeer diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 05992ec8d6..ac59b40b29 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -543,7 +543,7 @@ func TestWebsocketNetworkArray(t *testing.T) { tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag} data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")} - netA.BroadcastArray(context.Background(), tags, data, false, nil) + netA.broadcaster.BroadcastArray(context.Background(), tags, data, false, nil) select { case <-counterDone: @@ -571,7 +571,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { cancel() // try calling BroadcastArray - netA.BroadcastArray(ctx, tags, data, true, nil) + netA.broadcaster.BroadcastArray(ctx, tags, data, true, nil) select { case <-counterDone: @@ -583,7 +583,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { // try calling innerBroadcast request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx} peers, _ := netA.peerSnapshot([]*wsPeer{}) - netA.innerBroadcast(request, true, peers) + netA.broadcaster.innerBroadcast(request, true, peers) select { case <-counterDone: @@ -760,9 +760,11 @@ func TestAddrToGossipAddr(t *testing.T) { type nopConn struct{} func (nc *nopConn) RemoteAddr() net.Addr { return nil } +func (nc *nopConn) RemoteAddrString() string { return "" } func (nc *nopConn) NextReader() (int, io.Reader, error) { return 0, nil, nil } func (nc *nopConn) WriteMessage(int, []byte) error { return nil } func (nc *nopConn) WriteControl(int, []byte, time.Time) error { return nil } +func (nc *nopConn) CloseWithMessage([]byte, time.Time) error { return nil } func (nc *nopConn) SetReadLimit(limit int64) {} func (nc *nopConn) CloseWithoutFlush() error { return nil } func (nc *nopConn) SetPingHandler(h func(appData string) error) {} @@ -799,7 +801,7 @@ func TestSlowHandlers(t *testing.T) { // start slow handler calls that will block all handler threads for i := 0; i < incomingThreads; i++ { data := []byte{byte(i)} - node.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: slowTag, Data: data, Net: node} + node.handler.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: slowTag, Data: data, Net: node} ipi++ } defer slowCounter.Broadcast() @@ -807,7 +809,7 @@ func TestSlowHandlers(t *testing.T) { // start fast handler calls that won't get to run for i := 0; i < incomingThreads; i++ { data := []byte{byte(i)} - node.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: fastTag, Data: data, Net: node} + node.handler.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: fastTag, Data: data, Net: node} ipi++ } ok := false @@ -890,7 +892,7 @@ func TestFloodingPeer(t *testing.T) { } select { - case node.readBuffer <- IncomingMessage{Sender: &injectionPeers[myIpi], Tag: slowTag, Data: data, Net: node, processing: processed}: + case node.handler.readBuffer <- IncomingMessage{Sender: &injectionPeers[myIpi], Tag: slowTag, Data: data, Net: node, processing: processed}: case <-ctx.Done(): return } @@ -912,7 +914,7 @@ func TestFloodingPeer(t *testing.T) { fastCounterDone := fastCounter.done for ipi < len(injectionPeers) { data := []byte{byte(ipi)} - node.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: fastTag, Data: data, Net: node} + node.handler.readBuffer <- IncomingMessage{Sender: &injectionPeers[ipi], Tag: fastTag, Data: data, Net: node} numFast++ ipi++ } @@ -2176,7 +2178,7 @@ func BenchmarkWebsocketNetworkBasic(t *testing.B) { networkBroadcastsDropped.WriteMetric(&buf, "") t.Errorf( "a out queue=%d, metric: %s", - len(netA.broadcastQueueBulk), + len(netA.broadcaster.broadcastQueueBulk), buf.String(), ) return @@ -2450,9 +2452,9 @@ func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte tagArr[0] = tag request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()} - broadcastQueue := wn.broadcastQueueBulk + broadcastQueue := wn.broadcaster.broadcastQueueBulk if highPriorityTag(tagArr) { - broadcastQueue = wn.broadcastQueueHighPrio + broadcastQueue = wn.broadcaster.broadcastQueueHighPrio } // no wait select { @@ -2508,13 +2510,13 @@ func TestSlowPeerDisconnection(t *testing.T) { log := logging.TestingLog(t) log.SetLevel(logging.Info) wn := &WebsocketNetwork{ - log: log, - config: defaultConfig, - phonebook: MakePhonebook(1, 1*time.Millisecond), - GenesisID: genesisID, - NetworkID: config.Devtestnet, - slowWritingPeerMonitorInterval: time.Millisecond * 50, + log: log, + config: defaultConfig, + phonebook: MakePhonebook(1, 1*time.Millisecond), + GenesisID: genesisID, + NetworkID: config.Devtestnet, } + wn.broadcaster.slowWritingPeerMonitorInterval = time.Millisecond * 50 wn.setup() wn.eventualReadyDelay = time.Second wn.messagesOfInterest = nil // clear this before starting the network so that we won't be sending a MOI upon connection. @@ -3705,7 +3707,7 @@ func TestPreparePeerData(t *testing.T) { peers := []*wsPeer{} wn := WebsocketNetwork{} - data, comp, digests, seenPrioPPTag := wn.preparePeerData(req, false, peers) + data, comp, digests, seenPrioPPTag := wn.broadcaster.preparePeerData(req, false, peers) require.NotEmpty(t, data) require.Empty(t, comp) require.NotEmpty(t, digests) @@ -3725,7 +3727,7 @@ func TestPreparePeerData(t *testing.T) { features: pfCompressedProposal, } peers = []*wsPeer{&peer1, &peer2} - data, comp, digests, seenPrioPPTag = wn.preparePeerData(req, true, peers) + data, comp, digests, seenPrioPPTag = wn.broadcaster.preparePeerData(req, true, peers) require.NotEmpty(t, data) require.NotEmpty(t, comp) require.NotEmpty(t, digests) diff --git a/network/wsPeer.go b/network/wsPeer.go index 1c9aad3db9..a717eef60c 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -36,6 +36,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/metrics" @@ -119,17 +120,31 @@ var defaultSendMessageTags = map[protocol.Tag]bool{ // interface allows substituting debug implementation for *websocket.Conn type wsPeerWebsocketConn interface { - RemoteAddr() net.Addr + RemoteAddrString() string NextReader() (int, io.Reader, error) WriteMessage(int, []byte) error - WriteControl(int, []byte, time.Time) error + CloseWithMessage([]byte, time.Time) error SetReadLimit(int64) CloseWithoutFlush() error - SetPingHandler(h func(appData string) error) - SetPongHandler(h func(appData string) error) wrappedConn } +type wsPeerWebsocketConnImpl struct { + *websocket.Conn +} + +func (c wsPeerWebsocketConnImpl) RemoteAddrString() string { + addr := c.RemoteAddr() + if addr == nil { + return "" + } + return addr.String() +} + +func (c wsPeerWebsocketConnImpl) CloseWithMessage(msg []byte, deadline time.Time) error { + return c.WriteControl(websocket.CloseMessage, msg, deadline) +} + type wrappedConn interface { UnderlyingConn() net.Conn } @@ -145,7 +160,10 @@ type sendMessage struct { // wsPeerCore also works for non-connected peers we want to do HTTP GET from type wsPeerCore struct { - net *WebsocketNetwork + net GossipNode + netCtx context.Context + log logging.Logger + readBuffer chan<- IncomingMessage rootURL string originAddress string // incoming connection remote host client http.Client @@ -325,9 +343,12 @@ type TCPInfoUnicastPeer interface { } // Create a wsPeerCore object -func makePeerCore(net *WebsocketNetwork, rootURL string, roundTripper http.RoundTripper, originAddress string) wsPeerCore { +func makePeerCore(ctx context.Context, net GossipNode, log logging.Logger, readBuffer chan<- IncomingMessage, rootURL string, roundTripper http.RoundTripper, originAddress string) wsPeerCore { return wsPeerCore{ net: net, + netCtx: ctx, + log: log, + readBuffer: readBuffer, rootURL: rootURL, originAddress: originAddress, client: http.Client{Transport: roundTripper}, @@ -419,7 +440,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, outMsg Ou if outMsg.OnRelease != nil { outMsg.OnRelease() } - wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) + wp.log.Debugf("peer closing %s", wp.conn.RemoteAddrString()) return case <-ctx.Done(): if outMsg.OnRelease != nil { @@ -432,7 +453,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, outMsg Ou // setup values not trivially assigned func (wp *wsPeer) init(config config.Local, sendBufferLength int) { - wp.net.log.Debugf("wsPeer init outgoing=%v %#v", wp.outgoing, wp.rootURL) + wp.log.Debugf("wsPeer init outgoing=%v %#v", wp.outgoing, wp.rootURL) wp.closing = make(chan struct{}) wp.sendBufferHighPrio = make(chan sendMessages, sendBufferLength) wp.sendBufferBulk = make(chan sendMessages, sendBufferLength) @@ -468,7 +489,7 @@ func (wp *wsPeer) reportReadErr(err error) { // only report error if we haven't already closed the peer if atomic.LoadInt32(&wp.didInnerClose) == 0 { _, _, line, _ := runtime.Caller(1) - wp.net.log.Warnf("peer[%s] line=%d read err: %s", wp.conn.RemoteAddr().String(), line, err) + wp.log.Warnf("peer[%s] line=%d read err: %s", wp.conn.RemoteAddrString(), line, err) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "reader err"}) } } @@ -506,7 +527,7 @@ func (wp *wsPeer) readLoop() { return } if mtype != websocket.BinaryMessage { - wp.net.log.Errorf("peer sent non websocket-binary message: %#v", mtype) + wp.log.Errorf("peer sent non websocket-binary message: %#v", mtype) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "protocol"}) return } @@ -524,7 +545,7 @@ func (wp *wsPeer) readLoop() { // This peers has sent us more responses than we have requested. This is a protocol violation and we should disconnect. if atomic.LoadInt64(&wp.outstandingTopicRequests) < 0 { - wp.net.log.Errorf("wsPeer readloop: peer %s sent TS response without a request", wp.conn.RemoteAddr().String()) + wp.log.Errorf("wsPeer readloop: peer %s sent TS response without a request", wp.conn.RemoteAddrString()) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "unrequestedTS"}) cleanupCloseError = disconnectUnexpectedTopicResp return @@ -533,11 +554,11 @@ func (wp *wsPeer) readLoop() { // Peer sent us a response to a request we made but we've already timed out -- discard n, err = io.Copy(io.Discard, reader) if err != nil { - wp.net.log.Infof("wsPeer readloop: could not discard timed-out TS message from %s : %s", wp.conn.RemoteAddr().String(), err) + wp.log.Infof("wsPeer readloop: could not discard timed-out TS message from %s : %s", wp.conn.RemoteAddrString(), err) wp.reportReadErr(err) return } - wp.net.log.Warnf("wsPeer readLoop: received a TS response for a stale request from %s. %d bytes discarded", wp.conn.RemoteAddr().String(), n) + wp.log.Warnf("wsPeer readLoop: received a TS response for a stale request from %s. %d bytes discarded", wp.conn.RemoteAddrString(), n) continue } @@ -586,18 +607,18 @@ func (wp *wsPeer) readLoop() { atomic.AddInt64(&wp.outstandingTopicRequests, -1) topics, err := UnmarshallTopics(msg.Data) if err != nil { - wp.net.log.Warnf("wsPeer readLoop: could not read the message from: %s %s", wp.conn.RemoteAddr().String(), err) + wp.log.Warnf("wsPeer readLoop: could not read the message from: %s %s", wp.conn.RemoteAddrString(), err) continue } requestHash, found := topics.GetValue(requestHashKey) if !found { - wp.net.log.Warnf("wsPeer readLoop: message from %s is missing the %s", wp.conn.RemoteAddr().String(), requestHashKey) + wp.log.Warnf("wsPeer readLoop: message from %s is missing the %s", wp.conn.RemoteAddrString(), requestHashKey) continue } hashKey, _ := binary.Uvarint(requestHash) channel, found := wp.getAndRemoveResponseChannel(hashKey) if !found { - wp.net.log.Warnf("wsPeer readLoop: received a message response from %s for a stale request", wp.conn.RemoteAddr().String()) + wp.log.Warnf("wsPeer readLoop: received a message response from %s for a stale request", wp.conn.RemoteAddrString()) continue } @@ -605,7 +626,7 @@ func (wp *wsPeer) readLoop() { case channel <- &Response{Topics: topics}: // do nothing. writing was successful. default: - wp.net.log.Warn("wsPeer readLoop: channel blocked. Could not pass the response to the requester", wp.conn.RemoteAddr().String()) + wp.log.Warn("wsPeer readLoop: channel blocked. Could not pass the response to the requester", wp.conn.RemoteAddrString()) } continue case protocol.MsgDigestSkipTag: @@ -629,28 +650,28 @@ func (wp *wsPeer) readLoop() { } if len(msg.Data) > 0 && wp.incomingMsgFilter != nil && dedupSafeTag(msg.Tag) { if wp.incomingMsgFilter.CheckIncomingMessage(msg.Tag, msg.Data, true, true) { - //wp.net.log.Debugf("dropped incoming duplicate %s(%d)", msg.Tag, len(msg.Data)) + //wp.log.Debugf("dropped incoming duplicate %s(%d)", msg.Tag, len(msg.Data)) duplicateNetworkMessageReceivedTotal.Inc(nil) duplicateNetworkMessageReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+len(msg.Tag)), nil) // drop message, skip adding it to queue continue } } - //wp.net.log.Debugf("got msg %d bytes from %s", len(msg.Data), wp.conn.RemoteAddr().String()) + //wp.log.Debugf("got msg %d bytes from %s", len(msg.Data), wp.conn.RemoteAddrString()) // Wait for a previous message from this peer to be processed, // to achieve fairness in wp.net.readBuffer. select { case <-wp.processed: case <-wp.closing: - wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) + wp.log.Debugf("peer closing %s", wp.conn.RemoteAddrString()) return } select { - case wp.net.readBuffer <- msg: + case wp.readBuffer <- msg: case <-wp.closing: - wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) + wp.log.Debugf("peer closing %s", wp.conn.RemoteAddrString()) return } } @@ -662,7 +683,7 @@ func (wp *wsPeer) handleMessageOfInterest(msg IncomingMessage) (close bool, reas // decode the message, and ensure it's a valid message. msgTagsMap, err := unmarshallMessageOfInterest(msg.Data) if err != nil { - wp.net.log.Warnf("wsPeer handleMessageOfInterest: could not unmarshall message from: %s %v", wp.conn.RemoteAddr().String(), err) + wp.log.Warnf("wsPeer handleMessageOfInterest: could not unmarshall message from: %s %v", wp.conn.RemoteAddrString(), err) return true, disconnectBadData } msgs := make([]sendMessage, 1, 1) @@ -681,7 +702,7 @@ func (wp *wsPeer) handleMessageOfInterest(msg IncomingMessage) (close bool, reas case wp.sendBufferHighPrio <- sm: return case <-wp.closing: - wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) + wp.log.Debugf("peer closing %s", wp.conn.RemoteAddrString()) return true, disconnectReasonNone default: } @@ -690,7 +711,7 @@ func (wp *wsPeer) handleMessageOfInterest(msg IncomingMessage) (close bool, reas case wp.sendBufferHighPrio <- sm: case wp.sendBufferBulk <- sm: case <-wp.closing: - wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) + wp.log.Debugf("peer closing %s", wp.conn.RemoteAddrString()) return true, disconnectReasonNone } return @@ -707,12 +728,12 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) { return } if len(msg.Data) != crypto.DigestSize { - wp.net.log.Warnf("bad filter message size %d", len(msg.Data)) + wp.log.Warnf("bad filter message size %d", len(msg.Data)) return } var digest crypto.Digest copy(digest[:], msg.Data) - //wp.net.log.Debugf("add filter %v", digest) + //wp.log.Debugf("add filter %v", digest) has := wp.outgoingMsgFilter.CheckDigest(digest, true, true) if has { // Count that this peer has sent us duplicate filter messages: this means it received the same @@ -745,7 +766,7 @@ func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason { func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { if len(msg.data) > MaxMessageLength { - wp.net.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), MaxMessageLength, string(msg.data[0:2])) + wp.log.Errorf("trying to send a message longer than we would receive: %d > %d tag=%s", len(msg.data), MaxMessageLength, string(msg.data[0:2])) // just drop it, don't break the connection return disconnectReasonNone } @@ -766,7 +787,7 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { now := time.Now() msgWaitDuration := now.Sub(msg.enqueued) if msgWaitDuration > maxMessageQueueDuration { - wp.net.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000) + wp.log.Warnf("peer stale enqueued message %dms", msgWaitDuration.Nanoseconds()/1000000) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "stale message"}) return disconnectStaleWrite } @@ -776,7 +797,7 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data) if err != nil { if atomic.LoadInt32(&wp.didInnerClose) == 0 { - wp.net.log.Warn("peer write error ", err) + wp.log.Warn("peer write error ", err) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "write err"}) } return disconnectWriteError @@ -842,7 +863,7 @@ func (wp *wsPeer) writeNonBlockMsgs(ctx context.Context, data [][]byte, highPrio includeIndices := make([]int, 0, len(data)) for i := range data { if wp.outgoingMsgFilter != nil && len(data[i]) > messageFilterSize && wp.outgoingMsgFilter.CheckDigest(digest[i], false, false) { - //wp.net.log.Debugf("msg drop as outbound dup %s(%d) %v", string(data[:2]), len(data)-2, digest) + //wp.log.Debugf("msg drop as outbound dup %s(%d) %v", string(data[:2]), len(data)-2, digest) // peer has notified us it doesn't need this message outgoingNetworkMessageFilteredOutTotal.Inc(nil) outgoingNetworkMessageFilteredOutBytesTotal.AddUint64(uint64(len(data)), nil) @@ -926,13 +947,13 @@ func (wp *wsPeer) Close(deadline time.Time) { atomic.StoreInt32(&wp.didSignalClose, 1) if atomic.CompareAndSwapInt32(&wp.didInnerClose, 0, 1) { close(wp.closing) - err := wp.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) + err := wp.conn.CloseWithMessage(websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) if err != nil { - wp.net.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddr().String()) + wp.log.Infof("failed to write CloseMessage to connection for %s", wp.conn.RemoteAddrString()) } err = wp.conn.CloseWithoutFlush() if err != nil { - wp.net.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddr().String()) + wp.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddrString()) } } @@ -1019,7 +1040,7 @@ func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Re case wp.sendBufferBulk <- sendMessages{msgs: msg}: atomic.AddInt64(&wp.outstandingTopicRequests, 1) case <-wp.closing: - e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddr().String()) + e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddrString()) return case <-ctx.Done(): return resp, ctx.Err() @@ -1030,7 +1051,7 @@ func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Re case resp = <-responseChannel: return resp, nil case <-wp.closing: - e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddr().String()) + e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddrString()) return case <-ctx.Done(): return resp, ctx.Err() @@ -1077,9 +1098,9 @@ func (wp *wsPeer) setPeerData(key string, value interface{}) { } func (wp *wsPeer) sendMessagesOfInterest(messagesOfInterestGeneration uint32, messagesOfInterestEnc []byte) { - err := wp.Unicast(wp.net.ctx, messagesOfInterestEnc, protocol.MsgOfInterestTag) + err := wp.Unicast(wp.netCtx, messagesOfInterestEnc, protocol.MsgOfInterestTag) if err != nil { - wp.net.log.Errorf("ws send msgOfInterest: %v", err) + wp.log.Errorf("ws send msgOfInterest: %v", err) } else { atomic.StoreUint32(&wp.messagesOfInterestGeneration, messagesOfInterestGeneration) }