diff --git a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h index eedf946433..7fb370f014 100644 --- a/include/oneapi/tbb/detail/_flow_graph_cache_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_cache_impl.h @@ -98,9 +98,12 @@ class predecessor_cache : public node_cache< sender, M > { // Do not work with the passed pointer here as it may not be fully initialized yet } - bool get_item( output_type& v ) { +private: + bool get_item_impl( output_type& v + __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) ) + { - bool msg = false; + bool successful_get = false; do { predecessor_type *src; @@ -113,19 +116,36 @@ class predecessor_cache : public node_cache< sender, M > { } // Try to get from this sender - msg = src->try_get( v ); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (metainfo_ptr) { + successful_get = src->try_get( v, *metainfo_ptr ); + } else +#endif + { + successful_get = src->try_get( v ); + } - if (msg == false) { + if (successful_get == false) { // Relinquish ownership of the edge register_successor(*src, *my_owner); } else { // Retain ownership of the edge this->add(*src); } - } while ( msg == false ); - return msg; + } while ( successful_get == false ); + return successful_get; + } +public: + bool get_item( output_type& v ) { + return get_item_impl(v); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool get_item( output_type& v, message_metainfo& metainfo ) { + return get_item_impl(v, &metainfo); + } +#endif + // If we are removing arcs (rf_clear_edges), call clear() rather than reset(). void reset() { for(;;) { @@ -158,7 +178,7 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > { } bool try_reserve( output_type &v ) { - bool msg = false; + bool successful_reserve = false; do { predecessor_type* pred = nullptr; @@ -172,9 +192,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > { } // Try to get from this sender - msg = pred->try_reserve( v ); + successful_reserve = pred->try_reserve( v ); - if (msg == false) { + if (successful_reserve == false) { typename mutex_type::scoped_lock lock(this->my_mutex); // Relinquish ownership of the edge register_successor( *pred, *this->my_owner ); @@ -183,9 +203,9 @@ class reservable_predecessor_cache : public predecessor_cache< T, M > { // Retain ownership of the edge this->add( *pred); } - } while ( msg == false ); + } while ( successful_reserve == false ); - return msg; + return successful_reserve; } bool try_release() { @@ -342,7 +362,7 @@ class broadcast_cache : public successor_cache { typedef M mutex_type; typedef typename successor_cache::successors_type successors_type; - graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { + graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) { graph_task * last_task = nullptr; typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); @@ -425,11 +445,15 @@ class round_robin_cache : public successor_cache { return this->my_successors.size(); } - graph_task* try_put_task( const T &t ) override { +private: + + graph_task* try_put_task_impl( const T &t + __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) + { typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); typename successors_type::iterator i = this->my_successors.begin(); while ( i != this->my_successors.end() ) { - graph_task* new_task = (*i)->try_put_task(t); + graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); if ( new_task ) { return new_task; } else { @@ -444,10 +468,14 @@ class round_robin_cache : public successor_cache { return nullptr; } +public: + graph_task* try_put_task(const T& t) override { + return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - // TODO: add support for round robin cache - graph_task* try_put_task( const T& t, const message_metainfo& ) override { - return try_put_task(t); + graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override { + return try_put_task_impl(t, metainfo); } #endif }; diff --git a/include/oneapi/tbb/detail/_flow_graph_impl.h b/include/oneapi/tbb/detail/_flow_graph_impl.h index 0136d85a72..bab3e5d689 100644 --- a/include/oneapi/tbb/detail/_flow_graph_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_impl.h @@ -172,10 +172,22 @@ class trackable_messages_graph_task : public graph_task { } } + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + node_priority_t node_priority, + std::forward_list&& msg_waiters) + : graph_task(g, allocator, node_priority) + , my_msg_wait_context_vertices(std::move(msg_waiters)) + { + } + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, const std::forward_list& msg_waiters) : trackable_messages_graph_task(g, allocator, no_priority, msg_waiters) {} + trackable_messages_graph_task(graph& g, d1::small_object_allocator& allocator, + std::forward_list&& msg_waiters) + : trackable_messages_graph_task(g, allocator, no_priority, std::move(msg_waiters)) {} + const std::forward_list get_msg_wait_context_vertices() const { return my_msg_wait_context_vertices; } @@ -183,11 +195,21 @@ class trackable_messages_graph_task : public graph_task { protected: template void finalize(const d1::execution_data& ed) { + auto wait_context_vertices = std::move(my_msg_wait_context_vertices); auto msg_reference_vertices = std::move(my_msg_reference_vertices); graph_task::finalize(ed); - for (auto& msg_waiter : msg_reference_vertices) { - msg_waiter->release(1); + // If there is no thread reference vertices associated with the task + // then this task was created by transferring the ownership from other metainfo + // instance (e.g. while taking from the buffer) + if (msg_reference_vertices.empty()) { + for (auto& msg_waiter : wait_context_vertices) { + msg_waiter->release(1); + } + } else { + for (auto& msg_waiter : msg_reference_vertices) { + msg_waiter->release(1); + } } } private: diff --git a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h index 0e5b867bf8..4ec5507c0f 100644 --- a/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_item_buffer_impl.h @@ -84,6 +84,13 @@ class item_buffer { return element(i).item; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo& get_my_metainfo(size_t i) { + __TBB_ASSERT(my_item_valid(i), "attempt to get invalid item"); + return element(i).metainfo; + } +#endif + // may be called with an empty slot or a slot that has already been constructed into. void set_my_item(size_t i, const item_type &o __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) @@ -102,12 +109,36 @@ class item_buffer { #endif } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + void set_my_item(size_t i, const item_type& o, message_metainfo&& metainfo) { + if(element(i).state != no_item) { + destroy_item(i); + } + + new(&(element(i).item)) item_type(o); + new(&element(i).metainfo) message_metainfo(std::move(metainfo)); + // Skipping the reservation on metainfo.waiters since the ownership + // is moving from metainfo to the cache + element(i).state = has_item; + } +#endif + // destructively-fetch an object from the buffer +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + void fetch_item(size_t i, item_type& o, message_metainfo& metainfo) { + __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot"); + o = get_my_item(i); // could have std::move assign semantics + metainfo = std::move(get_my_metainfo(i)); + destroy_item(i); + } +#else void fetch_item(size_t i, item_type &o) { __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot"); o = get_my_item(i); // could have std::move assign semantics destroy_item(i); } +#endif + // move an existing item from one slot to another. The moved-to slot must be unoccupied, // the moved-from slot must exist and not be reserved. The after, from will be empty, @@ -115,9 +146,9 @@ class item_buffer { void move_item(size_t to, size_t from) { __TBB_ASSERT(!my_item_valid(to), "Trying to move to a non-empty slot"); __TBB_ASSERT(my_item_valid(from), "Trying to move from an empty slot"); - set_my_item(to, get_my_item(from)); // could have std::move semantics + // could have std::move semantics + set_my_item(to, get_my_item(from) __TBB_FLOW_GRAPH_METAINFO_ARG(get_my_metainfo(from))); destroy_item(from); - } // put an item in an empty slot. Return true if successful, else false @@ -129,12 +160,29 @@ class item_buffer { return true; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + bool place_item(size_t here, const item_type &me, Metainfo&& metainfo) { +#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES + if(my_item_valid(here)) return false; +#endif + set_my_item(here, me, std::forward(metainfo)); + return true; + } +#endif + // could be implemented with std::move semantics void swap_items(size_t i, size_t j) { __TBB_ASSERT(my_item_valid(i) && my_item_valid(j), "attempt to swap invalid item(s)"); item_type temp = get_my_item(i); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo temp_metainfo = get_my_metainfo(i); + set_my_item(i, get_my_item(j), get_my_metainfo(j)); + set_my_item(j, temp, temp_metainfo); +#else set_my_item(i, get_my_item(j)); set_my_item(j, temp); +#endif } void destroy_item(size_type i) { diff --git a/include/oneapi/tbb/detail/_flow_graph_join_impl.h b/include/oneapi/tbb/detail/_flow_graph_join_impl.h index cb854fdd5b..816829ed9a 100644 --- a/include/oneapi/tbb/detail/_flow_graph_join_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_join_impl.h @@ -1190,6 +1190,13 @@ return op_data.status == SUCCEEDED; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + // TODO: implement try_get with metainfo for join_node + bool try_get( output_type &v, message_metainfo& ) override { + return try_get(v); + } +#endif + protected: void reset_node(reset_flags f) override { input_ports_type::reset(f); diff --git a/include/oneapi/tbb/detail/_flow_graph_node_impl.h b/include/oneapi/tbb/detail/_flow_graph_node_impl.h index c7d7def633..53a59ccb62 100644 --- a/include/oneapi/tbb/detail/_flow_graph_node_impl.h +++ b/include/oneapi/tbb/detail/_flow_graph_node_impl.h @@ -213,9 +213,12 @@ class function_input_base : public receiver, no_assign { } else { input_type i; - if(my_predecessors.get_item(i)) { +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo metainfo; +#endif + if(my_predecessors.get_item(i __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) { ++my_concurrency; - new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + new_task = create_body_task(i __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo))); } } return new_task; @@ -351,8 +354,12 @@ class function_input_base : public receiver, no_assign { } //! allocates a task to apply a body - graph_task* create_body_task( const input_type &input - __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + template + graph_task* create_body_task( const input_type &input, Metainfo&& metainfo ) +#else + graph_task* create_body_task( const input_type &input ) +#endif { if (!is_graph_active(my_graph_ref)) { return nullptr; @@ -363,7 +370,7 @@ class function_input_base : public receiver, no_assign { #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT if (!metainfo.empty()) { using task_type = apply_body_task_bypass; - t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority, metainfo); + t = allocator.new_object(my_graph_ref, allocator, *this, input, my_priority, std::forward(metainfo)); } else #endif { diff --git a/include/oneapi/tbb/flow_graph.h b/include/oneapi/tbb/flow_graph.h index be3640353e..efd62aaaed 100644 --- a/include/oneapi/tbb/flow_graph.h +++ b/include/oneapi/tbb/flow_graph.h @@ -198,7 +198,8 @@ class message_metainfo { message_metainfo(const waiters_type& waiters) : my_waiters(waiters) {} message_metainfo(waiters_type&& waiters) : my_waiters(std::move(waiters)) {} - const waiters_type& waiters() const { return my_waiters; } + const waiters_type& waiters() const & { return my_waiters; } + waiters_type&& waiters() && { return std::move(my_waiters); } bool empty() const { return my_waiters.empty(); } private: @@ -220,9 +221,17 @@ class sender { //! Request an item from the sender virtual bool try_get( T & ) { return false; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual bool try_get( T &, message_metainfo& ) { return false; } +#endif + //! Reserves an item in the sender virtual bool try_reserve( T & ) { return false; } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + virtual bool try_reserve( T &, message_metainfo& ) { return false; } +#endif + //! Releases the reserved item virtual bool try_release( ) { return false; } @@ -688,6 +697,18 @@ class input_node : public graph_node, public sender< Output > { } } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +private: + bool try_reserve( output_type& v, message_metainfo& ) override { + return try_reserve(v); + } + + bool try_get( output_type& v, message_metainfo& ) override { + return try_get(v); + } +public: +#endif + //! Release a reserved item. /** true = item has been released and so remains in sender, dest must request or reserve future items */ bool try_release( ) override { @@ -1242,11 +1263,24 @@ class buffer_node T* elem; graph_task* ltask; successor_type *r; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo* metainfo{ nullptr }; +#endif buffer_operation(const T& e, op_type t) : type(char(t)) , elem(const_cast(&e)) , ltask(nullptr) , r(nullptr) {} + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + buffer_operation(const T& e, op_type t, const message_metainfo& info) + : type(char(t)), elem(const_cast(&e)), ltask(nullptr), r(nullptr) + , metainfo(const_cast(&info)) + {} + + buffer_operation(op_type t, message_metainfo& info) + : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr), metainfo(&info) {} +#endif buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {} }; @@ -1353,7 +1387,8 @@ class buffer_node } void try_put_and_add_task(graph_task*& last_task) { - graph_task *new_task = my_successors.try_put_task(this->back()); + graph_task* new_task = my_successors.try_put_task(this->back() + __TBB_FLOW_GRAPH_METAINFO_ARG(this->back_metainfo())); if (new_task) { // workaround for icc bug graph& g = this->my_graph; @@ -1395,14 +1430,27 @@ class buffer_node virtual bool internal_push(buffer_operation *op) { __TBB_ASSERT(op->elem, nullptr); - this->push_back(*(op->elem)); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + this->push_back(*(op->elem), (*op->metainfo)); + } else +#endif + { + this->push_back(*(op->elem)); + } op->status.store(SUCCEEDED, std::memory_order_release); return true; } virtual void internal_pop(buffer_operation *op) { __TBB_ASSERT(op->elem, nullptr); - if(this->pop_back(*(op->elem))) { +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool pop_result = op->metainfo ? this->pop_back(*(op->elem), *(op->metainfo)) + : this->pop_back(*(op->elem)); +#else + bool pop_result = this->pop_back(*(op->elem)); +#endif + if (pop_result) { op->status.store(SUCCEEDED, std::memory_order_release); } else { @@ -1492,6 +1540,16 @@ class buffer_node return (op_data.status==SUCCEEDED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_get( T &v, message_metainfo& metainfo ) override { + buffer_operation op_data(req_item, metainfo); + op_data.elem = &v; + my_aggregator.execute(&op_data); + (void)enqueue_forwarding_task(op_data); + return (op_data.status==SUCCEEDED); + } +#endif + //! Reserves an item. /** false = no item can be reserved
true = an item is reserved */ @@ -1503,6 +1561,15 @@ class buffer_node return (op_data.status==SUCCEEDED); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +private: + // TODO: add real implementation + bool try_reserve(output_type& v, message_metainfo&) override { + return try_reserve(v); + } +public: +#endif + //! Release a reserved item. /** true = item has been released and so remains in sender */ bool try_release() override { @@ -1521,14 +1588,9 @@ class buffer_node return true; } -protected: - - template< typename R, typename B > friend class run_and_put_task; - template friend class broadcast_cache; - template friend class round_robin_cache; - //! receive an item, return a task *if possible - graph_task *try_put_task(const T &t) override { - buffer_operation op_data(t, put_item); +private: + graph_task* try_put_task_impl(const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { + buffer_operation op_data(t, put_item __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); my_aggregator.execute(&op_data); graph_task *ft = grab_forwarding_task(op_data); // sequencer_nodes can return failure (if an item has been previously inserted) @@ -1546,10 +1608,19 @@ class buffer_node return ft; } +protected: + + template< typename R, typename B > friend class run_and_put_task; + template friend class broadcast_cache; + template friend class round_robin_cache; + //! receive an item, return a task *if possible + graph_task *try_put_task(const T &t) override { + return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})); + } + #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT - // TODO: add support for buffer_node - graph_task* try_put_task(const T& t, const message_metainfo&) override { - return try_put_task(t); + graph_task* try_put_task(const T& t, const message_metainfo& metainfo) override { + return try_put_task_impl(t, metainfo); } #endif @@ -1585,7 +1656,9 @@ class queue_node : public buffer_node { } void try_put_and_add_task(graph_task*& last_task) { - graph_task *new_task = this->my_successors.try_put_task(this->front()); + graph_task* new_task = this->my_successors.try_put_task(this->front() + __TBB_FLOW_GRAPH_METAINFO_ARG(this->front_metainfo())); + if (new_task) { // workaround for icc bug graph& graph_ref = this->graph_reference(); @@ -1604,7 +1677,14 @@ class queue_node : public buffer_node { op->status.store(FAILED, std::memory_order_release); } else { - this->pop_front(*(op->elem)); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + this->pop_front(*(op->elem), *(op->metainfo)); + } else +#endif + { + this->pop_front(*(op->elem)); + } op->status.store(SUCCEEDED, std::memory_order_release); } } @@ -1721,7 +1801,13 @@ class sequencer_node : public queue_node { } this->my_tail = new_tail; +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool place_item_result = op->metainfo ? this->place_item(tag, *(op->elem), *(op->metainfo)) + : this->place_item(tag, *(op->elem)); + const op_stat res = place_item_result ? SUCCEEDED : FAILED; +#else const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED; +#endif op->status.store(res, std::memory_order_release); return res ==SUCCEEDED; } @@ -1784,7 +1870,14 @@ class priority_queue_node : public buffer_node { } bool internal_push(prio_operation *op) override { - prio_push(*(op->elem)); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + prio_push(*(op->elem), *(op->metainfo)); + } else +#endif + { + prio_push(*(op->elem)); + } op->status.store(SUCCEEDED, std::memory_order_release); return true; } @@ -1797,6 +1890,11 @@ class priority_queue_node : public buffer_node { } *(op->elem) = prio(); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + if (op->metainfo) { + *(op->metainfo) = std::move(prio_metainfo()); + } +#endif op->status.store(SUCCEEDED, std::memory_order_release); prio_pop(); @@ -1841,7 +1939,8 @@ class priority_queue_node : public buffer_node { } void try_put_and_add_task(graph_task*& last_task) { - graph_task * new_task = this->my_successors.try_put_task(this->prio()); + graph_task* new_task = this->my_successors.try_put_task(this->prio() + __TBB_FLOW_GRAPH_METAINFO_ARG(this->prio_metainfo())); if (new_task) { // workaround for icc bug graph& graph_ref = this->graph_reference(); @@ -1863,14 +1962,20 @@ class priority_queue_node : public buffer_node { } // prio_push: checks that the item will fit, expand array if necessary, put at end - void prio_push(const T &src) { + void prio_push(const T &src __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) { if ( this->my_tail >= this->my_array_size ) this->grow_my_array( this->my_tail + 1 ); - (void) this->place_item(this->my_tail, src); + (void) this->place_item(this->my_tail, src __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); ++(this->my_tail); __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push"); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + void prio_push(const T& src) { + return prio_push(src, message_metainfo{}); + } +#endif + // prio_pop: deletes highest priority item from the array, and if it is item // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail // and mark. Assumes the array has already been tested for emptiness; no failure. @@ -1900,6 +2005,12 @@ class priority_queue_node : public buffer_node { return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo& prio_metainfo() { + return this->get_my_metainfo(prio_use_tail() ? this->my_tail-1 : 0); + } +#endif + // turn array into heap void heapify() { if(this->my_tail == 0) { @@ -1910,7 +2021,10 @@ class priority_queue_node : public buffer_node { for (; markmy_tail; ++mark) { // for each unheaped element size_type cur_pos = mark; input_type to_place; - this->fetch_item(mark,to_place); +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + message_metainfo metainfo; +#endif + this->fetch_item(mark, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo)); do { // push to_place up the heap size_type parent = (cur_pos-1)>>1; if (!compare(this->get_my_item(parent), to_place)) @@ -1918,7 +2032,7 @@ class priority_queue_node : public buffer_node { this->move_item(cur_pos, parent); cur_pos = parent; } while( cur_pos ); - (void) this->place_item(cur_pos, to_place); + this->place_item(cur_pos, to_place __TBB_FLOW_GRAPH_METAINFO_ARG(std::move(metainfo))); } } @@ -3162,6 +3276,19 @@ class overwrite_node : public graph_node, public receiver, public sender { return try_get(v); } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +private: + // TODO: add real implementation + bool try_reserve(T& v, message_metainfo&) override { + return try_reserve(v); + } + + bool try_get( input_type& v, message_metainfo& ) override { + return try_get(v); + } +public: +#endif + //! Releases the reserved item bool try_release() override { return true; } diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 83c5fc0253..2c2099f6df 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -416,6 +416,12 @@ struct harness_counting_sender : public tbb::flow::sender { } } +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + bool try_get( T & v, tbb::detail::d2::message_metainfo& ) override { + return try_get(v); + } +#endif + bool try_put_once() { successor_type *s = my_receiver; size_t i = my_count++; diff --git a/test/tbb/test_buffer_node.cpp b/test/tbb/test_buffer_node.cpp index c1c4582b8e..54e6e88a79 100644 --- a/test/tbb/test_buffer_node.cpp +++ b/test/tbb/test_buffer_node.cpp @@ -24,11 +24,11 @@ #include "common/graph_utils.h" #include "common/test_follows_and_precedes_api.h" +#include "test_buffering_try_put_and_wait.h" //! \file test_buffer_node.cpp //! \brief Test for [flow_graph.buffer_node] specification - #define N 1000 #define C 10 @@ -455,6 +455,119 @@ void test_deduction_guides() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_buffer_node_try_put_and_wait() { + using namespace test_try_put_and_wait; + + std::vector start_work_items; + std::vector new_work_items; + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + // Test push + // test_buffer_push tests the graph + // buffer1 -> function -> buffer2 -> writer + // function is a queueing serial function_node that submits new_work_items once wait_message arrives + // writer is an unlimited function_node that writes an item into the processed_items vector + // Test steps + // 1. push start_work_items into the buffer1 + // 2. buffer1.try_put_and_wait(wait_message); + // 3. g.wait_for_all() + // test_buffer_push returns the index from which the items processed during wait_for_all() starts + { + std::vector processed_items; + + std::size_t after_start = test_buffer_push>(start_work_items, wait_message, + new_work_items, processed_items); + + // Expected effect: + // During buffer1.try_put_and_wait() + // 1. start_work_items would be pushed to buffer1 + // 2. wait_message would be pushed to buffer1 + // 3. forward_task on buffer1 would transfer all of the items to the function_node in LIFO order + // 4. wait_message would occupy concurrency of function, other items would be pushed to the queue + // 5. function would process wait_message and add new_work_items to the buffer1 + // 6. forward_task for new_work_items would be spawned, wait_message would be buffered in the buffer2 + // 7. function task for next FIFO item in the queue would be spawned + // 8. forward_task for wait_message in buffer2 would be executed without spawning + // 9. writer task for wait_message would be executed without spawning and write wait_message to the buffer + // 10. try_put_and_wait exits since wait_message is completed + // During g.wait_for_all() + // 10. forward_task for new_work_items in buffer1 would be spawned and put items in function in LIFO order + // 11. function_node would process and push forward items from the queue in FIFO order + // Expected items processing - { wait_message, start_work_items LIFO, new_work_items LIFO } + + std::size_t check_index = 0; + CHECK_MESSAGE(after_start == 1, "try_put_and_wait should process only the wait_message"); + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "try_put_and_wait should process only the wait_message"); + + for (std::size_t index = start_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[index - 1], + "wait_for_all should process start_work_items LIFO"); + } + for (std::size_t index = new_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[index - 1], + "wait_for_all should process new_work_items LIFO"); + } + CHECK(check_index == processed_items.size()); + } // Test push + + // Test pull + // test_buffer_pull tests the graph + // buffer -> function + // function is a rejecting serial function_node that submits new_work_items once wait_message arrives + // and writes the processed item into the processed_items + // Test steps + // 1. push the occupier message to the function + // 2. push start_work_items into the buffer + // 3. buffer.try_put_and_wait(wait_message) + // 4. g.wait_for_all() + // test_buffer_pull returns the index from which the items processed during wait_for_all() starts + + { + std::vector processed_items; + int occupier = 42; + + std::size_t after_start = test_buffer_pull>(start_work_items, wait_message, occupier, + new_work_items, processed_items); + + // Expected effect + // 0. task for occupier processing would be spawned by the function + // During buffer.try_put_and_wait() + // 1. start_work_items would be pushed to the buffer + // 2. wait_message would be pushed to the buffer + // 3. forward_task would try to push items to the function, but would fail + // and set the edge to the pull state + // 4. occupier would be processed + // 5. items would be taken from the buffer by function in LIFO order + // 6. wait_message would be taken first and push new_work_items to the buffer + // Expected items processing { occupier, wait_message, new_work_items LIFO, start_work_items LIFO } + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == 2, "Only wait_message and occupier should be processed by try_put_and_wait"); + CHECK_MESSAGE(processed_items[check_index++] == occupier, "Unexpected items processing by try_put_and_wait"); + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing by try_put_and_wait"); + + for (std::size_t index = new_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[index - 1], + "wait_for_all should process new_work_items LIFO"); + } + for (std::size_t index = start_work_items.size(); index != 0; --index) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[index - 1], + "wait_for_all should process start_work_items LIFO"); + } + CHECK(check_index == processed_items.size()); + } + + // TODO: add try_reserve tests after implementing limiter_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + #include //! Test buffer_node with parallel and serial neighbours @@ -489,8 +602,15 @@ TEST_CASE("Follows and precedes API"){ #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT //! Test deduction guides -//! \brief requirement +//! \brief \ref requirement TEST_CASE("Deduction guides"){ test_deduction_guides(); } #endif + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test buffer_node try_put_and_wait") { + test_buffer_node_try_put_and_wait(); +} +#endif diff --git a/test/tbb/test_buffering_try_put_and_wait.h b/test/tbb/test_buffering_try_put_and_wait.h new file mode 100644 index 0000000000..d9e4578178 --- /dev/null +++ b/test/tbb/test_buffering_try_put_and_wait.h @@ -0,0 +1,139 @@ +/* + Copyright (c) 2024 Intel Corporation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TBB_test_tbb_buffering_try_put_and_wait_H +#define __TBB_test_tbb_buffering_try_put_and_wait_H + +#include +#include + +#include + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + +namespace test_try_put_and_wait { + +template +std::size_t test_buffer_push(const std::vector& start_work_items, + int wait_message, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + std::size_t after_try_put_and_wait_start_index = 0; + tbb::task_arena arena(1); + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer1(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer1.try_put(item); + } + } + return input; + }); + + BufferingNode buffer2(g, args...); + + function_node_type writer(g, tbb::flow::unlimited, + [&](int input) noexcept { + processed_items.emplace_back(input); + return 0; + }); + + tbb::flow::make_edge(buffer1, function); + tbb::flow::make_edge(function, buffer2); + tbb::flow::make_edge(buffer2, writer); + + for (auto item : start_work_items) { + buffer1.try_put(item); + } + + buffer1.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +template +std::size_t test_buffer_pull(const std::vector& start_work_items, + int wait_message, + int occupier, + const std::vector& new_work_items, + std::vector& processed_items, + Args... args) +{ + tbb::task_arena arena(1); + std::size_t after_try_put_and_wait_start_index = 0; + + arena.execute([&] { + tbb::flow::graph g; + + using function_node_type = tbb::flow::function_node; + + BufferingNode buffer(g, args...); + + function_node_type function(g, tbb::flow::serial, + [&](int input) noexcept { + if (input == wait_message) { + for (auto item : new_work_items) { + buffer.try_put(item); + } + } + + processed_items.emplace_back(input); + return 0; + }); + + // Occupy the concurrency of function_node + // This call spawns the task to process the occupier + function.try_put(occupier); + + // Make edge between buffer and function after occupying the concurrency + // To ensure that forward task of the buffer would be spawned after the occupier task + // And the function_node would reject the items from the buffer + // and process them later by calling try_get on the buffer + tbb::flow::make_edge(buffer, function); + + for (auto item : start_work_items) { + buffer.try_put(item); + } + + buffer.try_put_and_wait(wait_message); + + after_try_put_and_wait_start_index = processed_items.size(); + + g.wait_for_all(); + }); + + return after_try_put_and_wait_start_index; +} + +} // test_try_put_and_wait + +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +#endif // __TBB_test_tbb_buffering_try_put_and_wait_H diff --git a/test/tbb/test_function_node.cpp b/test/tbb/test_function_node.cpp index 6bb53940bc..999adac189 100644 --- a/test/tbb/test_function_node.cpp +++ b/test/tbb/test_function_node.cpp @@ -722,7 +722,6 @@ void test_try_put_and_wait() { test_try_put_and_wait_queueing(tbb::flow::unlimited); test_try_put_and_wait_rejecting(tbb::flow::serial); - // TODO: add integration test with buffering node before rejecting function_node } #endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT diff --git a/test/tbb/test_priority_queue_node.cpp b/test/tbb/test_priority_queue_node.cpp index d14aa4bbb3..4d9ffe368f 100644 --- a/test/tbb/test_priority_queue_node.cpp +++ b/test/tbb/test_priority_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ #include +#include "test_buffering_try_put_and_wait.h" //! \file test_priority_queue_node.cpp //! \brief Test for [flow_graph.priority_queue_node] specification @@ -378,6 +379,123 @@ void test_deduction_guides() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_pqueue_node_try_put_and_wait() { + using namespace test_try_put_and_wait; + + std::vector start_work_items; + std::vector new_work_items; + int wait_message = -10; + + for (int i = 0; i < 10; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + // Test push + // test_buffer_push tests the graph + // buffer1 -> function -> buffer2 -> writer + // function is a queueing serial function_node that submits new_work_items once wait_message arrives + // writer is an unlimited function_node that writes an item into the processed_items vector + // Test steps + // 1. push start_work_items into the buffer1 + // 2. buffer1.try_put_and_wait(wait_message); + // 3. g.wait_for_all() + // test_buffer_push returns the index from which the items processed during wait_for_all() starts + { + std::vector processed_items; + + std::size_t after_start = test_buffer_push>(start_work_items, wait_message, + new_work_items, processed_items); + + // Expected effect: + // During buffer1.try_put_and_wait() + // 1. start_work_items would be pushed to buffer1 + // 2. wait_message would be pushed to buffer1 + // 3. forward_task on buffer1 would transfer start_work_items into the function_node in LIFO order + // 4. wait_message would be transferred last because of lowest priority + // 5. the first item would occupy concurrency of function, other items would be pushed to the queue + // 6. function would process start_work_items and push them to the buffer2 + // 7. wait_message would be processed last and add new_work_items to buffer1 + // 8. forward_task on buffer2 would transfer start_work_items in FIFO order and the wait_message to the writer + // 9. try_put_and_wait exits since wait_message is completed + // During g.wait_for_all() + // 10. forward_task for new_work_items in buffer1 would be spawned and put items in function in LIFO order + // Expected items processing - { start_work_items LIFO, wait_message, new_work_items LIFO } + + std::size_t check_index = 0; + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "try_put_and_wait should process start_work_items and the wait_message"); + for (std::size_t i = start_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[i - 1], + "try_put_and_wait should process start_work_items in LIFO order"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, + "try_put_and_wait should process wait_message after start_work_items"); + + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], + "wait_for_all should process new_work_items in LIFO order"); + } + CHECK(check_index == processed_items.size()); + } // Test push + + // Test pull + // test_buffer_pull tests the graph + // buffer -> function + // function is a rejecting serial function_node that submits new_work_items once wait_message arrives + // and writes the processed item into the processed_items + // Test steps + // 1. push the occupier message to the function + // 2. push start_work_items into the buffer + // 3. buffer.try_put_and_wait(wait_message) + // 4. g.wait_for_all() + // test_buffer_pull returns the index from which the items processed during wait_for_all() starts + + { + std::vector processed_items; + int occupier = 42; + + std::size_t after_start = test_buffer_pull>(start_work_items, wait_message, occupier, + new_work_items, processed_items); + + // Expected effect + // 0. task for occupier processing would be spawned by the function + // During buffer.try_put_and_wait() + // 1. start_work_items would be pushed to the buffer + // 2. wait_message would be pushed to the buffer + // 3. forward_task would try to push items to the function, but would fail + // and set the edge to the pull state + // 4. occupier would be processed + // 5. items would be taken from the buffer by function in the priority (LIFO) order + // 6. wait_message would be taken last due to lowest priority + // 7. new_work_items would be pushed to the buffer while processing wait_message + // During wait_for_all() + // 8. new_work_items would be taken from the buffer in the priority (LIFO) order + // Expected items processing { occupier, start_work_items LIFO, wait_message, new_work_items LIFO } + + std::size_t check_index = 0; + CHECK_MESSAGE(after_start == start_work_items.size() + 2, + "try_put_and_wait should process start_work_items, occupier and the wait_message"); + CHECK_MESSAGE(processed_items[check_index++] == occupier, "try_put_and_wait should process the occupier"); + for (std::size_t i = start_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == start_work_items[i - 1], + "try_put_and_wait should process start_work_items in LIFO order"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, + "try_put_and_wait should process wait_message after start_work_items"); + + for (std::size_t i = new_work_items.size(); i != 0; --i) { + CHECK_MESSAGE(processed_items[check_index++] == new_work_items[i - 1], + "wait_for_all should process new_work_items in LIFO order"); + } + CHECK(check_index == processed_items.size()); + } + + // TODO: add try_reserve tests after implementing limiter_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Test serial, parallel behavior and reservation under parallelism //! \brief \ref requirement \ref error_guessing TEST_CASE("Serial, parallel and reservation tests"){ @@ -419,3 +537,9 @@ TEST_CASE("Test deduction guides"){ } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test priority_queue_node try_put_and_wait") { + test_pqueue_node_try_put_and_wait(); +} +#endif diff --git a/test/tbb/test_queue_node.cpp b/test/tbb/test_queue_node.cpp index e034ef6645..893738745d 100644 --- a/test/tbb/test_queue_node.cpp +++ b/test/tbb/test_queue_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ #include +#include "test_buffering_try_put_and_wait.h" //! \file test_queue_node.cpp //! \brief Test for [flow_graph.queue_node] specification @@ -494,6 +495,123 @@ void test_deduction_guides() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_queue_node_try_put_and_wait() { + using namespace test_try_put_and_wait; + + std::vector start_work_items; + std::vector new_work_items; + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + // Test push + // test_buffer_push tests the graph + // buffer1 -> function -> buffer2 -> writer + // function is a queueing serial function_node that submits new_work_items once wait_message arrives + // writer is an unlimited function_node that writes an item into the processed_items vector + // Test steps + // 1. push start_work_items into the buffer1 + // 2. buffer1.try_put_and_wait(wait_message); + // 3. g.wait_for_all() + // test_buffer_push returns the index from which the items processed during wait_for_all() starts + { + std::vector processed_items; + + std::size_t after_start = test_buffer_push>(start_work_items, wait_message, + new_work_items, processed_items); + + // Expected effect: + // During buffer1.try_put_and_wait() + // 1. start_work_items would be pushed to buffer1 + // 2. wait_message would be pushed to buffer1 + // 3. forward_task on buffer1 would transfer all of the items to the function_node in FIFO order + // 4. the first item would occupy concurrency of function, other items would be pushed to the queue + // 5. function would process start_work_items and push them to the buffer2 + // 6. wait_message would be processed last and add new_work_items to buffer1 + // 7. forward_task on buffer2 would transfer start_work_items in FIFO order and the wait_message to the writer + // 8. try_put_and_wait exits since wait_message is completed + // During g.wait_for_all() + // 10. forward_task for new_work_items in buffer1 would be spawned and put items in function in FIFO order + // 11. function_node would process and push forward items from the queue in FIFO order + // Expected items processing - { start_work_items FIFO, wait_message, new_work_items FIFO } + + std::size_t check_index = 0; + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "try_put_and_wait should process start_work_items and the wait_message"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process start_work_items FIFO"); + } + + CHECK_MESSAGE(processed_items[check_index++] == wait_message, + "try_put_and_wait should process wait_message after start_work_items"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "wait_for_all should process new_work_items FIFO"); + } + CHECK(check_index == processed_items.size()); + } // Test push + + // Test pull + // test_buffer_pull tests the graph + // buffer -> function + // function is a rejecting serial function_node that submits new_work_items once wait_message arrives + // and writes the processed item into the processed_items + // Test steps + // 1. push the occupier message to the function + // 2. push start_work_items into the buffer + // 3. buffer.try_put_and_wait(wait_message) + // 4. g.wait_for_all() + // test_buffer_pull returns the index from which the items processed during wait_for_all() starts + + { + std::vector processed_items; + int occupier = 42; + + std::size_t after_start = test_buffer_pull>(start_work_items, wait_message, occupier, + new_work_items, processed_items); + + // Expected effect + // 0. task for occupier processing would be spawned by the function + // During buffer.try_put_and_wait() + // 1. start_work_items would be pushed to the buffer + // 2. wait_message would be pushed to the buffer + // 3. forward_task would try to push items to the function, but would fail + // and set the edge to the pull state + // 4. occupier would be processed + // 5. items would be taken from the buffer by function in FIFO order + // 6. wait_message would be taken last and push new_work_items to the buffer + // During wait_for_all() + // 7. new_work_items would be taken from the buffer in FIFO order + // Expected items processing { occupier, start_work_items FIFO, wait_message, new_work_items FIFO } + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == start_work_items.size() + 2, + "start_work_items, occupier and wait_message should be processed by try_put_and_wait"); + CHECK_MESSAGE(processed_items[check_index++] == occupier, "Unexpected items processing by try_put_and_wait"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process start_work_items FIFO"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing by try_put_and_wait"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process new_work_items FIFO"); + } + CHECK(check_index == processed_items.size()); + } + + // TODO: add try_reserve tests after implementing limiter_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Test serial, parallel behavior and reservation under parallelism //! \brief \ref requirement \ref error_guessing TEST_CASE("Parallel, serial test"){ @@ -559,3 +677,10 @@ TEST_CASE("queue_node with reservation"){ CHECK_MESSAGE((out_arg == -1), "Getting from reserved node should not update its argument."); g.wait_for_all(); } + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test queue_node try_put_and_wait") { + test_queue_node_try_put_and_wait(); +} +#endif diff --git a/test/tbb/test_sequencer_node.cpp b/test/tbb/test_sequencer_node.cpp index 564721f682..1714087d66 100644 --- a/test/tbb/test_sequencer_node.cpp +++ b/test/tbb/test_sequencer_node.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2005-2021 Intel Corporation + Copyright (c) 2005-2024 Intel Corporation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ #include #include +#include "test_buffering_try_put_and_wait.h" //! \file test_sequencer_node.cpp //! \brief Test for [flow_graph.sequencer_node] specification @@ -437,6 +438,127 @@ void test_deduction_guides() { } #endif +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +void test_seq_node_try_put_and_wait() { + using namespace test_try_put_and_wait; + + std::vector start_work_items; + std::vector new_work_items; + int wait_message = 10; + + for (int i = 0; i < wait_message; ++i) { + start_work_items.emplace_back(i); + new_work_items.emplace_back(i + 1 + wait_message); + } + + auto simple_sequencer = [](int item) { return item; }; + + // Test push + // test_buffer_push tests the graph + // buffer1 -> function -> buffer2 -> writer + // function is a queueing serial function_node that submits new_work_items once wait_message arrives + // writer is an unlimited function_node that writes an item into the processed_items vector + // Test steps + // 1. push start_work_items into the buffer1 + // 2. buffer1.try_put_and_wait(wait_message); + // 3. g.wait_for_all() + // test_buffer_push returns the index from which the items processed during wait_for_all() starts + { + std::vector processed_items; + + std::size_t after_start = test_buffer_push>(start_work_items, wait_message, + new_work_items, processed_items, + simple_sequencer); + + // Expected effect: + // During buffer1.try_put_and_wait() + // 1. start_work_items would be pushed to buffer1 + // 2. wait_message would be pushed to buffer1 + // 3. forward_task on buffer1 would transfer all of the items to the function_node in sequencer order (FIFO) + // 4. the first item would occupy concurrency of function, other items would be pushed to the queue + // 5. function would process start_work_items and push them to the buffer2 + // 6. wait_message would be processed last and add new_work_items to buffer1 + // 7. forward_task on buffer2 would transfer start_work_items in sequencer (FIFO) order and the wait_message to the writer + // 8. try_put_and_wait exits since wait_message is completed + // During g.wait_for_all() + // 10. forward_task for new_work_items in buffer1 would be spawned and put items in function in FIFO order + // 11. function_node would process and push forward items from the queue in FIFO order + // Expected items processing - { start_work_items FIFO, wait_message, new_work_items FIFO } + + std::size_t check_index = 0; + CHECK_MESSAGE(after_start == start_work_items.size() + 1, + "try_put_and_wait should process start_work_items and the wait_message"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process start_work_items FIFO"); + } + + CHECK_MESSAGE(processed_items[check_index++] == wait_message, + "try_put_and_wait should process wait_message after start_work_items"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "wait_for_all should process new_work_items FIFO"); + } + CHECK(check_index == processed_items.size()); + } // Test push + + // Test pull + // test_buffer_pull tests the graph + // buffer -> function + // function is a rejecting serial function_node that submits new_work_items once wait_message arrives + // and writes the processed item into the processed_items + // Test steps + // 1. push the occupier message to the function + // 2. push start_work_items into the buffer + // 3. buffer.try_put_and_wait(wait_message) + // 4. g.wait_for_all() + // test_buffer_pull returns the index from which the items processed during wait_for_all() starts + + { + std::vector processed_items; + int occupier = 42; + + std::size_t after_start = test_buffer_pull>(start_work_items, wait_message, occupier, + new_work_items, processed_items, + simple_sequencer); + + // Expected effect + // 0. task for occupier processing would be spawned by the function + // During buffer.try_put_and_wait() + // 1. start_work_items would be pushed to the buffer + // 2. wait_message would be pushed to the buffer + // 3. forward_task would try to push items to the function, but would fail + // and set the edge to the pull state + // 4. occupier would be processed + // 5. items would be taken from the buffer by function in FIFO order + // 6. wait_message would be taken last and push new_work_items to the buffer + // During wait_for_all() + // 7. new_work_items would be taken from the buffer in FIFO order + // Expected items processing { occupier, start_work_items FIFO, wait_message, new_work_items FIFO } + + std::size_t check_index = 0; + + CHECK_MESSAGE(after_start == start_work_items.size() + 2, + "start_work_items, occupier and wait_message should be processed by try_put_and_wait"); + CHECK_MESSAGE(processed_items[check_index++] == occupier, "Unexpected items processing by try_put_and_wait"); + for (auto item : start_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process start_work_items FIFO"); + } + CHECK_MESSAGE(processed_items[check_index++] == wait_message, "Unexpected items processing by try_put_and_wait"); + + for (auto item : new_work_items) { + CHECK_MESSAGE(processed_items[check_index++] == item, + "try_put_and_wait should process new_work_items FIFO"); + } + CHECK(check_index == processed_items.size()); + } + + // TODO: add try_reserve tests after implementing limiter_node +} +#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT + //! Test sequencer with various request orders and parallelism levels //! \brief \ref requirement \ref error_guessing TEST_CASE("Serial and parallel test"){ @@ -501,3 +623,10 @@ TEST_CASE("constraints for sequencer_node sequencer") { static_assert(!can_call_sequencer_node_ctor>); } #endif // __TBB_CPP20_CONCEPTS_PRESENT + +#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT +//! \brief \ref error_guessing +TEST_CASE("test sequencer_node try_put_and_wait") { + test_seq_node_try_put_and_wait(); +} +#endif