diff --git a/include/faabric/batch-scheduler/BatchScheduler.h b/include/faabric/batch-scheduler/BatchScheduler.h
index 6ec2617d9..b42f1adb5 100644
--- a/include/faabric/batch-scheduler/BatchScheduler.h
+++ b/include/faabric/batch-scheduler/BatchScheduler.h
@@ -46,7 +46,8 @@ typedef std::map<std::string, Host> HostMap;
  * 3) A `SCALE_CHANGE` scheduling decision happens when we are scheduling a BER
  * _not_ for the first time, and the BER has a different number of messages than
  * it had before. This corresponds to a chaining request or a thread/process
- * fork.
+ * fork. IMPORTANT: in a `SCALE_CHANGE` decision, we indicate the NEW number of
+ * messages we want to add to the running request, not the TOTAL.
  */
 enum DecisionType
 {
diff --git a/src/batch-scheduler/BatchScheduler.cpp b/src/batch-scheduler/BatchScheduler.cpp
index af2e7fd21..79c7d043e 100644
--- a/src/batch-scheduler/BatchScheduler.cpp
+++ b/src/batch-scheduler/BatchScheduler.cpp
@@ -42,8 +42,7 @@ DecisionType BatchScheduler::getDecisionType(
         return DecisionType::NEW;
     }
 
-    auto oldReq = inFlightReqs.at(appId).first;
-    if (oldReq->messages_size() == req->messages_size()) {
+    if (req->type() == BatchExecuteRequest_BatchExecuteType_MIGRATION) {
         return DecisionType::DIST_CHANGE;
     }
 
diff --git a/src/batch-scheduler/BinPackScheduler.cpp b/src/batch-scheduler/BinPackScheduler.cpp
index f20313091..da4bdad16 100644
--- a/src/batch-scheduler/BinPackScheduler.cpp
+++ b/src/batch-scheduler/BinPackScheduler.cpp
@@ -16,7 +16,7 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
         sortedHosts.push_back(host);
     }
 
-    auto isHostSmaller = [&](const Host& hostA, const Host& hostB) -> bool {
+    auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool {
         // The BinPack scheduler sorts hosts by number of available slots
         int nAvailableA = numSlotsAvailable(hostA);
         int nAvailableB = numSlotsAvailable(hostB);
@@ -35,9 +35,62 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
         return getIp(hostA) > getIp(hostB);
     };
 
+    // Helper lambda to get the frequency of messages at each host (useful to
+    // sort hosts maximising locality)
+    auto getHostFreqCount = [inFlightReqs,
+                             req]() -> std::map<std::string, int> {
+        std::map<std::string, int> hostFreqCount;
+
+        auto oldDecision = inFlightReqs.at(req->appid()).second;
+        for (auto host : oldDecision->hosts) {
+            hostFreqCount[host] += 1;
+        }
+
+        return hostFreqCount;
+    };
+
     switch (decisionType) {
         case DecisionType::NEW: {
-            std::sort(sortedHosts.begin(), sortedHosts.end(), isHostSmaller);
+            // For a NEW decision type, the BinPack scheduler just sorts the
+            // hosts in decreasing order of capacity, and bin-packs messages
+            // to hosts in this order
+            std::sort(
+              sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger);
+            break;
+        }
+        case DecisionType::SCALE_CHANGE: {
+            // If we are changing the scale of a running app (i.e. via chaining
+            // or thread/process forking) we want to prioritise co-locating
+            // as much as possible. This means that we will sort first by the
+            // frequency of messages of the running app, and second with the
+            // same criteria as NEW
+            // IMPORTANT: a SCALE_CHANGE request with 4 messages means that we
+            // want to add 4 NEW messages to the running app (not that the new
+            // total count is 4)
+            auto hostFreqCount = getHostFreqCount();
+            std::sort(sortedHosts.begin(),
+                      sortedHosts.end(),
+                      [&](auto hostA, auto hostB) -> bool {
+                          int numInHostA = hostFreqCount.contains(getIp(hostA))
+                                             ? hostFreqCount.at(getIp(hostA))
+                                             : 0;
+                          int numInHostB = hostFreqCount.contains(getIp(hostB))
+                                             ? hostFreqCount.at(getIp(hostB))
+                                             : 0;
+
+                          // If at least one of the hosts has messages for this
+                          // request, return the host with more messages for
+                          // this request (note that it is possible that this
+                          // host has no available slots at all, in this case we
+                          // will just pack 0 messages here but we still want to
+                          // sort it first nonetheless)
+                          if (numInHostA != numInHostB) {
+                              return numInHostA > numInHostB;
+                          }
+
+                          // In case of a tie, use the same criteria as NEW
+                          return isFirstHostLarger(hostA, hostB);
+                      });
             break;
         }
         default: {
@@ -49,6 +102,10 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
     return sortedHosts;
 }
 
+// The BinPack scheduler's decision algorithm is very simple. It first sorts
+// hosts (i.e. bins) in a specific order (depending on the scheduling type),
+// and then starts filling bins from beginning to end, until it runs out of
+// messages to schedule
 std::shared_ptr<faabric::util::SchedulingDecision>
 BinPackScheduler::makeSchedulingDecision(
   const HostMap& hostMap,
@@ -63,7 +120,7 @@ BinPackScheduler::makeSchedulingDecision(
     auto decisionType = getDecisionType(inFlightReqs, req);
     auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType);
 
-    // Assign slots from the list
+    // Assign slots from the list (i.e. bin-pack)
    auto it = sortedHosts.begin();
    int numLeftToSchedule = req->messages_size();
    int msgIdx = 0;
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 6a501b225..b5b594a4d 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -21,6 +21,7 @@ message EmptyRequest {
 message BatchExecuteRequest {
     // Each BatchExecuteRequest has a unique app id
     int32 appId = 1;
+    // TODO: consider adding user/func to BER
 
     enum BatchExecuteType {
         FUNCTIONS = 0;
diff --git a/tests/test/batch-scheduler/test_binpack_scheduler.cpp b/tests/test/batch-scheduler/test_binpack_scheduler.cpp
index 03c0df673..20416e333 100644
--- a/tests/test/batch-scheduler/test_binpack_scheduler.cpp
+++ b/tests/test/batch-scheduler/test_binpack_scheduler.cpp
@@ -20,7 +20,7 @@ class BinPackSchedulerTestFixture : public BatchSchedulerFixture
 };
 
 TEST_CASE_METHOD(BinPackSchedulerTestFixture,
-                 "Test scheduling of new requests",
+                 "Test scheduling of new requests with BinPack",
                  "[batch-scheduler]")
 {
     // To mock new requests (i.e. DecisionType::NEW), we always set the
@@ -124,4 +124,163 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture,
       config.hostMap, config.inFlightReqs, ber);
     compareSchedulingDecisions(actualDecision, config.expectedDecision);
 }
+
+TEST_CASE_METHOD(BinPackSchedulerTestFixture,
+                 "Test scheduling of scale-change requests with BinPack",
+                 "[batch-scheduler]")
+{
+    // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we
+    // need to have one in-flight request in the map with a different (always
+    // lower) number of messages
+    BatchSchedulerConfig config = {
+        .hostMap = {},
+        .inFlightReqs = {},
+        .expectedDecision = faabric::util::SchedulingDecision(appId, groupId),
+    };
+
+    SECTION("BinPack scheduler gives up if not enough slots are available")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 6);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
+        config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION;
+    }
+
+    // When scheduling a SCALE_CHANGE request, we always try to colocate as
+    // much as possible
+    SECTION("Scheduling fits in one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo" });
+    }
+
+    // We prefer hosts with less capacity if they are already running requests
+    // for the same app
+    SECTION("Scheduling fits in one host and prefers known hosts")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "bar" });
+    }
+
+    // Like with `NEW` requests, we can also spill to other hosts
+    SECTION("Scheduling spans more than one host")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" });
+    }
+
+    // If two hosts are already executing the app, we pick the one that is
+    // running the largest number of messages
+    SECTION("Scheduler prefers hosts with more running messages")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 1);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 3, { "bar", "bar", "foo" });
+        config.expectedDecision = buildExpectedDecision(ber, { "bar" });
+    }
+
+    // Again, when picking a new host to spill to, we prioritise hosts that
+    // are already running requests for this app
+    SECTION("Scheduling always picks known hosts first")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 5);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" });
+    }
+
+    // Sometimes the preferred hosts just don't have slots. They will be sorted
+    // first but the scheduler will skip them when bin-packing
+    SECTION("Scheduler ignores preferred but full hosts")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 2, 2 },
+          { 0, 2, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 3, { "bar", "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "baz", "foo", "foo" });
+    }
+
+    // In case of a tie of the number of running messages, we revert to `NEW`-
+    // like tie breaking
+    SECTION("In case of a tie of preferred hosts, fall back to known "
+            "tie-breaks (free slots)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "bar", "baz" });
+    }
+
+    SECTION("In case of a tie of preferred hosts, fall back to known "
+            "tie-breaks (size)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 3, 2 },
+          { 0, 2, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "bar", "baz", "foo" });
+    }
+
+    SECTION("In case of a tie of preferred hosts, fall back to known "
+            "tie-breaks (alphabetical)")
+    {
+        config.hostMap = buildHostMap(
+          {
+            "foo",
+            "bar",
+            "baz",
+          },
+          { 4, 2, 2 },
+          { 0, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 3);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" });
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "baz", "bar", "foo" });
+    }
+
+    actualDecision = *batchScheduler->makeSchedulingDecision(
+      config.hostMap, config.inFlightReqs, ber);
+    compareSchedulingDecisions(actualDecision, config.expectedDecision);
+}
 }
diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h
index 14e64351b..549ef4f8e 100644
--- a/tests/utils/fixtures.h
+++ b/tests/utils/fixtures.h
@@ -570,6 +570,27 @@ class BatchSchedulerFixture : public ConfFixture
         return hostMap;
     }
 
+    static faabric::batch_scheduler::InFlightReqs buildInFlightReqs(
+      std::shared_ptr<faabric::BatchExecuteRequest> ber,
+      int numMsgsOldBer,
+      std::vector<std::string> hosts)
+    {
+        faabric::batch_scheduler::InFlightReqs inFlightReqs;
+        int appId = ber->appid();
+
+        auto oldBer = faabric::util::batchExecFactory(
+          ber->messages(0).user(), ber->messages(0).function(), numMsgsOldBer);
+        oldBer->set_appid(appId);
+
+        assert(oldBer->messages_size() == hosts.size());
+        inFlightReqs[appId] =
+          std::make_pair(oldBer,
+                         std::make_shared<faabric::util::SchedulingDecision>(
+                           buildExpectedDecision(oldBer, hosts)));
+
+        return inFlightReqs;
+    }
+
     static faabric::util::SchedulingDecision buildExpectedDecision(
       std::shared_ptr<faabric::BatchExecuteRequest> ber,
       std::vector<std::string> hosts)
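Note for reviewers (not part of the patch): the sort-then-bin-pack behaviour that the diff describes can be seen in isolation with the minimal standalone C++ sketch below. It does not use faabric's types or API: `HostInfo`, `sortHostsForScaleChange`, `binPack` and `runningPerHost` are illustrative names only, and the NEW-style tie-break order (free slots, then total host size, then host name in descending order, mirroring the `getIp(hostA) > getIp(hostB)` comparison) is inferred from the test expectations rather than copied from `isFirstHostLarger`. The hard-coded scenario mirrors the "Scheduling always picks known hosts first" test case.

// Standalone sketch of the BinPack SCALE_CHANGE policy (hypothetical types,
// not faabric's): sort hosts by locality first, then bin-pack messages.
#include <algorithm>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct HostInfo
{
    std::string name;
    int slots = 0;
    int used = 0;

    int available() const { return slots - used; }
};

// Sort for a SCALE_CHANGE decision: hosts already running more messages of
// this app come first (locality), then more free slots, then larger total
// size, then name in descending order
void sortHostsForScaleChange(std::vector<HostInfo>& hosts,
                             const std::map<std::string, int>& runningPerHost)
{
    std::sort(hosts.begin(),
              hosts.end(),
              [&](const HostInfo& a, const HostInfo& b) -> bool {
                  int runA =
                    runningPerHost.count(a.name) ? runningPerHost.at(a.name) : 0;
                  int runB =
                    runningPerHost.count(b.name) ? runningPerHost.at(b.name) : 0;
                  if (runA != runB) {
                      return runA > runB;
                  }
                  if (a.available() != b.available()) {
                      return a.available() > b.available();
                  }
                  if (a.slots != b.slots) {
                      return a.slots > b.slots;
                  }
                  return a.name > b.name;
              });
}

// Bin-pack: walk the sorted hosts front to back, filling free slots until all
// messages are placed; return an empty assignment if there are not enough slots
std::vector<std::string> binPack(const std::vector<HostInfo>& sortedHosts,
                                 int numMessages)
{
    std::vector<std::string> assignment;
    for (const auto& host : sortedHosts) {
        int toPack =
          std::min(host.available(), numMessages - (int)assignment.size());
        for (int i = 0; i < toPack; i++) {
            assignment.push_back(host.name);
        }
    }

    if ((int)assignment.size() < numMessages) {
        return {};
    }

    return assignment;
}

int main()
{
    // Mirrors the "Scheduling always picks known hosts first" test case:
    // capacities { 4, 3, 2 }, used slots { 0, 1, 1 }, and the app already
    // has one message running on "bar" and one on "baz"
    std::vector<HostInfo> hosts = { { "foo", 4, 0 },
                                    { "bar", 3, 1 },
                                    { "baz", 2, 1 } };
    std::map<std::string, int> runningPerHost = { { "bar", 1 }, { "baz", 1 } };

    sortHostsForScaleChange(hosts, runningPerHost);

    // Schedule 5 new messages for the running app
    for (const auto& host : binPack(hosts, 5)) {
        std::cout << host << " ";
    }
    std::cout << std::endl;

    return 0;
}

Compiling and running this should print `bar bar baz foo foo`, matching the expected decision in that test: the hosts already running the app are filled first, and only then does the packing spill over to the largest remaining host.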