Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[try_put_and_wait] Part 1: Add implementation of try_put_and_wait feature for function_node #1398

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ on:
branches: [master]

pull_request:
branches: [master]
branches: [master, dev/kboyarinov/try_put_and_wait_production]
types:
- opened
- synchronize
Expand Down
6 changes: 5 additions & 1 deletion include/oneapi/tbb/detail/_config.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -521,6 +521,10 @@
#define __TBB_PREVIEW_FLOW_GRAPH_NODE_SET (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
#endif

#ifndef __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
#define __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT (TBB_PREVIEW_FLOW_GRAPH_FEATURES)
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
#endif

#if TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS
#define __TBB_PREVIEW_CONCURRENT_HASH_MAP_EXTENSIONS 1
#endif
Expand Down
64 changes: 54 additions & 10 deletions include/oneapi/tbb/detail/_flow_graph_body_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -270,31 +270,68 @@ class forward_task_bypass : public graph_task {
}
};

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename... Metainfo>
struct graph_task_base : std::conditional<sizeof...(Metainfo) != 0,
graph_task_with_message_waiters,
graph_task>
{};

template <typename... Metainfo>
using graph_task_base_t = typename graph_task_base<Metainfo...>::type;
#endif

//! A task that calls a node's apply_body_bypass function, passing in an input of type Input
// return the task* unless it is SUCCESSFULLY_ENQUEUED, in which case return nullptr
template< typename NodeType, typename Input >
class apply_body_task_bypass : public graph_task {
template< typename NodeType, typename Input, typename BaseTaskType = graph_task>
class apply_body_task_bypass
: public BaseTaskType
{
NodeType &my_node;
Input my_input;

graph_task* call_apply_body_bypass_impl(std::true_type) {
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
return my_node.apply_body_bypass(my_input
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* call_apply_body_bypass_impl(std::false_type) {
return my_node.apply_body_bypass(my_input, message_metainfo{this->get_msg_wait_context_vertices()});
}
#endif

graph_task* call_apply_body_bypass() {
return call_apply_body_bypass_impl(std::is_same<BaseTaskType, graph_task>{});
}

public:
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
template <typename Metainfo,
typename BaseT = BaseTaskType,
typename = typename std::enable_if<std::is_same<BaseT, graph_task_with_message_waiters>::value>::type>
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i,
node_priority_t node_priority, Metainfo&& metainfo )
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
: BaseTaskType(g, allocator, std::forward<Metainfo>(metainfo).waiters(), node_priority)
, my_node(n), my_input(i) {}
#endif

apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType &n, const Input &i
, node_priority_t node_priority = no_priority
) : graph_task(g, allocator, node_priority),
my_node(n), my_input(i) {}
apply_body_task_bypass( graph& g, d1::small_object_allocator& allocator, NodeType& n, const Input& i,
node_priority_t node_priority = no_priority )
: BaseTaskType(g, allocator, node_priority), my_node(n), my_input(i) {}

d1::task* execute(d1::execution_data& ed) override {
graph_task* next_task = my_node.apply_body_bypass( my_input );
graph_task* next_task = call_apply_body_bypass();
if (SUCCESSFULLY_ENQUEUED == next_task)
next_task = nullptr;
else if (next_task)
next_task = prioritize_task(my_node.graph_reference(), *next_task);
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return next_task;
}

d1::task* cancel(d1::execution_data& ed) override {
finalize<apply_body_task_bypass>(ed);
BaseTaskType::template finalize<apply_body_task_bypass>(ed);
return nullptr;
}
};
Expand Down Expand Up @@ -343,6 +380,13 @@ class threshold_regulator<T, DecrementType,
return result;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for limiter_node
graph_task* try_put_task(const DecrementType& value, const message_metainfo&) override {
return try_put_task(value);
}
#endif

graph& graph_reference() const override {
return my_node->my_graph;
}
Expand Down
42 changes: 32 additions & 10 deletions include/oneapi/tbb/detail/_flow_graph_cache_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -268,6 +268,9 @@ class successor_cache : no_copy {
}

virtual graph_task* try_put_task( const T& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo) = 0;
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
#endif
}; // successor_cache<T>

