-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPiDistribCountinuous.hs
executable file
·120 lines (97 loc) · 4.46 KB
/
PiDistribCountinuous.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env ./execthirdline.sh
-- compile and run within a docker image
-- set -e && executable=`basename -s .hs ${1}` && docker run -it -v $(pwd):/work agocorona/transient:05-02-2017 bash -c "ghc /work/${1} && /work/${executable} ${2} ${3}"
-- Distributed streaming using Transient
-- See the article: https://www.fpcomplete.com/user/agocorona/streaming-transient-effects-vi
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module MainCountinuous where
import Transient.Base
import Transient.Move
import Transient.Indeterminism
import Transient.Logged
import Transient.Stream.Resource
import Control.Applicative
import Data.Char
import Control.Monad
import Control.Monad.IO.Class
import System.Random
import Data.IORef
import System.IO
import GHC.Conc
import System.Environment
-- continuos streaming version
-- Perform the same calculation but it does not stop, and the results are accumulated in in a mutable reference within the calling node,
-- so the precision in the value of pi is printed with more and more precision. every 1000 calculations.
-- Here instead of `collect` that finish the calculation when the number of samples has been reached, i use `group` which simply
-- group the number of results in a list and then sum of the list is returned to the calling node.
-- Since `group` do not finish the calculation, new sums are streamed from the nodes again and again.
main= do
let numNodes= 5
numCalcsNode= 100
ports= [2000.. 2000+ numNodes -1]
createLocalNode p= createNode "localhost" p
nodes= map createLocalNode ports
rresults <- newIORef (0,0)
keep $ freeThreads $ threads 10 $ runCloud $ do
--setBufSize 1024
local $ addNodes nodes
foldl (<|>) empty (map listen nodes) <|> return()
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th
-- really distributed version
-- generate an executable with this main and invoke it as such:
--
-- program myport remotehost remoteport
--
-- where remotehost remoteport are from a previously initialized node
-- The first node initialize it with:
--
-- program myport localhost myport
mainDistributed= do
args <- getArgs
let localPort = read (args !! 0)
seedHost = read (args !! 1)
seedPort = read (args !! 2)
mynode = createNode "localhost" localPort
seedNode = createNode seedHost seedPort
numCalcsNode= 100
rresults <- liftIO $ newIORef (0,0)
runCloudIO $ do
connect mynode seedNode
local $ option "start" "start the calculation once all the nodes have been started" :: Cloud String
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th