Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let the thread which fetched a TX add it to the mempool #4984

Draft
wants to merge 5 commits into
base: bolt12/coot/tx-submission
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
them $ \api -> do
let server = txSubmissionInboundV2
txSubmissionInboundTracer
(getMempoolReader mempool)
(getMempoolWriter mempool)
api
labelThisThread "TxSubmissionServer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ unit_txSubmission_allTransactions (ArbTxDecisionPolicy decisionPolicy)
case x of
-- When we add txids to the mempool, we collect them
-- into the map
DiffusionTxSubmissionInbound (TraceTxInboundAddedToMempool txids) ->
DiffusionTxSubmissionInbound (TraceTxInboundAddedToMempool txids _) ->
Map.alter (maybe (Just []) (Just . sort . (txids ++))) n rr
-- When the node is shutdown we have to reset the accepted
-- txids list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ runTxSubmission tracer tracerTxLogic state txDecisionPolicy = do
(getMempoolReader inboundMempool)
addr $ \api -> do
let server = txSubmissionInboundV2 verboseTracer
(getMempoolReader inboundMempool)
(getMempoolWriter inboundMempool)
api
runPipelinedPeerWithLimits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mkArbPeerTxState mempoolHasTxFun txIdsInflight unacked txMaskMap =
requestedTxIdsInflight,
requestedTxsInflight,
requestedTxsInflightSize,
unknownTxs }
unknownTxs,
rejectedTxs = 0,
fetchedTxs = Set.empty }
(Set.fromList $ Map.elems inflightMap)
bufferedMap
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer, traceWith)

Expand Down Expand Up @@ -314,13 +315,18 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
traceWith tracer $
TraceTxSubmissionCollected collected

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs txsReady

!end <- getMonotonicTime
let duration = diffTime end start
traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
let !accepted = length txidsAccepted

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = collected - accepted
, ptxcScore = 0 -- This implementatin does not track score
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be a TODO?

}

continueWithStateM (serverIdle n) st {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,27 +59,40 @@ makeDecisions
, Map peeraddr (TxDecision txid tx)
)
makeDecisions policy SharedDecisionContext {
sdcPeerGSV = peerGSV,
sdcPeerGSV = _peerGSV,
sdcSharedTxState = st
}
= fn
. pickTxsToDownload policy st
. orderByDeltaQ peerGSV
. orderByRejections
where
fn :: forall a.
(a, [(peeraddr, TxDecision txid tx)])
-> (a, Map peeraddr (TxDecision txid tx))
fn (a, as) = (a, Map.fromList as)


-- | Order peers by how useful the TXs they have provided are.
--
-- TXs delivered late will fail to apply because they where included in
-- a recently adopted block. Peers can race against each other by setting
-- `txInflightMultiplicity` to > 1.
--
-- TODO: Should not depend on plain `peeraddr` as a tie breaker.
orderByRejections :: Map peeraddr (PeerTxState txid tx)
-> [ (peeraddr, PeerTxState txid tx)]
orderByRejections =
sortOn (\(_peeraddr, ps) -> rejectedTxs ps)
. Map.toList

