Skip to content

Commit

Permalink
planner: add rpc to preload scheduling decision
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Nov 29, 2023
1 parent 18f6a46 commit e1b7266
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 10 deletions.
1 change: 1 addition & 0 deletions include/faabric/planner/PlannerApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ enum PlannerCalls
GetBatchResults = 10,
GetSchedulingDecision = 11,
CallBatch = 12,
PreloadSchedulingDecision = 13,
};
}
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
faabric::batch_scheduler::SchedulingDecision getSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

void preloadSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> preloadDec);

private:
std::mutex plannerCacheMx;
PlannerCache cache;
Expand Down
3 changes: 3 additions & 0 deletions include/faabric/planner/PlannerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvGetSchedulingDecision(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPreloadSchedulingDecision(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvCallBatch(
std::span<const uint8_t> buffer);

Expand Down
9 changes: 9 additions & 0 deletions include/faabric/util/ptp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/proto/faabric.pb.h>

namespace faabric::util {
faabric::PointToPointMappings ptpMappingsFromSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision);
}
10 changes: 10 additions & 0 deletions src/planner/PlannerClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/network.h>
#include <faabric/util/ptp.h>

namespace faabric::planner {

Expand Down Expand Up @@ -380,6 +381,15 @@ PlannerClient::getSchedulingDecision(
return decision;
}

void PlannerClient::preloadSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> preloadDec)
{
faabric::EmptyResponse response;
auto mappings =
faabric::util::ptpMappingsFromSchedulingDecision(preloadDec);
syncSend(PlannerCalls::PreloadSchedulingDecision, &mappings, &response);
}

// -----------------------------------
// Static setter/getters
// -----------------------------------
Expand Down
33 changes: 23 additions & 10 deletions src/planner/PlannerServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/transport/macros.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>
#include <faabric/util/ptp.h>

#include <fmt/format.h>

Expand Down Expand Up @@ -60,6 +61,9 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::doSyncRecv(
case PlannerCalls::GetSchedulingDecision: {
return recvGetSchedulingDecision(message.udata());
}
case PlannerCalls::PreloadSchedulingDecision: {
return recvPreloadSchedulingDecision(message.udata());
}
case PlannerCalls::CallBatch: {
return recvCallBatch(message.udata());
}
Expand Down Expand Up @@ -185,20 +189,29 @@ PlannerServer::recvGetSchedulingDecision(std::span<const uint8_t> buffer)
}

// Build PointToPointMappings from scheduling decision
faabric::PointToPointMappings mappings;
mappings.set_appid(decision->appId);
mappings.set_groupid(decision->groupId);
for (int i = 0; i < decision->hosts.size(); i++) {
auto* mapping = mappings.add_mappings();
mapping->set_host(decision->hosts.at(i));
mapping->set_messageid(decision->messageIds.at(i));
mapping->set_appidx(decision->appIdxs.at(i));
mapping->set_groupidx(decision->groupIdxs.at(i));
}
faabric::PointToPointMappings mappings =
faabric::util::ptpMappingsFromSchedulingDecision(decision);

return std::make_unique<faabric::PointToPointMappings>(mappings);
}

std::unique_ptr<google::protobuf::Message>
PlannerServer::recvPreloadSchedulingDecision(std::span<const uint8_t> buffer)
{
PARSE_MSG(PointToPointMappings, buffer.data(), buffer.size());

auto preloadDecision =
faabric::batch_scheduler::SchedulingDecision::fromPointToPointMappings(
parsedMsg);

planner.preloadSchedulingDecision(
preloadDecision.appId,
std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
preloadDecision));

return std::make_unique<faabric::EmptyResponse>();
}

std::unique_ptr<google::protobuf::Message> PlannerServer::recvCallBatch(
std::span<const uint8_t> buffer)
{
Expand Down
1 change: 1 addition & 0 deletions src/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ faabric_lib(util
logging.cpp
memory.cpp
network.cpp
ptp.cpp
queue.cpp
random.cpp
snapshot.cpp
Expand Down
20 changes: 20 additions & 0 deletions src/util/ptp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <faabric/util/ptp.h>

namespace faabric::util {
faabric::PointToPointMappings ptpMappingsFromSchedulingDecision(
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision)
{
faabric::PointToPointMappings mappings;
mappings.set_appid(decision->appId);
mappings.set_groupid(decision->groupId);
for (int i = 0; i < decision->hosts.size(); i++) {
auto* mapping = mappings.add_mappings();
mapping->set_host(decision->hosts.at(i));
mapping->set_messageid(decision->messageIds.at(i));
mapping->set_appidx(decision->appIdxs.at(i));
mapping->set_groupidx(decision->groupIdxs.at(i));
}

return mappings;
}
}
39 changes: 39 additions & 0 deletions tests/test/planner/test_planner_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,43 @@ TEST_CASE_METHOD(PlannerClientServerExecTestFixture,
checkMessageEquality(messageResults[msg.id()], msg);
}
}

TEST_CASE_METHOD(PlannerClientServerExecTestFixture,
"Test preloading a scheduling decision from the client",
"[planner]")
{
int nFuncs = 4;
faabric::HostResources res;
res.set_slots(nFuncs);
sch.setThisHostResources(res);
auto req = faabric::util::batchExecFactory("foo", "bar", nFuncs);

// Preload a scheduling decision
auto decision =
std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
req->appid(), req->groupid());
for (int i = 0; i < nFuncs; i++) {
decision->addMessage(
faabric::util::getSystemConfig().endpointHost, 0, 0, i);
}
plannerCli.preloadSchedulingDecision(decision);

// Now call the request with a preloaded decision
plannerCli.callFunctions(req);
std::map<int, Message> messageResults;
for (int i = 0; i < req->messages_size(); i++) {
auto result =
plannerCli.getMessageResult(req->appid(), req->messages(i).id(), 500);
REQUIRE(result.returnvalue() == 0);
messageResults[result.id()] = result;
}

// Now, all results for the batch should be registered
auto berStatus = plannerCli.getBatchResults(req);
REQUIRE(berStatus->appid() == req->appid());
for (const auto& msg : berStatus->messageresults()) {
REQUIRE(messageResults.contains(msg.id()));
checkMessageEquality(messageResults[msg.id()], msg);
}
}
}

0 comments on commit e1b7266

Please sign in to comment.