Skip to content

Commit

Permalink
planner: support pre-loading a scheduling decision (#355)
Browse files Browse the repository at this point in the history
* planner: add http entrypoint to pre-load scheduling decision

* planner: finish-up implementation for decision preloading

* planner: remove preloaded decision when removing app from in-flight

* tests: add tests

* planner: add rpc to preload scheduling decision
  • Loading branch information
csegarragonz authored Nov 29, 2023
1 parent 9db49af commit 8b8c635
Show file tree
Hide file tree
Showing 18 changed files with 348 additions and 14 deletions.
9 changes: 9 additions & 0 deletions include/faabric/planner/Planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ class Planner
std::shared_ptr<faabric::Message> getMessageResult(
std::shared_ptr<faabric::Message> msg);

// Setter/Getter to bypass the planner's scheduling for a specific app
void preloadSchedulingDecision(
int appId,
std::shared_ptr<batch_scheduler::SchedulingDecision> decision);

std::shared_ptr<batch_scheduler::SchedulingDecision>
getPreloadedSchedulingDecision(int32_t appId,
std::shared_ptr<BatchExecuteRequest> ber);

// Get all the results recorded for one batch
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(
int32_t appId);
Expand Down
1 change: 1 addition & 0 deletions include/faabric/planner/PlannerApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ enum PlannerCalls
GetBatchResults = 10,
GetSchedulingDecision = 11,
CallBatch = 12,
PreloadSchedulingDecision = 13,
};
}
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
faabric::batch_scheduler::SchedulingDecision getSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

void preloadSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> preloadDec);

private:
std::mutex plannerCacheMx;
PlannerCache cache;
Expand Down
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvGetSchedulingDecision(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPreloadSchedulingDecision(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvCallBatch(
std::span<const uint8_t> buffer);

Expand Down
6 changes: 6 additions & 0 deletions include/faabric/planner/PlannerState.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <faabric/batch-scheduler/BatchScheduler.h>
#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/planner/planner.pb.h>
#include <faabric/proto/faabric.pb.h>

Expand All @@ -27,5 +28,10 @@ struct PlannerState

// Map keeping track of the requests that are in-flight
faabric::batch_scheduler::InFlightReqs inFlightReqs;

// Map keeping track of pre-loaded scheduling decisions that bypass the
// planner's scheduling
std::map<int, std::shared_ptr<batch_scheduler::SchedulingDecision>>
preloadedSchedulingDecisions;
};
}
5 changes: 5 additions & 0 deletions include/faabric/util/batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(

std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(
std::shared_ptr<faabric::BatchExecuteRequest> ber);

// Get the number of messages in a BER Status that have actually finished (i.e.
// those that have not been migrated)
int getNumFinishedMessagesInBatch(
std::shared_ptr<faabric::BatchExecuteRequestStatus> berStatus);
}
9 changes: 9 additions & 0 deletions include/faabric/util/ptp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/proto/faabric.pb.h>

namespace faabric::util {
faabric::PointToPointMappings ptpMappingsFromSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision);
}
83 changes: 80 additions & 3 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,15 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
assert(decision->appIdxs.empty());
assert(decision->groupIdxs.empty());
state.inFlightReqs.erase(appId);

// If we are removing the app from in-flight, we can also
// remmove any pre-loaded scheduling decisions
if (state.preloadedSchedulingDecisions.contains(appId)) {
SPDLOG_DEBUG(
"Removing preloaded scheduling decision for app {}",
appId);
state.preloadedSchedulingDecisions.erase(appId);
}
}
}
}
Expand Down Expand Up @@ -394,6 +403,63 @@ std::shared_ptr<faabric::Message> Planner::getMessageResult(
return nullptr;
}

void Planner::preloadSchedulingDecision(
int32_t appId,
std::shared_ptr<batch_scheduler::SchedulingDecision> decision)
{
faabric::util::FullLock lock(plannerMx);

if (state.preloadedSchedulingDecisions.contains(appId)) {
SPDLOG_ERROR(
"ERROR: preloaded scheduling decisions already contain app {}",
appId);
return;
}

SPDLOG_INFO("Pre-loading scheduling decision for app {}", appId);
state.preloadedSchedulingDecisions[appId] = decision;
}

