diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 2016872e2..879972aa4 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -1,7 +1,7 @@ include(AddTTGExecutable) # TT unit test: core TTG ops -add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp" LINK_LIBRARIES "Catch2::Catch2") +add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp;streams.cc" LINK_LIBRARIES "Catch2::Catch2") # serialization test: probes serialization via all supported serialization methods (MADNESS, Boost::serialization, cereal) that are available add_executable(serialization "serialization.cc;unit_main.cpp") diff --git a/tests/unit/fibonacci.cc b/tests/unit/fibonacci.cc index 6434aaf55..3d335c3d9 100644 --- a/tests/unit/fibonacci.cc +++ b/tests/unit/fibonacci.cc @@ -30,6 +30,7 @@ TEST_CASE("Fibonacci", "[fib][core]") { if (ttg::default_execution_context().size() == 1) { ttg::Edge F2F; ttg::Edge F2P; + auto world = ttg::default_execution_context(); auto fib_op = ttg::make_tt( // computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0 @@ -49,8 +50,9 @@ TEST_CASE("Fibonacci", "[fib][core]") { const auto F_n_plus_2 = F_n_plus_1 + F_n; ttg::sendv<1>(F_n_plus_1, outs); ttg::send<0>(F_n_plus_2, F_n_plus_1, outs); - } else + } else { ttg::finalize<1>(outs); + } }, ttg::edges(F2F), ttg::edges(F2F, F2P)); auto print_op = ttg::make_tt( @@ -61,8 +63,9 @@ TEST_CASE("Fibonacci", "[fib][core]") { ttg::edges(F2P), ttg::edges()); print_op->set_input_reducer<0>([](int &a, const int &b) { a = a + b; }); make_graph_executable(fib_op); - if (ttg::default_execution_context().rank() == 0) fib_op->invoke(1, 0); - ttg::ttg_fence(ttg::default_execution_context()); + if (world.rank() == 0) fib_op->invoke(1, 0); + ttg::execute(world); + ttg::ttg_fence(world); } } @@ -70,7 +73,8 @@ TEST_CASE("Fibonacci", "[fib][core]") { SECTION("distributed-memory") { ttg::Edge> F2F; ttg::Edge F2P; - const auto nranks = ttg::default_execution_context().size(); + auto world = ttg::default_execution_context(); + const auto nranks = world.size(); auto fib_op = ttg::make_tt( // computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0 @@ -103,8 +107,9 @@ TEST_CASE("Fibonacci", "[fib][core]") { a = a + b; }); make_graph_executable(fib_op); - ttg::ttg_fence(ttg::default_execution_context()); - if (ttg::default_execution_context().rank() == 0) fib_op->invoke(0, std::make_pair(1, 0)); - ttg::ttg_fence(ttg::default_execution_context()); + ttg::ttg_fence(world); + if (world.rank() == 0) fib_op->invoke(0, std::make_pair(1, 0)); + ttg::execute(world); + ttg::ttg_fence(world); } } // TEST_CAST("Fibonacci") diff --git a/tests/unit/streams.cc b/tests/unit/streams.cc new file mode 100644 index 000000000..355276ced --- /dev/null +++ b/tests/unit/streams.cc @@ -0,0 +1,76 @@ +#include +#include + +#include "ttg.h" + +#include "ttg/serialization/std/pair.h" +#include "ttg/util/hash/std/pair.h" + + + +TEST_CASE("streams", "[streams][core]") { + // in distributed memory we must count how many messages the reducer will receive + SECTION("concurrent-stream-size") { + ttg::Edge I2O; + ttg::Edge O2S; + auto world = ttg::default_execution_context(); + const auto nranks = world.size(); + + constexpr std::size_t SLICE = 20; + std::size_t N = SLICE * 2 * world.size(); + constexpr const timespec ts = { .tv_sec = 0, .tv_nsec = 10000 }; + constexpr int VALUE = 1; + std::atomic reduce_ops = 0; + + auto op = ttg::make_tt( + [&](const int &n, int&& i, + std::tuple> &outs) { + int key = n/SLICE; + nanosleep(&ts, nullptr); + if (n < N-1) { + ttg::send<0>(key, std::forward(i), outs); + //ttg::print("sent to sink ", key); + } else { + // set the size of the last reducer + if (N%SLICE > 0) { + ttg::set_size<0>(key, N%SLICE, outs); + } + // forward the value + ttg::send<0>(key, std::forward(i), outs); + //ttg::print("finalized last sink ", key); + } + }, + ttg::edges(I2O), ttg::edges(O2S)); + + auto sink_op = ttg::make_tt( + [&](const int key, const int &value) { + std::cout << "sink " << key << std::endl; + if (!(value == SLICE || key == (N/SLICE))) { + std::cout << "SINK ERROR: key " << key << " value " << value << " SLICE " << SLICE << " N " << N << std::endl; + } + CHECK((value == SLICE || key == (N/SLICE))); + reduce_ops++; + }, + ttg::edges(O2S), ttg::edges()); + + op->set_keymap([=](const auto &key) { return nranks - 1; }); + op->set_trace_instance(true); + sink_op->set_input_reducer<0>([&](int &a, const int &b) { + a += 1; // we count invocations + CHECK(b == VALUE); + reduce_ops++; + }, SLICE); + + make_graph_executable(op); + ttg::ttg_fence(world); + if (world.rank() == 0) { + for (std::size_t i = 0; i < N; ++i) { + op->invoke(i, VALUE); + } + } + + ttg::execute(world); + ttg::ttg_fence(world); + CHECK(reduce_ops == (N/world.size())); + } +} // TEST_CASE("streams") \ No newline at end of file diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index b88874b0e..57a8ebf2c 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -526,10 +526,36 @@ namespace ttg_parsec { typedef void (release_task_fn)(parsec_ttg_task_base_t*); - typedef struct { + struct stream_info_t { std::size_t goal; std::size_t size; - } size_goal_t; + parsec_lifo_t reduce_copies; + std::atomic reduce_count; + }; + + protected: + template + void init_stream_info_impl(TT *tt, std::array& streams) { + if constexpr (TT::numins > i) { + if (std::get(tt->input_reducers)) { + streams[i].goal = tt->static_stream_goal[i]; + streams[i].size = 0; + PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t); + streams[i].reduce_count.store(0, std::memory_order_relaxed); + } + /* recursion */ + if constexpr((i + 1) < TT::numins) { + init_stream_info_impl(tt, streams); + } + } + } + + template + void init_stream_info(TT *tt, std::array& streams) { + init_stream_info_impl<0>(tt, streams); + } + + public: /* Poor-mans virtual function * We cannot use virtual inheritance or private visibility because we @@ -568,7 +594,6 @@ namespace ttg_parsec { : data_count(data_count) , defer_writer(defer_writer) , release_task_cb(release_fn) { - int32_t p = priority; PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); parsec_task.mempool_owner = mempool; parsec_task.task_class = task_class; @@ -589,7 +614,7 @@ namespace ttg_parsec { static constexpr size_t num_streams = TT::numins; TT* tt; key_type key; - size_goal_t stream[num_streams] = {}; + std::array streams; parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) : parsec_ttg_task_base_t(mempool, task_class, num_streams) { @@ -616,6 +641,8 @@ namespace ttg_parsec { parsec_task.data[i].data_in = nullptr; } + init_stream_info(tt, streams); + // We store the hash of the key and the address where it can be found in locals considered as a scratchpad uint64_t hv = ttg::hash>{}(key); *(uintptr_t*)&(parsec_task.locals[0]) = hv; @@ -635,7 +662,7 @@ namespace ttg_parsec { struct parsec_ttg_task_t : public parsec_ttg_task_base_t { static constexpr size_t num_streams = TT::numins; TT* tt; - size_goal_t stream[num_streams] = {}; + std::array streams; parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class) : parsec_ttg_task_base_t(mempool, task_class, num_streams) { @@ -656,6 +683,8 @@ namespace ttg_parsec { for (int i = 0; i < num_streams; ++i) { parsec_task.data[i].data_in = nullptr; } + + init_stream_info(tt, streams); } static void release_task(parsec_ttg_task_base_t* task_base) { @@ -667,6 +696,36 @@ namespace ttg_parsec { parsec_key_t pkey() { return 0; } }; + /** + * Reducer task representing one or more stream reductions. + * A reducer task may be deferred on its first input (the object into which + * all other inputs are folded). Once that input becomes available the task + * is submitted and reduces all available inputs. Additional reducer tasks may + * be submitted until all required inputs have been processed. + */ + struct reducer_task_t : public parsec_ttg_task_base_t { + parsec_ttg_task_base_t *parent_task; + bool is_first; + + reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool, + parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, + int32_t priority, bool is_first) + : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, + 0, &release_task, + true /* deferred until other readers have completed */) + , parent_task(task) + , is_first(is_first) + { } + + static void release_task(parsec_ttg_task_base_t* task_base) { + /* reducer tasks have one mutable input so the task can be submitted on the first release */ + parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task }; + parsec_execution_stream_t *es = ttg::default_execution_context().impl().execution_stream(); + __parsec_schedule_vp(es, vp_task_rings, 0); + } + }; + + inline ttg_data_copy_t *find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) { ttg_data_copy_t *res = nullptr; if (task == nullptr || ptr == nullptr) { @@ -774,10 +833,6 @@ namespace ttg_parsec { return PARSEC_HOOK_RETURN_DONE; } - static parsec_key_fn_t parsec_tasks_hash_fcts = {.key_equal = parsec_hash_table_generic_64bits_key_equal, - .key_print = parsec_hash_table_generic_64bits_key_print, - .key_hash = parsec_hash_table_generic_64bits_key_hash}; - template class rma_delayed_activate { std::vector _keylist; @@ -1149,6 +1204,7 @@ namespace ttg_parsec { private: using task_t = detail::parsec_ttg_task_t; + friend detail::parsec_ttg_task_base_t; friend task_t; /* the offset of the key placed after the task structure in the memory from mempool */ @@ -1199,7 +1255,8 @@ namespace ttg_parsec { // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime ttg::meta::detail::input_reducers_t input_reducers; //!< Reducers for the input terminals (empty = expect single value) - std::array static_stream_goal; + std::array inpute_reducers_taskclass = { nullptr }; + std::array static_stream_goal = { std::numeric_limits::max() }; int num_pullins = 0; bool m_defer_writer = TTG_PARSEC_DEFER_WRITER; @@ -1318,6 +1375,98 @@ namespace ttg_parsec { parsec_ttg_caller = NULL; } + template + static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) { + using rtask_t = detail::reducer_task_t; + using value_t = std::tuple_element_t; + constexpr const bool val_is_void = ttg::meta::is_void_v; + rtask_t *rtask = (rtask_t*)parsec_task; + task_t *parent_task = static_cast(rtask->parent_task); + ttT *baseobj = parent_task->tt; + derivedT *obj = static_cast(baseobj); + + auto& reducer = std::get(baseobj->input_reducers); + + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing"); + } + + /* the copy to reduce into */ + detail::ttg_data_copy_t *target_copy; + target_copy = static_cast(parent_task->parsec_task.data[i].data_in); + assert(val_is_void || nullptr != target_copy); + /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */ + std::size_t c = 0; + std::size_t size = 0; + assert(parent_task->streams[i].reduce_count > 0); + if (rtask->is_first) { + if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) { + /* we were the first and there is nothing to be done */ + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + } + + assert(parsec_ttg_caller == NULL); + parsec_ttg_caller = rtask->parent_task; + + do { + if constexpr(!val_is_void) { + /* the copies to reduce out of */ + detail::ttg_data_copy_t *source_copy; + parsec_list_item_t *item; + item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies); + if (nullptr == item) { + // maybe someone is changing the goal right now + break; + } + source_copy = ((detail::ttg_data_copy_self_t *)(item))->self; + reducer(*reinterpret_cast *>(target_copy->device_private), + *reinterpret_cast *>(source_copy->device_private)); + detail::release_data_copy(source_copy); + } else if constexpr(val_is_void) { + reducer(); // invoke control reducer + } + // there is only one task working on this stream, so no need to be atomic here + size = ++parent_task->streams[i].size; + //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl; + } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); + //} while ((c = (--task->streams[i].reduce_count)) > 0); + + /* finalize_argstream sets goal to 1, so size may be larger than goal */ + bool complete = (size >= parent_task->streams[i].goal); + + //std::cout << "static_reducer_op size " << size + // << " of " << parent_task->streams[i].goal << " complete " << complete + // << " c " << c << std::endl; + if (complete && c == 0) { + /* task is still in the hash table, have release_task remove it */ + parent_task->remove_from_hash = true; + parent_task->release_task(parent_task); + } + + parsec_ttg_caller = NULL; + + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + + protected: template uint64_t unpack(T &obj, void *_bytes, uint64_t pos) { @@ -1668,11 +1817,11 @@ namespace ttg_parsec { char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); int32_t priority = 0; if constexpr (!keyT_is_Void) { - //priority = priomap(key); + priority = priomap(key); /* placement-new the task */ newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority); } else { - //priority = priomap(); + priority = priomap(); /* placement-new the task */ newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority); } @@ -1683,14 +1832,37 @@ namespace ttg_parsec { newtask->function_template_class_ptr[static_cast(ttg::ExecutionSpace::CUDA)] = reinterpret_cast(&TT::static_op); - for (int i = 0; i < static_stream_goal.size(); ++i) { - newtask->stream[i].goal = static_stream_goal[i]; + ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task"); + return newtask; + } + + + template + detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) { + /* make sure we can reuse the existing memory pool and don't have to create a new one */ + static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t)); + constexpr const bool keyT_is_Void = ttg::meta::is_void_v; + auto &world_impl = world.impl(); + detail::reducer_task_t *newtask; + parsec_thread_mempool_t *mempool = get_task_mempool(); + char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); + // use the priority of the task we stream into + int32_t priority = 0; + if constexpr (!keyT_is_Void) { + priority = priomap(task->key); + ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task"); + } else { + priority = priomap(); + ttg::trace(world.rank(), ":", get_name(), ": creating reducer task"); } + /* placement-new the task */ + newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i], + world_impl.taskpool(), priority, is_first); - ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task"); return newtask; } + // Used to set the i'th argument template void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr, @@ -1716,7 +1888,7 @@ namespace ttg_parsec { task_t *task; auto &world_impl = world.impl(); auto &reducer = std::get(input_reducers); - bool release = true; + bool release = false; bool remove_from_hash = true; bool discover_task = true; bool get_pull_data = false; @@ -1739,7 +1911,10 @@ namespace ttg_parsec { parsec_hash_table_nolock_remove(&tasks_table, hk); remove_from_hash = false; } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + /* if we have a reducer, we need to hold on to the lock for just a little longer */ + if (!reducer) { + parsec_hash_table_unlock_bucket(&tasks_table, hk); + } } else { task = create_new_task(key); world_impl.increment_created(); @@ -1773,33 +1948,80 @@ namespace ttg_parsec { #endif } - if (reducer) { // is this a streaming input? reduce the received value - // N.B. Right now reductions are done eagerly, without spawning tasks - // this means we must lock - parsec_hash_table_lock_bucket(&tasks_table, hk); + auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){ + detail::ttg_data_copy_t *copy = copy_in; + if (nullptr != parsec_ttg_caller) { + copy = detail::find_copy_in_task(parsec_ttg_caller, &value); + } + if (nullptr != copy) { + /* retain the data copy */ + copy = detail::register_data_copy(copy, task, true); + } else { + /* create a new copy */ + copy = detail::create_new_datacopy(std::forward(value)); + } + return copy; + }; + + if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value + auto submit_reducer_task = [&](auto *parent_task){ + /* check if we need to create a task */ + std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); + if (0 == c) { + /* we are responsible for creating the reduction task */ + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(parent_task, false); + reduce_task->release_task(reduce_task); // release immediately + } + }; if constexpr (!ttg::meta::is_void_v) { // for data values // have a value already? if not, set, otherwise reduce - detail::ttg_data_copy_t *copy = nullptr; - if (nullptr == (copy = static_cast(task->parsec_task.data[i].data_in))) { + if (nullptr == static_cast(task->parsec_task.data[i].data_in)) { using decay_valueT = std::decay_t; - /* For now, we always create a copy because we cannot rely on the task_release - * mechanism (it would release the task, not the reduction value). */ - copy = detail::create_new_datacopy(std::forward(value)); + + /* first input value, create a task and bind it to the copy */ + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(task, true); + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward(value), false); + + /* put the copy into the task */ task->parsec_task.data[i].data_in = copy; + + /* protected by the bucket lock */ + task->streams[i].size = 1; + task->streams[i].reduce_count.store(1, std::memory_order_relaxed); + + if (copy->push_task != &reduce_task->parsec_task) { + reduce_task->release_task(reduce_task); + } + + /* now we can unlock the bucket */ + parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { - reducer(*reinterpret_cast *>(copy->device_private), value); + /* unlock the bucket, the lock is not needed anymore */ + parsec_hash_table_unlock_bucket(&tasks_table, hk); + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), true); + + /* enqueue the data copy to be reduced */ + parsec_lifo_push(&task->streams[i].reduce_copies, ©->super); + submit_reducer_task(task); } } else { - reducer(); // even if this was a control input, must execute the reducer for possible side effects - } - task->stream[i].size++; - release = (task->stream[i].size == task->stream[i].goal); - if (release) { - parsec_hash_table_nolock_remove(&tasks_table, hk); - remove_from_hash = false; + /* unlock the bucket, the lock is not needed anymore */ + parsec_hash_table_unlock_bucket(&tasks_table, hk); + /* submit reducer for void values to handle side effects */ + submit_reducer_task(task); } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + //if (release) { + // parsec_hash_table_nolock_remove(&tasks_table, hk); + // remove_from_hash = false; + //} + //parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { /* whether the task needs to be deferred or not */ if constexpr (!valueT_is_Void) { @@ -1808,22 +2030,16 @@ namespace ttg_parsec { throw std::logic_error("bad set arg"); } - detail::ttg_data_copy_t *copy = copy_in; - if (nullptr == copy_in && nullptr != parsec_ttg_caller) { - copy = detail::find_copy_in_task(parsec_ttg_caller, &value); - } + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), input_is_const); - if (nullptr != copy) { - /* register_data_copy might provide us with a different copy if !input_is_const */ - copy = detail::register_data_copy(copy, task, input_is_const); - } else { - copy = detail::create_new_datacopy(std::forward(value)); - } /* if we registered as a writer and were the first to register with this copy * we need to defer the release of this task to give other tasks a chance to * make a copy of the original data */ release = (copy->push_task != &task->parsec_task); task->parsec_task.data[i].data_in = copy; + } else { + release = true; } } task->remove_from_hash = remove_from_hash; @@ -2341,13 +2557,13 @@ namespace ttg_parsec { /// \param size positive integer that specifies the default stream size template void set_static_argstream_size(std::size_t size) { - assert(std::get(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal"); + assert(std::get(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal"); assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0"); this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i); // Check if stream is already bounded - if (static_stream_goal[i] > 0) { + if (static_stream_goal[i] < std::numeric_limits::max()) { ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i); throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream"); } @@ -2398,16 +2614,22 @@ namespace ttg_parsec { #endif } } + parsec_hash_table_unlock_bucket(&tasks_table, hk); // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].goal = size; - bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - if (release) release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = size; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= size)) { + release_task(task); + } } } @@ -2452,16 +2674,22 @@ namespace ttg_parsec { #endif } } + parsec_hash_table_unlock_bucket(&tasks_table, hk); // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].goal = size; - bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - if (release) release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = size; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= size)) { + release_task(task); + } } } @@ -2493,8 +2721,8 @@ namespace ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + //parsec_hash_table_lock_bucket(&tasks_table, hk); + if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) { ttg::print_error(world.rank(), ":", get_name(), ":", key, " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("TT::finalize called on stream that never received an input data"); @@ -2504,10 +2732,16 @@ namespace ttg_parsec { // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = 1; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= 1)) { + release_task(task); + } } } @@ -2537,8 +2771,7 @@ namespace ttg_parsec { auto hk = static_cast(0); task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) { ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("TT::finalize called on stream that never received an input data"); @@ -2548,10 +2781,16 @@ namespace ttg_parsec { // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes - task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); - - release_task(task); + // 1) "lock" the stream by incrementing the reduce_count + // 2) set the goal + // 3) "unlock" the stream + // only one thread will see the reduce_count be zero and the goal match the size + task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire); + task->streams[i].goal = 1; + auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release); + if (1 == c && (task->streams[i].size >= 1)) { + release_task(task); + } } } @@ -2765,7 +3004,7 @@ namespace ttg_parsec { parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash}; static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *t) { - auto *task = (detail::parsec_ttg_task_base_t *)t; + task_t *task = (task_t *)t; for (int i = 0; i < task->data_count; i++) { detail::ttg_data_copy_t *copy = static_cast(task->parsec_task.data[i].data_in); if (nullptr == copy) continue; @@ -2786,8 +3025,7 @@ namespace ttg_parsec { , keymap(std::is_same>::value ? decltype(keymap)(ttg::detail::default_keymap(world)) : decltype(keymap)(std::forward(keymap_))) - , priomap(decltype(keymap)(std::forward(priomap_))) - , static_stream_goal() { + , priomap(decltype(keymap)(std::forward(priomap_))) { // Cannot call these in base constructor since terminals not yet constructed if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals"); if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals"); @@ -2942,6 +3180,12 @@ namespace ttg_parsec { free((void*)self.name); self.name = nullptr; } + for (std::size_t i = 0; i < numins; ++i) { + if (inpute_reducers_taskclass[i] != nullptr) { + std::free(inpute_reducers_taskclass[i]); + inpute_reducers_taskclass[i] = nullptr; + } + } release(); } @@ -2993,6 +3237,74 @@ namespace ttg_parsec { void set_input_reducer(Reducer &&reducer) { ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i); std::get(input_reducers) = reducer; + + parsec_task_class_t *tc = inpute_reducers_taskclass[i]; + if (nullptr == tc) { + tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc)); + inpute_reducers_taskclass[i] = tc; + + tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str()); + tc->task_class_id = get_instance_id(); + tc->nb_parameters = 0; + tc->nb_locals = 0; + tc->nb_flows = numflows; + + auto &world_impl = world.impl(); + + if( world_impl.profiling() ) { + // first two ints are used to store the hash of the key. + tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int); + // seconds two ints are used to store a pointer to the key of the task. + tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int); + + // If we have parameters and locals, we need to define the corresponding dereference arrays + tc->params[0] = &detail::parsec_taskclass_param0; + tc->params[1] = &detail::parsec_taskclass_param1; + + tc->locals[0] = &detail::parsec_taskclass_param0; + tc->locals[1] = &detail::parsec_taskclass_param1; + tc->locals[2] = &detail::parsec_taskclass_param2; + tc->locals[3] = &detail::parsec_taskclass_param3; + } + tc->make_key = make_key; + tc->key_functions = &tasks_hash_fcts; + tc->task_snprintf = parsec_ttg_task_snprintf; + +#if defined(PARSEC_PROF_TRACE) + tc->profile_info = &parsec_ttg_task_info; +#endif + + world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_castnb_task_classes)>(self.task_class_id+1)); + +#if 0 + // FIXME: currently only support reduction on the host + if constexpr (derived_has_cuda_op()) { + self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t)); + ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA; + ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda; + ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU; + ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook; + ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE; + ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL; + ((__parsec_chore_t *)self.incarnations)[2].hook = NULL; + } else +#endif // 0 + { + tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t)); + ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU; + ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL; + ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op; + ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE; + ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL; + ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL; + } + + /* the reduction task does not alter the termination detection because the target task will execute */ + tc->release_task = &parsec_release_task_to_mempool; + tc->complete_execution = NULL; + } } /// define the reducer function to be called when additional inputs are diff --git a/ttg/ttg/parsec/ttg_data_copy.h b/ttg/ttg/parsec/ttg_data_copy.h index 461984e3d..17ac18c0f 100644 --- a/ttg/ttg/parsec/ttg_data_copy.h +++ b/ttg/ttg/parsec/ttg_data_copy.h @@ -15,7 +15,22 @@ namespace ttg_parsec { * to facilitate the ref-counting of the data copy. * TODO: create abstractions for all fields in parsec_data_copy_t that we access. */ - struct ttg_data_copy_t : public parsec_data_copy_t { + + /* fwd decl */ + struct ttg_data_copy_t; + + /* special type: stores a pointer to the ttg_data_copy_t. This is necessary + * because ttg_data_copy_t has virtual functions so we cannot cast from parsec_data_copy_t + * to ttg_data_copy_t (offsetof is not supported for virtual classes). + * The self pointer is a back-pointer to the ttg_data_copy_t. */ + struct ttg_data_copy_self_t : public parsec_data_copy_t { + ttg_data_copy_t *self; + ttg_data_copy_self_t(ttg_data_copy_t* dc) + : self(dc) + { } + }; + + struct ttg_data_copy_t : public ttg_data_copy_self_t { #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND) int64_t size; int64_t uid; @@ -73,6 +88,7 @@ namespace ttg_parsec { } ttg_data_copy_t() + : ttg_data_copy_self_t(this) { /* TODO: do we need this construction? */ PARSEC_OBJ_CONSTRUCT(this, parsec_data_copy_t);