Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: more CR fixes: ERL and DHT err logging #6040

Merged
merged 6 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 25 additions & 45 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,24 +595,30 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed
return &d, false
}

// incomingMsgDupErlCheck runs the duplicate and rate limiting checks on a raw incoming messages.
// incomingMsgDupCheck runs the duplicate check on a raw incoming message.
// Returns:
// - the key used for insertion if the message was not found in the cache
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the message was a duplicate or the sender is rate limited
func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.DeadlineSettableConn) (*crypto.Digest, *util.ErlCapacityGuard, bool) {
// - a boolean indicating if the message was a duplicate
func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool) {
var msgKey *crypto.Digest
var capguard *util.ErlCapacityGuard
var isDup bool
if handler.msgCache != nil {
// check for duplicate messages
// this helps against relaying duplicates
if msgKey, isDup = handler.msgCache.CheckAndPut(data); isDup {
transactionMessagesDupRawMsg.Inc(nil)
return msgKey, capguard, true
return msgKey, true
}
}
return msgKey, false
}

// incomingMsgErlCheck runs the rate limiting check on a sender.
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DeadlineSettableConn) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var err error
if handler.erl != nil {
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
Expand All @@ -625,14 +631,14 @@ func (handler *TxHandler) incomingMsgDupErlCheck(data []byte, sender network.Dea
handler.erl.EnableCongestionControl()
// if there is no capacity, it is the same as if we failed to put the item onto the backlog, so report such
transactionMessagesDroppedFromBacklog.Inc(nil)
return msgKey, capguard, true
return capguard, true
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
if !congestedERL {
handler.erl.DisableCongestionControl()
}
}
return msgKey, capguard, false
return capguard, false
}

// decodeMsg decodes TX message buffer into transactions.SignedTxn,
Expand Down Expand Up @@ -711,8 +717,12 @@ func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transa
// - message are checked for duplicates
// - transactions are checked for duplicates
func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)
msgKey, shouldDrop := handler.incomingMsgDupCheck(rawmsg.Data)
if shouldDrop {
return network.OutgoingMessage{Action: network.Ignore}
}

capguard, shouldDrop := handler.incomingMsgErlCheck(rawmsg.Sender)
accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
Expand All @@ -724,7 +734,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}()

if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
// this TX message was rate-limited by ERL
return network.OutgoingMessage{Action: network.Ignore}
}

Expand Down Expand Up @@ -756,12 +766,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
transactionMessagesDroppedFromBacklog.Inc(nil)

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
if handler.txCanonicalCache != nil && canonicalKey != nil {
handler.txCanonicalCache.Delete(canonicalKey)
}
if handler.msgCache != nil && msgKey != nil {
handler.msgCache.DeleteByKey(msgKey)
}
handler.deleteFromCaches(msgKey, canonicalKey)
}

return network.OutgoingMessage{Action: network.Ignore}
Expand All @@ -772,25 +777,12 @@ type validatedIncomingTxMessage struct {
unverifiedTxGroup []transactions.SignedTxn
msgKey *crypto.Digest
canonicalKey *crypto.Digest
capguard *util.ErlCapacityGuard
}

// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage {
msgKey, capguard, shouldDrop := handler.incomingMsgDupErlCheck(rawmsg.Data, rawmsg.Sender)

accepted := false
defer func() {
// if we failed to put the item onto the backlog, we should release the capacity if any
if !accepted && capguard != nil {
if capErr := capguard.Release(); capErr != nil {
logging.Base().Warnf("validateIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
}()

if shouldDrop {
// this TX message was found in the duplicate cache, or ERL rate-limited it
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
if isDup {
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
}

Expand All @@ -807,7 +799,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
}

accepted = true
return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
Expand All @@ -816,7 +807,6 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
canonicalKey: canonicalKey,
capguard: capguard,
},
}
}
Expand All @@ -830,25 +820,15 @@ func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.Vali
unverifiedTxGroup: msg.unverifiedTxGroup,
rawmsgDataHash: msg.msgKey,
unverifiedTxGroupHash: msg.canonicalKey,
capguard: msg.capguard,
capguard: nil,
}:
default:
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
if handler.txCanonicalCache != nil && msg.canonicalKey != nil {
handler.txCanonicalCache.Delete(msg.canonicalKey)
}
if handler.msgCache != nil && msg.msgKey != nil {
handler.msgCache.DeleteByKey(msg.msgKey)
}
if msg.capguard != nil {
if capErr := msg.capguard.Release(); capErr != nil {
logging.Base().Warnf("processIncomingTxMessage: failed to release capacity to ElasticRateLimiter: %v", capErr)
}
}
handler.deleteFromCaches(msg.msgKey, msg.canonicalKey)
}
return network.OutgoingMessage{Action: network.Ignore}
}
Expand Down
5 changes: 3 additions & 2 deletions network/p2p/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ func (c *CapabilitiesDiscovery) FindPeers(ctx context.Context, ns string, opts .
}

// Close should be called when fully shutting down the node
func (c *CapabilitiesDiscovery) Close() {
_ = c.dht.Close()
func (c *CapabilitiesDiscovery) Close() error {
err := c.dht.Close()
c.wg.Wait()
return err
}

// Host exposes the underlying libp2p host.Host object
Expand Down
6 changes: 4 additions & 2 deletions network/p2p/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ func TestCapabilities_Varying(t *testing.T) {
wg.Wait()

for _, disc := range capsDisc[3:] {
disc.Close()
err := disc.Close()
require.NoError(t, err)
// Make sure it actually closes
disc.wg.Wait()
}
Expand Down Expand Up @@ -347,6 +348,7 @@ func TestCapabilities_ExcludesSelf(t *testing.T) {
"Found self when searching for capability",
)

disc[0].Close()
err := disc[0].Close()
require.NoError(t, err)
disc[0].wg.Wait()
}
5 changes: 4 additions & 1 deletion network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,10 @@ func (n *P2PNetwork) Start() error {
// Stop closes sockets and stop threads.
func (n *P2PNetwork) Stop() {
if n.capabilitiesDiscovery != nil {
n.capabilitiesDiscovery.Close()
err := n.capabilitiesDiscovery.Close()
if err != nil {
n.log.Warnf("Error closing capabilities discovery: %v", err)
}
}

n.handler.ClearHandlers([]Tag{})
Expand Down
Loading