Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PaRSEC TTG: Offload stream reductions #256

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
include(AddTTGExecutable)

# TT unit test: core TTG ops
add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp" LINK_LIBRARIES "Catch2::Catch2")
add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp;streams.cc" LINK_LIBRARIES "Catch2::Catch2")

# serialization test: probes serialization via all supported serialization methods (MADNESS, Boost::serialization, cereal) that are available
add_executable(serialization "serialization.cc;unit_main.cpp")
Expand Down
19 changes: 12 additions & 7 deletions tests/unit/fibonacci.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ TEST_CASE("Fibonacci", "[fib][core]") {
if (ttg::default_execution_context().size() == 1) {
ttg::Edge<int, int> F2F;
ttg::Edge<void, int> F2P;
auto world = ttg::default_execution_context();

auto fib_op = ttg::make_tt(
// computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0
Expand All @@ -49,8 +50,9 @@ TEST_CASE("Fibonacci", "[fib][core]") {
const auto F_n_plus_2 = F_n_plus_1 + F_n;
ttg::sendv<1>(F_n_plus_1, outs);
ttg::send<0>(F_n_plus_2, F_n_plus_1, outs);
} else
} else {
ttg::finalize<1>(outs);
}
},
ttg::edges(F2F), ttg::edges(F2F, F2P));
auto print_op = ttg::make_tt(
Expand All @@ -61,16 +63,18 @@ TEST_CASE("Fibonacci", "[fib][core]") {
ttg::edges(F2P), ttg::edges());
print_op->set_input_reducer<0>([](int &a, const int &b) { a = a + b; });
make_graph_executable(fib_op);
if (ttg::default_execution_context().rank() == 0) fib_op->invoke(1, 0);
ttg::ttg_fence(ttg::default_execution_context());
if (world.rank() == 0) fib_op->invoke(1, 0);
ttg::execute(world);
ttg::ttg_fence(world);
}
}

// in distributed memory we must count how many messages the reducer will receive
SECTION("distributed-memory") {
ttg::Edge<int, std::pair<int, int>> F2F;
ttg::Edge<void, int> F2P;
const auto nranks = ttg::default_execution_context().size();
auto world = ttg::default_execution_context();
const auto nranks = world.size();

auto fib_op = ttg::make_tt(
// computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0
Expand Down Expand Up @@ -103,8 +107,9 @@ TEST_CASE("Fibonacci", "[fib][core]") {
a = a + b;
});
make_graph_executable(fib_op);
ttg::ttg_fence(ttg::default_execution_context());
if (ttg::default_execution_context().rank() == 0) fib_op->invoke(0, std::make_pair(1, 0));
ttg::ttg_fence(ttg::default_execution_context());
ttg::ttg_fence(world);
if (world.rank() == 0) fib_op->invoke(0, std::make_pair(1, 0));
ttg::execute(world);
ttg::ttg_fence(world);
}
} // TEST_CASE("Fibonacci")
76 changes: 76 additions & 0 deletions tests/unit/streams.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <catch2/catch.hpp>

#include <time.h>  // nanosleep (POSIX)

#include <atomic>
#include <cstddef>
#include <ctime>
#include <iostream>
#include <tuple>
#include <utility>

#include "ttg.h"

#include "ttg/serialization/std/pair.h"
#include "ttg/util/hash/std/pair.h"



TEST_CASE("streams", "[streams][core]") {
  // Streams N integer messages through a producer TT into a sink TT whose input
  // is a streaming terminal with a reducer of fixed size SLICE, then checks the
  // reduced values and the number of reducer/sink invocations seen on this rank.
  SECTION("concurrent-stream-size") {
    ttg::Edge<int, int> I2O;
    ttg::Edge<int, int> O2S;
    auto world = ttg::default_execution_context();
    const auto nranks = world.size();

    // Number of messages folded into each sink key.
    constexpr std::size_t SLICE = 20;
    // Total number of messages injected: two full slices per rank.
    const std::size_t N = SLICE * 2 * world.size();
    // Short per-task delay applied in the producer before it sends.
    constexpr timespec ts = { .tv_sec = 0, .tv_nsec = 10000 };
    // Payload carried by every message; the reducer checks it.
    constexpr int VALUE = 1;
    // Counts reducer invocations plus sink invocations executed on this rank.
    std::atomic<std::size_t> reduce_ops = 0;

    // Producer: maps message index n onto sink key n / SLICE and forwards the payload.
    auto op = ttg::make_tt(
        [&](const int &n, int &&i, std::tuple<ttg::Out<int, int>> &outs) {
          // Do the arithmetic in the unsigned domain explicitly; n is never negative here.
          const auto index = static_cast<std::size_t>(n);
          int key = static_cast<int>(index / SLICE);
          nanosleep(&ts, nullptr);
          // Only the very last message may belong to a partially filled slice,
          // in which case that stream gets its actual size before the value is sent.
          const bool is_last = index >= N - 1;
          if (is_last && N % SLICE > 0) {
            ttg::set_size<0>(key, N % SLICE, outs);
          }
          // i is a plain rvalue reference (not a forwarding reference), so move it.
          ttg::send<0>(key, std::move(i), outs);
        },
        ttg::edges(I2O), ttg::edges(O2S));

    // Sink: receives the reduced value for each key once its stream is complete.
    auto sink_op = ttg::make_tt(
        [&](const int key, const int &value) {
          std::cout << "sink " << key << std::endl;
          // A stream must have accumulated SLICE, unless it is the trailing partial slice.
          const bool full_slice = static_cast<std::size_t>(value) == SLICE;
          const bool trailing_key = static_cast<std::size_t>(key) == (N / SLICE);
          if (!(full_slice || trailing_key)) {
            std::cout << "SINK ERROR: key " << key << " value " << value << " SLICE " << SLICE << " N " << N << std::endl;
          }
          // NOTE(review): Catch2 documents its assertion macros as not thread-safe; if the
          // backend runs tasks on several threads these CHECKs may race — confirm.
          CHECK((full_slice || trailing_key));
          reduce_ops++;
        },
        ttg::edges(O2S), ttg::edges());

    // Place every producer task on the last rank.
    op->set_keymap([nranks](const auto & /*key*/) { return nranks - 1; });
    op->set_trace_instance(true);
    // The accumulator is bumped by one per reducer call; the incoming value is only validated.
    // NOTE(review): value == SLICE in the sink presumably relies on the first message
    // initializing the accumulator without a reducer call — confirm against TTG semantics.
    sink_op->set_input_reducer<0>(
        [&](int &a, const int &b) {
          a += 1;  // we count invocations
          CHECK(b == VALUE);
          reduce_ops++;
        },
        SLICE);

    make_graph_executable(op);
    ttg::ttg_fence(world);
    // Only rank 0 injects the messages.
    if (world.rank() == 0) {
      for (std::size_t i = 0; i < N; ++i) {
        op->invoke(i, VALUE);
      }
    }

    ttg::execute(world);
    ttg::ttg_fence(world);
    // NOTE(review): assumes the sink keys are spread evenly over the ranks so that each
    // rank performs N / world.size() reducer + sink invocations — TODO confirm.
    CHECK(reduce_ops == (N / world.size()));
  }
}  // TEST_CASE("streams")
Loading