Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let the thread which fetched a TX add it to the mempool #4984

Draft
wants to merge 5 commits into
base: bolt12/coot/tx-submission
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions network-mux/demo/mux-demo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ serverWorker bearer = do
putStrLn $ "Result: " ++ show result
stopMux mux

runMux nullTracer mux bearer
runMux nullTracer 1 mux bearer
where
ptcls :: MiniProtocolBundle ResponderMode
ptcls = MiniProtocolBundle
Expand Down Expand Up @@ -193,7 +193,7 @@ clientWorker bearer n msg = do
putStrLn $ "Result: " ++ show result
stopMux mux

runMux nullTracer mux bearer
runMux nullTracer 0 mux bearer
where
ptcls :: MiniProtocolBundle InitiatorMode
ptcls = MiniProtocolBundle
Expand Down
51 changes: 42 additions & 9 deletions network-mux/src/Control/Concurrent/JobPool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Control.Concurrent.JobPool
, Job (..)
, withJobPool
, forkJob
, forkJobOn
, readSize
, readGroupSize
, waitForJob
Expand All @@ -29,6 +30,9 @@ import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadThread (..))
import Control.Monad.Class.MonadThrow

import Control.Concurrent (getNumCapabilities)
import System.IO.Unsafe (unsafePerformIO)

-- | JobPool allows to submit asynchronous jobs, wait for their completion or
-- cancel. Jobs are grouped, each group can be cancelled separately.
--
Expand Down Expand Up @@ -69,16 +73,18 @@ withJobPool =
jobs <- readTVarIO jobsVar
mapM_ uninterruptibleCancel jobs

forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =

forkJob' :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> (m () -> m (Async m ()))
-> JobPool group m a
-> Job group m a
-> m ()
forkJob' doFork JobPool{jobsVar, completionQueue} (Job action handler group label) =
mask $ \restore -> do
jobAsync <- async $ do
jobAsync <- doFork $ do
tid <- myThreadId
io tid restore
`onException`
Expand All @@ -104,6 +110,33 @@ forkJob JobPool{jobsVar, completionQueue} (Job action handler group label) =
restore action
atomically $ writeTQueue completionQueue res



forkJob :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> JobPool group m a
-> Job group m a
-> m ()
forkJob = forkJob' async

forkJobOn :: forall group m a.
( MonadAsync m, MonadMask m
, Ord group
)
=> Int
-> JobPool group m a
-> Job group m a
-> m ()
forkJobOn c = forkJob' (asyncOn limitCapability)
where
limitCapability :: Int
limitCapability =
let sysCap = unsafePerformIO getNumCapabilities in
c `mod` (max 1 $ sysCap - 2)


readSize :: MonadSTM m => JobPool group m a -> STM m Int
readSize JobPool{jobsVar} = Map.size <$> readTVar jobsVar

Expand Down
13 changes: 8 additions & 5 deletions network-mux/src/Network/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ runMux :: forall m mode.
, MonadMask m
)
=> Tracer m MuxTrace
-> Int
-> Mux mode m
-> MuxBearer m
-> m ()
runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do
runMux tracer peerHash Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do
egressQueue <- atomically $ newTBQueue 100
labelTBQueueIO egressQueue "mux-eq"

Expand All @@ -223,7 +224,8 @@ runMux tracer Mux {muxMiniProtocols, muxControlCmdQueue, muxStatus} bearer = do
-- Wait for someone to shut us down by calling muxStop or an error.
-- Outstanding jobs are shut down Upon completion of withJobPool.
withTimeoutSerial $ \timeout ->
monitor tracer
monitor peerHash
tracer
timeout
jobpool
egressQueue
Expand Down Expand Up @@ -350,14 +352,15 @@ monitor :: forall mode m.
, Alternative (STM m)
, MonadThrow (STM m)
)
=> Tracer m MuxTrace
=> Int
-> Tracer m MuxTrace
-> TimeoutFn m
-> JobPool.JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> StrictTQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
monitor peerHash tracer timeout jobpool egressQueue cmdQueue muxStatus =
go (MonitorCtx Map.empty)
where
go :: MonitorCtx m mode -> m ()
Expand Down Expand Up @@ -426,7 +429,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
ptclAction) -> do
traceWith tracer (MuxTraceStartEagerly miniProtocolNum
(protocolDirEnum miniProtocolDir))
JobPool.forkJob jobpool $
JobPool.forkJobOn peerHash jobpool $
miniProtocolJob
tracer
egressQueue
Expand Down
2 changes: 1 addition & 1 deletion network-mux/src/Network/Mux/Compat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ muxStart tracer muxapp bearer = do
]

