diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a65de62241..d5ab1c0a2d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,7 +19,7 @@ on: branches: [master] pull_request: - branches: [master] + branches: [master, dev/kboyarinov/try_put_and_wait_production] types: - opened - synchronize diff --git a/include/oneapi/tbb/detail/_config.h b/include/oneapi/tbb/detail/_config.h index 0e5fbfe92f..e676b1558b 100644 --- a/include/oneapi/tbb/detail/_config.h +++ b/include/oneapi/tbb/detail/_config.h @@ -521,6 +521,11 @@ #define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES) #endif +#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \ + || TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT) +#endif + #if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS #define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1 #endif diff --git a/include/oneapi/tbb/detail/_flow_graph_body_impl.h b/include/oneapi/tbb/detail/_flow_graph_body_impl.h index 7a6a1cf43c..dc321148ef 100644 --- a/include/oneapi/tbb/detail/_flow_graph_body_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_body_impl.h @@ -272,29 +272,57 @@ class forward_task_bypass : public graph_task { //! 
A task that calls a node's apply_body_bypass function, passing in an input of type Input // return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr -template< typename NodeType, typename Input > -class apply_body_task_bypass : public graph_task { +template< typename NodeType, typename Input, typename BaseTaskType = graph_task> +class apply_body_task_bypass + : public BaseTaskType +{ NodeType &my_node; Input my_input; + + using check_metainfo = std::is_same; + using without_metainfo = std::true_type; + using with_metainfo = std::false_type; + + graph_task* call_apply_body_bypass_impl(without_metainfo) { + return my_node.apply_body_bypass(my_input + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* call_apply_body_bypass_impl(with_metainfo) { + return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()}); + } +#endif + + graph_task* call_apply_body_bypass() { + return call_apply_body_bypass_impl(check_metainfo{}); + } + public: +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i, + node_priority_t node_priority, Metainfo&& metainfo ) + : BaseTaskType(g, allocator, node_priority, std::forward(metainfo).waiters()) + , my_node(n), my_input(i) {} +#endif - apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i - , node_priority_t node_priority = no_priority - ) : graph_task(g, allocator, node_priority), - my_node(n), my_input(i) {} + apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i, + node_priority_t node_priority = no_priority ) + : BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {} d1::task* execute(d1::execution_data& ed) override { - graph_task* next_task = my_node.apply_body_bypass( my_input ); + graph_task* 
next_task = call_apply_body_bypass(); if (SUCCESSFULLY_ENQUEUED == next_task) next_task = nullptr; else if (next_task) next_task = prioritize_task(my_node.graph_reference(), *next_task); - finalize(ed); + BaseTaskType::template finalize(ed); return next_task; } d1::task* cancel(d1::execution_data& ed) override { - finalize(ed); + BaseTaskType::template finalize(ed); return nullptr; } }; @@ -343,6 +371,13 @@ class threshold_regulatormy_graph; } diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index 69625408f6..eedf946433 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -268,6 +268,9 @@ class successor_cache : no_copy { } virtual graph_task* try_put_task( const T& t ) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0; +#endif }; // successor_cache //! An abstract cache of successors, specialized to continue_msg @@ -327,6 +330,9 @@ class successor_cache< continue_msg, M > : no_copy { } virtual graph_task* try_put_task( const continue_msg& t ) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0; +#endif }; // successor_cache< continue_msg > //! 
A cache of successors that are broadcast to @@ -336,19 +342,12 @@ class broadcast_cache : public successor_cache { typedef M mutex_type; typedef typename successor_cache::successors_type successors_type; -public: - - broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) { - // Do not work with the passed pointer here as it may not be fully initialized yet - } - - // as above, but call try_put_task instead, and return the last task we received (if any) - graph_task* try_put_task( const T &t ) override { + graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { graph_task * last_task = nullptr; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task *new_task = (*i)->try_put_task(t); + graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); // workaround for icc bug graph& graph_ref = (*i)->graph_reference(); last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary @@ -365,6 +364,21 @@ class broadcast_cache : public successor_cache { } return last_task; } +public: + + broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) { + // Do not work with the passed pointer here as it may not be fully initialized yet + } + + graph_task* try_put_task( const T &t ) override { + return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override { + return try_put_task_impl(t, metainfo); + } +#endif // call try_put_task and return list of received tasks bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { @@ -429,6 +443,13 @@ class round_robin_cache : public successor_cache { } return nullptr; } + +#if 
__TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for round robin cache + graph_task* try_put_task( const T& t, const message_metainfo& ) override { + return try_put_task(t); + } +#endif }; #endif // __TBB__flow_graph_cache_impl_H diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 5d79a5bf08..0136d85a72 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -146,6 +146,61 @@ class graph_task : public d1::task { friend graph_task* prioritize_task(graph& g, graph_task& gt); }; +inline bool is_this_thread_in_graph_arena(graph& g); + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class trackable_messages_graph_task : public graph_task { +public: + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + node_priority_t node_priority, + const std::forward_list& msg_waiters) + : graph_task(g, allocator, node_priority) + , my_msg_wait_context_vertices(msg_waiters) + { + auto last_iterator = my_msg_reference_vertices.cbefore_begin(); + + for (auto& msg_waiter : my_msg_wait_context_vertices) { + // If the task is created by the thread outside the graph arena, the lifetime of the thread reference vertex + // may be shorter than the lifetime of the task, so thread reference vertex approach cannot be used + // and the task should be associated with the msg wait context itself + d1::wait_tree_vertex_interface* ref_vertex = is_this_thread_in_graph_arena(g) ?
+ r1::get_thread_reference_vertex(msg_waiter) : + msg_waiter; + last_iterator = my_msg_reference_vertices.emplace_after(last_iterator, + ref_vertex); + ref_vertex->reserve(1); + } + } + + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + const std::forward_list& msg_waiters) + : trackable_messages_graph_task(g, allocator, no_priority, msg_waiters) {} + + const std::forward_list get_msg_wait_context_vertices() const { + return my_msg_wait_context_vertices; + } + +protected: + template + void finalize(const d1::execution_data& ed) { + auto msg_reference_vertices = std::move(my_msg_reference_vertices); + graph_task::finalize(ed); + + for (auto& msg_waiter : msg_reference_vertices) { + msg_waiter->release(1); + } + } +private: + // Each task that holds information about single message wait_contexts should hold two lists + // The first one is wait_contexts associated with the message itself. They are needed + // to be able to broadcast the list of wait_contexts to the node successors while executing the task. 
+ // The second list is a list of reference vertices for each wait_context_vertex in the first list + // to support the distributed reference counting schema + std::forward_list my_msg_wait_context_vertices; + std::forward_list my_msg_reference_vertices; +}; // class trackable_messages_graph_task +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + struct graph_task_comparator { bool operator()(const graph_task* left, const graph_task* right) { return left->priority < right->priority; @@ -358,6 +413,9 @@ class graph : no_copy, public graph_proxy { friend class task_arena_base; friend class graph_task; + + template + friend class receiver; }; // class graph template diff --git a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h index df083bd443..eb703bb8a5 100644 --- a/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_indexer_impl.h @@ -79,6 +79,13 @@ return my_try_put_task(v, my_indexer_ptr); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for indexer_node + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } +#endif + graph& graph_reference() const override { return *my_graph; } diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 423033b1d5..0e5b867bf8 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2022 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -37,8 +37,14 @@ class item_buffer { typedef T item_type; enum buffer_item_state { no_item=0, has_item=1, reserved_item=2 }; protected: + struct aligned_space_item { + item_type item; + buffer_item_state state; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo metainfo; +#endif + }; typedef size_t size_type; - typedef std::pair aligned_space_item; typedef aligned_space buffer_item_type; typedef typename allocator_traits::template rebind_alloc allocator_type; buffer_item_type *my_array; @@ -49,37 +55,51 @@ class item_buffer { bool buffer_empty() const { return my_head == my_tail; } - aligned_space_item &item(size_type i) { - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->second))%alignment_of::value), nullptr); - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->first))%alignment_of::value), nullptr); + aligned_space_item &element(size_type i) { + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of::value), nullptr); + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of::value), nullptr); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of::value), nullptr); +#endif return *my_array[i & (my_array_size - 1) ].begin(); } - const aligned_space_item &item(size_type i) const { - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->second))%alignment_of::value), nullptr); - __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->first))%alignment_of::value), nullptr); + const aligned_space_item &element(size_type i) const { + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of::value), nullptr); + __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of::value), nullptr); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + 
__TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of::value), nullptr); +#endif return *my_array[i & (my_array_size-1)].begin(); } - bool my_item_valid(size_type i) const { return (i < my_tail) && (i >= my_head) && (item(i).second != no_item); } + bool my_item_valid(size_type i) const { return (i < my_tail) && (i >= my_head) && (element(i).state != no_item); } #if TBB_USE_ASSERT - bool my_item_reserved(size_type i) const { return item(i).second == reserved_item; } + bool my_item_reserved(size_type i) const { return element(i).state == reserved_item; } #endif // object management in buffer const item_type &get_my_item(size_t i) const { __TBB_ASSERT(my_item_valid(i),"attempt to get invalid item"); - item_type* itm = const_cast(reinterpret_cast(&item(i).first)); - return *itm; + return element(i).item; } // may be called with an empty slot or a slot that has already been constructed into. - void set_my_item(size_t i, const item_type &o) { - if(item(i).second != no_item) { + void set_my_item(size_t i, const item_type &o + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { + if(element(i).state != no_item) { destroy_item(i); } - new(&(item(i).first)) item_type(o); - item(i).second = has_item; + new(&(element(i).item)) item_type(o); + element(i).state = has_item; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + new(&element(i).metainfo) message_metainfo(metainfo); + + for (auto& waiter : metainfo.waiters()) { + waiter->reserve(1); + } +#endif } // destructively-fetch an object from the buffer @@ -119,8 +139,18 @@ class item_buffer { void destroy_item(size_type i) { __TBB_ASSERT(my_item_valid(i), "destruction of invalid item"); - item(i).first.~item_type(); - item(i).second = no_item; + + auto& e = element(i); + e.item.~item_type(); + e.state = no_item; + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + for (auto& msg_waiter : e.metainfo.waiters()) { + msg_waiter->release(1); + } + + e.metainfo.~message_metainfo(); 
+#endif } // returns the front element @@ -130,6 +160,14 @@ class item_buffer { return get_my_item(my_head); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& front_metainfo() const + { + __TBB_ASSERT(my_item_valid(my_head), "attempt to fetch head non-item"); + return element(my_head).metainfo; + } +#endif + // returns the back element const item_type& back() const { @@ -137,9 +175,23 @@ class item_buffer { return get_my_item(my_tail - 1); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& back_metainfo() const { + __TBB_ASSERT(my_item_valid(my_tail - 1), "attempt to fetch head non-item"); + return element(my_tail - 1).metainfo; + } +#endif + // following methods are for reservation of the front of a buffer. - void reserve_item(size_type i) { __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); item(i).second = reserved_item; } - void release_item(size_type i) { __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); item(i).second = has_item; } + void reserve_item(size_type i) { + __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved"); + element(i).state = reserved_item; + } + + void release_item(size_type i) { + __TBB_ASSERT(my_item_reserved(i), "item is not reserved"); + element(i).state = has_item; + } void destroy_front() { destroy_item(my_head); ++my_head; } void destroy_back() { destroy_item(my_tail-1); --my_tail; } @@ -163,14 +215,18 @@ class item_buffer { buffer_item_type* new_array = allocator_type().allocate(new_size); // initialize validity to "no" - for( size_type i=0; isecond = no_item; } + for( size_type i=0; istate = no_item; } for( size_type i=my_head; ifirst); + char *new_space = (char *)&(new_array[i&(new_size-1)].begin()->item); (void)new(new_space) item_type(get_my_item(i)); - new_array[i&(new_size-1)].begin()->second = item(i).second; + new_array[i&(new_size-1)].begin()->state = element(i).state; +#if 
__TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + char* meta_space = (char *)&(new_array[i&(new_size-1)].begin()->metainfo); + ::new(meta_space) message_metainfo(std::move(element(i).metainfo)); +#endif } } @@ -180,33 +236,69 @@ class item_buffer { my_array_size = new_size; } - bool push_back(item_type &v) { - if(buffer_full()) { + bool push_back(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { + if (buffer_full()) { grow_my_array(size() + 1); } - set_my_item(my_tail, v); + set_my_item(my_tail, v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); ++my_tail; return true; } - bool pop_back(item_type &v) { - if (!my_item_valid(my_tail-1)) { + bool pop_back(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo)) + { + if (!my_item_valid(my_tail - 1)) { return false; } - v = this->back(); + auto& e = element(my_tail - 1); + v = e.item; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + metainfo = std::move(e.metainfo); +#endif + destroy_back(); return true; } - bool pop_front(item_type &v) { - if(!my_item_valid(my_head)) { + bool pop_front(item_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo)) + { + if (!my_item_valid(my_head)) { return false; } - v = this->front(); + auto& e = element(my_head); + v = e.item; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + metainfo = std::move(e.metainfo); +#endif + destroy_front(); return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + void set_my_item(size_t i, const item_type&o) { + set_my_item(i, o, message_metainfo{}); + } + + bool push_back(item_type& v) { + return push_back(v, message_metainfo{}); + } + + bool pop_back(item_type& v) { + message_metainfo metainfo; + return pop_back(v, metainfo); + } + + bool pop_front(item_type& v) { + message_metainfo metainfo; + return pop_front(v, metainfo); + } +#endif + // This is used both for reset and for grow_my_array. In the case of grow_my_array // we want to retain the values of the head and tail. 
void clean_up_buffer(bool reset_pointers) { diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index fd401f31a1..cb854fdd5b 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -294,6 +294,11 @@ return nullptr; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for rejecting join_node + graph_task* try_put_task(const T&, const message_metainfo&) override { return nullptr; } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } @@ -455,6 +460,13 @@ return op_data.bypass_t; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for queueing join_node + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } @@ -605,6 +617,13 @@ return rtask; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for key_matching join_node + graph_task* try_put_task(const input_type& v, const message_metainfo&) override { + return try_put_task(v); + } +#endif + graph& graph_reference() const override { return my_join->graph_ref; } diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index 3bb9b7d788..c7d7def633 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -34,6 +34,12 @@ class function_input_queue : public item_buffer { return this->item_buffer::front(); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + const message_metainfo& front_metainfo() const { + return this->item_buffer::front_metainfo(); + } +#endif + void pop() { this->destroy_front(); } @@ -41,6 +47,12 @@ class function_input_queue : public item_buffer { bool push( T& t ) { return this->push_back( t ); } + +#if 
__TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool push( T& t, const message_metainfo& metainfo ) { + return this->push_back(t, metainfo); + } +#endif }; //! Input and scheduling for a function node that takes a type Input as input @@ -87,12 +99,15 @@ class function_input_base : public receiver, no_assign { } graph_task* try_put_task( const input_type& t) override { - if ( my_is_no_throw ) - return try_put_task_impl(t, has_policy()); - else - return try_put_task_impl(t, std::false_type()); + return try_put_task_base(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* try_put_task( const input_type& t, const message_metainfo& metainfo ) override { + return try_put_task_base(t, metainfo); + } +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Adds src to the list of cached predecessors. bool register_predecessor( predecessor_type &src ) override { operation_type op_data(reg_pred); @@ -148,6 +163,9 @@ class function_input_base : public receiver, no_assign { private: friend class apply_body_task_bypass< class_type, input_type >; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + friend class apply_body_task_bypass< class_type, input_type, trackable_messages_graph_task >; +#endif friend class forward_task_bypass< class_type >; class operation_type : public d1::aggregated_operation< operation_type > { @@ -158,8 +176,20 @@ class function_input_base : public receiver, no_assign { predecessor_type *r; }; graph_task* bypass_t; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo* metainfo; +#endif operation_type(const input_type& e, op_type t) : - type(char(t)), elem(const_cast(&e)), bypass_t(nullptr) {} + type(char(t)), elem(const_cast(&e)), bypass_t(nullptr) +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + , metainfo(nullptr) +#endif + {} +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + operation_type(const input_type& e, op_type t, const message_metainfo& info) : + type(char(t)), 
elem(const_cast(&e)), bypass_t(nullptr), + metainfo(const_cast(&info)) {} +#endif operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {} }; @@ -173,7 +203,10 @@ class function_input_base : public receiver, no_assign { if(my_queue) { if(!my_queue->empty()) { ++my_concurrency; - new_task = create_body_task(my_queue->front()); + // TODO: consider removing metainfo from the queue using move semantics to avoid + // ref counter increase + new_task = create_body_task(my_queue->front() + __TBB_FLOW_GRAPH_METAINFO_ARG(my_queue->front_metainfo())); my_queue->pop(); } @@ -182,7 +215,7 @@ class function_input_base : public receiver, no_assign { input_type i; if(my_predecessors.get_item(i)) { ++my_concurrency; - new_task = create_body_task(i); + new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); } } return new_task; @@ -233,10 +266,13 @@ class function_input_base : public receiver, no_assign { __TBB_ASSERT(my_max_concurrency != 0, nullptr); if (my_concurrency < my_max_concurrency) { ++my_concurrency; - graph_task * new_task = create_body_task(*(op->elem)); + graph_task* new_task = create_body_task(*(op->elem) + __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))); op->bypass_t = new_task; op->status.store(SUCCEEDED, std::memory_order_release); - } else if ( my_queue && my_queue->push(*(op->elem)) ) { + } else if ( my_queue && my_queue->push(*(op->elem) + __TBB_FLOW_GRAPH_METAINFO_ARG(*(op->metainfo))) ) + { op->bypass_t = SUCCESSFULLY_ENQUEUED; op->status.store(SUCCEEDED, std::memory_order_release); } else { @@ -258,8 +294,10 @@ class function_input_base : public receiver, no_assign { } } - graph_task* internal_try_put_bypass( const input_type& t ) { - operation_type op_data(t, tryput_bypass); + graph_task* internal_try_put_bypass( const input_type& t + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { + operation_type op_data(t, tryput_bypass __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); 
my_aggregator.execute(&op_data); if( op_data.status == SUCCEEDED ) { return op_data.bypass_t; @@ -267,42 +305,71 @@ class function_input_base : public receiver, no_assign { return nullptr; } - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) { + graph_task* try_put_task_base(const input_type& t + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { + if ( my_is_no_throw ) + return try_put_task_impl(t, has_policy() + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); + else + return try_put_task_impl(t, std::false_type() + __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); + } + + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if( my_max_concurrency == 0 ) { - return apply_body_bypass(t); + return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } else { operation_type check_op(t, occupy_concurrency); my_aggregator.execute(&check_op); if( check_op.status == SUCCEEDED ) { - return apply_body_bypass(t); + return apply_body_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } - return internal_try_put_bypass(t); + return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } } - graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) { + graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if( my_max_concurrency == 0 ) { - return create_body_task(t); + return create_body_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } else { - return internal_try_put_bypass(t); + return internal_try_put_bypass(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } } //! 
Applies the body to the provided input // then decides if more work is available - graph_task* apply_body_bypass( const input_type &i ) { - return static_cast(this)->apply_body_impl_bypass(i); + graph_task* apply_body_bypass( const input_type &i + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + + { + return static_cast(this)->apply_body_impl_bypass(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); } //! allocates a task to apply a body - graph_task* create_body_task( const input_type &input ) { + graph_task* create_body_task( const input_type &input + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) + { if (!is_graph_active(my_graph_ref)) { return nullptr; } // TODO revamp: extract helper for common graph task allocation part d1::small_object_allocator allocator{}; - typedef apply_body_task_bypass task_type; - graph_task* t = allocator.new_object( my_graph_ref, allocator, *this, input, my_priority ); + graph_task* t = nullptr; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (!metainfo.empty()) { + using task_type = apply_body_task_bypass; + t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority, metainfo); + } else +#endif + { + using task_type = apply_body_task_bypass; + t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority); + } return t; } @@ -396,7 +463,9 @@ class function_input : public function_input_base; //! 
Applies the body to the provided input - graph_task* apply_body_bypass( input_type ) { + graph_task* apply_body_bypass( input_type __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo&) ) { // There is an extra copied needed to capture the // body execution without the try_put fgt_begin_body( my_body ); @@ -675,7 +746,7 @@ class continue_input : public continue_receiver { #if _MSC_VER && !__INTEL_COMPILER #pragma warning (pop) #endif - return apply_body_bypass( continue_msg() ); + return apply_body_bypass( continue_msg() __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}) ); } else { d1::small_object_allocator allocator{}; diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index 2857a41254..be3640353e 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -52,6 +52,7 @@ #include #include +#include #include #if __TBB_CPP20_CONCEPTS_PRESENT #include @@ -187,6 +188,29 @@ static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* return left; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +class message_metainfo { +public: + using waiters_type = std::forward_list; + + message_metainfo() = default; + + message_metainfo(const waiters_type& waiters) : my_waiters(waiters) {} + message_metainfo(waiters_type&& waiters) : my_waiters(std::move(waiters)) {} + + const waiters_type& waiters() const { return my_waiters; } + + bool empty() const { return my_waiters.empty(); } +private: + waiters_type my_waiters; +}; // class message_metainfo + +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) , metainfo + +#else +#define __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo) +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Pure virtual template class that defines a sender of messages of type T template< typename T > class sender { @@ -238,18 +262,38 @@ bool remove_successor(sender& s, receiver& r) { //! 
Pure virtual template class that defines a receiver of messages of type T template< typename T > class receiver { +private: + template + bool internal_try_put(const T& t, TryPutTaskArgs&&... args) { + graph_task* res = try_put_task(t, std::forward(args)...); + if (!res) return false; + if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); + return true; + } + public: //! Destructor virtual ~receiver() {} //! Put an item to the receiver bool try_put( const T& t ) { - graph_task* res = try_put_task(t); + return internal_try_put(t); + } - if (!res) return false; - if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res); - return true; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Put an item to the receiver and wait for completion + bool try_put_and_wait( const T& t ) { + // Since try_put_and_wait is a blocking call, it is safe to create wait_context on stack + d1::wait_context_vertex msg_wait_vertex{}; + + bool res = internal_try_put(t, message_metainfo{message_metainfo::waiters_type{&msg_wait_vertex}}); + if (res) { + __TBB_ASSERT(graph_reference().my_context != nullptr, "No wait_context associated with the Flow Graph"); + wait(msg_wait_vertex.get_context(), *graph_reference().my_context); + } + return res; } +#endif //! put item to successor; return task to run the successor if possible. protected: @@ -263,6 +307,9 @@ class receiver { template< typename X, typename Y > friend class broadcast_cache; template< typename X, typename Y > friend class round_robin_cache; virtual graph_task *try_put_task(const T& t) = 0; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual graph_task *try_put_task(const T& t, const message_metainfo&) = 0; +#endif virtual graph& graph_reference() const = 0; template friend class successor_cache; @@ -351,6 +398,13 @@ class continue_receiver : public receiver< continue_msg > { return res? 
res : SUCCESSFULLY_ENQUEUED; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add metainfo support for continue_receiver + graph_task* try_put_task( const input_type& input, const message_metainfo& ) override { + return try_put_task(input); + } +#endif + spin_mutex my_mutex; int my_predecessor_count; int my_current_count; @@ -962,6 +1016,13 @@ class split_node : public graph_node, public receiver { // Also, we do not have successors here. So we just tell the task returned here is successful. return emit_element::emit_this(this->my_graph, t, output_ports()); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for split_node + graph_task* try_put_task(const TupleType& t, const message_metainfo&) override { + return try_put_task(t); + } +#endif + void reset_node(reset_flags f) override { if (f & rf_clear_edges) clear_element::clear_this(my_output_ports); @@ -1130,6 +1191,13 @@ class broadcast_node : public graph_node, public receiver, public sender { return new_task; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for broadcast_node + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } +#endif + graph& graph_reference() const override { return my_graph; } @@ -1478,6 +1546,13 @@ class buffer_node return ft; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for buffer_node + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } +#endif + graph& graph_reference() const override { return my_graph; } @@ -2112,6 +2187,13 @@ class limiter_node : public graph_node, public receiver< T >, public sender< T > return rtask; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for limiter_node + graph_task* try_put_task(const T& t, const message_metainfo&) override { + return try_put_task(t); + } +#endif + graph& graph_reference() const override { return my_graph; } void reset_node( 
reset_flags f ) override { @@ -3106,6 +3188,13 @@ class overwrite_node : public graph_node, public receiver, public sender { return try_put_task_impl(v); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for overwrite_node + graph_task* try_put_task(const input_type& v, const message_metainfo&) override { + return try_put_task(v); + } +#endif + graph_task * try_put_task_impl(const input_type &v) { my_buffer = v; my_buffer_is_valid = true; @@ -3195,6 +3284,13 @@ class write_once_node : public overwrite_node { spin_mutex::scoped_lock l( this->my_mutex ); return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v); } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: add support for write_once_node + graph_task* try_put_task(const T& v, const message_metainfo&) override { + return try_put_task(v); + } +#endif }; // write_once_node inline void set_name(const graph& g, const char *name) { diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 2ab6db854f..83c5fc0253 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -282,6 +282,12 @@ struct harness_counting_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(t); + } +#endif + void validate() { size_t n = my_count; CHECK( n == num_copies*max_value ); @@ -332,6 +338,12 @@ struct harness_mapped_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + tbb::detail::d2::graph_task *try_put_task( const T &t, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(t); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_broadcast_node.cpp b/test/tbb/test_broadcast_node.cpp index 
fe0eea0f13..b55455fc91 100644 --- a/test/tbb/test_broadcast_node.cpp +++ b/test/tbb/test_broadcast_node.cpp @@ -73,6 +73,12 @@ class counting_array_receiver : public tbb::flow::receiver { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + tbb::task * try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index bf1e664988..6bb53940bc 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -469,6 +469,262 @@ void test_follows_and_precedes_api() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +// Basic idea of the following tests is to check that try_put_and_wait(message) call for function_node +// with one of the policies (lightweight, queueing and rejecting) with different concurrency limits +// processes all of the previous jobs required to process message, the message itself, but does +// not process the elements submitted later or not required to process the message +// These tests submit start_work_items using the regular try_put and then submit wait_message +// with try_put_and_wait. During the completion of the graph, new_work_items would be submitted +// once the wait_message arrives. 
+void test_try_put_and_wait_lightweight(std::size_t concurrency_limit) { + tbb::task_arena arena(1); + + arena.execute([&]{ + tbb::flow::graph g; + + std::vector start_work_items; + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = &function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + for (int i = 0; i < wait_message; ++i) { + function.try_put(i); + } + + function.try_put_and_wait(wait_message); + + std::size_t check_index = 0; + + // For lightweight function_node, start_work_items are expected to be processed first + // while putting items into the first node. + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + + if (concurrency_limit == tbb::flow::serial) { + // If the lightweight function_node is serial, it should process the wait_message but add items from new_work_items + // into the queue since the concurrency limit is occupied. 
+ CHECK_MESSAGE(processed_items.size() == start_work_items.size() + 1, "Unexpected number of elements processed"); + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + } else { + // If the node is unlimited, it should process new_work_items immediately while processing the wait_message + // Hence they should be processed before exiting the try_put_and_wait + CHECK_MESSAGE(processed_items.size() == start_work_items.size() + new_work_items.size() + 1, + "Unexpected number of elements processed"); + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + // wait_message would be processed only after new_work_items + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + } + + g.wait_for_all(); + + if (concurrency_limit == tbb::flow::serial) { + // For the serial node, processing of new_work_items would be postponed to wait_for_all since they + // would be queued and spawned after working with wait_message + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } + CHECK(check_index == processed_items.size()); + }); +} + +void test_try_put_and_wait_queueing(std::size_t concurrency_limit) { + tbb::task_arena arena(1); + arena.execute([&]{ + tbb::flow::graph g; + + std::vector start_work_items; + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = 
&function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + for (int i = 0; i < wait_message; ++i) { + function.try_put(i); + } + + function.try_put_and_wait(wait_message); + + std::size_t check_index = 0; + + if (concurrency_limit == tbb::flow::serial) { + // Serial queueing function_node should add all start_work_items except the first one into the queue + // and then process them in FIFO order. + // wait_message would also be added to the queue, but would be processed later + CHECK_MESSAGE(processed_items.size() == start_work_items.size() + 1, "Unexpected number of elements processed"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } else { + CHECK_MESSAGE(processed_items.size() == 1, "Unexpected number of elements processed"); + } + + // For the unlimited function_node, all of the tasks for start_work_items and wait_message would be spawned + // and hence processed by the thread in LIFO order. + // The first processed item is expected to be wait_message since it was spawned last + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing"); + + g.wait_for_all(); + + if (concurrency_limit == tbb::flow::serial) { + // For serial queueing function_node, the new_work_items are expected to be processed while calling to wait_for_all + // They would be queued and processed later in FIFO order + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, "Unexpected items processing"); + } + } else { + // Unlimited function_node would always spawn tasks immediately without adding them into the queue + // They would be processed in LIFO order. 
Hence it is expected that new_work_items would be processed first in reverse order + After them, start_work_items would be processed also in reverse order + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], "Unexpected items processing"); + } + for (std::size_t i = start_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[i - 1], "Unexpected items processing"); + } + } + CHECK(check_index == processed_items.size()); + }); +} + +void test_try_put_and_wait_rejecting(size_t concurrency_limit) { + tbb::task_arena arena(1); + + arena.execute([&]{ + tbb::flow::graph g; + + std::vector processed_items; + std::vector new_work_items; + + int wait_message = 0; + + for (int i = 1; i < wait_message; ++i) { + new_work_items.emplace_back(i); + } + + using function_node_type = tbb::flow::function_node; + function_node_type* start_node = nullptr; + + function_node_type function(g, concurrency_limit, + [&](int input) noexcept { + if (input == wait_message) { + for (int item : new_work_items) { + start_node->try_put(item); + } + } + return input; + }); + + start_node = &function; + + function_node_type writer(g, concurrency_limit, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(function, writer); + + // If the first action is try_put_and_wait, it will occupy concurrency of the function_node + // All submits of new_work_items inside of the body should be rejected + bool result = function.try_put_and_wait(wait_message); + CHECK_MESSAGE(result, "task should not be rejected since the node concurrency is not saturated"); + + CHECK_MESSAGE(processed_items.size() == 1, nullptr); + CHECK_MESSAGE(processed_items[0] == wait_message, "Unexpected items processing"); + + g.wait_for_all(); + + CHECK_MESSAGE(processed_items.size() == 1, nullptr); + + processed_items.clear(); + + // If the first action is 
try_put, try_put_and_wait is expected to return false since the concurrency of the + // node would be saturated + function.try_put(0); + result = function.try_put_and_wait(wait_message); + CHECK_MESSAGE(!result, "task should be rejected since the node concurrency is saturated"); + CHECK(processed_items.empty()); + + g.wait_for_all(); + + CHECK(processed_items.size() == 1); + CHECK_MESSAGE(processed_items[0] == 0, "Unexpected items processing"); + }); +} + +void test_try_put_and_wait() { + test_try_put_and_wait_lightweight(tbb::flow::serial); + test_try_put_and_wait_lightweight(tbb::flow::unlimited); + + test_try_put_and_wait_queueing(tbb::flow::serial); + test_try_put_and_wait_queueing(tbb::flow::unlimited); + + test_try_put_and_wait_rejecting(tbb::flow::serial); + // TODO: add integration test with buffering node before rejecting function_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT //! Test various node bodies with concurrency //! \brief \ref error_guessing @@ -544,3 +800,10 @@ TEST_CASE("constraints for function_node body") { static_assert(!can_call_function_node_ctor>); } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! 
\brief \ref error_guessing +TEST_CASE("test function_node try_put_and_wait") { + test_try_put_and_wait(); +} +#endif diff --git a/test/tbb/test_input_node.cpp b/test/tbb/test_input_node.cpp index 73082ae075..9442693980 100644 --- a/test/tbb/test_input_node.cpp +++ b/test/tbb/test_input_node.cpp @@ -61,6 +61,12 @@ class test_push_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task* try_put_task( const T& v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } diff --git a/test/tbb/test_limiter_node.cpp b/test/tbb/test_limiter_node.cpp index 43cc6750eb..66611ce19e 100644 --- a/test/tbb/test_limiter_node.cpp +++ b/test/tbb/test_limiter_node.cpp @@ -53,6 +53,12 @@ struct serial_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task * try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; } @@ -71,6 +77,12 @@ struct parallel_receiver : public tbb::flow::receiver, utils::NoAssign { return const_cast(SUCCESSFULLY_ENQUEUED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + graph_task * try_put_task( const T &v, const tbb::detail::d2::message_metainfo& ) override { + return try_put_task(v); + } +#endif + tbb::flow::graph& graph_reference() const override { return my_graph; }