diff --git a/include/faabric/mpi/MpiWorld.h b/include/faabric/mpi/MpiWorld.h index 8ee9f967d..001578dd6 100644 --- a/include/faabric/mpi/MpiWorld.h +++ b/include/faabric/mpi/MpiWorld.h @@ -201,9 +201,7 @@ class MpiWorld /* Function Migration */ - void prepareMigration( - int thisRank, - std::shared_ptr pendingMigrations); + void prepareMigration(int thisRank); private: int id = -1; diff --git a/include/faabric/scheduler/FunctionCallApi.h b/include/faabric/scheduler/FunctionCallApi.h index 5879e8331..3e8ab45db 100644 --- a/include/faabric/scheduler/FunctionCallApi.h +++ b/include/faabric/scheduler/FunctionCallApi.h @@ -6,9 +6,6 @@ enum FunctionCalls NoFunctionCall = 0, ExecuteFunctions = 1, Flush = 2, - Unregister = 3, - GetResources = 4, - PendingMigrations = 5, - SetMessageResult = 6, + SetMessageResult = 3, }; } diff --git a/include/faabric/scheduler/FunctionCallClient.h b/include/faabric/scheduler/FunctionCallClient.h index 02fbfd14b..9e3c4bd63 100644 --- a/include/faabric/scheduler/FunctionCallClient.h +++ b/include/faabric/scheduler/FunctionCallClient.h @@ -20,15 +20,6 @@ std::vector< std::pair>> getBatchRequests(); -std::vector> -getResourceRequests(); - -std::vector>> -getPendingMigrationsRequests(); - -std::vector> -getUnregisterRequests(); - std::vector>> getMessageResults(); @@ -52,14 +43,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient void sendFlush(); - faabric::HostResources getResources(); - - void sendPendingMigrations(std::shared_ptr req); - void executeFunctions(std::shared_ptr req); - void unregister(faabric::UnregisterRequest& req); - void setMessageResult(std::shared_ptr msg); }; diff --git a/include/faabric/scheduler/FunctionCallServer.h b/include/faabric/scheduler/FunctionCallServer.h index db62a330c..ed5d2e422 100644 --- a/include/faabric/scheduler/FunctionCallServer.h +++ b/include/faabric/scheduler/FunctionCallServer.h @@ -23,16 +23,8 @@ class FunctionCallServer final std::unique_ptr recvFlush( std::span buffer); - std::unique_ptr recvGetResources( - std::span buffer); - - std::unique_ptr recvPendingMigrations( - std::span buffer); - void recvExecuteFunctions(std::span buffer); - void recvUnregister(std::span buffer); - void recvSetMessageResult(std::span buffer); }; } diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 0903da78f..1ce9df57b 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -26,10 +26,6 @@ namespace faabric::scheduler { -typedef std::pair, - std::shared_ptr> - InFlightPair; - class Scheduler; Scheduler& getScheduler(); @@ -158,17 +154,6 @@ class Executor void threadPoolThread(std::stop_token st, int threadPoolIdx); }; -/** - * Background thread that periodically checks if there are migration - * opportunities for in-flight apps that have opted in to being checked for - * migrations. - */ -class FunctionMigrationThread : public faabric::util::PeriodicBackgroundThread -{ - public: - void doWork() override; -}; - /** * Background thread that periodically checks to see if any executors have * become stale (i.e. not handled any requests in a given timeout). 
If any are @@ -277,14 +262,6 @@ class Scheduler void removeHostFromGlobalSet(const std::string& host); - void removeRegisteredHost(const std::string& host, - const std::string& user, - const std::string& function); - - void addRegisteredHost(const std::string& host, - const std::string& user, - const std::string& function); - faabric::HostResources getThisHostResources(); void setThisHostResources(faabric::HostResources& res); @@ -304,14 +281,9 @@ class Scheduler // ---------------------------------- // Function Migration // ---------------------------------- - void checkForMigrationOpportunities(); - - std::shared_ptr getPendingAppMigrations( - uint32_t appId); - - void addPendingMigration(std::shared_ptr msg); - - void removePendingMigration(uint32_t appId); + std::shared_ptr checkForMigrationOpportunities( + faabric::Message& msg, + int overwriteNewGroupId = 0); private: std::string thisHost; @@ -341,8 +313,6 @@ class Scheduler void updateHostResources(); - faabric::HostResources getHostResources(const std::string& host); - // ---- Planner---- faabric::planner::KeepAliveThread keepAliveThread; @@ -353,16 +323,6 @@ class Scheduler std::unordered_map> registeredHosts; - faabric::batch_scheduler::SchedulingDecision doSchedulingDecision( - std::shared_ptr req, - faabric::batch_scheduler::SchedulingTopologyHint topologyHint); - - faabric::batch_scheduler::SchedulingDecision doCallFunctions( - std::shared_ptr req, - faabric::batch_scheduler::SchedulingDecision& decision, - faabric::util::FullLock& lock, - faabric::batch_scheduler::SchedulingTopologyHint topologyHint); - std::shared_ptr claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock); @@ -379,24 +339,6 @@ class Scheduler // ---- Point-to-point ---- faabric::transport::PointToPointBroker& broker; - - // ---- Function migration ---- - FunctionMigrationThread functionMigrationThread; - std::unordered_map inFlightRequests; - std::unordered_map> - pendingMigrations; - - std::vector> - doCheckForMigrationOpportunities( - faabric::batch_scheduler::MigrationStrategy migrationStrategy = - faabric::batch_scheduler::MigrationStrategy::BIN_PACK); - - void broadcastPendingMigrations( - std::shared_ptr pendingMigrations); - - void doStartFunctionMigrationThread( - std::shared_ptr req, - faabric::batch_scheduler::SchedulingDecision& decision); }; } diff --git a/src/mpi/MpiWorld.cpp b/src/mpi/MpiWorld.cpp index 9c7b50500..5549cd91e 100644 --- a/src/mpi/MpiWorld.cpp +++ b/src/mpi/MpiWorld.cpp @@ -1602,9 +1602,7 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank) } } -void MpiWorld::prepareMigration( - int thisRank, - std::shared_ptr pendingMigrations) +void MpiWorld::prepareMigration(int thisRank) { // Check that there are no pending asynchronous messages to send and receive for (auto umb : unackedMessageBuffers) { @@ -1629,45 +1627,10 @@ void MpiWorld::prepareMigration( "Migrating with pending async messages is not supported"); } - // Update local records + // Update local records if (thisRank == localLeader) { - for (int i = 0; i < pendingMigrations->migrations_size(); i++) { - auto m = pendingMigrations->mutable_migrations()->at(i); - assert(hostForRank.at(m.msg().mpirank()) == m.srchost()); - - // Update the host for this rank. We only update the positions of - // the to-be migrated ranks, avoiding race conditions with not- - // migrated ranks - hostForRank.at(m.msg().mpirank()) = m.dsthost(); - - // Update the ranks for host. This structure is used when doing - // collective communications by all ranks. 
At this point, all non- - leader ranks will be hitting a barrier, for which they don't - need the ranks for host map, therefore it is safe to modify it - if (m.dsthost() == thisHost && m.msg().mpirank() < localLeader) { - SPDLOG_WARN("Changing local leader {} -> {}", - localLeader, - m.msg().mpirank()); - localLeader = m.msg().mpirank(); - ranksForHost[m.dsthost()].insert( - ranksForHost[m.dsthost()].begin(), m.msg().mpirank()); - } - - ranksForHost[m.dsthost()].push_back(m.msg().mpirank()); - ranksForHost[m.srchost()].erase( - std::remove(ranksForHost[m.srchost()].begin(), - ranksForHost[m.srchost()].end(), - m.msg().mpirank()), - ranksForHost[m.srchost()].end()); - - if (ranksForHost[m.srchost()].empty()) { - ranksForHost.erase(m.srchost()); - } - - // This could be made more efficient as the broker method acquires - // a full lock every time - broker.updateHostForIdx(id, m.msg().mpirank(), m.dsthost()); - } + // TODO: we may be able to just initLocalRemote here? + initLocalRemoteLeaders(); // Set the migration flag hasBeenMigrated = true; diff --git a/src/planner/Planner.cpp index f1b11ba9e..868f91bfe 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -315,14 +315,27 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req) faabric::util::FullLock lock(plannerMx); auto batchScheduler = faabric::batch_scheduler::getBatchScheduler(); + auto decisionType = + batchScheduler->getDecisionType(state.inFlightReqs, req); // First, make scheduling decision #ifndef NDEBUG #endif // TODO: remove this copy once we finalise on one Host abstraction auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap); + + // For a DIST_CHANGE decision (i.e. migration) we want to try to improve + // on the old decision (we don't care about the one we send) + // TODO: alternatively we could make sure the ExecutorContext is always set? + std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision; + if (decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE) { + auto oldReq = state.inFlightReqs.at(appId).first; + decision = batchScheduler->makeSchedulingDecision( + hostMapCopy, state.inFlightReqs, oldReq); + } else { + decision = batchScheduler->makeSchedulingDecision( + hostMapCopy, state.inFlightReqs, req); + } #ifndef NDEBUG // Here we make sure the state hasn't changed here (we pass const, but they // are const pointers) @@ -354,8 +367,6 @@ Planner::callBatch(std::shared_ptr<faabric::BatchExecuteRequest> req) // 1. Update the host-map to reflect the new host occupation // 2. Update the in-flight map to include the new request // 3. Send the PTP mappings to all the hosts involved - auto decisionType = - batchScheduler->getDecisionType(state.inFlightReqs, req); auto& broker = faabric::transport::getPointToPointBroker(); switch (decisionType) { case faabric::batch_scheduler::DecisionType::NEW: { diff --git a/src/proto/faabric.proto index 4a665b5cf..c7a9e05e5 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -46,16 +46,6 @@ message BatchExecuteRequest { // Arbitrary context for this batch int32 subType = 8; bytes contextData = 9; - - // Flag set by the scheduler when this batch is all executing on a single - // host - bool singleHost = 10; - - // TODO(planner-schedule): remove me - // Temporary flag to indicate the workers that the BER comes from the - // planner (i.e. proxy-ed through planner) and so it has not been scheduled - // yet.
Whenever the planner does scheduling we will be able to remove this - bool comesFromPlanner = 11; } message BatchExecuteRequestStatus { @@ -239,15 +229,10 @@ message PointToPointMappings { // FUNCTION MIGRATIONS // --------------------------------------------- -message PendingMigrations { +message PendingMigration { int32 appId = 1; int32 groupId = 2; - - message PendingMigration { - Message msg = 1; - string srcHost = 2; - string dstHost = 3; - } - - repeated PendingMigration migrations = 3; + int32 groupIdx = 3; + string srcHost = 4; + string dstHost = 5; } diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index 0e1ed5504..1c3e98d85 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -205,12 +205,11 @@ void Executor::executeTasks(std::vector msgIdxs, std::shared_ptr req) { const std::string funcStr = faabric::util::funcToString(req); - SPDLOG_TRACE("{} executing {}/{} tasks of {} (single-host={})", + SPDLOG_TRACE("{} executing {}/{} tasks of {}", id, msgIdxs.size(), req->messages_size(), - funcStr, - req->singlehost()); + funcStr); // Note that this lock is specific to this executor, so will only block // when multiple threads are trying to schedule tasks. This will only @@ -227,13 +226,16 @@ void Executor::executeTasks(std::vector msgIdxs, bool isMaster = firstMsg.mainhost() == thisHost; bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS; - bool isSingleHost = req->singlehost(); + // TODO: isSingleHost should always be true for threads now + // bool isSingleHost = req->singlehost(); std::string snapshotKey = firstMsg.snapshotkey(); // Threads on a single host don't need to do anything with snapshots, as // they all share a single executor. Threads not on a single host need to // restore from the main thread snapshot. Non-threads need to restore from // a snapshot if they are given a snapshot key. + /* + * TODO: remove? if (isThreads && !isSingleHost) { // Check we get a valid memory view std::span memView = getMemoryView(); @@ -255,17 +257,17 @@ void Executor::executeTasks(std::vector msgIdxs, // Prepare list of lists for dirty pages from each thread threadLocalDirtyRegions.resize(req->messages_size()); - } else if (!isThreads && !firstMsg.snapshotkey().empty()) { + */ + if (!isThreads && !firstMsg.snapshotkey().empty()) { // Restore from snapshot if provided std::string snapshotKey = firstMsg.snapshotkey(); SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey); restore(snapshotKey); } else { - SPDLOG_TRACE("Not restoring {}. threads={}, key={}, single={}", + SPDLOG_TRACE("Not restoring {}. threads={}, key={}", funcStr, isThreads, - snapshotKey, - isSingleHost); + snapshotKey); } // Initialise batch counter @@ -423,10 +425,11 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) task.req->mutable_messages()->at(task.messageIndex); // Start dirty tracking if executing threads across hosts - bool isSingleHost = task.req->singlehost(); + // bool isSingleHost = task.req->singlehost(); bool isThreads = task.req->type() == faabric::BatchExecuteRequest::THREADS; - bool doDirtyTracking = isThreads && !isSingleHost; + // TODO: do we never need to do dirty tracking now? 
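// For reference, a minimal sketch of the guard this hard-coded false
// replaces (an assumption, reconstructed from the commented-out expression
// kept below): dirty tracking was only needed when a THREADS batch spanned
// hosts, so with remote threads disabled it never triggers:
//
//   bool isThreads =
//     task.req->type() == faabric::BatchExecuteRequest::THREADS;
//   bool isSingleHost = task.req->singlehost(); // proto field now removed
//   bool doDirtyTracking = isThreads && !isSingleHost;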
+ bool doDirtyTracking = false; // isThreads && !isSingleHost; if (doDirtyTracking) { // If tracking is thread local, start here as it will happen for // each thread diff --git a/src/scheduler/FunctionCallClient.cpp b/src/scheduler/FunctionCallClient.cpp index 8a43dc84e..b75114289 100644 --- a/src/scheduler/FunctionCallClient.cpp +++ b/src/scheduler/FunctionCallClient.cpp @@ -28,10 +28,6 @@ static std::unordered_map> queuedResourceResponses; -static std::vector< - std::pair>> - pendingMigrationsRequests; - static std::vector> unregisterRequests; @@ -58,26 +54,6 @@ getBatchRequests() return batchMessages; } -std::vector> getResourceRequests() -{ - faabric::util::UniqueLock lock(mockMutex); - return resourceRequests; -} - -std::vector>> -getPendingMigrationsRequests() -{ - faabric::util::UniqueLock lock(mockMutex); - return pendingMigrationsRequests; -} - -std::vector> -getUnregisterRequests() -{ - faabric::util::UniqueLock lock(mockMutex); - return unregisterRequests; -} - std::vector>> getMessageResults() { @@ -97,7 +73,6 @@ void clearMockRequests() functionCalls.clear(); batchMessages.clear(); resourceRequests.clear(); - pendingMigrationsRequests.clear(); unregisterRequests.clear(); for (auto& p : queuedResourceResponses) { @@ -129,48 +104,6 @@ void FunctionCallClient::sendFlush() } } -faabric::HostResources FunctionCallClient::getResources() -{ - faabric::EmptyRequest request; - faabric::HostResources response; - - if (faabric::util::isMockMode()) { - faabric::util::UniqueLock lock(mockMutex); - - // Register the request - resourceRequests.emplace_back(host, request); - - // See if we have a queued response - if (queuedResourceResponses[host].size() > 0) { - response = queuedResourceResponses[host].dequeue(); - } - } else { - syncSend( - faabric::scheduler::FunctionCalls::GetResources, &request, &response); - } - - return response; -} - -// This function call is used by the main host of an application to let know -// other hosts running functions of the same application that a migration -// opportunity has been found. 
-void FunctionCallClient::sendPendingMigrations( - std::shared_ptr req) -{ - faabric::PendingMigrations request; - faabric::EmptyResponse response; - - if (faabric::util::isMockMode()) { - faabric::util::UniqueLock lock(mockMutex); - pendingMigrationsRequests.emplace_back(host, req); - } else { - syncSend(faabric::scheduler::FunctionCalls::PendingMigrations, - req.get(), - &response); - } -} - void FunctionCallClient::executeFunctions( const std::shared_ptr req) { @@ -183,16 +116,6 @@ void FunctionCallClient::executeFunctions( } } -void FunctionCallClient::unregister(faabric::UnregisterRequest& req) -{ - if (faabric::util::isMockMode()) { - faabric::util::UniqueLock lock(mockMutex); - unregisterRequests.emplace_back(host, req); - } else { - asyncSend(faabric::scheduler::FunctionCalls::Unregister, &req); - } -} - void FunctionCallClient::setMessageResult(std::shared_ptr msg) { if (faabric::util::isMockMode()) { diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index a6e8e23d3..bfe6b6412 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -25,10 +25,6 @@ void FunctionCallServer::doAsyncRecv(transport::Message& message) recvExecuteFunctions(message.udata()); break; } - case faabric::scheduler::FunctionCalls::Unregister: { - recvUnregister(message.udata()); - break; - } case faabric::scheduler::FunctionCalls::SetMessageResult: { recvSetMessageResult(message.udata()); break; @@ -48,12 +44,6 @@ std::unique_ptr FunctionCallServer::doSyncRecv( case faabric::scheduler::FunctionCalls::Flush: { return recvFlush(message.udata()); } - case faabric::scheduler::FunctionCalls::GetResources: { - return recvGetResources(message.udata()); - } - case faabric::scheduler::FunctionCalls::PendingMigrations: { - return recvPendingMigrations(message.udata()); - } default: { throw std::runtime_error( fmt::format("Unrecognized sync call header: {}", header)); @@ -89,40 +79,6 @@ void FunctionCallServer::recvExecuteFunctions(std::span buffer) std::make_shared(parsedMsg)); } -void FunctionCallServer::recvUnregister(std::span buffer) -{ - PARSE_MSG(faabric::UnregisterRequest, buffer.data(), buffer.size()) - - SPDLOG_DEBUG("Unregistering host {} for {}/{}", - parsedMsg.host(), - parsedMsg.user(), - parsedMsg.function()); - - // Remove the host from the warm set - scheduler.removeRegisteredHost( - parsedMsg.host(), parsedMsg.user(), parsedMsg.function()); -} - -std::unique_ptr FunctionCallServer::recvGetResources( - std::span buffer) -{ - auto response = std::make_unique( - scheduler.getThisHostResources()); - return response; -} - -std::unique_ptr -FunctionCallServer::recvPendingMigrations(std::span buffer) -{ - PARSE_MSG(faabric::PendingMigrations, buffer.data(), buffer.size()); - - auto msgPtr = std::make_shared(parsedMsg); - - scheduler.addPendingMigration(msgPtr); - - return std::make_unique(); -} - void FunctionCallServer::recvSetMessageResult(std::span buffer) { PARSE_MSG(faabric::Message, buffer.data(), buffer.size()) diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index d2edc9877..41454b67e 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -144,9 +144,6 @@ void Scheduler::reset() SPDLOG_DEBUG("Resetting scheduler"); resetThreadLocalCache(); - // Stop the function migration thread - functionMigrationThread.stop(); - // Stop the reaper thread reaperThread.stop(); @@ -185,10 +182,6 @@ void Scheduler::reset() pushedSnapshotsMap.clear(); - // Reset function migration tracking - 
inFlightRequests.clear(); - pendingMigrations.clear(); - // Records recordedMessagesAll.clear(); recordedMessagesLocal.clear(); @@ -290,8 +283,6 @@ int Scheduler::reapStaleExecutors() req.set_host(thisHost); req.set_user(user); req.set_function(function); - - getFunctionCallClient(mainHost)->unregister(req); } keysToRemove.emplace_back(key); @@ -334,80 +325,11 @@ const std::set& Scheduler::getFunctionRegisteredHosts( return registeredHosts[key]; } -void Scheduler::removeRegisteredHost(const std::string& host, - const std::string& user, - const std::string& function) -{ - faabric::util::FullLock lock(mx); - std::string key = user + "/" + function; - registeredHosts[key].erase(host); -} - -void Scheduler::addRegisteredHost(const std::string& host, - const std::string& user, - const std::string& function) -{ - std::string key = user + "/" + function; - registeredHosts[key].insert(host); -} - void Scheduler::vacateSlot() { thisHostUsedSlots.fetch_sub(1, std::memory_order_acq_rel); } -/* -faabric::batch_scheduler::SchedulingDecision Scheduler::callFunctions( - std::shared_ptr req) -{ - // We assume all the messages are for the same function and have the - // same main host - faabric::Message& firstMsg = req->mutable_messages()->at(0); - std::string mainHost = firstMsg.mainhost(); - - // Get topology hint from message - faabric::batch_scheduler::SchedulingTopologyHint topologyHint = - firstMsg.topologyhint().empty() - ? faabric::batch_scheduler::SchedulingTopologyHint::NONE - : faabric::batch_scheduler::strToTopologyHint.at( - firstMsg.topologyhint()); - - bool isForceLocal = - topologyHint == - faabric::batch_scheduler::SchedulingTopologyHint::FORCE_LOCAL; - - // If we're not the main host, we need to forward the request back to the - // main host. This will only happen if a nested batch execution happens. - if (!isForceLocal && mainHost != thisHost) { - std::string funcStr = faabric::util::funcToString(firstMsg, false); - SPDLOG_DEBUG("Forwarding {} back to main {}", funcStr, mainHost); - - getFunctionCallClient(mainHost)->executeFunctions(req); - faabric::batch_scheduler::SchedulingDecision decision( - firstMsg.appid(), firstMsg.groupid()); - decision.returnHost = mainHost; - return decision; - } - - faabric::util::FullLock lock(mx); - - faabric::batch_scheduler::SchedulingDecision decision = - doSchedulingDecision(req, topologyHint); - - // Pass decision as hint - return doCallFunctions(req, decision, lock, topologyHint); -} - -faabric::batch_scheduler::SchedulingDecision Scheduler::makeSchedulingDecision( - std::shared_ptr req, - faabric::batch_scheduler::SchedulingTopologyHint topologyHint) -{ - faabric::util::FullLock lock(mx); - - return doSchedulingDecision(req, topologyHint); -} -*/ - void Scheduler::executeBatch(std::shared_ptr req) { // ------------------------------------------- @@ -506,8 +428,10 @@ void Scheduler::executeBatch(std::shared_ptr req) // For threads we only need one executor, for anything else we want // one Executor per function in flight. + // TODO: can we do this without a lock, or can we put the lock + // elsewhere? + faabric::util::FullLock lock(mx); if (isThreads) { - /* TODO: thread execution // Threads use the existing executor. We assume there's only // one running at a time. std::vector>& thisExecutors = @@ -531,16 +455,15 @@ void Scheduler::executeBatch(std::shared_ptr req) assert(e != nullptr); // Execute the tasks + // TODO: make the default of executeTasks take all messages in req? 
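// As a sketch of what that default could look like, a hypothetical
// executeTasks(req) overload (not part of this change) would build the
// full index list itself, exactly like the two lines below, i.e. for a
// request with n messages it would pass {0, 1, ..., n - 1}:
//
//   std::vector<int> allIdxs(req->messages_size());
//   std::iota(allIdxs.begin(), allIdxs.end(), 0);
//   e->executeTasks(allIdxs, req);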
+ std::vector thisHostIdxs(req->messages_size()); + std::iota(thisHostIdxs.begin(), thisHostIdxs.end(), 0); e->executeTasks(thisHostIdxs, req); - */ } else { // Non-threads require one executor per task for (int i = 0; i < nMessages; i++) { faabric::Message& localMsg = req->mutable_messages()->at(i); - // TODO: can we do this without a lock, or can we put the lock - // elsewhere? - faabric::util::FullLock lock(mx); std::shared_ptr e = claimExecutor(localMsg, lock); e->executeTasks({ i }, req); } @@ -694,6 +617,7 @@ void Scheduler::flushLocally() getExecutorFactory()->flushHost(); } +// TODO(scheduler-cleanup): move this to the planner completely void Scheduler::setFunctionResult(faabric::Message& msg) { // Record which host did the execution @@ -702,13 +626,6 @@ void Scheduler::setFunctionResult(faabric::Message& msg) // Set finish timestamp msg.set_finishtimestamp(faabric::util::getGlobalClock().epochMillis()); - // Remove the app from in-flight map if still there, and this host is the - // main host for the message - // TODO: remove me - if (msg.mainhost() == thisHost) { - removePendingMigration(msg.appid()); - } - // Let the planner know this function has finished execution. This will // wake any thread waiting on this result faabric::planner::getPlannerClient().setMessageResult( @@ -858,12 +775,6 @@ void Scheduler::setThisHostResources(faabric::HostResources& res) this->thisHostUsedSlots.store(res.usedslots(), std::memory_order_release); } -faabric::HostResources Scheduler::getHostResources(const std::string& host) -{ - SPDLOG_TRACE("Requesting resources from {}", host); - return getFunctionCallClient(host)->getResources(); -} - // -------------------------------------------- // EXECUTION GRAPH // -------------------------------------------- @@ -878,256 +789,63 @@ std::string getChainedKey(unsigned int msgId) // MIGRATION // ---------------------------------------- -void FunctionMigrationThread::doWork() -{ - getScheduler().checkForMigrationOpportunities(); -} - -void Scheduler::checkForMigrationOpportunities() -{ - std::vector> - tmpPendingMigrations; - - { - // Acquire a shared lock to read from the in-flight requests map - faabric::util::SharedLock lock(mx); - - tmpPendingMigrations = doCheckForMigrationOpportunities(); - } - - // If we find migration opportunites - if (tmpPendingMigrations.size() > 0) { - // Acquire full lock to write to the pending migrations map - faabric::util::FullLock lock(mx); - - for (auto msgPtr : tmpPendingMigrations) { - // First, broadcast the pending migrations to other hosts - broadcastPendingMigrations(msgPtr); - // Second, update our local records - pendingMigrations[msgPtr->appid()] = std::move(msgPtr); +// To check for migration opportunities, we request a scheduling decision for +// the same batch execute request +std::shared_ptr +Scheduler::checkForMigrationOpportunities(faabric::Message& msg, + int overwriteNewGroupId) +{ + int appId = msg.appid(); + int groupId = msg.groupid(); + int groupIdx = msg.groupidx(); + SPDLOG_DEBUG("Message {}:{}:{} checking for migration opportunities", + appId, + groupId, + groupIdx); + + // TODO: maybe we could move this into a broker-specific function? 
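// A sketch of what such a broker helper might look like (hypothetical
// name, composed only of the broker calls used below): group idx 0 fans
// an int out to every other idx registered for the group, while the
// other idxs block on the matching recvMessage:
//
//   void broadcastToGroup(int groupId, int value)
//   {
//       auto idxs = broker.getIdxsRegisteredForGroup(groupId);
//       idxs.erase(0);
//       for (const auto& recvIdx : idxs) {
//           broker.sendMessage(
//             groupId, 0, recvIdx, BYTES_CONST(&value), sizeof(int));
//       }
//   }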
+ int newGroupId = 0; + if (groupIdx == 0) { + auto req = + faabric::util::batchExecFactory(msg.user(), msg.function(), 1); + faabric::util::updateBatchExecAppId(req, msg.appid()); + faabric::util::updateBatchExecGroupId(req, msg.groupid()); + req->set_type(faabric::BatchExecuteRequest::MIGRATION); + auto decision = planner::getPlannerClient().callFunctions(req); + newGroupId = decision.groupId; + + // Send the new group id to all the members of the group + auto groupIdxs = broker.getIdxsRegisteredForGroup(groupId); + groupIdxs.erase(0); + for (const auto& recvIdx : groupIdxs) { + broker.sendMessage( + groupId, 0, recvIdx, BYTES_CONST(&newGroupId), sizeof(int)); } + } else if (overwriteNewGroupId == 0) { + std::vector bytes = broker.recvMessage(groupId, 0, groupIdx); + newGroupId = faabric::util::bytesToInt(bytes); + } else { + // In some settings, like tests, we already know the new group id, so + // we can set it here + newGroupId = overwriteNewGroupId; } -} - -void Scheduler::broadcastPendingMigrations( - std::shared_ptr pendingMigrations) -{ - // Get all hosts for the to-be migrated app - auto msg = pendingMigrations->migrations().at(0).msg(); - const std::set& thisRegisteredHosts = - getFunctionRegisteredHosts(msg.user(), msg.function(), false); - - // Remove this host from the set - registeredHosts.erase(thisHost); - - // Send pending migrations to all involved hosts - for (auto& otherHost : thisRegisteredHosts) { - getFunctionCallClient(otherHost)->sendPendingMigrations( - pendingMigrations); - } -} -std::shared_ptr Scheduler::getPendingAppMigrations( - uint32_t appId) -{ - faabric::util::SharedLock lock(mx); - - if (pendingMigrations.find(appId) == pendingMigrations.end()) { + bool appMustMigrate = newGroupId != groupId; + if (!appMustMigrate) { return nullptr; } - return pendingMigrations[appId]; -} - -void Scheduler::addPendingMigration( - std::shared_ptr pMigration) -{ - faabric::util::FullLock lock(mx); - - auto msg = pMigration->migrations().at(0).msg(); - if (pendingMigrations.find(msg.appid()) != pendingMigrations.end()) { - SPDLOG_ERROR("Received remote request to add a pending migration for " - "app {}, but already recorded another migration request" - " for the same app.", - msg.appid()); - throw std::runtime_error("Remote request for app already there"); - } - - pendingMigrations[msg.appid()] = pMigration; -} + msg.set_groupid(newGroupId); + broker.waitForMappingsOnThisHost(newGroupId); + std::string newHost = broker.getHostForReceiver(newGroupId, groupIdx); -void Scheduler::removePendingMigration(uint32_t appId) -{ - faabric::util::FullLock lock(mx); + auto migration = std::make_shared(); + migration->set_appid(appId); + migration->set_groupid(newGroupId); + migration->set_groupidx(groupIdx); + migration->set_srchost(thisHost); + migration->set_dsthost(newHost); - inFlightRequests.erase(appId); - pendingMigrations.erase(appId); -} - -std::vector> -Scheduler::doCheckForMigrationOpportunities( - faabric::batch_scheduler::MigrationStrategy migrationStrategy) -{ - std::vector> - pendingMigrationsVec; - - // For each in-flight request that has opted in to be migrated, - // check if there is an opportunity to migrate - for (const auto& app : inFlightRequests) { - auto req = app.second.first; - auto originalDecision = *app.second.second; - - // If we have already recorded a pending migration for this req, - // skip - if (getPendingAppMigrations(originalDecision.appId) != nullptr) { - SPDLOG_TRACE("Skipping app {} as migration opportunity has " - "already been recorded", - 
originalDecision.appId); - continue; - } - - faabric::PendingMigrations msg; - msg.set_appid(originalDecision.appId); - - if (migrationStrategy == - faabric::batch_scheduler::MigrationStrategy::BIN_PACK) { - // We assume the batch was originally scheduled using - // bin-packing, thus the scheduling decision has at the begining - // (left) the hosts with the most allocated requests, and at the - // end (right) the hosts with the fewest. To check for migration - // oportunities, we compare a pointer to the possible - // destination of the migration (left), with one to the possible - // source of the migration (right). NOTE - this is a slight - // simplification, but makes the code simpler. - auto left = originalDecision.hosts.begin(); - auto right = originalDecision.hosts.end() - 1; - faabric::HostResources r = (*left == thisHost) - ? getThisHostResources() - : getHostResources(*left); - auto nAvailable = [&r]() -> int { - return r.slots() - r.usedslots(); - }; - auto claimSlot = [&r]() { - int currentUsedSlots = r.usedslots(); - r.set_usedslots(currentUsedSlots + 1); - }; - while (left < right) { - // If both pointers point to the same host, no migration - // opportunity, and must check another possible source of - // the migration - if (*left == *right) { - --right; - continue; - } - - // If the left pointer (possible destination of the - // migration) is out of available resources, no migration - // opportunity, and must check another possible destination - // of migration - if (nAvailable() == 0) { - auto oldHost = *left; - ++left; - if (*left != oldHost) { - r = (*left == thisHost) ? getThisHostResources() - : getHostResources(*left); - } - continue; - } - - // If each pointer points to a request scheduled in a - // different host, and the possible destination has slots, - // there is a migration opportunity - auto* migration = msg.add_migrations(); - migration->set_srchost(*right); - migration->set_dsthost(*left); - - faabric::Message* msgPtr = - &(*(req->mutable_messages()->begin() + - std::distance(originalDecision.hosts.begin(), right))); - auto* migrationMsgPtr = migration->mutable_msg(); - *migrationMsgPtr = *msgPtr; - // Decrement by one the availability, and check for more - // possible sources of migration - claimSlot(); - --right; - } - } else { - SPDLOG_ERROR("Unrecognised migration strategy: {}", - migrationStrategy); - throw std::runtime_error("Unrecognised migration strategy."); - } - - if (msg.migrations_size() > 0) { - pendingMigrationsVec.emplace_back( - std::make_shared(msg)); - SPDLOG_DEBUG("Detected migration opportunity for app: {}", - msg.appid()); - } else { - SPDLOG_DEBUG("No migration opportunity detected for app: {}", - msg.appid()); - } - } - - return pendingMigrationsVec; -} - -// Start the function migration thread if necessary -// NOTE: ideally, instead of allowing the applications to specify a check -// period, we would have a default one (overwritable through an env. -// variable), and apps would just opt in/out of being migrated. We set -// the actual check period instead to ease with experiments. 
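// Note that the periodic background check removed here is replaced by
// on-demand checks. A usage sketch (an assumption, mirroring the new
// checkForMigrationOpportunities API): each group member calls it at a
// safe point, and only acts if a migration for its own group idx comes
// back:
//
//   auto migration = sch.checkForMigrationOpportunities(msg);
//   if (migration != nullptr &&
//       migration->srchost() != migration->dsthost()) {
//       // This rank must move from srchost() to dsthost()
//   }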
-void Scheduler::doStartFunctionMigrationThread( - std::shared_ptr req, - faabric::batch_scheduler::SchedulingDecision& decision) -{ - bool startMigrationThread = inFlightRequests.size() == 0; - faabric::Message& firstMsg = req->mutable_messages()->at(0); - - if (inFlightRequests.find(decision.appId) != inFlightRequests.end()) { - // MPI applications are made up of two different requests: the - // original one (with one message) and the second one (with - // world size - 1 messages) created during world creation time. - // Thus, to correctly track migration opportunities we must merge - // both. We append the batch request to the original one (instead - // of the other way around) not to affect the rest of this methods - // functionality. - if (firstMsg.ismpi()) { - startMigrationThread = false; - auto originalReq = inFlightRequests[decision.appId].first; - auto originalDecision = inFlightRequests[decision.appId].second; - assert(req->messages_size() == firstMsg.mpiworldsize() - 1); - for (int i = 0; i < firstMsg.mpiworldsize() - 1; i++) { - // Append message to original request - auto* newMsgPtr = originalReq->add_messages(); - *newMsgPtr = req->messages().at(i); - - // Append message to original decision - originalDecision->addMessage(decision.hosts.at(i), - req->messages().at(i)); - } - } else { - SPDLOG_ERROR("There is already an in-flight request for app {}", - firstMsg.appid()); - throw std::runtime_error("App already in-flight"); - } - } else { - auto decisionPtr = - std::make_shared( - decision); - inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr); - } - - // Decide wether we have to start the migration thread or not - if (startMigrationThread) { - functionMigrationThread.start(firstMsg.migrationcheckperiod()); - } else if (firstMsg.migrationcheckperiod() != - functionMigrationThread.getIntervalSeconds()) { - SPDLOG_WARN("Ignoring migration check period for app {} as the" - "migration thread is already running with a different" - " check period (provided: {}, current: {})", - firstMsg.appid(), - firstMsg.migrationcheckperiod(), - functionMigrationThread.getIntervalSeconds()); - } + return migration; } } diff --git a/src/util/batch.cpp b/src/util/batch.cpp index de5ad2d8b..5eef241b7 100644 --- a/src/util/batch.cpp +++ b/src/util/batch.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace faabric::util { std::shared_ptr batchExecFactory() @@ -32,11 +33,15 @@ std::shared_ptr batchExecFactory( bool isBatchExecRequestValid(std::shared_ptr ber) { if (ber == nullptr) { + SPDLOG_ERROR("Ber points to null!"); return false; } // An empty BER (thus invalid) will have 0 messages and an id of 0 if (ber->messages_size() <= 0 && ber->appid() == 0) { + SPDLOG_ERROR("Invalid (uninitialised) BER (size: {} - app id: {})", + ber->messages_size(), + ber->appid()); return false; } @@ -46,6 +51,7 @@ bool isBatchExecRequestValid(std::shared_ptr ber) // If the user or func are empty, the BER is invalid if (user.empty() || func.empty()) { + SPDLOG_ERROR("Unset user ({}) or func ({}) in BER!", user, func); return false; } @@ -54,6 +60,9 @@ bool isBatchExecRequestValid(std::shared_ptr ber) auto msg = ber->messages(i); if (msg.user() != user || msg.function() != func || msg.appid() != appId) { + SPDLOG_ERROR("Malformed message in BER"); + SPDLOG_ERROR("Got: (id: {} - user: {} - func: {} - app: {})", msg.id(), msg.user(), msg.function(), msg.appid()); + SPDLOG_ERROR("Expected: (id: {} - user: {} - func: {} - app: {})", msg.id(), user, func, appId); return false; } } diff --git 
a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 019983c10..4f9a5b6d6 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -101,24 +101,27 @@ int32_t TestExecutor::executeTask( std::shared_ptr chainedReq = faabric::util::batchExecFactory("dummy", "thread-check", nThreads); + faabric::util::updateBatchExecAppId(chainedReq, reqOrig->appid()); chainedReq->set_type(faabric::BatchExecuteRequest::THREADS); for (int i = 0; i < chainedReq->messages_size(); i++) { faabric::Message& m = chainedReq->mutable_messages()->at(i); - m.set_appid(msg.appid()); m.set_appidx(i + 1); } // Call the threads - std::vector> results = - executeThreads(chainedReq, {}); + auto& plannerCli = faabric::planner::getPlannerClient(); + plannerCli.callFunctions(chainedReq); // Await the results - for (auto [mid, result] : results) { - if (result != mid / 100) { + for (const auto& msg : chainedReq->messages()) { + // Register thread before we await for it + sch.registerThread(msg.id()); + int32_t result = sch.awaitThreadResult(msg.id()); + if (result != msg.id() / 100) { SPDLOG_ERROR("TestExecutor got invalid thread result, {} != {}", result, - mid / 100); + msg.id() / 100); return 1; } } @@ -224,10 +227,6 @@ int32_t TestExecutor::executeTask( } if (msg.function() == "single-host") { - if (reqOrig->singlehost()) { - return 10; - } - return 20; } @@ -320,8 +319,8 @@ class TestExecutorFixture bool forceLocal) { initThreadSnapshot(req); - conf.overrideCpuCount = 10; - conf.boundTimeout = SHORT_TEST_TIMEOUT_MS; + // conf.overrideCpuCount = 10; + // conf.boundTimeout = SHORT_TEST_TIMEOUT_MS; if (forceLocal) { req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL"); @@ -427,20 +426,12 @@ TEST_CASE_METHOD(TestExecutorFixture, req->set_type(faabric::BatchExecuteRequest::THREADS); int nLocally = 0; - int nRemotely = 0; SECTION("Single host") { expectedRestoreCount = 0; nLocally = nThreads; } - SECTION("Non-single host") - { - expectedRestoreCount = 1; - nLocally = nThreads - 2; - nRemotely = 2; - } - // Set up a hint to force the scheduler to execute single host or not faabric::batch_scheduler::SchedulingDecision hint(123, 345); std::vector expectedHosts; @@ -448,17 +439,10 @@ TEST_CASE_METHOD(TestExecutorFixture, expectedHosts.emplace_back(thisHost); } - for (int i = 0; i < nRemotely; i++) { - expectedHosts.emplace_back(otherHost); - } - for (int i = 0; i < nThreads; i++) { hint.addMessage(expectedHosts.at(i), req->messages().at(i)); } - // Turn mock mode on to catch any cross-host messages - setMockMode(true); - // Execute the functions std::vector actualHosts = executeWithTestExecutorHint(req, hint); @@ -472,16 +456,10 @@ TEST_CASE_METHOD(TestExecutorFixture, // Check sent to other host if necessary auto batchRequests = getBatchRequests(); - if (nRemotely > 0) { - REQUIRE(batchRequests.size() == 1); - } // Check the hosts match up REQUIRE(actualHosts == expectedHosts); REQUIRE(restoreCount == expectedRestoreCount); - - // Turn off mock mode - setMockMode(false); } TEST_CASE_METHOD(TestExecutorFixture, @@ -489,7 +467,6 @@ TEST_CASE_METHOD(TestExecutorFixture, "[executor]") { int nThreads = 0; - SECTION("Overloaded") { nThreads = 100; } SECTION("Underloaded") { nThreads = 10; } @@ -530,18 +507,15 @@ TEST_CASE_METHOD(TestExecutorFixture, int nThreads; SECTION("Underloaded") { nThreads = 8; } - SECTION("Overloaded") { nThreads = 100; } - SECTION("Underloaded no single host optimisation") { nThreads = 10; conf.noSingleHostOptimisations = 1; } - 
std::shared_ptr req = - faabric::util::batchExecFactory("dummy", "thread-check", 1); + auto req = faabric::util::batchExecFactory("dummy", "thread-check", 1); faabric::Message msg = req->messages(0); - msg.set_inputdata(std::to_string(nThreads)); + req->mutable_messages(0)->set_inputdata(std::to_string(nThreads)); std::vector actualHosts = executeWithTestExecutor(req, false); std::vector expectedHosts = { conf.endpointHost }; @@ -558,23 +532,21 @@ TEST_CASE_METHOD(TestExecutorFixture, // We really want to stress things here, but it's quite quick to run, so // don't be afraid to bump up the number of threads int nRepeats = 10; - int nThreads = 1000; - - std::shared_ptr fac = - std::make_shared(); - setExecutorFactory(fac); + int nThreads = 10; - conf.overrideCpuCount = 10; + conf.overrideCpuCount = (nThreads + 1) * nRepeats; conf.boundTimeout = LONG_TEST_TIMEOUT_MS; + faabric::HostResources hostResources; + hostResources.set_slots(conf.overrideCpuCount); + sch.setThisHostResources(hostResources); for (int i = 0; i < nRepeats; i++) { std::shared_ptr req = faabric::util::batchExecFactory("dummy", "thread-check", 1); faabric::Message msg = req->messages(0); - msg.set_inputdata(std::to_string(nThreads)); + req->mutable_messages(0)->set_inputdata(std::to_string(nThreads)); - std::vector actualHosts = - executeWithTestExecutor(req, false); + auto actualHosts = executeWithTestExecutor(req, false); std::vector expectedHosts = { conf.endpointHost }; REQUIRE(actualHosts == expectedHosts); @@ -582,10 +554,16 @@ TEST_CASE_METHOD(TestExecutorFixture, plannerCli.getMessageResult(msg, LONG_TEST_TIMEOUT_MS); REQUIRE(res.returnvalue() == 0); + for (int mid : faabric::util::getChainedFunctions(msg)) { + auto chainedRes = plannerCli.getMessageResult(msg.appid(), mid, LONG_TEST_TIMEOUT_MS); + REQUIRE(chainedRes.returnvalue() == 0); + } + sch.reset(); } } +/* TODO(remote-threads): we currently disable remote threads TEST_CASE_METHOD(TestExecutorFixture, "Test thread results returned on non-main", "[executor]") @@ -629,6 +607,7 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(actualMessageIds == messageIds); } +*/ TEST_CASE_METHOD(TestExecutorFixture, "Test non-zero return code", "[executor]") { @@ -791,6 +770,7 @@ TEST_CASE_METHOD(TestExecutorFixture, exec->shutdown(); } +/* TODO(remote-threads): we currently disable remote threads TEST_CASE_METHOD(TestExecutorFixture, "Test snapshot diffs returned to main", "[executor]") @@ -884,7 +864,9 @@ TEST_CASE_METHOD(TestExecutorFixture, // Check no merge regions left on the snapshot REQUIRE(reg.getSnapshot(mainThreadSnapshotKey)->getMergeRegions().empty()); } +*/ +/* TODO(remote-threads): we currently disable remote threads TEST_CASE_METHOD(TestExecutorFixture, "Test snapshot diffs pushed to workers after initial snapshot", "[executor]") @@ -1002,15 +984,12 @@ TEST_CASE_METHOD(TestExecutorFixture, expectedDiffs.at(i).getData().size()); } } +*/ TEST_CASE_METHOD(TestExecutorFixture, "Test reset called for functions not threads", "[executor]") { - faabric::util::setMockMode(true); - - conf.overrideCpuCount = 4; - std::string hostOverride = conf.endpointHost; int nMessages = 1; faabric::BatchExecuteRequest::BatchExecuteType requestType = @@ -1054,8 +1033,7 @@ TEST_CASE_METHOD(TestExecutorFixture, msgIds.push_back(m.id()); } - // Call functions and force to execute locally - req->mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL"); + // Call functions plannerCli.callFunctions(req); // Await execution @@ -1087,32 +1065,16 @@ TEST_CASE_METHOD(TestExecutorFixture, int 
appId = req->messages(0).appid(); std::vector msgIds; - int expectedResult = 0; - SECTION("Single host") { expectedResult = 10; } - - SECTION("Single host disabled in conf") - { - expectedResult = 20; - conf.noSingleHostOptimisations = 1; - } - - SECTION("Not single host") - { - expectedResult = 20; - singleHosts[1] = otherHost; - singleHosts[2] = otherHost; - } + int expectedResult = 20; + // TODO(remote-threads): we ignore hints, so this has no effect faabric::batch_scheduler::SchedulingDecision hint(123, 123); for (int i = 0; i < nMessages; i++) { hint.addMessage(singleHosts[i], req->messages().at(i)); msgIds.push_back(req->messages(i).id()); } - // Mock mode to avoid requests sent across hosts - setMockMode(true); executeWithTestExecutorHint(req, hint); - setMockMode(false); // Await results on this host for (int i = 0; i < nMessages; i++) { diff --git a/tests/test/scheduler/test_executor_reaping.cpp b/tests/test/scheduler/test_executor_reaping.cpp index ac579c65b..bc7fb3994 100644 --- a/tests/test/scheduler/test_executor_reaping.cpp +++ b/tests/test/scheduler/test_executor_reaping.cpp @@ -13,6 +13,7 @@ namespace tests { class SchedulerReapingTestFixture : public SchedulerFixture + , public FunctionCallClientServerFixture , public ConfFixture { public: @@ -60,6 +61,7 @@ TEST_CASE_METHOD(SchedulerReapingTestFixture, auto req = faabric::util::batchExecFactory("foo", "bar", nMsgs); faabric::Message& firstMsg = req->mutable_messages()->at(0); plannerCli.callFunctions(req); + plannerCli.getMessageResult(firstMsg, 500); // Check executor count REQUIRE(sch.getFunctionExecutorCount(firstMsg) == nMsgs); diff --git a/tests/test/scheduler/test_function_client_server.cpp b/tests/test/scheduler/test_function_client_server.cpp index d56a879e0..5fe3d2a18 100644 --- a/tests/test/scheduler/test_function_client_server.cpp +++ b/tests/test/scheduler/test_function_client_server.cpp @@ -74,6 +74,10 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, plannerCli.callFunctions(reqA); plannerCli.callFunctions(reqB); + // Wait for functions to finish + plannerCli.getMessageResult(msgA, 2000); + plannerCli.getMessageResult(msgB, 2000); + // Check messages passed std::vector msgs = sch.getRecordedMessagesAll(); REQUIRE(msgs.size() == 2); @@ -81,10 +85,6 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, REQUIRE(msgs.at(1).function() == "bar"); sch.clearRecordedMessages(); - // Wait for functions to finish - plannerCli.getMessageResult(msgA, 2000); - plannerCli.getMessageResult(msgB, 2000); - // Check executors present REQUIRE(sch.getFunctionExecutorCount(msgA) == 1); REQUIRE(sch.getFunctionExecutorCount(msgB) == 1); @@ -130,102 +130,6 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, REQUIRE(sch.getRecordedMessagesShared().empty()); } -TEST_CASE_METHOD(FunctionClientServerTestFixture, - "Test get resources request", - "[scheduler]") -{ - int expectedSlots; - int expectedUsedSlots; - - faabric::HostResources originalResources; - originalResources.set_slots(sch.getThisHostResources().slots()); - - SECTION("Override resources") - { - faabric::HostResources res; - - expectedSlots = 10; - expectedUsedSlots = 15; - - res.set_slots(expectedSlots); - res.set_usedslots(expectedUsedSlots); - - sch.setThisHostResources(res); - } - SECTION("Default resources") - { - expectedSlots = sch.getThisHostResources().slots(); - expectedUsedSlots = 0; - } - - // Make the request - faabric::HostResources resResponse = functionCallClient.getResources(); - - REQUIRE(resResponse.slots() == expectedSlots); - 
REQUIRE(resResponse.usedslots() == expectedUsedSlots); - - // Reset the host resources - sch.setThisHostResources(originalResources); -} - -TEST_CASE_METHOD(FunctionClientServerTestFixture, - "Test unregister request", - "[scheduler]") -{ - faabric::util::setMockMode(true); - std::string otherHost = "other"; - - faabric::HostResources originalResources; - originalResources.set_slots(sch.getThisHostResources().slots()); - - // Remove capacity from this host and add on other - faabric::HostResources thisResources; - faabric::HostResources otherResources; - thisResources.set_slots(0); - otherResources.set_slots(5); - - sch.setThisHostResources(thisResources); - faabric::scheduler::queueResourceResponse(otherHost, otherResources); - - // Request a function and check the other host is registered - auto funcReq = faabric::util::batchExecFactory("foo", "bar", 1); - auto& msg = *funcReq->mutable_messages(0); - sch.addHostToGlobalSet(otherHost); - plannerCli.callFunctions(funcReq); - - REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 1); - - faabric::util::setMockMode(false); - faabric::scheduler::clearMockRequests(); - - // Make the request with a host that's not registered - faabric::UnregisterRequest reqA; - reqA.set_host("foobar"); - reqA.set_user(msg.user()); - reqA.set_function(msg.function()); - - // Check that nothing's happened - functionCallServer.setRequestLatch(); - functionCallClient.unregister(reqA); - functionCallServer.awaitRequestLatch(); - REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 1); - - // Make the request to unregister the actual host - faabric::UnregisterRequest reqB; - reqB.set_host(otherHost); - reqB.set_user(msg.user()); - reqB.set_function(msg.function()); - - functionCallServer.setRequestLatch(); - functionCallClient.unregister(reqB); - functionCallServer.awaitRequestLatch(); - - REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 0); - - sch.setThisHostResources(originalResources); - faabric::scheduler::clearMockRequests(); -} - TEST_CASE_METHOD(FunctionClientServerTestFixture, "Test setting a message result with the function call client", "[scheduler]") diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp index 448252912..db749507d 100644 --- a/tests/test/scheduler/test_function_migration.cpp +++ b/tests/test/scheduler/test_function_migration.cpp @@ -34,7 +34,6 @@ class FunctionMigrationTestFixture : public SchedulerFixture } protected: - FunctionMigrationThread migrationThread; std::string mainHost = faabric::util::getSystemConfig().endpointHost; // Helper method to set the available hosts and slots per host prior to @@ -57,9 +56,7 @@ class FunctionMigrationTestFixture : public SchedulerFixture if (i == 0) { sch.setThisHostResources(resources); } else { - sch.addHostToGlobalSet(registeredHosts.at(i)); - faabric::scheduler::queueResourceResponse(registeredHosts.at(i), - resources); + sch.addHostToGlobalSet(registeredHosts.at(i), std::make_shared(resources)); } } } @@ -71,331 +68,82 @@ class FunctionMigrationTestFixture : public SchedulerFixture r.set_usedslots(usedSlots); sch.setThisHostResources(r); } - - std::shared_ptr - buildPendingMigrationsExpectation( - std::shared_ptr req, - std::vector hosts, - std::vector> migrations) - { - faabric::PendingMigrations expected; - expected.set_appid(req->messages().at(0).appid()); - - for (auto pair : migrations) { - auto* migration = expected.add_migrations(); - auto* migrationMsg = migration->mutable_msg(); - *migrationMsg = 
req->mutable_messages()->at(pair.first); - migration->set_srchost(hosts.at(pair.first)); - migration->set_dsthost(hosts.at(pair.second)); - } - - return std::make_shared(expected); - } - - void checkPendingMigrationsExpectation( - std::shared_ptr expectedMigrations, - std::shared_ptr actualMigrations, - std::vector hosts, - bool skipMsgIdCheck = false) - { - if (expectedMigrations == nullptr) { - REQUIRE(actualMigrations == expectedMigrations); - } else { - // Check actual migration matches expectation - REQUIRE(actualMigrations->appid() == expectedMigrations->appid()); - REQUIRE(actualMigrations->migrations_size() == - expectedMigrations->migrations_size()); - for (int i = 0; i < actualMigrations->migrations_size(); i++) { - auto actual = actualMigrations->mutable_migrations()->at(i); - auto expected = expectedMigrations->mutable_migrations()->at(i); - if (!skipMsgIdCheck) { - REQUIRE(actual.msg().id() == expected.msg().id()); - } - REQUIRE(actual.srchost() == expected.srchost()); - REQUIRE(actual.dsthost() == expected.dsthost()); - } - - // Check we have sent a message to all other hosts with the pending - // migration - auto pendingRequests = getPendingMigrationsRequests(); - REQUIRE(pendingRequests.size() == hosts.size() - 1); - for (auto& pendingReq : getPendingMigrationsRequests()) { - std::string host = pendingReq.first; - std::shared_ptr migration = - pendingReq.second; - auto it = std::find(hosts.begin(), hosts.end(), host); - REQUIRE(it != hosts.end()); - REQUIRE(migration == actualMigrations); - } - } - } }; TEST_CASE_METHOD(FunctionMigrationTestFixture, - "Test starting and stopping the function migration thread", + "Test we can detect migration opportunities", "[scheduler]") -{ - int wakeUpPeriodSeconds = 2; - migrationThread.start(wakeUpPeriodSeconds); - - SLEEP_MS(SHORT_TEST_TIMEOUT_MS); - - migrationThread.stop(); -} - -TEST_CASE_METHOD( - FunctionMigrationTestFixture, - "Test migration opportunities are only detected if set in the message", - "[scheduler]") { // First set resources before calling the functions: one will be allocated // locally, another one in the remote host - std::vector hosts = { mainHost, "hostA" }; + // TODO: proper mocking of a second host + std::vector hosts = { mainHost, LOCALHOST }; std::vector slots = { 1, 1 }; std::vector usedSlots = { 0, 0 }; setHostResources(hosts, slots, usedSlots); // Second, prepare the request we will migrate in-flight. - // NOTE: the sleep function sleeps for a set timeout before returning. auto req = faabric::util::batchExecFactory("foo", "sleep", 2); - int timeToSleep = SHORT_TEST_TIMEOUT_MS; - req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); - uint32_t appId = req->messages().at(0).appid(); - uint32_t msgId = req->messages().at(0).id(); - - // Build expected pending migrations - std::shared_ptr expectedMigrations; - SECTION("Migration not enabled") { expectedMigrations = nullptr; } - - SECTION("Migration enabled") - { - // Set to a non-zero value so that migration is enabled - req->mutable_messages()->at(0).set_migrationcheckperiod(2); - - // Build expected migrations - std::vector> migrations = { { 1, 0 } }; - expectedMigrations = - buildPendingMigrationsExpectation(req, hosts, migrations); + for (int i = 0; i < req->messages_size(); i++) { + req->mutable_messages(i)->set_groupidx(i); } auto decision = plannerCli.callFunctions(req); // Update host resources so that a migration opportunity appears, but will // only be detected if migration check period is set. 
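// Worked example of the opportunity created below: both hosts start with
// one slot, so the two messages are spread across mainHost and LOCALHOST;
// freeing a local slot then lets the scheduler pack both messages onto
// mainHost:
//
//   setHostResources({mainHost, LOCALHOST}, {1, 1}, {0, 0}); // 0 free locally
//   updateLocalResources(2, 1);                              // 1 free locally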
- updateLocalResources(2, 1); - - sch.checkForMigrationOpportunities(); - - auto actualMigrations = sch.getPendingAppMigrations(appId); - checkPendingMigrationsExpectation( - expectedMigrations, actualMigrations, hosts); - - faabric::Message res = - plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); - REQUIRE(res.returnvalue() == 0); - - // Check that after the result is set, the app can't be migrated no more - sch.checkForMigrationOpportunities(); - REQUIRE(sch.getPendingAppMigrations(appId) == nullptr); -} - -TEST_CASE_METHOD(FunctionMigrationTestFixture, - "Test checking for migration opportunities", - "[scheduler]") -{ - std::vector hosts = { mainHost, "hostA" }; - std::vector slots = { 1, 1 }; - std::vector usedSlots = { 0, 0 }; - setHostResources(hosts, slots, usedSlots); - - auto req = faabric::util::batchExecFactory("foo", "sleep", 2); - int timeToSleep = SHORT_TEST_TIMEOUT_MS; - req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); - uint32_t appId = req->messages().at(0).appid(); - uint32_t msgId = req->messages().at(0).id(); - - // By setting the check period to a non-zero value, we are effectively - // opting in to be considered for migration - req->mutable_messages()->at(0).set_migrationcheckperiod(2); - - auto decision = plannerCli.callFunctions(req); - - std::shared_ptr expectedMigrations; - - // As we don't update the available resources, no migration opportunities - // will appear, even though we are checking for them - SECTION("Can not migrate") { expectedMigrations = nullptr; } - - SECTION("Can migrate") + bool mustMigrate; + SECTION("Must migrate") { - // Update host resources so that a migration opportunity appears + mustMigrate = true; updateLocalResources(2, 1); - - // Build expected migrations - std::vector> migrations = { { 1, 0 } }; - expectedMigrations = - buildPendingMigrationsExpectation(req, hosts, migrations); } - sch.checkForMigrationOpportunities(); - - auto actualMigrations = sch.getPendingAppMigrations(appId); - checkPendingMigrationsExpectation( - expectedMigrations, actualMigrations, hosts); - - faabric::Message res = - plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); - REQUIRE(res.returnvalue() == 0); - - // Check that after the result is set, the app can't be migrated no more - sch.checkForMigrationOpportunities(); - REQUIRE(sch.getPendingAppMigrations(appId) == nullptr); -} - -TEST_CASE_METHOD( - FunctionMigrationTestFixture, - "Test detecting migration opportunities for several messages and hosts", - "[scheduler]") -{ - // First set resources before calling the functions: one request will be - // allocated to each host - std::vector hosts = { mainHost, "hostA", "hostB", "hostC" }; - std::vector slots = { 1, 1, 1, 1 }; - std::vector usedSlots = { 0, 0, 0, 0 }; - setHostResources(hosts, slots, usedSlots); - - auto req = faabric::util::batchExecFactory("foo", "sleep", 4); - int timeToSleep = SHORT_TEST_TIMEOUT_MS; - req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep)); - uint32_t appId = req->messages().at(0).appid(); - uint32_t msgId = req->messages().at(0).id(); - - // Opt in to be considered for migration - req->mutable_messages()->at(0).set_migrationcheckperiod(2); - - auto decision = plannerCli.callFunctions(req); - - // Set up expectations - std::shared_ptr expectedMigrations; - SECTION("Can not migrate") { expectedMigrations = nullptr; } - - SECTION("Can migrate") - { - // Update host resources so that two migration opportunities appear in - // different hosts. 
-        std::vector<int> newSlots = { 2, 2, 1, 1 };
-        std::vector<int> newUsedSlots = { 1, 1, 1, 1 };
-        setHostResources(hosts, newSlots, newUsedSlots);
-
-        // Build expected result: two migrations
-        std::vector<std::pair<int, int>> migrations = { { 3, 0 }, { 2, 1 } };
-        expectedMigrations =
-          buildPendingMigrationsExpectation(req, hosts, migrations);
+    SECTION("Must not migrate") { mustMigrate = false; }
+
+    // The group leader (message with index 0) will detect the migration, but
+    // does not have to migrate
+    auto migration0 =
+      sch.checkForMigrationOpportunities(*req->mutable_messages(0));
+    if (mustMigrate) {
+        REQUIRE(migration0 != nullptr);
+        // App id is the same, but group id has changed as the distribution has
+        // changed
+        REQUIRE(migration0->appid() == decision.appId);
+        REQUIRE(migration0->groupid() != decision.groupId);
+        // Group idx 0 does not have to migrate
+        REQUIRE(migration0->groupidx() == 0);
+        REQUIRE(migration0->srchost() == migration0->dsthost());
+        REQUIRE(decision.hosts.at(0) == migration0->dsthost());
+
+        // Group idx 1 must migrate. Note that we manually set the new group
+        // id, we only have to do this in the tests
+        auto migration1 = sch.checkForMigrationOpportunities(
+          *req->mutable_messages(1), migration0->groupid());
+        REQUIRE(migration1->appid() == decision.appId);
+        REQUIRE(migration1->groupid() != decision.groupId);
+        // Group idx 1 must move off its original host
+        REQUIRE(migration1->groupidx() == 1);
+        REQUIRE(migration1->dsthost() != decision.hosts.at(1));
+        REQUIRE(migration1->dsthost() == mainHost);
+    } else {
+        REQUIRE(migration0 == nullptr);
+        auto migration1 = sch.checkForMigrationOpportunities(
+          *req->mutable_messages(1), decision.groupId);
+        REQUIRE(migration1 == nullptr);
     }
 
-    sch.checkForMigrationOpportunities();
-
-    auto actualMigrations = sch.getPendingAppMigrations(appId);
-    checkPendingMigrationsExpectation(
-      expectedMigrations, actualMigrations, hosts);
-
-    faabric::Message res =
-      plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep);
-    REQUIRE(res.returnvalue() == 0);
-
-    // Check that after the result is set, the app can't be migrated no more
-    sch.checkForMigrationOpportunities();
-    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
+    sch.setFunctionResult(*req->mutable_messages(0));
+    sch.setFunctionResult(*req->mutable_messages(1));
 }
 
-// TODO(flaky): fix test
 /*
-TEST_CASE_METHOD(
-  FunctionMigrationTestFixture,
-  "Test function migration thread detects migration opportunities",
-  "[scheduler][.]")
-{
-    std::vector<std::string> hosts = { mainHost, "hostA" };
-    std::vector<int> slots = { 1, 1 };
-    std::vector<int> usedSlots = { 0, 0 };
-    setHostResources(hosts, slots, usedSlots);
-
-    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
-    int checkPeriodSecs = 1;
-    int timeToSleep = 4 * checkPeriodSecs * 1000;
-    req->mutable_messages()->at(0).set_inputdata(std::to_string(timeToSleep));
-    uint32_t appId = req->messages().at(0).appid();
-    uint32_t msgId = req->messages().at(0).id();
-
-    // Opt in to be migrated
-    req->mutable_messages()->at(0).set_migrationcheckperiod(checkPeriodSecs);
-
-    auto decision = sch.callFunctions(req);
-
-    std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
-
-    // As we don't update the available resources, no migration opportunities
-    // will appear, even though we are checking for them
-    SECTION("Can not migrate") { expectedMigrations = nullptr; }
-
-    SECTION("Can migrate")
-    {
-        // Update host resources so that a migration opportunity appears
-        updateLocalResources(2, 1);
-
-        // Build expected migrations
-        std::vector<std::pair<int, int>> migrations = { { 1, 0 } };
-        expectedMigrations =
-          buildPendingMigrationsExpectation(req, hosts, migrations);
-    }
-
-    // Instead of directly calling the scheduler function to check for migration
-    // opportunites, sleep for enough time (twice the check period) so that a
-    // migration is detected by the background thread.
-    SLEEP_MS(2 * checkPeriodSecs * 1000);
-
-    auto actualMigrations = sch.getPendingAppMigrations(appId);
-    checkPendingMigrationsExpectation(
-      expectedMigrations, actualMigrations, hosts);
-
-    faabric::Message res = sch.getFunctionResult(appId, msgId, 2 * timeToSleep);
-    REQUIRE(res.returnvalue() == 0);
-
-    SLEEP_MS(100);
-
-    // Check that after the result is set, the app can't be migrated no more
-    sch.checkForMigrationOpportunities();
-    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
-}
-*/
-
-TEST_CASE_METHOD(FunctionMigrationTestFixture,
-                 "Test adding and removing pending migrations manually",
-                 "[scheduler]")
-{
-    auto req = faabric::util::batchExecFactory("foo", "sleep", 2);
-    uint32_t appId = req->messages().at(0).appid();
-    std::vector<std::string> hosts = { mainHost, "hostA" };
-    std::vector<std::pair<int, int>> migrations = { { 1, 0 } };
-    auto expectedMigrations =
-      buildPendingMigrationsExpectation(req, hosts, migrations);
-
-    // Add migration manually
-    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
-    sch.addPendingMigration(expectedMigrations);
-    REQUIRE(sch.getPendingAppMigrations(appId) == expectedMigrations);
-
-    // Remove migration manually
-    sch.removePendingMigration(appId);
-    REQUIRE(sch.getPendingAppMigrations(appId) == nullptr);
-}
-
 TEST_CASE_METHOD(FunctionMigrationTestFixture,
-                 "Test MPI function migration points",
+                 "Test MPI migration opportunities",
                  "[scheduler]")
 {
     // Set up host resources
-    std::vector<std::string> hosts = { mainHost, "hostA" };
+    std::vector<std::string> hosts = { mainHost, LOCALHOST };
     std::vector<int> slots = { 2, 2 };
     std::vector<int> usedSlots = { 0, 0 };
     setHostResources(hosts, slots, usedSlots);
@@ -415,59 +163,84 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture,
     firstMsg->set_ismpi(true);
     firstMsg->set_mpiworldsize(worldSize);
     firstMsg->set_mpiworldid(worldId);
-    firstMsg->set_migrationcheckperiod(checkPeriodSecs);
-    uint32_t appId = req->messages().at(0).appid();
-    uint32_t msgId = req->messages().at(0).id();
 
     // Call function that wil just sleep
     auto decision = plannerCli.callFunctions(req);
+    // updateGroupId(req, decision.groupId);
 
     // Manually create the world, and trigger a second function invocation in
     // the remote host
     faabric::mpi::MpiWorld world;
-    // Note that we deliberately pass a copy of the message. The `world.create`
-    // method modifies the passed message, which can race with the thread pool
-    // thread executing the message. Note that, normally, the thread pool
-    // thread _would_ be calling world.create itself, thus not racing
-    auto firstMsgCopy = req->messages(0);
-    world.create(firstMsgCopy, worldId, worldSize);
+    world.create(*firstMsg, worldId, worldSize);
 
     // Update host resources so that a migration opportunity appears
-    updateLocalResources(4, 2);
-
-    // Build expected migrations
-    std::shared_ptr<faabric::PendingMigrations> expectedMigrations;
-
-    // We need to add to the original request the ones that will be
-    // chained by MPI (this is only needed to build the expectation).
-    // NOTE: we do it in a copy of the original request, as otherwise TSAN
-    // complains about a data race.
-    auto reqCopy = faabric::util::batchExecFactory("mpi", "sleep", worldSize);
-    for (int i = 0; i < worldSize; i++) {
-        reqCopy->mutable_messages(i)->set_appid(firstMsg->appid());
+    bool mustMigrate;
+    SECTION("Must migrate")
+    {
+        mustMigrate = true;
+        updateLocalResources(4, 2);
     }
-    std::vector<std::pair<int, int>> migrations = { { 1, 0 }, { 1, 0 } };
-    expectedMigrations =
-      buildPendingMigrationsExpectation(reqCopy, hosts, migrations);
-
-    // Instead of directly calling the scheduler function to check for migration
-    // opportunites, sleep for enough time (twice the check period) so that a
-    // migration is detected by the background thread.
-    SLEEP_MS(2 * checkPeriodSecs * 1000);
+    SECTION("Must not migrate") { mustMigrate = false; }
+
+    auto oldDecision = plannerCli.getSchedulingDecision(req);
+    // Check the expected migration for each rank. The expected migration will
+    // not be null if we have updated the local resources. In that case, only
+    // group idxs 2 and 3 will migrate
+    std::shared_ptr<faabric::PendingMigration> expectedMigration;
+    expectedMigration =
+      sch.checkForMigrationOpportunities(*req->mutable_messages(0));
+    auto newDecision = plannerCli.getSchedulingDecision(req);
+
+    REQUIRE(oldDecision.appId == newDecision.appId);
+    REQUIRE(oldDecision.hosts.size() == newDecision.hosts.size());
+    if (mustMigrate) {
+        REQUIRE(expectedMigration != nullptr);
+        REQUIRE(oldDecision.groupId != newDecision.groupId);
+        REQUIRE(expectedMigration->appid() == newDecision.appId);
+        REQUIRE(expectedMigration->groupid() == newDecision.groupId);
+        REQUIRE(expectedMigration->groupidx() == 0);
+        REQUIRE(expectedMigration->srchost() == oldDecision.hosts.at(0));
+        REQUIRE(expectedMigration->dsthost() == newDecision.hosts.at(0));
+    } else {
+        REQUIRE(expectedMigration == nullptr);
+        REQUIRE(oldDecision.groupId == newDecision.groupId);
    }
 
-    // When checking that a migration has taken place in MPI, we skip the msg
-    // id check. Part of the request is built by the runtime, and therefore
-    // we don't have access to the actual messages scheduled.
-    auto actualMigrations = sch.getPendingAppMigrations(appId);
-    checkPendingMigrationsExpectation(
-      expectedMigrations, actualMigrations, hosts, true);
+    // Impersonate other group idxs
+    for (int i = 1; i < worldSize; i++) {
+        faabric::Message msg;
+        msg.set_appid(oldDecision.appId);
+        msg.set_groupid(oldDecision.groupId);
+        msg.set_groupidx(i);
+        expectedMigration =
+          sch.checkForMigrationOpportunities(msg, newDecision.groupId);
+        if (mustMigrate) {
+            REQUIRE(expectedMigration != nullptr);
+            REQUIRE(expectedMigration->appid() == newDecision.appId);
+            REQUIRE(expectedMigration->groupid() == newDecision.groupId);
+            REQUIRE(expectedMigration->groupidx() == i);
+            // Note that we don't check the source host in the expected
+            // migration because we set the field to `thisHost`, which we can't
+            // impersonate in the tests
+            REQUIRE(expectedMigration->dsthost() == newDecision.hosts.at(i));
+            // Check that group idxs 2 and 3 migrate to a different host
+            if (i > 1) {
+                REQUIRE(oldDecision.hosts.at(i) != newDecision.hosts.at(i));
+            } else {
+                REQUIRE(oldDecision.hosts.at(i) == newDecision.hosts.at(i));
+            }
+        } else {
+            REQUIRE(expectedMigration == nullptr);
+        }
+    }
 
     faabric::Message res =
-      plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep);
+      sch.getFunctionResult(req->messages().at(0), 2 * timeToSleep);
     REQUIRE(res.returnvalue() == 0);
 
     // Clean up
     world.destroy();
 }
+*/
 }
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index 66ab32941..980aeeff3 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -327,11 +327,6 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
         }
     }
 
-    // Check resource requests have been made to other host
-    auto resRequestsOne = faabric::scheduler::getResourceRequests();
-    REQUIRE(resRequestsOne.size() == 1);
-    REQUIRE(resRequestsOne.at(0).first == otherHost);
-
     // Check snapshots have been pushed
     auto snapshotPushes = faabric::snapshot::getSnapshotPushes();
     if (expectedSnapshot.empty()) {
@@ -424,11 +419,6 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
         }
     }
 
-    // Check resource request made again
-    auto resRequestsTwo = faabric::scheduler::getResourceRequests();
-    REQUIRE(resRequestsTwo.size() == 1);
-    REQUIRE(resRequestsTwo.at(0).first == otherHost);
-
     // Check no other functions have been scheduled on this host
     REQUIRE(sch.getRecordedMessagesLocal().size() == (2 * thisCores));
     REQUIRE(sch.getRecordedMessagesShared().size() ==
@@ -548,48 +538,6 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     }
 }
 
-TEST_CASE_METHOD(SlowExecutorTestFixture,
-                 "Test unregistering host",
-                 "[scheduler]")
-{
-    faabric::util::setMockMode(true);
-
-    std::string thisHost = faabric::util::getSystemConfig().endpointHost;
-    std::string otherHost = "foobar";
-    sch.addHostToGlobalSet(otherHost);
-
-    int nCores = 5;
-    faabric::HostResources res;
-    res.set_slots(nCores);
-    sch.setThisHostResources(res);
-
-    // Set up capacity for other host
-    faabric::scheduler::queueResourceResponse(otherHost, res);
-
-    std::shared_ptr<faabric::BatchExecuteRequest> req =
-      faabric::util::batchExecFactory("foo", "bar", nCores + 1);
-    plannerCli.callFunctions(req);
-    faabric::Message msg = req->messages().at(0);
-
-    // Check other host is added
-    const std::set<std::string>& expectedHosts = { otherHost };
-    REQUIRE(sch.getFunctionRegisteredHosts(msg.user(), msg.function()) ==
-            expectedHosts);
-    REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 1);
-
-    // Remove host for another function and check host isn't removed
-    faabric::Message otherMsg = faabric::util::messageFactory("foo", "qux");
-    sch.removeRegisteredHost(otherHost, otherMsg.user(), otherMsg.function());
-    REQUIRE(sch.getFunctionRegisteredHosts(msg.user(), msg.function()) ==
-            expectedHosts);
-    REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 1);
-
-    // Remove host
-    sch.removeRegisteredHost(otherHost, msg.user(), msg.function());
-    REQUIRE(sch.getFunctionRegisteredHosts(msg.user(), msg.function()).empty());
-    REQUIRE(sch.getFunctionRegisteredHostCount(msg) == 0);
-}
-
 TEST_CASE_METHOD(SlowExecutorTestFixture, "Check test mode", "[scheduler]")
 {
     auto reqA = faabric::util::batchExecFactory("demo", "echo", 1);
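Reviewer note: the pattern the rewritten migration tests exercise can be summarised in a short sketch. This is not part of the diff; only the Scheduler API shown in the headers above is assumed, and the helper name pollForMigration is illustrative.

#include <faabric/scheduler/Scheduler.h>

// Illustrative helper (hypothetical, not in this diff): poll the scheduler
// from a running message and report whether this group idx must move host.
bool pollForMigration(faabric::Message& msg)
{
    auto& sch = faabric::scheduler::getScheduler();

    // checkForMigrationOpportunities returns nullptr when the current
    // distribution is still the best available one
    auto migration = sch.checkForMigrationOpportunities(msg);
    if (migration == nullptr) {
        return false;
    }

    // A non-null PendingMigration carries the new group id and the
    // destination host for this group idx; srchost == dsthost means the
    // message keeps its placement but must still adopt the new group id
    return migration->srchost() != migration->dsthost();
}

Non-leader group idxs pass the group id obtained by the leader as the second (overwriteNewGroupId) argument, which is what the tests impersonate above; MPI ranks would additionally call MpiWorld::prepareMigration(thisRank) before moving.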