Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP : numa_partitioner #1458

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include <oneapi/tbb.h>
#include <iostream>
#include <vector>
#include <numeric>
#include <atomic>
#include <chrono>

int main() {
// Initialize a vector with some values
size_t data_size = 1 << 20; // Example: 1 million elements
std::vector<float> data(data_size);
std::iota(data.begin(), data.end(), 1.0f); // Fill the vector with values 1, 2, ..., data_size

// Atomic variable to hold the total sum
std::atomic<float> total_sum(0.0f);

// Lambda function to add local sums to the total sum atomically
auto atomic_add = [](std::atomic<float>& target, float value) {
float current = target.load();
while (!target.compare_exchange_weak(current, current + value));
};

// Create a static partitioner
tbb::static_partitioner partitioner;

// Start the timer
auto start_time = std::chrono::high_resolution_clock::now();

// Parallel sum using tbb::parallel_for with static_partitioner
tbb::parallel_for(tbb::blocked_range<size_t>(0, data.size()),
[&data, &total_sum, &atomic_add](const tbb::blocked_range<size_t>& range) {
float local_sum = 0;
for (size_t i = range.begin(); i != range.end(); ++i) {
local_sum += data[i];
}
atomic_add(total_sum, local_sum);
}, tbb::numa_partitioner<tbb::static_partitioner>());

auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();

// Print the total sum
std::cout << "Total sum: " << total_sum.load() << std::endl;
std::cout << "Execution time: " << duration << " milliseconds" << std::endl;
return 0;
}
55 changes: 54 additions & 1 deletion include/oneapi/tbb/parallel_for.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <cstddef>
#include <new>


namespace tbb {
namespace detail {
#if __TBB_CPP20_CONCEPTS_PRESENT
Expand Down Expand Up @@ -261,7 +262,59 @@ void parallel_for( const Range& range, const Body& body, affinity_partitioner& p
start_for<Range,Body,affinity_partitioner>::run(range,body,partitioner);
}

//! Parallel iteration over range with default partitioner and user-supplied context.
template<typename T, typename = void>
struct is_2d_range : std::false_type {};

template<typename T>
struct is_2d_range<T, std::void_t<decltype(std::declval<T>().rows()), decltype(std::declval<T>().cols())>> : std::true_type {};

template<typename Range, typename Body, typename T>
__TBB_requires(tbb_range<Range> && parallel_for_body<Body, Range>)
void parallel_for(const Range& range, const Body& body, const T& n_partitioner) {

//size_t data_size = range.size();
size_t data_size;
if constexpr (is_2d_range<Range>::value) {
data_size = (range.rows().end() - range.rows().begin()) * (range.cols().end() - range.cols().begin());
} else {
data_size = range.end() - range.begin();
}
size_t partition_size = data_size / n_partitioner.num_numa_nodes;
std::vector<oneapi::tbb::task_group> task_groups(n_partitioner.num_numa_nodes);

n_partitioner.data_partitions.resize(n_partitioner.num_numa_nodes);

// Initialize the data in partitions
auto initialize_data = [&](std::vector<float>& partition, const tbb::blocked_range<size_t>& range) {
for (size_t j = range.begin(); j != range.end(); ++j) {
partition[j] = static_cast<float>(j);
}
};

for (size_t i = 0; i < n_partitioner.num_numa_nodes; ++i) {
n_partitioner.arenas[i].execute([&task_groups, &n_partitioner, &range, &initialize_data, &body, i]() {
task_groups[i].run([&, i] {
parallel_for(tbb::blocked_range<size_t>(0, n_partitioner.data_partitions[i].size()),
[&](const tbb::blocked_range<size_t>& r) {
initialize_data(n_partitioner.data_partitions[i], r);
}, n_partitioner.inner_partitioner);
parallel_for(range,
[&](const tbb::blocked_range<size_t>& r) {
body(r);
}, n_partitioner.inner_partitioner);
});
});
}

for (size_t i = 0; i < n_partitioner.num_numa_nodes; ++i) {
n_partitioner.arenas[i].execute([&task_groups, i]() {
task_groups[i].wait();
});
}
}


//! Parallel iteration over range with default partitioner and user-supplied context.
/** @ingroup algorithms **/
template<typename Range, typename Body>
__TBB_requires(tbb_range<Range> && parallel_for_body<Body, Range>)
Expand Down
34 changes: 33 additions & 1 deletion include/oneapi/tbb/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class static_partitioner;
class affinity_partitioner;
class affinity_partition_type;
class affinity_partitioner_base;

template <typename T> class numa_partitioner;

inline std::size_t get_initial_auto_partitioner_divisor() {
const std::size_t factor = 4;
return factor * static_cast<std::size_t>(max_concurrency());
Expand Down Expand Up @@ -654,6 +655,35 @@ class affinity_partitioner : affinity_partitioner_base {
typedef affinity_partition_type::split_type split_type;
};

template <typename T>
class numa_partitioner{
public:
numa_partitioner() {
printf("Creating Numa partitioner\n");
numa_nodes = oneapi::tbb::info::numa_nodes();
num_numa_nodes = numa_nodes.size();

// Create a task arena for each valid NUMA node
for (int node : numa_nodes) {
arenas.emplace_back(tbb::task_arena::constraints().set_numa_id(node));
}

}

mutable std::vector<oneapi::tbb::numa_node_id> numa_nodes;
mutable std::vector<oneapi::tbb::task_arena> arenas;
mutable std::vector<std::vector<float>> data_partitions;
T inner_partitioner;
//std::vector<std::vector<float>>& get_data_partitions() {
//return data_partitions;
//}
size_t num_numa_nodes;
private:
//template<typename Range, typename Body, typename Partitioner> friend struct start_for;
//typedef hierarchical_partition_type task_partition_type;
typedef detail::proportional_split split_type;
};

} // namespace d1
} // namespace detail

Expand All @@ -663,6 +693,8 @@ using detail::d1::auto_partitioner;
using detail::d1::simple_partitioner;
using detail::d1::static_partitioner;
using detail::d1::affinity_partitioner;
using detail::d1::numa_partitioner;

// Split types
using detail::split;
using detail::proportional_split;
Expand Down
44 changes: 44 additions & 0 deletions p_for.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include <iostream>
#include <vector>
#include <cmath>

#include <tbb/parallel_for.h>

int main(int argc, char **argv)
{
auto values = std::vector<double>(10000);

tbb::parallel_for( tbb::blocked_range<size_t>(0,values.size()),
[&](tbb::blocked_range<size_t> r)
{
for (size_t i=r.begin(); i<r.end(); ++i)
{
values[i] = std::sin(i * 0.001);
}
},tbb::numa_partitioner<tbb::static_partitioner>());
double total = 0;

for (double value : values)
{
total += value;
}

std::cout << total << std::endl;

total = 0;
tbb::parallel_for( tbb::blocked_range<size_t>(0,values.size()),
[&](tbb::blocked_range<size_t> r)
{
for (size_t i=r.begin(); i<r.end(); ++i)
{
values[i] = std::sin(i * 0.001);
}
}, tbb::static_partitioner());
for (double value : values)
{
total += value;
}

std::cout << total << std::endl;
return 0;
}
Loading