diff --git a/include/faabric/scheduler/FunctionCallApi.h b/include/faabric/scheduler/FunctionCallApi.h
index eb9b83449..c95aede16 100644
--- a/include/faabric/scheduler/FunctionCallApi.h
+++ b/include/faabric/scheduler/FunctionCallApi.h
@@ -7,6 +7,7 @@ enum FunctionCalls
     ExecuteFunctions = 1,
     Flush = 2,
     Unregister = 3,
-    GetResources = 4
+    GetResources = 4,
+    PendingMigrations = 5
 };
 }
diff --git a/include/faabric/scheduler/FunctionCallClient.h b/include/faabric/scheduler/FunctionCallClient.h
index 921139c86..399f10082 100644
--- a/include/faabric/scheduler/FunctionCallClient.h
+++ b/include/faabric/scheduler/FunctionCallClient.h
@@ -22,6 +22,9 @@ getBatchRequests();
 std::vector> getResourceRequests();
 
+std::vector>>
+getPendingMigrationsRequests();
+
 std::vector>
 getUnregisterRequests();
 
@@ -42,6 +45,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
 
     faabric::HostResources getResources();
 
+    void sendPendingMigrations(std::shared_ptr req);
+
     void executeFunctions(std::shared_ptr req);
 
     void unregister(faabric::UnregisterRequest& req);
diff --git a/include/faabric/scheduler/FunctionCallServer.h b/include/faabric/scheduler/FunctionCallServer.h
index 23227c43c..aaacf6b1a 100644
--- a/include/faabric/scheduler/FunctionCallServer.h
+++ b/include/faabric/scheduler/FunctionCallServer.h
@@ -29,6 +29,10 @@ class FunctionCallServer final
       const uint8_t* buffer,
       size_t bufferSize);
 
+    std::unique_ptr recvPendingMigrations(
+      const uint8_t* buffer,
+      size_t bufferSize);
+
     void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);
 
     void recvUnregister(const uint8_t* buffer, size_t bufferSize);
diff --git a/include/faabric/scheduler/FunctionMigrationThread.h b/include/faabric/scheduler/FunctionMigrationThread.h
new file mode 100644
index 000000000..a945e432b
--- /dev/null
+++ b/include/faabric/scheduler/FunctionMigrationThread.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include
+#include
+#include
+
+namespace faabric::scheduler {
+// Start a background thread that, once per wake-up period, will check if
+// there are migration opportunities for in-flight apps that have opted in to
+// being checked for migrations.
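The thread's lifecycle is start/stop driven. A minimal usage sketch (hypothetical driver code, not part of the patch; the two-second period is illustrative):

    #include <faabric/scheduler/FunctionMigrationThread.h>

    // Hypothetical driver, mirroring how the Scheduler uses this class
    void runMigrationChecks()
    {
        faabric::scheduler::FunctionMigrationThread migrationThread;

        // Wake up and check for migration opportunities every 2 seconds
        migrationThread.start(2);

        // ... in-flight apps execute; checks happen in the background ...

        // Notifies the condition variable, wakes the thread, and joins it
        migrationThread.stop();
    }

The class itself is declared as follows.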
+class FunctionMigrationThread
+{
+  public:
+    void start(int wakeUpPeriodSecondsIn);
+
+    void stop();
+
+    int wakeUpPeriodSeconds;
+
+  private:
+    std::unique_ptr workThread = nullptr;
+    std::mutex mx;
+    std::condition_variable mustStopCv;
+    std::atomic isShutdown;
+};
+}
diff --git a/include/faabric/scheduler/MpiWorld.h b/include/faabric/scheduler/MpiWorld.h
index 4165cd4c8..1b85655ef 100644
--- a/include/faabric/scheduler/MpiWorld.h
+++ b/include/faabric/scheduler/MpiWorld.h
@@ -207,6 +207,12 @@ class MpiWorld
 
     void setMsgForRank(faabric::Message& msg);
 
+    /* Function Migration */
+
+    void prepareMigration(
+      int thisRank,
+      std::shared_ptr pendingMigrations);
+
   private:
     int id = -1;
     int size = -1;
@@ -283,5 +289,8 @@ class MpiWorld
       MPI_Status* status,
       faabric::MPIMessage::MPIMessageType messageType =
         faabric::MPIMessage::NORMAL);
+
+    /* Function migration */
+    bool hasBeenMigrated = false;
 };
 }
diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 58b84a623..147797382 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -3,6 +3,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -22,6 +23,10 @@
 namespace faabric::scheduler {
 
+typedef std::pair,
+                  std::shared_ptr>
+  InFlightPair;
+
 class Scheduler;
 
 Scheduler& getScheduler();
 
@@ -77,13 +82,13 @@ class Executor
       faabric::Message& msg,
       bool createIfNotExists = false);
 
+    virtual std::span getMemoryView();
+
   protected:
     virtual void restore(const std::string& snapshotKey);
 
     virtual void postFinish();
 
-    virtual std::span getMemoryView();
-
     virtual void setMemorySize(size_t newSize);
 
     faabric::Message boundMessage;
@@ -131,9 +136,7 @@ class Scheduler
     void callFunction(faabric::Message& msg, bool forceLocal = false);
 
     faabric::util::SchedulingDecision callFunctions(
-      std::shared_ptr req,
-      faabric::util::SchedulingTopologyHint =
-        faabric::util::SchedulingTopologyHint::NORMAL);
+      std::shared_ptr req);
 
     faabric::util::SchedulingDecision callFunctions(
       std::shared_ptr req,
@@ -220,6 +223,27 @@ class Scheduler
 
     ExecGraph getFunctionExecGraph(unsigned int msgId);
 
+    // ----------------------------------
+    // Function Migration
+    // ----------------------------------
+    void checkForMigrationOpportunities();
+
+    std::shared_ptr getPendingAppMigrations(
+      uint32_t appId);
+
+    void addPendingMigration(std::shared_ptr msg);
+
+    void removePendingMigration(uint32_t appId);
+
+    // ----------------------------------
+    // Clients
+    // ----------------------------------
+    faabric::scheduler::FunctionCallClient& getFunctionCallClient(
+      const std::string& otherHost);
+
+    faabric::snapshot::SnapshotClient& getSnapshotClient(
+      const std::string& otherHost);
+
   private:
     std::string thisHost;
 
@@ -244,13 +268,6 @@ class Scheduler
 
     std::mutex localResultsMutex;
 
-    // ---- Clients ----
-    faabric::scheduler::FunctionCallClient& getFunctionCallClient(
-      const std::string& otherHost);
-
-    faabric::snapshot::SnapshotClient& getSnapshotClient(
-      const std::string& otherHost);
-
     // ---- Host resources and hosts ----
     faabric::HostResources thisHostResources;
     std::atomic thisHostUsedSlots = 0;
@@ -290,6 +307,24 @@ class Scheduler
 
     // ---- Point-to-point ----
     faabric::transport::PointToPointBroker& broker;
+
+    // ---- Function migration ----
+    FunctionMigrationThread functionMigrationThread;
+    std::unordered_map inFlightRequests;
+    std::unordered_map>
+      pendingMigrations;
+
+    std::vector>
+    doCheckForMigrationOpportunities(
+      faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);
+
+    void broadcastPendingMigrations(
+      std::shared_ptr pendingMigrations);
+
+    void doStartFunctionMigrationThread(
+      std::shared_ptr req,
+      faabric::util::SchedulingDecision& decision);
 };
 }
diff --git a/include/faabric/util/func.h b/include/faabric/util/func.h
index b4fa1ba09..b8cbd3812 100644
--- a/include/faabric/util/func.h
+++ b/include/faabric/util/func.h
@@ -7,6 +7,14 @@ namespace faabric::util {
 
+class FunctionMigratedException : public faabric::util::FaabricException
+{
+  public:
+    explicit FunctionMigratedException(std::string message)
+      : FaabricException(std::move(message))
+    {}
+};
+
 std::string funcToString(const faabric::Message& msg, bool includeId);
 
 std::string funcToString(
diff --git a/include/faabric/util/scheduling.h b/include/faabric/util/scheduling.h
index 0f2a6a64d..0e5c94325 100644
--- a/include/faabric/util/scheduling.h
+++ b/include/faabric/util/scheduling.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include
 #include
 #include
 
@@ -46,12 +47,47 @@ class SchedulingDecision
 //   requests in a batch.
 // - NORMAL: bin-packs requests to slots in hosts starting from the master
 //   host, and overloads the master if it runs out of resources.
+// - FORCE_LOCAL: force local execution irrespective of the available
+//   resources.
 // - NEVER_ALONE: never allocates a single (non-master) request to a host
 //   without other requests of the batch.
+// - UNDERFULL: schedule up to 50% of the master host's capacity to force
+//   migration opportunities to appear.
 enum SchedulingTopologyHint
 {
     NORMAL,
     FORCE_LOCAL,
-    NEVER_ALONE
+    NEVER_ALONE,
+    UNDERFULL,
+};
+
+// Maps to convert input strings to scheduling topology hints and the other
+// way around
+const std::unordered_map
+  strToTopologyHint = {
+      { "NORMAL", SchedulingTopologyHint::NORMAL },
+      { "FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL },
+      { "NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE },
+      { "UNDERFULL", SchedulingTopologyHint::UNDERFULL },
+  };
+
+const std::unordered_map
+  topologyHintToStr = {
+      { SchedulingTopologyHint::NORMAL, "NORMAL" },
+      { SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL" },
+      { SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE" },
+      { SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
+  };
+
+// Migration strategies help the scheduler decide whether the scheduling
+// decision for a batch request could be changed with the new set of available
+// resources.
+// - BIN_PACK: sort hosts by the number of functions from the batch they are
+//             running. Bin-pack batches in increasing order to hosts in
+//             decreasing order.
+// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
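To make BIN_PACK concrete, here is a simplified, self-contained sketch of the two-pointer check it describes (binPackMigrations is an invented name for illustration; the real logic lives in Scheduler::doCheckForMigrationOpportunities later in this patch):

    #include <cstddef>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // hosts: the original bin-packed decision, most-loaded host first.
    // freeSlots: newly available slots per host.
    // Returns (source index, destination index) pairs of messages to migrate.
    std::vector<std::pair<size_t, size_t>> binPackMigrations(
      const std::vector<std::string>& hosts,
      std::map<std::string, int> freeSlots)
    {
        std::vector<std::pair<size_t, size_t>> migrations;
        if (hosts.empty()) {
            return migrations;
        }

        size_t left = 0;                 // candidate migration destination
        size_t right = hosts.size() - 1; // candidate migration source
        while (left < right) {
            if (hosts.at(left) == hosts.at(right)) {
                // Same host at both ends: nothing to gain, try the next source
                --right;
            } else if (freeSlots[hosts.at(left)] == 0) {
                // Destination host is full: try the next destination
                ++left;
            } else {
                // Different hosts and a free slot: a migration opportunity
                migrations.emplace_back(right, left);
                --freeSlots[hosts.at(left)];
                --right;
            }
        }

        return migrations;
    }

For example, with hosts = { A, A, B, C } and one newly freed slot on A, the sketch yields a single migration of the message on C over to A, which mirrors the expectations in the tests added below.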
+enum MigrationStrategy
+{
+    BIN_PACK,
+    EMPTY_HOSTS
+};
 }
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 7982d34f4..e5b705a31 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -25,6 +25,7 @@ message BatchExecuteRequest {
         FUNCTIONS = 0;
         THREADS = 1;
         PROCESSES = 2;
+        MIGRATION = 3;
     }
 
     BatchExecuteType type = 2;
@@ -164,6 +165,12 @@ message Message {
     bool recordExecGraph = 41;
     map intExecGraphDetails = 42;
     map execGraphDetails = 43;
+
+    // Function migration
+    int32 migrationCheckPeriod = 44;
+
+    // Scheduling
+    string topologyHint = 45;
 }
 
 // ---------------------------------------------
@@ -242,3 +249,20 @@ message PointToPointMappings {
 
     repeated PointToPointMapping mappings = 3;
 }
+
+// ---------------------------------------------
+// FUNCTION MIGRATIONS
+// ---------------------------------------------
+
+message PendingMigrations {
+    int32 appId = 1;
+    int32 groupId = 2;
+
+    message PendingMigration {
+        Message msg = 1;
+        string srcHost = 2;
+        string dstHost = 3;
+    }
+
+    repeated PendingMigration migrations = 3;
+}
diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt
index f3772c10a..f1ced9340 100644
--- a/src/scheduler/CMakeLists.txt
+++ b/src/scheduler/CMakeLists.txt
@@ -4,6 +4,7 @@ faabric_lib(scheduler
     Executor.cpp
     FunctionCallClient.cpp
     FunctionCallServer.cpp
+    FunctionMigrationThread.cpp
     MpiContext.cpp
     MpiMessageBuffer.cpp
     MpiWorld.cpp
diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp
index 7576c6deb..305ca58b3 100644
--- a/src/scheduler/Executor.cpp
+++ b/src/scheduler/Executor.cpp
@@ -1,4 +1,5 @@
 #include
+#include
 #include
 #include
 #include
@@ -528,9 +529,28 @@ void Executor::threadPoolThread(int threadPoolIdx)
 
         // Execute the task
         int32_t returnValue;
+        bool migrated = false;
         try {
             returnValue =
               executeTask(threadPoolIdx, task.messageIndex, task.req);
+        } catch (const faabric::util::FunctionMigratedException& ex) {
+            SPDLOG_DEBUG(
+              "Task {} migrated, shutting down executor {}", msg.id(), id);
+
+            // Note that when a task has been migrated, we need to perform all
+            // the normal executor shutdown, but we must NOT set the result for
+            // the call.
+            migrated = true;
+            selfShutdown = true;
+            returnValue = -99;
+
+            // MPI migration
+            if (msg.ismpi()) {
+                auto& mpiWorld =
+                  faabric::scheduler::getMpiWorldRegistry().getWorld(
+                    msg.mpiworldid());
+                mpiWorld.destroy();
+            }
         } catch (const std::exception& ex) {
             returnValue = 1;
 
@@ -667,6 +687,12 @@
         // executor.
        sch.vacateSlot();
 
+        // If the function has been migrated, we drop out here and shut down
+        // the executor
+        if (migrated) {
+            break;
+        }
+
         // Finally set the result of the task, this will allow anything
         // waiting on its result to continue execution, therefore must be
         // done once the executor has been reset, otherwise the executor may
diff --git a/src/scheduler/FunctionCallClient.cpp b/src/scheduler/FunctionCallClient.cpp
index db7490ef5..ad01260d6 100644
--- a/src/scheduler/FunctionCallClient.cpp
+++ b/src/scheduler/FunctionCallClient.cpp
@@ -28,6 +28,10 @@
 static std::unordered_map>
   queuedResourceResponses;
 
+static std::vector<
+  std::pair>>
+  pendingMigrationsRequests;
+
 static std::vector>
   unregisterRequests;
 
@@ -57,6 +61,13 @@ std::vector> getResourceRequests()
     return resourceRequests;
 }
 
+std::vector>>
+getPendingMigrationsRequests()
+{
+    faabric::util::UniqueLock lock(mockMutex);
+    return pendingMigrationsRequests;
+}
+
 std::vector>
 getUnregisterRequests()
 {
@@ -76,6 +87,7 @@ void clearMockRequests()
     functionCalls.clear();
     batchMessages.clear();
     resourceRequests.clear();
+    pendingMigrationsRequests.clear();
     unregisterRequests.clear();
 
     for (auto& p : queuedResourceResponses) {
@@ -128,6 +140,25 @@ faabric::HostResources FunctionCallClient::getResources()
     return response;
 }
 
+// This function call is used by the master host of an application to let
+// other hosts running functions of the same application know that a migration
+// opportunity has been found.
+void FunctionCallClient::sendPendingMigrations(
+  std::shared_ptr req)
+{
+    faabric::PendingMigrations request;
+    faabric::EmptyResponse response;
+
+    if (faabric::util::isMockMode()) {
+        faabric::util::UniqueLock lock(mockMutex);
+        pendingMigrationsRequests.emplace_back(host, req);
+    } else {
+        syncSend(faabric::scheduler::FunctionCalls::PendingMigrations,
+                 req.get(),
+                 &response);
+    }
+}
+
 void FunctionCallClient::executeFunctions(
   const std::shared_ptr req)
 {
diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp
index 2e0604baf..e9beb13c1 100644
--- a/src/scheduler/FunctionCallServer.cpp
+++ b/src/scheduler/FunctionCallServer.cpp
@@ -49,6 +49,9 @@ std::unique_ptr FunctionCallServer::doSyncRecv(
         case faabric::scheduler::FunctionCalls::GetResources: {
             return recvGetResources(buffer, bufferSize);
         }
+        case faabric::scheduler::FunctionCalls::PendingMigrations: {
+            return recvPendingMigrations(buffer, bufferSize);
+        }
         default: {
             throw std::runtime_error(
               fmt::format("Unrecognized sync call header: {}", header));
@@ -76,8 +79,9 @@ void FunctionCallServer::recvExecuteFunctions(const uint8_t* buffer,
 
     // This host has now been told to execute these functions no matter what
     // TODO - avoid this copy
-    scheduler.callFunctions(std::make_shared(msg),
-                            faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
+    msg.mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL");
+    scheduler.callFunctions(
+      std::make_shared(msg));
 }
 
 void FunctionCallServer::recvUnregister(const uint8_t* buffer,
@@ -100,4 +104,17 @@ std::unique_ptr FunctionCallServer::recvGetResources(
       scheduler.getThisHostResources());
     return response;
 }
+
+std::unique_ptr
+FunctionCallServer::recvPendingMigrations(const uint8_t* buffer,
+                                          size_t bufferSize)
+{
+    PARSE_MSG(faabric::PendingMigrations, buffer, bufferSize);
+
+    auto msgPtr = std::make_shared(msg);
+
+    scheduler.addPendingMigration(msgPtr);
+
+    return std::make_unique();
+}
 }
diff --git a/src/scheduler/FunctionMigrationThread.cpp b/src/scheduler/FunctionMigrationThread.cpp
new file mode
100644
index 000000000..d72afe8ff
--- /dev/null
+++ b/src/scheduler/FunctionMigrationThread.cpp
@@ -0,0 +1,64 @@
+#include
+#include
+#include
+#include
+
+namespace faabric::scheduler {
+void FunctionMigrationThread::start(int wakeUpPeriodSecondsIn)
+{
+    // Initialise wake up period and shutdown variable
+    wakeUpPeriodSeconds = wakeUpPeriodSecondsIn;
+    isShutdown.store(false, std::memory_order_release);
+
+    // Main work loop
+    workThread = std::make_unique([&] {
+        // As we only check for migration opportunities every (possibly user-
+        // defined) timeout, we also support stopping the main thread through
+        // a condition variable.
+        while (!isShutdown.load(std::memory_order_acquire)) {
+            faabric::util::UniqueLock lock(mx);
+
+            if (isShutdown.load(std::memory_order_acquire)) {
+                break;
+            }
+
+            std::cv_status returnVal = mustStopCv.wait_for(
+              lock, std::chrono::milliseconds(wakeUpPeriodSeconds * 1000));
+
+            // If we hit the timeout it means we have not been notified to
+            // stop. Thus we check for migration opportunities.
+            if (returnVal == std::cv_status::timeout) {
+                SPDLOG_TRACE(
+                  "Migration thread checking for migration opportunities");
+                faabric::scheduler::getScheduler()
+                  .checkForMigrationOpportunities();
+            }
+        };
+
+        SPDLOG_DEBUG("Exiting main function migration thread loop");
+    });
+}
+
+void FunctionMigrationThread::stop()
+{
+    if (workThread == nullptr) {
+        return;
+    }
+
+    if (!isShutdown.load(std::memory_order_acquire)) {
+        faabric::util::UniqueLock lock(mx);
+
+        // We set the flag _before_ we notify and after we acquire the lock.
+        // Therefore, either we check the flag (before going to sleep) or are
+        // woken by the notification.
+        isShutdown.store(true, std::memory_order_release);
+        mustStopCv.notify_one();
+    }
+
+    if (workThread->joinable()) {
+        workThread->join();
+    }
+
+    workThread = nullptr;
+}
+}
diff --git a/src/scheduler/MpiWorld.cpp b/src/scheduler/MpiWorld.cpp
index 6fac56a87..e3f2fb14b 100644
--- a/src/scheduler/MpiWorld.cpp
+++ b/src/scheduler/MpiWorld.cpp
@@ -222,20 +222,23 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
         msg.set_mpiworldid(id);
         msg.set_mpirank(i + 1);
         msg.set_mpiworldsize(size);
-        // Log chained functions to generate execution graphs
-        if (thisRankMsg != nullptr && thisRankMsg->recordexecgraph()) {
-            sch.logChainedFunction(call.id(), msg.id());
-            msg.set_recordexecgraph(true);
+        if (thisRankMsg != nullptr) {
+            // Set message fields to allow for function migration
+            msg.set_appid(thisRankMsg->appid());
+            msg.set_cmdline(thisRankMsg->cmdline());
+            msg.set_inputdata(thisRankMsg->inputdata());
+            msg.set_migrationcheckperiod(thisRankMsg->migrationcheckperiod());
+            // Log chained functions to generate execution graphs
+            if (thisRankMsg->recordexecgraph()) {
+                sch.logChainedFunction(call.id(), msg.id());
+                msg.set_recordexecgraph(true);
+            }
         }
     }
 
     std::vector executedAt;
     if (size > 1) {
-        // Send the init messages (note that message i corresponds to rank i+1)
-        // By default, we use the NEVER_ALONE policy for MPI executions to
-        // minimise cross-host messaging
-        faabric::util::SchedulingDecision decision = sch.callFunctions(
-          req, faabric::util::SchedulingTopologyHint::NEVER_ALONE);
+        faabric::util::SchedulingDecision decision = sch.callFunctions(req);
         executedAt = decision.hosts;
     }
     assert(executedAt.size() == size - 1);
@@ -388,6 +391,9 @@ void MpiWorld::initLocalRemoteLeaders()
 {
     // First, group the ranks per host they belong to for convenience
     assert(hostForRank.size() == size);
+    // Clear the existing map in case we are
calling this method during a
+    // migration
+    ranksForHost.clear();
 
     for (int rank = 0; rank < hostForRank.size(); rank++) {
         std::string host = hostForRank.at(rank);
@@ -1533,6 +1539,18 @@ void MpiWorld::barrier(int thisRank)
           thisRank, 0, nullptr, MPI_INT, 0, faabric::MPIMessage::BARRIER_JOIN);
     }
 
+    if (thisRank == localLeader && hasBeenMigrated) {
+        hasBeenMigrated = false;
+        if (thisRankMsg != nullptr) {
+            faabric::scheduler::getScheduler().removePendingMigration(
+              thisRankMsg->appid());
+        } else {
+            SPDLOG_ERROR("App has been migrated but rank ({}) message not set",
+                         thisRank);
+            throw std::runtime_error("App migrated but rank message not set");
+        }
+    }
+
     // Rank 0 broadcasts that the barrier is done (the others block here)
     broadcast(
       0, thisRank, nullptr, MPI_INT, 0, faabric::MPIMessage::BARRIER_DONE);
@@ -1554,12 +1572,10 @@ std::shared_ptr MpiWorld::getLocalQueue(int sendRank,
 // Note - the queues themselves perform concurrency control
 void MpiWorld::initLocalQueues()
 {
-    // Assert we only allocate queues once
-    assert(localQueues.size() == 0);
     localQueues.resize(size * size);
-    for (int recvRank = 0; recvRank < size; recvRank++) {
-        if (getHostForRank(recvRank) == thisHost) {
-            for (int sendRank = 0; sendRank < size; sendRank++) {
+    for (const int sendRank : ranksForHost[thisHost]) {
+        for (const int recvRank : ranksForHost[thisHost]) {
+            if (localQueues[getIndexForRanks(sendRank, recvRank)] == nullptr) {
                 localQueues[getIndexForRanks(sendRank, recvRank)] =
                   std::make_shared();
             }
@@ -1732,4 +1748,50 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank)
         throw std::runtime_error("Recv rank outside range");
     }
 }
+
+void MpiWorld::prepareMigration(
+  int thisRank,
+  std::shared_ptr pendingMigrations)
+{
+    // Check that there are no pending asynchronous messages to send or
+    // receive
+    for (auto umb : unackedMessageBuffers) {
+        if (umb != nullptr && umb->size() > 0) {
+            SPDLOG_ERROR("Trying to migrate MPI application (id: {}) but rank"
+                         " {} has {} pending async messages to receive",
+                         thisRankMsg->appid(),
+                         thisRank,
+                         umb->size());
+            throw std::runtime_error(
+              "Migrating with pending async messages is not supported");
+        }
+    }
+
+    if (!iSendRequests.empty()) {
+        SPDLOG_ERROR("Trying to migrate MPI application (id: {}) but rank"
+                     " {} has {} pending async send messages to acknowledge",
+                     thisRankMsg->appid(),
+                     thisRank,
+                     iSendRequests.size());
+        throw std::runtime_error(
+          "Migrating with pending async messages is not supported");
+    }
+
+    if (thisRank == localLeader) {
+        for (int i = 0; i < pendingMigrations->migrations_size(); i++) {
+            auto m = pendingMigrations->mutable_migrations()->at(i);
+            assert(hostForRank.at(m.msg().mpirank()) == m.srchost());
+            hostForRank.at(m.msg().mpirank()) = m.dsthost();
+        }
+
+        // Set the migration flag
+        hasBeenMigrated = true;
+
+        // Reset the internal mappings.
+        initLocalBasePorts(hostForRank);
+        initLocalRemoteLeaders();
+
+        // Add the necessary new local messaging queues
+        initLocalQueues();
+    }
+}
 }
diff --git a/src/scheduler/MpiWorldRegistry.cpp b/src/scheduler/MpiWorldRegistry.cpp
index 5a2295188..e6b512b6a 100644
--- a/src/scheduler/MpiWorldRegistry.cpp
+++ b/src/scheduler/MpiWorldRegistry.cpp
@@ -52,9 +52,7 @@ MpiWorld& MpiWorldRegistry::getOrInitialiseWorld(faabric::Message& msg)
 {
     faabric::util::SharedLock lock(registryMutex);
     MpiWorld& world = worldMap[worldId];
-    if (msg.recordexecgraph()) {
-        world.setMsgForRank(msg);
-    }
+    world.setMsgForRank(msg);
     return world;
 }
 }
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index 6dfcbb072..c9ad06ec7 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -120,6 +120,8 @@ void Scheduler::reset()
     }
     deadExecutorsList.clear();
 
+    functionMigrationThread.stop();
+
     faabric::util::FullLock lock(mx);
     executors.clear();
     deadExecutors.clear();
@@ -137,6 +139,10 @@
     threadResults.clear();
     pushedSnapshotsMap.clear();
 
+    // Reset function migration tracking
+    inFlightRequests.clear();
+    pendingMigrations.clear();
+
     // Records
     recordedMessagesAll.clear();
     recordedMessagesLocal.clear();
@@ -233,8 +239,7 @@ void Scheduler::notifyExecutorShutdown(Executor* exec,
 }
 
 faabric::util::SchedulingDecision Scheduler::callFunctions(
-  std::shared_ptr req,
-  faabric::util::SchedulingTopologyHint topologyHint)
+  std::shared_ptr req)
 {
     // Note, we assume all the messages are for the same function and have the
     // same master host
@@ -246,6 +251,12 @@
         throw std::runtime_error("Message with no master host");
     }
 
+    // Get topology hint from message
+    faabric::util::SchedulingTopologyHint topologyHint =
+      firstMsg.topologyhint().empty()
+        ? faabric::util::SchedulingTopologyHint::NORMAL
+        : faabric::util::strToTopologyHint.at(firstMsg.topologyhint());
+
     // If we're not the master host, we need to forward the request back to the
     // master host. This will only happen if a nested batch execution happens.
     if (topologyHint != faabric::util::SchedulingTopologyHint::FORCE_LOCAL &&
@@ -303,6 +314,9 @@ faabric::util::SchedulingDecision Scheduler::makeSchedulingDecision(
 
     // Work out how many we can handle locally
     int slots = thisHostResources.slots();
+    if (topologyHint == faabric::util::SchedulingTopologyHint::UNDERFULL) {
+        slots = slots / 2;
+    }
 
     // Work out available cores, flooring at zero
     int available =
@@ -442,6 +456,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
     faabric::Message& firstMsg = req->mutable_messages()->at(0);
     std::string funcStr = faabric::util::funcToString(firstMsg, false);
     int nMessages = req->messages_size();
+    bool isMigration = req->type() == faabric::BatchExecuteRequest::MIGRATION;
 
     if (decision.hosts.size() != nMessages) {
         SPDLOG_ERROR(
@@ -452,6 +467,11 @@
         throw std::runtime_error("Invalid scheduler hint for messages");
    }
 
+    // Record the in-flight request if the function wants to be migrated
+    if (!isMigration && firstMsg.migrationcheckperiod() > 0) {
+        doStartFunctionMigrationThread(req, decision);
+    }
+
     // NOTE: we want to schedule things on this host _last_, otherwise functions
     // may start executing before all messages have been dispatched, thus
     // slowing the remaining scheduling.
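Taken together, the changes above mean an application opts in to migration purely through its first message: a non-zero check period is what makes doCallFunctions record the request in-flight and start the migration thread. A hypothetical caller-side sketch (the function name, batch size, and values are invented; header paths assumed):

    #include <faabric/scheduler/Scheduler.h>
    #include <faabric/util/func.h>

    void callWithMigrationEnabled()
    {
        // Build a batch of four calls to an invented "demo/func" function
        auto req = faabric::util::batchExecFactory("demo", "func", 4);
        auto& firstMsg = req->mutable_messages()->at(0);

        // A non-zero check period opts the app in to migration checks
        firstMsg.set_migrationcheckperiod(2);

        // Optionally leave headroom on the master host, making migration
        // opportunities more likely to appear later
        firstMsg.set_topologyhint("UNDERFULL");

        auto decision = faabric::scheduler::getScheduler().callFunctions(req);
    }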
@@ -529,6 +549,15 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
             }
         }
 
+        // Now reset the tracking on the snapshot before we start executing
+        snap->clearTrackedChanges();
+    } else if (isMigration) {
+        // If we are executing a migrated function, we don't need to
+        // distribute the snapshot to other hosts, as this snapshot is
+        // specific to the to-be-restored function
+        auto snap =
+          faabric::snapshot::getSnapshotRegistry().getSnapshot(snapshotKey);
+
         // Now reset the tracking on the snapshot before we start executing
         snap->clearTrackedChanges();
     }
@@ -720,12 +749,12 @@ void Scheduler::callFunction(faabric::Message& msg, bool forceLocal)
     // Specify that this is a normal function, not a thread
     req->set_type(req->FUNCTIONS);
 
-    // Make the call
     if (forceLocal) {
-        callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
-    } else {
-        callFunctions(req);
+        req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL");
     }
+
+    // Make the call
+    callFunctions(req);
 }
 
 void Scheduler::clearRecordedMessages()
@@ -886,6 +915,12 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
         throw std::runtime_error("Result key empty. Cannot publish result");
     }
 
+    // Remove the app from the in-flight map if it is still there and this
+    // host is the master host for the message
+    if (msg.masterhost() == thisHost) {
+        removePendingMigration(msg.appid());
+    }
+
     // Write the successful result to the result queue
     std::vector inputData = faabric::util::messageToBytes(msg);
     redis.publishSchedulerResult(key, msg.statuskey(), inputData);
@@ -1129,4 +1164,250 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
 
     return node;
 }
+
+void Scheduler::checkForMigrationOpportunities()
+{
+    std::vector>
+      tmpPendingMigrations;
+
+    {
+        // Acquire a shared lock to read from the in-flight requests map
+        faabric::util::SharedLock lock(mx);
+
+        tmpPendingMigrations = doCheckForMigrationOpportunities();
+    }
+
+    // If we find migration opportunities
+    if (tmpPendingMigrations.size() > 0) {
+        // Acquire full lock to write to the pending migrations map
+        faabric::util::FullLock lock(mx);
+
+        for (auto msgPtr : tmpPendingMigrations) {
+            // First, broadcast the pending migrations to other hosts
+            broadcastPendingMigrations(msgPtr);
+            // Second, update our local records
+            pendingMigrations[msgPtr->appid()] = std::move(msgPtr);
+        }
+    }
+}
+
+void Scheduler::broadcastPendingMigrations(
+  std::shared_ptr pendingMigrations)
+{
+    // Get all hosts for the to-be migrated app
+    auto msg = pendingMigrations->migrations().at(0).msg();
+    std::string funcStr = faabric::util::funcToString(msg, false);
+    std::set& thisRegisteredHosts = registeredHosts[funcStr];
+
+    // Remove this host from the set
+    registeredHosts.erase(thisHost);
+
+    // Send pending migrations to all involved hosts
+    for (auto& otherHost : thisRegisteredHosts) {
+        getFunctionCallClient(otherHost).sendPendingMigrations(
+          pendingMigrations);
+    }
+}
+
+std::shared_ptr Scheduler::getPendingAppMigrations(
+  uint32_t appId)
+{
+    faabric::util::SharedLock lock(mx);
+
+    if (pendingMigrations.find(appId) == pendingMigrations.end()) {
+        return nullptr;
+    }
+
+    return pendingMigrations[appId];
+}
+
+void Scheduler::addPendingMigration(
+  std::shared_ptr pMigration)
+{
+    faabric::util::FullLock lock(mx);
+
+    auto msg = pMigration->migrations().at(0).msg();
+    if (pendingMigrations.find(msg.appid()) != pendingMigrations.end()) {
+        SPDLOG_ERROR("Received remote request to add a pending migration for "
+                     "app {}, but already recorded another
migration request"
+                     " for the same app.",
+                     msg.appid());
+        throw std::runtime_error("Remote request for app already there");
+    }
+
+    pendingMigrations[msg.appid()] = pMigration;
+}
+
+void Scheduler::removePendingMigration(uint32_t appId)
+{
+    faabric::util::FullLock lock(mx);
+
+    inFlightRequests.erase(appId);
+    pendingMigrations.erase(appId);
+}
+
+std::vector>
+Scheduler::doCheckForMigrationOpportunities(
+  faabric::util::MigrationStrategy migrationStrategy)
+{
+    std::vector>
+      pendingMigrationsVec;
+
+    // For each in-flight request that has opted in to be migrated,
+    // check if there is an opportunity to migrate
+    for (const auto& app : inFlightRequests) {
+        auto req = app.second.first;
+        auto originalDecision = *app.second.second;
+
+        // If we have already recorded a pending migration for this req,
+        // skip
+        if (getPendingAppMigrations(originalDecision.appId) != nullptr) {
+            SPDLOG_TRACE("Skipping app {} as migration opportunity has "
+                         "already been recorded",
+                         originalDecision.appId);
+            continue;
+        }
+
+        faabric::PendingMigrations msg;
+        msg.set_appid(originalDecision.appId);
+
+        if (migrationStrategy == faabric::util::MigrationStrategy::BIN_PACK) {
+            // We assume the batch was originally scheduled using
+            // bin-packing, thus the scheduling decision has at the beginning
+            // (left) the hosts with the most allocated requests, and at the
+            // end (right) the hosts with the fewest. To check for migration
+            // opportunities, we compare a pointer to the possible
+            // destination of the migration (left), with one to the possible
+            // source of the migration (right). NOTE - this is a slight
+            // simplification, but makes the code simpler.
+            auto left = originalDecision.hosts.begin();
+            auto right = originalDecision.hosts.end() - 1;
+            faabric::HostResources r = (*left == thisHost)
+                                         ? getThisHostResources()
+                                         : getHostResources(*left);
+            auto nAvailable = [&r]() -> int {
+                return r.slots() - r.usedslots();
+            };
+            auto claimSlot = [&r]() {
+                int currentUsedSlots = r.usedslots();
+                r.set_usedslots(currentUsedSlots + 1);
+            };
+            while (left < right) {
+                // If both pointers point to the same host, no migration
+                // opportunity, and must check another possible source of
+                // the migration
+                if (*left == *right) {
+                    --right;
+                    continue;
+                }
+
+                // If the left pointer (possible destination of the
+                // migration) is out of available resources, no migration
+                // opportunity, and must check another possible destination
+                // of migration
+                if (nAvailable() == 0) {
+                    auto oldHost = *left;
+                    ++left;
+                    if (*left != oldHost) {
+                        r = (*left == thisHost) ?
getThisHostResources()
+                                                : getHostResources(*left);
+                    }
+                    continue;
+                }
+
+                // If each pointer points to a request scheduled in a
+                // different host, and the possible destination has slots,
+                // there is a migration opportunity
+                auto* migration = msg.add_migrations();
+                migration->set_srchost(*right);
+                migration->set_dsthost(*left);
+
+                faabric::Message* msgPtr =
+                  &(*(req->mutable_messages()->begin() +
+                      std::distance(originalDecision.hosts.begin(), right)));
+                auto* migrationMsgPtr = migration->mutable_msg();
+                *migrationMsgPtr = *msgPtr;
+                // Decrement by one the availability, and check for more
+                // possible sources of migration
+                claimSlot();
+                --right;
+            }
+        } else {
+            SPDLOG_ERROR("Unrecognised migration strategy: {}",
+                         migrationStrategy);
+            throw std::runtime_error("Unrecognised migration strategy.");
+        }
+
+        if (msg.migrations_size() > 0) {
+            pendingMigrationsVec.emplace_back(
+              std::make_shared(msg));
+            SPDLOG_DEBUG("Detected migration opportunity for app: {}",
+                         msg.appid());
+        } else {
+            SPDLOG_DEBUG("No migration opportunity detected for app: {}",
+                         msg.appid());
+        }
+    }
+
+    return pendingMigrationsVec;
+}
+
+// Start the function migration thread if necessary
+// NOTE: ideally, instead of allowing the applications to specify a check
+//       period, we would have a default one (overridable through an env.
+//       variable), and apps would just opt in/out of being migrated. We set
+//       the actual check period instead to ease experimentation.
+void Scheduler::doStartFunctionMigrationThread(
+  std::shared_ptr req,
+  faabric::util::SchedulingDecision& decision)
+{
+    bool startMigrationThread = inFlightRequests.size() == 0;
+    faabric::Message& firstMsg = req->mutable_messages()->at(0);
+
+    if (inFlightRequests.find(decision.appId) != inFlightRequests.end()) {
+        // MPI applications are made up of two different requests: the
+        // original one (with one message) and the second one (with
+        // world size - 1 messages) created during world creation time.
+        // Thus, to correctly track migration opportunities we must merge
+        // both. We append the batch request to the original one (instead
+        // of the other way around) not to affect the rest of this method's
+        // functionality.
+        if (firstMsg.ismpi()) {
+            startMigrationThread = false;
+            auto originalReq = inFlightRequests[decision.appId].first;
+            auto originalDecision = inFlightRequests[decision.appId].second;
+            assert(req->messages_size() == firstMsg.mpiworldsize() - 1);
+            for (int i = 0; i < firstMsg.mpiworldsize() - 1; i++) {
+                // Append message to original request
+                auto* newMsgPtr = originalReq->add_messages();
+                *newMsgPtr = req->messages().at(i);
+
+                // Append message to original decision
+                originalDecision->addMessage(decision.hosts.at(i),
+                                             req->messages().at(i));
+            }
+        } else {
+            SPDLOG_ERROR("There is already an in-flight request for app {}",
+                         firstMsg.appid());
+            throw std::runtime_error("App already in-flight");
+        }
+    } else {
+        auto decisionPtr =
+          std::make_shared(decision);
+        inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr);
+    }
+
+    // Decide whether we have to start the migration thread or not
+    if (startMigrationThread) {
+        functionMigrationThread.start(firstMsg.migrationcheckperiod());
+    } else if (firstMsg.migrationcheckperiod() !=
+               functionMigrationThread.wakeUpPeriodSeconds) {
+        SPDLOG_WARN("Ignoring migration check period for app {} as the "
+                    "migration thread is already running with a different "
+                    "check period (provided: {}, current: {})",
+                    firstMsg.appid(),
+                    firstMsg.migrationcheckperiod(),
+                    functionMigrationThread.wakeUpPeriodSeconds);
+    }
+}
 }
diff --git a/src/util/json.cpp b/src/util/json.cpp
index 84b6da020..38d6061e7 100644
--- a/src/util/json.cpp
+++ b/src/util/json.cpp
@@ -228,6 +228,17 @@ std::string messageToJson(const faabric::Message& msg)
         }
     }
 
+    if (msg.migrationcheckperiod() > 0) {
+        d.AddMember("migration_check_period", msg.migrationcheckperiod(), a);
+    }
+
+    if (!msg.topologyhint().empty()) {
+        d.AddMember(
+          "topology_hint",
+          Value(msg.topologyhint().c_str(), msg.topologyhint().size()).Move(),
+          a);
+    }
+
     StringBuffer sb;
     Writer writer(sb);
     d.Accept(writer);
@@ -439,6 +450,11 @@ faabric::Message jsonToMessage(const std::string& jsonIn)
         msgIntMap[it.first] = it.second;
     }
 
+    msg.set_migrationcheckperiod(
+      getIntFromJson(d, "migration_check_period", 0));
+
+    msg.set_topologyhint(getStringFromJson(d, "topology_hint", "NORMAL"));
+
     PROF_END(jsonDecode)
 
     return msg;
diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp
index 3af1a230b..fe62c31e3 100644
--- a/tests/test/scheduler/test_executor.cpp
+++ b/tests/test/scheduler/test_executor.cpp
@@ -201,6 +201,22 @@ int32_t TestExecutor::executeTask(
         throw std::runtime_error("This is a test error");
     }
 
+    if (msg.function() == "sleep") {
+        int timeToSleepMs = SHORT_TEST_TIMEOUT_MS;
+        if (!msg.inputdata().empty()) {
+            timeToSleepMs = std::stoi(msg.inputdata());
+        }
+        SPDLOG_DEBUG("Sleep test function going to sleep for {} ms",
+                     timeToSleepMs);
+        SLEEP_MS(timeToSleepMs);
+        SPDLOG_DEBUG("Sleep test function waking up");
+
+        msg.set_outputdata(
+          fmt::format("Migration test function {} executed", msg.id()));
+
+        return 0;
+    }
+
     if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) {
         SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id());
         return msg.id() / 100;
@@ -258,14 +274,12 @@ class TestExecutorFixture
         conf.overrideCpuCount = 10;
         conf.boundTimeout = SHORT_TEST_TIMEOUT_MS;
 
-        faabric::util::SchedulingTopologyHint topologyHint =
-          faabric::util::SchedulingTopologyHint::NORMAL;
-
         if (forceLocal) {
-            topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL;
+            req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL");
         }
 
-        return
sch.callFunctions(req, topologyHint).hosts;
+        return sch.callFunctions(req).hosts;
     }
 };
 
@@ -851,7 +865,8 @@ TEST_CASE_METHOD(TestExecutorFixture,
     }
 
     // Call functions and force to execute locally
-    sch.callFunctions(req, faabric::util::SchedulingTopologyHint::FORCE_LOCAL);
+    req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL");
+    sch.callFunctions(req);
 
     // Await execution
     for (auto& m : *req->mutable_messages()) {
diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp
new file mode 100644
index 000000000..4ee305fde
--- /dev/null
+++ b/tests/test/scheduler/test_function_migration.cpp
@@ -0,0 +1,457 @@
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+using namespace faabric::scheduler;
+
+namespace tests {
+class FunctionMigrationTestFixture : public SchedulerTestFixture
+{
+  public:
+    FunctionMigrationTestFixture()
+    {
+        faabric::util::setMockMode(true);
+
+        std::shared_ptr fac =
+          std::make_shared();
+        setExecutorFactory(fac);
+    }
+
+    ~FunctionMigrationTestFixture()
+    {
+        faabric::util::setMockMode(false);
+
+        // Remove all hosts from global set
+        for (const std::string& host : sch.getAvailableHosts()) {
+            sch.removeHostFromGlobalSet(host);
+        }
+    }
+
+  protected:
+    FunctionMigrationThread migrationThread;
+    std::string masterHost = faabric::util::getSystemConfig().endpointHost;
+
+    // Helper method to set the available hosts and slots per host prior to
+    // making a scheduling decision
+    void setHostResources(std::vector registeredHosts,
+                          std::vector slotsPerHost,
+                          std::vector usedSlotsPerHost)
+    {
+        assert(registeredHosts.size() == slotsPerHost.size());
+        auto& sch = faabric::scheduler::getScheduler();
+        sch.clearRecordedMessages();
+
+        for (int i = 0; i < registeredHosts.size(); i++) {
+            faabric::HostResources resources;
+            resources.set_slots(slotsPerHost.at(i));
+            resources.set_usedslots(usedSlotsPerHost.at(i));
+
+            sch.addHostToGlobalSet(registeredHosts.at(i));
+
+            // If setting resources for the master host, update the scheduler.
+            // Otherwise, queue the resource response
+            if (i == 0) {
+                sch.setThisHostResources(resources);
+            } else {
+                faabric::scheduler::queueResourceResponse(registeredHosts.at(i),
+                                                          resources);
+            }
+        }
+    }
+
+    void updateLocalResources(int slots, int usedSlots)
+    {
+        faabric::HostResources r;
+        r.set_slots(slots);
+        r.set_usedslots(usedSlots);
+        sch.setThisHostResources(r);
+    }
+
+    std::shared_ptr
+    buildPendingMigrationsExpectation(
+      std::shared_ptr req,
+      std::vector hosts,
+      std::vector> migrations)
+    {
+        faabric::PendingMigrations expected;
+        expected.set_appid(req->messages().at(0).appid());
+
+        for (auto pair : migrations) {
+            auto* migration = expected.add_migrations();
+            auto* migrationMsg = migration->mutable_msg();
+            *migrationMsg = req->mutable_messages()->at(pair.first);
+            migration->set_srchost(hosts.at(pair.first));
+            migration->set_dsthost(hosts.at(pair.second));
+        }
+
+        return std::make_shared(expected);
+    }
+
+    void checkPendingMigrationsExpectation(
+      std::shared_ptr expectedMigrations,
+      std::shared_ptr actualMigrations,
+      std::vector hosts,
+      bool skipMsgIdCheck = false)
+    {
+        if (expectedMigrations == nullptr) {
+            REQUIRE(actualMigrations == expectedMigrations);
+        } else {
+            // Check actual migration matches expectation
+            REQUIRE(actualMigrations->appid() == expectedMigrations->appid());
+            REQUIRE(actualMigrations->migrations_size() ==
+                    expectedMigrations->migrations_size());
+            for (int i = 0; i < actualMigrations->migrations_size(); i++) {
+                auto actual = actualMigrations->mutable_migrations()->at(i);
+                auto expected = expectedMigrations->mutable_migrations()->at(i);
+                if (!skipMsgIdCheck) {
+                    REQUIRE(actual.msg().id() == expected.msg().id());
+                }
+                REQUIRE(actual.srchost() == expected.srchost());
+                REQUIRE(actual.dsthost() == expected.dsthost());
+            }
+
+            // Check we have sent a message to all other hosts with the pending
+            // migration
+            auto pendingRequests = getPendingMigrationsRequests();
+            REQUIRE(pendingRequests.size() == hosts.size() - 1);
+            for (auto& pendingReq : getPendingMigrationsRequests()) {
+                std::string host = pendingReq.first;
+                std::shared_ptr migration =
+                  pendingReq.second;
+                auto it = std::find(hosts.begin(), hosts.end(), host);
+                REQUIRE(it != hosts.end());
+                REQUIRE(migration == actualMigrations);
+            }
+        }
+    }
+};
+
+TEST_CASE_METHOD(FunctionMigrationTestFixture,
+                 "Test starting and stopping the function migration thread",
+                 "[scheduler]")
+{
+    int wakeUpPeriodSeconds = 2;
+    migrationThread.start(wakeUpPeriodSeconds);
+
+    SLEEP_MS(SHORT_TEST_TIMEOUT_MS);
+
+    migrationThread.stop();
+}
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test migration opportunities are only detected if set in the message",
+  "[scheduler]")
+{
+    // First set resources before calling the functions: one will be allocated
+    // locally, another one in the remote host
+    std::vector hosts = { masterHost, "hostA" };
+    std::vector slots = { 1, 1 };
+    std::vector usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    // Second, prepare the request we will migrate in-flight.
+    // NOTE: the sleep function sleeps for a set timeout before returning.
+    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
+    int timeToSleep = SHORT_TEST_TIMEOUT_MS;
+    req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
+    uint32_t appId = req->messages().at(0).appid();
+
+    // Build expected pending migrations
+    std::shared_ptr expectedMigrations;
+    SECTION("Migration not enabled") { expectedMigrations = nullptr; }
+
+    SECTION("Migration enabled")
+    {
+        // Set to a non-zero value so that migration is enabled
+        req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+
+        // Build expected migrations
+        std::vector> migrations = { { 1, 0 } };
+        expectedMigrations =
+          buildPendingMigrationsExpectation(req, hosts, migrations);
+    }
+
+    auto decision = sch.callFunctions(req);
+
+    // Update host resources so that a migration opportunity appears, but will
+    // only be detected if the migration check period is set.
+    updateLocalResources(2, 1);
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.getPendingAppMigrations(appId);
+    checkPendingMigrationsExpectation(
+      expectedMigrations, actualMigrations, hosts);
+
+    faabric::Message res =
+      sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(FunctionMigrationTestFixture,
+                 "Test checking for migration opportunities",
+                 "[scheduler]")
+{
+    std::vector hosts = { masterHost, "hostA" };
+    std::vector slots = { 1, 1 };
+    std::vector usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
+    int timeToSleep = SHORT_TEST_TIMEOUT_MS;
+    req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
+    uint32_t appId = req->messages().at(0).appid();
+
+    // By setting the check period to a non-zero value, we are effectively
+    // opting in to be considered for migration
+    req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+
+    auto decision = sch.callFunctions(req);
+
+    std::shared_ptr expectedMigrations;
+
+    // As we don't update the available resources, no migration opportunities
+    // will appear, even though we are checking for them
+    SECTION("Can not migrate") { expectedMigrations = nullptr; }
+
+    SECTION("Can migrate")
+    {
+        // Update host resources so that a migration opportunity appears
+        updateLocalResources(2, 1);
+
+        // Build expected migrations
+        std::vector> migrations = { { 1, 0 } };
+        expectedMigrations =
+          buildPendingMigrationsExpectation(req, hosts, migrations);
+    }
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.getPendingAppMigrations(appId);
+    checkPendingMigrationsExpectation(
+      expectedMigrations, actualMigrations, hosts);
+
+    faabric::Message res =
+      sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test detecting migration opportunities for several messages and hosts",
+  "[scheduler]")
+{
+    // First set resources before calling the functions: one request will be
+    // allocated to each host
+    std::vector hosts = { masterHost, "hostA", "hostB", "hostC" };
+    std::vector slots = { 1, 1, 1, 1 };
+    std::vector
usedSlots = { 0, 0, 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    auto req = faabric::util::batchExecFactory("foo", "sleep", 4);
+    int timeToSleep = SHORT_TEST_TIMEOUT_MS;
+    req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
+    uint32_t appId = req->messages().at(0).appid();
+
+    // Opt in to be considered for migration
+    req->mutable_messages()->at(0).set_migrationcheckperiod(2);
+
+    auto decision = sch.callFunctions(req);
+
+    // Set up expectations
+    std::shared_ptr expectedMigrations;
+    SECTION("Can not migrate") { expectedMigrations = nullptr; }
+
+    SECTION("Can migrate")
+    {
+        // Update host resources so that two migration opportunities appear in
+        // different hosts.
+        std::vector newSlots = { 2, 2, 1, 1 };
+        std::vector newUsedSlots = { 1, 1, 1, 1 };
+        setHostResources(hosts, newSlots, newUsedSlots);
+
+        // Build expected result: two migrations
+        std::vector> migrations = { { 3, 0 }, { 2, 1 } };
+        expectedMigrations =
+          buildPendingMigrationsExpectation(req, hosts, migrations);
+    }
+
+    sch.checkForMigrationOpportunities();
+
+    auto actualMigrations = sch.getPendingAppMigrations(appId);
+    checkPendingMigrationsExpectation(
+      expectedMigrations, actualMigrations, hosts);
+
+    faabric::Message res =
+      sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(
+  FunctionMigrationTestFixture,
+  "Test function migration thread detects migration opportunities",
+  "[scheduler]")
+{
+    std::vector hosts = { masterHost, "hostA" };
+    std::vector slots = { 1, 1 };
+    std::vector usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
+    int checkPeriodSecs = 1;
+    int timeToSleep = 4 * checkPeriodSecs * 1000;
+    req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
+    uint32_t appId = req->messages().at(0).appid();
+
+    // Opt in to be migrated
+    req->mutable_messages()->at(0).set_migrationcheckperiod(checkPeriodSecs);
+
+    auto decision = sch.callFunctions(req);
+
+    std::shared_ptr expectedMigrations;
+
+    // As we don't update the available resources, no migration opportunities
+    // will appear, even though we are checking for them
+    SECTION("Can not migrate") { expectedMigrations = nullptr; }
+
+    SECTION("Can migrate")
+    {
+        // Update host resources so that a migration opportunity appears
+        updateLocalResources(2, 1);
+
+        // Build expected migrations
+        std::vector> migrations = { { 1, 0 } };
+        expectedMigrations =
+          buildPendingMigrationsExpectation(req, hosts, migrations);
+    }
+
+    // Instead of directly calling the scheduler function to check for
+    // migration opportunities, sleep for enough time (twice the check period)
+    // so that a migration is detected by the background thread.
+    SLEEP_MS(2 * checkPeriodSecs * 1000);
+
+    auto actualMigrations = sch.getPendingAppMigrations(appId);
+    checkPendingMigrationsExpectation(
+      expectedMigrations, actualMigrations, hosts);
+
+    faabric::Message res =
+      sch.getFunctionResult(req->messages().at(0).id(), 2 * timeToSleep);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Check that after the result is set, the app can't be migrated any more
+    sch.checkForMigrationOpportunities();
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(FunctionMigrationTestFixture,
+                 "Test adding and removing pending migrations manually",
+                 "[scheduler]")
+{
+    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
+    uint32_t appId = req->messages().at(0).appid();
+    std::vector hosts = { masterHost, "hostA" };
+    std::vector> migrations = { { 1, 0 } };
+    auto expectedMigrations =
+      buildPendingMigrationsExpectation(req, hosts, migrations);
+
+    // Add migration manually
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+    sch.addPendingMigration(expectedMigrations);
+    REQUIRE(sch.getPendingAppMigrations(appId) == expectedMigrations);
+
+    // Remove migration manually
+    sch.removePendingMigration(appId);
+    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+}
+
+TEST_CASE_METHOD(FunctionMigrationTestFixture,
+                 "Test MPI function migration points",
+                 "[scheduler]")
+{
+    // Set up host resources
+    std::vector hosts = { masterHost, "hostA" };
+    std::vector slots = { 2, 2 };
+    std::vector usedSlots = { 0, 0 };
+    setHostResources(hosts, slots, usedSlots);
+
+    // Clear MPI registries
+    getMpiWorldRegistry().clear();
+
+    auto req = faabric::util::batchExecFactory("mpi", "sleep", 1);
+    int checkPeriodSecs = 1;
+    int timeToSleep = 4 * checkPeriodSecs * 1000;
+
+    int worldId = 123;
+    int worldSize = 4;
+    auto* firstMsg = req->mutable_messages(0);
+    firstMsg->set_inputdata(std::to_string(timeToSleep));
+    firstMsg->set_ismpi(true);
+    firstMsg->set_mpiworldsize(worldSize);
+    firstMsg->set_mpiworldid(worldId);
+    firstMsg->set_migrationcheckperiod(checkPeriodSecs);
+    uint32_t appId = req->messages().at(0).appid();
+
+    // Call the function, which will just sleep
+    auto decision = sch.callFunctions(req);
+
+    // Manually create the world, and trigger a second function invocation in
+    // the remote host
+    MpiWorld world;
+    world.create(*firstMsg, worldId, worldSize);
+
+    // Update host resources so that a migration opportunity appears
+    updateLocalResources(4, 2);
+
+    // Build expected migrations
+    std::shared_ptr expectedMigrations;
+    // We need to add to the original request the ones that will be
+    // chained by MPI (this is only needed to build the expectation).
+    // NOTE: we do it in a copy of the original request, as otherwise TSAN
+    // complains about a data race.
+    auto reqCopy = faabric::util::batchExecFactory("mpi", "sleep", worldSize);
+    for (int i = 0; i < worldSize; i++) {
+        reqCopy->mutable_messages(i)->set_appid(firstMsg->appid());
+    }
+    std::vector> migrations = { { 1, 0 }, { 1, 0 } };
+    expectedMigrations =
+      buildPendingMigrationsExpectation(reqCopy, hosts, migrations);
+
+    // Instead of directly calling the scheduler function to check for
+    // migration opportunities, sleep for enough time (twice the check period)
+    // so that a migration is detected by the background thread.
+    SLEEP_MS(2 * checkPeriodSecs * 1000);
+
+    // When checking that a migration has taken place in MPI, we skip the msg
+    // id check. Part of the request is built by the runtime, and therefore
+    // we don't have access to the actual messages scheduled.
+    auto actualMigrations = sch.getPendingAppMigrations(appId);
+    checkPendingMigrationsExpectation(
+      expectedMigrations, actualMigrations, hosts, true);
+
+    faabric::Message res =
+      sch.getFunctionResult(firstMsg->id(), 2 * timeToSleep);
+    REQUIRE(res.returnvalue() == 0);
+
+    // Clean up
+    world.destroy();
+}
+}
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index be67f9afe..142d446bf 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -1002,17 +1002,12 @@ TEST_CASE_METHOD(DummyExecutorFixture,
         expectedDecision.addMessage(expectedHosts.at(i), req->messages().at(i));
     }
 
-    // Set topology hint
-    faabric::util::SchedulingTopologyHint topologyHint =
-      faabric::util::SchedulingTopologyHint::NORMAL;
-
     if (forceLocal) {
-        topologyHint = faabric::util::SchedulingTopologyHint::FORCE_LOCAL;
+        req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL");
     }
 
     // Schedule and check decision
-    faabric::util::SchedulingDecision actualDecision =
-      sch.callFunctions(req, topologyHint);
+    faabric::util::SchedulingDecision actualDecision = sch.callFunctions(req);
     checkSchedulingDecisionEquality(expectedDecision, actualDecision);
 
     // Check mappings set up locally or not
diff --git a/tests/test/scheduler/test_scheduling_decisions.cpp b/tests/test/scheduler/test_scheduling_decisions.cpp
index fb18e679f..4f985a6cc 100644
--- a/tests/test/scheduler/test_scheduling_decisions.cpp
+++ b/tests/test/scheduler/test_scheduling_decisions.cpp
@@ -97,9 +97,13 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
         // Set resources for all hosts
         setHostResources(config.hosts, config.slots);
 
+        // Set topology hint in request
+        req->mutable_messages()->at(0).set_topologyhint(
+          faabric::util::topologyHintToStr.at(config.topologyHint));
+
         // The first time we run the batch request, we will follow the
         // unregistered hosts path
-        actualDecision = sch.callFunctions(req, config.topologyHint);
+        actualDecision = sch.callFunctions(req);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
         checkRecordedBatchMessages(actualDecision, config);
 
@@ -114,9 +118,12 @@ class SchedulingDecisionTestFixture : public SchedulerTestFixture
           faabric::util::batchExecFactory("foo", "baz", req->messages_size());
         setHostResources(config.hosts, config.slots);
 
+        reqCopy->mutable_messages()->at(0).set_topologyhint(
+          faabric::util::topologyHintToStr.at(config.topologyHint));
+
         // The second time we run the batch request, we will follow
         // the registered hosts path
-        actualDecision = sch.callFunctions(reqCopy, config.topologyHint);
+        actualDecision = sch.callFunctions(reqCopy);
         REQUIRE(actualDecision.hosts == config.expectedHosts);
         checkRecordedBatchMessages(actualDecision, config);
     }
@@ -390,4 +397,40 @@ TEST_CASE_METHOD(SchedulingDecisionTestFixture,
 
     testActualSchedulingDecision(req, config);
 }
+
+TEST_CASE_METHOD(SchedulingDecisionTestFixture,
+                 "Test underfull scheduling topology hint",
+                 "[scheduler]")
+{
+    SchedulingConfig config = {
+        .hosts = { masterHost, "hostA" },
+        .slots = { 2, 2 },
+        .numReqs = 2,
+        .topologyHint = faabric::util::SchedulingTopologyHint::UNDERFULL,
+        .expectedHosts = { masterHost, "hostA" },
+    };
+
+    std::shared_ptr req;
+
+    SECTION("Test hint's basic functionality")
+    {
+        req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    SECTION("Test hint does not affect other hosts")
+    {
+        config.numReqs = 3;
+        config.expectedHosts = { masterHost, "hostA", "hostA" };
+        req =
faabric::util::batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    SECTION("Test with hint we still overload the master")
+    {
+        config.numReqs = 4;
+        config.expectedHosts = { masterHost, "hostA", "hostA", masterHost };
+        req = faabric::util::batchExecFactory("foo", "bar", config.numReqs);
+    }
+
+    testActualSchedulingDecision(req, config);
+}
 }
diff --git a/tests/test/util/test_json.cpp b/tests/test/util/test_json.cpp
index f8ac1766d..2fefc6ed3 100644
--- a/tests/test/util/test_json.cpp
+++ b/tests/test/util/test_json.cpp
@@ -47,6 +47,10 @@ TEST_CASE("Test message to JSON round trip", "[util]")
     auto& intMap = *msg.mutable_intexecgraphdetails();
     intMap["foo"] = 0;
 
+    msg.set_migrationcheckperiod(33);
+
+    msg.set_topologyhint("TEST_TOPOLOGY_HINT");
+
     SECTION("Dodgy characters") { msg.set_inputdata("[0], %$ 2233 9"); }
 
     SECTION("Bytes")
diff --git a/tests/utils/message_utils.cpp b/tests/utils/message_utils.cpp
index 58924958c..d3189ec18 100644
--- a/tests/utils/message_utils.cpp
+++ b/tests/utils/message_utils.cpp
@@ -52,5 +52,7 @@ void checkMessageEquality(const faabric::Message& msgA,
     checkMessageMapEquality(msgA.execgraphdetails(), msgB.execgraphdetails());
     checkMessageMapEquality(msgA.intexecgraphdetails(),
                             msgB.intexecgraphdetails());
+
+    REQUIRE(msgA.migrationcheckperiod() == msgB.migrationcheckperiod());
 }
 }
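Finally, a small round-trip sketch of the two new Message fields serialised in json.cpp (hypothetical test-style code; the assertions and values are illustrative, and the proto header path is assumed):

    #include <faabric/proto/faabric.pb.h>
    #include <faabric/util/json.h>

    #include <cassert>
    #include <string>

    void roundTripNewFields()
    {
        faabric::Message msg;
        msg.set_migrationcheckperiod(2);
        msg.set_topologyhint("UNDERFULL");

        // Serialised as "migration_check_period" and "topology_hint"
        std::string json = faabric::util::messageToJson(msg);

        // jsonToMessage defaults topology_hint to "NORMAL" when absent, so a
        // set value must survive the round trip unchanged
        faabric::Message parsed = faabric::util::jsonToMessage(json);
        assert(parsed.migrationcheckperiod() == 2);
        assert(parsed.topologyhint() == "UNDERFULL");
    }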