From 06460d053d142409f495ce95334371331efec2f6 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Wed, 2 Aug 2023 11:39:55 +0000 Subject: [PATCH] bin-pack: implement dist-change decision making. still need to add more tests --- .../faabric/batch-scheduler/BatchScheduler.h | 23 ++- .../batch-scheduler/BinPackScheduler.h | 4 + src/batch-scheduler/BinPackScheduler.cpp | 189 +++++++++++++++++- src/proto/faabric.proto | 1 + .../test_binpack_scheduler.cpp | 47 ++++- tests/utils/fixtures.h | 17 +- 6 files changed, 272 insertions(+), 9 deletions(-) diff --git a/include/faabric/batch-scheduler/BatchScheduler.h b/include/faabric/batch-scheduler/BatchScheduler.h index b42f1adb5..f5bd668c0 100644 --- a/include/faabric/batch-scheduler/BatchScheduler.h +++ b/include/faabric/batch-scheduler/BatchScheduler.h @@ -5,6 +5,9 @@ #include +#define DO_NOT_MIGRATE -98 +#define DO_NOT_MIGRATE_DECISION \ + faabric::util::SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE) #define NOT_ENOUGH_SLOTS -99 #define NOT_ENOUGH_SLOTS_DECISION \ faabric::util::SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS) @@ -42,7 +45,8 @@ typedef std::map HostMap; * first time. * 2) A `DIST_CHANGE` scheduling decision happens when we are scheduling a BER * _not_ for the first time, but the BER has the same number of messages that - * it had before. This corresponds to a request to migrate. + * it had before, and is set with the MIGRATION flag. This corresponds to a + * request to migrate. * 3) A `SCALE_CHANGE` scheduling decision happens when we are scheduling a BER * _not_ for the first time, and the BER has a different number of messages than * it had before. This corresponds to a chaining request or a thread/process @@ -85,6 +89,8 @@ class BatchScheduler // ---------- // Helper Host accessor methods (we encapsulate them to allow changing the // underlying `Host` typedef easily) + // TODO: consider moving these to a separate class, or making them members + // of the Host wrapper class // ---------- static int numSlots(const Host& host) { return host->slots; } @@ -94,12 +100,27 @@ class BatchScheduler return std::max(0, numSlots(host) - host->usedSlots); } + static void claimSlots(Host& host, int numSlotsToClaim) + { + host->usedSlots = + std::min(numSlots(host), host->usedSlots + numSlotsToClaim); + } + + static void freeSlots(Host& host, int numSlotsToFree) + { + host->usedSlots = std::max(0, host->usedSlots - numSlotsToFree); + } + static std::string getIp(const Host& host) { return host->ip; } // ---------- // Virtual scheduling methods // ---------- + virtual bool isFirstDecisionBetter( + std::shared_ptr<faabric::util::SchedulingDecision> decisionA, + std::shared_ptr<faabric::util::SchedulingDecision> decisionB) = 0; + virtual std::vector<Host> getSortedHosts( const HostMap& hostMap, const InFlightReqs& inFlightReqs, diff --git a/include/faabric/batch-scheduler/BinPackScheduler.h b/include/faabric/batch-scheduler/BinPackScheduler.h index d566265d1..75ab7a611 100644 --- a/include/faabric/batch-scheduler/BinPackScheduler.h +++ b/include/faabric/batch-scheduler/BinPackScheduler.h @@ -19,6 +19,10 @@ class BinPackScheduler final : public BatchScheduler std::shared_ptr<faabric::BatchExecuteRequest> req) override; private: + bool isFirstDecisionBetter( + std::shared_ptr<faabric::util::SchedulingDecision> decisionA, + std::shared_ptr<faabric::util::SchedulingDecision> decisionB) override; + std::vector<Host> getSortedHosts( const HostMap& hostMap, const InFlightReqs& inFlightReqs, diff --git a/src/batch-scheduler/BinPackScheduler.cpp b/src/batch-scheduler/BinPackScheduler.cpp index da4bdad16..fa0b69775 100644 --- a/src/batch-scheduler/BinPackScheduler.cpp +++ b/src/batch-scheduler/BinPackScheduler.cpp @@ -3,8 +3,130 @@ #include
#include +#include + namespace faabric::batch_scheduler { +// TODO: decide what to do with this method (i.e. where to place it) +// Given a new decision that improves on an old decision (i.e. to migrate), we +// want to make sure that we minimise the number of migration requests we send. +// That is, we want to keep as many host-message assignments from the old +// decision as possible, while still achieving the overall locality of the new +// decision (i.e. the host-message histogram) +static std::shared_ptr<faabric::util::SchedulingDecision> +minimiseNumOfMigrations( + std::shared_ptr<faabric::util::SchedulingDecision> newDecision, + std::shared_ptr<faabric::util::SchedulingDecision> oldDecision) +{ + auto decision = std::make_shared<faabric::util::SchedulingDecision>( + oldDecision->appId, oldDecision->groupId); + + // We want to maintain the new decision's host-message histogram + std::map<std::string, int> hostFreqCount; + for (auto host : newDecision->hosts) { + hostFreqCount[host] += 1; + } + + assert(newDecision->hosts.size() == oldDecision->hosts.size()); + std::queue<std::string> skippedHosts; + for (int i = 0; i < newDecision->hosts.size(); i++) { + // If both decisions schedule this message to the same host, great: we + // can keep the old scheduling + if (newDecision->hosts.at(i) == oldDecision->hosts.at(i)) { + decision->addMessage(oldDecision->hosts.at(i), + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i)); + hostFreqCount.at(oldDecision->hosts.at(i)) -= 1; + continue; + } + + // If not, keep the old decision's host as long as we still can (i.e. + // as long as we still have slots for it in the histogram). Note that + // the old host may not be in the new histogram at all + if (hostFreqCount.contains(oldDecision->hosts.at(i)) && + hostFreqCount.at(oldDecision->hosts.at(i)) > 0) { + decision->addMessage(oldDecision->hosts.at(i), + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i)); + hostFreqCount.at(oldDecision->hosts.at(i)) -= 1; + + // For simplicity, keep track of the hosts in the new decision we + // have skipped, to use them later + skippedHosts.push(newDecision->hosts.at(i)); + + continue; + } + + // If we can't assign the host from the old decision, then the message + // MUST be migrated, so it doesn't really matter which of the hosts + // from the new decision we pick (as the new decision is optimal in + // terms of bin-packing), as long as there are still slots in the + // histogram + while (hostFreqCount.at(skippedHosts.front()) == 0) { + skippedHosts.pop(); + } + decision->addMessage(skippedHosts.front(), + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i)); + hostFreqCount.at(skippedHosts.front()) -= 1; + skippedHosts.pop(); + } + + // Assert that we have preserved the new decision's host-message histogram + // (use the pre-processor guard as we assert repeatedly in the loop, and we + // want to avoid having an empty loop in non-debug mode) +#ifndef NDEBUG + // First, drain the queue + while (!skippedHosts.empty()) { + assert(hostFreqCount.at(skippedHosts.front()) == 0); + skippedHosts.pop(); + } + + // Second, make sure we have achieved the desired host-message histogram + for (auto [host, freq] : hostFreqCount) { + assert(freq == 0); + } +#endif + + return decision; +} + +// For the BinPack scheduler, a decision is better than another one if it has +// a lower number of cross-VM links (i.e.
better locality, or better +// packing) +bool BinPackScheduler::isFirstDecisionBetter( + std::shared_ptr<faabric::util::SchedulingDecision> decisionA, + std::shared_ptr<faabric::util::SchedulingDecision> decisionB) +{ + auto getLocalityScore = + [](std::shared_ptr<faabric::util::SchedulingDecision> decision) -> int { + // First, calculate the host-message histogram (or frequency count) + std::map<std::string, int> hostFreqCount; + for (auto host : decision->hosts) { + hostFreqCount[host] += 1; + } + + // If the scheduling is single-host, return 0 + if (hostFreqCount.size() == 1) { + return 0; + } + + // Else, take the product of all entries + int score = 1; + for (auto [host, freq] : hostFreqCount) { + score = score * freq; + } + + return score; + }; + + // The first decision is better if it has a LOWER locality score + return getLocalityScore(decisionA) < getLocalityScore(decisionB); +} + std::vector<Host> BinPackScheduler::getSortedHosts( const HostMap& hostMap, const InFlightReqs& inFlightReqs, @@ -93,6 +215,47 @@ std::vector<Host> BinPackScheduler::getSortedHosts( }); break; } + case DecisionType::DIST_CHANGE: { + // When migrating, we want to know if the decision for the app + // (which is already in-flight) can be improved according to the + // bin-pack scheduling logic. This is equivalent to checking + // whether the number of cross-VM links can be reduced (i.e. we + // improve locality) + auto hostFreqCount = getHostFreqCount(); + std::sort(sortedHosts.begin(), + sortedHosts.end(), + // TODO: this is shared with SCALE_CHANGE, so abstract + // it away (FIXME) + [&](auto hostA, auto hostB) -> bool { + int numInHostA = hostFreqCount.contains(getIp(hostA)) + ? hostFreqCount.at(getIp(hostA)) + : 0; + int numInHostB = hostFreqCount.contains(getIp(hostB)) + ? hostFreqCount.at(getIp(hostB)) + : 0; + + // If at least one of the hosts has messages for this + // request, return the host with more messages for + // this request (note that it is possible that this + // host has no available slots at all; in this case we + // will just pack 0 messages here, but we still want + // to sort it first nonetheless) + if (numInHostA != numInHostB) { + return numInHostA > numInHostB; + } + + // In case of a tie, use the same criteria as NEW + return isFirstHostLarger(hostA, hostB); + }); + // Before returning the sorted hosts for a dist change, we subtract + // all the slots occupied by the application we want to migrate + // (note that we did want to take them into account for the + // sorting) + for (auto h : sortedHosts) { + if (hostFreqCount.contains(getIp(h))) { + freeSlots(h, hostFreqCount.at(getIp(h))); + } + } + break; + } default: { SPDLOG_ERROR("Unrecognised decision type: {}", decisionType); throw std::runtime_error("Unrecognised decision type"); @@ -139,7 +302,7 @@ BinPackScheduler::makeSchedulingDecision( // If there are no more messages to schedule, we are done if (numLeftToSchedule == 0) { - return decision; + break; } // Otherwise, it means that we have exhausted this host, and need to @@ -147,8 +310,26 @@ BinPackScheduler::makeSchedulingDecision( it++; } - // If we reach this point, it means that we don't have enough slots - return std::make_shared<faabric::util::SchedulingDecision>( - NOT_ENOUGH_SLOTS_DECISION); + // If we still have messages left to schedule, it means we are out of slots + if (numLeftToSchedule > 0) { + return std::make_shared<faabric::util::SchedulingDecision>( + NOT_ENOUGH_SLOTS_DECISION); + } + + // In case of a DIST_CHANGE decision (i.e.
migration), we want to make sure + // that the new decision is better than the previous one + if (decisionType == DecisionType::DIST_CHANGE) { + auto oldDecision = inFlightReqs.at(req->appid()).second; + if (isFirstDecisionBetter(decision, oldDecision)) { + // If the new decision is better, make sure that we minimise + // the number of messages that need to migrate + return minimiseNumOfMigrations(decision, oldDecision); + } + + return std::make_shared<faabric::util::SchedulingDecision>( + DO_NOT_MIGRATE_DECISION); + } + + return decision; } } diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index b5b594a4d..dd8e46abf 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -22,6 +22,7 @@ message BatchExecuteRequest { // Each BatchExecuteRequest has a unique app id int32 appId = 1; // TODO: consider adding user/func to BER + // TODO: consider adding the request type: SCALE_CHANGE, DIST_CHANGE, NEW enum BatchExecuteType { FUNCTIONS = 0; diff --git a/tests/test/batch-scheduler/test_binpack_scheduler.cpp b/tests/test/batch-scheduler/test_binpack_scheduler.cpp index 20416e333..3e4785e4b 100644 --- a/tests/test/batch-scheduler/test_binpack_scheduler.cpp +++ b/tests/test/batch-scheduler/test_binpack_scheduler.cpp @@ -130,8 +130,8 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture, "[batch-scheduler]") { // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we - // need to have one in-flight request in the map with a different (always - // lower) number of messages + // need to have one in-flight request in the map with the same app id + // (and not of type MIGRATION) BatchSchedulerConfig config = { .hostMap = {}, .inFlightReqs = {}, @@ -283,4 +283,47 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture, config.hostMap, config.inFlightReqs, ber); compareSchedulingDecisions(actualDecision, config.expectedDecision); } + +TEST_CASE_METHOD(BinPackSchedulerTestFixture, + "Test scheduling of dist-change requests with BinPack", + "[batch-scheduler]") +{ + // To mock a dist-change request (i.e.
DecisionType::DIST_CHANGE), we + // need to have one in-flight request in the map with the same app id, the + // same size (and of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = faabric::util::SchedulingDecision(appId, groupId), + }; + + // The configs in this test must be read as follows: + // - the host map's used slots contain the current distribution for the app + // - the host map's slots contain the total slots; there is a migration + // opportunity if we can improve the current distribution + // - we repeat the distribution (and the host names) when building the + // in-flight requests + + // Given a migration opportunity (defined by the number of cross-VM links, + // or equivalently by the host-to-message histogram), the BinPack scheduler + // will try to minimise the number of messages that actually get migrated + SECTION("BinPack will minimise the number of messages to migrate") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 9); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, + 9, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" }); + config.expectedDecision = buildExpectedDecision( + ber, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 549ef4f8e..1f50cc028 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -578,8 +578,21 @@ class BatchSchedulerFixture : public ConfFixture faabric::batch_scheduler::InFlightReqs inFlightReqs; int appId = ber->appid(); - auto oldBer = faabric::util::batchExecFactory( - ber->messages(0).user(), ber->messages(0).function(), numMsgsOldBer); + std::shared_ptr<faabric::BatchExecuteRequest> oldBer = nullptr; + // If possible, literally copy the messages from the new BER into the + // old one (the one in-flight); otherwise create fresh messages + if (numMsgsOldBer > ber->messages_size()) { + oldBer = + faabric::util::batchExecFactory(ber->messages(0).user(), + ber->messages(0).function(), + numMsgsOldBer); + } else { + oldBer = faabric::util::batchExecFactory( + ber->messages(0).user(), ber->messages(0).function(), 0); + for (int i = 0; i < numMsgsOldBer; i++) { + *oldBer->add_messages() = *ber->mutable_messages(i); + } + } oldBer->set_appid(appId); assert(oldBer->messages_size() == hosts.size());