Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Part 2: Add implementation of try_put_and_wait feature for buffering nodes #1412

Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
19a86e2
Add implementation for function_node
kboyarinov Jun 11, 2024
dbe50f7
Fix non-CPF build
kboyarinov Jun 11, 2024
660646e
Add try_put_and_wait_production branch to the CI trigger list
kboyarinov Jun 11, 2024
531da5b
Fix test issues
kboyarinov Jun 13, 2024
f97c794
Fix CI issues
kboyarinov Jun 13, 2024
7afdd9d
+ more use of override keyword
kboyarinov Jun 13, 2024
da15916
vertexes->vertices, remove unnecessary iostream include
kboyarinov Jun 21, 2024
92bf552
Remove unnecessary variadics
kboyarinov Jun 25, 2024
2bb5948
Simplify cache impl
kboyarinov Jun 25, 2024
73eb513
Fix unused variable
kboyarinov Jun 25, 2024
f92503a
Add stub for continue_input
kboyarinov Jun 25, 2024
6d82b64
Fix whitespace
kboyarinov Jun 25, 2024
bc01522
Fix issues
kboyarinov Jun 25, 2024
6d6c145
Save progress
kboyarinov Jun 18, 2024
4e05f31
Add implementation for buffering nodes
kboyarinov Jun 20, 2024
842381c
Add to CI
kboyarinov Jun 20, 2024
2c011ed
Fix copyrights
kboyarinov Jun 21, 2024
67226f3
Add missed file
kboyarinov Jun 21, 2024
708d655
Add test definition for try_get with meta
kboyarinov Jun 21, 2024
ed5d05b
Fix metainfo namespace in test
kboyarinov Jun 21, 2024
811b9b9
join_node::try_get stub
kboyarinov Jun 21, 2024
703104a
Allign with base branch
kboyarinov Jun 25, 2024
f6f7673
Address review comments
kboyarinov Jul 8, 2024
e08d81f
Rearrange base task constructors
kboyarinov Jul 8, 2024
4029091
Introduce separate macro for try_put_and_wait feature
kboyarinov Jul 17, 2024
5d66123
Simplify code
kboyarinov Jul 17, 2024
2033045
Address review comments
kboyarinov Jul 22, 2024
967aa53
Add missed comment
kboyarinov Jul 22, 2024
c77e9c5
Update test/tbb/test_function_node.cpp
kboyarinov Jul 22, 2024
5aa2e34
Rename template helpers
kboyarinov Jul 22, 2024
0fd4b26
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Jul 23, 2024
fc1239f
Fix incorrect merging
kboyarinov Jul 23, 2024
0321596
Fix previous review comments
kboyarinov Jul 23, 2024
3691466
Apply PR review comments
kboyarinov Jul 25, 2024
eeffec0
Rename boolean flags
kboyarinov Jul 26, 2024
0b2b098
Rename metainfo-friendly graph_task class
kboyarinov Jul 26, 2024
03c9391
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
ac27bb1
Add this_thread_in_graph_arena usage
kboyarinov Aug 1, 2024
a23dc02
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
a82c649
Fix merging
kboyarinov Aug 1, 2024
6787ae5
Remove unnecessary TODO
kboyarinov Aug 1, 2024
078f3ad
Merge remote-tracking branch 'origin/dev/kboyarinov/try_put_and_wait_…
kboyarinov Aug 1, 2024
fab7913
Correct fetch_item
kboyarinov Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master]
branches: [master, dev/kboyarinov/try_put_and_wait_production, dev/kboyarinov/try_put_and_wait_function_node]
types:
- opened
- synchronize
Expand Down
7 changes: 6 additions & 1 deletion include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -521,6 +521,11 @@
#define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
#endif

// Feature switch for the try_put_and_wait preview code. Unless the macro was already defined,
// it evaluates to true when either the umbrella TBB_PREVIEW_FLOW_GRAPH_FEATURES macro or the
// dedicated TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT macro is non-zero.
#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES \
|| TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT)
#endif

#if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
#define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
#endif
Expand Down
55 changes: 45 additions & 10 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -272,29 +272,57 @@ class forward_task_bypass : public graph_task {

//! A task that calls a node's apply_body_bypass function, passing in an input of type Input
// return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
template< typename NodeType, typename Input >
class apply_body_task_bypass : public graph_task {
template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
class apply_body_task_bypass
: public BaseTaskType
{
NodeType &my_node;
Input my_input;

using check_metainfo = std::is_same<BaseTaskType, graph_task>;
using without_metainfo = std::true_type;
using with_metainfo = std::false_type;

graph_task* call_apply_body_bypass_impl(without_metainfo) {
return my_node.apply_body_bypass(my_input
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* call_apply_body_bypass_impl(with_metainfo) {
return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
}
#endif

graph_task* call_apply_body_bypass() {
return call_apply_body_bypass_impl(check_metainfo{});
}

public:
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo>
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
node_priority_t node_priority, Metainfo&& metainfo )
: BaseTaskType(g, allocator, node_priority, std::forward<Metainfo>(metainfo).waiters())
, my_node(n), my_input(i) {}
#endif

apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
node_priority_t node_priority = no_priority )
: BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
graph_task* next_task = call_apply_body_bypass();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
else if (next_task)
next_task = prioritize_task(my_node.graph_reference(), *next_task);
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return next_task;
}

d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return nullptr;
}
};
Expand Down Expand Up @@ -343,6 +371,13 @@ class threshold_regulator<T, DecrementType,
return result;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
// Metainfo-aware overload. The message_metainfo parameter is left unnamed and is not used:
// the call is forwarded to the single-argument try_put_task, so the metainfo is dropped here.
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
#endif