std::shared_ptr<batch_scheduler::SchedulingDecision>
Planner::getPreloadedSchedulingDecision(
int32_t appId,
std::shared_ptr<BatchExecuteRequest> ber)
{
SPDLOG_DEBUG("Requesting pre-loaded scheduling decision for app {}", appId);
// WARNING: this method is currently only called from the main Planner
// entrypoint (callBatch) which has a FullLock, thus we don't need to
// acquire a (SharedLock) here. In general, we would need a read-lock
// to read the dict from the planner's state
auto decision = state.preloadedSchedulingDecisions.at(appId);
assert(decision != nullptr);

// Only include in the returned scheduling decision the group indexes that
// are in this BER. This can happen when consuming a preloaded decision
// in two steps (e.g. for MPI)
std::shared_ptr<batch_scheduler::SchedulingDecision> filteredDecision =
std::make_shared<batch_scheduler::SchedulingDecision>(decision->appId,
decision->groupId);
for (const auto& msg : ber->messages()) {
int groupIdx = msg.groupidx();
int idxInDecision = std::distance(decision->groupIdxs.begin(),
std::find(decision->groupIdxs.begin(),
decision->groupIdxs.end(),
groupIdx));
assert(idxInDecision < decision->groupIdxs.size());

// Add the schedulign for this group idx to the filtered decision.
// Make sure we also maintain the message IDs that come from the BER
// (as we can not possibly predict them in the preloaded decision)
filteredDecision->addMessage(decision->hosts.at(idxInDecision),
msg.id(),
decision->appIdxs.at(idxInDecision),
decision->groupIdxs.at(idxInDecision));
}
assert(filteredDecision->hosts.size() == ber->messages_size());

return filteredDecision;
}

std::shared_ptr<faabric::BatchExecuteRequestStatus> Planner::getBatchResults(
int32_t appId)
{
Expand Down Expand Up @@ -475,11 +541,13 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// Make a copy of the host-map state to make sure the scheduling process
// does not modify it
auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap);
bool isDistChange =
decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE;

// For a DIST_CHANGE decision (i.e. migration) we want to try to imrpove
// on the old decision (we don't care the one we send), so we make sure
// we are scheduling the same messages from the old request
if (decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE) {
if (isDistChange) {
SPDLOG_INFO("App {} asked for migration opportunities", appId);
auto oldReq = state.inFlightReqs.at(appId).first;
req->clear_messages();
Expand All @@ -488,8 +556,17 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
}
}

auto decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
// Check if there exists a pre-loaded scheduling decision for this app
// (e.g. if we want to force a migration). Note that we don't want to check
// pre-loaded decisions for dist-change requests
std::shared_ptr<batch_scheduler::SchedulingDecision> decision = nullptr;
if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) {
decision = getPreloadedSchedulingDecision(appId, req);
} else {
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
}
assert(decision != nullptr);

