-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbin_communicator.m
126 lines (109 loc) · 3.35 KB
/
bin_communicator.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
function [time,read_size] = bin_communicator(block_size,n_blocks,job_num)
% BIN_COMMUNICATOR gather binary chunks from reader workers into one file.
%
% The lab with id == 0 (labindex 1) is the writer: for every block number
% it obtains one chunk per reader through get_block and appends the
% combined block to 'targ_file.bin' as single-precision values.
% Every other lab is a reader: it reads its own 'block_<id>.bin' file in
% chunks of [9,block_size] float32 values and sends each chunk to lab 1
% with labSend, using the block number as the message tag.
%
% Inputs:
% block_size == chunk_size -- the size of a single i/o block (number of
%               9-value float32 columns read per chunk)
% n_blocks   -- number of blocks (chunks) defining the total file size as
%               block_size*n_blocks*n_workers
% job_num    -- debugging parameter used in serial execution to mimic the
%               mpi labindex. Not used in parallel execution. If omitted
%               or empty, labindex is used.
% Outputs:
% time       -- elapsed time in seconds, measured with tic/toc
% read_size  -- writer: final position in the target file divided by 4*9
%               (number of 9-value float32 columns written);
%               reader: block_size*n_blocks
%
% Errors:
% PARALLEL_WRITER:io_error -- a file can not be opened or a block can not
%               be written completely.
%
%diary on
do_logging = false;
nl = numlabs;
if nargin < 3 || isempty(job_num)
    id = labindex-1;
else
    id = job_num-1;
end
t0 = tic;
%
if id == 0
    targ_name = 'targ_file.bin';
    fh = fopen(targ_name,'wb');
    if fh<1
        error('PARALLEL_WRITER:io_error','Can not open file %s to write',targ_name);
    end
    % One cleanup object closes everything opened below (target, reader
    % files, log file). MATLAB does not guarantee the execution order of
    % several onCleanup objects, so a second object closing the reader
    % handles individually could run after this one and fail on handles
    % which are already closed.
    clobW = onCleanup(@()fclose('all')); %#ok<NASGU>
    n_readers = nl-1;
    if n_readers == 0 % serial execution for debugging. All input files should be read by serial job.
        n_readers = 5; % fake workers
        fhr = cell(1,n_readers);
        for i=1:n_readers
            f_name = sprintf('block_%d.bin',i);
            fhr{i} = fopen(f_name,'rb');
            if fhr{i}<1
                error('PARALLEL_WRITER:io_error','Can not open file %s to read',f_name);
            end
        end
    else
        if do_logging
            l_name = sprintf('worker_%d.log',labindex);
            fhr = fopen(l_name,'w');
            if fhr<1
                error('PARALLEL_WRITER:io_error','Can not open file %s to write',l_name);
            end
            fprintf(fhr,'receiver started\n');
        else
            fhr = [];
        end
    end
    for i=1:n_blocks
        block = get_block(i,block_size,n_readers,fhr,do_logging);
        n_written = fwrite(fh,block,'single');
        % fwrite returns the number of elements actually written; anything
        % less than the block size means the target file is incomplete.
        if n_written ~= numel(block)
            error('PARALLEL_WRITER:io_error','Can not write block %d to file %s',i,targ_name);
        end
    end
    % 4 bytes per single value, 9 values per column
    read_size = ftell(fh)/(4*9);
else
    f_name = sprintf('block_%d.bin',id);
    fh = fopen(f_name,'rb');
    if fh<1
        error('PARALLEL_WRITER:io_error','Can not open file %s to read',f_name);
    end
    % registered before the log file is opened, so the input file is
    % released even if opening the log fails
    clob = onCleanup(@()fclose('all'));
    if do_logging
        l_name = sprintf('worker_%d.log',labindex);
        fhl = fopen(l_name,'w');
        if fhl<1
            error('PARALLEL_WRITER:io_error','Can not open file %s to write',l_name);
        end
    end
    for i=1:n_blocks
        contents = fread(fh,[9,block_size],'*float32');
        if do_logging
            fprintf(fhl,'sending block N%d\n',i);
        end
        % destination is lab 1 (the writer); the block number is the tag
        labSend(contents,1,i);
    end
    read_size = block_size*n_blocks;
    % close the files now, so closing is included in the measured time
    clear('clob');
