scheduler: add logic for check for migration opportunities method
csegarragonz committed Jan 8, 2022
1 parent 4eef2d3 commit 754ee92
Showing 6 changed files with 486 additions and 5 deletions.
16 changes: 15 additions & 1 deletion include/faabric/scheduler/Scheduler.h
@@ -22,6 +22,10 @@

namespace faabric::scheduler {

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
InFlightPair;

class Scheduler;

Scheduler& getScheduler();
@@ -223,7 +227,12 @@ class Scheduler
// ----------------------------------
// Function Migration
// ----------------------------------
void checkForMigrationOpportunities();
void checkForMigrationOpportunities(
faabric::util::MigrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);

std::shared_ptr<faabric::PendingMigrations> canAppBeMigrated(
uint32_t appId);

private:
std::string thisHost;
@@ -295,6 +304,11 @@ class Scheduler

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;

// ---- Function migration ----
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
pendingMigrations;
};

}
12 changes: 12 additions & 0 deletions include/faabric/util/scheduling.h
@@ -54,4 +54,16 @@ enum SchedulingTopologyHint
FORCE_LOCAL,
NEVER_ALONE
};

// Migration strategies help the scheduler decide whether the scheduling
// decision for a batch request could be improved given a new set of available
// resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
//             running. Bin-pack batches in increasing order onto hosts in
//             decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order onto empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
};
}
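As a usage sketch (not part of this commit, names taken from the declarations above): the default argument keeps existing callers on BIN_PACK, and only BIN_PACK is implemented in Scheduler.cpp below, so any other strategy currently throws.

    #include <faabric/scheduler/Scheduler.h>
    #include <faabric/util/scheduling.h>

    void triggerCheck()
    {
        auto& sch = faabric::scheduler::getScheduler();

        // Default strategy: BIN_PACK
        sch.checkForMigrationOpportunities();

        // Explicit strategy; EMPTY_HOSTS is declared but not yet implemented,
        // so this currently throws a std::runtime_error
        sch.checkForMigrationOpportunities(
          faabric::util::MigrationStrategy::EMPTY_HOSTS);
    }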
17 changes: 17 additions & 0 deletions src/proto/faabric.proto
@@ -245,3 +245,20 @@ message PointToPointMappings

repeated PointToPointMapping mappings = 3;
}

// ---------------------------------------------
// FUNCTION MIGRATIONS
// ---------------------------------------------

message PendingMigrations {
int32 appId = 1;
int32 groupId = 2;

message PendingMigration {
int32 messageId = 1;
string srcHost = 2;
string dstHost = 3;
}

repeated PendingMigration migrations = 3;
}
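For orientation, a minimal sketch (illustrative values, not part of the commit) of the C++ API protoc generates for this message, as driven by Scheduler.cpp further down:

    #include <faabric/proto/faabric.pb.h>

    faabric::PendingMigrations buildExample()
    {
        faabric::PendingMigrations msg;
        msg.set_appid(1337);

        // One nested entry per message that can change host
        auto* migration = msg.add_migrations();
        migration->set_messageid(1);
        migration->set_srchost("hostB");
        migration->set_dsthost("hostA");

        // At this point msg.migrations_size() == 1
        return msg;
    }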
169 changes: 167 additions & 2 deletions src/scheduler/Scheduler.cpp
@@ -137,6 +137,10 @@ void Scheduler::reset()
threadResults.clear();
pushedSnapshotsMap.clear();

// Reset function migration tracking
inFlightRequests.clear();
pendingMigrations.clear();

// Records
recordedMessagesAll.clear();
recordedMessagesLocal.clear();
@@ -452,6 +456,25 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
throw std::runtime_error("Invalid scheduler hint for messages");
}

    // Record the in-flight request if the function has asked to be migrated
if (firstMsg.migrationcheckperiod() > 0) {
auto decisionPtr =
std::make_shared<faabric::util::SchedulingDecision>(decision);
inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr);
/*
if (inFlightRequests.size() == 1) {
functionMigrationThread.start(firstMsg.migrationcheckperiod());
} else if (firstMsg.migrationcheckperiod() !=
functionMigrationThread.wakeUpPeriodSeconds) {
SPDLOG_WARN("Ignoring migration check period as the migration"
"thread was initialised with a different one."
"(provided: {}, current: {})",
firstMsg.migrationcheckperiod(),
functionMigrationThread.wakeUpPeriodSeconds);
}
*/
}

// NOTE: we want to schedule things on this host _last_, otherwise functions
// may start executing before all messages have been dispatched, thus
// slowing the remaining scheduling.
@@ -889,6 +912,20 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
// Write the successful result to the result queue
std::vector<uint8_t> inputData = faabric::util::messageToBytes(msg);
redis.publishSchedulerResult(key, msg.statuskey(), inputData);

    // Remove the app from the in-flight map if it is still there and this
    // host is the master host for the message
if (msg.masterhost() == thisHost) {
faabric::util::FullLock lock(mx);

inFlightRequests.erase(msg.appid());
pendingMigrations.erase(msg.appid());
// If there are no more apps to track, stop the thread checking for
// migration opportunities
if (inFlightRequests.size() == 0) {
// functionMigrationThread.stop();
}
}
}

void Scheduler::registerThread(uint32_t msgId)
@@ -1130,8 +1167,136 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
return node;
}

void Scheduler::checkForMigrationOpportunities()
void Scheduler::checkForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy)
{
SPDLOG_INFO("Not implemented");
// Vector to cache all migrations we have to do, and update the shared map
// at the very end just once. This is because we need a unique lock to write
// to the shared map, but the rest of this method can do with a shared lock.
std::vector<std::shared_ptr<faabric::PendingMigrations>>
tmpPendingMigrations;

{
faabric::util::SharedLock lock(mx);

// For each in-flight request, check if there is an opportunity to
// migrate
for (const auto& app : inFlightRequests) {
auto req = app.second.first;
auto originalDecision = *app.second.second;

// If we have already recorded a pending migration for this req,
// skip
if (canAppBeMigrated(originalDecision.appId) != nullptr) {
continue;
}

faabric::PendingMigrations msg;
msg.set_appid(originalDecision.appId);
// TODO - generate a new groupId here for processes to wait on
// during the migration? msg.set_groupid();

if (migrationStrategy ==
faabric::util::MigrationStrategy::BIN_PACK) {
                // We assume the batch was originally scheduled using
                // bin-packing, thus the scheduling decision has at the
                // beginning (left) the hosts with the most allocated requests,
                // and at the end (right) the hosts with the fewest. To check
                // for migration opportunities, we compare a pointer to the
                // possible destination of the migration (left) with one to the
                // possible source of the migration (right). NOTE - this is a
                // slight simplification, but it keeps the code simple.
auto left = originalDecision.hosts.begin();
auto right = originalDecision.hosts.end() - 1;
faabric::HostResources r = (*left == thisHost)
? getThisHostResources()
: getHostResources(*left);
auto nAvailable = [&r]() -> int {
return r.slots() - r.usedslots();
};
auto claimSlot = [&r]() {
int currentUsedSlots = r.usedslots();
SPDLOG_INFO("Old slots: {} - New slots: {}",
currentUsedSlots,
currentUsedSlots + 1);
r.set_usedslots(currentUsedSlots + 1);
};
while (left < right) {
// If both pointers point to the same host, no migration
// opportunity, and must check another possible source of
// the migration
if (*left == *right) {
--right;
continue;
}

// If the left pointer (possible destination of the
// migration) is out of available resources, no migration
// opportunity, and must check another possible destination
// of migration
if (nAvailable() == 0) {
auto oldHost = *left;
++left;
if (*left != oldHost) {
r = (*left == thisHost) ? getThisHostResources()
: getHostResources(*left);
}
continue;
}

// If each pointer points to a request scheduled in a
// different host, and the possible destination has slots,
// there is a migration opportunity
auto* migration = msg.add_migrations();
auto msgIdPtr =
originalDecision.messageIds.begin() +
std::distance(originalDecision.hosts.begin(), right);
migration->set_messageid(*msgIdPtr);
migration->set_srchost(*right);
migration->set_dsthost(*left);
                    // Decrement the availability by one, and check for more
                    // possible sources of migration
claimSlot();
--right;
}
} else {
SPDLOG_ERROR("Unrecognised migration strategy: {}",
migrationStrategy);
throw std::runtime_error("Unrecognised migration strategy.");
}

if (msg.migrations_size() > 0) {
tmpPendingMigrations.emplace_back(
std::make_shared<faabric::PendingMigrations>(msg));
SPDLOG_DEBUG("Detected migration opportunity for app: {}",
msg.appid());
} else {
SPDLOG_DEBUG("No migration opportunity detected for app: {}",
msg.appid());
}
}
}

// Finally, store all the pending migrations in the shared map acquiring
// a unique lock.
if (tmpPendingMigrations.size() > 0) {
faabric::util::FullLock lock(mx);
for (auto msgPtr : tmpPendingMigrations) {
SPDLOG_INFO("Adding app: {}", msgPtr->appid());
pendingMigrations[msgPtr->appid()] = std::move(msgPtr);
}
}
}

std::shared_ptr<faabric::PendingMigrations> Scheduler::canAppBeMigrated(
uint32_t appId)
{
faabric::util::SharedLock lock(mx);

if (pendingMigrations.find(appId) == pendingMigrations.end()) {
return nullptr;
}

return pendingMigrations[appId];
}
}
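To make the BIN_PACK two-pointer scan above concrete, here is a standalone toy (host names and slot counts are assumed; the resource refresh the real method performs when the destination pointer moves onto a new host is elided):

    #include <cstdio>
    #include <string>
    #include <vector>

    // Toy version of the BIN_PACK scan: hosts are listed per message,
    // most-loaded host first, as in the original scheduling decision.
    int main()
    {
        std::vector<std::string> hosts = { "hostA", "hostA", "hostA", "hostB" };
        int freeSlotsOnLeft = 1; // hostA has one slot free

        auto left = hosts.begin();    // candidate destination
        auto right = hosts.end() - 1; // candidate source

        while (left < right) {
            if (*left == *right) {
                // Same host at both ends: nothing to migrate for this message
                --right;
                continue;
            }
            if (freeSlotsOnLeft == 0) {
                // Destination is full: try the next possible destination
                ++left;
                continue;
            }
            // Different hosts and a free slot: migrate the message on *right
            std::printf(
              "migrate message from %s to %s\n", right->c_str(), left->c_str());
            --freeSlotsOnLeft; // claim the slot
            --right;
        }
        return 0;
    }
    // Prints: migrate message from hostB to hostA

In the real method the available-slot count comes from HostResources (slots() minus usedslots()) and is re-fetched whenever the left pointer moves onto a different host.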
12 changes: 12 additions & 0 deletions tests/test/scheduler/test_executor.cpp
@@ -201,6 +201,18 @@ int32_t TestExecutor::executeTask(
throw std::runtime_error("This is a test error");
}

if (msg.function() == "migration") {
// Sleep for sufficiently more than the check period
SPDLOG_DEBUG("Migration test function going to sleep");
SLEEP_MS(SHORT_TEST_TIMEOUT_MS);
SPDLOG_DEBUG("Migration test function waking up");

msg.set_outputdata(
fmt::format("Migration test function {} executed", msg.id()));

return 0;
}

if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) {
SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id());
return msg.id() / 100;
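A test exercising this path could look roughly as follows (a sketch, not part of this commit, assuming the batchExecFactory helper and scheduler fixture used elsewhere in the test suite):

    auto req = faabric::util::batchExecFactory("demo", "migration", 2);

    // A non-zero check period is what puts the request into the scheduler's
    // in-flight map when it is scheduled
    req->mutable_messages()->at(0).set_migrationcheckperiod(2);

    auto& sch = faabric::scheduler::getScheduler();
    sch.callFunctions(req);

    // Force a check, then ask whether a migration was recorded for the app
    sch.checkForMigrationOpportunities();
    auto pending = sch.canAppBeMigrated(req->messages().at(0).appid());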
