migration(mpi): fix race conditions (#429)
In this PR we fix a couple of bugs and race conditions that only appear
when performing many migrations across large node and core counts.
csegarragonz authored Apr 20, 2024
1 parent 3cf510e commit 7304a61
Showing 8 changed files with 138 additions and 49 deletions.
8 changes: 8 additions & 0 deletions include/faabric/batch-scheduler/SchedulingDecision.h
@@ -101,6 +101,14 @@ class SchedulingDecision
int32_t appIdx,
int32_t groupIdx);

// Add a message in a specific position
void addMessageInPosition(int32_t pos,
const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx,
int32_t mpiPort);

// Returns the MPI port that we have vacated
int32_t removeMessage(int32_t messageId);

4 changes: 3 additions & 1 deletion include/faabric/mpi/MpiWorld.h
@@ -200,7 +200,9 @@ class MpiWorld

/* Function Migration */

void prepareMigration(int thisRank, bool thisRankMustMigrate);
void prepareMigration(int newGroupId,
int thisRank,
bool thisRankMustMigrate);

private:
int id = -1;
64 changes: 30 additions & 34 deletions src/batch-scheduler/BinPackScheduler.cpp
@@ -21,6 +21,8 @@ static std::map<std::string, int> getHostFreqCount(
// That is, we want to keep as many host-message assignments from the old
// decision as possible, while preserving the overall locality of the new
// decision (i.e. the host-message histogram)
// NOTE: keep in mind that the newDecision has the right host histogram, but
// the messages may be completely out-of-order
static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(
std::shared_ptr<SchedulingDecision> newDecision,
std::shared_ptr<SchedulingDecision> oldDecision)
@@ -44,43 +46,37 @@ static std::shared_ptr<SchedulingDecision> minimiseNumOfMigrations(
};

assert(newDecision->hosts.size() == oldDecision->hosts.size());
for (int i = 0; i < newDecision->hosts.size(); i++) {
// If both decisions schedule this message to the same host great, as
// we can keep the old scheduling
if (newDecision->hosts.at(i) == oldDecision->hosts.at(i) &&
hostFreqCount.at(newDecision->hosts.at(i)) > 0) {
decision->addMessage(oldDecision->hosts.at(i),
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(oldDecision->hosts.at(i)) -= 1;
continue;
}

// If not, assign the old decision as long as we still can (i.e. as
// long as we still have slots in the histogram (note that it could be
// that the old host is not in the new histogram at all)
if (hostFreqCount.contains(oldDecision->hosts.at(i)) &&
hostFreqCount.at(oldDecision->hosts.at(i)) > 0) {
decision->addMessage(oldDecision->hosts.at(i),
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(oldDecision->hosts.at(i)) -= 1;
continue;
// First we try to allocate to each message the same host it used to have
for (int i = 0; i < oldDecision->hosts.size(); i++) {
auto oldHost = oldDecision->hosts.at(i);

if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) {
decision->addMessageInPosition(i,
oldHost,
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i),
oldDecision->mpiPorts.at(i));

hostFreqCount.at(oldHost) -= 1;
}
}

// If we can't assign the host from the old decision, then it means
// that that message MUST be migrated, so it doesn't really matter
// which of the hosts from the new migration we pick (as the new
// decision is optimal in terms of bin-packing), as long as there are
// still slots in the histogram
auto nextHost = nextHostWithSlots();
decision->addMessage(nextHost,
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(nextHost) -= 1;
// Second we allocate the rest
for (int i = 0; i < oldDecision->hosts.size(); i++) {
if (decision->nFunctions <= i || decision->hosts.at(i).empty()) {

auto nextHost = nextHostWithSlots();
decision->addMessageInPosition(i,
nextHost,
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i),
-1);

hostFreqCount.at(nextHost) -= 1;
}
}

// Assert that we have preserved the new decision's host-message histogram
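To make the two-pass approach concrete, here is a minimal, self-contained sketch of the same idea using only standard-library types (the names are illustrative, not faabric's API): the first pass pins every message that can keep its old host while the new histogram still has room for it; the second pass places whatever is left on any host with spare histogram entries.

#include <cassert>
#include <map>
#include <string>
#include <vector>

// Sketch of the two-pass idea: pass 1 keeps a message on its old host while
// the new histogram still has capacity for that host; pass 2 places the
// remaining (migrated) messages on any host with spare histogram entries.
std::vector<std::string> minimiseMigrationsSketch(
  const std::vector<std::string>& newHosts,
  const std::vector<std::string>& oldHosts)
{
    assert(newHosts.size() == oldHosts.size());

    // Host histogram of the new (bin-packed) decision
    std::map<std::string, int> hostFreqCount;
    for (const auto& host : newHosts) {
        hostFreqCount[host]++;
    }

    std::vector<std::string> result(oldHosts.size());

    // Pass 1: keep the old host wherever the histogram allows it
    for (size_t i = 0; i < oldHosts.size(); i++) {
        auto it = hostFreqCount.find(oldHosts.at(i));
        if (it != hostFreqCount.end() && it->second > 0) {
            result.at(i) = oldHosts.at(i);
            it->second--;
        }
    }

    // Pass 2: the rest must migrate anyway, so any host with spare capacity
    // in the histogram is equally good
    for (size_t i = 0; i < result.size(); i++) {
        if (result.at(i).empty()) {
            for (auto& [host, count] : hostFreqCount) {
                if (count > 0) {
                    result.at(i) = host;
                    count--;
                    break;
                }
            }
        }
    }

    return result;
}

For example, with oldHosts = { foo, foo, bar, bar, bar, baz, baz } and a new histogram of foo x3 / bar x3 / baz x1, only the last message moves (to foo), which is what test case (iii) further down asserts.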
27 changes: 27 additions & 0 deletions src/batch-scheduler/SchedulingDecision.cpp
@@ -38,6 +38,33 @@ void SchedulingDecision::addMessage(const std::string& host,
mpiPorts.push_back(0);
}

void SchedulingDecision::addMessageInPosition(int32_t pos,
const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx,
int32_t mpiPort)
{
nFunctions++;

int desiredSize = std::max<int>(pos + 1, nFunctions);
bool mustResize = desiredSize > hosts.size();

if (mustResize) {
hosts.resize(desiredSize);
messageIds.resize(desiredSize);
appIdxs.resize(desiredSize);
groupIdxs.resize(desiredSize);
mpiPorts.resize(desiredSize);
}

hosts.at(pos) = host;
messageIds.at(pos) = messageId;
appIdxs.at(pos) = appIdx;
groupIdxs.at(pos) = groupIdx;
mpiPorts.at(pos) = mpiPort;
}

SchedulingDecision SchedulingDecision::fromPointToPointMappings(
faabric::PointToPointMappings& mappings)
{
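The position-aware insert above is what lets the first pass in BinPackScheduler.cpp keep messages in their original slots even though it skips some of them. A small standalone illustration of the resize semantics (plain vectors and hypothetical names, not the real class):

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy mirror of the resize logic in addMessageInPosition: vectors grow to
// whichever is larger, the requested position + 1 or the number of messages
// added so far, so out-of-order insertion leaves recognisable gaps.
struct ToyDecision
{
    int nFunctions = 0;
    std::vector<std::string> hosts;

    void addHostInPosition(int pos, const std::string& host)
    {
        nFunctions++;

        int desiredSize = std::max(pos + 1, nFunctions);
        if (desiredSize > (int)hosts.size()) {
            hosts.resize(desiredSize);
        }

        hosts.at(pos) = host;
    }
};

int main()
{
    ToyDecision d;

    // First pass writes position 3 only; positions 0-2 stay empty
    d.addHostInPosition(3, "foo");
    assert(d.hosts.size() == 4);
    assert(d.hosts.at(0).empty());

    // Second pass fills the gap, no further resize needed
    d.addHostInPosition(0, "bar");
    assert(d.hosts.size() == 4);
    assert(d.hosts.at(0) == "bar");

    return 0;
}

This is also why the second pass in minimiseNumOfMigrations can detect the still-unassigned messages by checking decision->nFunctions <= i or an empty host string at that position.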
36 changes: 29 additions & 7 deletions src/mpi/MpiWorld.cpp
@@ -331,10 +331,6 @@ void MpiWorld::initLocalRemoteLeaders()
portForRank.at(rankId) = broker.getMpiPortForReceiver(groupId, rankId);
}

// Add the port for this rank
int thisRank = rankState.msg->groupidx();
portForRank.at(thisRank) = broker.getMpiPortForReceiver(groupId, thisRank);

// Persist the local leader in this host for further use
localLeader = (*ranksForHost[thisHost].begin());
}
@@ -1798,7 +1794,16 @@ void MpiWorld::initSendRecvSockets()
rankState.recvSocket =
std::make_unique<faabric::transport::tcp::RecvSocket>(thisPort);
rankState.recvConnPool = std::vector<int>(size, 0);
rankState.recvSocket->listen();
try {
rankState.recvSocket->listen();
} catch (std::exception& e) {
SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (this host: {})",
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
thisHost);
throw e;
}

// Once we have bound and listened on the main socket, we can CONNECT to
// all remote ranks. Given that we have already bound the listening socket,
@@ -1835,7 +1840,19 @@
for (int i = 0; i < numRemoteRanks; i++) {
SPDLOG_TRACE("MPI establishing remote connection ?:?:? -> {} (ACCEPT)",
thisRank);
int newConnFd = rankState.recvSocket->accept();

int newConnFd = -1;
try {
newConnFd = rankState.recvSocket->accept();
} catch (std::exception& e) {
SPDLOG_ERROR(
"{}:{}:{} Error accepting on recv socket! (this host: {})",
rankState.msg->appid(),
rankState.msg->groupid(),
rankState.msg->groupidx(),
thisHost);
throw e;
}

// Work-out who CONNECT-ed to us
int otherRank = -1;
@@ -2027,8 +2044,13 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank)
#endif
}

void MpiWorld::prepareMigration(int thisRank, bool thisRankMustMigrate)
void MpiWorld::prepareMigration(int newGroupId,
int thisRank,
bool thisRankMustMigrate)
{
// Update everybody's group id to make sure initialisation works
rankState.msg->set_groupid(newGroupId);

// Check that there are no pending asynchronous messages to send and receive
auto itr = rankState.unackedMessageBuffers.begin();
while (itr != rankState.unackedMessageBuffers.end()) {
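The prepareMigration change sets the new group id before any re-initialisation happens. A minimal sketch of why the ordering matters, assuming (as the broker call in initLocalRemoteLeaders suggests) that receive ports are registered per group id and rank; the registry type and names below are illustrative only, not faabric's:

#include <cassert>
#include <map>
#include <utility>

// Illustrative registry: ports keyed by (groupId, rank). After a migration
// the app is re-registered under a new group id, so a rank that kept the old
// group id would look up ports under a key that no longer exists.
using PortRegistry = std::map<std::pair<int, int>, int>;

int main()
{
    PortRegistry registry;

    int oldGroupId = 1;
    int newGroupId = 2;
    int rank = 0;

    // Ports for the migrated app are registered under the new group id
    registry[{newGroupId, rank}] = 8800;

    // A lookup with the stale group id misses
    assert(registry.find({oldGroupId, rank}) == registry.end());

    // Updating the group id first (what prepareMigration now does) makes the
    // subsequent port lookups consistent across all ranks
    assert(registry.at({newGroupId, rank}) == 8800);

    return 0;
}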
21 changes: 15 additions & 6 deletions src/planner/Planner.cpp
@@ -387,6 +387,7 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
assert(decision->messageIds.empty());
assert(decision->appIdxs.empty());
assert(decision->groupIdxs.empty());
assert(decision->mpiPorts.empty());
state.inFlightReqs.erase(appId);

// If we are removing the app from in-flight, we can also
@@ -755,16 +756,24 @@
// new one
assert(decision->hosts.size() == oldDec->hosts.size());

// First release the migrated-from hosts and slots
for (int i = 0; i < oldDec->hosts.size(); i++) {
auto oldHost = state.hostMap.at(oldDec->hosts.at(i));
releaseHostSlots(oldHost);
releaseHostMpiPort(oldHost, oldDec->mpiPorts.at(i));
if (decision->hosts.at(i) != oldDec->hosts.at(i)) {
auto oldHost = state.hostMap.at(oldDec->hosts.at(i));

releaseHostSlots(oldHost);
releaseHostMpiPort(oldHost, oldDec->mpiPorts.at(i));
}
}

// Second, occupy the migrated-to slots and ports
for (int i = 0; i < decision->hosts.size(); i++) {
auto newHost = state.hostMap.at(decision->hosts.at(i));
claimHostSlots(newHost);
decision->mpiPorts.at(i) = claimHostMpiPort(newHost);
if (decision->hosts.at(i) != oldDec->hosts.at(i)) {
auto newHost = state.hostMap.at(decision->hosts.at(i));

claimHostSlots(newHost);
decision->mpiPorts.at(i) = claimHostMpiPort(newHost);
}
}

// Print the new decision after accounting has been updated
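A plausible reading of the planner change above is that slots and MPI ports are now released and re-claimed only for messages whose host actually changes, so messages that stay put keep their accounting untouched. A toy sketch of that invariant (hypothetical counters, not the planner's real state):

#include <cassert>
#include <map>
#include <string>
#include <vector>

int main()
{
    // Toy slot accounting per host
    std::map<std::string, int> usedSlots = { { "foo", 2 }, { "bar", 1 } };

    std::vector<std::string> oldHosts = { "foo", "foo", "bar" };
    std::vector<std::string> newHosts = { "foo", "foo", "foo" };

    // Release and claim only where the placement actually changed; messages
    // that keep their host keep their slot (and MPI port) untouched
    for (size_t i = 0; i < oldHosts.size(); i++) {
        if (newHosts[i] != oldHosts[i]) {
            usedSlots[oldHosts[i]]--;
            usedSlots[newHosts[i]]++;
        }
    }

    // Only the migrated message moved a slot from bar to foo
    assert(usedSlots["foo"] == 3);
    assert(usedSlots["bar"] == 0);

    return 0;
}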
3 changes: 2 additions & 1 deletion tests/dist/mpi/mpi_native.cpp
@@ -790,7 +790,8 @@ void mpiMigrationPoint(int entrypointFuncArg)

if (call->ismpi()) {
auto& mpiWorld = getMpiWorldRegistry().getWorld(call->mpiworldid());
mpiWorld.prepareMigration(call->mpirank(), funcMustMigrate);
mpiWorld.prepareMigration(
call->groupid(), call->mpirank(), funcMustMigrate);
}
}

24 changes: 24 additions & 0 deletions tests/test/batch-scheduler/test_binpack_scheduler.cpp
@@ -523,6 +523,30 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture,
{ "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" });
}

SECTION("BinPack will minimise the number of messages to migrate (ii)")
{
config.hostMap =
buildHostMap({ "foo", "bar", "baz" }, { 5, 3, 2 }, { 2, 3, 2 });
ber = faabric::util::batchExecFactory("bat", "man", 7);
ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
config.inFlightReqs = buildInFlightReqs(
ber, 7, { "bar", "bar", "bar", "baz", "baz", "foo", "foo" });
config.expectedDecision = buildExpectedDecision(
ber, { "bar", "bar", "foo", "foo", "foo", "foo", "foo" });
}

SECTION("BinPack will minimise the number of messages to migrate (iii)")
{
config.hostMap =
buildHostMap({ "foo", "bar", "baz" }, { 3, 3, 3 }, { 2, 3, 2 });
ber = faabric::util::batchExecFactory("bat", "man", 7);
ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
config.inFlightReqs = buildInFlightReqs(
ber, 7, { "foo", "foo", "bar", "bar", "bar", "baz", "baz" });
config.expectedDecision = buildExpectedDecision(
ber, { "foo", "foo", "bar", "bar", "bar", "baz", "foo" });
}

actualDecision = *batchScheduler->makeSchedulingDecision(
config.hostMap, config.inFlightReqs, ber);
compareSchedulingDecisions(actualDecision, config.expectedDecision);
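As a worked example, scenario (iii) above traces through the two passes of minimiseNumOfMigrations as follows (the foo x3 / bar x3 / baz x1 histogram is the one implied by the expected decision):

// Old placement:            foo foo bar bar bar baz baz
// New histogram (bin-pack): foo x3, bar x3, baz x1
//
// Pass 1 (keep the old host while the histogram allows it):
//   msgs 0-1 stay on foo  -> foo left: 1
//   msgs 2-4 stay on bar  -> bar left: 0
//   msg  5   stays on baz -> baz left: 0
//   msg  6   cannot stay on baz (histogram exhausted), left unassigned
//
// Pass 2 (place the rest on any host with spare histogram entries):
//   msg 6 -> foo
//
// Result: foo foo bar bar bar baz foo, i.e. a single migration, which is the
// expected decision asserted in the section above.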
