Skip to content

Commit

Permalink
transport(tcp): fix race condition in migrations w/ tcp (#416)
Browse files (browse the repository at this point in the history)
transport(ptp): fix race condition in migrations w/ tcp
  • Loading branch information
csegarragonz authored Apr 12, 2024
1 parent 8f9f62a commit 9f17c56
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 36 deletions.
11 changes: 8 additions & 3 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,6 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
// in the same group
bool isMigration =
task.req->type() == faabric::BatchExecuteRequest::MIGRATION;
if (isMigration) {
faabric::transport::getPointToPointBroker().postMigrationHook(msg);
}

SPDLOG_TRACE("Thread {}:{} executing task {} ({}, thread={}, group={})",
id,
Expand All @@ -380,6 +377,14 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
// Execute the task
int32_t returnValue;
try {
// Right before executing the task, run the post-migration hook if the
// task has been migrated, so that it synchronises with its group. Doing
// this inside the try block lets the try/catch handle a failed migration
if (isMigration) {
faabric::transport::getPointToPointBroker().postMigrationHook(
msg);
}

returnValue =
executeTask(threadPoolIdx, task.messageIndex, task.req);
} catch (const faabric::util::FunctionMigratedException& ex) {
Expand Down
38 changes: 5 additions & 33 deletions src/transport/PointToPointBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,42 +894,14 @@ void PointToPointBroker::resetThreadLocalCache()

void PointToPointBroker::postMigrationHook(faabric::Message& msg)
{
int postMigrationOkCode = 1337;
int recvCode = 0;

// TODO: implement this as a broadcast in the PTP broker
int mainIdx = 0;
if (msg.groupidx() == mainIdx) {
auto groupIdxs = getIdxsRegisteredForGroup(msg.groupid());
groupIdxs.erase(mainIdx);
for (const auto& recvIdx : groupIdxs) {
sendMessage(msg.groupid(),
mainIdx,
recvIdx,
BYTES_CONST(&postMigrationOkCode),
sizeof(int));
}
recvCode = postMigrationOkCode;
} else {
std::vector<uint8_t> bytes =
recvMessage(msg.groupid(), 0, msg.groupidx());
recvCode = faabric::util::bytesToInt(bytes);
}

if (recvCode != postMigrationOkCode) {
SPDLOG_ERROR("Error in post-migration hook. {}:{}:{} received code {}",
msg.appid(),
msg.groupid(),
msg.groupidx(),
recvCode);
throw std::runtime_error("Error in post-migration hook");
}
// Make sure all threads are here before we move forward
PointToPointGroup::getGroup(msg.groupid())->barrier(msg.groupidx());

// Do per-thread MPI initialisation (mostly send/recv TCP sockets)
if (msg.ismpi()) {
auto& mpiWorld =
faabric::mpi::getMpiWorldRegistry().getWorld(msg.mpiworldid());
mpiWorld.initialiseRankFromMsg(msg);
// Get-or-initialise to initialise the world in case we are migrating
// to a completely new world
faabric::mpi::getMpiWorldRegistry().getOrInitialiseWorld(msg);
}

SPDLOG_DEBUG("{}:{}:{} exiting post-migration hook",
Expand Down

0 comments on commit 9f17c56

Please sign in to comment.