diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 70743576d..d7b9efa35 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -22,6 +22,10 @@
 
 namespace faabric::scheduler {
 
+typedef std::pair<std::shared_ptr<faabric::BatchExecuteRequest>,
+                  std::shared_ptr<faabric::util::SchedulingDecision>>
+  InFlightPair;
+
 class Scheduler;
 
 Scheduler& getScheduler();
@@ -223,7 +227,12 @@ class Scheduler
     // ----------------------------------
     // Function Migration
     // ----------------------------------
-    void checkForMigrationOpportunities();
+    void checkForMigrationOpportunities(
+      faabric::util::MigrationStrategy =
+        faabric::util::MigrationStrategy::BIN_PACK);
+
+    std::shared_ptr<faabric::PendingMigrations> canAppBeMigrated(
+      uint32_t appId);
 
   private:
     std::string thisHost;
@@ -295,6 +304,11 @@ class Scheduler
 
     // ---- Point-to-point ----
     faabric::transport::PointToPointBroker& broker;
+
+    // ---- Function migration ----
+    std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
+    std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
+      pendingMigrations;
 };
 
 }
diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 0f2a6a64d..2905d8862 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -54,4 +54,16 @@ enum SchedulingTopologyHint
     FORCE_LOCAL,
     NEVER_ALONE
 };
+
+// Migration strategies help the scheduler decide whether the scheduling decision
+// for a batch request could be changed with the new set of available resources.
+// - BIN_PACK: sort hosts by the number of functions from the batch they are
+//             running. Bin-pack batches in increasing order to hosts in
+//             decreasing order.
+// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
+enum MigrationStrategy
+{
+    BIN_PACK,
+    EMPTY_HOSTS
+};
 }
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 50fc13dcf..7382b9fe7 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -245,3 +245,20 @@ message PointToPointMappings {
 
     repeated PointToPointMapping mappings = 3;
 }
+
+// ---------------------------------------------
+// FUNCTION MIGRATIONS
+// ---------------------------------------------
+
+message PendingMigrations {
+    int32 appId = 1;
+    int32 groupId = 2;
+
+    message PendingMigration {
+        int32 messageId = 1;
+        string srcHost = 2;
+        string dstHost = 3;
+    }
+
+    repeated PendingMigration migrations = 3;
+}
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index c4af4fbe0..deb0f3680 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -137,6 +137,10 @@ void Scheduler::reset()
     threadResults.clear();
     pushedSnapshotsMap.clear();
 
+    // Reset function migration tracking
+    inFlightRequests.clear();
+    pendingMigrations.clear();
+
     // Records
     recordedMessagesAll.clear();
     recordedMessagesLocal.clear();
@@ -452,6 +456,25 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
         throw std::runtime_error("Invalid scheduler hint for messages");
     }
 
+    // Record in-flight request if function desires to be migrated
+    if (firstMsg.migrationcheckperiod() > 0) {
+        auto decisionPtr =
+          std::make_shared<faabric::util::SchedulingDecision>(decision);
+        inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr);
+        /*
+        if (inFlightRequests.size() == 1) {
+            functionMigrationThread.start(firstMsg.migrationcheckperiod());
+        } else if (firstMsg.migrationcheckperiod() !=
+                   functionMigrationThread.wakeUpPeriodSeconds) {
+            SPDLOG_WARN("Ignoring migration check period as the migration"
+                        "thread was initialised with a different one."
+                        "(provided: {}, current: {})",
+                        firstMsg.migrationcheckperiod(),
+                        functionMigrationThread.wakeUpPeriodSeconds);
+        }
+        */
+    }
+
     // NOTE: we want to schedule things on this host _last_, otherwise functions
     // may start executing before all messages have been dispatched, thus
     // slowing the remaining scheduling.
@@ -889,6 +912,20 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
     // Write the successful result to the result queue
     std::vector<uint8_t> inputData = faabric::util::messageToBytes(msg);
     redis.publishSchedulerResult(key, msg.statuskey(), inputData);
