Skip to content

Commit

Permalink
Fix NNG error checks and MPI's group ID (#333)
Browse files Browse the repository at this point in the history
* mpi: add try/catch around queue timeout for more logging

* mpi: more debug

* nits: cleaner debugging

* nits: run clang-format

* mpi: remove unneeded include

* transport: correct error checking for nng_aio

* tests: fix failing migration test

* nng: yet another wrong ec
  • Loading branch information
csegarragonz authored Jul 20, 2023
1 parent fa0d07d commit d84e5ea
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
34 changes: 26 additions & 8 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
throw std::runtime_error("Error serialising message");
}
broker.sendMessage(
id,
thisRankMsg->groupid(),
sendRank,
recvRank,
reinterpret_cast<const uint8_t*>(serialisedBuffer.data()),
Expand All @@ -72,7 +72,8 @@ void MpiWorld::sendRemoteMpiMessage(std::string dstHost,
std::shared_ptr<MPIMessage> MpiWorld::recvRemoteMpiMessage(int sendRank,
int recvRank)
{
auto msg = broker.recvMessage(id, sendRank, recvRank, true);
auto msg =
broker.recvMessage(thisRankMsg->groupid(), sendRank, recvRank, true);
PARSE_MSG(MPIMessage, msg.data(), msg.size());
return std::make_shared<MPIMessage>(parsedMsg);
}
Expand Down Expand Up @@ -105,9 +106,17 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
user = call.user();
function = call.function();
thisRankMsg = &call;

size = newSize;

// Update the first message to make sure it looks like messages >= 1
call.set_ismpi(true);
call.set_mpirank(0);
call.set_mpiworldid(id);
call.set_mpiworldsize(size);
call.set_groupid(call.mpiworldid());
call.set_groupidx(call.mpirank());
call.set_appidx(call.mpirank());

auto& sch = faabric::scheduler::getScheduler();

// Dispatch all the chained calls. With the master being rank zero, we want
Expand All @@ -118,12 +127,12 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
faabric::Message& msg = req->mutable_messages()->at(i);
msg.set_appid(call.appid());
msg.set_ismpi(true);
msg.set_mpiworldid(id);
msg.set_mpiworldid(call.mpiworldid());
msg.set_mpirank(i + 1);
msg.set_mpiworldsize(size);
msg.set_mpiworldsize(call.mpiworldsize());

// Set group ids for remote messaging
msg.set_groupid(msg.mpiworldid());
msg.set_groupid(call.groupid());
msg.set_groupidx(msg.mpirank());
if (thisRankMsg != nullptr) {
// Set message fields to allow for function migration
Expand Down Expand Up @@ -209,6 +218,9 @@ void MpiWorld::destroy()
throw std::runtime_error("Destroying world with outstanding requests");
}

// Lastly, clear-out the rank message
thisRankMsg = nullptr;

// Clear structures used for mocking
{
faabric::util::UniqueLock lock(mockMutex);
Expand All @@ -222,6 +234,7 @@ void MpiWorld::initialiseFromMsg(faabric::Message& msg)
user = msg.user();
function = msg.function();
size = msg.mpiworldsize();
thisRankMsg = &msg;

// Record which ranks are local to this world, and query for all leaders
initLocalRemoteLeaders();
Expand Down Expand Up @@ -257,14 +270,19 @@ void MpiWorld::initLocalRemoteLeaders()
// keep a record of the opposite mapping, the host that each rank belongs
// to, as it is queried frequently and asking the ptp broker involves
// acquiring a lock.
auto rankIds = broker.getIdxsRegisteredForGroup(id);
if (thisRankMsg == nullptr) {
throw std::runtime_error("Rank message not set!");
}
int groupId = thisRankMsg->groupid();
auto rankIds = broker.getIdxsRegisteredForGroup(groupId);
if (rankIds.size() != size) {
SPDLOG_ERROR("rankIds != size ({} != {})", rankIds.size(), size);
throw std::runtime_error("MPI Group-World size mismatch!");
}
assert(rankIds.size() == size);
hostForRank.resize(size);
for (const auto& rankId : rankIds) {
std::string host = broker.getHostForReceiver(id, rankId);
std::string host = broker.getHostForReceiver(groupId, rankId);
ranksForHost[host].push_back(rankId);
hostForRank.at(rankId) = host;
}
Expand Down
2 changes: 1 addition & 1 deletion src/transport/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace faabric::transport {

Message::Message(size_t bufferSize)
{
if (int ec = nng_msg_alloc(&nngMsg, bufferSize); ec < 0) {
if (int ec = nng_msg_alloc(&nngMsg, bufferSize); ec != 0) {
SPDLOG_CRITICAL("Error allocating a message of size {}: {}",
bufferSize,
nng_strerror(ec));
Expand Down
9 changes: 8 additions & 1 deletion src/transport/MessageEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void MessageEndpoint::sendMessage(uint8_t header,
std::copy_n(data, dataSize, buffer + HEADER_MSG_SIZE);

nng_aio* aio = nullptr;
if (int ec = nng_aio_alloc(&aio, nullptr, nullptr); ec < 0) {
if (int ec = nng_aio_alloc(&aio, nullptr, nullptr); ec != 0) {
nng_msg_free(msg);
checkNngError(ec, "nng_aio_alloc", address);
}
Expand All @@ -289,6 +289,13 @@ void MessageEndpoint::sendMessage(uint8_t header,
int ec = nng_aio_result(aio);
nng_aio_free(aio);
if (ec != 0) {
SPDLOG_ERROR(
"Error {} ({}) when sending messge to {} (seq: {} - ctx: {})",
ec,
nng_strerror(ec),
address,
sequenceNum,
context.has_value());
nng_msg_free(msg); // Owned by the socket if succeeded
checkNngError(ec, "sendMessage", address);
}
Expand Down
3 changes: 2 additions & 1 deletion src/transport/PointToPointBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -754,8 +754,9 @@ std::vector<uint8_t> PointToPointBroker::recvMessage(int groupId,
if (recvMsg.getResponseCode() !=
faabric::transport::MessageResponseCode::SUCCESS) {
SPDLOG_WARN(
"Error {} when awaiting a message ({}:{} seq: {} label: {})",
"Error {} ({}) when awaiting a message ({}:{} seq: {} label: {})",
static_cast<int>(recvMsg.getResponseCode()),
nng_strerror(static_cast<int>(recvMsg.getResponseCode())),
sendIdx,
recvIdx,
expectedSeqNum,
Expand Down
7 changes: 6 additions & 1 deletion tests/test/scheduler/test_function_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,12 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,
// Manually create the world, and trigger a second function invocation in
// the remote host
faabric::mpi::MpiWorld world;
world.create(*firstMsg, worldId, worldSize);
// Note that we deliberately pass a copy of the message. The `world.create`
// method modifies the passed message, which can race with the thread pool
// thread executing the message. Note that, normally, the thread pool
// thread _would_ be calling world.create itself, thus not racing
auto firstMsgCopy = req->messages(0);
world.create(firstMsgCopy, worldId, worldSize);

// Update host resources so that a migration opportunity appears
updateLocalResources(4, 2);
Expand Down

0 comments on commit d84e5ea

Please sign in to comment.