Skip to content

Commit

Permalink
dist-tests: mpi function migration tests running
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Aug 29, 2023
1 parent 15cc23e commit ccdbe48
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 33 deletions.
10 changes: 7 additions & 3 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
assert(req->appid() == decision->appId);
assert(req->groupid() == decision->groupId);

// Lastly, asynchronously dispatch the decision to the corresponding hosts
// (we may not need the lock here anymore, but we are eager to make the
// Lastly, asynchronously dispatch the execute requests to the
// corresponding hosts if new functions need to be spawned (not if
// migrating)
// (We may not need the lock here anymore, but we are eager to make the
// whole function atomic)
dispatchSchedulingDecision(req, decision);
if (decisionType != faabric::batch_scheduler::DecisionType::DIST_CHANGE) {
dispatchSchedulingDecision(req, decision);
}

return decision;
}
Expand Down
3 changes: 1 addition & 2 deletions src/scheduler/FunctionCallServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ void FunctionCallServer::recvExecuteFunctions(std::span<const uint8_t> buffer)
PARSE_MSG(faabric::BatchExecuteRequest, buffer.data(), buffer.size())

// This host has now been told to execute these functions no matter what
// TODO(planner-schedule): this if is only here because, temporarily, the
// planner doesn't take any scheduling decisions
parsedMsg.mutable_messages()->at(0).set_timestamp(
faabric::util::getGlobalClock().epochMillis());
parsedMsg.mutable_messages()->at(0).set_mainhost(
faabric::util::getSystemConfig().endpointHost);

// TODO: consider moving the logic from the scheduler here?
scheduler.executeBatch(
std::make_shared<faabric::BatchExecuteRequest>(parsedMsg));
Expand Down
49 changes: 27 additions & 22 deletions tests/dist/dist_test_fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,43 +62,45 @@ class MpiDistTestsFixture : public DistTestsFixture
int worldSize = 4;
bool origIsMsgOrderingOn;

// The distributed test server has four slots. By setting the number of
// local slots and the world size we can infer the expected scheduling
// decision for each test.

// Overwrite this host's resources with the given total and used slot
// counts (used slots default to zero)
void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0)
{
    faabric::HostResources localRes;
    localRes.set_slots(newLocalSlots);
    localRes.set_usedslots(newUsedLocalSlots);
    sch.setThisHostResources(localRes);
}

// Overwrite the remote worker's resources with the given total and used
// slot counts (used slots default to zero)
void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0)
{
    faabric::HostResources remoteRes;
    remoteRes.set_slots(newRemoteSlots);
    remoteRes.set_usedslots(newRemoteUsedSlots);
    sch.addHostToGlobalSet(workerIP, std::make_shared<HostResources>(remoteRes));
}

// Split the MPI world's slots between this host and the remote worker,
// giving numLocalSlots to this host and the remainder to the worker.
// When the split is even, we double the local capacity and mark half of
// it as used, so the scheduler still sees numLocalSlots free slots
// locally. Throws if the local share is smaller than the remote one,
// which the tests treat as an infeasible configuration.
void setLocalSlots(int numLocalSlots, int worldSizeIn = 0)
{
    if (worldSizeIn > 0) {
        worldSize = worldSizeIn;
    }
    int numRemoteSlots = worldSize - numLocalSlots;

    if (numLocalSlots == numRemoteSlots) {
        updateLocalSlots(2 * numLocalSlots, numLocalSlots);
        updateRemoteSlots(numRemoteSlots);
    } else if (numLocalSlots > numRemoteSlots) {
        updateLocalSlots(numLocalSlots);
        updateRemoteSlots(numRemoteSlots);
    } else {
        SPDLOG_ERROR(
          "Unfeasible MPI world slots config (local: {} - remote: {})",
          numLocalSlots,
          numRemoteSlots);
        throw std::runtime_error("Unfeasible slots configuration");
    }
}

std::shared_ptr<faabric::BatchExecuteRequest> setRequest(
Expand Down Expand Up @@ -179,7 +181,10 @@ class MpiDistTestsFixture : public DistTestsFixture
{
faabric::Message& msg = req->mutable_messages()->at(0);
faabric::Message result = plannerCli.getMessageResult(msg, timeoutMs);
REQUIRE(result.returnvalue() == 0);

if (result.returnvalue() != MIGRATED_FUNCTION_RETURN_VALUE) {
REQUIRE(result.returnvalue() == 0);
}
SLEEP_MS(1000);
auto execGraph = faabric::util::getFunctionExecGraph(msg);
checkSchedulingFromExecGraph(
Expand Down
14 changes: 11 additions & 3 deletions tests/dist/mpi/mpi_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,18 @@ int MPI_Init(int* argc, char*** argv)
faabric::Message* call = getExecutingCall();

if (call->mpirank() <= 0) {
SPDLOG_TRACE("MPI - MPI_Init (create)");
// If we are rank 0 and the world already exists, it means we are being
// migrated
if (getMpiWorldRegistry().worldExists(call->mpiworldid())) {
SPDLOG_TRACE("MPI - MPI_Init (join)");

int worldId = executingContext.createWorld(*call);
call->set_mpiworldid(worldId);
executingContext.joinWorld(*call);
} else {
SPDLOG_TRACE("MPI - MPI_Init (create)");

int worldId = executingContext.createWorld(*call);
call->set_mpiworldid(worldId);
}
} else {
SPDLOG_TRACE("MPI - MPI_Init (join)");
executingContext.joinWorld(*call);
Expand Down
28 changes: 25 additions & 3 deletions tests/dist/mpi/test_mpi_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,37 @@ TEST_CASE_METHOD(MpiDistTestsFixture, "Test MPI function migration", "[mpi]")
// Sleep for a while to let the planner schedule the MPI calls
SLEEP_MS(500);

// Update the local slots so that a migration opportunity appears
updateLocalSlots(worldSize);
// Update the slots so that a migration opportunity appears. We update
// either the local or remote worlds to force the migration of one
// half of the ranks or the other one
bool migratingMainRank;

SECTION("Migrate main rank")
{
// Make more space remotely, so we migrate the first half of ranks
// (including the main rank)
migratingMainRank = true;
updateRemoteSlots(worldSize);
}

SECTION("Don't migrate main rank")
{
// Make more space locally, so we migrate the second half of ranks
migratingMainRank = false;
updateLocalSlots(worldSize);
}

// The current function migration approach breaks the execution graph, as
// some messages are deliberately left dangling without a return value
std::vector<std::string> hostsBeforeMigration = {
getMasterIP(), getMasterIP(), getWorkerIP(), getWorkerIP()
};
std::vector<std::string> hostsAfterMigration(worldSize, getMasterIP());
std::vector<std::string> hostsAfterMigration;
if (migratingMainRank) {
hostsAfterMigration = { getWorkerIP(), getWorkerIP(), getWorkerIP(), getWorkerIP() };
} else {
hostsAfterMigration = { getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP() };
}
checkAllocationAndResultMigration(
req, hostsBeforeMigration, hostsAfterMigration, 15000);
}
Expand Down

0 comments on commit ccdbe48

Please sign in to comment.