+
+    // Remove the app from in-flight map if still there, and this host is the
+    // master host for the message
+    if (msg.masterhost() == thisHost) {
+        faabric::util::FullLock lock(mx);
+
+        inFlightRequests.erase(msg.appid());
+        pendingMigrations.erase(msg.appid());
+        // If there are no more apps to track, stop the thread checking for
+        // migration opportunities
+        if (inFlightRequests.size() == 0) {
+            // functionMigrationThread.stop();
+        }
+    }
 }
 
 void Scheduler::registerThread(uint32_t msgId)
@@ -1130,8 +1167,133 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
     return node;
 }
 
-void Scheduler::checkForMigrationOpportunities()
+void Scheduler::checkForMigrationOpportunities(
+  faabric::util::MigrationStrategy migrationStrategy)
 {
-    SPDLOG_INFO("Not implemented");
+    // Vector to cache all migrations we have to do, and update the shared map
+    // at the very end just once. This is because we need a unique lock to write
+    // to the shared map, but the rest of this method can do with a shared lock.
+    std::vector<std::shared_ptr<faabric::PendingMigrations>>
+      tmpPendingMigrations;
+
+    {
+        faabric::util::SharedLock lock(mx);
+
+        // For each in-flight request, check if there is an opportunity to
+        // migrate
+        for (const auto& app : inFlightRequests) {
+            auto req = app.second.first;
+            auto originalDecision = *app.second.second;
+
+            // If we have already recorded a pending migration for this req,
+            // skip
+            if (canAppBeMigrated(originalDecision.appId) != nullptr) {
+                continue;
+            }
+
+            faabric::PendingMigrations msg;
+            msg.set_appid(originalDecision.appId);
+            // TODO - generate a new groupId here for processes to wait on
+            // during the migration? msg.set_groupid();
+
+            if (migrationStrategy ==
+                faabric::util::MigrationStrategy::BIN_PACK) {
+                // We assume the batch was originally scheduled using
+                // bin-packing, thus the scheduling decision has at the beginning
+                // (left) the hosts with the most allocated requests, and at the
+                // end (right) the hosts with the fewest. To check for migration
+                // opportunities, we compare a pointer to the possible
+                // destination of the migration (left), with one to the possible
+                // source of the migration (right). NOTE - this is a slight
+                // simplification, but makes the code simpler.
+                auto left = originalDecision.hosts.begin();
+                auto right = originalDecision.hosts.end() - 1;
+                faabric::HostResources r = (*left == thisHost)
+                                             ?
+                                               getThisHostResources()
+                                             : getHostResources(*left);
+                auto nAvailable = [&r]() -> int {
+                    return r.slots() - r.usedslots();
+                };
+                auto claimSlot = [&r]() {
+                    int currentUsedSlots = r.usedslots();
+                    r.set_usedslots(currentUsedSlots + 1);
+                };
+                while (left < right) {
+                    // If both pointers point to the same host, no migration
+                    // opportunity, and must check another possible source of
+                    // the migration
+                    if (*left == *right) {
+                        --right;
+                        continue;
+                    }
+
+                    // If the left pointer (possible destination of the
+                    // migration) is out of available resources, no migration
+                    // opportunity, and must check another possible destination
+                    // of migration
+                    if (nAvailable() == 0) {
+                        auto oldHost = *left;
+                        ++left;
+                        if (*left != oldHost) {
+                            r = (*left == thisHost) ? getThisHostResources()
+                                                    : getHostResources(*left);
+                        }
+                        continue;
+                    }
+
+                    // If each pointer points to a request scheduled in a
+                    // different host, and the possible destination has slots,
+                    // there is a migration opportunity
+                    auto* migration = msg.add_migrations();
+                    auto msgIdPtr =
+                      originalDecision.messageIds.begin() +
+                      std::distance(originalDecision.hosts.begin(), right);
+                    migration->set_messageid(*msgIdPtr);
+                    migration->set_srchost(*right);
+                    migration->set_dsthost(*left);
+                    // Decrement by one the availability, and check for more
+                    // possible sources of migration
+                    claimSlot();
+                    --right;
+                }
+            } else {
+                SPDLOG_ERROR("Unrecognised migration strategy: {}",
+                             migrationStrategy);
+                throw std::runtime_error("Unrecognised migration strategy.");
+            }
+
+            if (msg.migrations_size() > 0) {
+                tmpPendingMigrations.emplace_back(
+                  std::make_shared<faabric::PendingMigrations>(msg));
+                SPDLOG_DEBUG("Detected migration opportunity for app: {}",
+                             msg.appid());
+            } else {
+                SPDLOG_DEBUG("No migration opportunity detected for app: {}",
+                             msg.appid());
+            }
+        }
+    }
+
+    // Finally, store all the pending migrations in the shared map acquiring
+    // a unique lock.
+    if (tmpPendingMigrations.size() > 0) {
+        faabric::util::FullLock lock(mx);
+        for (auto msgPtr : tmpPendingMigrations) {
+            SPDLOG_INFO("Adding app: {}", msgPtr->appid());
+            pendingMigrations[msgPtr->appid()] = std::move(msgPtr);
+        }
+    }
+}
+
+std::shared_ptr<faabric::PendingMigrations> Scheduler::canAppBeMigrated(
+  uint32_t appId)
+{
+    faabric::util::SharedLock lock(mx);
+
+    if (pendingMigrations.find(appId) == pendingMigrations.end()) {
+        return nullptr;
+    }
+
+    return pendingMigrations[appId];
 }
 }
diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp
index 3af1a230b..af7a1ff2a 100644
--- a/tests/test/scheduler/test_executor.cpp
+++ b/tests/test/scheduler/test_executor.cpp
@@ -201,6 +201,18 @@ int32_t TestExecutor::executeTask(
         throw std::runtime_error("This is a test error");
     }
 
+    if (msg.function() == "migration") {
+        // Sleep for sufficiently more than the check period
+        SPDLOG_DEBUG("Migration test function going to sleep");
+        SLEEP_MS(SHORT_TEST_TIMEOUT_MS);
+        SPDLOG_DEBUG("Migration test function waking up");
+
+        msg.set_outputdata(
+          fmt::format("Migration test function {} executed", msg.id()));
+
+        return 0;
+    }
+
     if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) {
         SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id());
         return msg.id() / 100;
diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp
index d56db92e5..96362ebd8 100644
--- a/tests/test/scheduler/test_function_migration.cpp
+++ b/tests/test/scheduler/test_function_migration.cpp
@@ -4,6 +4,8 @@
 #include <faabric_utils.h>
 #include <fixtures.h>
 
+#include <faabric/scheduler/FunctionMigrationThread.h>
+#include <faabric/util/scheduling.h>
 #include <faabric/util/config.h>
 
 using namespace faabric::scheduler;
