diff --git a/include/faabric/util/batch.h b/include/faabric/util/batch.h
index de8f906e2..56b8e964a 100644
--- a/include/faabric/util/batch.h
+++ b/include/faabric/util/batch.h
@@ -17,6 +17,9 @@ std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(
 
 bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber);
 
+void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
+                          int newAppId);
+
 void updateBatchExecGroupId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
                             int newGroupId);
 
diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp
index 2cffdc34b..9c7b50500 100644
--- a/src/mpi/MpiWorld.cpp
+++ b/src/mpi/MpiWorld.cpp
@@ -143,9 +143,10 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
     // to spawn (size - 1) new functions starting with rank 1
     std::shared_ptr<faabric::BatchExecuteRequest> req =
      faabric::util::batchExecFactory(user, function, size - 1);
+    faabric::util::updateBatchExecAppId(req, call.appid());
     for (int i = 0; i < req->messages_size(); i++) {
+        // Update MPI-related fields
         faabric::Message& msg = req->mutable_messages()->at(i);
-        msg.set_appid(call.appid());
         msg.set_ismpi(true);
         msg.set_mpiworldid(call.mpiworldid());
         msg.set_mpirank(i + 1);
@@ -154,6 +155,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
         // Set group ids for remote messaging
         // TODO: this will be set by the planner
         // msg.set_groupid(call.groupid());
+        // TODO: do we need to set this one here?
         msg.set_groupidx(msg.mpirank());
         if (thisRankMsg != nullptr) {
             // Set message fields to allow for function migration
@@ -165,6 +167,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
             // To run migration experiments easily, we may want to propagate
             // the UNDERFULL topology hint. In general however, we don't
             // need to propagate this field
+            // TODO(hints): remove
             if (thisRankMsg->topologyhint() == "UNDERFULL") {
                 msg.set_topologyhint(thisRankMsg->topologyhint());
             }
@@ -299,7 +302,12 @@ void MpiWorld::initLocalRemoteLeaders()
     int groupId = thisRankMsg->groupid();
     auto rankIds = broker.getIdxsRegisteredForGroup(groupId);
     if (rankIds.size() != size) {
-        SPDLOG_ERROR("rankIds != size ({} != {})", rankIds.size(), size);
+        SPDLOG_ERROR("{}:{}:{} rankIds != size ({} != {})",
+                     thisRankMsg->appid(),
+                     groupId,
+                     thisRankMsg->groupidx(),
+                     rankIds.size(),
+                     size);
         throw std::runtime_error("MPI Group-World size mismatch!");
     }
     assert(rankIds.size() == size);
diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index 0df4f930c..f1b11ba9e 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -330,12 +330,6 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     // hostMap type, to make sure we don't modify our state here
 #endif
 
-    // A scheduling decision will create a new PTP mapping and, as a
-    // consequence, a new group ID
-    int newGroupId = faabric::util::generateGid();
-    decision->groupId = newGroupId;
-    faabric::util::updateBatchExecGroupId(req, newGroupId);
-
     // Handle failures to schedule work
     if (*decision == NOT_ENOUGH_SLOTS_DECISION) {
         SPDLOG_ERROR(
@@ -350,9 +344,11 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         return decision;
     }
 
-#ifndef NDEBUG
-    decision->print();
-#endif
+    // A scheduling decision will create a new PTP mapping and, as a
+    // consequence, a new group ID
+    int newGroupId = faabric::util::generateGid();
+    decision->groupId = newGroupId;
+    faabric::util::updateBatchExecGroupId(req, newGroupId);
 
     // Given a scheduling decision, depending on the decision type, we want to:
     // 1. Update the host-map to reflect the new host occupation
@@ -363,6 +359,11 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     auto& broker = faabric::transport::getPointToPointBroker();
     switch (decisionType) {
         case faabric::batch_scheduler::DecisionType::NEW: {
+            // 0. Log the decision in debug mode
+#ifndef NDEBUG
+            decision->print();
+#endif
+
             // 1. For a scale change request, we only need to update the hosts
             // with the new messages being scheduled
             for (int i = 0; i < decision->hosts.size(); i++) {
@@ -399,12 +400,18 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
            auto oldReq = state.inFlightReqs.at(appId).first;
            auto oldDec = state.inFlightReqs.at(appId).second;
            faabric::util::updateBatchExecGroupId(oldReq, newGroupId);
+           oldDec->groupId = newGroupId;
 
            for (int i = 0; i < req->messages_size(); i++) {
                *oldReq->add_messages() = req->messages(i);
                oldDec->addMessage(decision->hosts.at(i), req->messages(i));
            }
 
+           // 2.5. Log the updated decision in debug mode
+#ifndef NDEBUG
+           oldDec->print();
+#endif
+
            // 3. We want to send the mappings for the _updated_ decision,
            // including _all_ the messages (not just the ones that are being
            // added)
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 03234d59f..d2edc9877 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -704,6 +704,7 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
 
     // Remove the app from in-flight map if still there, and this host is the
     // main host for the message
+    // TODO: remove me
     if (msg.mainhost() == thisHost) {
         removePendingMigration(msg.appid());
     }
diff --git a/src/util/batch.cpp b/src/util/batch.cpp
index 5262cbe0b..de5ad2d8b 100644
--- a/src/util/batch.cpp
+++ b/src/util/batch.cpp
@@ -61,12 +61,24 @@ bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber)
     return true;
 }
 
+void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
+                          int newAppId)
+{
+    ber->set_appid(newAppId);
+    for (int i = 0; i < ber->messages_size(); i++) {
+        ber->mutable_messages(i)->set_appid(newAppId);
+    }
+
+    // Sanity-check in debug mode
+    assert(isBatchExecRequestValid(ber));
+}
+
 void updateBatchExecGroupId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
                             int newGroupId)
 {
     ber->set_groupid(newGroupId);
-    for (auto msg : *ber->mutable_messages()) {
-        msg.set_groupid(newGroupId);
+    for (int i = 0; i < ber->messages_size(); i++) {
+        ber->mutable_messages(i)->set_groupid(newGroupId);
     }
 
     // Sanity-check in debug mode
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index c5fdced0e..66ab32941 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -657,7 +657,6 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     // Create waiters that will submit messages and await their results
     for (int i = 0; i < nWaiters; i++) {
         waiterThreads.emplace_back([nWaiterMessages] {
-            Scheduler& sch = scheduler::getScheduler();
             auto& plannerCli = faabric::planner::getPlannerClient();
 
             std::shared_ptr<faabric::BatchExecuteRequest> req =
diff --git a/tests/test/util/test_exec_graph.cpp b/tests/test/util/test_exec_graph.cpp
index bd9a0bb2e..fce633988 100644
--- a/tests/test/util/test_exec_graph.cpp
+++ b/tests/test/util/test_exec_graph.cpp
@@ -116,7 +116,6 @@ TEST_CASE_METHOD(ExecGraphTestFixture,
 TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")
 {
     faabric::mpi::MpiWorld world;
-    msg.set_appid(1337);
     msg.set_ismpi(true);
     msg.set_recordexecgraph(true);
 
@@ -138,10 +137,10 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")
         messages.at(rank).set_recordexecgraph(true);
     }
 
-    world.create(msg, worldId, worldSize);
+    // First call the original message
+    plannerCli.callFunctions(req);
 
-    // Update the result for the main message
-    sch.setFunctionResult(msg);
+    world.create(msg, worldId, worldSize);
 
     // Build expected graph
     ExecGraphNode nodeB1 = { .msg = messages.at(1) };
@@ -154,8 +153,15 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]")
 
     ExecGraph expected{ .rootNode = nodeA };
 
+    // The MPI base fixture uses the DummyExecutor, which immediately sets
+    // the function result. We want to overwrite said function result with the
+    // chained calls (logged as part of MpiWorld::create) thus we sleep enough
+    // to let the dummy executor set the result, to make sure we can overwrite
+    // it here
+    SLEEP_MS(500);
+    sch.setFunctionResult(msg);
+
     // Wait for the MPI messages to finish
-    auto& plannerCli = faabric::planner::getPlannerClient();
     plannerCli.getMessageResult(msg, 2000);
     for (const auto& id : faabric::util::getChainedFunctions(msg)) {
         plannerCli.getMessageResult(msg.appid(), id, 2000);
diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h
index 109f33551..9ec4e218a 100644
--- a/tests/utils/fixtures.h
+++ b/tests/utils/fixtures.h
@@ -429,7 +429,8 @@ class MpiBaseTestFixture
       , func("hellompi")
       , worldId(123)
       , worldSize(5)
-      , msg(faabric::util::messageFactory(user, func))
+      , req(faabric::util::batchExecFactory(user, func, 1))
+      , msg(*req->mutable_messages(0))
     {
         std::shared_ptr<faabric::scheduler::ExecutorFactory> fac =
           std::make_shared<faabric::scheduler::DummyExecutorFactory>();
@@ -453,7 +454,9 @@ class MpiBaseTestFixture
     int worldId;
     int worldSize;
 
-    faabric::Message msg;
+    std::shared_ptr<faabric::BatchExecuteRequest> req;
+    // TODO: refactor to firstMsg
+    faabric::Message& msg;
 };
 
 class MpiTestFixture : public MpiBaseTestFixture
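For reference, below is a minimal usage sketch of the new updateBatchExecAppId helper, mirroring the MpiWorld::create hunk above. It is not part of the patch: the wrapper function spawnRanksSketch is invented for illustration, and call.user()/call.function() stand in for the user/function variables used in MpiWorld::create.

    #include <faabric/util/batch.h>

    #include <cassert>

    // Hypothetical sketch: chain (size - 1) messages off an existing call and
    // keep the request-level and per-message app IDs in sync with one helper
    void spawnRanksSketch(const faabric::Message& call, int size)
    {
        // One chained message per non-zero MPI rank
        auto req = faabric::util::batchExecFactory(
          call.user(), call.function(), size - 1);

        // Stamp the parent app ID on the request and on every message at once
        faabric::util::updateBatchExecAppId(req, call.appid());

        for (int i = 0; i < req->messages_size(); i++) {
            faabric::Message& msg = req->mutable_messages()->at(i);
            msg.set_ismpi(true);
            msg.set_mpirank(i + 1);
        }

        // Sanity-check, as the helpers themselves do in debug mode
        assert(faabric::util::isBatchExecRequestValid(req));
    }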