-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathws_mpi.py
executable file
·82 lines (74 loc) · 1.94 KB
/
ws_mpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from sys import argv
import numpy as np
from mpi4py import MPI
from collections import deque
from ws_utils import *
import os
# MPI message tags exchanged between
# the master and its workers:
# EXECUTE accompanies a work item,
# TERMINATE tells a worker to stop.
EXECUTE, TERMINATE = 0, 1
# Read a DCM image from disk,
# process it and write the result
# as a PNG file in the same dir.
def process_image(comm):
    """Worker loop: process images sent by rank 0 until told to stop.

    Repeatedly receives a message from rank 0. A message tagged
    TERMINATE ends the loop; any other message is unpacked as a
    ``(folder, file_name)`` pair, the image is run through the
    read / preprocess / watershed / edge-extraction pipeline, the
    result is saved as a PNG, and ``None`` is sent back to rank 0
    to signal that this worker is ready for more work.

    ``watershed`` is bound at module level by the ``__main__`` block
    before this function is called; the other helpers (``read_dcm``,
    ``preprocess``, ``getEdges``, ``show_progress``,
    ``strip_extension``, ``img``) presumably come from the
    ``ws_utils`` star import -- TODO confirm.

    Args:
        comm: MPI communicator used to talk to rank 0.
    """
    status = MPI.Status()
    while True:
        msg = comm.recv(None, 0, MPI.ANY_TAG, status)
        if status.Get_tag() == TERMINATE:
            break
        folder, file_name = msg
        # Join with os.path.join so the path is valid whether or
        # not the folder was given with a trailing separator
        # (plain concatenation produced e.g. "dirimage.dcm").
        f = os.path.join(folder, file_name)
        # Read in image.
        O = read_dcm(f)
        # Preprocess image.
        I = preprocess(O)
        # Perform watershed.
        L = watershed(I)
        # Get watershed lines.
        E = getEdges(L, O)
        # Show progress dots.
        show_progress()
        # Save image to disk
        out = strip_extension(f) + ".png"
        img.imsave(out, E, cmap='gray')
        # Tell the master this worker is idle again.
        comm.send(None, 0)
# Distribute images across the
# available MPI processes.
def distribute_images(comm, folder):
    """Hand out every file in *folder* to the worker ranks, one at a time.

    Rank 0 acts as the master: each worker rank (1 .. size-1) first
    receives one ``[folder, file_name]`` message tagged EXECUTE.
    Whenever a worker reports back, it is sent the next file. Once
    the queue is empty, the master waits for the completion message
    of every worker that is still busy and only then sends TERMINATE
    to all workers.

    Args:
        comm: MPI communicator shared by the master and the workers.
        folder: Directory whose entries (as listed by ``os.listdir``)
            are distributed.

    Raises:
        RuntimeError: If the communicator has no worker ranks; the
            master would otherwise block forever waiting for a reply.
    """
    size = comm.Get_size()
    if size < 2:
        raise RuntimeError(
            "distribute_images needs at least one worker "
            "(run with 2 or more MPI processes)."
        )
    status = MPI.Status()
    # deque gives O(1) removal from the front (list.pop(0) is O(n)).
    queue = deque(os.listdir(folder))
    # Send out initial data. Stop early when there are fewer
    # files than workers instead of popping from an empty queue.
    busy = 0
    for i in range(1, size):
        if not queue:
            break
        comm.send([folder, queue.popleft()], i, EXECUTE)
        busy += 1
    # Process all remaining: every completion message is answered
    # with a new file, so the number of busy workers stays constant.
    while queue:
        comm.recv(None, MPI.ANY_SOURCE, MPI.ANY_TAG, status)
        comm.send([folder, queue.popleft()], status.Get_source(), EXECUTE)
    # Collect the final completion message of each busy worker so
    # that no send is left unmatched and the caller's timing covers
    # the last images as well.
    for _ in range(busy):
        comm.recv(None, MPI.ANY_SOURCE, MPI.ANY_TAG, status)
    # Terminate all workers (including any that never got work).
    for i in range(1, size):
        comm.send(None, i, TERMINATE)
# If running as main script:
# rank 0 distributes the files in the
# given folder and reports the elapsed
# time; all other ranks process images.
if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    # Show usage (printed once, by rank 0) and stop every rank.
    if len(argv) != 2:
        if rank == 0:
            # Parenthesised single-argument print produces the same
            # output on Python 2 and is valid syntax on Python 3.
            print("Usage: mpirun -N [processes] ws_mpi.py [folder].")
            print("Folder must contain DCM images only.")
        # Same effect as exit(), without relying on the site module.
        raise SystemExit
    # Run as master.
    if rank == 0:
        start_time = MPI.Wtime()
        distribute_images(comm, argv[1])
        end_time = MPI.Wtime()
        print("Total time: %f" % (end_time - start_time))
    # Run as worker.
    else:
        # Imported only on worker ranks; binds the module-level
        # name `watershed` that process_image() calls.
        from ws_gpu import watershed
        process_image(comm)