Skip to content

Commit

Permalink
planner(mpi): preload MPI SCALE_CHANGE decisions
Browse files Browse the repository at this point in the history
When scheduling a NEW app with the `ismpi` flag set, we know that this
request will be quickly followed-up by a SCALE_CHANGE request with
`worldSize` - 1 messages. As a consequence, and to guarantee optimal
bin-packing on-par with Slurm, we can schedule `worldSize` messages when
the first request arrives, dispatch just the first one, and preload the
`worldSize` - 1 remaining.
  • Loading branch information
csegarragonz committed Apr 16, 2024
1 parent d1c0f96 commit 2632f1c
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.16.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.16.0
FAABRIC_VERSION=0.17.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.17.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
env:
DEPLOYMENT_TYPE: gha-ci
steps:
Expand All @@ -34,7 +34,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
steps:
- name: "Check out code"
uses: actions/checkout@v4
Expand All @@ -45,7 +45,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
steps:
- name: "Check out code"
uses: actions/checkout@v4
Expand All @@ -65,7 +65,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
options: --privileged
services:
redis:
Expand Down Expand Up @@ -104,7 +104,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
options: --privileged
services:
redis:
Expand Down Expand Up @@ -156,7 +156,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.16.0
image: faasm.azurecr.io/faabric:0.17.0
services:
redis:
image: redis
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.16.0
0.17.0
61 changes: 58 additions & 3 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include <string>

// Special group ID magic to indicate MPI decisions that we have preemptively
// scheduled
#define MPI_PRELOADED_DECISION_GROUPID -99

namespace faabric::planner {

// ----------------------
Expand Down Expand Up @@ -511,6 +515,8 @@ Planner::getPreloadedSchedulingDecision(
msg.id(),
decision->appIdxs.at(idxInDecision),
decision->groupIdxs.at(idxInDecision));
filteredDecision->mpiPorts.at(filteredDecision->nFunctions - 1) =
decision->mpiPorts.at(idxInDecision);
}
assert(filteredDecision->hosts.size() == ber->messages_size());

Expand Down Expand Up @@ -621,12 +627,33 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
}
}

// For a NEW decision of an MPI application, we know that it will be
// followed-up by a SCALE_CHANGE one, and that the mpi_world_size parameter
// must be set. Thus, we can schedule slots for all the MPI ranks, and
// consume them later as a preloaded scheduling decision
bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
bool isMpi = req->messages(0).ismpi();
std::shared_ptr<BatchExecuteRequest> mpiReq = nullptr;

// Check if there exists a pre-loaded scheduling decision for this app
// (e.g. if we want to force a migration). Note that we don't want to check
// pre-loaded decisions for dist-change requests
std::shared_ptr<batch_scheduler::SchedulingDecision> decision = nullptr;
if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
decision = getPreloadedSchedulingDecision(appId, req);
} else if (isNew && isMpi) {
mpiReq = faabric::util::batchExecFactory(
req->user(), req->function(), req->messages(0).mpiworldsize());

// Populate the temporary request
mpiReq->mutable_messages()->at(0) = req->messages(0);
faabric::util::updateBatchExecAppId(mpiReq, appId);
for (int i = 0; i < mpiReq->messages_size(); i++) {
mpiReq->mutable_messages()->at(i).set_groupidx(i);
}

decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, mpiReq);
} else {
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
Expand All @@ -648,6 +675,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
return decision;
}

// Skip claiming slots and ports if we have preemptively allocated them
bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;

// A scheduling decision will create a new PTP mapping and, as a
// consequence, a new group ID
int newGroupId = faabric::util::generateGid();
Expand Down Expand Up @@ -675,6 +705,23 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
decision->print();
#endif

// For a NEW MPI decision that was not preloaded we have
// preemptively scheduled all MPI messages but now we just need to
// return the first one, and preload the rest
if (isMpi && mpiReq != nullptr) {
auto mpiDecision = std::make_shared<
faabric::batch_scheduler::SchedulingDecision>(req->appid(),
req->groupid());
*mpiDecision = *decision;
mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
state.preloadedSchedulingDecisions[appId] = mpiDecision;

// Remove all messages that we do not have to dispatch now
for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
decision->removeMessage(mpiDecision->messageIds.at(i));
}
}

// 2. For a new decision, we just add it to the in-flight map
state.inFlightReqs[appId] = std::make_pair(req, decision);

Expand All @@ -688,7 +735,9 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// with the _new_ messages being scheduled
for (int i = 0; i < decision->hosts.size(); i++) {
auto thisHost = state.hostMap.at(decision->hosts.at(i));
claimHostSlots(thisHost);
if (!skipClaim) {
claimHostSlots(thisHost);
}
}

// 2. For a scale change request, we want to update the BER with the
Expand All @@ -702,8 +751,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
for (int i = 0; i < req->messages_size(); i++) {
*oldReq->add_messages() = req->messages(i);
oldDec->addMessage(decision->hosts.at(i), req->messages(i));
oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
if (!skipClaim) {
oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
claimHostMpiPort(state.hostMap.at(decision->hosts.at(i)));
} else {
assert(decision->mpiPorts.at(i) != 0);
oldDec->mpiPorts.at(oldDec->nFunctions - 1) =
decision->mpiPorts.at(i);
}
}

// 2.5.1. Log the updated decision in debug mode
Expand Down

0 comments on commit 2632f1c

Please sign in to comment.