@@ -12,12 +14,65 @@ namespace tests {
 class FunctionMigrationTestFixture : public SchedulerTestFixture
 {
   public:
-    FunctionMigrationTestFixture() { faabric::util::setMockMode(true); }
+    FunctionMigrationTestFixture()
+    {
+        faabric::util::setMockMode(true);
 
-    ~FunctionMigrationTestFixture() { faabric::util::setMockMode(false); }
+
+        std::shared_ptr<TestExecutorFactory> fac =
+          std::make_shared<TestExecutorFactory>();
+        setExecutorFactory(fac);
+    }
+
+    ~FunctionMigrationTestFixture()
+    {
+        sch.clearRecordedMessages();
+        faabric::util::setMockMode(false);
+
+        // Remove all hosts from global set
+        for (const std::string& host : sch.getAvailableHosts()) {
+            sch.removeHostFromGlobalSet(host);
+        }
+    }
 
   protected:
     FunctionMigrationThread migrationThread;
+    std::string masterHost = faabric::util::getSystemConfig().endpointHost;
+
+    // Helper method to set the available hosts and slots per host prior to
+    // making a scheduling decision
+    void setHostResources(std::vector<std::string> registeredHosts,
+                          std::vector<int> slotsPerHost,
+                          std::vector<int> usedSlotsPerHost)
+    {
+        assert(registeredHosts.size() == slotsPerHost.size());
+        auto& sch = faabric::scheduler::getScheduler();
+        sch.clearRecordedMessages();
+
+        for (int i = 0; i < registeredHosts.size(); i++) {
+            faabric::HostResources resources;
+            resources.set_slots(slotsPerHost.at(i));
+            resources.set_usedslots(0);
+
+            sch.addHostToGlobalSet(registeredHosts.at(i));
+
+            // If setting resources for the master host, update the scheduler.
+            // Otherwise, queue the resource response
+            if (i == 0) {
+                sch.setThisHostResources(resources);
+            } else {
+                faabric::scheduler::queueResourceResponse(registeredHosts.at(i),
+                                                          resources);
+            }
+        }
+    }
+
+    void updateLocalResources(int slots, int usedSlots)
+    {
+        faabric::HostResources r;
+        r.set_slots(slots);
+        r.set_usedslots(usedSlots);
+        sch.setThisHostResources(r);
+    }
 };
 
 TEST_CASE_METHOD(FunctionMigrationTestFixture,
@@ -31,4 +86,210 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,
     migrationThread.stop();
 }
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test function migration thread only works if set in the message",
+  "[scheduler]")
+{
+    // First set resources before calling the functions: one will be allocated
+    // locally, another one in the remote host
+    std::vector<std::string> hosts = { masterHost, "new-hostA" };
+    std::vector<int> slots = { 1, 1 };
+    std::vector<int> usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    // The migration function sleeps for a while before returning
+    auto req = faabric::util::batchExecFactory("foo", "migration", 2);
+    uint32_t appId = req->messages().at(0).appid();
+    std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
+    SECTION("Migration not enabled") { expectedMigrations = nullptr; }
+
+    SECTION("Migration enabled")
+    {
+        req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+
+        // Build expected result
+        faabric::PendingMigrations expected;
+        expected.set_appid(appId);
+        auto* migration = expected.add_migrations();
+        migration->set_messageid(req->messages().at(1).id());
+        migration->set_srchost(hosts.at(1));
+        migration->set_dsthost(hosts.at(0));
+        expectedMigrations =
+          std::make_shared<faabric::PendingMigrations>(expected);
+    }
+
+    auto decision = sch.callFunctions(req);
+
+    // Update host resources so that a migration opportunity appears, but will
+    // only be detected if migration check period is set.
+    updateLocalResources(2, 1);
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.canAppBeMigrated(appId);
+    if (expectedMigrations == nullptr) {
+        REQUIRE(actualMigrations == expectedMigrations);
+    } else {
+        REQUIRE(actualMigrations->appid() == expectedMigrations->appid());
+        REQUIRE(actualMigrations->migrations_size() ==
+                expectedMigrations->migrations_size());
+        for (int i = 0; i < actualMigrations->migrations_size(); i++) {
+            auto actual = actualMigrations->mutable_migrations()->at(i);
+            auto expected = expectedMigrations->mutable_migrations()->at(i);
+            REQUIRE(actual.messageid() == expected.messageid());
+            REQUIRE(actual.srchost() == expected.srchost());
+            REQUIRE(actual.dsthost() == expected.dsthost());
+        }
+    }
+
+    faabric::Message res = sch.getFunctionResult(req->messages().at(0).id(),
+                                                 2 * SHORT_TEST_TIMEOUT_MS);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
+    sch.removeHostFromGlobalSet("new-hostA");
+}
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test function migration thread detects migration opportunities",
+  "[scheduler]")
+{
+    std::vector<std::string> hosts = { masterHost, "hostA" };
+    std::vector<int> slots = { 1, 1 };
+    std::vector<int> usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    auto req = faabric::util::batchExecFactory("foo", "migration", 2);
+    req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+    uint32_t appId = req->messages().at(0).appid();
+
+    auto decision = sch.callFunctions(req);
+
+    std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
+
+    // As we don't update the available resources, no migration opportunities
+    // will appear, even though we are checking for them
+    SECTION("Can not migrate") { expectedMigrations = nullptr; }
+
+    SECTION("Can migrate")
+    {
+        // Update host resources so that a migration opportunity appears
+        updateLocalResources(2, 1);
+
+        // Build expected result
+        faabric::PendingMigrations expected;
+        expected.set_appid(appId);
+        auto* migration = expected.add_migrations();
+        migration->set_messageid(req->messages().at(1).id());
+        migration->set_srchost(hosts.at(1));
+        migration->set_dsthost(hosts.at(0));
+        expectedMigrations =
+          std::make_shared<faabric::PendingMigrations>(expected);
+    }
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.canAppBeMigrated(appId);
+    if (expectedMigrations == nullptr) {
+        REQUIRE(actualMigrations == expectedMigrations);
+    } else {
+        REQUIRE(actualMigrations->appid() == expectedMigrations->appid());
+        REQUIRE(actualMigrations->migrations_size() ==
+                expectedMigrations->migrations_size());
+        for (int i = 0; i < actualMigrations->migrations_size(); i++) {
+            auto actual = actualMigrations->mutable_migrations()->at(i);
+            auto expected = expectedMigrations->mutable_migrations()->at(i);
+            REQUIRE(actual.messageid() == expected.messageid());
+            REQUIRE(actual.srchost() == expected.srchost());
+            REQUIRE(actual.dsthost() == expected.dsthost());
+        }
+    }
+
+    faabric::Message res = sch.getFunctionResult(req->messages().at(0).id(),
+                                                 2 * SHORT_TEST_TIMEOUT_MS);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test detecting migration opportunities with several hosts and requests",
+  "[scheduler]")
+{
+    // First set resources before calling the functions: one will be allocated
+    // locally, another one in the remote host
+    std::vector<std::string> hosts = { masterHost, "hostA", "hostB", "hostC" };
+    std::vector<int> slots = { 1, 1, 1, 1 };
+    std::vector<int> usedSlots = { 0, 0, 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    auto req = faabric::util::batchExecFactory("foo", "migration", 4);
+    req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+    auto decision =
+      sch.callFunctions(req);
+    uint32_t appId = req->messages().at(0).appid();
+
+    // Set up expectations
+    std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
+    SECTION("Can not migrate") { expectedMigrations = nullptr; }
+
+    SECTION("Can migrate")
+    {
+        // Update host resources so that two migration opportunities appear
+        std::vector<int> newSlots = { 2, 2, 1, 1 };
+        std::vector<int> newUsedSlots = { 1, 1, 1, 1 };
+        setHostResources(hosts, newSlots, newUsedSlots);
+
+        // Build expected result
+        faabric::PendingMigrations expected;
+        expected.set_appid(appId);
+        // Migrate last message (scheduled to last host) to first host. This
+        // fills up the first host.
+        auto* migration1 = expected.add_migrations();
+        migration1->set_messageid(req->messages().at(3).id());
+        migration1->set_srchost(hosts.at(3));
+        migration1->set_dsthost(hosts.at(0));
+        // Migrate penultimate message (scheduled to penultimate host) to second
+        // host. This fills up the second host.
+        auto* migration2 = expected.add_migrations();
+        migration2->set_messageid(req->messages().at(2).id());
+        migration2->set_srchost(hosts.at(2));
+        migration2->set_dsthost(hosts.at(1));
+        expectedMigrations =
+          std::make_shared<faabric::PendingMigrations>(expected);
+    }
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.canAppBeMigrated(appId);
+    if (expectedMigrations == nullptr) {
+        REQUIRE(actualMigrations == expectedMigrations);
+    } else {
+        REQUIRE(actualMigrations->appid() == expectedMigrations->appid());
+        REQUIRE(actualMigrations->migrations_size() ==
+                expectedMigrations->migrations_size());
+        for (int i = 0; i < actualMigrations->migrations_size(); i++) {
+            auto actual = actualMigrations->mutable_migrations()->at(i);
+            auto expected = expectedMigrations->mutable_migrations()->at(i);
+            REQUIRE(actual.messageid() == expected.messageid());
+            REQUIRE(actual.srchost() == expected.srchost());
+            REQUIRE(actual.dsthost() == expected.dsthost());
+        }
+    }
+
+    faabric::Message res = sch.getFunctionResult(req->messages().at(0).id(),
+                                                 2 *
+                                                   SHORT_TEST_TIMEOUT_MS);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.canAppBeMigrated(appId) == nullptr);
+}
 }