-
Notifications
You must be signed in to change notification settings - Fork 0
/
allreduce.h
108 lines (88 loc) · 3.4 KB
/
allreduce.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#ifndef ALLREDUCE_H
#define ALLREDUCE_H
#include <algorithm>
#include <cstddef>
#include <limits>

#include <libdash.h>
/* Optional Score-P instrumentation hook.
   When SCOREP_USER_ENABLE is defined, SCOREP_USER_FUNC() expands to
   SCOREP_USER_REGION with the enclosing function's __PRETTY_FUNCTION__ as
   region name and SCOREP_USER_REGION_TYPE_FUNCTION as region type.
   Otherwise it expands to nothing, so it can be placed unconditionally as
   the first statement of a function body (without a trailing semicolon). */
#ifdef SCOREP_USER_ENABLE
#include <scorep/SCOREP_User.h>
#define SCOREP_USER_FUNC() \
SCOREP_USER_REGION(__PRETTY_FUNCTION__, SCOREP_USER_REGION_TYPE_FUNCTION)
#else
#define SCOREP_USER_FUNC()
#endif
/* The class is used for the async lazy residual computation.
Make this a template class with template arguments for the element type
and for the number of elements per unit.
*/
class Allreduce {
/* distributed array for all local residual values of every unit.
It is still going to be allocated in a way, that all elements are
owned by unit 0.
It can also be used by a subteam of the team, that created it. */
dash::Array<double> centralized;
/* truely distributed version where the global residula is copied to
so that all units can access quickly. */
dash::Array<double> distributed;
public:
Allreduce( dash::Team& team ) :
centralized( team.size(), dash::BLOCKCYCLIC(team.size()), team),
distributed(team.size(), dash::BLOCKED, team) {
reset(team);
}
/* can be used with a subteam of the team used in the constructor */
void reset( dash::Team& team ) {
SCOREP_USER_FUNC()
/* really only unit 0 in the given team is doing something.
std::fill is the really the correct algorithm */
std::fill( centralized.lbegin(), centralized.lend(), std::numeric_limits<double>::max() );
std::fill( distributed.lbegin(), distributed.lend(), std::numeric_limits<double>::max() );
}
/* can be used with a subteam of the team used in the constructor,
does a barrier in the given team */
void collect_and_spread( dash::Team& team ) {
SCOREP_USER_FUNC()
centralized.flush();
team.barrier();
/* now all new values are there, get the maximum on unit 0 */
if ( 0 == team.myid() ) {
/*for(const auto& elem : centralized.local)
std::cout << elem << ",";
std::cout << std::endl;*/
distributed.local[0] =
*std::max_element(centralized.lbegin(), centralized.lbegin() + team.size());
for (auto i = 1; i < team.size(); ++i)
distributed.async[i].set(distributed.local[0]);
}
}
#if 0
/* broadcast the value from unit 0 to all units in the given team
which might be a subteam of the team from constuction time */
void asyncbroadcast( dash::Team& team ) {
SCOREP_USER_FUNC()
/* all but unit 0 fetch the value because unit 0 cannot know
everybody else's local element in 'distributed' */
if ( 0 == team.myid() ) {
for(auto i = 1; i < team.size(); ++i)
distributed.async[i].set(distributed.local[0]);
}
}
#endif
void wait( dash::Team& team ) {
SCOREP_USER_FUNC()
distributed.async.flush();
team.barrier();
}
/* send local residual to the correct place in unit 0's array,
need to be followed by collect() eventually */
void set( double* res, dash::Team& team ) {
SCOREP_USER_FUNC()
//std::cout << "TEST - " << team.myid() << " -> " << res << std::endl;
if (team.myid() != 0)
centralized.async[team.myid()].set(res);
else
centralized.local[0] = *res;
}
double get() const {
SCOREP_USER_FUNC()
return distributed.local[0];
}
};
#endif /* ALLREDUCE_H */