Skip to content

Commit

Permalink
Flow Graph scalability improvements (#1348)
Browse files Browse the repository at this point in the history
Signed-off-by: pavelkumbrasev <[email protected]>
Co-authored-by: pavelkumbrasev <[email protected]>
  • Loading branch information
kboyarinov and pavelkumbrasev committed Sep 24, 2024
1 parent 8973c00 commit 4e51d0d
Show file tree
Hide file tree
Showing 33 changed files with 304 additions and 274 deletions.
2 changes: 1 addition & 1 deletion include/oneapi/tbb/collaborative_call_once.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class collaborative_once_flag : no_copy {
spin_wait_until_eq(m_state, expected);
} while (!m_state.compare_exchange_strong(expected, desired));
}

template <typename Fn>
void do_collaborative_call_once(Fn&& f) {
std::uintptr_t expected = m_state.load(std::memory_order_acquire);
Expand Down
28 changes: 14 additions & 14 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)
// included in namespace tbb::detail::d2 (in flow_graph.h)

typedef std::uint64_t tag_value;

Expand Down Expand Up @@ -53,7 +53,7 @@ namespace graph_policy_namespace {
// K == type of field used for key-matching. Each tag-matching port will be provided
// functor that, given an object accepted by the port, will return the
/// field of type K being used for matching.
template<typename K, typename KHash=tbb_hash_compare<typename std::decay<K>::type > >
template<typename K, typename KHash=d1::tbb_hash_compare<typename std::decay<K>::type > >
__TBB_requires(tbb::detail::hash_compare<KHash, K>)
struct key_matching {
typedef K key_type;
Expand All @@ -77,7 +77,7 @@ template< typename Output >
class input_body : no_assign {
public:
virtual ~input_body() {}
virtual Output operator()(flow_control& fc) = 0;
virtual Output operator()(d1::flow_control& fc) = 0;
virtual input_body* clone() = 0;
};

Expand All @@ -86,7 +86,7 @@ template< typename Output, typename Body>
class input_body_leaf : public input_body<Output> {
public:
input_body_leaf( const Body &_body ) : body(_body) { }
Output operator()(flow_control& fc) override { return body(fc); }
Output operator()(d1::flow_control& fc) override { return body(fc); }
input_body_leaf* clone() override {
return new input_body_leaf< Output, Body >(body);
}
Expand Down Expand Up @@ -249,12 +249,12 @@ template< typename NodeType >
class forward_task_bypass : public graph_task {
NodeType &my_node;
public:
forward_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n
forward_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.forward_task();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -264,7 +264,7 @@ class forward_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<forward_task_bypass>(ed);
return nullptr;
}
Expand All @@ -278,12 +278,12 @@ class apply_body_task_bypass : public graph_task {
Input my_input;
public:

apply_body_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n, const Input &i
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -293,7 +293,7 @@ class apply_body_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
return nullptr;
}
Expand All @@ -304,10 +304,10 @@ template< typename NodeType >
class input_node_task_bypass : public graph_task {
NodeType &my_node;
public:
input_node_task_bypass( graph& g, small_object_allocator& allocator, NodeType &n )
input_node_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n )
: graph_task(g, allocator), my_node(n) {}

task* execute(execution_data& ed) override {
d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( );
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
Expand All @@ -317,7 +317,7 @@ class input_node_task_bypass : public graph_task {
return next_task;
}

task* cancel(execution_data& ed) override {
d1::task* cancel(d1::execution_data& ed) override {
finalize<input_node_task_bypass>(ed);
return nullptr;
}
Expand Down
4 changes: 2 additions & 2 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)
// included in namespace tbb::detail::d2 (in flow_graph.h)

//! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
template< typename T, typename M=spin_mutex >
Expand Down
84 changes: 53 additions & 31 deletions include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,7 @@
namespace tbb {
namespace detail {

namespace d1 {
namespace d2 {

class graph_task;
static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1;
Expand Down Expand Up @@ -123,27 +123,24 @@ void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
class graph;

//! Base class for tasks generated by graph nodes.
class graph_task : public task {
class graph_task : public d1::task {
public:
graph_task(graph& g, small_object_allocator& allocator
, node_priority_t node_priority = no_priority
)
: my_graph(g)
, priority(node_priority)
, my_allocator(allocator)
{}
graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority = no_priority);

graph& my_graph; // graph instance the task belongs to
// TODO revamp: rename to my_priority
node_priority_t priority;
template <typename DerivedType>
void destruct_and_deallocate(const execution_data& ed);
void destruct_and_deallocate(const d1::execution_data& ed);
protected:
template <typename DerivedType>
void finalize(const execution_data& ed);
void finalize(const d1::execution_data& ed);
private:
// To organize task_list
graph_task* my_next{ nullptr };
small_object_allocator my_allocator;
d1::small_object_allocator my_allocator;
d1::wait_tree_vertex_interface* my_reference_vertex;
// TODO revamp: elaborate internal interfaces to avoid friends declarations
friend class graph_task_list;
friend graph_task* prioritize_task(graph& g, graph_task& gt);
Expand All @@ -157,18 +154,18 @@ struct graph_task_comparator {

typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;

class priority_task_selector : public task {
class priority_task_selector : public d1::task {
public:
priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
priority_task_selector(graph_task_priority_queue_t& priority_queue, d1::small_object_allocator& allocator)
: my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
task* execute(execution_data& ed) override {
task* execute(d1::execution_data& ed) override {
next_task();
__TBB_ASSERT(my_task, nullptr);
task* t_next = my_task->execute(ed);
my_allocator.delete_object(this, ed);
return t_next;
}
task* cancel(execution_data& ed) override {
task* cancel(d1::execution_data& ed) override {
if (!my_task) {
next_task();
}
Expand All @@ -190,7 +187,7 @@ class priority_task_selector : public task {
}

graph_task_priority_queue_t& my_priority_queue;
small_object_allocator my_allocator;
d1::small_object_allocator my_allocator;
graph_task* my_task;
};

Expand Down Expand Up @@ -281,7 +278,7 @@ class graph : no_copy, public graph_proxy {
caught_exception = false;
try_call([this] {
my_task_arena->execute([this] {
wait(my_wait_context, *my_context);
wait(my_wait_context_vertex.get_context(), *my_context);
});
cancelled = my_context->is_group_execution_cancelled();
}).on_exception([this] {
Expand Down Expand Up @@ -332,7 +329,7 @@ class graph : no_copy, public graph_proxy {
bool exception_thrown() { return caught_exception; }

private:
wait_context my_wait_context;
d1::wait_context_vertex my_wait_context_vertex;
task_group_context *my_context;
bool own_context;
bool cancelled;
Expand All @@ -349,30 +346,50 @@ class graph : no_copy, public graph_proxy {

graph_task_priority_queue_t my_priority_queue;

d1::wait_context_vertex& get_wait_context_vertex() { return my_wait_context_vertex; }

friend void activate_graph(graph& g);
friend void deactivate_graph(graph& g);
friend bool is_graph_active(graph& g);
friend bool is_this_thread_in_graph_arena(graph& g);
friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

friend class task_arena_base;

friend class graph_task;
}; // class graph

template<typename DerivedType>
inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
inline void graph_task::destruct_and_deallocate(const d1::execution_data& ed) {
auto allocator = my_allocator;
// TODO: investigate if direct call of derived destructor gives any benefits.
this->~graph_task();
allocator.deallocate(static_cast<DerivedType*>(this), ed);
}

template<typename DerivedType>
inline void graph_task::finalize(const execution_data& ed) {
graph& g = my_graph;
inline void graph_task::finalize(const d1::execution_data& ed) {
d1::wait_tree_vertex_interface* reference_vertex = my_reference_vertex;
destruct_and_deallocate<DerivedType>(ed);
g.release_wait();
reference_vertex->release();
}

inline graph_task::graph_task(graph& g, d1::small_object_allocator& allocator,
node_priority_t node_priority)
: my_graph(g)
, priority(node_priority)
, my_allocator(allocator)
{
// If the task is created by the thread outside the graph arena, the lifetime of the thread reference vertex
// may be shorter that the lifetime of the task, so thread reference vertex approach cannot be used
// and the task should be associated with the graph wait context itself
// TODO: consider how reference counting can be improved for such a use case. Most common example is the async_node
d1::wait_context_vertex* graph_wait_context_vertex = &my_graph.get_wait_context_vertex();
my_reference_vertex = is_this_thread_in_graph_arena(g) ? r1::get_thread_reference_vertex(graph_wait_context_vertex)
: graph_wait_context_vertex;
__TBB_ASSERT(my_reference_vertex, nullptr);
my_reference_vertex->reserve();
}

//********************************************************************************
Expand Down Expand Up @@ -424,15 +441,20 @@ inline bool is_graph_active(graph& g) {
return g.my_is_active;
}

inline bool is_this_thread_in_graph_arena(graph& g) {
__TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
return r1::execution_slot(*g.my_task_arena) != d1::slot_id(-1);
}

inline graph_task* prioritize_task(graph& g, graph_task& gt) {
if( no_priority == gt.priority )
return &gt;

//! Non-preemptive priority pattern. The original task is submitted as a work item to the
//! priority queue, and a new critical task is created to take and execute a work item with
//! the highest known priority. The reference counting responsibility is transferred (via
//! allocate_continuation) to the new task.
task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
//! the highest known priority. The reference counting responsibility is transferred to
//! the new task.
d1::task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
__TBB_ASSERT( critical_task, "bad_alloc?" );
g.my_priority_queue.push(&gt);
using tbb::detail::d1::submit;
Expand All @@ -443,7 +465,7 @@ inline graph_task* prioritize_task(graph& g, graph_task& gt) {
//! Spawns a task inside graph arena
inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
if (is_graph_active(g)) {
task* gt = prioritize_task(g, arena_task);
d1::task* gt = prioritize_task(g, arena_task);
if( !gt )
return;

Expand All @@ -464,12 +486,12 @@ inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
__TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );

// TODO revamp: decide on the approach that does not postpone critical task
if( task* gt = prioritize_task(g, arena_task) )
if( d1::task* gt = prioritize_task(g, arena_task) )
submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
}
}

} // namespace d1
} // namespace d2
} // namespace detail
} // namespace tbb

Expand Down
12 changes: 6 additions & 6 deletions include/oneapi/tbb/detail/_flow_graph_indexer_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1
// included in namespace tbb::detail::d2

#include "_flow_graph_types_impl.h"

Expand Down Expand Up @@ -118,7 +118,7 @@
};
typedef indexer_node_base<InputTuple,output_type,StructTypes> class_type;

class indexer_node_base_operation : public aggregated_operation<indexer_node_base_operation> {
class indexer_node_base_operation : public d1::aggregated_operation<indexer_node_base_operation> {
public:
char type;
union {
Expand All @@ -132,9 +132,9 @@
my_succ(const_cast<successor_type *>(&s)) {}
};

typedef aggregating_functor<class_type, indexer_node_base_operation> handler_type;
friend class aggregating_functor<class_type, indexer_node_base_operation>;
aggregator<handler_type, indexer_node_base_operation> my_aggregator;
typedef d1::aggregating_functor<class_type, indexer_node_base_operation> handler_type;
friend class d1::aggregating_functor<class_type, indexer_node_base_operation>;
d1::aggregator<handler_type, indexer_node_base_operation> my_aggregator;

void handle_operations(indexer_node_base_operation* op_list) {
indexer_node_base_operation *current;
Expand Down
Loading

0 comments on commit 4e51d0d

Please sign in to comment.