From 7304a61a94d7786ffebfa200df6d799154e8f857 Mon Sep 17 00:00:00 2001 From: Carlos Date: Sat, 20 Apr 2024 17:50:35 +0100 Subject: [PATCH] migration(mpi): fix race conditions (#429) In this PR we fix a couple of bugs and race conditions that only appear when doing many migrations over large node and core counts. --- .../batch-scheduler/SchedulingDecision.h | 8 +++ include/faabric/mpi/MpiWorld.h | 4 +- src/batch-scheduler/BinPackScheduler.cpp | 64 +++++++++---------- src/batch-scheduler/SchedulingDecision.cpp | 27 ++++++++ src/mpi/MpiWorld.cpp | 36 +++++++++-- src/planner/Planner.cpp | 21 ++++-- tests/dist/mpi/mpi_native.cpp | 3 +- .../test_binpack_scheduler.cpp | 24 +++++++ 8 files changed, 138 insertions(+), 49 deletions(-) diff --git a/include/faabric/batch-scheduler/SchedulingDecision.h b/include/faabric/batch-scheduler/SchedulingDecision.h index 18c5ff3dc..96170a5ca 100644 --- a/include/faabric/batch-scheduler/SchedulingDecision.h +++ b/include/faabric/batch-scheduler/SchedulingDecision.h @@ -101,6 +101,14 @@ class SchedulingDecision int32_t appIdx, int32_t groupIdx); + // Add a message in a specific position + void addMessageInPosition(int32_t pos, + const std::string& host, + int32_t messageId, + int32_t appIdx, + int32_t groupIdx, + int32_t mpiPort); + // Returns the MPI port that we have vacated int32_t removeMessage(int32_t messageId); diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h index 0f56deb79..504add7fa 100644 --- a/include/faabric/mpi/MpiWorld.h +++ b/include/faabric/mpi/MpiWorld.h @@ -200,7 +200,9 @@ class MpiWorld /* Function Migration */ - void prepareMigration(int thisRank, bool thisRankMustMigrate); + void prepareMigration(int newGroupId, + int thisRank, + bool thisRankMustMigrate); private: int id = -1; diff --git a/src/batch-scheduler/BinPackScheduler.cpp b/src/batch-scheduler/BinPackScheduler.cpp index 56e31352d..452435ab7 100644 --- a/src/batch-scheduler/BinPackScheduler.cpp +++ b/src/batch-scheduler/BinPackScheduler.cpp @@ -21,6 +21,8 @@ static std::map getHostFreqCount( // This is, we want to keep as many host-message scheduling in the old decision // as possible, and also have the overall locality of the new decision (i.e. // the host-message histogram) +// NOTE: keep in mind that the newDecision has the right host histogram, but +// the messages may be completely out-of-order static std::shared_ptr minimiseNumOfMigrations( std::shared_ptr newDecision, std::shared_ptr oldDecision) @@ -44,43 +46,37 @@ static std::shared_ptr minimiseNumOfMigrations( }; assert(newDecision->hosts.size() == oldDecision->hosts.size()); - for (int i = 0; i < newDecision->hosts.size(); i++) { - // If both decisions schedule this message to the same host great, as - // we can keep the old scheduling - if (newDecision->hosts.at(i) == oldDecision->hosts.at(i) && - hostFreqCount.at(newDecision->hosts.at(i)) > 0) { - decision->addMessage(oldDecision->hosts.at(i), - oldDecision->messageIds.at(i), - oldDecision->appIdxs.at(i), - oldDecision->groupIdxs.at(i)); - hostFreqCount.at(oldDecision->hosts.at(i)) -= 1; - continue; - } - // If not, assign the old decision as long as we still can (i.e. 
as - // long as we still have slots in the histogram (note that it could be - // that the old host is not in the new histogram at all) - if (hostFreqCount.contains(oldDecision->hosts.at(i)) && - hostFreqCount.at(oldDecision->hosts.at(i)) > 0) { - decision->addMessage(oldDecision->hosts.at(i), - oldDecision->messageIds.at(i), - oldDecision->appIdxs.at(i), - oldDecision->groupIdxs.at(i)); - hostFreqCount.at(oldDecision->hosts.at(i)) -= 1; - continue; + // First we try to allocate to each message the same host they used to have + for (int i = 0; i < oldDecision->hosts.size(); i++) { + auto oldHost = oldDecision->hosts.at(i); + + if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) { + decision->addMessageInPosition(i, + oldHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + oldDecision->mpiPorts.at(i)); + + hostFreqCount.at(oldHost) -= 1; } + } - // If we can't assign the host from the old decision, then it means - // that that message MUST be migrated, so it doesn't really matter - // which of the hosts from the new migration we pick (as the new - // decision is optimal in terms of bin-packing), as long as there are - // still slots in the histogram - auto nextHost = nextHostWithSlots(); - decision->addMessage(nextHost, - oldDecision->messageIds.at(i), - oldDecision->appIdxs.at(i), - oldDecision->groupIdxs.at(i)); - hostFreqCount.at(nextHost) -= 1; + // Second we allocate the rest + for (int i = 0; i < oldDecision->hosts.size(); i++) { + if (decision->nFunctions <= i || decision->hosts.at(i).empty()) { + + auto nextHost = nextHostWithSlots(); + decision->addMessageInPosition(i, + nextHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + -1); + + hostFreqCount.at(nextHost) -= 1; + } } // Assert that we have preserved the new decision's host-message histogram diff --git a/src/batch-scheduler/SchedulingDecision.cpp b/src/batch-scheduler/SchedulingDecision.cpp index 5810a8fc1..31bd97351 100644 --- a/src/batch-scheduler/SchedulingDecision.cpp +++ b/src/batch-scheduler/SchedulingDecision.cpp @@ -38,6 +38,33 @@ void SchedulingDecision::addMessage(const std::string& host, mpiPorts.push_back(0); } +void SchedulingDecision::addMessageInPosition(int32_t pos, + const std::string& host, + int32_t messageId, + int32_t appIdx, + int32_t groupIdx, + int32_t mpiPort) +{ + nFunctions++; + + int desiredSize = std::max(pos + 1, nFunctions); + bool mustResize = desiredSize > hosts.size(); + + if (mustResize) { + hosts.resize(desiredSize); + messageIds.resize(desiredSize); + appIdxs.resize(desiredSize); + groupIdxs.resize(desiredSize); + mpiPorts.resize(desiredSize); + } + + hosts.at(pos) = host; + messageIds.at(pos) = messageId; + appIdxs.at(pos) = appIdx; + groupIdxs.at(pos) = groupIdx; + mpiPorts.at(pos) = mpiPort; +} + SchedulingDecision SchedulingDecision::fromPointToPointMappings( faabric::PointToPointMappings& mappings) { diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index fe3ee2fc8..a62175533 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -331,10 +331,6 @@ void MpiWorld::initLocalRemoteLeaders() portForRank.at(rankId) = broker.getMpiPortForReceiver(groupId, rankId); } - // Add the port for this rank - int thisRank = rankState.msg->groupidx(); - portForRank.at(thisRank) = broker.getMpiPortForReceiver(groupId, thisRank); - // Persist the local leader in this host for further use localLeader = (*ranksForHost[thisHost].begin()); } @@ -1798,7 +1794,16 @@ void 
MpiWorld::initSendRecvSockets() rankState.recvSocket = std::make_unique(thisPort); rankState.recvConnPool = std::vector(size, 0); - rankState.recvSocket->listen(); + try { + rankState.recvSocket->listen(); + } catch (std::exception& e) { + SPDLOG_ERROR("{}:{}:{} Error binding recv socket! (this host: {})", + rankState.msg->appid(), + rankState.msg->groupid(), + rankState.msg->groupidx(), + thisHost); + throw e; + } // Once we have bound and listened on the main socket, we can CONNECT to // all remote ranks. Given that we have already bound the listening socket, @@ -1835,7 +1840,19 @@ void MpiWorld::initSendRecvSockets() for (int i = 0; i < numRemoteRanks; i++) { SPDLOG_TRACE("MPI establishing remote connection ?:?:? -> {} (ACCEPT)", thisRank); - int newConnFd = rankState.recvSocket->accept(); + + int newConnFd = -1; + try { + newConnFd = rankState.recvSocket->accept(); + } catch (std::exception& e) { + SPDLOG_ERROR( + "{}:{}:{} Error accepting on recv socket! (this host: {})", + rankState.msg->appid(), + rankState.msg->groupid(), + rankState.msg->groupidx(), + thisHost); + throw e; + } // Work-out who CONNECT-ed to us int otherRank = -1; @@ -2027,8 +2044,13 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank) #endif } -void MpiWorld::prepareMigration(int thisRank, bool thisRankMustMigrate) +void MpiWorld::prepareMigration(int newGroupId, + int thisRank, + bool thisRankMustMigrate) { + // Update everybody's group id to make sure initialisation works + rankState.msg->set_groupid(newGroupId); + // Check that there are no pending asynchronous messages to send and receive auto itr = rankState.unackedMessageBuffers.begin(); while (itr != rankState.unackedMessageBuffers.end()) { diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index 8a12d7014..7e80df62f 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -387,6 +387,7 @@ void Planner::setMessageResult(std::shared_ptr msg) assert(decision->messageIds.empty()); assert(decision->appIdxs.empty()); assert(decision->groupIdxs.empty()); + assert(decision->mpiPorts.empty()); state.inFlightReqs.erase(appId); // If we are removing the app from in-flight, we can also @@ -755,16 +756,24 @@ Planner::callBatch(std::shared_ptr req) // new one assert(decision->hosts.size() == oldDec->hosts.size()); + // First release the migrated-from hosts and slots for (int i = 0; i < oldDec->hosts.size(); i++) { - auto oldHost = state.hostMap.at(oldDec->hosts.at(i)); - releaseHostSlots(oldHost); - releaseHostMpiPort(oldHost, oldDec->mpiPorts.at(i)); + if (decision->hosts.at(i) != oldDec->hosts.at(i)) { + auto oldHost = state.hostMap.at(oldDec->hosts.at(i)); + + releaseHostSlots(oldHost); + releaseHostMpiPort(oldHost, oldDec->mpiPorts.at(i)); + } } + // Second, occupy the migrated-to slots and ports for (int i = 0; i < decision->hosts.size(); i++) { - auto newHost = state.hostMap.at(decision->hosts.at(i)); - claimHostSlots(newHost); - decision->mpiPorts.at(i) = claimHostMpiPort(newHost); + if (decision->hosts.at(i) != oldDec->hosts.at(i)) { + auto newHost = state.hostMap.at(decision->hosts.at(i)); + + claimHostSlots(newHost); + decision->mpiPorts.at(i) = claimHostMpiPort(newHost); + } } // Print the new decision after accounting has been updated diff --git a/tests/dist/mpi/mpi_native.cpp b/tests/dist/mpi/mpi_native.cpp index 19f771ca3..cb5b57e44 100644 --- a/tests/dist/mpi/mpi_native.cpp +++ b/tests/dist/mpi/mpi_native.cpp @@ -790,7 +790,8 @@ void mpiMigrationPoint(int entrypointFuncArg) if (call->ismpi()) { auto& mpiWorld = 
getMpiWorldRegistry().getWorld(call->mpiworldid()); - mpiWorld.prepareMigration(call->mpirank(), funcMustMigrate); + mpiWorld.prepareMigration( + call->groupid(), call->mpirank(), funcMustMigrate); } } diff --git a/tests/test/batch-scheduler/test_binpack_scheduler.cpp b/tests/test/batch-scheduler/test_binpack_scheduler.cpp index f6be86d34..cddd70a6e 100644 --- a/tests/test/batch-scheduler/test_binpack_scheduler.cpp +++ b/tests/test/batch-scheduler/test_binpack_scheduler.cpp @@ -523,6 +523,30 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture, { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" }); } + SECTION("BinPack will minimise the number of messages to migrate (ii)") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 5, 3, 2 }, { 2, 3, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, 7, { "bar", "bar", "bar", "baz", "baz", "foo", "foo" }); + config.expectedDecision = buildExpectedDecision( + ber, { "bar", "bar", "foo", "foo", "foo", "foo", "foo" }); + } + + SECTION("BinPack will minimise the number of messages to migrate (iii)") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 3, 3, 3 }, { 2, 3, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, 7, { "foo", "foo", "bar", "bar", "bar", "baz", "baz" }); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "bar", "bar", "bar", "baz", "foo" }); + } + actualDecision = *batchScheduler->makeSchedulingDecision( config.hostMap, config.inFlightReqs, ber); compareSchedulingDecisions(actualDecision, config.expectedDecision);
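
For reference, the `BinPackScheduler.cpp` and `SchedulingDecision.cpp` hunks above amount to a position-preserving, two-pass assignment: pass one keeps each message on its old host while the new host histogram still has capacity for it, and pass two places the messages that genuinely must migrate on whichever host has spare capacity. Writing into the message's original position is what lets unmoved ranks keep their group index and MPI port. A minimal standalone sketch of the idea with plain STL types (the names here are illustrative, not faabric's API):

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// oldHosts[i] is the host message i currently runs on; histogram maps each
// host to the number of messages it must hold in the new, bin-packed decision
std::vector<std::string> minimiseMigrations(
  const std::vector<std::string>& oldHosts,
  std::map<std::string, int> histogram)
{
    std::vector<std::string> newHosts(oldHosts.size());

    // Pass 1: keep a message on its old host if that host still has spare
    // capacity in the new histogram. We write into position i, so the
    // message's group index (and carried-over MPI port) is unchanged
    for (size_t i = 0; i < oldHosts.size(); i++) {
        auto it = histogram.find(oldHosts[i]);
        if (it != histogram.end() && it->second > 0) {
            newHosts[i] = oldHosts[i];
            it->second -= 1;
        }
    }

    // Pass 2: everything still unassigned must migrate, so any host with
    // spare capacity will do (the histogram is already optimally bin-packed)
    for (size_t i = 0; i < oldHosts.size(); i++) {
        if (newHosts[i].empty()) {
            auto it = std::find_if(histogram.begin(),
                                   histogram.end(),
                                   [](const auto& h) { return h.second > 0; });
            assert(it != histogram.end());
            newHosts[i] = it->first;
            it->second -= 1;
        }
    }

    return newHosts;
}
```

For instance, with old hosts `{bar, bar, bar, baz, baz, foo, foo}` and, assuming the bin-packed histogram works out to `{foo: 5, bar: 2}`, the sketch yields `{bar, bar, foo, foo, foo, foo, foo}`, matching the expected decision in new test case (ii): only the three ranks that have to move change host.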
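
Similarly, the `Planner.cpp` hunk only touches hosts whose assignment actually changed: a rank that stays put keeps its slot and MPI port, while a migrated rank releases both on its old host before claiming them on its new one. The old loops released and re-claimed slots and ports for every rank, even those that did not move. A minimal sketch of the new rule (the `Host` struct, `claimPort`, and `applyMigration` are made up for illustration and are not the Planner's API):

```cpp
#include <array>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-ins, not faabric types
struct Host
{
    int usedSlots = 0;
    std::array<bool, 64> portInUse{}; // all ports initially free
};

// Grab the first free port on a host (stand-in for claimHostMpiPort)
int claimPort(Host& host)
{
    for (int p = 0; p < (int)host.portInUse.size(); p++) {
        if (!host.portInUse[p]) {
            host.portInUse[p] = true;
            return p;
        }
    }
    return -1;
}

void applyMigration(std::map<std::string, Host>& hosts,
                    const std::vector<std::string>& oldHosts,
                    const std::vector<std::string>& newHosts,
                    const std::vector<int>& oldPorts,
                    std::vector<int>& newPorts)
{
    // First release slots and ports on the hosts ranks migrate away from
    for (size_t i = 0; i < oldHosts.size(); i++) {
        if (newHosts[i] != oldHosts[i]) {
            Host& from = hosts.at(oldHosts[i]);
            from.usedSlots -= 1;
            from.portInUse[oldPorts[i]] = false;
        }
    }

    // Then claim slots and ports on the hosts ranks migrate to; ranks that
    // stay put keep the slot and port they already hold
    for (size_t i = 0; i < newHosts.size(); i++) {
        if (newHosts[i] != oldHosts[i]) {
            Host& to = hosts.at(newHosts[i]);
            to.usedSlots += 1;
            newPorts[i] = claimPort(to);
        } else {
            newPorts[i] = oldPorts[i];
        }
    }
}
```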