graph& graph_reference() const override {
return my_node->my_graph;
}
Expand Down
78 changes: 64 additions & 14 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,7 +98,10 @@ class predecessor_cache : public node_cache< sender<T>, M > {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

bool get_item( output_type& v ) {
private:
bool get_item_impl( output_type& v
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
{

bool msg = false;

Expand All @@ -113,7 +116,15 @@ class predecessor_cache : public node_cache< sender<T>, M > {
}

// Try to get from this sender
msg = src->try_get( v );
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (metainfo_ptr)
{
msg = src->try_get( v, *metainfo_ptr );
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
} else
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
#endif
{
msg = src->try_get( v );
}

if (msg == false) {
// Relinquish ownership of the edge
Expand All @@ -125,6 +136,16 @@ class predecessor_cache : public node_cache< sender<T>, M > {
} while ( msg == false );
return msg;
}
public:
// Public cache API without metainfo: delegates to get_item_impl and passes no metainfo
// pointer, so (where that parameter is compiled in) it stays at its nullptr default.
// Returns whatever get_item_impl returns.
bool get_item( output_type& v ) {
return get_item_impl(v);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware variant of get_item: passes the address of the caller's metainfo object
// to get_item_impl, which hands it to the sender's try_get(v, metainfo) overload.
bool get_item( output_type& v, message_metainfo& metainfo ) {
return get_item_impl(v, &metainfo);
}
#endif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make it a single method, like get_item_impl? (Maybe for consistency.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understood correctly. get_item should be considered a public API for a cache. get_item_impl - as a private method. We need to keep the existing public API get_item(v) and add a new one get_item(v, metainfo) and implement both of them on top of get_item_impl.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose for try_put_task, there are calls like try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{})). But for get_item, you'd need to create a temporary that is just dropped. While for try_put_task you still want to send the empty metainfo. Is this the reason there are two get_item_impl functions instead of something like the single try_put_task_impl function?


// If we are removing arcs (rf_clear_edges), call clear() rather than reset().
void reset() {
Expand Down Expand Up @@ -268,6 +289,9 @@ class successor_cache : no_copy {
}

virtual graph_task* try_put_task( const T& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware counterpart of try_put_task(const T&); pure virtual, so each concrete cache provides it.
virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
Expand Down Expand Up @@ -327,6 +351,9 @@ class successor_cache< continue_msg, M > : no_copy {
}

virtual graph_task* try_put_task( const continue_msg& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware counterpart of try_put_task(const continue_msg&); pure virtual, so each concrete cache provides it.
virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache< continue_msg >

//! A cache of successors that are broadcast to
Expand All @@ -336,19 +363,12 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task *new_task = (*i)->try_put_task(t);
graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
// workaround for icc bug
graph& graph_ref = (*i)->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
Expand All @@ -365,6 +385,21 @@ class broadcast_cache : public successor_cache<T, M> {
}
return last_task;
}
public:

// Constructs the cache by forwarding the owner pointer to the base class; the body
// deliberately does nothing with the pointer.
broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// Overload without metainfo: delegates to try_put_task_impl, supplying a default-constructed
// message_metainfo through __TBB_FLOW_GRAPH_METAINFO_ARG.
graph_task* try_put_task( const T &t ) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware overload: forwards t together with the caller's metainfo to try_put_task_impl.
graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
Expand Down Expand Up @@ -411,11 +446,15 @@ class round_robin_cache : public successor_cache<T, M> {
return this->my_successors.size();
}

graph_task* try_put_task( const T &t ) override {
private:

graph_task* try_put_task_impl( const T &t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
{
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task* new_task = (*i)->try_put_task(t);
graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if ( new_task ) {
return new_task;
} else {
Expand All @@ -429,6 +468,17 @@ class round_robin_cache : public successor_cache<T, M> {
}
return nullptr;
}

public:
// Overload without metainfo: delegates to try_put_task_impl, supplying a default-constructed
// message_metainfo through __TBB_FLOW_GRAPH_METAINFO_ARG.
graph_task* try_put_task(const T& t) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// Metainfo-aware overload: forwards t together with the caller's metainfo to try_put_task_impl.
graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif
};

#endif // __TBB__flow_graph_cache_impl_H
75 changes: 74 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -146,6 +146,76 @@ class graph_task : public d1::task {
friend graph_task* prioritize_task(graph& g, graph_task& gt);
};

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
//! A graph_task that also carries the wait contexts ("waiters") attached to the message it processes.
/** The task keeps the waiters alive by holding one reference per waiter, which is released in
    finalize(). Depending on the constructor used, the reference is held either on a reference
    vertex obtained for the waiter (copying constructors) or on the waiter itself (moving
    constructors). */
class graph_task_with_message_waiters : public graph_task {
public:
    //! Copies the list of waiters and reserves one reference for each of them.
    /** For every waiter a reference vertex is obtained via r1::get_thread_reference_vertex,
        stored in my_msg_reference_vertices (in the same order as the waiters) and reserve(1)
        is called on it. The matching release(1) is issued by finalize(). */
    graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator,
                                    node_priority_t node_priority,
                                    const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(msg_waiters)
    {
        // forward_list can only append after a known position, so track the last inserted element
        auto last_iterator = my_msg_reference_vertices.cbefore_begin();

        for (auto& msg_waiter : my_msg_wait_context_vertices) {
            d1::wait_tree_vertex_interface* ref_vertex = r1::get_thread_reference_vertex(msg_waiter);
            last_iterator = my_msg_reference_vertices.emplace_after(last_iterator,
                                                                    ref_vertex);
            ref_vertex->reserve(1);
        }
    }

    //! Takes over the list of waiters without reserving anything.
    /** No reference vertices are created, so finalize() calls release(1) on the waiters
        themselves. NOTE(review): this presumably relies on the previous owner of the list
        having already reserved those references - confirm against the callers. */
    graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator,
                                    node_priority_t node_priority,
                                    std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
        : graph_task(g, allocator, node_priority)
        , my_msg_wait_context_vertices(std::move(msg_waiters))
    {
    }

    //! Same as the copying constructor above, with no_priority.
    graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator,
                                    const std::forward_list<d1::wait_context_vertex*>& msg_waiters)
        : graph_task_with_message_waiters(g, allocator, no_priority, msg_waiters) {}

    //! Same as the moving constructor above, with no_priority.
    graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator,
                                    std::forward_list<d1::wait_context_vertex*>&& msg_waiters)
        : graph_task_with_message_waiters(g, allocator, no_priority, std::move(msg_waiters)) {}

    //! Read-only access to the waiters associated with the message.
    /** Returned by const reference so that a call does not copy the whole list; callers that
        need their own copy (e.g. to build a message_metainfo) copy it explicitly.
        The reference is valid only until finalize() is called on this task. */
    const std::forward_list<d1::wait_context_vertex*>& get_msg_wait_context_vertices() const {
        return my_msg_wait_context_vertices;
    }

protected:
    //! Finalizes the task through graph_task::finalize and then releases the held references.
    template <typename DerivedType>
    void finalize(const d1::execution_data& ed) {
        // Move both lists into locals before calling graph_task::finalize, so the release
        // loops below never read members of *this after the base finalize has run
        auto wait_context_vertices = std::move(my_msg_wait_context_vertices);
        auto msg_reference_vertices = std::move(my_msg_reference_vertices);
        graph_task::finalize<DerivedType>(ed);

        // If there are no thread reference vertices associated with the task
        // then this task was created by transferring the ownership from another metainfo
        // instance (e.g. while taking from the buffer)
        if (msg_reference_vertices.empty()) {
            for (auto& msg_waiter : wait_context_vertices) {
                msg_waiter->release(1);
            }
        } else {
            for (auto& msg_waiter : msg_reference_vertices) {
                msg_waiter->release(1);
            }
        }
    }
private:
    // Each task that holds information about a single message's wait_contexts holds two lists.
    // The first one is the wait_contexts associated with the message itself. They are needed
    // to be able to broadcast the list of wait_contexts to the node successors while executing the task.
    // The second list is a list of reference vertices for each wait_context_vertex in the first list
    // to support the distributed reference counting schema
    std::forward_list<d1::wait_context_vertex*> my_msg_wait_context_vertices;
    std::forward_list<d1::wait_tree_vertex_interface*> my_msg_reference_vertices;
}; // class graph_task_with_message_waiters
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

struct graph_task_comparator {
bool operator()(const graph_task* left, const graph_task* right) {
return left->priority < right->priority;
Expand Down Expand Up @@ -357,6 +427,9 @@ class graph : no_copy, public graph_proxy {

friend class task_arena_base;
friend class graph_task;

template <typename T>
friend class receiver;
}; // class graph

template<typename DerivedType>
Expand Down
9 changes: 8 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_indexer_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,13 @@
return my_try_put_task(v, my_indexer_ptr);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for indexer_node
// Metainfo-aware overload. The message_metainfo parameter is left unnamed and is not used:
// the call is forwarded to the single-argument try_put_task, so the metainfo is dropped here.
graph_task* try_put_task(const T& v, const message_metainfo&) override {
return try_put_task(v);
}
#endif

graph& graph_reference() const override {
return *my_graph;
}
Expand Down
Loading
Loading