end
time = toc(t0);
function block = get_block(n_block,chunk_size,n_parts,par,do_logging)
% GET_BLOCK collect one chunk from every reader and join them into a block.
%
% Inputs:
% n_block    -- number of the block to collect; used as the mpi message
%               tag or to calculate the read position in the input files
% chunk_size -- number of 9-value float32 columns in a single chunk
% n_parts    -- number of readers (chunks) contributing to the block
% par        -- a cell array of file handles opened for reading (serial
%               debugging mode), or a log-file handle / empty value (mpi
%               mode, where chunks arrive from labs 2..n_parts+1)
% do_logging -- if true and par is a log-file handle, progress messages
%               are written to this handle. Ignored in file mode.
% Output:
% block      -- chunks from parts 1..n_parts concatenated horizontally
%
%block_size = size(block);
% Anything which is not a non-empty cell array of handles selects mpi
% mode. Testing the type rather than numel(par)==1 keeps a one-element
% cell array of file handles in file mode.
use_mpi = ~iscell(par) || isempty(par);
if use_mpi
    get_data = @(i)(labReceive(i+1,n_block));
    check_exist = @(i)(labProbe(i+1,n_block));
    % nothing to log to if no log handle has been provided
    do_logging = do_logging && ~isempty(par);
    if do_logging
        fprintf(par,'accignied mpi receivers\n');
    end
else
    check_exist = @(i)(true);
    % 4 bytes per single value, 9 values per column
    get_data = @(i)get_bin_data(par{i},(n_block-1)*chunk_size*4*9,chunk_size);
    do_logging = false;
end
received = false(1,n_parts);
chunks_cache = cell(1,n_parts);
if do_logging
    fprintf(par,' expecting block %d from all workers\n',n_block);
end
% Poll until every part has delivered its chunk. Only parts which have not
% delivered yet are probed, and progress is logged only when something new
% has arrived, so the log does not grow while the loop is spinning.
while ~all(received)
    got_new = false;
    for i=find(~received)
        if check_exist(i)
            chunks_cache{i} = get_data(i);
            received(i) = true;
            got_new = true;
        end
    end
    if do_logging && got_new
        fprintf(par,' received %d out of %d parts\n',nnz(received),n_parts);
    end
end
block = [chunks_cache{:}];
%block = reshape(block ,9,chunk_size*n_parts);
function data= get_bin_data(fh,pos,chunk_size)
% GET_BIN_DATA read one chunk of float32 data from an opened binary file.
%
% Inputs:
% fh         -- handle of a file opened for reading
% pos        -- position of the chunk, in bytes from the beginning of file
% chunk_size -- number of 9-value columns to read
% Output:
% data       -- single-precision array read with size [9,chunk_size]
%               (fread returns fewer columns if the file ends earlier)
%
% Errors:
% PARALLEL_WRITER:io_error -- the file can not be positioned at pos
%
status = fseek(fh,pos,'bof');
if status ~= 0
    % without this check fread would silently return data from whatever
    % position the file happened to be at
    error('PARALLEL_WRITER:io_error',...
        'Can not move to position %d of the input file: %s',pos,ferror(fh));
end
data = fread(fh,[9,chunk_size],'*float32');
function par_clear(fh_list)
% PAR_CLEAR close all files from a cell array of file handles.
%
% Input:
% fh_list -- cell array of file identifiers returned by fopen
%
% Only identifiers which are currently open are closed. Empty cells,
% repeated identifiers and identifiers which have already been closed
% (e.g. by fclose('all')) are skipped, so the function is safe to use as
% a cleanup routine regardless of what has been closed before it runs.
%
open_ids = fopen('all'); % identifiers of all files currently open
for i=1:numel(fh_list)
    fid = fh_list{i};
    if isnumeric(fid) && isscalar(fid) && any(open_ids == fid)
        fclose(fid);
        % forget the identifier, so a repeated entry is not closed twice
        open_ids(open_ids == fid) = [];
    end
end