Skip to content

Commit

Permalink
Add implementation for key_matching
Browse files Browse the repository at this point in the history
  • Loading branch information
kboyarinov committed Jun 28, 2024
1 parent 32e7319 commit 05f7c5e
Show file tree
Hide file tree
Showing 4 changed files with 406 additions and 96 deletions.
109 changes: 89 additions & 20 deletions include/oneapi/tbb/detail/_flow_graph_join_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,13 +656,23 @@
const K& operator()(const table_item_type& v) { return v.my_key; }
};

template <typename K, typename T, typename TtoK, typename KHash>
struct key_matching_port_base {
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
using type = metainfo_hash_buffer<K, T, TtoK, KHash>;
#else
using type = hash_buffer<K, T, TtoK, KHash>;
#endif
};

// the ports can have only one template parameter. We wrap the types needed in
// a traits type
template< class TraitsType >
class key_matching_port :
public receiver<typename TraitsType::T>,
public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
typename TraitsType::KHash > {
public key_matching_port_base< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
typename TraitsType::KHash >::type
{
public:
typedef TraitsType traits;
typedef key_matching_port<traits> class_type;
Expand All @@ -672,7 +682,7 @@
typedef typename receiver<input_type>::predecessor_type predecessor_type;
typedef typename TraitsType::TtoK type_to_key_func_type;
typedef typename TraitsType::KHash hash_compare_type;
typedef hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type;
typedef typename key_matching_port_base<key_type, input_type, type_to_key_func_type, hash_compare_type>::type buffer_type;

private:
// ----------- Aggregator ------------
Expand All @@ -685,12 +695,21 @@
char type;
input_type my_val;
input_type *my_arg;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo* metainfo = nullptr;
#endif
// constructor for value parameter
key_matching_port_operation(const input_type& e, op_type t) :
type(char(t)), my_val(e), my_arg(nullptr) {}
key_matching_port_operation(const input_type& e, op_type t
__TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& info))
: type(char(t)), my_val(e), my_arg(nullptr)
__TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(const_cast<message_metainfo*>(&info))) {}

// constructor for pointer parameter
key_matching_port_operation(const input_type* p, op_type t) :
type(char(t)), my_arg(const_cast<input_type*>(p)) {}
key_matching_port_operation(const input_type* p, op_type t
__TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& info))
: type(char(t)), my_arg(const_cast<input_type*>(p))
__TBB_FLOW_GRAPH_METAINFO_ARG(metainfo(&info)) {}

// constructor with no parameter
key_matching_port_operation(op_type t) : type(char(t)), my_arg(nullptr) {}
};
Expand All @@ -706,18 +725,40 @@
op_list = op_list->next;
switch(current->type) {
case try__put: {
bool was_inserted = this->insert_with_key(current->my_val);
bool was_inserted = false;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (current->metainfo) {
was_inserted = this->insert_with_key(current->my_val, *(current->metainfo));
} else
#endif
{
was_inserted = this->insert_with_key(current->my_val);
}
// return failure if a duplicate insertion occurs
current->status.store( was_inserted ? SUCCEEDED : FAILED, std::memory_order_release);
}
break;
case get__item:
case get__item: {
// use current_key from FE for item
__TBB_ASSERT(current->my_arg, nullptr);
if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
bool find_result = false;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
if (current->metainfo) {
find_result = this->find_with_key(my_join->current_key, *(current->my_arg),
*(current->metainfo));

} else
#endif
{
find_result = this->find_with_key(my_join->current_key, *(current->my_arg));
}
#if TBB_USE_DEBUG
if (!find_result) {
__TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
}
#endif
current->status.store( SUCCEEDED, std::memory_order_release);
}
break;
case res_port:
// use current_key from FE for item
Expand All @@ -732,22 +773,26 @@
template< typename R, typename B > friend class run_and_put_task;
template<typename X, typename Y> friend class broadcast_cache;
template<typename X, typename Y> friend class round_robin_cache;
graph_task* try_put_task(const input_type& v) override {
key_matching_port_operation op_data(v, try__put);
private:
graph_task* try_put_task_impl(const input_type& v __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo)) {
key_matching_port_operation op_data(v, try__put __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
graph_task* rtask = nullptr;
my_aggregator.execute(&op_data);
if(op_data.status == SUCCEEDED) {
rtask = my_join->increment_key_count((*(this->get_key_func()))(v)); // may spawn
rtask = my_join->increment_key_count((*(this->get_key_func()))(v)); // may spawn
// rtask has to reflect the return status of the try_put
if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
}
return rtask;
}
protected:
graph_task* try_put_task(const input_type& v) override {
return try_put_task_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
// TODO: add support for key_matching join_node
graph_task* try_put_task(const input_type& v, const message_metainfo&) override {
return try_put_task(v);
graph_task* try_put_task(const input_type& v, const message_metainfo& metainfo) override {
return try_put_task_impl(v, metainfo);
}
#endif

Expand Down Expand Up @@ -786,6 +831,15 @@
return op_data.status == SUCCEEDED;
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool get_item( input_type& v, message_metainfo& metainfo ) {
// aggregator uses current_key from FE for Key
key_matching_port_operation op_data(&v, get__item, metainfo);
my_aggregator.execute(&op_data);
return op_data.status == SUCCEEDED;
}
#endif

// reset_port is called when item is accepted by successor, but
// is initiated by join_node.
void reset_port() {
Expand Down Expand Up @@ -1018,10 +1072,17 @@
unref_key_type my_val;
output_type* my_output;
graph_task* bypass_t;
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo* metainfo = nullptr;
#endif
// constructor for value parameter
key_matching_FE_operation(const unref_key_type& e , op_type t) : type(char(t)), my_val(e),
my_output(nullptr), bypass_t(nullptr) {}
key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(nullptr) {}
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
key_matching_FE_operation(output_type *p, op_type t, message_metainfo& info)
: type(char(t)), my_output(p), bypass_t(nullptr), metainfo(&info) {}
#endif
// constructor with no parameter
key_matching_FE_operation(op_type t) : type(char(t)), my_output(nullptr), bypass_t(nullptr) {}
};
Expand All @@ -1039,8 +1100,11 @@
bool do_fwd = this->buffer_empty() && is_graph_active(this->graph_ref);
this->current_key = t;
this->delete_with_key(this->current_key); // remove the key
if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
this->push_back(l_out);
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
message_metainfo metainfo;
#endif
if(join_helper<N>::get_items(my_inputs, l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo))) { // <== call back
this->push_back(l_out __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
d1::small_object_allocator allocator{};
typedef forward_task_bypass<base_node_type> task_type;
Expand Down Expand Up @@ -1094,6 +1158,9 @@
}
else {
*(current->my_output) = this->front();
#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
*(current->metainfo) = this->front_metainfo();
#endif
current->status.store( SUCCEEDED, std::memory_order_release);
}
break;
Expand Down Expand Up @@ -1168,8 +1235,10 @@
}

#if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
bool try_to_make_tuple(output_type &out, message_metainfo&) {
return try_to_make_tuple(out);
bool try_to_make_tuple(output_type &out, message_metainfo& metainfo) {
key_matching_FE_operation op_data(&out, try_make, metainfo);
my_aggregator.execute(&op_data);
return op_data.status == SUCCEEDED;
}
#endif

Expand Down
Loading

0 comments on commit 05f7c5e

Please sign in to comment.