mpi: fix setting local ranks
In very constrained environments like GitHub Actions (GHA) it may sometimes happen that
some ranks _completely_ finish (i.e. call destroy) before others have
even called Init.

With our current implementation, this meant that the world may have been
removed from the registry, even though there were still active local
ranks to run.

To fix this, we set the number of active local ranks once, at the
beginning, and decrement it on a per-thread basis. Note that this does
not invalidate the fix to the previous race condition because, in fact,
what we fixed is that we now _always_ init a world when we execute in
it.
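The idea, roughly: instead of counting ranks up as each one calls Init (which races with ranks that finish early), the number of local ranks is stored once before any rank runs, each rank decrements the counter exactly once when it finishes, and only the thread that performs the last decrement may clear the shared world state. Below is a minimal standalone sketch of that pattern, not faabric's actual API: the names EvictionLatch and clearWorldFromRegistry are hypothetical stand-ins; the real changes live in MpiWorld::initLocalRemoteLeaders and MpiWorld::destroy in the diff further down.

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

// Hypothetical stand-in for the per-world clean-up faabric performs once the
// last local rank is done (e.g. removing the world from a registry)
static void clearWorldFromRegistry()
{
    std::printf("last local rank finished, clearing world state\n");
}

class EvictionLatch
{
  public:
    // Set once, before any local rank starts running
    void prepare(int numLocalRanks)
    {
        count.store(numLocalRanks, std::memory_order_release);
    }

    // Called exactly once per rank when it finishes; returns true only for
    // the last rank, regardless of the order in which ranks init or finish
    bool countDown()
    {
        return count.fetch_sub(1, std::memory_order_acq_rel) == 1;
    }

  private:
    std::atomic<int> count{ 0 };
};

int main()
{
    constexpr int numLocalRanks = 4;

    EvictionLatch latch;
    latch.prepare(numLocalRanks);

    std::vector<std::thread> ranks;
    for (int rank = 0; rank < numLocalRanks; rank++) {
        ranks.emplace_back([&latch] {
            // ... the rank does its work; it may finish before other ranks
            // have even initialised, which is exactly the race described
            // above ...
            if (latch.countDown()) {
                clearWorldFromRegistry();
            }
        });
    }

    for (auto& t : ranks) {
        t.join();
    }

    return 0;
}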
csegarragonz committed Apr 22, 2024
1 parent 5b65b3e commit 6666cff
Showing 6 changed files with 62 additions and 23 deletions.
6 changes: 5 additions & 1 deletion include/faabric/mpi/MpiWorld.h
@@ -212,7 +212,11 @@ class MpiWorld
     std::string thisHost;
     faabric::util::TimePoint creationTime;

-    std::atomic<int> activeLocalRanks = 0;
+    // Latch used to clear the world from the registry when we are migrating
+    // out of it (i.e. evicting it). Note that this clean-up is only necessary
+    // for migration, as we want to clean things up in case we ever migrate
+    // again back into this host
+    std::atomic<int> evictionLatch = 0;

     std::atomic_flag isDestroyed = false;

2 changes: 2 additions & 0 deletions include/faabric/transport/PointToPointBroker.h
@@ -119,6 +119,8 @@ class PointToPointBroker

     std::set<int> getIdxsRegisteredForGroup(int groupId);

+    std::set<std::string> getHostsRegisteredForGroup(int groupId);
+
     void updateHostForIdx(int groupId, int groupIdx, std::string newHost);

     void sendMessage(int groupId,
14 changes: 1 addition & 13 deletions src/executor/Executor.cpp
@@ -426,19 +426,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
         if (msg.ismpi()) {
             auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry();
             if (mpiWorldRegistry.worldExists(msg.mpiworldid())) {
-                bool mustClear =
-                  mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
-
-                if (mustClear) {
-                    SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}",
-                                 msg.appid(),
-                                 msg.groupid(),
-                                 msg.groupidx(),
-                                 msg.mpiworldid(),
-                                 msg.executedhost());
-
-                    mpiWorldRegistry.clearWorld(msg.mpiworldid());
-                }
+                mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy();
             }
         }
     }
44 changes: 35 additions & 9 deletions src/mpi/MpiWorld.cpp
@@ -227,11 +227,15 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)

 bool MpiWorld::destroy()
 {
+    int groupId = -1;
+
     if (rankState.msg != nullptr) {
         SPDLOG_TRACE("{}:{}:{} destroying MPI world",
                      rankState.msg->appid(),
                      rankState.msg->groupid(),
                      rankState.msg->mpirank());
+
+        groupId = rankState.msg->groupid();
     }

     // ----- Per-rank cleanup -----
@@ -246,12 +250,19 @@
     }
 #endif

-    // ----- Global accounting -----
-
-    int numActiveLocalRanks =
-      activeLocalRanks.fetch_sub(1, std::memory_order_acquire);
+    // If we are evicting the host during a migration, use the eviction latch
+    // for proper resource clean-up in the event of a future migration back
+    // into this host
+    bool isEviction =
+      groupId != -1 &&
+      !broker.getHostsRegisteredForGroup(groupId).contains(thisHost);
+    if (isEviction) {
+        int numActiveLocalRanks =
+          evictionLatch.fetch_sub(1, std::memory_order_acquire);
+        return numActiveLocalRanks == 1;
+    }

-    return numActiveLocalRanks == 1;
+    return false;
 }

 // Initialise shared (per-host) MPI world state. This method is called once
@@ -276,7 +287,6 @@ void MpiWorld::initialiseFromMsg(faabric::Message& msg)
 void MpiWorld::initialiseRankFromMsg(faabric::Message& msg)
 {
     rankState.msg = &msg;
-    activeLocalRanks++;

     // Pin this thread to a free CPU
 #ifdef FAABRIC_USE_SPINLOCK
@@ -341,8 +351,19 @@ void MpiWorld::initLocalRemoteLeaders()
         portForRank.at(rankId) = broker.getMpiPortForReceiver(groupId, rankId);
     }

-    // Persist the local leader in this host for further use
-    localLeader = (*ranksForHost[thisHost].begin());
+    // Finally, set up the infrastructure for proper clean-up of the world in
+    // case we are migrating away from it. Note that we are preparing the
+    // latch one migration before we migrate away. This is because we will also
+    // call this method right before evicting, so we want to have the latch
+    // already set
+    int numInThisHost =
+      ranksForHost.contains(thisHost) ? ranksForHost.at(thisHost).size() : 0;
+    bool mustEvictHost = numInThisHost == 0;
+
+    if (!mustEvictHost) {
+        evictionLatch.store(numInThisHost, std::memory_order_release);
+        localLeader = (*ranksForHost[thisHost].begin());
+    }
 }

 void MpiWorld::getCartesianRank(int rank,
@@ -1918,9 +1939,14 @@ void MpiWorld::initSendRecvSockets()
 // corresponding receiver is local to this host, for any sender
 void MpiWorld::initLocalQueues()
 {
+    // Nothing to do if we are migrating away from this host
+    if (!ranksForHost.contains(thisHost)) {
+        return;
+    }
+
     localQueues.resize(size * size);
     for (int sendRank = 0; sendRank < size; sendRank++) {
-        for (const int recvRank : ranksForHost[thisHost]) {
+        for (const int recvRank : ranksForHost.at(thisHost)) {
             // We handle messages-to-self as memory copies
             if (sendRank == recvRank) {
                 continue;
15 changes: 15 additions & 0 deletions src/transport/PointToPointBroker.cpp
@@ -539,6 +539,21 @@ std::set<int> PointToPointBroker::getIdxsRegisteredForGroup(int groupId)
     return groupIdIdxsMap[groupId];
 }

+std::set<std::string> PointToPointBroker::getHostsRegisteredForGroup(
+  int groupId)
+{
+    faabric::util::SharedLock lock(brokerMutex);
+    std::set<int> indexes = groupIdIdxsMap[groupId];
+
+    std::set<std::string> hosts;
+    for (const auto& idx : indexes) {
+        std::string key = getPointToPointKey(groupId, idx);
+        hosts.insert(mappings.at(key));
+    }
+
+    return hosts;
+}
+
 void PointToPointBroker::initSequenceCounters(int groupId)
 {
     if (currentGroupId != NO_CURRENT_GROUP_ID) {
4 changes: 4 additions & 0 deletions tests/test/transport/test_point_to_point.cpp
@@ -44,6 +44,8 @@ TEST_CASE_METHOD(PointToPointClientServerFixture,

     REQUIRE(broker.getIdxsRegisteredForGroup(appIdA).empty());
     REQUIRE(broker.getIdxsRegisteredForGroup(appIdB).empty());
+    REQUIRE(broker.getHostsRegisteredForGroup(appIdA).empty());
+    REQUIRE(broker.getHostsRegisteredForGroup(appIdB).empty());

     faabric::PointToPointMappings mappingsA;
     mappingsA.set_appid(appIdA);
@@ -73,6 +75,8 @@

     REQUIRE(broker.getIdxsRegisteredForGroup(groupIdA).size() == 2);
     REQUIRE(broker.getIdxsRegisteredForGroup(groupIdB).size() == 1);
+    REQUIRE(broker.getHostsRegisteredForGroup(groupIdA).size() == 2);
+    REQUIRE(broker.getHostsRegisteredForGroup(groupIdB).size() == 1);

     REQUIRE(broker.getHostForReceiver(groupIdA, groupIdxA1) == hostA);
     REQUIRE(broker.getHostForReceiver(groupIdA, groupIdxA2) == hostB);