-- | Order peers by `DeltaQ`.
--
orderByDeltaQ :: forall peeraddr txid tx.
_orderByDeltaQ :: forall peeraddr txid tx.
Ord peeraddr
=> Map peeraddr PeerGSV
-> Map peeraddr (PeerTxState txid tx)
-> [(peeraddr, PeerTxState txid tx)]
orderByDeltaQ dq =
_orderByDeltaQ dq =
sortOn (\(peeraddr, _) ->
gsvRequestResponseDuration
(Map.findWithDefault defaultGSV peeraddr dq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ defaultTxDecisionPolicy =
maxUnacknowledgedTxIds = 10, -- must be the same as txSubmissionMaxUnacked
txsSizeInflightPerPeer = max_TX_SIZE * 6,
maxTxsSizeInflight = max_TX_SIZE * 20,
txInflightMultiplicity = 1
txInflightMultiplicity = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Data.Foldable (traverse_
, foldl'
#endif
)
import Data.Functor (void)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -75,8 +76,14 @@ data PeerTxAPI m txid tx = PeerTxAPI {
-- ^ requested txids
-> Map txid tx
-- ^ received txs
-> m ()
-> m (),
-- ^ handle received txs

countRejectedTxs :: Int
-> m Int,

consumeFetchedTxs :: Set txid
-> m (Set txid)
}


Expand Down Expand Up @@ -123,7 +130,9 @@ withPeer tracer
( TxChannels { txChannelMap = txChannelMap' }
, PeerTxAPI { readTxDecision = takeMVar chann',
handleReceivedTxIds,
handleReceivedTxs }
handleReceivedTxs,
countRejectedTxs,
consumeFetchedTxs }
)

atomically $ modifyTVar sharedStateVar registerPeer
Expand Down Expand Up @@ -151,7 +160,9 @@ withPeer tracer
requestedTxsInflightSize = 0,
requestedTxsInflight = Set.empty,
unacknowledgedTxIds = StrictSeq.empty,
unknownTxs = Set.empty }
unknownTxs = Set.empty,
rejectedTxs = 0,
fetchedTxs = Set.empty }
peerTxStates
}

Expand Down Expand Up @@ -210,8 +221,43 @@ withPeer tracer
-> Map txid tx
-- ^ received txs
-> m ()
handleReceivedTxs txids txs =
handleReceivedTxs txids txs = do
void $ atomically $ modifyTVar sharedStateVar addFethed
collectTxs tracer sharedStateVar peeraddr txids txs
where
addFethed :: SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
addFethed st@SharedTxState { peerTxStates } =
let peerTxStates' = Map.update (\ps -> Just $! ps { fetchedTxs = Set.union (fetchedTxs ps) txids }) peeraddr peerTxStates in
st {peerTxStates = peerTxStates' }

countRejectedTxs :: Int
-> m Int
countRejectedTxs n = atomically $ do
modifyTVar sharedStateVar cntRejects
st <- readTVar sharedStateVar
case Map.lookup peeraddr (peerTxStates st) of
Nothing -> error "missing peer updated"
Just ps -> return $ rejectedTxs ps
where
cntRejects :: SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
cntRejects st@SharedTxState { peerTxStates } =
let peerTxStates' = Map.update (\ps -> Just $! ps { rejectedTxs = min 42 (max (-42) (rejectedTxs ps + n)) }) peeraddr peerTxStates in
st {peerTxStates = peerTxStates'}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be more efficient to update it for all peers at once, but it might be harder to implement it. Another way would be to store the counts in Map peerAddt (TVar ...), so that we don't block on taking the whole state to update the count just for one peer.


consumeFetchedTxs :: Set txid
-> m (Set txid)
consumeFetchedTxs otxids = atomically $ do
st <- readTVar sharedStateVar
case Map.lookup peeraddr (peerTxStates st) of
Nothing -> error "missing peer in consumeFetchedTxs"
Just ps -> do
let o = Set.intersection (fetchedTxs ps) otxids
r = Set.difference (fetchedTxs ps) otxids
st' = st { peerTxStates = Map.update (\ps' -> Just $! ps' { fetchedTxs = r }) peeraddr (peerTxStates st) }
writeTVar sharedStateVar st'
return o


decisionLogicThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import Data.Set qualified as Set
import Control.Concurrent.Class.MonadSTM.Strict
import Control.Exception (assert)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime.SI
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined

import Control.Monad (unless)
import Control.Monad (unless, when)
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Inbound.Registry (PeerTxAPI (..))
import Ouroboros.Network.TxSubmission.Inbound.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader

-- | Flag to enable/disable the usage of the new tx submission protocol
--
Expand All @@ -43,22 +45,29 @@ txSubmissionInboundV2
:: forall txid tx idx m.
( MonadSTM m
, MonadThrow m
, MonadMonotonicTime m
, Ord txid
)
=> Tracer m (TraceTxSubmissionInbound txid tx)
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> PeerTxAPI m txid tx
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInboundV2
tracer
TxSubmissionMempoolReader{
mempoolGetSnapshot
}
TxSubmissionMempoolWriter {
txId,
mempoolAddTxs
}
PeerTxAPI {
readTxDecision,
handleReceivedTxIds,
handleReceivedTxs
handleReceivedTxs,
countRejectedTxs,
consumeFetchedTxs
}
=
TxSubmissionServerPipelined serverIdle
Expand All @@ -70,18 +79,53 @@ txSubmissionInboundV2
txd@TxDecision { txdTxsToRequest = txsToReq, txdTxsToMempool = txs }
<- readTxDecision
traceWith tracer (TraceTxInboundDecision txd)
txidsAccepted <- mempoolAddTxs txs
traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted

let !collected = length txs
let !accepted = length txidsAccepted
traceWith tracer $
TraceTxSubmissionCollected collected

traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = collected - accepted
}
mpSnapshot <- atomically mempoolGetSnapshot
let receivedL = [ (txId tx, tx) | tx <- txs ]
fetchedSet <- consumeFetchedTxs (Set.fromList (map fst receivedL))

-- Only attempt to add TXs if we actually has fetched some.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- Only attempt to add TXs if we actually has fetched some.
-- Only attempt to add TXs if we actually have fetched some.

when (not $ Set.null fetchedSet) $ do
let fetched = filter
(\(txid, _) -> Set.member txid fetchedSet)
receivedL
fetchedS = Set.fromList $ map fst fetched

-- Note that checking if the mempool contains a TX before
-- spending several ms attempting to add it to the pool has
-- been judged immoral.
Comment on lines +94 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😁

let fresh = filter
(\(txid, _) -> not $ mempoolHasTx mpSnapshot txid)
receivedL

!start <- getMonotonicTime
txidsAccepted <- mempoolAddTxs $ map snd fresh
!end <- getMonotonicTime
let duration = diffTime end start

let acceptedS = Set.fromList txidsAccepted
acceptedFetched = Set.intersection fetchedS acceptedS
!accepted = Set.size acceptedFetched
!rejected = Set.size fetchedS - accepted

traceWith tracer $
TraceTxInboundAddedToMempool txidsAccepted duration
traceWith tracer $
TraceTxSubmissionCollected collected

-- Accepted TXs are discounted from rejected.
--
-- The number of rejected TXs may be too high.
-- The reason for that is that any peer which has downloaded a
-- TX is permitted to add TXs for all TXids hit has offered.
-- This is done to preserve TX ordering.
!s <- countRejectedTxs (rejected - accepted) -- accepted TXs are discounted
traceWith tracer $ TraceTxSubmissionProcessed ProcessedTxCount {
ptxcAccepted = accepted
, ptxcRejected = rejected
, ptxcScore = s
}

-- TODO:
-- We can update the state so that other `tx-submission` servers will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module Ouroboros.Network.TxSubmission.Inbound.Types
) where

