Skip to content

Commit

Permalink
Expose transaction size to tx submission (#4926)
Browse files Browse the repository at this point in the history
* Added callback providing tx size to txSubmissionInbound

* changelog update
  • Loading branch information
crocodile-dentist committed Sep 23, 2024
1 parent d900a38 commit 374a78a
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 11 deletions.
3 changes: 3 additions & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
* Added `daMinBigLedgerPeersForTrustedState` to `ArgumentsExtra` when starting diffusion.
It is used by `outboundConnectionsState` when signaling trust state when syncing in
Genesis mode. Default value is provided by the Configuration module.
* `txSubmissionInbound` takes an additional callback which exposes
CBOR-encoded transaction size as it is when transmitted over the
network, except for some top level wrapping (cf. PR#4926 description)

### Non-Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ txSubmissionSimulation tracer maxUnacked outboundTxs
maxUnacked
(getMempoolReader inboundMempool)
(getMempoolWriter inboundMempool)
getTxSize
NodeToNodeV_7

prop_txSubmission :: Positive Word16
Expand Down
14 changes: 11 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.Limits
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.SizeInBytes
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Inbound.Types (ProcessedTxCount (..),
TraceTxSubmissionInbound (..), TxSubmissionMempoolWriter (..),
Expand Down Expand Up @@ -135,9 +136,10 @@ txSubmissionInbound
-> NumTxIdsToAck -- ^ Maximum number of unacknowledged txids allowed
-> TxSubmissionMempoolReader txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
-> (tx -> SizeInBytes) -- ^ get size of CBOR encoded transaction
-> NodeToNodeVersion
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version =
txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter txSize _version =
TxSubmissionServerPipelined $ do
#ifdef TXSUBMISSION_DELAY
-- make the client linger before asking for tx's and expending
Expand Down Expand Up @@ -262,8 +264,14 @@ txSubmissionInbound tracer (NumTxIdsToAck maxUnacked) mpReader mpWriter _version
-- for. We should never get a tx we did not ask for. We take a strict
-- approach to this and check it.
--
let txsMap :: Map txid tx
txsMap = Map.fromList [ (txId tx, tx) | tx <- txs ]
let availableTxidsMap = availableTxids st
txsMap :: Map txid tx
txsMap = Map.fromList [ (txId', assert sizesMatch tx)
| tx <- txs
, let txId' = txId tx
calcSize = Just $ txSize tx
advertisedSize = availableTxidsMap Map.!? txId'
sizesMatch = calcSize == advertisedSize]

txidsReceived = Map.keysSet txsMap
txidsRequested = Set.fromList txids
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ withPeer
-> TxSubmissionMempoolReader txid tx idx m
-> peeraddr
-- ^ new peer
-> (tx -> SizeInBytes)
-> (PeerTxAPI m txid tx -> m a)
-- ^ callback which gives access to `PeerTxStateAPI`
-> m a
withPeer tracer
channelsVar
sharedStateVar
TxSubmissionMempoolReader { mempoolGetSnapshot }
peeraddr io =
peeraddr txSize io =
bracket
(do -- create a communication channel
!peerTxAPI <-
Expand Down Expand Up @@ -211,7 +212,7 @@ withPeer tracer
-- ^ received txs
-> m ()
handleReceivedTxs txids txs =
collectTxs tracer sharedStateVar peeraddr txids txs
collectTxs tracer sharedStateVar peeraddr txids txs txSize


decisionLogicThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ txSubmissionInboundV2

unless (Map.keysSet received `Set.isSubsetOf` requested) $
throwIO ProtocolErrorTxNotRequested
-- TODO: all sizes of txs which were announced earlier with
-- `MsgReplyTxIds` must be verified.

handleReceivedTxs requested received
k
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,12 @@ collectTxsImpl
=> peeraddr
-> Set txid -- ^ set of requested txids
-> Map txid tx -- ^ received txs
-> (tx -> SizeInBytes)
-> SharedTxState peeraddr txid tx
-> SharedTxState peeraddr txid tx
-- ^ number of `txid`s to be acknowledged, `tx`s to be added to
-- the mempool and updated state.
collectTxsImpl peeraddr requestedTxIds receivedTxs
collectTxsImpl peeraddr requestedTxIds receivedTxs txSize
st@SharedTxState { peerTxStates } =

-- using `alterF` so the update of `PeerTxState` is done in one lookup
Expand All @@ -327,8 +328,14 @@ collectTxsImpl peeraddr requestedTxIds receivedTxs
-> ( SharedTxState peeraddr txid tx
, PeerTxState txid tx
)
fn ps = (st'', ps'')
fn ps = assert allSizesMatch (st'', ps'')
where
allSizesMatch = Map.foldlWithKey'
step True
(availableTxIds ps `Map.restrictKeys` Map.keysSet receivedTxs)
step acc txid advSize =
acc && advSize == txSize (receivedTxs Map.! txid)

notReceived = requestedTxIds Set.\\ Map.keysSet receivedTxs

-- add received `tx`s to buffered map
Expand Down Expand Up @@ -452,11 +459,12 @@ collectTxs
-> peeraddr
-> Set txid -- ^ set of requested txids
-> Map txid tx -- ^ received txs
-> (tx -> SizeInBytes)
-> m ()
-- ^ number of txids to be acknowledged and txs to be added to the
-- mempool
collectTxs tracer sharedVar peeraddr txidsRequested txsMap = do
collectTxs tracer sharedVar peeraddr txidsRequested txsMap txSize = do
st <- atomically $
stateTVar sharedVar
((\a -> (a,a)) . collectTxsImpl peeraddr txidsRequested txsMap)
((\a -> (a,a)) . collectTxsImpl peeraddr txidsRequested txsMap txSize)
traceWith tracer (TraceSharedTxState "collectTxs" st)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Ouroboros.Network.ControlMessage (ControlMessage, ControlMessageSTM,
timeoutWithControlMessage)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.SizeInBytes
import Ouroboros.Network.Protocol.TxSubmission2.Type
import Ouroboros.Network.TxSubmission.Mempool.Reader (MempoolSnapshot (..),
TxSubmissionMempoolReader (..))
Expand Down

0 comments on commit 374a78a

Please sign in to comment.