Parallel.h
#pragma once
#include <ATen/ATen.h>
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <exception>
#include <numeric>
#include <vector>
#ifdef _OPENMP
#include <omp.h>
#endif
namespace at {
namespace internal {
// This parameter is heuristically chosen to determine the minimum amount of
// work that warrants parallelism. For example, when summing an array, it is
// deemed inefficient to parallelize over arrays shorter than 32768. Further,
// no parallel algorithm (such as parallel_reduce) should split work into
// chunks smaller than GRAIN_SIZE.
constexpr int64_t GRAIN_SIZE = 32768;
} // namespace internal
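
// Integer division that rounds up, e.g. divup(10, 3) == 4; used below to
// size per-thread chunks.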
inline int64_t divup(int64_t x, int64_t y) {
  return (x + y - 1) / y;
}
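
// Thin wrappers over the OpenMP runtime; they fall back to single-threaded
// values when built without OpenMP.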
inline int get_max_threads() {
#ifdef _OPENMP
  return omp_get_max_threads();
#else
  return 1;
#endif
}

inline int get_thread_num() {
#ifdef _OPENMP
  return omp_get_thread_num();
#else
  return 0;
#endif
}

inline bool in_parallel_region() {
#ifdef _OPENMP
  return omp_in_parallel();
#else
  return false;
#endif
}
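
/*
parallel_for

begin: index at which to start applying f
end: index at which to stop applying f
grain_size: minimum number of iterations needed before the range is run in
parallel; smaller ranges execute serially on the calling thread
f: function applied to a sub-range, of signature
void f(int64_t chunk_begin, int64_t chunk_end)

The range [begin, end) is split into one contiguous chunk per OpenMP thread.
If any invocation of f throws, the first captured exception is rethrown on
the calling thread once the parallel region finishes.
*/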
template <class F>
inline void parallel_for(
    const int64_t begin,
    const int64_t end,
    const int64_t grain_size,
    const F& f) {
#ifdef _OPENMP
  std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
  std::exception_ptr eptr;
// Only open a parallel region if we are not already inside one and the
// range is at least grain_size long.
#pragma omp parallel if (!omp_in_parallel() && ((end - begin) >= grain_size))
  {
    // Split [begin, end) into one contiguous chunk per thread.
    int64_t num_threads = omp_get_num_threads();
    int64_t tid = omp_get_thread_num();
    int64_t chunk_size = divup((end - begin), num_threads);
    int64_t begin_tid = begin + tid * chunk_size;
    if (begin_tid < end) {
      try {
        f(begin_tid, std::min(end, chunk_size + begin_tid));
      } catch (...) {
        // Keep only the first exception; it is rethrown below, after the
        // parallel region ends.
        if (!err_flag.test_and_set()) {
          eptr = std::current_exception();
        }
      }
    }
  }
  if (eptr) {
    std::rethrow_exception(eptr);
  }
#else
  if (begin < end) {
    f(begin, end);
  }
#endif
}
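
/*
Example (an illustrative sketch, not part of the original header): doubling
every element of a caller-owned vector with parallel_for. The name `data`
and the choice of grain size are hypothetical.

  std::vector<float> data(100000, 1.0f);
  at::parallel_for(
      0, static_cast<int64_t>(data.size()), at::internal::GRAIN_SIZE,
      [&](int64_t chunk_begin, int64_t chunk_end) {
        for (int64_t i = chunk_begin; i < chunk_end; i++) {
          data[i] *= 2.0f;
        }
      });
*/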

/*
parallel_reduce

begin: index at which to start applying the reduction
end: index at which to stop applying the reduction
grain_size: number of elements per chunk; determines both the number of
elements in the intermediate results tensor and the degree of parallelization
ident: identity for the binary combination function sf; sf(ident, x) needs to
return x
f: function for reduction over a chunk, of signature
scalar_t f(int64_t partial_begin, int64_t partial_end, scalar_t ident)
sf: function to combine two partial results, of signature
scalar_t sf(scalar_t x, scalar_t y)

For example, you might have a tensor of 10000 entries and want to sum together
all the elements. parallel_reduce with a grain_size of 2500 will then allocate
an intermediate result tensor with 4 elements. It will then execute the
function "f" you provide, passing the begin and end index of each chunk
(0-2499, 2500-4999, etc.) together with the combination identity, and write
the result of each chunk into the intermediate result tensor. After that it
will reduce the partial results from each chunk into a single number using the
combination function sf and the identity ident. For a total summation these
would be "+" and 0 respectively. This is similar to TBB's approach [1], where
you need to provide a function to accumulate a subrange, a function to combine
two partial results, and an identity.

[1] https://software.intel.com/en-us/node/506154
*/
template <class scalar_t, class F, class SF>
inline scalar_t parallel_reduce(
    const int64_t begin,
    const int64_t end,
    const int64_t grain_size,
    const scalar_t ident,
    const F f,
    const SF sf) {
  // With a single thread available, reduce the whole range serially.
  if (get_max_threads() == 1) {
    return f(begin, end, ident);
  } else {
    // One partial result per grain_size-sized chunk.
    const int64_t num_results = divup((end - begin), grain_size);
    std::vector<scalar_t> results(num_results);
    scalar_t* results_data = results.data();
#pragma omp parallel for if ((end - begin) >= grain_size)
    for (int64_t id = 0; id < num_results; id++) {
      int64_t i = begin + id * grain_size;
      results_data[id] = f(i, i + std::min(end - i, grain_size), ident);
    }
    // Combine the partial results serially with sf.
    return std::accumulate(
        results_data, results_data + results.size(), ident, sf);
  }
}
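
/*
Example (an illustrative sketch, not part of the original header): the
summation described in the comment above. The vector `v` and the grain size
of 2500 are hypothetical; four partial sums are combined by the final lambda.

  std::vector<float> v(10000, 1.0f);
  float total = at::parallel_reduce(
      0, static_cast<int64_t>(v.size()), 2500, 0.0f,
      [&](int64_t chunk_begin, int64_t chunk_end, float partial) {
        for (int64_t i = chunk_begin; i < chunk_end; i++) {
          partial += v[i];
        }
        return partial;
      },
      [](float x, float y) { return x + y; });
  // total == 10000.0f
*/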
} // namespace at