Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow Graph scalability improvements #1348

Merged
merged 30 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e8775e6
Add wait_vertex implementation
pavelkumbrasev Apr 15, 2024
300cf36
Improve task_group scalability
pavelkumbrasev Apr 15, 2024
dc47f47
Fix collaborative_call_once compilation
pavelkumbrasev Apr 15, 2024
9fc0dc1
Add comment for release in wait_vertex
pavelkumbrasev Apr 15, 2024
d73588f
Refactor node structure. Replace nodes in parallel_for and parallel_r…
pavelkumbrasev Feb 27, 2024
6b8760e
Fix constructor and copyright
pavelkumbrasev Feb 27, 2024
b783d09
Improve scalability of Flow Graph
kboyarinov Feb 27, 2024
b616e7b
Cleanup and increase FG interface version
kboyarinov Feb 28, 2024
9debc85
Fix issues caused by interface version changes
kboyarinov Feb 29, 2024
ef1ede2
Align changes
kboyarinov Apr 17, 2024
021c292
Exclude changes in parallel for
kboyarinov Apr 17, 2024
85d163d
Exclude changes in task.cpp
kboyarinov Apr 17, 2024
fbe6dc2
Increase interface version for multiple interfaces
kboyarinov Apr 17, 2024
07cb0a3
Temporarly disable map cleanup on dispatcher dtor
kboyarinov Apr 17, 2024
30d5caa
Fix deduction guides
kboyarinov May 1, 2024
e238a5e
Save progress
kboyarinov Jun 18, 2024
5c16961
Add to CI
kboyarinov Jul 2, 2024
e33dcf0
Fix flow_control namespace
kboyarinov Jul 2, 2024
6b6704f
Cleanup + copyright update
kboyarinov Jul 3, 2024
d112e09
Update scheduler_common
kboyarinov Jul 3, 2024
f95d34b
Fix async_node gateway
kboyarinov Jul 4, 2024
a53218f
Add workaround with new entry point
kboyarinov Jul 8, 2024
128a364
Add execution_slot entry point to arena.cpp
kboyarinov Jul 12, 2024
412c8de
Fix incorrect rebase
kboyarinov Jul 12, 2024
b3fec48
Cleanup + def files for Windows
kboyarinov Jul 12, 2024
466826b
Update src/tbb/arena.cpp
kboyarinov Jul 15, 2024
5957192
Apply comments
kboyarinov Jul 26, 2024
4ae4781
Fix def file on Win32
kboyarinov Aug 1, 2024
7d1ff3f
Revert "Fix def file on Win32"
kboyarinov Aug 1, 2024
5e41850
Fix Win32 def
kboyarinov Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/oneapi/tbb/collaborative_call_once.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class collaborative_once_flag : no_copy {
spin_wait_until_eq(m_state, expected);
} while (!m_state.compare_exchange_strong(expected, desired));
}

template <typename Fn>
void do_collaborative_call_once(Fn&& f) {
std::uintptr_t expected = m_state.load(std::memory_order_acquire);
Expand Down
28 changes: 14 additions & 14 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)
// included in namespace tbb::detail::d2 (in flow_graph.h)

typedef std::uint64_t tag_value;

Expand Down Expand Up @@ -53,7 +53,7 @@ namespace graph_policy_namespace {
// K == type of field used for key-matching. Each tag-matching port will be provided
// functor that, given an object accepted by the port, will return the
/// field of type K being used for matching.
template<typename K, typename KHash=tbb_hash_compare<typename std::decay<K>::type > >
template<typename K, typename KHash=d1::tbb_hash_compare<typename std::decay<K>::type > >
__TBB_requires(tbb::detail::hash_compare<KHash, K>)
struct key_matching {
typedef K key_type;
Expand All @@ -77,7 +77,7 @@ template< typename Output >
class input_body : no_assign {
public:
virtual ~input_body() {}
virtual Output operator()(flow_control& fc) = 0;
virtual Output operator()(d1::flow_control& fc) = 0;
virtual input_body* clone() = 0;
};

Expand All @@ -86,7 +86,7 @@ template< typename Output, typename Body>
class input_body_leaf : public input_body<Output> {
public:
input_body_leaf( const Body &_body ) : body(_body) { }
Output operator()(flow_control& fc) override { return body(fc); }
Output operator()(d1::flow_control& fc) override { return body(fc); }
input_body_leaf* clone() override {
return new input_body_leaf< Output, Body >(body);
}
Expand Down Expand Up @@ -249,12 +249,12 @@ template< typename NodeType >
class forward_task_bypass : public graph_task {
NodeType &my_node;
public:
forward_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n
forward_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.forward_task();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -264,7 +264,7 @@ class forward_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<forward_task_bypass>(ed);
return nullptr;
}
Expand All @@ -278,12 +278,12 @@ class apply_body_task_bypass : public graph_task {
Input my_input;
public:

apply_body_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n, const Input &i
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -293,7 +293,7 @@ class apply_body_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
return nullptr;
}
Expand All @@ -304,10 +304,10 @@ template< typename NodeType >
class input_node_task_bypass : public graph_task {
NodeType &my_node;
public:
input_node_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n )
input_node_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n )
: graph_task(g, allocator), my_node(n) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( );
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -317,7 +317,7 @@ class input_node_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<input_node_task_bypass>(ed);
return nullptr;
}
Expand Down
4 changes: 2 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)
// included in namespace tbb::detail::d2 (in flow_graph.h)

//! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
template< typename T, typename M=spin_mutex >
Expand Down
80 changes: 51 additions & 29 deletions include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,7 @@
namespace tbb {
namespace detail {

namespace d1 {
namespace d2 {

class graph_task;
static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1;
Expand Down Expand Up @@ -123,27 +123,24 @@ void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
class graph;

//! Base class for tasks generated by graph nodes.
class graph_task : public task {
class graph_task : public d1::task {
public:
graph_task(graph& g, small_object_allocator& allocator
, node_priority_t node_priority = no_priority
)
: my_graph(g)
, priority(node_priority)
, my_allocator(allocator)
{}
graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority = no_priority);

graph& my_graph; // graph instance the task belongs to
// TODO revamp: rename to my_priority
node_priority_t priority;
template <typename DerivedType>
void destruct_and_deallocate(const execution_data& ed);
void destruct_and_deallocate(const d1::execution_data& ed);
protected:
template <typename DerivedType>
void finalize(const execution_data& ed);
void finalize(const d1::execution_data& ed);
private:
// To organize task_list
graph_task* my_next{ nullptr };
small_object_allocator my_allocator;
d1::small_object_allocator my_allocator;
d1::wait_tree_vertex_interface* my_reference_vertex;
// TODO revamp: elaborate internal interfaces to avoid friends declarations
friend class graph_task_list;
friend graph_task* prioritize_task(graph& g, graph_task& gt);
Expand All @@ -157,18 +154,18 @@ struct graph_task_comparator {

typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;

class priority_task_selector : public task {
class priority_task_selector : public d1::task {
public:
priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
priority_task_selector(graph_task_priority_queue_t& priority_queue, d1::small_object_allocator& allocator)
: my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
task* execute(execution_data& ed) override {
task* execute(d1::execution_data& ed) override {
next_task();
__TBB_ASSERT(my_task, nullptr);
task* t_next = my_task->execute(ed);
my_allocator.delete_object(this, ed);
return t_next;
}
task* cancel(execution_data& ed) override {
task* cancel(d1::execution_data& ed) override {
if (!my_task) {
next_task();
}
Expand All @@ -190,7 +187,7 @@ class priority_task_selector : public task {
}

graph_task_priority_queue_t& my_priority_queue;
small_object_allocator my_allocator;
d1::small_object_allocator my_allocator;
graph_task* my_task;
};

Expand Down Expand Up @@ -281,7 +278,7 @@ class graph : no_copy, public graph_proxy {
caught_exception = false;
try_call([this] {
my_task_arena->execute([this] {
wait(my_wait_context, *my_context);
wait(my_wait_context_vertex.get_context(), *my_context);
});
cancelled = my_context->is_group_execution_cancelled();
}).on_exception([this] {
Expand Down Expand Up @@ -332,7 +329,7 @@ class graph : no_copy, public graph_proxy {
bool exception_thrown() { return caught_exception; }

private:
wait_context my_wait_context;
d1::wait_context_vertex my_wait_context_vertex;
task_group_context *my_context;
bool own_context;
bool cancelled;
Expand All @@ -349,30 +346,50 @@ class graph : no_copy, public graph_proxy {

graph_task_priority_queue_t my_priority_queue;

d1::wait_context_vertex& get_wait_context_vertex() { return my_wait_context_vertex; }

friend void activate_graph(graph& g);
friend void deactivate_graph(graph& g);
friend bool is_graph_active(graph& g);
friend bool is_thread_in_graph_arena(graph& g);
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

friend class task_arena_base;

friend class graph_task;
}; // class graph

template<typename DerivedType>
inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
inline void graph_task::destruct_and_deallocate(const d1::execution_data& ed) {
auto allocator = my_allocator;
// TODO: investigate if direct call of derived destructor gives any benefits.
this->~graph_task();
allocator.deallocate(static_cast<DerivedType*>(this), ed);
}

template<typename DerivedType>
inline void graph_task::finalize(const execution_data& ed) {
graph& g = my_graph;
inline void graph_task::finalize(const d1::execution_data& ed) {
d1::wait_tree_vertex_interface* reference_vertex = my_reference_vertex;
destruct_and_deallocate<DerivedType>(ed);
g.release_wait();
reference_vertex->release();
}

inline graph_task::graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority)
: my_graph(g)
, priority(node_priority)
, my_allocator(allocator)
{
// If the task is created by the thread outside the graph arena, the lifetime of the thread reference vertex
// may be shorter that the lifetime of the task, so thread reference vertex approach cannot be used
// and the task should be associated with the graph wait context itself
// TODO: consider how reference counting can be improved for such a use case. Most common example is the async_node
d1::wait_context_vertex* graph_wait_context_vertex = &my_graph.get_wait_context_vertex();
my_reference_vertex = is_thread_in_graph_arena(g) ? r1::get_thread_reference_vertex(graph_wait_context_vertex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this can happen even without async_node. Consider the following case:

std::thread t1([]() {
            tbb::task_arena& gta = get_graph_task_arena();
            gta.execute([]() {
                             auto& some_graph_node = get_graph_node();
                             some_graph_node.try_put(/*node message*/1); // spawn of task A
            });
});

t1.join();


// task A starts its execution
graph.wait_for_all();

Don't we support such cases anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it is guaranteed that t1 will either join the gta and execute the task if there is an available slot or send a task and it would be executed by another worker in gta. In each case, the thread executing the lambda and calling try_put is guaranteed to be in gta and hence has its reference node in the TLS (the code will choose the true branch).
If t1 would submit work in the graph without explicitly calling task_arena::execute, the false branch would be chosen and the task reference counting would work as before this PR by adding a reference to graph wait_context.

: graph_wait_context_vertex;
__TBB_ASSERT(my_reference_vertex, nullptr);
my_reference_vertex->reserve();
}

//********************************************************************************
Expand Down Expand Up @@ -424,6 +441,11 @@ inline bool is_graph_active(graph& g) {
return g.my_is_active;
}

inline bool is_thread_in_graph_arena(graph& g) {
__TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
return r1::execution_slot(*g.my_task_arena) != d1::slot_id(-1);
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
}

inline graph_task* prioritize_task(graph& g, graph_task& gt) {
if( no_priority == gt.priority )
return &gt;
Expand All @@ -432,7 +454,7 @@ inline graph_task* prioritize_task(graph& g, graph_task& gt) {
//! priority queue, and a new critical task is created to take and execute a work item with
//! the highest known priority. The reference counting responsibility is transferred (via
//! allocate_continuation) to the new task.
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
d1::task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
__TBB_ASSERT( critical_task, "bad_alloc?" );
g.my_priority_queue.push(&gt);
using tbb::detail::d1::submit;
Expand All @@ -443,7 +465,7 @@ inline graph_task* prioritize_task(graph& g, graph_task& gt) {
//! Spawns a task inside graph arena
inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
if (is_graph_active(g)) {
task* gt = prioritize_task(g, arena_task);
d1::task* gt = prioritize_task(g, arena_task);
if( !gt )
return;

Expand All @@ -464,12 +486,12 @@ inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
__TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );

// TODO revamp: decide on the approach that does not postpone critical task
if( task* gt = prioritize_task(g, arena_task) )
if( d1::task* gt = prioritize_task(g, arena_task) )
submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
}
}

} // namespace d1
} // namespace d2
} // namespace detail
} // namespace tbb

Expand Down
12 changes: 6 additions & 6 deletions include/oneapi/tbb/detail/_flow_graph_indexer_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1
// included in namespace tbb::detail::d2

#include "_flow_graph_types_impl.h"

Expand Down Expand Up @@ -118,7 +118,7 @@
};
typedef indexer_node_base<InputTuple,output_type,StructTypes> class_type;

class indexer_node_base_operation : public aggregated_operation<indexer_node_base_operation> {
class indexer_node_base_operation : public d1::aggregated_operation<indexer_node_base_operation> {
public:
char type;
union {
Expand All @@ -132,9 +132,9 @@
my_succ(const_cast<successor_type *>(&s)) {}
};

typedef aggregating_functor<class_type, indexer_node_base_operation> handler_type;
friend class aggregating_functor<class_type, indexer_node_base_operation>;
aggregator<handler_type, indexer_node_base_operation> my_aggregator;
typedef d1::aggregating_functor<class_type, indexer_node_base_operation> handler_type;
friend class d1::aggregating_functor<class_type, indexer_node_base_operation>;
d1::aggregator<handler_type, indexer_node_base_operation> my_aggregator;

void handle_operations(indexer_node_base_operation* op_list) {
indexer_node_base_operation *current;
Expand Down
Loading
Loading