// Handle failures to schedule work
if (*decision == NOT_ENOUGH_SLOTS_DECISION) {
Expand Down
10 changes: 10 additions & 0 deletions src/planner/PlannerClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/network.h>
#include <faabric/util/ptp.h>

namespace faabric::planner {

Expand Down Expand Up @@ -380,6 +381,15 @@ PlannerClient::getSchedulingDecision(
return decision;
}

void PlannerClient::preloadSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> preloadDec)
{
faabric::EmptyResponse response;
auto mappings =
faabric::util::ptpMappingsFromSchedulingDecision(preloadDec);
syncSend(PlannerCalls::PreloadSchedulingDecision, &mappings, &response);
}

// -----------------------------------
// Static setter/getters
// -----------------------------------
Expand Down
44 changes: 43 additions & 1 deletion src/planner/PlannerEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void PlannerEndpointHandler::onRequest(
// Prepare the response
response.result(beast::http::status::ok);
// Work-out if it has finished using user-provided flags
if (actualBerStatus->messageresults_size() ==
if (faabric::util::getNumFinishedMessagesInBatch(actualBerStatus) ==
berStatus.expectednummessages()) {
actualBerStatus->set_finished(true);
} else {
Expand All @@ -268,6 +268,48 @@ void PlannerEndpointHandler::onRequest(

return ctx.sendFunction(std::move(response));
}
case faabric::planner::HttpMessage_Type_PRELOAD_SCHEDULING_DECISION: {
// foo bar
// in: BatchExecuteRequest
// out: none
SPDLOG_DEBUG(
"Planner received PRELOAD_SCHEDULING_DECISION request");
faabric::BatchExecuteRequest ber;
try {
faabric::util::jsonToMessage(msg.payloadjson(), &ber);
} catch (faabric::util::JsonSerialisationException e) {
response.result(beast::http::status::bad_request);
response.body() = std::string("Bad JSON in request body");
return ctx.sendFunction(std::move(response));
}

// For this method, we build the SchedulingDecision from a specially
// crafter BER. In particular, we only need to read the BER's
// app ID, and the `executedHost` parameter of each message in the
// BER.
auto decision =
std::make_shared<batch_scheduler::SchedulingDecision>(
ber.appid(), ber.groupid());
for (int i = 0; i < ber.messages_size(); i++) {
// Setting the right group idx here is key as it is the only
// message parameter that we can emulate in advance (i.e. we
// can not guess message ids in advance)
decision->addMessage(ber.messages(i).executedhost(),
ber.messages(i).id(),
ber.messages(i).appidx(),
ber.messages(i).groupidx());
}

// Pre-load the scheduling decision in the planner
faabric::planner::getPlanner().preloadSchedulingDecision(
decision->appId, decision);

// Prepare the response
response.result(beast::http::status::ok);
response.body() = std::string("Decision pre-loaded to planner");

return ctx.sendFunction(std::move(response));
}
default: {
SPDLOG_ERROR("Unrecognised message type {}", msg.type());
response.result(beast::http::status::bad_request);
Expand Down
33 changes: 23 additions & 10 deletions src/planner/PlannerServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/transport/macros.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>
#include <faabric/util/ptp.h>

#include <fmt/format.h>

Expand Down Expand Up @@ -60,6 +61,9 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::doSyncRecv(
case PlannerCalls::GetSchedulingDecision: {
return recvGetSchedulingDecision(message.udata());
}
case PlannerCalls::PreloadSchedulingDecision: {
return recvPreloadSchedulingDecision(message.udata());
}
case PlannerCalls::CallBatch: {
return recvCallBatch(message.udata());
}
Expand Down Expand Up @@ -185,20 +189,29 @@ PlannerServer::recvGetSchedulingDecision(std::span<const uint8_t> buffer)
}

// Build PointToPointMappings from scheduling decision
faabric::PointToPointMappings mappings;
mappings.set_appid(decision->appId);
mappings.set_groupid(decision->groupId);
for (int i = 0; i < decision->hosts.size(); i++) {
auto* mapping = mappings.add_mappings();
mapping->set_host(decision->hosts.at(i));
mapping->set_messageid(decision->messageIds.at(i));
mapping->set_appidx(decision->appIdxs.at(i));
mapping->set_groupidx(decision->groupIdxs.at(i));
}
faabric::PointToPointMappings mappings =
faabric::util::ptpMappingsFromSchedulingDecision(decision);

return std::make_unique<faabric::PointToPointMappings>(mappings);
}

std::unique_ptr<google::protobuf::Message>
PlannerServer::recvPreloadSchedulingDecision(std::span<const uint8_t> buffer)
{
PARSE_MSG(PointToPointMappings, buffer.data(), buffer.size());

auto preloadDecision =
faabric::batch_scheduler::SchedulingDecision::fromPointToPointMappings(
parsedMsg);

planner.preloadSchedulingDecision(
preloadDecision.appId,
std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
preloadDecision));

return std::make_unique<faabric::EmptyResponse>();
}

std::unique_ptr<google::protobuf::Message> PlannerServer::recvCallBatch(
std::span<const uint8_t> buffer)
{
Expand Down
3 changes: 3 additions & 0 deletions src/planner/planner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ message HttpMessage {
GET_IN_FLIGHT_APPS = 8;
EXECUTE_BATCH = 9;
EXECUTE_BATCH_STATUS = 10;
PRELOAD_SCHEDULING_DECISION = 11;
}

Type type = 1 [json_name = "http_type"];
Expand All @@ -52,6 +53,8 @@ message HttpMessage {
// - GET_EXEC_GRAPH: where the body is a faabric::Message
// - EXECUTE_BATCH: where the body is a faabric::BatchExecuteRequest
// - EXECUTE_BATCH_STATUS: where the body is a BER too
// - PRELOAD_SCHEDULING_DECISION: where the body is a
// faabric::BatchExecuteRequest
string payloadJson = 2 [json_name = "payload"];
}

Expand Down
1 change: 1 addition & 0 deletions src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ faabric_lib(util
logging.cpp
memory.cpp
network.cpp
ptp.cpp
queue.cpp
random.cpp
snapshot.cpp
Expand Down
14 changes: 14 additions & 0 deletions src/util/batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,18 @@ std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(
{
return batchExecStatusFactory(ber->appid());
}

int getNumFinishedMessagesInBatch(
std::shared_ptr<faabric::BatchExecuteRequestStatus> berStatus)
{
int numFinishedMsgs = 0;

for (const auto& msg : berStatus->messageresults()) {
if (msg.returnvalue() != MIGRATED_FUNCTION_RETURN_VALUE) {
numFinishedMsgs += 1;
}
}

return numFinishedMsgs;
}
}
Loading

0 comments on commit 8b8c635

Please sign in to comment.