tests: util tests running
csegarragonz committed Aug 7, 2023
1 parent 6d034e8 commit 5f8583f
Showing 8 changed files with 60 additions and 21 deletions.
3 changes: 3 additions & 0 deletions include/faabric/util/batch.h
@@ -17,6 +17,9 @@ std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(

bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber);

void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
int newAppId);

void updateBatchExecGroupId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
int newGroupId);

12 changes: 10 additions & 2 deletions src/mpi/MpiWorld.cpp
@@ -143,9 +143,10 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
// to spawn (size - 1) new functions starting with rank 1
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory(user, function, size - 1);
faabric::util::updateBatchExecAppId(req, call.appid());
for (int i = 0; i < req->messages_size(); i++) {
// Update MPI-related fields
faabric::Message& msg = req->mutable_messages()->at(i);
msg.set_appid(call.appid());
msg.set_ismpi(true);
msg.set_mpiworldid(call.mpiworldid());
msg.set_mpirank(i + 1);
@@ -154,6 +155,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
// Set group ids for remote messaging
// TODO: this will be set by the planner
// msg.set_groupid(call.groupid());
// TODO: do we need to set this one here?
msg.set_groupidx(msg.mpirank());
if (thisRankMsg != nullptr) {
// Set message fields to allow for function migration
@@ -165,6 +167,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
// To run migration experiments easily, we may want to propagate
// the UNDERFULL topology hint. In general however, we don't
// need to propagate this field
// TODO(hints): remove
if (thisRankMsg->topologyhint() == "UNDERFULL") {
msg.set_topologyhint(thisRankMsg->topologyhint());
}
@@ -299,7 +302,12 @@ void MpiWorld::initLocalRemoteLeaders()
int groupId = thisRankMsg->groupid();
auto rankIds = broker.getIdxsRegisteredForGroup(groupId);
if (rankIds.size() != size) {
SPDLOG_ERROR("rankIds != size ({} != {})", rankIds.size(), size);
SPDLOG_ERROR("{}:{}:{} rankIds != size ({} != {})",
thisRankMsg->appid(),
groupId,
thisRankMsg->groupidx(),
rankIds.size(),
size);
throw std::runtime_error("MPI Group-World size mismatch!");
}
assert(rankIds.size() == size);
25 changes: 16 additions & 9 deletions src/planner/Planner.cpp
@@ -330,12 +330,6 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// hostMap type, to make sure we don't modify our state here
#endif

// A scheduling decision will create a new PTP mapping and, as a
// consequence, a new group ID
int newGroupId = faabric::util::generateGid();
decision->groupId = newGroupId;
faabric::util::updateBatchExecGroupId(req, newGroupId);

// Handle failures to schedule work
if (*decision == NOT_ENOUGH_SLOTS_DECISION) {
SPDLOG_ERROR(
@@ -350,9 +344,11 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
return decision;
}

#ifndef NDEBUG
decision->print();
#endif
// A scheduling decision will create a new PTP mapping and, as a
// consequence, a new group ID
int newGroupId = faabric::util::generateGid();
decision->groupId = newGroupId;
faabric::util::updateBatchExecGroupId(req, newGroupId);

// Given a scheduling decision, depending on the decision type, we want to:
// 1. Update the host-map to reflect the new host occupation
@@ -363,6 +359,11 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
auto& broker = faabric::transport::getPointToPointBroker();
switch (decisionType) {
case faabric::batch_scheduler::DecisionType::NEW: {
// 0. Log the decision in debug mode
#ifndef NDEBUG
decision->print();
#endif

// 1. For a scale change request, we only need to update the hosts
// with the new messages being scheduled
for (int i = 0; i < decision->hosts.size(); i++) {
@@ -399,12 +400,18 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
auto oldReq = state.inFlightReqs.at(appId).first;
auto oldDec = state.inFlightReqs.at(appId).second;
faabric::util::updateBatchExecGroupId(oldReq, newGroupId);
oldDec->groupId = newGroupId;

for (int i = 0; i < req->messages_size(); i++) {
*oldReq->add_messages() = req->messages(i);
oldDec->addMessage(decision->hosts.at(i), req->messages(i));
}

// 2.5. Log the updated decision in debug mode
#ifndef NDEBUG
oldDec->print();
#endif

// 3. We want to send the mappings for the _updated_ decision,
// including _all_ the messages (not just the ones that are being
// added)
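The reordering above means the fresh group ID is only generated once the scheduling decision is known to be valid. A condensed sketch of that control flow, as an illustration only (the batchScheduler call, its arguments, and the decision type are assumed from the surrounding code, not quoted from the actual Planner implementation):

// Illustration: fail fast on an unschedulable request, then mint the group id
auto decision = batchScheduler->makeSchedulingDecision(hostMapCopy, state.inFlightReqs, req);
if (*decision == NOT_ENOUGH_SLOTS_DECISION) {
    // No group id is generated for a decision we are about to discard
    return decision;
}

// A valid decision creates a new PTP mapping and, with it, a new group id
int newGroupId = faabric::util::generateGid();
decision->groupId = newGroupId;
faabric::util::updateBatchExecGroupId(req, newGroupId);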
1 change: 1 addition & 0 deletions src/scheduler/Scheduler.cpp
@@ -704,6 +704,7 @@ void Scheduler::setFunctionResult(faabric::Message& msg)

// Remove the app from in-flight map if still there, and this host is the
// main host for the message
// TODO: remove me
if (msg.mainhost() == thisHost) {
removePendingMigration(msg.appid());
}
16 changes: 14 additions & 2 deletions src/util/batch.cpp
@@ -61,12 +61,24 @@ bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber)
return true;
}

void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
int newAppId)
{
ber->set_appid(newAppId);
for (int i = 0; i < ber->messages_size(); i++) {
ber->mutable_messages(i)->set_appid(newAppId);
}

// Sanity-check in debug mode
assert(isBatchExecRequestValid(ber));
}

void updateBatchExecGroupId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
int newGroupId)
{
ber->set_groupid(newGroupId);
for (auto msg : *ber->mutable_messages()) {
msg.set_groupid(newGroupId);
for (int i = 0; i < ber->messages_size(); i++) {
ber->mutable_messages(i)->set_groupid(newGroupId);
}

// Sanity-check in debug mode
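For context, a minimal usage sketch of the two helpers added in this file (not part of the commit; the user and function names are made up, and the include paths are assumed from the faabric util layout):

#include <faabric/util/batch.h>
#include <faabric/util/gids.h>

void exampleUpdateIds(const faabric::Message& call)
{
    // Build a batch of 4 messages and stamp them with the caller's app id
    auto req = faabric::util::batchExecFactory("demo-user", "demo-func", 4);
    faabric::util::updateBatchExecAppId(req, call.appid());

    // A new scheduling decision implies a new group id for PTP messaging
    int newGroupId = faabric::util::generateGid();
    faabric::util::updateBatchExecGroupId(req, newGroupId);
}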
1 change: 0 additions & 1 deletion tests/test/scheduler/test_scheduler.cpp
@@ -657,7 +657,6 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
// Create waiters that will submit messages and await their results
for (int i = 0; i < nWaiters; i++) {
waiterThreads.emplace_back([nWaiterMessages] {
Scheduler& sch = scheduler::getScheduler();
auto& plannerCli = faabric::planner::getPlannerClient();

std::shared_ptr<faabric::BatchExecuteRequest> req =
16 changes: 11 additions & 5 deletions tests/test/util/test_exec_graph.cpp
@@ -116,7 +116,6 @@ TEST_CASE_METHOD(ExecGraphTestFixture,
TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")
{
faabric::mpi::MpiWorld world;
msg.set_appid(1337);
msg.set_ismpi(true);
msg.set_recordexecgraph(true);

@@ -138,10 +137,10 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")
messages.at(rank).set_recordexecgraph(true);
}

world.create(msg, worldId, worldSize);
// First call the original message
plannerCli.callFunctions(req);

// Update the result for the main message
sch.setFunctionResult(msg);
world.create(msg, worldId, worldSize);

// Build expected graph
ExecGraphNode nodeB1 = { .msg = messages.at(1) };
@@ -154,8 +153,15 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")

ExecGraph expected{ .rootNode = nodeA };

// The MPI base fixture uses the DummyExecutor, which immediately sets the
// function result. We want to overwrite that result with the chained calls
// (logged as part of MpiWorld::create), so we sleep long enough for the
// dummy executor to set its result first, making sure we then overwrite it
// here
SLEEP_MS(500);
sch.setFunctionResult(msg);

// Wait for the MPI messages to finish
auto& plannerCli = faabric::planner::getPlannerClient();
plannerCli.getMessageResult(msg, 2000);
for (const auto& id : faabric::util::getChainedFunctions(msg)) {
plannerCli.getMessageResult(msg.appid(), id, 2000);
7 changes: 5 additions & 2 deletions tests/utils/fixtures.h
@@ -429,7 +429,8 @@ class MpiBaseTestFixture
, func("hellompi")
, worldId(123)
, worldSize(5)
, msg(faabric::util::messageFactory(user, func))
, req(faabric::util::batchExecFactory(user, func, 1))
, msg(*req->mutable_messages(0))
{
std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
std::make_shared<faabric::scheduler::DummyExecutorFactory>();
@@ -453,7 +454,9 @@ class MpiBaseTestFixture
int worldId;
int worldSize;

faabric::Message msg;
std::shared_ptr<BatchExecuteRequest> req;
// TODO: refactor to firstMsg
faabric::Message& msg;
};

class MpiTestFixture : public MpiBaseTestFixture
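With msg now being a reference into req, a test can tweak the first message and then submit the whole batch in one go. A hypothetical test body built on this fixture (illustrative only, not part of the commit; the test name and the exact set of message fields are assumptions):

TEST_CASE_METHOD(MpiBaseTestFixture, "Hypothetical fixture usage", "[mpi]")
{
    // Mutating msg also mutates req->messages(0), since msg aliases it
    msg.set_ismpi(true);
    msg.set_mpiworldid(worldId);
    msg.set_mpiworldsize(worldSize);

    // Submit the whole batch through the planner client and await the result
    auto& plannerCli = faabric::planner::getPlannerClient();
    plannerCli.callFunctions(req);
    plannerCli.getMessageResult(msg, 2000);
}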