//! An abstract cache of successors, specialized to continue_msg
Expand Down Expand Up @@ -327,6 +330,9 @@ class successor_cache< continue_msg, M > : no_copy {
}

virtual graph_task* try_put_task( const continue_msg& t ) = 0;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
#endif
}; // successor_cache< continue_msg >

//! A cache of successors that are broadcast to
Expand All @@ -336,19 +342,12 @@ class broadcast_cache : public successor_cache<T, M> {
typedef M mutex_type;
typedef typename successor_cache<T,M>::successors_type successors_type;

public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
graph_task* try_put_task( const T &t ) override {
graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
pavelkumbrasev marked this conversation as resolved.
Show resolved Hide resolved
graph_task * last_task = nullptr;
typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
typename successors_type::iterator i = this->my_successors.begin();
while ( i != this->my_successors.end() ) {
graph_task *new_task = (*i)->try_put_task(t);
graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
// workaround for icc bug
graph& graph_ref = (*i)->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
Expand All @@ -365,6 +364,22 @@ class broadcast_cache : public successor_cache<T, M> {
}
return last_task;
}
public:

broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
// Do not work with the passed pointer here as it may not be fully initialized yet
}

// as above, but call try_put_task instead, and return the last task we received (if any)
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
graph_task* try_put_task( const T &t ) override {
return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
return try_put_task_impl(t, metainfo);
}
#endif

// call try_put_task and return list of received tasks
bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
Expand Down Expand Up @@ -429,6 +444,13 @@ class round_robin_cache : public successor_cache<T, M> {
}
return nullptr;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for round robin cache
graph_task* try_put_task(const T& t, const message_metainfo&) override {
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
return try_put_task(t);
}
#endif
};

#endif // __TBB__flow_graph_cache_impl_H
44 changes: 43 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -146,6 +146,45 @@ class graph_task : public d1::task {
friend graph_task* prioritize_task(graph& g, graph_task& gt);
};

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
class graph_task_with_message_waiters : public graph_task {
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
public:
graph_task_with_message_waiters(graph& g, d1::small_object_allocator& allocator,
const std::forward_list<d1::wait_context_vertex*>& msg_waiters,
node_priority_t node_priority = no_priority)
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
: graph_task(g, allocator, node_priority)
, my_msg_wait_context_vertices(msg_waiters)
{
auto last_iterator = my_msg_reference_vertices.cbefore_begin();

for (auto& msg_waiter : my_msg_wait_context_vertices) {
d1::wait_tree_vertex_interface* ref_vertex = r1::get_thread_reference_vertex(msg_waiter);
last_iterator = my_msg_reference_vertices.emplace_after(last_iterator,
ref_vertex);
ref_vertex->reserve(1);
}
}

const std::forward_list<d1::wait_context_vertex*> get_msg_wait_context_vertices() const {
return my_msg_wait_context_vertices;
}

protected:
template <typename DerivedType>
void finalize(const d1::execution_data& ed) {
auto msg_reference_vertices = std::move(my_msg_reference_vertices);
graph_task::finalize<DerivedType>(ed);

for (auto& msg_waiter : msg_reference_vertices) {
msg_waiter->release(1);
}
}
private:
std::forward_list<d1::wait_context_vertex*> my_msg_wait_context_vertices;
std::forward_list<d1::wait_tree_vertex_interface*> my_msg_reference_vertices;
kboyarinov marked this conversation as resolved.
Show resolved Hide resolved
}; // class graph_task_with_message_waiters
#endif // __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT

struct graph_task_comparator {
bool operator()(const graph_task* left, const graph_task* right) {
return left->priority < right->priority;
Expand Down Expand Up @@ -357,6 +396,9 @@ class graph : no_copy, public graph_proxy {

friend class task_arena_base;
friend class graph_task;

template <typename T>
friend class receiver;
}; // class graph

template<typename DerivedType>
Expand Down
9 changes: 8 additions & 1 deletion include/oneapi/tbb/detail/_flow_graph_indexer_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,13 @@
return my_try_put_task(v, my_indexer_ptr);
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for indexer_node
graph_task* try_put_task(const T& v, const message_metainfo&) override {
return try_put_task(v);
}
#endif

graph& graph_reference() const override {
return *my_graph;
}
Expand Down
Loading
Loading