From 19a86e23005a878f36d95a1f89b8462a1cbce2b5 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 11 Jun 2024 08:11:41 -0700 Subject: [PATCH 01/22] Add implementation for function_node --- include/oneapi/tbb/detail/_config.h | 4 + .../oneapi/tbb/detail/_flow_graph_body_impl.h | 58 +++- .../tbb/detail/_flow_graph_cache_impl.h | 37 ++- include/oneapi/tbb/detail/_flow_graph_impl.h | 38 +++ .../tbb/detail/_flow_graph_indexer_impl.h | 5 + .../tbb/detail/_flow_graph_item_buffer_impl.h | 144 ++++++++-- .../oneapi/tbb/detail/_flow_graph_join_impl.h | 15 ++ .../oneapi/tbb/detail/_flow_graph_node_impl.h | 99 +++++-- include/oneapi/tbb/flow_graph.h | 74 +++++ test/common/graph_utils.h | 8 + test/tbb/test_function_node.cpp | 253 ++++++++++++++++++ 11 files changed, 671 insertions(+), 64 deletions(-) diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h index d6705e154c..f7c821a3c0 100644 --- a/include/oneapi/tbb/detail/_config.h +++ b/include/oneapi/tbb/detail/_config.h @@ -521,6 +521,10 @@ #define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES) #endif +#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES) +#endif + #if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS #define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1 #endif diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index ece27bd1a4..e55ff1807c 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -270,31 +270,64 @@ class forward_task_bypass : public graph_task { } }; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +template +struct graph_task_base : std::conditional +{}; +#endif + //! 
A task that calls a node's apply_body_bypass function, passing in an input of type Input // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr -template< typename NodeType, typename Input > -class apply_body_task_bypass : public graph_task { +template< typename NodeType, typename Input, typename... Metainfo > +class apply_body_task_bypass +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + : public graph_task_base::type +#else + : public graph_task +#endif +{ NodeType &my_node; Input my_input; -public: - apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i - , node_priority_t node_priority = no_priority - ) : graph_task(g, allocator, node_priority), - my_node(n), my_input(i) {} +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + using base_type = typename graph_task_base::type; +#else + using base_type = graph_task; +#endif + + graph_task* call_apply_body_bypass_impl(std::true_type) { + return my_node.apply_body_bypass(my_input); + } + + graph_task* call_apply_body_bypass_impl(std::false_type) { + return my_node.apply_body_bypass(my_input, this->get_msg_wait_context_vertexes()); + } + + graph_task* call_apply_body_bypass() { + return call_apply_body_bypass_impl(std::is_same{}); + } + +public: + apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i, + node_priority_t node_priority = no_priority, + const Metainfo&... 
metainfo ) + : base_type(g, allocator, metainfo.waiters()..., node_priority), + my_node(n), my_input(i) {} d1::task* execute(d1::execution_data& ed) override { - graph_task* next_task = my_node.apply_body_bypass( my_input ); + graph_task* next_task = call_apply_body_bypass(); if (SUCCESSFULLY_ENQUEUED == next_task) next_task = nullptr; else if (next_task) next_task = prioritize_task(my_node.graph_reference(), *next_task); - finalize(ed); + base_type::template finalize(ed); return next_task; } d1::task* cancel(d1::execution_data& ed) override { - finalize(ed); + base_type::template finalize(ed); return nullptr; } }; @@ -343,6 +376,11 @@ class threshold_regulatormy_graph; } diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 83b667b9dc..f22e3d7442 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -268,6 +268,9 @@ class successor_cache : no_copy { } virtual graph_task* try_put_task( const T& t ) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo) = 0; +#endif }; // successor_cache //! An abstract cache of successors, specialized to continue_msg @@ -327,6 +330,9 @@ class successor_cache< continue_msg, M > : no_copy { } virtual graph_task* try_put_task( const continue_msg& t ) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0; +#endif }; // successor_cache< continue_msg > //! 
A cache of successors that are broadcast to @@ -336,19 +342,13 @@ class broadcast_cache : public successor_cache { typedef M mutex_type; typedef typename successor_cache::successors_type successors_type; -public: - - broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) { - // Do not work with the passed pointer here as it may not be fully initialized yet - } - - // as above, but call try_put_task instead, and return the last task we received (if any) - graph_task* try_put_task( const T &t ) override { + template + graph_task* try_put_task_impl( const T& t, const Metainfo&... metainfo) { graph_task * last_task = nullptr; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task *new_task = (*i)->try_put_task(t); + graph_task *new_task = (*i)->try_put_task(t, metainfo...); // workaround for icc bug graph& graph_ref = (*i)->graph_reference(); last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary @@ -365,6 +365,20 @@ class broadcast_cache : public successor_cache { } return last_task; } +public: + + broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) { + // Do not work with the passed pointer here as it may not be fully initialized yet + } + + // as above, but call try_put_task instead, and return the last task we received (if any) + graph_task* try_put_task( const T &t ) override { + return try_put_task_impl(t); + } + + graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override { + return try_put_task_impl(t, metainfo); + } // call try_put_task and return list of received tasks bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { @@ -429,6 +443,11 @@ class round_robin_cache : public successor_cache { } return nullptr; } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for round robin cache + 
graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif }; #endif // __TBB__flow_graph_cache_impl_H diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 27ac5316ed..0a766352cb 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -146,6 +146,41 @@ class graph_task : public d1::task { friend graph_task* prioritize_task(graph& g, graph_task& gt); }; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class graph_task_with_message_waiters : public graph_task { +public: + graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator, + const std::forward_list& msg_waiters, + node_priority_t node_priority = no_priority) + : graph_task(g, allocator, node_priority) + , my_msg_wait_context_vertexes(msg_waiters) + { + for (auto& msg_waiter : msg_waiters) { + my_msg_reference_vertexes.emplace_back(r1::get_thread_reference_vertex(msg_waiter)); + my_msg_reference_vertexes.back()->reserve(1); + } + } + + const std::forward_list get_msg_wait_context_vertexes() const { + return my_msg_wait_context_vertexes; + } + +protected: + template + void finalize(const d1::execution_data& ed) { + auto msg_reference_vertexes = std::move(my_msg_reference_vertexes); + graph_task::finalize(ed); + + for (auto& msg_waiter : msg_reference_vertexes) { + msg_waiter->release(1); + } + } +private: + std::forward_list my_msg_wait_context_vertexes; + std::list my_msg_reference_vertexes; +}; // class graph_task_with_message_waiters +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + struct graph_task_comparator { bool operator()(const graph_task* left, const graph_task* right) { return left->priority < right->priority; @@ -357,6 +392,9 @@ class graph : no_copy, public graph_proxy { friend class task_arena_base; friend class graph_task; + + template + friend class receiver; }; // class graph template diff --git 
a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h index 258a0f2423..dc43bb6589 100644 --- a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h @@ -79,6 +79,11 @@ return my_try_put_task(v, my_indexer_ptr); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for indexer_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return *my_graph; } diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 423033b1d5..1edaf4d82e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -37,8 +37,14 @@ class item_buffer { typedef T item_type; enum buffer_item_state { no_item=0, has_item=1, reserved_item=2 }; protected: + struct aligned_space_item { + item_type item; + buffer_item_state state; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo metainfo; +#endif + }; typedef size_t size_type; - typedef std::pair aligned_space_item; typedef aligned_space buffer_item_type; typedef typename allocator_traits::template rebind_alloc allocator_type; buffer_item_type *my_array; @@ -49,39 +55,61 @@ class item_buffer { bool buffer_empty() const { return my_head == my_tail; } - aligned_space_item &item(size_type i) { - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->second))%alignment_of::value), nullptr); - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->first))%alignment_of::value), nullptr); + aligned_space_item &element(size_type i) { + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of::value), nullptr); + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of::value), nullptr); +#if 
__TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of::value), nullptr); +#endif return *my_array[i & (my_array_size - 1) ].begin(); } - const aligned_space_item &item(size_type i) const { - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->second))%alignment_of::value), nullptr); - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->first))%alignment_of::value), nullptr); + const aligned_space_item &element(size_type i) const { + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of::value), nullptr); + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of::value), nullptr); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of::value), nullptr); +#endif return *my_array[i & (my_array_size-1)].begin(); } - bool my_item_valid(size_type i) const { return (i < my_tail) && (i >= my_head) && (item(i).second != no_item); } + bool my_item_valid(size_type i) const { return (i < my_tail) && (i >= my_head) && (element(i).state != no_item); } #if TBB_USE_ASSERT - bool my_item_reserved(size_type i) const { return item(i).second == reserved_item; } + bool my_item_reserved(size_type i) const { return element(i).state == reserved_item; } #endif // object management in buffer const item_type &get_my_item(size_t i) const { __TBB_ASSERT(my_item_valid(i),"attempt to get invalid item"); - item_type* itm = const_cast(reinterpret_cast(&item(i).first)); - return *itm; + return element(i).item; } // may be called with an empty slot or a slot that has already been constructed into. 
- void set_my_item(size_t i, const item_type &o) { - if(item(i).second != no_item) { + void set_my_item(size_t i, const item_type &o +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + , const message_metainfo& metainfo +#endif + ) { + if(element(i).state != no_item) { destroy_item(i); } - new(&(item(i).first)) item_type(o); - item(i).second = has_item; + new(&(element(i).item)) item_type(o); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + new(&element(i).metainfo) message_metainfo(metainfo); + + for (auto& waiter : metainfo.waiters()) { + waiter->reserve(); + } +#endif + element(i).state = has_item; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + void set_my_item(size_t i, const item_type&o) { + set_my_item(i, o, message_metainfo{}); + } +#endif + // destructively-fetch an object from the buffer void fetch_item(size_t i, item_type &o) { __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot"); @@ -119,8 +147,18 @@ class item_buffer { void destroy_item(size_type i) { __TBB_ASSERT(my_item_valid(i), "destruction of invalid item"); - item(i).first.~item_type(); - item(i).second = no_item; + + auto& e = element(i); + e.item.~item_type(); + e.state = no_item; + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + for (auto& msg_waiter : e.metainfo.waiters()) { + msg_waiter->release(1); + } + + e.metainfo.~message_metainfo(); +#endif } // returns the front element @@ -130,6 +168,14 @@ class item_buffer { return get_my_item(my_head); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& front_metainfo() const + { + __TBB_ASSERT(my_item_valid(my_head), "attempt to fetch head non-item"); + return element(my_head).metainfo; + } +#endif + // returns the back element const item_type& back() const { @@ -137,9 +183,16 @@ class item_buffer { return get_my_item(my_tail - 1); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& back_metainfo() const { + __TBB_ASSERT(my_item_valid(my_tail - 1), "attempt to fetch head non-item"); + 
return element(my_tail - 1).metainfo; + } +#endif + // following methods are for reservation of the front of a buffer. - void reserve_item(size_type i) { __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); item(i).second = reserved_item; } - void release_item(size_type i) { __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); item(i).second = has_item; } + void reserve_item(size_type i) { __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); element(i).state = reserved_item; } + void release_item(size_type i) { __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); element(i).state = has_item; } void destroy_front() { destroy_item(my_head); ++my_head; } void destroy_back() { destroy_item(my_tail-1); --my_tail; } @@ -163,14 +216,18 @@ class item_buffer { buffer_item_type* new_array = allocator_type().allocate(new_size); // initialize validity to "no" - for( size_type i=0; isecond = no_item; } + for( size_type i=0; istate = no_item; } for( size_type i=my_head; ifirst); + char *new_space = (char *)&(new_array[i&(new_size-1)].begin()->item); (void)new(new_space) item_type(get_my_item(i)); - new_array[i&(new_size-1)].begin()->second = item(i).second; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + char* meta_space = (char *)&(new_array[i&(new_size-1)].begin()->metainfo); + ::new(meta_space) message_metainfo(std::move(element(i).metainfo)); +#endif + new_array[i&(new_size-1)].begin()->state = element(i).state; } } @@ -189,6 +246,17 @@ class item_buffer { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool push_back(item_type& v, const message_metainfo& metainfo) { + if (buffer_full()) { + grow_my_array(size() + 1); + } + set_my_item(my_tail, v, metainfo); + ++my_tail; + return true; + } +#endif + bool pop_back(item_type &v) { if (!my_item_valid(my_tail-1)) { return false; @@ -198,6 +266,22 @@ class item_buffer { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool 
pop_back(item_type& v, message_metainfo* metainfo_ptr) { + __TBB_ASSERT(metainfo_ptr != nullptr, nullptr); + + if (!my_item_valid(my_tail - 1)) { + return false; + } + auto& e = element(my_tail - 1); + v = e.item; + *metainfo_ptr = std::move(e.metainfo); + + destroy_back(); + return true; + } +#endif + bool pop_front(item_type &v) { if(!my_item_valid(my_head)) { return false; @@ -207,6 +291,22 @@ class item_buffer { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool pop_front(item_type& v, message_metainfo* metainfo_ptr) { + __TBB_ASSERT(metainfo_ptr != nullptr, nullptr); + + if (!my_item_valid(my_head)) { + return false; + } + auto& e = element(my_head); + v = e.item; + *metainfo_ptr = std::move(e.metainfo); + + destroy_front(); + return true; + } +#endif + // This is used both for reset and for grow_my_array. In the case of grow_my_array // we want to retain the values of the head and tail. void clean_up_buffer(bool reset_pointers) { diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index d215b017f0..c23a655a9e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -294,6 +294,11 @@ return nullptr; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for rejecting join_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } @@ -455,6 +460,11 @@ return op_data.bypass_t; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for queueing join_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } @@ -605,6 +615,11 @@ return rtask; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for key_matching join_node + graph_task* try_put_task(const 
input_type&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index dc89d20e2c..598eeaaebb 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -34,6 +34,12 @@ class function_input_queue : public item_buffer { return this->item_buffer::front(); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& front_metainfo() const { + return this->item_buffer::front_metainfo(); + } +#endif + void pop() { this->destroy_front(); } @@ -41,6 +47,12 @@ class function_input_queue : public item_buffer { bool push( T& t ) { return this->push_back( t ); } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool push( T& t, const message_metainfo& metainfo ) { + return this->push_back(t, metainfo); + } +#endif }; //! Input and scheduling for a function node that takes a type Input as input @@ -87,11 +99,14 @@ class function_input_base : public receiver, no_assign { } graph_task* try_put_task( const input_type& t) override { - if ( my_is_no_throw ) - return try_put_task_impl(t, has_policy()); - else - return try_put_task_impl(t, std::false_type()); + return try_put_task_base(t); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* try_put_task( const input_type& t, const message_metainfo& metainfo ) override { + return try_put_task_base(t, metainfo); } +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Adds src to the list of cached predecessors. 
bool register_predecessor( predecessor_type &src ) override { @@ -148,6 +163,9 @@ class function_input_base : public receiver, no_assign { private: friend class apply_body_task_bypass< class_type, input_type >; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + friend class apply_body_task_bypass< class_type, input_type, message_metainfo >; +#endif friend class forward_task_bypass< class_type >; class operation_type : public d1::aggregated_operation< operation_type > { @@ -158,8 +176,16 @@ class function_input_base : public receiver, no_assign { predecessor_type *r; }; graph_task* bypass_t; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo* metainfo; +#endif operation_type(const input_type& e, op_type t) : - type(char(t)), elem(const_cast(&e)), bypass_t(nullptr) {} + type(char(t)), elem(const_cast(&e)), bypass_t(nullptr), metainfo(nullptr) {} +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + operation_type(const input_type& e, op_type t, const message_metainfo& info) : + type(char(t)), elem(const_cast(&e)), bypass_t(nullptr), + metainfo(const_cast(&info)) {} +#endif operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {} }; @@ -173,7 +199,18 @@ class function_input_base : public receiver, no_assign { if(my_queue) { if(!my_queue->empty()) { ++my_concurrency; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: consider removing metainfo from the queue using move semantics to avoid + // ref counter increase + message_metainfo queued_metainfo = my_queue->front_metainfo(); + if (queued_metainfo.empty()) { + new_task = create_body_task(my_queue->front()); + } else { + new_task = create_body_task(my_queue->front(), queued_metainfo); + } +#else new_task = create_body_task(my_queue->front()); +#endif my_queue->pop(); } @@ -233,10 +270,12 @@ class function_input_base : public receiver, no_assign { __TBB_ASSERT(my_max_concurrency != 0, nullptr); if (my_concurrency < my_max_concurrency) { ++my_concurrency; - graph_task * new_task = 
create_body_task(*(op->elem)); + graph_task* new_task = op->metainfo != nullptr ? create_body_task(*(op->elem), *(op->metainfo)) + : create_body_task(*(op->elem)); op->bypass_t = new_task; op->status.store(SUCCEEDED, std::memory_order_release); - } else if ( my_queue && my_queue->push(*(op->elem)) ) { + } else if ( my_queue && (op->metainfo != nullptr ? my_queue->push(*(op->elem), *(op->metainfo)) + : my_queue->push(*(op->elem))) ) { op->bypass_t = SUCCESSFULLY_ENQUEUED; op->status.store(SUCCEEDED, std::memory_order_release); } else { @@ -258,8 +297,9 @@ class function_input_base : public receiver, no_assign { } } - graph_task* internal_try_put_bypass( const input_type& t ) { - operation_type op_data(t, tryput_bypass); + template + graph_task* internal_try_put_bypass( const input_type& t, const Metainfo&... metainfo) { + operation_type op_data(t, tryput_bypass, metainfo...); my_aggregator.execute(&op_data); if( op_data.status == SUCCEEDED ) { return op_data.bypass_t; @@ -267,42 +307,54 @@ class function_input_base : public receiver, no_assign { return nullptr; } - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) { + template + graph_task* try_put_task_base(const input_type& t, const Metainfo&... metainfo) { + if ( my_is_no_throw ) + return try_put_task_impl(t, has_policy(), metainfo...); + else + return try_put_task_impl(t, std::false_type(), metainfo...); + } + + template + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type, const Metainfo&... 
metainfo) { if( my_max_concurrency == 0 ) { - return apply_body_bypass(t); + return apply_body_bypass(t, metainfo...); } else { operation_type check_op(t, occupy_concurrency); my_aggregator.execute(&check_op); if( check_op.status == SUCCEEDED ) { - return apply_body_bypass(t); + return apply_body_bypass(t, metainfo...); } - return internal_try_put_bypass(t); + return internal_try_put_bypass(t, metainfo...); } } - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) { + template + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type, const Metainfo&... metainfo) { if( my_max_concurrency == 0 ) { - return create_body_task(t); + return create_body_task(t, metainfo...); } else { - return internal_try_put_bypass(t); + return internal_try_put_bypass(t, metainfo...); } } //! Applies the body to the provided input // then decides if more work is available - graph_task* apply_body_bypass( const input_type &i ) { - return static_cast(this)->apply_body_impl_bypass(i); + template + graph_task* apply_body_bypass( const input_type &i, const Metainfo&... metainfo ) { + return static_cast(this)->apply_body_impl_bypass(i, metainfo...); } //! allocates a task to apply a body - graph_task* create_body_task( const input_type &input ) { + template + graph_task* create_body_task( const input_type &input, const Metainfo&... 
metainfo) { if (!is_graph_active(my_graph_ref)) { return nullptr; } // TODO revamp: extract helper for common graph task allocation part d1::small_object_allocator allocator{}; - typedef apply_body_task_bypass task_type; - graph_task* t = allocator.new_object( my_graph_ref, allocator, *this, input, my_priority ); + typedef apply_body_task_bypass task_type; + graph_task* t = allocator.new_object( my_graph_ref, allocator, *this, input, my_priority, metainfo...); return t; } @@ -396,7 +448,8 @@ class function_input : public function_input_base + graph_task* apply_body_impl_bypass( const input_type &i, const Metainfo&... metainfo) { output_type v = apply_body_impl(i); graph_task* postponed_task = nullptr; if( base_type::my_max_concurrency != 0 ) { @@ -408,7 +461,7 @@ class function_input : public function_input_base #include +#include #include #if __TBB_CPP20_CONCEPTS_PRESENT #include @@ -187,6 +188,24 @@ static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* return left; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class message_metainfo { +public: + using waiters_type = std::forward_list; + + message_metainfo() = default; + + message_metainfo(const waiters_type& waiters) : my_waiters(waiters) {} + message_metainfo(waiters_type&& waiters) : my_waiters(std::move(waiters)) {} + + const waiters_type& waiters() const { return my_waiters; } + + bool empty() const { return my_waiters.empty(); } +private: + waiters_type my_waiters; +}; // class message_metainfo +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Pure virtual template class that defines a sender of messages of type T template< typename T > class sender { @@ -250,6 +269,23 @@ class receiver { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! 
Put an item to the receiver and wait for completion + bool try_put_and_wait( const T& t ) { + d1::small_object_allocator alloc{}; + d1::wait_context_vertex* msg_wait_context = alloc.new_object(); + + graph_task* res = try_put_task(t, message_metainfo{message_metainfo::waiters_type{msg_wait_context}}); + if (!res) return false; + if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); + + __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph"); + + wait(msg_wait_context->get_context(), *graph_reference().my_context); + return true; + } +#endif + //! put item to successor; return task to run the successor if possible. protected: //! The input type of this receiver @@ -262,6 +298,9 @@ class receiver { template< typename X, typename Y > friend class broadcast_cache; template< typename X, typename Y > friend class round_robin_cache; virtual graph_task *try_put_task(const T& t) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task *try_put_task(const T& t, const message_metainfo&) = 0; +#endif virtual graph& graph_reference() const = 0; template friend class successor_cache; @@ -350,6 +389,11 @@ class continue_receiver : public receiver< continue_msg > { return res? res : SUCCESSFULLY_ENQUEUED; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add metainfo support for continue_receiver + graph_task* try_put_task( const input_type&, const message_metainfo& ) override { return nullptr; } +#endif + spin_mutex my_mutex; int my_predecessor_count; int my_current_count; @@ -961,6 +1005,11 @@ class split_node : public graph_node, public receiver { // Also, we do not have successors here. So we just tell the task returned here is successful. 
return emit_element::emit_this(this->my_graph, t, output_ports()); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for split_node + graph_task* try_put_task(const TupleType&, const message_metainfo&) { return nullptr; } +#endif + void reset_node(reset_flags f) override { if (f & rf_clear_edges) clear_element::clear_this(my_output_ports); @@ -1129,6 +1178,11 @@ class broadcast_node : public graph_node, public receiver, public sender { return new_task; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for broadcast_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_graph; } @@ -1477,6 +1531,11 @@ class buffer_node return ft; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for buffer_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_graph; } @@ -2111,6 +2170,11 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > return rtask; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for limiter_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif + graph& graph_reference() const override { return my_graph; } void reset_node( reset_flags f ) override { @@ -3105,6 +3169,11 @@ class overwrite_node : public graph_node, public receiver, public sender { return try_put_task_impl(v); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for overwrite_node + graph_task* try_put_task(const input_type&, const message_metainfo&) { return nullptr; } +#endif + graph_task * try_put_task_impl(const input_type &v) { my_buffer = v; my_buffer_is_valid = true; @@ -3194,6 +3263,11 @@ class write_once_node : public overwrite_node { spin_mutex::scoped_lock l( this->my_mutex ); return this->my_buffer_is_valid ? 
nullptr : this->try_put_task_impl(v); } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for write_once_node + graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } +#endif }; // write_once_node inline void set_name(const graph& g, const char *name) { diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 5eb6d1d27f..7ab2e0734d 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -282,6 +282,10 @@ struct harness_counting_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { + return try_put_task(t); + } + void validate() { size_t n = my_count; CHECK( n == num_copies*max_value ); @@ -332,6 +336,10 @@ struct harness_mapped_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { + return try_put_task(t); + } + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index aa7e41ca59..701c7af909 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -18,6 +18,7 @@ #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated #endif +#include #include "common/config.h" #include "tbb/flow_graph.h" @@ -469,6 +470,251 @@ void test_follows_and_precedes_api() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { + tbb::task_arena arena(1); + + arena.execute([&]{ + tbb::flow::graph g; + + std::vector start_work_items; + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + if (i != 
0) { + new_work_items.emplace_back(i + 10); + } + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = &function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + for (int i = 0; i < wait_message; ++i) { + function.try_put(i); + } + + function.try_put_and_wait(wait_message); + + std::size_t check_index = 0; + + // For lightweight function_node, start_work_items are expected to be processed first + // while putting items into the first node. + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + + if (concurrency_limit == tbb::flow::serial) { + // If the lightweight function_node is serial, it should process the wait_message but add items from new_work_items + // into the queue since the concurrency limit is occupied. 
+ CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + } else { + // If the node is unlimited, it should process new_work_items immediately while processing the wait_message + // Hence they should be processed before exiting the try_put_and_wait + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + // wait_message would be processed only after new_work_items + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + } + + g.wait_for_all(); + + if (concurrency_limit == tbb::flow::serial) { + // For the serial node, processing of new_work_items would be postponed to wait_for_all since they + // would be queued and spawned after working with wait_message + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } + }); +} + +void test_try_put_and_wait_queueing(std::size_t concurrency_limit) { + tbb::task_arena arena(1); + arena.execute([&]{ + tbb::flow::graph g; + + std::vector start_work_items; + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + if (i != 0) { + new_work_items.emplace_back(i + 10); + } + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = &function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + for (int i = 0; i < wait_message; ++i) { + function.try_put(i); + } + + 
function.try_put_and_wait(wait_message); + + std::size_t check_index = 0; + + if (concurrency_limit == tbb::flow::serial) { + // Serial queueing function_node should add all start_work_items except the first one into the queue + // and then process them in FIFO order. + // wait_message would also be added to the queue, but would be processed later + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } + + // For the unlimited function_node, all of the tasks for start_work_items and wait_message would be spawned + // and hence processed by the thread in LIFO order. + // The first processed item is expected to be wait_message since it was spawned last + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + + g.wait_for_all(); + + if (concurrency_limit == tbb::flow::serial) { + // For serial queueing function_node, the new_work_items are expected to be processed while calling to wait_for_all + // They would be queued and processed later in FIFO order + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } else { + // Unlimited function_node would always spawn tasks immediately without adding them into the queue + // They would be processed in LIFO order. 
Hence it is expected that new_work_items would be processed first in reverse order + // After them, start_work_items would be processed also in reverse order + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], "Unexpected items processing"); + } + for (std::size_t i = start_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[i - 1], "Unexpected items processing"); + } + } + }); +} + +void test_try_put_and_wait_rejecting(size_t concurrency_limit) { + tbb::task_arena arena(1); + + arena.execute([&]{ + tbb::flow::graph g; + + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 0; + + for (int i = 1; i < wait_message; ++i) { + new_work_items.emplace_back(i); + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = &function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + // If the first action is try_put_and_wait, it will occupy concurrency of the function_node + // All submits of new_work_items inside of the body should be rejected + bool result = function.try_put_and_wait(wait_message); + CHECK_MESSAGE(result, "task should not rejected since the node concurrency is not acquired"); + + CHECK_MESSAGE(processed_items.size() == 1, nullptr); + CHECK_MESSAGE(processed_items[0] == wait_message, "Unexpected items processing"); + + g.wait_for_all(); + + CHECK_MESSAGE(processed_items.size() == 1, nullptr); + + processed_items.clear(); + + // If the first action is try_put, try_put_and_wait is expected to return 
false since the concurrency of the + // node would be acquired + function.try_put(0); + result = function.try_put_and_wait(wait_message); + CHECK_MESSAGE(!result, "task should be rejected since the node concurrency is acquired"); + CHECK(processed_items.empty()); + + g.wait_for_all(); + + CHECK(processed_items.size() == 1); + CHECK_MESSAGE(processed_items[0] == 0, "Unexpected items processing"); + }); +} + +void test_try_put_and_wait() { + test_try_put_and_wait_lightweight(tbb::flow::serial); + test_try_put_and_wait_lightweight(tbb::flow::unlimited); + + test_try_put_and_wait_queueing(tbb::flow::serial); + test_try_put_and_wait_queueing(tbb::flow::unlimited); + + test_try_put_and_wait_rejecting(tbb::flow::serial); + // TODO: add integration test with buffering node before rejecting function_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Test various node bodies with concurrency //! \brief \ref error_guessing @@ -544,3 +790,10 @@ TEST_CASE("constraints for function_node body") { static_assert(!can_call_function_node_ctor>); } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! 
\brief \ref error_guessing +TEST_CASE("test function_node try_put_and_wait") { + test_try_put_and_wait(); +} +#endif From dbe50f78db053b938a55e330770f62404e5d37c3 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 11 Jun 2024 08:16:18 -0700 Subject: [PATCH 02/22] Fix non-CPF build --- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 2 ++ include/oneapi/tbb/detail/_flow_graph_node_impl.h | 14 +++++++++++++- test/common/graph_utils.h | 4 ++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index f22e3d7442..a12a45889b 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -376,9 +376,11 @@ class broadcast_cache : public successor_cache { return try_put_task_impl(t); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override { return try_put_task_impl(t, metainfo); } +#endif // call try_put_task and return list of received tasks bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 598eeaaebb..5125429878 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -180,7 +180,11 @@ class function_input_base : public receiver, no_assign { message_metainfo* metainfo; #endif operation_type(const input_type& e, op_type t) : - type(char(t)), elem(const_cast(&e)), bypass_t(nullptr), metainfo(nullptr) {} + type(char(t)), elem(const_cast(&e)), bypass_t(nullptr) +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + , metainfo(nullptr) +#endif + {} #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT operation_type(const input_type& e, op_type t, const message_metainfo& info) : type(char(t)), elem(const_cast(&e)), 
bypass_t(nullptr), @@ -270,12 +274,20 @@ class function_input_base : public receiver, no_assign { __TBB_ASSERT(my_max_concurrency != 0, nullptr); if (my_concurrency < my_max_concurrency) { ++my_concurrency; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT graph_task* new_task = op->metainfo != nullptr ? create_body_task(*(op->elem), *(op->metainfo)) : create_body_task(*(op->elem)); +#else + graph_task* new_task = create_body_task(*(op->elem)); +#endif op->bypass_t = new_task; op->status.store(SUCCEEDED, std::memory_order_release); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT } else if ( my_queue && (op->metainfo != nullptr ? my_queue->push(*(op->elem), *(op->metainfo)) : my_queue->push(*(op->elem))) ) { +#else + } else if ( my_queue && my_queue->push(*(op->elem)) ) { +#endif op->bypass_t = SUCCESSFULLY_ENQUEUED; op->status.store(SUCCEEDED, std::memory_order_release); } else { diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 7ab2e0734d..73644d9969 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -282,9 +282,11 @@ struct harness_counting_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { return try_put_task(t); } +#endif void validate() { size_t n = my_count; @@ -336,9 +338,11 @@ struct harness_mapped_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { return try_put_task(t); } +#endif tbb::flow::graph& graph_reference() const override { return my_graph; From 660646e666003929cbd347b14be4ec3a6555fb72 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 11 Jun 2024 08:35:01 -0700 Subject: [PATCH 03/22] Add try_put_and_wait_production branch to 
the CI trigger list --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d758aee050..ce8bfb7285 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ on: branches: [master] pull_request: - branches: [master] + branches: [master, dev/kboyarinov/try_put_and_wait_production] types: - opened - synchronize From 531da5b051a8b9311daecc849546bd899bc4d0c8 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Thu, 13 Jun 2024 05:11:14 -0700 Subject: [PATCH 04/22] Fix test issues --- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 3 ++- test/tbb/test_broadcast_node.cpp | 6 ++++++ test/tbb/test_input_node.cpp | 6 ++++++ test/tbb/test_limiter_node.cpp | 12 ++++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 5125429878..3261c14205 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -587,7 +587,8 @@ class multifunction_input : public function_input_base + graph_task* apply_body_impl_bypass( const input_type &i, const Metainfo&... 
) { fgt_begin_body( my_body ); (*my_body)(i, my_output_ports); fgt_end_body( my_body ); diff --git a/test/tbb/test_broadcast_node.cpp b/test/tbb/test_broadcast_node.cpp index 8c02957af6..14a3e564a2 100644 --- a/test/tbb/test_broadcast_node.cpp +++ b/test/tbb/test_broadcast_node.cpp @@ -73,6 +73,12 @@ class counting_array_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + tbb::task * try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_input_node.cpp b/test/tbb/test_input_node.cpp index 629c0be29c..c18b2e9a94 100644 --- a/test/tbb/test_input_node.cpp +++ b/test/tbb/test_input_node.cpp @@ -61,6 +61,12 @@ class test_push_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* try_put_task( const T& v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index 53fa63dc58..d11a6625de 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -53,6 +53,12 @@ struct serial_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task * try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } @@ -71,6 +77,12 @@ struct parallel_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task 
* try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } From f97c7947170f9d3e10ba0f5b3d93b03ad08e0228 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Thu, 13 Jun 2024 07:05:33 -0700 Subject: [PATCH 05/22] Fix CI issues --- include/oneapi/tbb/detail/_config.h | 2 +- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_indexer_impl.h | 4 ++-- include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_join_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- include/oneapi/tbb/flow_graph.h | 4 ++-- test/common/graph_utils.h | 2 +- test/tbb/test_broadcast_node.cpp | 2 +- test/tbb/test_function_node.cpp | 2 +- test/tbb/test_input_node.cpp | 2 +- test/tbb/test_limiter_node.cpp | 2 +- 14 files changed, 16 insertions(+), 16 deletions(-) diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h index f7c821a3c0..e01f1b35b7 100644 --- a/include/oneapi/tbb/detail/_config.h +++ b/include/oneapi/tbb/detail/_config.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index e55ff1807c..2d4dfb2ad3 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index a12a45889b..d0bf953b34 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 0a766352cb..4dd68ed7e7 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h index dc43bb6589..dbd9e881db 100644 --- a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -81,7 +81,7 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for indexer_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 1edaf4d82e..4dd457bdc8 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index c23a655a9e..2712309d69 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 3261c14205..b19c1f4e4b 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 6f83268037..b02274dbae 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -110,7 +110,7 @@ concept join_node_function_object = std::copy_constructible && template concept input_node_body = std::copy_constructible && - requires( Body& body, tbb::detail::d2::flow_control& fc ) { + requires( Body& body, tbb::detail::d1::flow_control& fc ) { { body(fc) } -> adaptive_same_as; }; diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 73644d9969..aaefd645ea 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_broadcast_node.cpp b/test/tbb/test_broadcast_node.cpp index 14a3e564a2..b55455fc91 100644 --- a/test/tbb/test_broadcast_node.cpp +++ b/test/tbb/test_broadcast_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 701c7af909..9a9b462a1b 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
diff --git a/test/tbb/test_input_node.cpp b/test/tbb/test_input_node.cpp index c18b2e9a94..9442693980 100644 --- a/test/tbb/test_input_node.cpp +++ b/test/tbb/test_input_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index d11a6625de..66611ce19e 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2023 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 7afdd9dc5dffa83484d7eb7d95d7bc7f017a2467 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Thu, 13 Jun 2024 07:30:34 -0700 Subject: [PATCH 06/22] + more use of override keyword --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_join_impl.h | 6 +++--- include/oneapi/tbb/flow_graph.h | 12 ++++++------ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 2d4dfb2ad3..d1bbf4feb5 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -378,7 +378,7 @@ class threshold_regulator { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for round robin cache - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif }; diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h 
b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index 2712309d69..f66d7531b2 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -296,7 +296,7 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for rejecting join_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { @@ -462,7 +462,7 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for queueing join_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { @@ -617,7 +617,7 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for key_matching join_node - graph_task* try_put_task(const input_type&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const input_type&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index b02274dbae..78c4ae8a53 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -1007,7 +1007,7 @@ class split_node : public graph_node, public receiver { } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for split_node - graph_task* try_put_task(const TupleType&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const TupleType&, const message_metainfo&) override { return nullptr; } #endif void reset_node(reset_flags f) override { @@ -1180,7 +1180,7 @@ class broadcast_node : public graph_node, public receiver, public sender { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for broadcast_node - 
graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { @@ -1533,7 +1533,7 @@ class buffer_node #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for buffer_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { @@ -2172,7 +2172,7 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for limiter_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif graph& graph_reference() const override { return my_graph; } @@ -3171,7 +3171,7 @@ class overwrite_node : public graph_node, public receiver, public sender { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for overwrite_node - graph_task* try_put_task(const input_type&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const input_type&, const message_metainfo&) override { return nullptr; } #endif graph_task * try_put_task_impl(const input_type &v) { @@ -3266,7 +3266,7 @@ class write_once_node : public overwrite_node { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for write_once_node - graph_task* try_put_task(const T&, const message_metainfo&) { return nullptr; } + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } #endif }; // write_once_node From da1591644016f138889881f391b46e1dfedb498b Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Fri, 21 Jun 2024 03:46:51 -0700 Subject: [PATCH 07/22] vertexes->vertices, remove unnecessary iostream 
include --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 4 +++- include/oneapi/tbb/detail/_flow_graph_impl.h | 18 +++++++++--------- test/tbb/test_function_node.cpp | 1 - 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index d1bbf4feb5..1e77712f2e 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -301,9 +301,11 @@ class apply_body_task_bypass return my_node.apply_body_bypass(my_input); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT graph_task* call_apply_body_bypass_impl(std::false_type) { - return my_node.apply_body_bypass(my_input, this->get_msg_wait_context_vertexes()); + return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()}); } +#endif graph_task* call_apply_body_bypass() { return call_apply_body_bypass_impl(std::is_same{}); diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 4dd68ed7e7..fd6161f719 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -153,31 +153,31 @@ class graph_task_with_message_waiters : public graph_task { const std::forward_list& msg_waiters, node_priority_t node_priority = no_priority) : graph_task(g, allocator, node_priority) - , my_msg_wait_context_vertexes(msg_waiters) + , my_msg_wait_context_vertices(msg_waiters) { for (auto& msg_waiter : msg_waiters) { - my_msg_reference_vertexes.emplace_back(r1::get_thread_reference_vertex(msg_waiter)); - my_msg_reference_vertexes.back()->reserve(1); + my_msg_reference_vertices.emplace_back(r1::get_thread_reference_vertex(msg_waiter)); + my_msg_reference_vertices.back()->reserve(1); } } - const std::forward_list get_msg_wait_context_vertexes() const { - return my_msg_wait_context_vertexes; + const std::forward_list get_msg_wait_context_vertices() 
const { + return my_msg_wait_context_vertices; } protected: template void finalize(const d1::execution_data& ed) { - auto msg_reference_vertexes = std::move(my_msg_reference_vertexes); + auto msg_reference_vertices = std::move(my_msg_reference_vertices); graph_task::finalize(ed); - for (auto& msg_waiter : msg_reference_vertexes) { + for (auto& msg_waiter : msg_reference_vertices) { msg_waiter->release(1); } } private: - std::forward_list my_msg_wait_context_vertexes; - std::list my_msg_reference_vertexes; + std::forward_list my_msg_wait_context_vertices; + std::list my_msg_reference_vertices; }; // class graph_task_with_message_waiters #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 9a9b462a1b..4b9c0fb818 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -18,7 +18,6 @@ #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated #endif -#include #include "common/config.h" #include "tbb/flow_graph.h" From 92bf552e643af4a7409716f1eaaae9dba42550f0 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 25 Jun 2024 06:44:39 -0700 Subject: [PATCH 08/22] Remove unnecessary variadics --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 46 ++++---- .../tbb/detail/_flow_graph_cache_impl.h | 13 +- include/oneapi/tbb/detail/_flow_graph_impl.h | 12 +- .../tbb/detail/_flow_graph_indexer_impl.h | 4 +- .../tbb/detail/_flow_graph_item_buffer_impl.h | 87 ++++++-------- .../oneapi/tbb/detail/_flow_graph_join_impl.h | 8 +- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 111 +++++++++--------- include/oneapi/tbb/flow_graph.h | 33 ++++-- 8 files changed, 171 insertions(+), 143 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 1e77712f2e..4888cdd194 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ 
b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -276,29 +276,23 @@ struct graph_task_base : std::conditional {}; + +template +using graph_task_base_t = typename graph_task_base::type; #endif //! A task that calls a node's apply_body_bypass function, passing in an input of type Input // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr -template< typename NodeType, typename Input, typename... Metainfo > +template< typename NodeType, typename Input, typename BaseTaskType = graph_task> class apply_body_task_bypass -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - : public graph_task_base::type -#else - : public graph_task -#endif + : public BaseTaskType { NodeType &my_node; Input my_input; -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - using base_type = typename graph_task_base::type; -#else - using base_type = graph_task; -#endif - graph_task* call_apply_body_bypass_impl(std::true_type) { - return my_node.apply_body_bypass(my_input); + return my_node.apply_body_bypass(my_input + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT @@ -308,15 +302,23 @@ class apply_body_task_bypass #endif graph_task* call_apply_body_bypass() { - return call_apply_body_bypass_impl(std::is_same{}); + return call_apply_body_bypass_impl(std::is_same{}); } public: +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template ::value>::type> apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i, - node_priority_t node_priority = no_priority, - const Metainfo&... 
metainfo ) - : base_type(g, allocator, metainfo.waiters()..., node_priority), - my_node(n), my_input(i) {} + node_priority_t node_priority, Metainfo&& metainfo ) + : BaseTaskType(g, allocator, std::forward(metainfo).waiters(), node_priority) + , my_node(n), my_input(i) {} +#endif + + apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i, + node_priority_t node_priority = no_priority ) + : BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {} d1::task* execute(d1::execution_data& ed) override { graph_task* next_task = call_apply_body_bypass(); @@ -324,12 +326,12 @@ class apply_body_task_bypass next_task = nullptr; else if (next_task) next_task = prioritize_task(my_node.graph_reference(), *next_task); - base_type::template finalize(ed); + BaseTaskType::template finalize(ed); return next_task; } d1::task* cancel(d1::execution_data& ed) override { - base_type::template finalize(ed); + BaseTaskType::template finalize(ed); return nullptr; } }; @@ -380,7 +382,9 @@ class threshold_regulator { typedef M mutex_type; typedef typename successor_cache::successors_type successors_type; - template - graph_task* try_put_task_impl( const T& t, const Metainfo&... 
metainfo) { + graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { graph_task * last_task = nullptr; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task *new_task = (*i)->try_put_task(t, metainfo...); + graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); // workaround for icc bug graph& graph_ref = (*i)->graph_reference(); last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary @@ -373,7 +372,11 @@ class broadcast_cache : public successor_cache { // as above, but call try_put_task instead, and return the last task we received (if any) graph_task* try_put_task( const T &t ) override { +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + return try_put_task_impl(t, message_metainfo{}); +#else return try_put_task_impl(t); +#endif } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT @@ -448,7 +451,9 @@ class round_robin_cache : public successor_cache { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for round robin cache - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } #endif }; diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index fd6161f719..75583a332f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -155,9 +155,13 @@ class graph_task_with_message_waiters : public graph_task { : graph_task(g, allocator, node_priority) , my_msg_wait_context_vertices(msg_waiters) { - for (auto& msg_waiter : msg_waiters) { - my_msg_reference_vertices.emplace_back(r1::get_thread_reference_vertex(msg_waiter)); - my_msg_reference_vertices.back()->reserve(1); + auto 
last_iterator = my_msg_reference_vertices.cbefore_begin(); + + for (auto& msg_waiter : my_msg_wait_context_vertices) { + d1::wait_tree_vertex_interface* ref_vertex = r1::get_thread_reference_vertex(msg_waiter); + last_iterator = my_msg_reference_vertices.emplace_after(last_iterator, + ref_vertex); + ref_vertex->reserve(1); } } @@ -177,7 +181,7 @@ class graph_task_with_message_waiters : public graph_task { } private: std::forward_list my_msg_wait_context_vertices; - std::list my_msg_reference_vertices; + std::forward_list my_msg_reference_vertices; }; // class graph_task_with_message_waiters #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h index dbd9e881db..eb703bb8a5 100644 --- a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h @@ -81,7 +81,9 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for indexer_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } #endif graph& graph_reference() const override { diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 4dd457bdc8..bfe980b508 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -86,10 +86,8 @@ class item_buffer { // may be called with an empty slot or a slot that has already been constructed into. 
void set_my_item(size_t i, const item_type &o -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - , const message_metainfo& metainfo -#endif - ) { + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if(element(i).state != no_item) { destroy_item(i); } @@ -104,12 +102,6 @@ class item_buffer { element(i).state = has_item; } -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - void set_my_item(size_t i, const item_type&o) { - set_my_item(i, o, message_metainfo{}); - } -#endif - // destructively-fetch an object from the buffer void fetch_item(size_t i, item_type &o) { __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot"); @@ -237,73 +229,66 @@ class item_buffer { my_array_size = new_size; } - bool push_back(item_type &v) { - if(buffer_full()) { - grow_my_array(size() + 1); - } - set_my_item(my_tail, v); - ++my_tail; - return true; - } - -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - bool push_back(item_type& v, const message_metainfo& metainfo) { + bool push_back(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if (buffer_full()) { grow_my_array(size() + 1); } - set_my_item(my_tail, v, metainfo); + set_my_item(my_tail, v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); ++my_tail; return true; } -#endif - - bool pop_back(item_type &v) { - if (!my_item_valid(my_tail-1)) { - return false; - } - v = this->back(); - destroy_back(); - return true; - } - -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - bool pop_back(item_type& v, message_metainfo* metainfo_ptr) { - __TBB_ASSERT(metainfo_ptr != nullptr, nullptr); + bool pop_back(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo)) + { if (!my_item_valid(my_tail - 1)) { return false; } auto& e = element(my_tail - 1); v = e.item; - *metainfo_ptr = std::move(e.metainfo); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + metainfo = std::move(e.metainfo); +#endif destroy_back(); return true; } -#endif - bool pop_front(item_type &v) { - 
if(!my_item_valid(my_head)) { + bool pop_front(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo)) + { + if (!my_item_valid(my_head)) { return false; } - v = this->front(); + auto& e = element(my_head); + v = e.item; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + metainfo = std::move(e.metainfo); +#endif + destroy_front(); return true; } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - bool pop_front(item_type& v, message_metainfo* metainfo_ptr) { - __TBB_ASSERT(metainfo_ptr != nullptr, nullptr); + void set_my_item(size_t i, const item_type&o) { + set_my_item(i, o, message_metainfo{}); + } - if (!my_item_valid(my_head)) { - return false; - } - auto& e = element(my_head); - v = e.item; - *metainfo_ptr = std::move(e.metainfo); + bool push_back(item_type& v) { + return push_back(v, message_metainfo{}); + } - destroy_front(); - return true; + bool pop_back(item_type& v) { + message_metainfo metainfo; + return pop_back(v, metainfo); + } + + bool pop_front(item_type& v) { + message_metainfo metainfo; + return pop_front(v, metainfo); } #endif diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index f66d7531b2..cb854fdd5b 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -462,7 +462,9 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for queueing join_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } #endif graph& graph_reference() const override { @@ -617,7 +619,9 @@ #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for key_matching join_node - graph_task* try_put_task(const input_type&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const input_type& v, const message_metainfo&) override { + return 
try_put_task(v); + } #endif graph& graph_reference() const override { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index b19c1f4e4b..62cc329e48 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -99,7 +99,7 @@ class function_input_base : public receiver, no_assign { } graph_task* try_put_task( const input_type& t) override { - return try_put_task_base(t); + return try_put_task_base(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT @@ -164,7 +164,7 @@ class function_input_base : public receiver, no_assign { friend class apply_body_task_bypass< class_type, input_type >; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - friend class apply_body_task_bypass< class_type, input_type, message_metainfo >; + friend class apply_body_task_bypass< class_type, input_type, graph_task_with_message_waiters>; #endif friend class forward_task_bypass< class_type >; @@ -203,18 +203,10 @@ class function_input_base : public receiver, no_assign { if(my_queue) { if(!my_queue->empty()) { ++my_concurrency; -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: consider removing metainfo from the queue using move semantics to avoid // ref counter increase - message_metainfo queued_metainfo = my_queue->front_metainfo(); - if (queued_metainfo.empty()) { - new_task = create_body_task(my_queue->front()); - } else { - new_task = create_body_task(my_queue->front(), queued_metainfo); - } -#else - new_task = create_body_task(my_queue->front()); -#endif + new_task = create_body_task(my_queue->front() + __TBB_FLOW_GRAPH_METAINFO_ARG(my_queue->front_metainfo())); my_queue->pop(); } @@ -223,7 +215,7 @@ class function_input_base : public receiver, no_assign { input_type i; if(my_predecessors.get_item(i)) { ++my_concurrency; - new_task = create_body_task(i); + new_task = create_body_task(i 
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } } return new_task; @@ -274,20 +266,13 @@ class function_input_base : public receiver, no_assign { __TBB_ASSERT(my_max_concurrency != 0, nullptr); if (my_concurrency < my_max_concurrency) { ++my_concurrency; -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - graph_task* new_task = op->metainfo != nullptr ? create_body_task(*(op->elem), *(op->metainfo)) - : create_body_task(*(op->elem)); -#else - graph_task* new_task = create_body_task(*(op->elem)); -#endif + graph_task* new_task = create_body_task(*(op->elem) + __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))); op->bypass_t = new_task; op->status.store(SUCCEEDED, std::memory_order_release); -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - } else if ( my_queue && (op->metainfo != nullptr ? my_queue->push(*(op->elem), *(op->metainfo)) - : my_queue->push(*(op->elem))) ) { -#else - } else if ( my_queue && my_queue->push(*(op->elem)) ) { -#endif + } else if ( my_queue && my_queue->push(*(op->elem) + __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))) ) + { op->bypass_t = SUCCESSFULLY_ENQUEUED; op->status.store(SUCCEEDED, std::memory_order_release); } else { @@ -309,9 +294,10 @@ class function_input_base : public receiver, no_assign { } } - template - graph_task* internal_try_put_bypass( const input_type& t, const Metainfo&... metainfo) { - operation_type op_data(t, tryput_bypass, metainfo...); + graph_task* internal_try_put_bypass( const input_type& t + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { + operation_type op_data(t, tryput_bypass __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); my_aggregator.execute(&op_data); if( op_data.status == SUCCEEDED ) { return op_data.bypass_t; @@ -319,54 +305,71 @@ class function_input_base : public receiver, no_assign { return nullptr; } - template - graph_task* try_put_task_base(const input_type& t, const Metainfo&... 
metainfo) { + graph_task* try_put_task_base(const input_type& t + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if ( my_is_no_throw ) - return try_put_task_impl(t, has_policy(), metainfo...); + return try_put_task_impl(t, has_policy() + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); else - return try_put_task_impl(t, std::false_type(), metainfo...); + return try_put_task_impl(t, std::false_type() + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } - template - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type, const Metainfo&... metainfo) { + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if( my_max_concurrency == 0 ) { - return apply_body_bypass(t, metainfo...); + return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } else { operation_type check_op(t, occupy_concurrency); my_aggregator.execute(&check_op); if( check_op.status == SUCCEEDED ) { - return apply_body_bypass(t, metainfo...); + return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } - return internal_try_put_bypass(t, metainfo...); + return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } } - template - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type, const Metainfo&... metainfo) { + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if( my_max_concurrency == 0 ) { - return create_body_task(t, metainfo...); + return create_body_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } else { - return internal_try_put_bypass(t, metainfo...); + return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } } //! 
Applies the body to the provided input // then decides if more work is available - template - graph_task* apply_body_bypass( const input_type &i, const Metainfo&... metainfo ) { - return static_cast(this)->apply_body_impl_bypass(i, metainfo...); + graph_task* apply_body_bypass( const input_type &i + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + + { + return static_cast(this)->apply_body_impl_bypass(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } //! allocates a task to apply a body - template - graph_task* create_body_task( const input_type &input, const Metainfo&... metainfo) { + graph_task* create_body_task( const input_type &input + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if (!is_graph_active(my_graph_ref)) { return nullptr; } // TODO revamp: extract helper for common graph task allocation part d1::small_object_allocator allocator{}; - typedef apply_body_task_bypass task_type; - graph_task* t = allocator.new_object( my_graph_ref, allocator, *this, input, my_priority, metainfo...); + graph_task* t = nullptr; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (!metainfo.empty()) { + using task_type = apply_body_task_bypass; + t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority, metainfo); + } else +#endif + { + using task_type = apply_body_task_bypass; + t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority); + } return t; } @@ -460,8 +463,9 @@ class function_input : public function_input_base - graph_task* apply_body_impl_bypass( const input_type &i, const Metainfo&... metainfo) { + graph_task* apply_body_impl_bypass( const input_type &i + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { output_type v = apply_body_impl(i); graph_task* postponed_task = nullptr; if( base_type::my_max_concurrency != 0 ) { @@ -473,7 +477,7 @@ class function_input : public function_input_base - graph_task* apply_body_impl_bypass( const input_type &i, const Metainfo&... 
) { + graph_task* apply_body_impl_bypass( const input_type &i + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) + { fgt_begin_body( my_body ); (*my_body)(i, my_output_ports); fgt_end_body( my_body ); diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 78c4ae8a53..596dba5e82 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -204,6 +204,11 @@ class message_metainfo { private: waiters_type my_waiters; }; // class message_metainfo + +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo + +#else +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Pure virtual template class that defines a sender of messages of type T @@ -391,7 +396,9 @@ class continue_receiver : public receiver< continue_msg > { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add metainfo support for continue_receiver - graph_task* try_put_task( const input_type&, const message_metainfo& ) override { return nullptr; } + graph_task* try_put_task( const input_type& input, const message_metainfo& ) override { + return try_put_task(input); + } #endif spin_mutex my_mutex; @@ -1007,7 +1014,9 @@ class split_node : public graph_node, public receiver { } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for split_node - graph_task* try_put_task(const TupleType&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const TupleType& t, const message_metainfo&) override { + return try_put_task(t); + } #endif void reset_node(reset_flags f) override { @@ -1180,7 +1189,9 @@ class broadcast_node : public graph_node, public receiver, public sender { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for broadcast_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } 
#endif graph& graph_reference() const override { @@ -1533,7 +1544,9 @@ class buffer_node #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for buffer_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } #endif graph& graph_reference() const override { @@ -2172,7 +2185,9 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for limiter_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } #endif graph& graph_reference() const override { return my_graph; } @@ -3171,7 +3186,9 @@ class overwrite_node : public graph_node, public receiver, public sender { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for overwrite_node - graph_task* try_put_task(const input_type&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const input_type& v, const message_metainfo&) override { + return try_put_task(v); + } #endif graph_task * try_put_task_impl(const input_type &v) { @@ -3266,7 +3283,9 @@ class write_once_node : public overwrite_node { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for write_once_node - graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } #endif }; // write_once_node From 2bb5948d217004663eba0053113ad1ba89432bfa Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 25 Jun 2024 06:47:41 -0700 Subject: [PATCH 09/22] Simplify cache impl --- include/oneapi/tbb/detail/_flow_graph_cache_impl.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) 
diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index cb115d1ab0..17117da9bd 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -372,11 +372,7 @@ class broadcast_cache : public successor_cache { // as above, but call try_put_task instead, and return the last task we received (if any) graph_task* try_put_task( const T &t ) override { -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - return try_put_task_impl(t, message_metainfo{}); -#else - return try_put_task_impl(t); -#endif + return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT From 73eb5133db1aa884d3d500497cf461d680af1b87 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 25 Jun 2024 07:14:11 -0700 Subject: [PATCH 10/22] Fix unused variable --- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 62cc329e48..5c8fa27bb6 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -592,7 +592,7 @@ class multifunction_input : public function_input_base Date: Tue, 25 Jun 2024 07:20:01 -0700 Subject: [PATCH 11/22] Add stub for continue_input --- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 5c8fa27bb6..9064a5c50f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -725,7 +725,7 @@ class continue_input : public continue_receiver { friend class apply_body_task_bypass< class_type, continue_msg >; //! 
Applies the body to the provided input - graph_task* apply_body_bypass( input_type ) { + graph_task* apply_body_bypass( input_type__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) ) { // There is an extra copied needed to capture the // body execution without the try_put fgt_begin_body( my_body ); From 6d82b64b1f0ff4c7eb6b74dc1600a6c02d16794d Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 25 Jun 2024 07:21:37 -0700 Subject: [PATCH 12/22] Fix whitespace --- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 9064a5c50f..9a02928ba1 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -725,7 +725,7 @@ class continue_input : public continue_receiver { friend class apply_body_task_bypass< class_type, continue_msg >; //! Applies the body to the provided input - graph_task* apply_body_bypass( input_type__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) ) { + graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) ) { // There is an extra copied needed to capture the // body execution without the try_put fgt_begin_body( my_body ); From bc01522a96e057c6af8f9370e61a313afe706a6a Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Tue, 25 Jun 2024 07:22:31 -0700 Subject: [PATCH 13/22] Fix issues --- include/oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 9a02928ba1..b0c1c26381 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -746,7 +746,7 @@ class continue_input : public continue_receiver { #if _MSC_VER && !__INTEL_COMPILER #pragma 
warning (pop) #endif - return apply_body_bypass( continue_msg() ); + return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}) ); } else { d1::small_object_allocator allocator{}; From f6f767335308293859488fd65e5e98e5da6880df Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 8 Jul 2024 03:33:15 -0700 Subject: [PATCH 14/22] Address review comments --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 15 +-------- include/oneapi/tbb/detail/_flow_graph_impl.h | 5 +++ .../tbb/detail/_flow_graph_item_buffer_impl.h | 6 ++-- include/oneapi/tbb/flow_graph.h | 32 +++++++++++-------- 4 files changed, 27 insertions(+), 31 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 4888cdd194..06043a76a7 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -270,17 +270,6 @@ class forward_task_bypass : public graph_task { } }; -#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -template -struct graph_task_base : std::conditional -{}; - -template -using graph_task_base_t = typename graph_task_base::type; -#endif - //! 
A task that calls a node's apply_body_bypass function, passing in an input of type Input // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr template< typename NodeType, typename Input, typename BaseTaskType = graph_task> @@ -307,9 +296,7 @@ class apply_body_task_bypass public: #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - template ::value>::type> + template apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i, node_priority_t node_priority, Metainfo&& metainfo ) : BaseTaskType(g, allocator, std::forward(metainfo).waiters(), node_priority) diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 75583a332f..9baf3c21c7 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -180,6 +180,11 @@ class graph_task_with_message_waiters : public graph_task { } } private: + // Each task that holds information about single message wait_contexts should hold two lists + // The first one is wait_contexts associated with the message itself. They are needed + // to be able to broadcast the list of wait_contexts to the node successors while executing the task. 
+ // The second list is a list of reference vertices for each wait_context_vertex in the first list + // to support the distributed reference counting schema std::forward_list my_msg_wait_context_vertices; std::forward_list my_msg_reference_vertices; }; // class graph_task_with_message_waiters diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index bfe980b508..114a1372a2 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -92,14 +92,14 @@ class item_buffer { destroy_item(i); } new(&(element(i).item)) item_type(o); + element(i).state = has_item; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT new(&element(i).metainfo) message_metainfo(metainfo); for (auto& waiter : metainfo.waiters()) { - waiter->reserve(); + waiter->reserve(1); } #endif - element(i).state = has_item; } // destructively-fetch an object from the buffer @@ -215,11 +215,11 @@ class item_buffer { // placement-new copy-construct; could be std::move char *new_space = (char *)&(new_array[i&(new_size-1)].begin()->item); (void)new(new_space) item_type(get_my_item(i)); + new_array[i&(new_size-1)].begin()->state = element(i).state; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT char* meta_space = (char *)&(new_array[i&(new_size-1)].begin()->metainfo); ::new(meta_space) message_metainfo(std::move(element(i).metainfo)); #endif - new_array[i&(new_size-1)].begin()->state = element(i).state; } } diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 596dba5e82..6cb7b87edc 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -262,32 +262,36 @@ bool remove_successor(sender& s, receiver& r) { //! Pure virtual template class that defines a receiver of messages of type T template< typename T > class receiver { +private: + template + bool internal_try_put(const T& t, TryPutTaskArgs&&... 
args) { + graph_task* res = try_put_task(t, std::forward(args)...); + if (!res) return false; + if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); + return true; + } + public: //! Destructor virtual ~receiver() {} //! Put an item to the receiver bool try_put( const T& t ) { - graph_task *res = try_put_task(t); - if (!res) return false; - if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); - return true; + return internal_try_put(t); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Put an item to the receiver and wait for completion bool try_put_and_wait( const T& t ) { - d1::small_object_allocator alloc{}; - d1::wait_context_vertex* msg_wait_context = alloc.new_object(); - - graph_task* res = try_put_task(t, message_metainfo{message_metainfo::waiters_type{msg_wait_context}}); - if (!res) return false; - if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); + // Since try_put_and_wait is a blocking call, it is safe to create wait_context on stack + d1::wait_context_vertex msg_wait_context{}; - __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph"); - - wait(msg_wait_context->get_context(), *graph_reference().my_context); - return true; + bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_context}}); + if (res) { + __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph"); + wait(msg_wait_context.get_context(), *graph_reference().my_context); + } + return res; } #endif From e08d81fab0069bc490aceefdfade4d09525ef1a8 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 8 Jul 2024 03:47:39 -0700 Subject: [PATCH 15/22] Rearrange base task constructors --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 2 +- include/oneapi/tbb/detail/_flow_graph_impl.h | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git 
a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 06043a76a7..5afa34d81d 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -299,7 +299,7 @@ class apply_body_task_bypass template apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i, node_priority_t node_priority, Metainfo&& metainfo ) - : BaseTaskType(g, allocator, std::forward(metainfo).waiters(), node_priority) + : BaseTaskType(g, allocator, node_priority, std::forward(metainfo).waiters()) , my_node(n), my_input(i) {} #endif diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 9baf3c21c7..23e91bc059 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -150,8 +150,8 @@ class graph_task : public d1::task { class graph_task_with_message_waiters : public graph_task { public: graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator, - const std::forward_list& msg_waiters, - node_priority_t node_priority = no_priority) + node_priority_t node_priority, + const std::forward_list& msg_waiters) : graph_task(g, allocator, node_priority) , my_msg_wait_context_vertices(msg_waiters) { @@ -165,6 +165,10 @@ class graph_task_with_message_waiters : public graph_task { } } + graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator, + const std::forward_list& msg_waiters) + : graph_task_with_message_waiters(g, allocator, no_priority, msg_waiters) {} + const std::forward_list get_msg_wait_context_vertices() const { return my_msg_wait_context_vertices; } From 402909197b1b82fd518ae9ae3303d440bb604b93 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Wed, 17 Jul 2024 05:16:45 -0700 Subject: [PATCH 16/22] Introduce separate macro for try_put_and_wait feature --- 
include/oneapi/tbb/detail/_config.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h index e01f1b35b7..043aaf8d94 100644 --- a/include/oneapi/tbb/detail/_config.h +++ b/include/oneapi/tbb/detail/_config.h @@ -522,7 +522,8 @@ #endif #ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES) +#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \ + || TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT) #endif #if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS From 2033045b0f22953bc566f64c70eb220e91b4f9c0 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 22 Jul 2024 03:54:55 -0700 Subject: [PATCH 17/22] Address review comments --- .../oneapi/tbb/detail/_flow_graph_body_impl.h | 10 ++++++--- .../tbb/detail/_flow_graph_cache_impl.h | 5 ++--- .../tbb/detail/_flow_graph_item_buffer_impl.h | 11 ++++++++-- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 2 +- include/oneapi/tbb/flow_graph.h | 6 ++--- test/common/graph_utils.h | 4 ++-- test/tbb/test_function_node.cpp | 22 +++++++++++-------- 7 files changed, 37 insertions(+), 23 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 5afa34d81d..6b964bcb44 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -279,19 +279,23 @@ class apply_body_task_bypass NodeType &my_node; Input my_input; - graph_task* call_apply_body_bypass_impl(std::true_type) { + using check_metainfo = std::is_same; + using not_contains_metainfo = std::true_type; + using contains_metainfo = std::false_type; + + graph_task* call_apply_body_bypass_impl(not_contains_metainfo) { return my_node.apply_body_bypass(my_input __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - 
graph_task* call_apply_body_bypass_impl(std::false_type) { + graph_task* call_apply_body_bypass_impl(contains_metainfo) { return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()}); } #endif graph_task* call_apply_body_bypass() { - return call_apply_body_bypass_impl(std::is_same{}); + return call_apply_body_bypass_impl(check_metainfo{}); } public: diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 17117da9bd..eedf946433 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -269,7 +269,7 @@ class successor_cache : no_copy { virtual graph_task* try_put_task( const T& t ) = 0; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo) = 0; + virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0; #endif }; // successor_cache @@ -370,7 +370,6 @@ class broadcast_cache : public successor_cache { // Do not work with the passed pointer here as it may not be fully initialized yet } - // as above, but call try_put_task instead, and return the last task we received (if any) graph_task* try_put_task( const T &t ) override { return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } @@ -447,7 +446,7 @@ class round_robin_cache : public successor_cache { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT // TODO: add support for round robin cache - graph_task* try_put_task(const T& t, const message_metainfo&) override { + graph_task* try_put_task( const T& t, const message_metainfo& ) override { return try_put_task(t); } #endif diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 114a1372a2..0e5b867bf8 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ 
b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -183,8 +183,15 @@ class item_buffer { #endif // following methods are for reservation of the front of a buffer. - void reserve_item(size_type i) { __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); element(i).state = reserved_item; } - void release_item(size_type i) { __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); element(i).state = has_item; } + void reserve_item(size_type i) { + __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); + element(i).state = reserved_item; + } + + void release_item(size_type i) { + __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); + element(i).state = has_item; + } void destroy_front() { destroy_item(my_head); ++my_head; } void destroy_back() { destroy_item(my_tail-1); --my_tail; } diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index b0c1c26381..7e763aaa32 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -164,7 +164,7 @@ class function_input_base : public receiver, no_assign { friend class apply_body_task_bypass< class_type, input_type >; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - friend class apply_body_task_bypass< class_type, input_type, graph_task_with_message_waiters>; + friend class apply_body_task_bypass< class_type, input_type, graph_task_with_message_waiters >; #endif friend class forward_task_bypass< class_type >; diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 6cb7b87edc..be3640353e 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -284,12 +284,12 @@ class receiver { //! 
Put an item to the receiver and wait for completion bool try_put_and_wait( const T& t ) { // Since try_put_and_wait is a blocking call, it is safe to create wait_context on stack - d1::wait_context_vertex msg_wait_context{}; + d1::wait_context_vertex msg_wait_vertex{}; - bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_context}}); + bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_vertex}}); if (res) { __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph"); - wait(msg_wait_context.get_context(), *graph_reference().my_context); + wait(msg_wait_vertex.get_context(), *graph_reference().my_context); } return res; } diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index aaefd645ea..83c5fc0253 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -283,7 +283,7 @@ struct harness_counting_receiver : public tbb::flow::receiver { } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo& ) override { return try_put_task(t); } #endif @@ -339,7 +339,7 @@ struct harness_mapped_receiver : public tbb::flow::receiver { } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo&) override { + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo& ) override { return try_put_task(t); } #endif diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 4b9c0fb818..9c4d033135 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -484,9 +484,7 @@ void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { for (int i = 0; i < 
wait_message; ++i) { start_work_items.emplace_back(i); - if (i != 0) { - new_work_items.emplace_back(i + 10); - } + new_work_items.emplace_back(i + 1 + wait_message); } using function_node_type = tbb::flow::function_node; @@ -529,10 +527,13 @@ void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { if (concurrency_limit == tbb::flow::serial) { // If the lightweight function_node is serial, it should process the wait_message but add items from new_work_items // into the queue since the concurrency limit is occupied. + CHECK_MESSAGE(processed_items.size() == start_work_items.size() + 1, "Unexpected number of elements processed"); CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); } else { // If the node is unlimited, it should process new_work_items immediately while processing the wait_message // Hence they should be processed before exiting the try_put_and_wait + CHECK_MESSAGE(processed_items.size() == start_work_items.size() + new_work_items.size() + 1, + "Unexpected number of elements processed"); for (auto item : new_work_items) { CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); } @@ -549,6 +550,7 @@ void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); } } + CHECK(check_index == processed_items.size()); }); } @@ -565,9 +567,7 @@ void test_try_put_and_wait_queueing(std::size_t concurrency_limit) { for (int i = 0; i < wait_message; ++i) { start_work_items.emplace_back(i); - if (i != 0) { - new_work_items.emplace_back(i + 10); - } + new_work_items.emplace_back(i + 1 + wait_message); } using function_node_type = tbb::flow::function_node; @@ -605,9 +605,12 @@ void test_try_put_and_wait_queueing(std::size_t concurrency_limit) { // Serial queueing function_node should add all start_work_items except the first one into the queue // and then process them in FIFO order. 
// wait_message would also be added to the queue, but would be processed later + CHECK_MESSAGE(processed_items.size() == start_work_items.size() + 1, "Unexpected number of elements processed"); for (auto item : start_work_items) { CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); } + } else { + CHECK_MESSAGE(processed_items.size() == 1, "Unexpected number of elements processed"); } // For the unlimited function_node, all of the tasks for start_work_items and wait_message would be spawned @@ -634,6 +637,7 @@ void test_try_put_and_wait_queueing(std::size_t concurrency_limit) { CHECK_MESSAGE(processed_items[check_index++] == start_work_items[i - 1], "Unexpected items processing"); } } + CHECK(check_index == processed_items.size()); }); } @@ -678,7 +682,7 @@ void test_try_put_and_wait_rejecting(size_t concurrency_limit) { // If the first action is try_put_and_wait, it will occupy concurrency of the function_node // All submits of new_work_items inside of the body should be rejected bool result = function.try_put_and_wait(wait_message); - CHECK_MESSAGE(result, "task should not rejected since the node concurrency is not acquired"); + CHECK_MESSAGE(result, "task should not rejected since the node concurrency is not saturated"); CHECK_MESSAGE(processed_items.size() == 1, nullptr); CHECK_MESSAGE(processed_items[0] == wait_message, "Unexpected items processing"); @@ -690,10 +694,10 @@ void test_try_put_and_wait_rejecting(size_t concurrency_limit) { processed_items.clear(); // If the first action is try_put, try_put_and_wait is expected to return false since the concurrency of the - // node would be acquired + // node would be saturated function.try_put(0); result = function.try_put_and_wait(wait_message); - CHECK_MESSAGE(!result, "task should be rejected since the node concurrency is acquired"); + CHECK_MESSAGE(!result, "task should be rejected since the node concurrency is saturated"); CHECK(processed_items.empty()); g.wait_for_all(); 
From 967aa5387d28fd8d9b4eabfe171e9ed50274b879 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 22 Jul 2024 04:21:59 -0700 Subject: [PATCH 18/22] Add missed comment --- test/tbb/test_function_node.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 9c4d033135..726d695161 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -470,6 +470,13 @@ void test_follows_and_precedes_api() { #endif #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +// Basic idea of the following tests is to check that try_put_and_wait(message) call for function_node +// with one of the policies (lightweight, queueing and rejecting) with different concurrency limits +// processes all of the previous jobs required to process message, the message itself, but does +// not process the elements submitted later or not required to process the message +// These tests submits start_work_items using the regular try_put and then submits wait_message +// with try_put_and_wait. During the completion of the graph, new_work_items would be submitted +// once the wait_message arrives. 
void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { tbb::task_arena arena(1); From c77e9c5dbdf0c66f2733e6a27f5041727a701692 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 22 Jul 2024 17:25:20 +0300 Subject: [PATCH 19/22] Update test/tbb/test_function_node.cpp Co-authored-by: Aleksei Fedotov --- test/tbb/test_function_node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 726d695161..6bb53940bc 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -474,7 +474,7 @@ void test_follows_and_precedes_api() { // with one of the policies (lightweight, queueing and rejecting) with different concurrency limits // processes all of the previous jobs required to process message, the message itself, but does // not process the elements submitted later or not required to process the message -// These tests submits start_work_items using the regular try_put and then submits wait_message +// These tests submit start_work_items using the regular try_put and then submit wait_message // with try_put_and_wait. During the completion of the graph, new_work_items would be submitted // once the wait_message arrives. 
void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { From 5aa2e348ecb9f89a53377de90e22bc16a9094f97 Mon Sep 17 00:00:00 2001 From: Konstantin Boyarinov Date: Mon, 22 Jul 2024 07:26:09 -0700 Subject: [PATCH 20/22] Rename template helpers --- include/oneapi/tbb/detail/_flow_graph_body_impl.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 6b964bcb44..dc321148ef 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -280,16 +280,16 @@ class apply_body_task_bypass Input my_input; using check_metainfo = std::is_same; - using not_contains_metainfo = std::true_type; - using contains_metainfo = std::false_type; + using without_metainfo = std::true_type; + using with_metainfo = std::false_type; - graph_task* call_apply_body_bypass_impl(not_contains_metainfo) { + graph_task* call_apply_body_bypass_impl(without_metainfo) { return my_node.apply_body_bypass(my_input __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - graph_task* call_apply_body_bypass_impl(contains_metainfo) { + graph_task* call_apply_body_bypass_impl(with_metainfo) { return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()}); } #endif From 0b2b098344711be66156411abf5a596eedc30bff Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Fri, 26 Jul 2024 11:10:49 -0500 Subject: [PATCH 21/22] Rename metainfo-friendly graph_task class --- include/oneapi/tbb/detail/_flow_graph_impl.h | 16 ++++++++-------- .../oneapi/tbb/detail/_flow_graph_node_impl.h | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 23e91bc059..629d5931b4 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ 
b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -147,11 +147,11 @@ class graph_task : public d1::task { }; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT -class graph_task_with_message_waiters : public graph_task { +class trackable_messages_graph_task : public graph_task { public: - graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator, - node_priority_t node_priority, - const std::forward_list& msg_waiters) + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + node_priority_t node_priority, + const std::forward_list& msg_waiters) : graph_task(g, allocator, node_priority) , my_msg_wait_context_vertices(msg_waiters) { @@ -165,9 +165,9 @@ class graph_task_with_message_waiters : public graph_task { } } - graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator, - const std::forward_list& msg_waiters) - : graph_task_with_message_waiters(g, allocator, no_priority, msg_waiters) {} + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + const std::forward_list& msg_waiters) + : trackable_messages_graph_task(g, allocator, no_priority, msg_waiters) {} const std::forward_list get_msg_wait_context_vertices() const { return my_msg_wait_context_vertices; @@ -191,7 +191,7 @@ class graph_task_with_message_waiters : public graph_task { // to support the distributed reference counting schema std::forward_list my_msg_wait_context_vertices; std::forward_list my_msg_reference_vertices; -}; // class graph_task_with_message_waiters +}; // class trackable_messages_graph_task #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT struct graph_task_comparator { diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 7e763aaa32..c7d7def633 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -164,7 +164,7 @@ class function_input_base : public receiver, 
no_assign { friend class apply_body_task_bypass< class_type, input_type >; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - friend class apply_body_task_bypass< class_type, input_type, graph_task_with_message_waiters >; + friend class apply_body_task_bypass< class_type, input_type, trackable_messages_graph_task >; #endif friend class forward_task_bypass< class_type >; @@ -362,7 +362,7 @@ class function_input_base : public receiver, no_assign { graph_task* t = nullptr; #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT if (!metainfo.empty()) { - using task_type = apply_body_task_bypass; + using task_type = apply_body_task_bypass; t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority, metainfo); } else #endif From ac27bb16e332797396917c629d854842b32d8b29 Mon Sep 17 00:00:00 2001 From: kboyarinov Date: Thu, 1 Aug 2024 05:22:36 -0500 Subject: [PATCH 22/22] Add this_thread_in_graph_arena usage --- include/oneapi/tbb/detail/_flow_graph_impl.h | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index fbf9251f05..0136d85a72 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -146,6 +146,8 @@ class graph_task : public d1::task { friend graph_task* prioritize_task(graph& g, graph_task& gt); }; +inline bool is_this_thread_in_graph_arena(graph& g); + #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT class trackable_messages_graph_task : public graph_task { public: @@ -158,7 +160,12 @@ class trackable_messages_graph_task : public graph_task { auto last_iterator = my_msg_reference_vertices.cbefore_begin(); for (auto& msg_waiter : my_msg_wait_context_vertices) { - d1::wait_tree_vertex_interface* ref_vertex = r1::get_thread_reference_vertex(msg_waiter); + // If the task is created by the thread outside the graph arena, the lifetime of the thread reference vertex + // may be shorter than the
lifetime of the task, so thread reference vertex approach cannot be used + // and the task should be associated with the msg wait context itself + d1::wait_tree_vertex_interface* ref_vertex = is_this_thread_in_graph_arena(g) ? + r1::get_thread_reference_vertex(msg_waiter) : + msg_waiter; last_iterator = my_msg_reference_vertices.emplace_after(last_iterator, ref_vertex); ref_vertex->reserve(1);