import Control.Exception (Exception (..))
import Control.Monad.Class.MonadTime.SI
import Data.Map.Strict (Map)
import Data.Sequence.Strict (StrictSeq)
import Data.Set (Set)
Expand Down Expand Up @@ -73,7 +74,11 @@ data PeerTxState txid tx = PeerTxState {
-- since that could potentially lead to corrupting the node, not being
-- able to download a `tx` which is needed & available from other nodes.
--
unknownTxs :: !(Set txid)
unknownTxs :: !(Set txid),

rejectedTxs :: !Int,

fetchedTxs :: !(Set txid)
}
deriving (Eq, Show, Generic)

Expand Down Expand Up @@ -258,6 +263,7 @@ data ProcessedTxCount = ProcessedTxCount {
ptxcAccepted :: Int
-- | Just rejected this many transactions.
, ptxcRejected :: Int
, ptxcScore :: Int
}
deriving (Eq, Show)

Expand Down Expand Up @@ -294,7 +300,7 @@ data TraceTxSubmissionInbound txid tx =
-- | Server received 'MsgDone'
| TraceTxInboundCanRequestMoreTxs Int
| TraceTxInboundCannotRequestMoreTxs Int
| TraceTxInboundAddedToMempool [txid]
| TraceTxInboundAddedToMempool [txid] DiffTime

--
-- messages emitted by the new implementation of the server in
Expand Down
Loading