bin-pack: implement SCALE_CHANGE scheduling

csegarragonz committed Aug 1, 2023
1 parent 883047e commit 8d397e6
Showing 6 changed files with 245 additions and 7 deletions.
3 changes: 2 additions & 1 deletion include/faabric/batch-scheduler/BatchScheduler.h
@@ -46,7 +46,8 @@ typedef std::map<std::string, Host> HostMap;
* 3) A `SCALE_CHANGE` scheduling decision happens when we are scheduling a BER
 * _not_ for the first time, and the BER has a different number of messages than
* it had before. This corresponds to a chaining request or a thread/process
* fork.
* fork. IMPORTANT: in a `SCALE_CHANGE` decision, we indicate the NEW number of
* messages we want to add to the running request, not the TOTAL
*/
enum DecisionType
{
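To make the note above concrete, here is a minimal, self-contained sketch (hypothetical types, not the faabric API) of the delta semantics: an app with two in-flight messages that receives a `SCALE_CHANGE` request carrying four messages ends up with six messages in flight.

```cpp
// Minimal sketch of the SCALE_CHANGE delta semantics (hypothetical types,
// not the faabric API): the request carries the messages to ADD, not the
// new total.
#include <cassert>
#include <map>

int main()
{
    // appId -> number of messages currently in flight for that app
    std::map<int, int> inFlightMsgCount;
    inFlightMsgCount[1337] = 2;

    // A SCALE_CHANGE request for app 1337 carrying 4 messages means
    // "add 4 more messages", so the app ends up with 6, not 4
    int scaleChangeMsgCount = 4;
    inFlightMsgCount[1337] += scaleChangeMsgCount;

    assert(inFlightMsgCount[1337] == 6);
    return 0;
}
```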
3 changes: 1 addition & 2 deletions src/batch-scheduler/BatchScheduler.cpp
@@ -42,8 +42,7 @@ DecisionType BatchScheduler::getDecisionType(
return DecisionType::NEW;
}

auto oldReq = inFlightReqs.at(appId).first;
if (oldReq->messages_size() == req->messages_size()) {
if (req->type() == BatchExecuteRequest_BatchExecuteType_MIGRATION) {
return DecisionType::DIST_CHANGE;
}

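For orientation, a self-contained sketch of the selection logic this hunk implies (hypothetical types, not the faabric API; the tail of the real function is collapsed above, so the final `SCALE_CHANGE` fall-through is an assumption):

```cpp
// Sketch of decision-type selection with hypothetical types. The fall-through
// to SCALE_CHANGE is an assumption, since the end of the real function is
// collapsed in the diff.
#include <map>

enum class DecisionType { NEW, SCALE_CHANGE, DIST_CHANGE };

struct Request
{
    int appId = 0;
    bool isMigration = false;
    int numMessages = 0;
};

DecisionType getDecisionTypeSketch(const std::map<int, Request>& inFlightReqs,
                                   const Request& req)
{
    // First time we see this app: schedule it from scratch
    if (inFlightReqs.count(req.appId) == 0) {
        return DecisionType::NEW;
    }

    // An explicit migration request re-distributes the same messages
    if (req.isMigration) {
        return DecisionType::DIST_CHANGE;
    }

    // Otherwise the app is growing (chaining or a thread/process fork)
    return DecisionType::SCALE_CHANGE;
}
```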
63 changes: 60 additions & 3 deletions src/batch-scheduler/BinPackScheduler.cpp
@@ -16,7 +16,7 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
sortedHosts.push_back(host);
}

auto isHostSmaller = [&](const Host& hostA, const Host& hostB) -> bool {
auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool {
// The BinPack scheduler sorts hosts by number of available slots
int nAvailableA = numSlotsAvailable(hostA);
int nAvailableB = numSlotsAvailable(hostB);
@@ -35,9 +35,62 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
return getIp(hostA) > getIp(hostB);
};

    // Helper lambda to get the frequency of messages at each host (useful to
// sort hosts maximising locality)
auto getHostFreqCount = [inFlightReqs,
req]() -> std::map<std::string, int> {
std::map<std::string, int> hostFreqCount;

auto oldDecision = inFlightReqs.at(req->appid()).second;
for (auto host : oldDecision->hosts) {
hostFreqCount[host] += 1;
}

return hostFreqCount;
};

switch (decisionType) {
case DecisionType::NEW: {
std::sort(sortedHosts.begin(), sortedHosts.end(), isHostSmaller);
// For a NEW decision type, the BinPack scheduler just sorts the
// hosts in decreasing order of capacity, and bin-packs messages
// to hosts in this order
std::sort(
sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger);
break;
}
case DecisionType::SCALE_CHANGE: {
// If we are changing the scale of a running app (i.e. via chaining
// or thread/process forking) we want to prioritise co-locating
// as much as possible. This means that we will sort first by the
// frequency of messages of the running app, and second with the
            // same criteria as NEW
// IMPORTANT: a SCALE_CHANGE request with 4 messages means that we
// want to add 4 NEW messages to the running app (not that the new
// total count is 4)
auto hostFreqCount = getHostFreqCount();
std::sort(sortedHosts.begin(),
sortedHosts.end(),
[&](auto hostA, auto hostB) -> bool {
int numInHostA = hostFreqCount.contains(getIp(hostA))
? hostFreqCount.at(getIp(hostA))
: 0;
int numInHostB = hostFreqCount.contains(getIp(hostB))
? hostFreqCount.at(getIp(hostB))
: 0;

// If at least one of the hosts has messages for this
                          // request, return the host with more messages for
                          // this request (note that it is possible that this
                          // host has no available slots at all, in which case
                          // we will just pack 0 messages here but we still
                          // want to sort it first nonetheless)
if (numInHostA != numInHostB) {
return numInHostA > numInHostB;
}

                          // In case of a tie, use the same criteria as NEW
return isFirstHostLarger(hostA, hostB);
});
break;
}
default: {
@@ -49,6 +102,10 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
return sortedHosts;
}

// The BinPack scheduler's decision algorithm is very simple. It first sorts
// hosts (i.e. bins) in a specific order (depending on the scheduling type),
// and then starts filling bins from beginning to end, until it runs out of
// messages to schedule
std::shared_ptr<faabric::util::SchedulingDecision>
BinPackScheduler::makeSchedulingDecision(
const HostMap& hostMap,
@@ -63,7 +120,7 @@ BinPackScheduler::makeSchedulingDecision(
auto decisionType = getDecisionType(inFlightReqs, req);
auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);

// Assign slots from the list
// Assign slots from the list (i.e. bin-pack)
auto it = sortedHosts.begin();
int numLeftToSchedule = req->messages_size();
int msgIdx = 0;
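The `SCALE_CHANGE` ordering above can be summarised in a self-contained sketch (hypothetical `HostInfo` type; the middle of `isFirstHostLarger` is collapsed in the diff, so the exact NEW-style tie-breaks are inferred from the tests in this commit):

```cpp
// Sketch of the SCALE_CHANGE sort order with hypothetical types: hosts
// already running messages for this app come first (more messages first);
// ties fall back to the NEW ordering (more free slots, larger host, IP).
#include <algorithm>
#include <map>
#include <string>
#include <vector>

struct HostInfo
{
    std::string ip;
    int slots = 0;     // total capacity
    int usedSlots = 0; // slots already occupied
};

std::vector<HostInfo> sortForScaleChange(
  std::vector<HostInfo> hosts,
  const std::map<std::string, int>& hostFreqCount)
{
    auto freq = [&](const HostInfo& h) -> int {
        auto it = hostFreqCount.find(h.ip);
        return it == hostFreqCount.end() ? 0 : it->second;
    };

    std::sort(hosts.begin(),
              hosts.end(),
              [&](const HostInfo& a, const HostInfo& b) -> bool {
                  // Prefer hosts already running more messages for this app
                  if (freq(a) != freq(b)) {
                      return freq(a) > freq(b);
                  }
                  // Tie-break like a NEW decision: more free slots first, ...
                  int freeA = a.slots - a.usedSlots;
                  int freeB = b.slots - b.usedSlots;
                  if (freeA != freeB) {
                      return freeA > freeB;
                  }
                  // ... then larger total capacity, ...
                  if (a.slots != b.slots) {
                      return a.slots > b.slots;
                  }
                  // ... and finally by IP to keep the order deterministic
                  return a.ip > b.ip;
              });

    return hosts;
}
```

With hosts foo (5 slots, 0 used) and bar (4 slots, 1 used, already running one message of the app), this ordering puts bar first even though foo has more free slots, which matches the "prefers known hosts" test case below.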
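The bin-packing pass in `makeSchedulingDecision` is collapsed in the diff; conceptually it is a loop of the following shape (a hedged, self-contained sketch with the same hypothetical `HostInfo` type, not the actual implementation):

```cpp
// Hedged sketch of the bin-packing pass: walk the sorted hosts and give each
// one as many of the remaining messages as it has free slots, until every
// message has been placed.
#include <algorithm>
#include <string>
#include <vector>

struct HostInfo
{
    std::string ip;
    int slots = 0;     // total capacity
    int usedSlots = 0; // slots already occupied
};

// Returns one host IP per message, or an empty vector if the messages do not
// fit in the available slots
std::vector<std::string> binPack(const std::vector<HostInfo>& sortedHosts,
                                 int numMessages)
{
    std::vector<std::string> hostForMsg;

    for (const auto& host : sortedHosts) {
        int numLeftToSchedule = numMessages - (int)hostForMsg.size();
        int numOnThisHost =
          std::min(host.slots - host.usedSlots, numLeftToSchedule);

        for (int i = 0; i < numOnThisHost; i++) {
            hostForMsg.push_back(host.ip);
        }

        if ((int)hostForMsg.size() == numMessages) {
            return hostForMsg;
        }
    }

    // Not enough slots across all hosts
    return {};
}
```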
1 change: 1 addition & 0 deletions src/proto/faabric.proto
@@ -21,6 +21,7 @@ message EmptyRequest {
message BatchExecuteRequest {
// Each BatchExecuteRequest has a unique app id
int32 appId = 1;
// TODO: consider adding user/func to BER

enum BatchExecuteType {
FUNCTIONS = 0;
161 changes: 160 additions & 1 deletion tests/test/batch-scheduler/test_binpack_scheduler.cpp
@@ -20,7 +20,7 @@ class BinPackSchedulerTestFixture : public BatchSchedulerFixture
};

TEST_CASE_METHOD(BinPackSchedulerTestFixture,
"Test scheduling of new requests",
"Test scheduling of new requests with BinPack",
"[batch-scheduler]")
{
// To mock new requests (i.e. DecisionType::NEW), we always set the
@@ -124,4 +124,163 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture,
config.hostMap, config.inFlightReqs, ber);
compareSchedulingDecisions(actualDecision, config.expectedDecision);
}

TEST_CASE_METHOD(BinPackSchedulerTestFixture,
"Test scheduling of scale-change requests with BinPack",
"[batch-scheduler]")
{
// To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we
// need to have one in-flight request in the map with a different (always
// lower) number of messages
BatchSchedulerConfig config = {
.hostMap = {},
.inFlightReqs = {},
.expectedDecision = faabric::util::SchedulingDecision(appId, groupId),
};

SECTION("BinPack scheduler gives up if not enough slots are available")
{
config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 });
ber = faabric::util::batchExecFactory("bat", "man", 6);
config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION;
}

// When scheduling a SCALE_CHANGE request, we always try to colocate as
// much as possible
SECTION("Scheduling fits in one host")
{
config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
config.expectedDecision =
buildExpectedDecision(ber, { "foo", "foo", "foo" });
}

// We prefer hosts with less capacity if they are already running requests
// for the same app
SECTION("Scheduling fits in one host and prefers known hosts")
{
config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
config.expectedDecision =
buildExpectedDecision(ber, { "bar", "bar", "bar" });
}

// Like with `NEW` requests, we can also spill to other hosts
SECTION("Scheduling spans more than one host")
{
config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 4);
config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
config.expectedDecision =
buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" });
}

// If two hosts are already executing the app, we pick the one that is
// running the largest number of messages
SECTION("Scheduler prefers hosts with more running messages")
{
config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 });
ber = faabric::util::batchExecFactory("bat", "man", 1);
config.inFlightReqs =
buildInFlightReqs(ber, 3, { "bar", "bar", "foo" });
config.expectedDecision = buildExpectedDecision(ber, { "bar" });
}

    // Again, when picking a new host to spill to, we prioritise hosts that
// are already running requests for this app
SECTION("Scheduling always picks known hosts first")
{
config.hostMap = buildHostMap(
{
"foo",
"bar",
"baz",
},
{ 4, 3, 2 },
{ 0, 1, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 5);
config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
config.expectedDecision =
buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" });
}

// Sometimes the preferred hosts just don't have slots. They will be sorted
// first but the scheduler will skip them when bin-packing
SECTION("Scheduler ignores preferred but full hosts")
{
config.hostMap = buildHostMap(
{
"foo",
"bar",
"baz",
},
{ 4, 2, 2 },
{ 0, 2, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs =
buildInFlightReqs(ber, 3, { "bar", "bar", "baz" });
config.expectedDecision =
buildExpectedDecision(ber, { "baz", "foo", "foo" });
}

    // In case of a tie of the number of running messages, we revert to `NEW`-
// like tie breaking
SECTION("In case of a tie of preferred hosts, fall-back to known "
"tie-breaks (free slots)")
{
config.hostMap = buildHostMap(
{
"foo",
"bar",
"baz",
},
{ 4, 3, 2 },
{ 0, 1, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
config.expectedDecision =
buildExpectedDecision(ber, { "bar", "bar", "baz" });
}

SECTION("In case of a tie of preferred hosts, fall-back to known "
"tie-breaks (size)")
{
config.hostMap = buildHostMap(
{
"foo",
"bar",
"baz",
},
{ 4, 3, 2 },
{ 0, 2, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
config.expectedDecision =
buildExpectedDecision(ber, { "bar", "baz", "foo" });
}

SECTION("In case of a tie of preferred hosts, fall-back to known "
"tie-breaks (alphabetical)")
{
config.hostMap = buildHostMap(
{
"foo",
"bar",
"baz",
},
{ 4, 2, 2 },
{ 0, 1, 1 });
ber = faabric::util::batchExecFactory("bat", "man", 3);
config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
config.expectedDecision =
buildExpectedDecision(ber, { "baz", "bar", "foo" });
}

actualDecision = *batchScheduler->makeSchedulingDecision(
config.hostMap, config.inFlightReqs, ber);
compareSchedulingDecisions(actualDecision, config.expectedDecision);
}
}
21 changes: 21 additions & 0 deletions tests/utils/fixtures.h
@@ -570,6 +570,27 @@ class BatchSchedulerFixture : public ConfFixture
return hostMap;
}

static faabric::batch_scheduler::InFlightReqs buildInFlightReqs(
std::shared_ptr<BatchExecuteRequest> ber,
int numMsgsOldBer,
std::vector<std::string> hosts)
{
faabric::batch_scheduler::InFlightReqs inFlightReqs;
int appId = ber->appid();

auto oldBer = faabric::util::batchExecFactory(
ber->messages(0).user(), ber->messages(0).function(), numMsgsOldBer);
oldBer->set_appid(appId);

assert(oldBer->messages_size() == hosts.size());
inFlightReqs[appId] =
std::make_pair(oldBer,
std::make_shared<faabric::util::SchedulingDecision>(
buildExpectedDecision(oldBer, hosts)));

return inFlightReqs;
}

static faabric::util::SchedulingDecision buildExpectedDecision(
std::shared_ptr<BatchExecuteRequest> ber,
std::vector<std::string> hosts)
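A hypothetical usage of this helper, mirroring the scale-change tests above (not a new API; both calls appear in the test file in this commit):

```cpp
// The new BER carries 3 messages, while the in-flight map records an older
// BER for the same app with 2 messages, placed on "bar" and "baz"
auto ber = faabric::util::batchExecFactory("bat", "man", 3);
auto inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
```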