-- Wait for the first MuxApplication to finish, then stop the mux.
withAsync (runMux tracer mux bearer) $ \aid -> do
withAsync (runMux tracer 0 mux bearer) $ \aid -> do
waitOnAny resOps
stopMux mux
wait aid
Expand Down
36 changes: 18 additions & 18 deletions network-mux/test/Test/Mux.hs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ prop_mux_snd_recv (DummyRun messages) = ioProperty $ do

serverMux <- newMux $ MiniProtocolBundle [serverApp]

withAsync (runMux clientTracer clientMux clientBearer) $ \clientAsync ->
withAsync (runMux serverTracer serverMux serverBearer) $ \serverAsync -> do
withAsync (runMux clientTracer 0 clientMux clientBearer) $ \clientAsync ->
withAsync (runMux serverTracer 0 serverMux serverBearer) $ \serverAsync -> do

r <- step clientMux clientApp serverMux serverApp messages
stopMux serverMux
Expand Down Expand Up @@ -436,10 +436,10 @@ prop_mux_snd_recv_bi (DummyRun messages) = ioProperty $ do


clientMux <- newMux $ MiniProtocolBundle clientApps
clientAsync <- async $ runMux clientTracer clientMux clientBearer
clientAsync <- async $ runMux clientTracer 0 clientMux clientBearer

serverMux <- newMux $ MiniProtocolBundle serverApps
serverAsync <- async $ runMux serverTracer serverMux serverBearer
serverAsync <- async $ runMux serverTracer 1 serverMux serverBearer

