From 4a6fab037525c2d6a9c4e4f4f48e37bc35c9d8ba Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Tue, 16 Apr 2024 09:38:31 +0000
Subject: [PATCH] planner(mpi): preload MPI SCALE_CHANGE decisions

When scheduling a NEW app with the `ismpi` flag set, we know that this
request will quickly be followed up by a SCALE_CHANGE request carrying
`worldSize` - 1 messages. As a consequence, and to guarantee optimal
bin-packing on par with Slurm, we can schedule `worldSize` messages when
the first request arrives, dispatch just the first one, and preload the
remaining `worldSize` - 1.
---
 .env                        |  4 +--
 .github/workflows/tests.yml | 12 ++++----
 VERSION                     |  2 +-
 src/planner/Planner.cpp     | 61 +++++++++++++++++++++++++++++++++++--
 4 files changed, 67 insertions(+), 12 deletions(-)

diff --git a/.env b/.env
index 1de7746be..a133f1791 100644
--- a/.env
+++ b/.env
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.16.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.16.0
+FAABRIC_VERSION=0.17.0
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.17.0
 COMPOSE_PROJECT_NAME=faabric-dev
 CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 7e8ac4349..7a41087ad 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,7 +20,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
     env:
       DEPLOYMENT_TYPE: gha-ci
     steps:
@@ -34,7 +34,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
     steps:
       - name: "Check out code"
         uses: actions/checkout@v4
@@ -45,7 +45,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
     steps:
       - name: "Check out code"
         uses: actions/checkout@v4
@@ -65,7 +65,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
       options: --privileged
     services:
       redis:
@@ -104,7 +104,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
       options: --privileged
     services:
       redis:
@@ -156,7 +156,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.16.0
+      image: faasm.azurecr.io/faabric:0.17.0
     services:
       redis:
         image: redis
diff --git a/VERSION b/VERSION
index 04a373efe..c5523bd09 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.16.0
+0.17.0
diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index f9ca7ff2f..48ad05478 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -17,6 +17,10 @@

 #include <string>

+// Special group ID magic to indicate MPI decisions that we have preemptively
+// scheduled
+#define MPI_PRELOADED_DECISION_GROUPID -99
+
 namespace faabric::planner {

 // ----------------------
@@ -511,6 +515,8 @@ Planner::getPreloadedSchedulingDecision(
           msg.id(),
           decision->appIdxs.at(idxInDecision),
           decision->groupIdxs.at(idxInDecision));
+        filteredDecision->mpiPorts.at(filteredDecision->nFunctions - 1) =
+          decision->mpiPorts.at(idxInDecision);
     }

     assert(filteredDecision->hosts.size() == ber->messages_size());
@@ -621,12 +627,33 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         }
     }
+    // For a NEW decision of an MPI application, we know that it will be
+    // followed up by a SCALE_CHANGE one, and that the mpi_world_size parameter
+    // must be set. Thus, we can schedule slots for all the MPI ranks, and
+    // consume them later as a preloaded scheduling decision
+    bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
+    bool isMpi = req->messages(0).ismpi();
+    std::shared_ptr<BatchExecuteRequest> mpiReq = nullptr;
+
     // Check if there exists a pre-loaded scheduling decision for this app
     // (e.g. if we want to force a migration). Note that we don't want to check
     // pre-loaded decisions for dist-change requests
     std::shared_ptr<batch_scheduler::SchedulingDecision> decision = nullptr;
     if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
         decision = getPreloadedSchedulingDecision(appId, req);
+    } else if (isNew && isMpi) {
+        mpiReq = faabric::util::batchExecFactory(
+          req->user(), req->function(), req->messages(0).mpiworldsize());
+
+        // Populate the temporary request
+        mpiReq->mutable_messages()->at(0) = req->messages(0);
+        faabric::util::updateBatchExecAppId(mpiReq, appId);
+        for (int i = 0; i < mpiReq->messages_size(); i++) {
+            mpiReq->mutable_messages()->at(i).set_groupidx(i);
+        }
+
+        decision = batchScheduler->makeSchedulingDecision(
+          hostMapCopy, state.inFlightReqs, mpiReq);
     } else {
         decision = batchScheduler->makeSchedulingDecision(
           hostMapCopy, state.inFlightReqs, req);
     }
@@ -648,6 +675,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         return decision;
     }

+    // Skip claiming slots and ports if we have preemptively allocated them
+    bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
+
     // A scheduling decision will create a new PTP mapping and, as a
     // consequence, a new group ID
     int newGroupId = faabric::util::generateGid();
@@ -675,6 +705,23 @@
         decision->print();
 #endif

+        // For a NEW MPI decision that was not preloaded, we have
+        // preemptively scheduled all MPI messages, but now we just need to
+        // return the first one and preload the rest
+        if (isMpi && mpiReq != nullptr) {
+            auto mpiDecision = std::make_shared<
+              faabric::batch_scheduler::SchedulingDecision>(req->appid(),
+                                                            req->groupid());
+            *mpiDecision = *decision;
+            mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
+            state.preloadedSchedulingDecisions[appId] = mpiDecision;
+
+            // Remove all messages that we do not have to dispatch now
+            for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
+                decision->removeMessage(mpiDecision->messageIds.at(i));
+            }
+        }
+
         // 2. For a new decision, we just add it to the in-flight map
         state.inFlightReqs[appId] = std::make_pair(req, decision);

@@ -688,7 +735,9 @@
         // with the _new_ messages being scheduled
         for (int i = 0; i < decision->hosts.size(); i++) {
             auto thisHost = state.hostMap.at(decision->hosts.at(i));
-            claimHostSlots(thisHost);
+            if (!skipClaim) {
+                claimHostSlots(thisHost);
+            }
         }

         // 2. For a scale change request, we want to update the BER with the
@@ -702,8 +751,14 @@
         for (int i = 0; i < req->messages_size(); i++) {
             *oldReq->add_messages() = req->messages(i);
             oldDec->addMessage(decision->hosts.at(i), req->messages(i));
-            oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
-              claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
+            if (!skipClaim) {
+                oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
+                  claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
+            } else {
+                assert(decision->mpiPorts.at(i) != 0);
+                oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
+                  decision->mpiPorts.at(i);
+            }
         }

         // 2.5.1. Log the updated decision in debug mode
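
Usage sketch (not part of the patch): a minimal illustration of how the
preloading fast path above is expected to be driven, assuming the usual
faabric::planner::getPlanner() accessor; the user/function names and world
size are placeholders. The NEW request carries a single message with the
ismpi flag and MPI world size set, and the follow-up SCALE_CHANGE request
then consumes the preloaded decision instead of triggering a second round
of bin-packing.

#include <faabric/planner/Planner.h>
#include <faabric/util/batch.h>

void scheduleMpiApp()
{
    int worldSize = 4;

    // NEW request: a single message flagged as MPI, carrying the world size
    auto req = faabric::util::batchExecFactory("mpi", "main", 1);
    req->mutable_messages()->at(0).set_ismpi(true);
    req->mutable_messages()->at(0).set_mpiworldsize(worldSize);

    // The planner bin-packs all worldSize ranks up front, returns a decision
    // containing only this first message, and stores the remaining
    // worldSize - 1 under MPI_PRELOADED_DECISION_GROUPID
    auto decision = faabric::planner::getPlanner().callBatch(req);
}

In a real deployment the NEW request reaches the planner through its HTTP
endpoint or the planner client rather than by calling callBatch() directly;
the sketch only shows the planner-side entry point exercised by this diff.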