bin-pack: implement dist-change decision making. still need to add more tests
csegarragonz committed Aug 2, 2023
1 parent 8d397e6 commit 06460d0
Showing 6 changed files with 272 additions and 9 deletions.
23 changes: 22 additions & 1 deletion include/faabric/batch-scheduler/BatchScheduler.h
@@ -5,6 +5,9 @@

#include <string>

#define DO_NOT_MIGRATE -98
#define DO_NOT_MIGRATE_DECISION \
faabric::util::SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
#define NOT_ENOUGH_SLOTS -99
#define NOT_ENOUGH_SLOTS_DECISION \
faabric::util::SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)
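Like the existing NOT_ENOUGH_SLOTS sentinel, DO_NOT_MIGRATE is encoded in the decision's app and group ids, so a caller can detect it with a plain comparison. A minimal, hypothetical sketch of such a check (the helper is illustrative, not part of this patch):

// Hypothetical helper: detect the "do not migrate" sentinel decision
static bool isDoNotMigrateDecision(
  const std::shared_ptr<faabric::util::SchedulingDecision>& decision)
{
    return decision->appId == DO_NOT_MIGRATE;
}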
@@ -42,7 +45,8 @@ typedef std::map<std::string, Host> HostMap;
* first time.
* 2) A `DIST_CHANGE` scheduling decision happens when we are scheduling a BER
* _not_ for the first time, but the BER has the same number of messages that
* it had before, and is set with the MIGRATION flag. This corresponds to a
* request to migrate.
* 3) A `SCALE_CHANGE` scheduling decision happens when we are scheduling a BER
_not_ for the first time, and the BER has a different number of messages than
* it had before. This corresponds to a chaining request or a thread/process
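As a rough sketch of how this taxonomy could translate into code, assuming InFlightReqs maps app ids to (request, decision) pairs as the test fixture below suggests (the helper name and the req->type() accessor are illustrative assumptions, not part of this patch):

// Illustrative sketch only: classify an incoming BER given the in-flight map
static DecisionType getDecisionType(
  const InFlightReqs& inFlightReqs,
  std::shared_ptr<faabric::BatchExecuteRequest> req)
{
    // We have never seen this app before: NEW
    if (!inFlightReqs.contains(req->appid())) {
        return DecisionType::NEW;
    }

    // Same number of messages as in-flight, and flagged as a migration:
    // DIST_CHANGE
    int numInFlight = inFlightReqs.at(req->appid()).first->messages_size();
    if (req->messages_size() == numInFlight &&
        req->type() == faabric::BatchExecuteRequest::MIGRATION) {
        return DecisionType::DIST_CHANGE;
    }

    // Otherwise the app is chaining or scaling: SCALE_CHANGE
    return DecisionType::SCALE_CHANGE;
}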
@@ -85,6 +89,8 @@ class BatchScheduler
// ----------
// Helper Host accessor methods (we encapsulate them to allow changing the
// underlying `Host` typedef easily)
// TODO: consider moving to a separate class, or as members of the Host
// wrapper class
// ----------

static int numSlots(const Host& host) { return host->slots; }
@@ -94,12 +100,27 @@
return std::max<int>(0, numSlots(host) - host->usedSlots);
}

static void claimSlots(Host& host, int numSlotsToClaim)
{
host->usedSlots =
std::min<int>(numSlots(host), host->usedSlots + numSlotsToClaim);
}

static void freeSlots(Host& host, int numSlotsToFree)
{
host->usedSlots = std::max<int>(0, host->usedSlots - numSlotsToFree);
}

static std::string getIp(const Host& host) { return host->ip; }

// ----------
// Virtual scheduling methods
// ----------

virtual bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) = 0;

virtual std::vector<Host> getSortedHosts(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
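The two new slot mutators clamp their result rather than trusting the caller: claiming can never push usedSlots above the host's slot count, and freeing can never push it below zero. A self-contained toy sketch of these semantics (HostState is a stand-in for the real Host typedef, not part of the patch):

#include <algorithm>
#include <cassert>
#include <memory>

// Toy stand-in for the scheduler's Host typedef (illustrative only)
struct HostState
{
    int slots = 4;
    int usedSlots = 3;
};
using Host = std::shared_ptr<HostState>;

static void claimSlots(Host& host, int numSlotsToClaim)
{
    host->usedSlots =
      std::min<int>(host->slots, host->usedSlots + numSlotsToClaim);
}

static void freeSlots(Host& host, int numSlotsToFree)
{
    host->usedSlots = std::max<int>(0, host->usedSlots - numSlotsToFree);
}

int main()
{
    Host host = std::make_shared<HostState>();
    claimSlots(host, 5); // over-claiming clamps to the slot count (4)
    assert(host->usedSlots == 4);
    freeSlots(host, 10); // over-freeing clamps at zero
    assert(host->usedSlots == 0);
}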
4 changes: 4 additions & 0 deletions include/faabric/batch-scheduler/BinPackScheduler.h
@@ -19,6 +19,10 @@ class BinPackScheduler final : public BatchScheduler
std::shared_ptr<faabric::BatchExecuteRequest> req) override;

private:
bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) override;

std::vector<Host> getSortedHosts(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
189 changes: 185 additions & 4 deletions src/batch-scheduler/BinPackScheduler.cpp
@@ -3,8 +3,130 @@
#include <faabric/util/logging.h>
#include <faabric/util/scheduling.h>

#include <queue>

namespace faabric::batch_scheduler {

// TODO: decide what to do with this method (i.e. where to place it)
// Given a new decision that improves on an old decision (i.e. a migration),
// we want to make sure that we minimise the number of migration requests we
// send. That is, we want to keep as many host-message assignments from the
// old decision as possible, while still achieving the overall locality of
// the new decision (i.e. its host-message histogram)
static std::shared_ptr<faabric::util::SchedulingDecision>
minimiseNumOfMigrations(
std::shared_ptr<faabric::util::SchedulingDecision> newDecision,
std::shared_ptr<faabric::util::SchedulingDecision> oldDecision)
{
auto decision = std::make_shared<faabric::util::SchedulingDecision>(
oldDecision->appId, oldDecision->groupId);

// We want to maintain the new decision's host-message histogram
std::map<std::string, int> hostFreqCount;
for (auto host : newDecision->hosts) {
hostFreqCount[host] += 1;
}

assert(newDecision->hosts.size() == oldDecision->hosts.size());
std::queue<std::string> skippedHosts;
for (int i = 0; i < newDecision->hosts.size(); i++) {
// If both decisions schedule this message to the same host, great: we
// can keep the old scheduling
if (newDecision->hosts.at(i) == oldDecision->hosts.at(i)) {
decision->addMessage(oldDecision->hosts.at(i),
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(oldDecision->hosts.at(i)) -= 1;
continue;
}

// If not, keep the old assignment as long as we still can, i.e. as
// long as the old host still has free slots in the new histogram (note
// that the old host may not be in the new histogram at all)
if (hostFreqCount.contains(oldDecision->hosts.at(i)) &&
hostFreqCount.at(oldDecision->hosts.at(i)) > 0) {
decision->addMessage(oldDecision->hosts.at(i),
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(oldDecision->hosts.at(i)) -= 1;

// For simplicity, keep track of the hosts in the new decision we
// have skipped, to use them later
skippedHosts.push(newDecision->hosts.at(i));

continue;
}

// If we can't assign the host from the old decision, it means that
// this message MUST be migrated, so it doesn't really matter which of
// the skipped hosts from the new decision we pick (as the new decision
// is optimal in terms of bin-packing), as long as there are still
// slots in the histogram
while (hostFreqCount.at(skippedHosts.front()) == 0) {
skippedHosts.pop();
}
decision->addMessage(skippedHosts.front(),
oldDecision->messageIds.at(i),
oldDecision->appIdxs.at(i),
oldDecision->groupIdxs.at(i));
hostFreqCount.at(skippedHosts.front()) -= 1;
skippedHosts.pop();
}

// Assert that we have preserved the new decision's host-message histogram
// (we use the pre-processor guard, rather than plain asserts, to avoid
// iterating over the queue and the map at all in non-debug builds)
#ifndef NDEBUG
// First drain the queue
while (!skippedHosts.empty()) {
assert(hostFreqCount.at(skippedHosts.front()) == 0);
skippedHosts.pop();
}

// Second make sure we have achieved the desired host-message histogram
for (auto [host, freq] : hostFreqCount) {
assert(freq == 0);
}
#endif

return decision;
}
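To see what the merge buys us, here is a self-contained toy version over plain host vectors, using the same distributions as the new dist-change test below (the real function operates on SchedulingDecision objects and carries message ids and indices along). Moving from the in-flight placement {foo: 3, bar: 4, baz: 2} to the better-packed {foo: 5, bar: 4} verbatim would migrate four messages; the merged decision achieves the same histogram while migrating only the two messages that were on baz:

#include <iostream>
#include <map>
#include <queue>
#include <string>
#include <vector>

// Toy version of the merge over plain host vectors (illustrative only)
std::vector<std::string> minimiseMigrations(
  const std::vector<std::string>& newHosts,
  const std::vector<std::string>& oldHosts)
{
    std::map<std::string, int> hostFreqCount;
    for (const auto& host : newHosts) {
        hostFreqCount[host] += 1;
    }

    std::vector<std::string> merged;
    std::queue<std::string> skippedHosts;
    for (size_t i = 0; i < newHosts.size(); i++) {
        // Same host in both decisions: keep the old placement for free
        if (newHosts[i] == oldHosts[i]) {
            merged.push_back(oldHosts[i]);
            hostFreqCount[oldHosts[i]] -= 1;
            continue;
        }

        // Keep the old placement while the new histogram still has room
        if (hostFreqCount.count(oldHosts[i]) > 0 &&
            hostFreqCount[oldHosts[i]] > 0) {
            merged.push_back(oldHosts[i]);
            hostFreqCount[oldHosts[i]] -= 1;
            skippedHosts.push(newHosts[i]);
            continue;
        }

        // This message must migrate: take any skipped host with room left
        while (hostFreqCount[skippedHosts.front()] == 0) {
            skippedHosts.pop();
        }
        merged.push_back(skippedHosts.front());
        hostFreqCount[skippedHosts.front()] -= 1;
        skippedHosts.pop();
    }

    return merged;
}

int main()
{
    std::vector<std::string> oldHosts = { "foo", "foo", "foo", "bar", "bar",
                                          "bar", "bar", "baz", "baz" };
    std::vector<std::string> newHosts = { "foo", "foo", "foo", "foo", "foo",
                                          "bar", "bar", "bar", "bar" };

    for (const auto& host : minimiseMigrations(newHosts, oldHosts)) {
        std::cout << host << " ";
    }
    std::cout << std::endl;
    // Prints: foo foo foo bar bar bar bar foo foo
}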

// For the BinPack scheduler, a decision is better than another one if it
// has a lower number of cross-VM links (i.e. better locality, or better
// packing)
bool BinPackScheduler::isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB)
{
auto getLocalityScore =
[](std::shared_ptr<faabric::util::SchedulingDecision> decision) -> int {
// First, calculate the host-message histogram (or frequency count)
std::map<std::string, int> hostFreqCount;
for (auto host : decision->hosts) {
hostFreqCount[host] += 1;
}

// If all the messages are scheduled to a single host, return 0
if (hostFreqCount.size() == 1) {
return 0;
}

// Otherwise, take the product of all the frequency counts
int score = 1;
for (auto [host, freq] : hostFreqCount) {
score = score * freq;
}

return score;
};

// The first decision is better if it has a LOWER locality score
return getLocalityScore(decisionA) < getLocalityScore(decisionB);
}
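As a worked example using the distributions from the dist-change test below: the in-flight decision {foo: 3, bar: 4, baz: 2} scores 3 * 4 * 2 = 24, while the bin-packed alternative {foo: 5, bar: 4} scores 5 * 4 = 20, so the new decision is deemed better and the migration is worth sending (a decision that fits on a single host scores 0, and thus always wins).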

std::vector<Host> BinPackScheduler::getSortedHosts(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
@@ -93,6 +215,47 @@ std::vector<Host> BinPackScheduler::getSortedHosts(
});
break;
}
case DecisionType::DIST_CHANGE: {
// When migrating, we want to know if the distribution for this app
// (which is already in-flight) can be improved according to the
// bin-pack scheduling logic. This is equivalent to reducing the
// number of cross-VM links (i.e. improving locality)
auto hostFreqCount = getHostFreqCount();
std::sort(sortedHosts.begin(),
sortedHosts.end(),
// TODO(FIXME): this comparator is shared with SCALE_CHANGE,
// so abstract it away
[&](auto hostA, auto hostB) -> bool {
int numInHostA = hostFreqCount.contains(getIp(hostA))
? hostFreqCount.at(getIp(hostA))
: 0;
int numInHostB = hostFreqCount.contains(getIp(hostB))
? hostFreqCount.at(getIp(hostB))
: 0;

// If at least one of the hosts has messages for this
// request, sort first the host with more messages for
// this request (note that this host may have no
// available slots at all; in that case we will just
// pack 0 messages here, but we still want to sort it
// first nonetheless)
if (numInHostA != numInHostB) {
return numInHostA > numInHostB;
}

// In case of a tie, use the same criteria as NEW
return isFirstHostLarger(hostA, hostB);
});
// Before returning the sorted hosts for a dist-change, we free all the
// slots occupied by the app we want to migrate (note that we do want
// to take them into account for the sorting, hence why we only free
// them afterwards)
for (auto h : sortedHosts) {
if (hostFreqCount.contains(getIp(h))) {
freeSlots(h, hostFreqCount.at(getIp(h)));
}
}
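// (With the test distributions below, this freeing turns foo with 5
// slots/3 used, bar with 4/4, and baz with 2/2 into 5, 4 and 2 free
// slots respectively, which is what lets the bin-packer consider the
// fully-packed {foo: 5, bar: 4} placement)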
break;
}
default: {
SPDLOG_ERROR("Unrecognised decision type: {}", decisionType);
throw std::runtime_error("Unrecognised decision type");
@@ -139,16 +302,34 @@ BinPackScheduler::makeSchedulingDecision(

// If there are no more messages to schedule, we are done
if (numLeftToSchedule == 0) {
break;
}

// Otherwise, it means that we have exhausted this host, and need to
// check in the next one
it++;
}

// If we still have messages left to schedule at this point, we have run
// out of slots
if (numLeftToSchedule > 0) {
return std::make_shared<faabric::util::SchedulingDecision>(
NOT_ENOUGH_SLOTS_DECISION);
}

// In case of a DIST_CHANGE decision (i.e. migration), we want to make sure
// that the new decision is better than the previous one
if (decisionType == DecisionType::DIST_CHANGE) {
auto oldDecision = inFlightReqs.at(req->appid()).second;
if (isFirstDecisionBetter(decision, oldDecision)) {
// If we are sending a better migration, make sure that we minimise
// the number of migrations to be done
return minimiseNumOfMigrations(decision, oldDecision);
}

return std::make_shared<faabric::util::SchedulingDecision>(
DO_NOT_MIGRATE_DECISION);
}

return decision;
}
}
1 change: 1 addition & 0 deletions src/proto/faabric.proto
@@ -22,6 +22,7 @@ message BatchExecuteRequest {
// Each BatchExecuteRequest has a unique app id
int32 appId = 1;
// TODO: consider adding user/func to BER
// TODO: consider adding the request type: SCALE_CHANGE, DIST_CHANGE, NEW

enum BatchExecuteType {
FUNCTIONS = 0;
47 changes: 45 additions & 2 deletions tests/test/batch-scheduler/test_binpack_scheduler.cpp
@@ -130,8 +130,8 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture,
"[batch-scheduler]")
{
// To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we
// need to have one in-flight request in the map with the same app id
// (and not of type MIGRATION)
BatchSchedulerConfig config = {
.hostMap = {},
.inFlightReqs = {},
@@ -283,4 +283,47 @@ TEST_CASE_METHOD(BinPackSchedulerTestFixture,
config.hostMap, config.inFlightReqs, ber);
compareSchedulingDecisions(actualDecision, config.expectedDecision);
}

TEST_CASE_METHOD(BinPackSchedulerTestFixture,
"Test scheduling of dist-change requests with BinPack",
"[batch-scheduler]")
{
// To mock a dist-change request (i.e. DecisionType::DIST_CHANGE), we
// need to have one in-flight request in the map with the same app id, the
// same size (and of type MIGRATION)
BatchSchedulerConfig config = {
.hostMap = {},
.inFlightReqs = {},
.expectedDecision = faabric::util::SchedulingDecision(appId, groupId),
};

// The configs in this test must be read as follows:
// - the host map's used slots contain the current distribution of the app
// - the host map's slots contain the total slots; there is a migration
//   opportunity if we can improve on the current distribution
// - we repeat the distribution (and the host names) when building the
//   in-flight requests

// Given a migration opportunity (i.e. a chance to reduce the number of
// cross-VM links, or equivalently to improve the host-to-message
// histogram), the BinPack scheduler will try to minimise the number of
// messages that are actually migrated
SECTION("BinPack will minimise the number of messages to migrate")
{
config.hostMap =
buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 });
ber = faabric::util::batchExecFactory("bat", "man", 9);
ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
config.inFlightReqs = buildInFlightReqs(
ber,
9,
{ "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" });
config.expectedDecision = buildExpectedDecision(
ber,
{ "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" });
}

actualDecision = *batchScheduler->makeSchedulingDecision(
config.hostMap, config.inFlightReqs, ber);
compareSchedulingDecisions(actualDecision, config.expectedDecision);
}
}
17 changes: 15 additions & 2 deletions tests/utils/fixtures.h
@@ -578,8 +578,21 @@ class BatchSchedulerFixture : public ConfFixture
faabric::batch_scheduler::InFlightReqs inFlightReqs;
int appId = ber->appid();

std::shared_ptr<BatchExecuteRequest> oldBer = nullptr;
// If possible (i.e. if the old BER is not bigger than the new one),
// literally copy the messages from the new BER into the old one (the
// one in-flight)
if (numMsgsOldBer > ber->messages_size()) {
oldBer =
faabric::util::batchExecFactory(ber->messages(0).user(),
ber->messages(0).function(),
numMsgsOldBer);
} else {
oldBer = faabric::util::batchExecFactory(
ber->messages(0).user(), ber->messages(0).function(), 0);
for (int i = 0; i < numMsgsOldBer; i++) {
*oldBer->add_messages() = *ber->mutable_messages(i);
}
}
oldBer->set_appid(appId);

assert(oldBer->messages_size() == hosts.size());