
Commit

threads: handle zero-sized requests and preload a la mpi
csegarragonz committed May 8, 2024
1 parent 0c44a40 · commit e0bfed0
Showing 6 changed files with 70 additions and 33 deletions.
7 changes: 7 additions & 0 deletions src/batch-scheduler/BinPackScheduler.cpp
@@ -307,6 +307,13 @@ std::shared_ptr<SchedulingDecision> BinPackScheduler::makeSchedulingDecision(
auto decisionType = getDecisionType(inFlightReqs, req);
auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);

// For an OpenMP request with the single host hint, we only consider
// scheduling in one VM
bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
if (req->singlehosthint() && isOmp) {
sortedHosts.erase(sortedHosts.begin() + 1, sortedHosts.end());
}

// Assign slots from the list (i.e. bin-pack)
auto it = sortedHosts.begin();
int numLeftToSchedule = req->messages_size();
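Note on the hunk above: truncating sortedHosts to its first entry means the bin-packer can never spread an OpenMP app carrying the single-host hint across VMs; the request either fits on the top-ranked host or fails to schedule. A hedged caller-side sketch of how such a request might be flagged (batchExecFactory, set_singlehosthint and the new OpenMP fields are assumed from faabric's util helpers and generated protobuf API, not shown in this hunk):

// Sketch only: build an OpenMP THREADS batch that must stay on one VM
int nThreads = 4;
auto req = faabric::util::batchExecFactory("demo", "omp_kernel", nThreads);
req->set_type(faabric::BatchExecuteRequest::THREADS);
req->set_singlehosthint(true);
for (int i = 0; i < nThreads; i++) {
    faabric::Message& msg = *req->mutable_messages(i);
    msg.set_isomp(true);
    msg.set_ompnumthreads(nThreads);
}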
2 changes: 1 addition & 1 deletion src/batch-scheduler/SchedulingDecision.cpp
@@ -15,7 +15,7 @@ bool SchedulingDecision::isSingleHost() const

std::string thisHost = conf.endpointHost;
std::set<std::string> hostSet(hosts.begin(), hosts.end());
return hostSet.size() == 1;
return hostSet.size() <= 1;
}

void SchedulingDecision::addMessage(const std::string& host,
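The relaxed check above makes an empty decision count as single-host, which is exactly what the zero-sized thread requests handled later in this commit produce. A minimal illustration, assuming the public hosts member and the (appId, groupId) constructor used elsewhere in this diff; appId and groupId stand in for the caller's values:

// A decision carrying no messages has an empty host set; treating it as
// single-host keeps callers on the local, snapshot-free path.
faabric::batch_scheduler::SchedulingDecision decision(appId, groupId);
bool singleHost = decision.isSingleHost(); // true: hosts is empty, size() <= 1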
73 changes: 46 additions & 27 deletions src/planner/Planner.cpp
@@ -19,7 +19,7 @@

// Special group ID magic to indicate MPI decisions that we have preemptively
// scheduled
#define MPI_PRELOADED_DECISION_GROUPID -99
#define FIXED_SIZE_PRELOADED_DECISION_GROUPID -99

namespace faabric::planner {

@@ -793,7 +793,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)

int numAvail = availableSlots(state.hostMap.at(mainHost));
int numRequested = req->messages_size();
int lastMsgIdx = req->messages(numRequested - 1).groupidx();
int lastMsgIdx = numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx();
for (int itr = 0; itr < (numAvail - numRequested); itr++) {
// Differentiate between the position in the message array (itr)
// and the new group index. Usually, in a fork, they would be
@@ -802,9 +802,25 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
SPDLOG_DEBUG("Adding elastically scaled up msg idx {} (app: {})", msgIdx, appId);

// To add a new message, copy from the last, and update the indexes
*req->add_messages() = req->messages(numRequested - 1);
req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
if (numRequested == 0) {
// This is a special case where we scale up from zero
// parallelism (i.e. 1 OpenMP thread) that requires special
// care
auto* newMsg = req->add_messages();
*newMsg = state.inFlightReqs.at(appId).first->messages(0);
newMsg->set_mainhost(mainHost);
newMsg->set_appidx(msgIdx);
newMsg->set_groupidx(msgIdx);

// For requests that elastically scale from 1 (i.e. zero-
// parallelism) we make use of the group id field to pass the
// actual function pointer as a hack
newMsg->set_funcptr(req->groupid());
} else {
*req->add_messages() = req->messages(numRequested - 1);
req->mutable_messages(numRequested + itr)->set_appidx(msgIdx);
req->mutable_messages(numRequested + itr)->set_groupidx(msgIdx);
}

// Also update the message id to make sure we can wait-for and
// clean-up the resources we use
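The numRequested == 0 branch above covers an app that starts with a single OpenMP thread (zero parallelism) and later asks to scale out. A hypothetical caller-side sketch of such a request; the field usage is inferred from the branch above, and msg stands in for the caller's main-thread message:

// Zero-sized THREADS request: no messages travel with it, only the app id
// and, via the group id field, the outlined function pointer (the "hack"
// referred to above).
auto scaleReq = std::make_shared<faabric::BatchExecuteRequest>();
scaleReq->set_appid(msg.appid());
scaleReq->set_type(faabric::BatchExecuteRequest::THREADS);
scaleReq->set_groupid(msg.funcptr());
// messages_size() == 0, so the planner rebuilds each scaled-up message from
// its in-flight copy of the app and restores funcptr from the group id.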
@@ -831,13 +847,14 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
}
}

// For a NEW decision of an MPI application, we know that it will be
// followed-up by a SCALE_CHANGE one, and that the mpi_world_size parameter
// must be set. Thus, we can schedule slots for all the MPI ranks, and
// consume them later as a preloaded scheduling decision
// For a NEW decision of an MPI/OpenMP application, we know that it will be
// followed-up by a SCALE_CHANGE one, and that the size parameter
// must be set. Thus, we can schedule slots for all the MPI ranks/OMP
// threads, and consume them later as a preloaded scheduling decision
bool isNew = decisionType == faabric::batch_scheduler::DecisionType::NEW;
bool isMpi = req->messages(0).ismpi();
std::shared_ptr<BatchExecuteRequest> mpiReq = nullptr;
bool isMpi = req->messages_size() > 0 && req->messages(0).ismpi();
bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
std::shared_ptr<BatchExecuteRequest> knownSizeReq = nullptr;

// Check if there exists a pre-loaded scheduling decision for this app
// (e.g. if we want to force a migration). Note that we don't want to check
@@ -849,25 +866,27 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// In general, after a scale change decision (that has been preloaded)
// it is safe to remove it
if (isScaleChange) {
SPDLOG_DEBUG("Removing pre-loaded scheduling decision for app {}", appId);
state.preloadedSchedulingDecisions.erase(appId);
}
} else if (isNew && isMpi) {
mpiReq = std::make_shared<BatchExecuteRequest>();
*mpiReq = *req;
} else if (isNew && (isMpi || isOmp)) {
knownSizeReq = std::make_shared<BatchExecuteRequest>();
*knownSizeReq = *req;

// Deep-copy as many messages we can from the original BER, and mock
// the rest
for (int i = req->messages_size(); i < req->messages(0).mpiworldsize();
i++) {
auto* newMpiMsg = mpiReq->add_messages();
size_t reqSize = isMpi ? req->messages(0).mpiworldsize() : req->messages(0).ompnumthreads();
assert(reqSize > 0);
for (int i = req->messages_size(); i < reqSize; i++) {
auto* newMpiMsg = knownSizeReq->add_messages();

newMpiMsg->set_appid(req->appid());
newMpiMsg->set_groupidx(i);
}
assert(mpiReq->messages_size() == req->messages(0).mpiworldsize());
assert(knownSizeReq->messages_size() == reqSize);

decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, mpiReq);
hostMapCopy, state.inFlightReqs, knownSizeReq);
} else {
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
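A short sketch of the size rule the loop above relies on, plus a worked example; getKnownAppSize is illustrative, not a faabric helper:

// MPI apps advertise their size via mpiworldsize, OpenMP apps via the new
// ompNumThreads field (read through the generated ompnumthreads() accessor).
auto getKnownAppSize = [](const faabric::Message& firstMsg) -> int {
    return firstMsg.ismpi() ? firstMsg.mpiworldsize()
                            : firstMsg.ompnumthreads();
};
// Example: an OpenMP NEW request with one real message and ompNumThreads == 4
// gets three mocked messages appended (group indices 1, 2, 3), so the
// scheduler reserves slots for all four threads in a single decision.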
@@ -953,7 +972,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
}

// Skip claiming slots and ports if we have preemptively allocated them
bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID;
bool skipClaim = decision->groupId == FIXED_SIZE_PRELOADED_DECISION_GROUPID;

// A scheduling decision will create a new PTP mapping and, as a
// consequence, a new group ID
@@ -990,17 +1009,17 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// For a NEW MPI decision that was not preloaded we have
// preemptively scheduled all MPI messages but now we just need to
// return the first one, and preload the rest
if (isMpi && mpiReq != nullptr) {
auto mpiDecision = std::make_shared<
if ((isMpi || isOmp) && knownSizeReq != nullptr) {
auto knownSizeDecision = std::make_shared<
faabric::batch_scheduler::SchedulingDecision>(req->appid(),
req->groupid());
*mpiDecision = *decision;
mpiDecision->groupId = MPI_PRELOADED_DECISION_GROUPID;
state.preloadedSchedulingDecisions[appId] = mpiDecision;
*knownSizeDecision = *decision;
knownSizeDecision->groupId = FIXED_SIZE_PRELOADED_DECISION_GROUPID;
state.preloadedSchedulingDecisions[appId] = knownSizeDecision;

// Remove all messages that we do not have to dispatch now
for (int i = 1; i < mpiDecision->messageIds.size(); i++) {
decision->removeMessage(mpiDecision->messageIds.at(i));
for (int i = 1; i < knownSizeDecision->messageIds.size(); i++) {
decision->removeMessage(knownSizeDecision->messageIds.at(i));
}
}

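Taken together with the skipClaim branch earlier, the block above gives the preloaded-decision lifecycle. A commented walkthrough for an assumed 4-rank (or 4-thread) app:

// NEW request, 1 real message, known size 4:
//   1. schedule all 4 ranks/threads at once -> decision covers 4 messages
//   2. copy that decision, tag it with FIXED_SIZE_PRELOADED_DECISION_GROUPID,
//      and store it in state.preloadedSchedulingDecisions[appId]
//   3. strip message ids 1..3 from the returned decision and dispatch only
//      the first message now
// follow-up SCALE_CHANGE request (the remaining 3 ranks/threads):
//   4. the preloaded decision is found, slots and ports are not claimed a
//      second time (skipClaim), and the preloaded entry is erased after use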
13 changes: 8 additions & 5 deletions src/planner/PlannerClient.cpp
@@ -307,13 +307,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
// to other hosts. Given that we don't support nested threading, if we
// have a THREADS request here it means that we are being called from the
// main thread (which holds the main snapshot)
const std::string funcStr =
faabric::util::funcToString(req->messages(0), false);
std::string snapshotKey;
auto& reg = faabric::snapshot::getSnapshotRegistry();

std::string snapshotKey;
const auto firstMsg = req->messages(0);
if (isThreads) {
// Note that with threads we may have 0-sized BERs
if (isThreads && req->messages_size() > 0) {
const std::string funcStr =
faabric::util::funcToString(req->messages(0), false);

const auto firstMsg = req->messages(0);

if (!firstMsg.snapshotkey().empty()) {
SPDLOG_ERROR("{} should not provide snapshot key for {} threads",
funcStr,
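The reordering above means messages(0) is only read when the THREADS request actually carries messages. A condensed sketch of the guarded path; getMainThreadSnapshotKey is assumed from faabric's snapshot utils, and the error/registry details are elided in the hunk:

std::string snapshotKey;
if (isThreads && req->messages_size() > 0) {
    const auto& firstMsg = req->messages(0);
    if (!firstMsg.snapshotkey().empty()) {
        // error path: threads should not carry their own snapshot key
    }
    // key the snapshot off the main thread's function as before
    snapshotKey = faabric::util::getMainThreadSnapshotKey(firstMsg);
}
// with a zero-sized BER, snapshotKey stays empty and no snapshot is pushed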
4 changes: 4 additions & 0 deletions src/proto/faabric.proto
@@ -145,6 +145,10 @@ message Message {
repeated int32 chainedMsgIds = 36;
map<string, int32> intExecGraphDetails = 37;
map<string, string> execGraphDetails = 38;

// OpenMP
bool isOmp = 39;
int32 ompNumThreads = 40;
}

// ---------------------------------------------
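Protobuf lower-cases field names when generating the C++ accessors, which is why the planner code in this commit reads the new fields as isomp() and ompnumthreads() rather than isOmp/ompNumThreads. For example:

// generated accessors for the two new Message fields
bool isOmp = msg.isomp();
int32_t nThreads = msg.ompnumthreads();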
4 changes: 4 additions & 0 deletions src/scheduler/Scheduler.cpp
@@ -249,6 +249,10 @@ long Scheduler::getFunctionExecutorCount(const faabric::Message& msg)

void Scheduler::executeBatch(std::shared_ptr<faabric::BatchExecuteRequest> req)
{
if (req->messages_size() == 0) {
return;
}

faabric::util::FullLock lock(mx);

bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
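A hypothetical regression-test-style sketch of the early return above; getScheduler and the executeBatch signature are taken from faabric's scheduler API as used elsewhere, and the behaviour shown is only what the guard guarantees:

// Executing an empty batch should be a no-op rather than an out-of-bounds
// read on req->messages(0) further down the function.
auto emptyReq = std::make_shared<faabric::BatchExecuteRequest>();
emptyReq->set_type(faabric::BatchExecuteRequest::THREADS);
faabric::scheduler::getScheduler().executeBatch(emptyReq); // returns immediately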
