diff --git a/data/txHandler.go b/data/txHandler.go index a66de93ea3..15e43dbc91 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -595,24 +595,30 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed return &d, false } -// incomingMsgDupErlCheck runs the duplicate and rate limiting checks on a raw incoming messages. +// incomingMsgDupCheck runs the duplicate check on a raw incoming message. // Returns: // - the key used for insertion if the message was not found in the cache -// - the capacity guard returned by the elastic rate limiter -// - a boolean indicating if the message was a duplicate or the sender is rate limited -func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DeadlineSettableConn) (*crypto.Digest, *util.ErlCapacityGuard, bool) { +// - a boolean indicating if the message was a duplicate +func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool) { var msgKey *crypto.Digest - var capguard *util.ErlCapacityGuard var isDup bool if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates if msgKey, isDup = handler.msgCache.CheckAndPut(data); isDup { transactionMessagesDupRawMsg.Inc(nil) - return msgKey, capguard, true + return msgKey, true } } + return msgKey, false +} +// incomingMsgErlCheck runs the rate limiting check on a sender. +// Returns: +// - the capacity guard returned by the elastic rate limiter +// - a boolean indicating if the sender is rate limited +func (handler *TxHandler) incomingMsgErlCheck(sender network.DeadlineSettableConn) (*util.ErlCapacityGuard, bool) { + var capguard *util.ErlCapacityGuard var err error if handler.erl != nil { congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue)) @@ -625,14 +631,14 @@ func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.Dea handler.erl.EnableCongestionControl() // if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such transactionMessagesDroppedFromBacklog.Inc(nil) - return msgKey, capguard, true + return capguard, true } // if the backlog Queue has 50% of its buffer back, turn congestion control off if !congestedERL { handler.erl.DisableCongestionControl() } } - return msgKey, capguard, false + return capguard, false } // decodeMsg decodes TX message buffer into transactions.SignedTxn, @@ -711,8 +717,12 @@ func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transa // - message are checked for duplicates // - transactions are checked for duplicates func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { - msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) + msgKey, shouldDrop := handler.incomingMsgDupCheck(rawmsg.Data) + if shouldDrop { + return network.OutgoingMessage{Action: network.Ignore} + } + capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender) accepted := false defer func() { // if we failed to put the item onto the backlog, we should release the capacity if any @@ -724,7 +734,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net }() if shouldDrop { - // this TX message was found in the duplicate cache, or ERL rate-limited it + // this TX message was rate-limited by ERL return network.OutgoingMessage{Action: network.Ignore} } @@ -756,12 +766,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net transactionMessagesDroppedFromBacklog.Inc(nil) // additionally, remove the txn from duplicate caches to ensure it can be re-submitted - if handler.txCanonicalCache != nil && canonicalKey != nil { - handler.txCanonicalCache.Delete(canonicalKey) - } - if handler.msgCache != nil && msgKey != nil { - handler.msgCache.DeleteByKey(msgKey) - } + handler.deleteFromCaches(msgKey, canonicalKey) } return network.OutgoingMessage{Action: network.Ignore} @@ -772,25 +777,12 @@ type validatedIncomingTxMessage struct { unverifiedTxGroup []transactions.SignedTxn msgKey *crypto.Digest canonicalKey *crypto.Digest - capguard *util.ErlCapacityGuard } // validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork. func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage { - msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender) - - accepted := false - defer func() { - // if we failed to put the item onto the backlog, we should release the capacity if any - if !accepted && capguard != nil { - if capErr := capguard.Release(); capErr != nil { - logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr) - } - } - }() - - if shouldDrop { - // this TX message was found in the duplicate cache, or ERL rate-limited it + msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data) + if isDup { return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} } @@ -807,7 +799,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil} } - accepted = true return network.ValidatedMessage{ Action: network.Accept, Tag: rawmsg.Tag, @@ -816,7 +807,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa unverifiedTxGroup: unverifiedTxGroup, msgKey: msgKey, canonicalKey: canonicalKey, - capguard: capguard, }, } } @@ -830,7 +820,7 @@ func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.Vali unverifiedTxGroup: msg.unverifiedTxGroup, rawmsgDataHash: msg.msgKey, unverifiedTxGroupHash: msg.canonicalKey, - capguard: msg.capguard, + capguard: nil, }: default: // if we failed here we want to increase the corresponding metric. It might suggest that we @@ -838,17 +828,7 @@ func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.Vali transactionMessagesDroppedFromBacklog.Inc(nil) // additionally, remove the txn from duplicate caches to ensure it can be re-submitted - if handler.txCanonicalCache != nil && msg.canonicalKey != nil { - handler.txCanonicalCache.Delete(msg.canonicalKey) - } - if handler.msgCache != nil && msg.msgKey != nil { - handler.msgCache.DeleteByKey(msg.msgKey) - } - if msg.capguard != nil { - if capErr := msg.capguard.Release(); capErr != nil { - logging.Base().Warnf("processIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr) - } - } + handler.deleteFromCaches(msg.msgKey, msg.canonicalKey) } return network.OutgoingMessage{Action: network.Ignore} } diff --git a/network/p2p/capabilities.go b/network/p2p/capabilities.go index f489773975..e5781aa389 100644 --- a/network/p2p/capabilities.go +++ b/network/p2p/capabilities.go @@ -67,9 +67,10 @@ func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts . } // Close should be called when fully shutting down the node -func (c *CapabilitiesDiscovery) Close() { - _ = c.dht.Close() +func (c *CapabilitiesDiscovery) Close() error { + err := c.dht.Close() c.wg.Wait() + return err } // Host exposes the underlying libp2p host.Host object diff --git a/network/p2p/capabilities_test.go b/network/p2p/capabilities_test.go index 2b98b49806..881860f647 100644 --- a/network/p2p/capabilities_test.go +++ b/network/p2p/capabilities_test.go @@ -309,7 +309,8 @@ func TestCapabilities_Varying(t *testing.T) { wg.Wait() for _, disc := range capsDisc[3:] { - disc.Close() + err := disc.Close() + require.NoError(t, err) // Make sure it actually closes disc.wg.Wait() } @@ -347,6 +348,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) { "Found self when searching for capability", ) - disc[0].Close() + err := disc[0].Close() + require.NoError(t, err) disc[0].wg.Wait() } diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 95cc3295f4..be20f1943c 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -365,7 +365,10 @@ func (n *P2PNetwork) Start() error { // Stop closes sockets and stop threads. func (n *P2PNetwork) Stop() { if n.capabilitiesDiscovery != nil { - n.capabilitiesDiscovery.Close() + err := n.capabilitiesDiscovery.Close() + if err != nil { + n.log.Warnf("Error closing capabilities discovery: %v", err) + } } n.handler.ClearHandlers([]Tag{})