r <- step clientMux clientApps serverMux serverApps messages
stopMux clientMux
Expand Down Expand Up @@ -692,7 +692,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
respMux <- newMux $ MiniProtocolBundle $ map (\(pn,_) ->
MiniProtocolInfo (MiniProtocolNum pn) ResponderDirectionOnly defaultMiniProtocolLimits)
respApps'
respAsync <- async $ runMux serverTracer respMux respBearer
respAsync <- async $ runMux serverTracer 1 respMux respBearer
getRespRes <- sequence [ runMiniProtocol
respMux
(MiniProtocolNum pn)
Expand All @@ -705,7 +705,7 @@ runMuxApplication initApps initBearer respApps respBearer = do
initMux <- newMux $ MiniProtocolBundle $ map (\(pn,_) ->
MiniProtocolInfo (MiniProtocolNum pn) InitiatorDirectionOnly defaultMiniProtocolLimits)
initApps'
initAsync <- async $ runMux clientTracer initMux initBearer
initAsync <- async $ runMux clientTracer 0 initMux initBearer
getInitRes <- sequence [ runMiniProtocol
initMux
(MiniProtocolNum pn)
Expand Down Expand Up @@ -925,14 +925,14 @@ prop_mux_starvation (Uneven response0 response1) =
}

serverMux <- newMux $ MiniProtocolBundle [serverApp2, serverApp3]
serverMux_aid <- async $ runMux serverTracer serverMux serverBearer
serverMux_aid <- async $ runMux serverTracer 0 serverMux serverBearer
serverRes2 <- runMiniProtocol serverMux (miniProtocolNum serverApp2) (miniProtocolDir serverApp2)
StartOnDemand server_short
serverRes3 <- runMiniProtocol serverMux (miniProtocolNum serverApp3) (miniProtocolDir serverApp3)
StartOnDemand server_long

clientMux <- newMux $ MiniProtocolBundle [clientApp2, clientApp3]
clientMux_aid <- async $ runMux (clientTracer <> headerTracer) clientMux clientBearer
clientMux_aid <- async $ runMux (clientTracer <> headerTracer) 1 clientMux clientBearer
clientRes2 <- runMiniProtocol clientMux (miniProtocolNum clientApp2) (miniProtocolDir clientApp2)
StartEagerly client_short
clientRes3 <- runMiniProtocol clientMux (miniProtocolNum clientApp3) (miniProtocolDir clientApp3)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ prop_demux_sdu a = do
serverRes <- runMiniProtocol serverMux (miniProtocolNum serverApp) (miniProtocolDir serverApp)
StartEagerly server_mp

said <- async $ runMux serverTracer serverMux serverBearer
said <- async $ runMux serverTracer 1 serverMux serverBearer
return (server_r, said, serverRes, serverMux)

-- Server that expects to receive a specific ByteString.
Expand Down Expand Up @@ -1400,7 +1400,7 @@ prop_mux_restart_m (DummyRestartingInitiatorApps apps) = do
let MiniProtocolBundle minis = MiniProtocolBundle $ map (appToInfo InitiatorDirectionOnly . fst) apps

mux <- newMux $ MiniProtocolBundle minis
mux_aid <- async $ runMux nullTracer mux bearer
mux_aid <- async $ runMux nullTracer 0 mux bearer
getRes <- sequence [ runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1447,7 +1447,7 @@ prop_mux_restart_m (DummyRestartingResponderApps rapps) = do
MiniProtocolBundle minis = MiniProtocolBundle $ map (appToInfo ResponderDirectionOnly) apps

mux <- newMux $ MiniProtocolBundle minis
mux_aid <- async $ runMux nullTracer mux bearer
mux_aid <- async $ runMux nullTracer 1 mux bearer
getRes <- sequence [ runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1496,7 +1496,7 @@ prop_mux_restart_m (DummyRestartingInitiatorResponderApps rapps) = do
respMinis = map (appToInfo ResponderDirection) apps

mux <- newMux $ MiniProtocolBundle $ initMinis ++ respMinis
mux_aid <- async $ runMux nullTracer mux bearer
mux_aid <- async $ runMux nullTracer 1 mux bearer
getInitRes <- sequence [ runMiniProtocol
mux
(daNum $ fst app)
Expand Down Expand Up @@ -1571,7 +1571,7 @@ prop_mux_start_m bearer _ checkRes (DummyInitiatorApps apps) runTime = do
minRunTime = minimum $ runTime : (map daRunTime $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- newMux $ MiniProtocolBundle minis
mux_aid <- async $ runMux nullTracer mux bearer
mux_aid <- async $ runMux nullTracer 0 mux bearer
killer <- async $ (threadDelay runTime) >> stopMux mux
getRes <- sequence [ runMiniProtocol
mux
Expand All @@ -1592,7 +1592,7 @@ prop_mux_start_m bearer trigger checkRes (DummyResponderApps apps) runTime = do
minRunTime = minimum $ runTime : (map (\a -> daRunTime a + daStartAfter a) $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- newMux $ MiniProtocolBundle minis
mux_aid <- async $ runMux verboseTracer mux bearer
mux_aid <- async $ runMux verboseTracer 0 mux bearer
getRes <- sequence [ runMiniProtocol
mux
(daNum app)
Expand All @@ -1618,7 +1618,7 @@ prop_mux_start_m bearer _trigger _checkRes (DummyResponderAppsKillMux apps) runT
let MiniProtocolBundle minis = MiniProtocolBundle $ map (appToInfo ResponderDirectionOnly) apps

mux <- newMux $ MiniProtocolBundle minis
mux_aid <- async $ runMux verboseTracer mux bearer
mux_aid <- async $ runMux verboseTracer 1 mux bearer
getRes <- sequence [ runMiniProtocol
mux
(daNum app)
Expand All @@ -1641,7 +1641,7 @@ prop_mux_start_m bearer trigger checkRes (DummyInitiatorResponderApps apps) runT
minRunTime = minimum $ runTime : (map (\a -> daRunTime a) $ filter (\app -> daAction app == DummyAppFail) apps)

mux <- newMux $ MiniProtocolBundle $ initMinis ++ respMinis
mux_aid <- async $ runMux verboseTracer mux bearer
mux_aid <- async $ runMux verboseTracer 0 mux bearer
getInitRes <- sequence [ runMiniProtocol
mux
(daNum app)
Expand Down Expand Up @@ -1804,7 +1804,7 @@ close_experiment
])
stopMux $ \mux ->
withNetworkCtx clientCtx $ \clientBearer ->
withAsync (runMux ((Client,) `contramap` muxTracer) mux clientBearer) $ \_muxAsync ->
withAsync (runMux ((Client,) `contramap` muxTracer) 0 mux clientBearer) $ \_muxAsync ->
runMiniProtocol
mux miniProtocolNum
InitiatorDirectionOnly StartEagerly
Expand All @@ -1823,7 +1823,7 @@ close_experiment
])
stopMux $ \mux ->
withNetworkCtx serverCtx $ \serverBearer ->
withAsync (runMux ((Server,) `contramap` muxTracer) mux serverBearer) $ \_muxAsync -> do
withAsync (runMux ((Server,) `contramap` muxTracer) 0 mux serverBearer) $ \_muxAsync -> do
runMiniProtocol
mux miniProtocolNum
ResponderDirectionOnly StartOnDemand
Expand Down
6 changes: 4 additions & 2 deletions ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import Control.Tracer (Tracer (..), contramap, nullTracer, traceWith)
import Data.ByteString.Lazy (ByteString)
import Data.Either (partitionEithers)
import Data.Functor (($>))
import Data.Hashable
import Data.List.NonEmpty (NonEmpty (..))
import Data.Typeable (Typeable)

Expand Down Expand Up @@ -77,6 +78,7 @@ import Ouroboros.Network.Server2 qualified as Server
import Ouroboros.Network.Snocket (Snocket, socketSnocket)
import Ouroboros.Network.Snocket qualified as Snocket
import Ouroboros.Network.Util.ShowProxy
import Ouroboros.Network.Socket


instance ShowProxy (ReqResp req resp) where
Expand Down Expand Up @@ -173,7 +175,7 @@ withBidirectionalConnectionManager
:: forall peerAddr socket m a.
( ConnectionManagerMonad m

, Ord peerAddr, Show peerAddr, Typeable peerAddr
, Ord peerAddr, Show peerAddr, Typeable peerAddr, Hashable peerAddr

-- debugging
, MonadFix m
Expand Down Expand Up @@ -438,6 +440,7 @@ bidirectionalExperiment
, Show peerAddr
, Typeable peerAddr
, Eq peerAddr
, Hashable peerAddr
)
=> Snocket IO socket peerAddr
-> Mux.MakeBearer IO socket
Expand Down Expand Up @@ -618,7 +621,6 @@ optionParser =
<> showDefault
)


run :: (Addr, Port)
-> (Addr, Port)
-> DiffTime -- ^ protocol idle timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ library testlib
, bytestring
, cborg
, containers
, hashable
, random
, serialise

Expand Down Expand Up @@ -185,6 +186,7 @@ test-suite sim-tests
, cborg
, containers
, dns
, hashable
, iproute
, network
, pretty-simple
Expand Down Expand Up @@ -328,6 +330,7 @@ executable demo-connection-manager
main-is: connection-manager.hs
build-depends: base >=4.14 && <4.21,
bytestring,
hashable,
network,
optparse-applicative,
random,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import Data.ByteString.Lazy (ByteString)
import Data.Dynamic (fromDynamic)
import Data.Foldable (foldMap')
import Data.Functor (void, ($>), (<&>))
import Data.Hashable
import Data.List as List (delete, foldl', intercalate, nub, (\\))
import Data.List.Trace qualified as Trace
import Data.Map.Strict (Map)
Expand Down Expand Up @@ -625,7 +626,7 @@ multinodeExperiment
, MonadTraceSTM m
, MonadSay m
, acc ~ [req], resp ~ [req]
, Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr
, Ord peerAddr, Show peerAddr, Typeable peerAddr, Eq peerAddr, Hashable peerAddr
, Serialise req, Show req
, Serialise resp, Show resp, Eq resp
, Typeable req, Typeable resp
Expand Down
Loading
Loading