diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f85309ad..78d0121e4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ endfunction() add_subdirectory(src/batch-scheduler) add_subdirectory(src/endpoint) +add_subdirectory(src/executor) add_subdirectory(src/flat) add_subdirectory(src/mpi) add_subdirectory(src/planner) @@ -121,6 +122,7 @@ add_library(faabric faabric.cpp $ $ + $ $ $ $ diff --git a/examples/server.cpp b/examples/server.cpp index 18d50d44a..a6a9d2c56 100644 --- a/examples/server.cpp +++ b/examples/server.cpp @@ -1,9 +1,9 @@ #include +#include #include -#include #include -using namespace faabric::scheduler; +using namespace faabric::executor; class ExampleExecutor : public Executor { diff --git a/include/faabric/executor/Executor.h b/include/faabric/executor/Executor.h new file mode 100644 index 000000000..47e11a295 --- /dev/null +++ b/include/faabric/executor/Executor.h @@ -0,0 +1,119 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace faabric::executor { + +class ChainedCallException : public faabric::util::FaabricException +{ + public: + explicit ChainedCallException(std::string message) + : FaabricException(std::move(message)) + {} +}; + +class Executor +{ + public: + std::string id; + + explicit Executor(faabric::Message& msg); + + // Must be marked virtual to permit proper calling of subclass destructors + virtual ~Executor(); + + void executeTasks(std::vector msgIdxs, + std::shared_ptr req); + + virtual void shutdown(); + + virtual void reset(faabric::Message& msg); + + virtual int32_t executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr req); + + bool tryClaim(); + + void claim(); + + void releaseClaim(); + + std::shared_ptr getMainThreadSnapshot( + faabric::Message& msg, + bool createIfNotExists = false); + + long getMillisSinceLastExec(); + + virtual std::span getMemoryView(); + + virtual void restore(const std::string& snapshotKey); + + faabric::Message& getBoundMessage(); + + bool isExecuting(); + + bool isShutdown() { return _isShutdown; } + + void addChainedMessage(const faabric::Message& msg); + + const faabric::Message& getChainedMessage(int messageId); + + std::set getChainedMessageIds(); + + std::vector mergeDirtyRegions( + const Message& msg, + const std::vector& extraDirtyPages = {}); + + // FIXME: what is the right visibility? + void setThreadResult(faabric::Message& msg, + int32_t returnValue, + const std::string& key, + const std::vector& diffs); + + virtual void setMemorySize(size_t newSize); + + protected: + virtual size_t getMaxMemorySize(); + + faabric::Message boundMessage; + + faabric::snapshot::SnapshotRegistry& reg; + + std::shared_ptr tracker; + + uint32_t threadPoolSize = 0; + + std::map> chainedMessages; + + private: + // ---- Accounting ---- + std::atomic claimed = false; + std::atomic _isShutdown = false; + std::atomic batchCounter = 0; + std::atomic threadBatchCounter = 0; + faabric::util::TimePoint lastExec; + + // ---- Application threads ---- + std::shared_mutex threadExecutionMutex; + std::vector dirtyRegions; + std::vector> threadLocalDirtyRegions; + void deleteMainThreadSnapshot(const faabric::Message& msg); + + // ---- Function execution thread pool ---- + std::mutex threadsMutex; + std::vector> threadPoolThreads; + std::set availablePoolThreads; + + std::vector> threadTaskQueues; + + void threadPoolThread(std::stop_token st, int threadPoolIdx); +}; +} diff --git a/include/faabric/scheduler/ExecutorContext.h b/include/faabric/executor/ExecutorContext.h similarity index 93% rename from include/faabric/scheduler/ExecutorContext.h rename to include/faabric/executor/ExecutorContext.h index 42d1a6449..3b8a97416 100644 --- a/include/faabric/scheduler/ExecutorContext.h +++ b/include/faabric/executor/ExecutorContext.h @@ -1,9 +1,10 @@ #pragma once +#include #include -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class ExecutorContextException : public faabric::util::FaabricException { diff --git a/include/faabric/scheduler/ExecutorFactory.h b/include/faabric/executor/ExecutorFactory.h similarity index 81% rename from include/faabric/scheduler/ExecutorFactory.h rename to include/faabric/executor/ExecutorFactory.h index bb71d67ce..ba549d2c8 100644 --- a/include/faabric/scheduler/ExecutorFactory.h +++ b/include/faabric/executor/ExecutorFactory.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class ExecutorFactory { diff --git a/include/faabric/executor/ExecutorTask.h b/include/faabric/executor/ExecutorTask.h new file mode 100644 index 000000000..ca81f85dc --- /dev/null +++ b/include/faabric/executor/ExecutorTask.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace faabric::executor { + +class ExecutorTask +{ + public: + ExecutorTask() = default; + + ExecutorTask(int messageIndexIn, + std::shared_ptr reqIn); + + // Delete everything copy-related, default everything move-related + ExecutorTask(const ExecutorTask& other) = delete; + + ExecutorTask& operator=(const ExecutorTask& other) = delete; + + ExecutorTask(ExecutorTask&& other) = default; + + ExecutorTask& operator=(ExecutorTask&& other) = default; + + std::shared_ptr req; + int messageIndex = 0; +}; +} diff --git a/include/faabric/runner/FaabricMain.h b/include/faabric/runner/FaabricMain.h index 6a5fa6f72..b34e3893c 100644 --- a/include/faabric/runner/FaabricMain.h +++ b/include/faabric/runner/FaabricMain.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -12,7 +12,7 @@ namespace faabric::runner { class FaabricMain { public: - FaabricMain(std::shared_ptr fac); + FaabricMain(std::shared_ptr fac); void startBackground(); diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index ddccb4e0d..0a0092c5b 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -19,137 +20,6 @@ class Scheduler; Scheduler& getScheduler(); -class ExecutorTask -{ - public: - ExecutorTask() = default; - - ExecutorTask(int messageIndexIn, - std::shared_ptr reqIn); - - // Delete everything copy-related, default everything move-related - ExecutorTask(const ExecutorTask& other) = delete; - - ExecutorTask& operator=(const ExecutorTask& other) = delete; - - ExecutorTask(ExecutorTask&& other) = default; - - ExecutorTask& operator=(ExecutorTask&& other) = default; - - std::shared_ptr req; - int messageIndex = 0; -}; - -class ChainedCallException : public faabric::util::FaabricException -{ - public: - explicit ChainedCallException(std::string message) - : FaabricException(std::move(message)) - {} -}; - -class Executor -{ - public: - std::string id; - - explicit Executor(faabric::Message& msg); - - // Must be marked virtual to permit proper calling of subclass destructors - virtual ~Executor(); - - std::vector> executeThreads( - std::shared_ptr req, - const std::vector& mergeRegions); - - void executeTasks(std::vector msgIdxs, - std::shared_ptr req); - - virtual void shutdown(); - - virtual void reset(faabric::Message& msg); - - virtual int32_t executeTask( - int threadPoolIdx, - int msgIdx, - std::shared_ptr req); - - bool tryClaim(); - - void claim(); - - void releaseClaim(); - - std::shared_ptr getMainThreadSnapshot( - faabric::Message& msg, - bool createIfNotExists = false); - - long getMillisSinceLastExec(); - - virtual std::span getMemoryView(); - - virtual void restore(const std::string& snapshotKey); - - faabric::Message& getBoundMessage(); - - bool isExecuting(); - - bool isShutdown() { return _isShutdown; } - - void addChainedMessage(const faabric::Message& msg); - - const faabric::Message& getChainedMessage(int messageId); - - std::set getChainedMessageIds(); - - // This method merges all the thread-local dirty regions and returns a - // set of diffs. It must be called once per executor, once all other - // threads in the local batch have finished executing - std::vector mergeDirtyRegions( - const Message& msg, - const std::vector& extraDirtyPages = {}); - - virtual void setMemorySize(size_t newSize); - - protected: - virtual size_t getMaxMemorySize(); - - faabric::Message boundMessage; - - Scheduler& sch; - - faabric::snapshot::SnapshotRegistry& reg; - - std::shared_ptr tracker; - - uint32_t threadPoolSize = 0; - - std::map> chainedMessages; - - private: - // ---- Accounting ---- - std::atomic claimed = false; - std::atomic _isShutdown = false; - std::atomic batchCounter = 0; - std::atomic threadBatchCounter = 0; - faabric::util::TimePoint lastExec; - - // ---- Application threads ---- - std::shared_mutex threadExecutionMutex; - std::vector dirtyRegions; - std::vector> threadLocalDirtyRegions; - void deleteMainThreadSnapshot(const faabric::Message& msg); - - // ---- Function execution thread pool ---- - std::mutex threadsMutex; - std::vector> threadPoolThreads; - std::set availablePoolThreads; - - std::vector> threadTaskQueues; - - void threadPoolThread(std::stop_token st, int threadPoolIdx); -}; - /** * Background thread that periodically checks to see if any executors have * become stale (i.e. not handled any requests in a given timeout). If any are @@ -185,19 +55,10 @@ class Scheduler long getFunctionExecutorCount(const faabric::Message& msg); - void flushLocally(); - // ---------------------------------- // Message results // ---------------------------------- - void setFunctionResult(faabric::Message& msg); - - void setThreadResult(faabric::Message& msg, - int32_t returnValue, - const std::string& key, - const std::vector& diffs); - /** * Caches a message along with the thread result, to allow the thread result * to refer to data held in that message (i.e. snapshot diffs). The message @@ -250,7 +111,9 @@ class Scheduler std::atomic _isShutdown = false; // ---- Executors ---- - std::unordered_map>> + std::unordered_map< + std::string, + std::vector>> executors; // ---- Threads ---- @@ -265,7 +128,7 @@ class Scheduler // ---- Actual scheduling ---- SchedulerReaperThread reaperThread; - std::shared_ptr claimExecutor( + std::shared_ptr claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock); diff --git a/src/executor/CMakeLists.txt b/src/executor/CMakeLists.txt new file mode 100644 index 000000000..ec2619186 --- /dev/null +++ b/src/executor/CMakeLists.txt @@ -0,0 +1,14 @@ +faabric_lib(executor + Executor.cpp + ExecutorContext.cpp + ExecutorFactory.cpp + ExecutorTask.cpp +) + +# FIXME: do we need all these deps here? +target_link_libraries(executor PRIVATE + faabric::scheduling_util + faabric::snapshot + faabric::state + faabric::redis +) diff --git a/src/scheduler/Executor.cpp b/src/executor/Executor.cpp similarity index 85% rename from src/scheduler/Executor.cpp rename to src/executor/Executor.cpp index f01e14f34..aa1447c91 100644 --- a/src/scheduler/Executor.cpp +++ b/src/executor/Executor.cpp @@ -1,8 +1,11 @@ #include +#include +#include +#include #include +#include #include -#include -#include +#include #include #include #include @@ -29,18 +32,11 @@ #define POOL_SHUTDOWN -1 -namespace faabric::scheduler { - -ExecutorTask::ExecutorTask(int messageIndexIn, - std::shared_ptr reqIn) - : req(std::move(reqIn)) - , messageIndex(messageIndexIn) -{} +namespace faabric::executor { // TODO - avoid the copy of the message here? Executor::Executor(faabric::Message& msg) : boundMessage(msg) - , sch(getScheduler()) , reg(faabric::snapshot::getSnapshotRegistry()) , tracker(faabric::util::getDirtyTracker()) , threadPoolSize(faabric::util::getUsableCores()) @@ -110,90 +106,8 @@ Executor::~Executor() } } -// TODO(rm-executeThreads): get rid of this method here -std::vector> Executor::executeThreads( - std::shared_ptr req, - const std::vector& mergeRegions) -{ - SPDLOG_DEBUG("Executor {} executing {} threads", id, req->messages_size()); - - std::string funcStr = faabric::util::funcToString(req); - bool isSingleHost = req->singlehost(); - - // Do snapshotting if not on a single host - faabric::Message& msg = req->mutable_messages()->at(0); - std::shared_ptr snap = nullptr; - if (!isSingleHost) { - snap = getMainThreadSnapshot(msg, true); - - // Get dirty regions since last batch of threads - std::span memView = getMemoryView(); - tracker->stopTracking(memView); - tracker->stopThreadLocalTracking(memView); - - // If this is the first batch, these dirty regions will be empty - std::vector dirtyRegions = tracker->getBothDirtyPages(memView); - - // Apply changes to snapshot - snap->fillGapsWithBytewiseRegions(); - std::vector updates = - snap->diffWithDirtyRegions(memView, dirtyRegions); - - if (updates.empty()) { - SPDLOG_DEBUG( - "No updates to main thread snapshot for {} over {} pages", - faabric::util::funcToString(msg, false), - dirtyRegions.size()); - } else { - SPDLOG_DEBUG("Updating main thread snapshot for {} with {} diffs", - faabric::util::funcToString(msg, false), - updates.size()); - snap->applyDiffs(updates); - } - - // Clear merge regions, not persisted between batches of threads - snap->clearMergeRegions(); - - // Now we have to add any merge regions we've been saving up for this - // next batch of threads - for (const auto& mr : mergeRegions) { - snap->addMergeRegion( - mr.offset, mr.length, mr.dataType, mr.operation); - } - } - - // Invoke threads and await - // TODO: for the time being, threads may execute for a long time so we - // are a bit more generous with the timeout - auto decision = faabric::planner::getPlannerClient().callFunctions(req); - std::vector> results = sch.awaitThreadResults( - req, 10 * faabric::util::getSystemConfig().boundTimeout); - - // Perform snapshot updates if not on single host - if (!isSingleHost) { - // Add the diffs corresponding to this executor - auto diffs = mergeDirtyRegions(msg); - snap->queueDiffs(diffs); - - // Write queued changes to snapshot - int nWritten = snap->writeQueuedDiffs(); - - // Remap memory to snapshot if it's been updated - std::span memView = getMemoryView(); - if (nWritten > 0) { - setMemorySize(snap->getSize()); - snap->mapToMemory(memView); - } - - // Start tracking again - memView = getMemoryView(); - tracker->startTracking(memView); - tracker->startThreadLocalTracking(memView); - } - - return results; -} - +// TODO(thread-opt): get rid of this method here and move to +// PlannerClient::callFunctions() void Executor::executeTasks(std::vector msgIdxs, std::shared_ptr req) { @@ -353,6 +267,42 @@ void Executor::deleteMainThreadSnapshot(const faabric::Message& msg) } */ +void Executor::setThreadResult( + faabric::Message& msg, + int32_t returnValue, + const std::string& key, + const std::vector& diffs) +{ + bool isMaster = + msg.mainhost() == faabric::util::getSystemConfig().endpointHost; + if (isMaster) { + if (!diffs.empty()) { + // On main we queue the diffs locally directly, on a remote + // host we push them back to main + SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} (group {})", + diffs.size(), + faabric::util::funcToString(msg, false), + key, + msg.groupid()); + + auto snap = reg.getSnapshot(key); + + // Here we don't have ownership over all of the snapshot diff data, + // but that's ok as the executor memory will outlast the snapshot + // merging operation. + snap->queueDiffs(diffs); + } + } else { + // Push thread result and diffs together + faabric::snapshot::getSnapshotClient(msg.mainhost()) + ->pushThreadResult(msg.appid(), msg.id(), returnValue, key, diffs); + } + + // Finally, set the message result in the planner + faabric::planner::getPlannerClient().setMessageResult( + std::make_shared(msg)); +} + void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) { SPDLOG_DEBUG("Thread pool thread {}:{} starting up", id, threadPoolIdx); @@ -517,8 +467,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // If this is not a threads request and last in its batch, it may be // the main function (thread) in a threaded application, in which case // we want to stop any tracking and delete the main thread snapshot - // TODO(rm-executeThreads): this should disappear when pthreads do - // not call executeThreads anymore + /* FIXME: remove me if (!isThreads && isLastThreadInExecutor) { // Stop tracking memory std::span memView = getMemoryView(); @@ -532,6 +481,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // deleteMainThreadSnapshot(msg); } } + */ // If this batch is finished, reset the executor and release its // claim. Note that we have to release the claim _after_ resetting, @@ -564,13 +514,14 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // Set non-final thread result if (isLastThreadInBatch) { // Include diffs if this is the last one - sch.setThreadResult(msg, returnValue, mainThreadSnapKey, diffs); + setThreadResult(msg, returnValue, mainThreadSnapKey, diffs); } else { - sch.setThreadResult(msg, returnValue, "", {}); + setThreadResult(msg, returnValue, "", {}); } } else { // Set normal function result - sch.setFunctionResult(msg); + faabric::planner::getPlannerClient().setMessageResult( + std::make_shared(msg)); } } } diff --git a/src/scheduler/ExecutorContext.cpp b/src/executor/ExecutorContext.cpp similarity index 91% rename from src/scheduler/ExecutorContext.cpp rename to src/executor/ExecutorContext.cpp index eaee7a1b8..1c44453aa 100644 --- a/src/scheduler/ExecutorContext.cpp +++ b/src/executor/ExecutorContext.cpp @@ -1,6 +1,6 @@ -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { static thread_local std::shared_ptr context = nullptr; diff --git a/src/scheduler/ExecutorFactory.cpp b/src/executor/ExecutorFactory.cpp similarity index 84% rename from src/scheduler/ExecutorFactory.cpp rename to src/executor/ExecutorFactory.cpp index 01d9a2490..e778f71f6 100644 --- a/src/scheduler/ExecutorFactory.cpp +++ b/src/executor/ExecutorFactory.cpp @@ -1,7 +1,7 @@ -#include +#include #include -namespace faabric::scheduler { +namespace faabric::executor { static std::shared_ptr _factory; diff --git a/src/executor/ExecutorTask.cpp b/src/executor/ExecutorTask.cpp new file mode 100644 index 000000000..c279f75d9 --- /dev/null +++ b/src/executor/ExecutorTask.cpp @@ -0,0 +1,10 @@ +#include + +namespace faabric::executor { + +ExecutorTask::ExecutorTask(int messageIndexIn, + std::shared_ptr reqIn) + : req(std::move(reqIn)) + , messageIndex(messageIndexIn) +{} +} diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp index 4300dc3f8..00d2c39dd 100644 --- a/src/planner/PlannerClient.cpp +++ b/src/planner/PlannerClient.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -129,6 +130,11 @@ void PlannerClient::removeHost(std::shared_ptr req) void PlannerClient::setMessageResult(std::shared_ptr msg) { + // Set finish timestamp + msg->set_finishtimestamp(faabric::util::getGlobalClock().epochMillis()); + + // Let the planner know this function has finished execution. This will + // wake any thread waiting on this result asyncSend(PlannerCalls::SetMessageResult, msg.get()); } diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp index 0a900364c..f0a762f92 100644 --- a/src/runner/FaabricMain.cpp +++ b/src/runner/FaabricMain.cpp @@ -1,6 +1,6 @@ +#include #include #include -#include #include #include #include @@ -9,10 +9,10 @@ namespace faabric::runner { FaabricMain::FaabricMain( - std::shared_ptr execFactory) + std::shared_ptr execFactory) : stateServer(faabric::state::getGlobalState()) { - faabric::scheduler::setExecutorFactory(execFactory); + faabric::executor::setExecutorFactory(execFactory); } void FaabricMain::startBackground() diff --git a/src/scheduler/CMakeLists.txt b/src/scheduler/CMakeLists.txt index 468638b15..c0717cf4a 100644 --- a/src/scheduler/CMakeLists.txt +++ b/src/scheduler/CMakeLists.txt @@ -1,12 +1,10 @@ faabric_lib(scheduler - ExecutorContext.cpp - ExecutorFactory.cpp - Executor.cpp FunctionCallClient.cpp FunctionCallServer.cpp Scheduler.cpp ) +# FIXME: do we need all these deps here? target_link_libraries(scheduler PRIVATE faabric::scheduling_util faabric::snapshot diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index 70b56890a..733738494 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -54,11 +55,17 @@ std::unique_ptr FunctionCallServer::doSyncRecv( std::unique_ptr FunctionCallServer::recvFlush( std::span buffer) { + SPDLOG_INFO("Flushing host {}", + faabric::util::getSystemConfig().endpointHost); + // Clear out any cached state faabric::state::getGlobalState().forceClearAll(false); // Clear the scheduler - scheduler.flushLocally(); + faabric::scheduler::getScheduler().reset(); + + // Clear the executor factory + faabric::executor::getExecutorFactory()->flushHost(); return std::make_unique(); } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index e070d15d0..70d777b19 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -1,8 +1,8 @@ #include +#include #include #include #include -#include #include #include #include @@ -179,8 +179,9 @@ int Scheduler::reapStaleExecutors() int nReaped = 0; for (auto& execPair : executors) { std::string key = execPair.first; - std::vector>& execs = execPair.second; - std::vector> toRemove; + std::vector>& execs = + execPair.second; + std::vector> toRemove; if (execs.empty()) { continue; @@ -268,10 +269,10 @@ void Scheduler::executeBatch(std::shared_ptr req) if (isThreads) { // Threads use the existing executor. We assume there's only // one running at a time. - std::vector>& thisExecutors = - executors[funcStr]; + std::vector>& + thisExecutors = executors[funcStr]; - std::shared_ptr e = nullptr; + std::shared_ptr e = nullptr; if (thisExecutors.empty()) { // Create executor if not exists e = claimExecutor(*req->mutable_messages(0), lock); @@ -297,7 +298,8 @@ void Scheduler::executeBatch(std::shared_ptr req) for (int i = 0; i < nMessages; i++) { faabric::Message& localMsg = req->mutable_messages()->at(i); - std::shared_ptr e = claimExecutor(localMsg, lock); + std::shared_ptr e = + claimExecutor(localMsg, lock); e->executeTasks({ i }, req); } } @@ -315,18 +317,18 @@ std::vector Scheduler::getRecordedMessages() return recordedMessages; } -std::shared_ptr Scheduler::claimExecutor( +std::shared_ptr Scheduler::claimExecutor( faabric::Message& msg, faabric::util::FullLock& schedulerLock) { std::string funcStr = faabric::util::funcToString(msg, false); - std::vector>& thisExecutors = executors[funcStr]; + std::vector>& thisExecutors = + executors[funcStr]; - std::shared_ptr factory = - getExecutorFactory(); + auto factory = faabric::executor::getExecutorFactory(); - std::shared_ptr claimed = nullptr; + std::shared_ptr claimed = nullptr; for (auto& e : thisExecutors) { if (e->tryClaim()) { claimed = e; @@ -365,63 +367,6 @@ std::string Scheduler::getThisHost() return thisHost; } -void Scheduler::flushLocally() -{ - SPDLOG_INFO("Flushing host {}", - faabric::util::getSystemConfig().endpointHost); - - // Reset this scheduler - reset(); - - // Flush the host - getExecutorFactory()->flushHost(); -} - -void Scheduler::setFunctionResult(faabric::Message& msg) -{ - // Set finish timestamp - msg.set_finishtimestamp(faabric::util::getGlobalClock().epochMillis()); - - // Let the planner know this function has finished execution. This will - // wake any thread waiting on this result - faabric::planner::getPlannerClient().setMessageResult( - std::make_shared(msg)); -} - -void Scheduler::setThreadResult( - faabric::Message& msg, - int32_t returnValue, - const std::string& key, - const std::vector& diffs) -{ - bool isMaster = msg.mainhost() == conf.endpointHost; - if (isMaster) { - if (!diffs.empty()) { - // On main we queue the diffs locally directly, on a remote - // host we push them back to main - SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} (group {})", - diffs.size(), - faabric::util::funcToString(msg, false), - key, - msg.groupid()); - - auto snap = reg.getSnapshot(key); - - // Here we don't have ownership over all of the snapshot diff data, - // but that's ok as the executor memory will outlast the snapshot - // merging operation. - snap->queueDiffs(diffs); - } - } else { - // Push thread result and diffs together - getSnapshotClient(msg.mainhost()) - ->pushThreadResult(msg.appid(), msg.id(), returnValue, key, diffs); - } - - // Finally, set the message result in the planner - setFunctionResult(msg); -} - void Scheduler::setThreadResultLocally(uint32_t appId, uint32_t msgId, int32_t returnValue, diff --git a/tests/dist/DistTestExecutor.cpp b/tests/dist/DistTestExecutor.cpp index 7422e33c2..324876136 100644 --- a/tests/dist/DistTestExecutor.cpp +++ b/tests/dist/DistTestExecutor.cpp @@ -2,12 +2,14 @@ #include +#include +#include #include #include #include #include -using namespace faabric::scheduler; +using namespace faabric::executor; namespace tests { @@ -94,6 +96,90 @@ void DistTestExecutor::setUpDummyMemory(size_t memSize) dummyMemorySize = memSize; } +std::vector> DistTestExecutor::executeThreads( + std::shared_ptr req, + const std::vector& mergeRegions) +{ + SPDLOG_DEBUG("Executor {} executing {} threads", id, req->messages_size()); + + std::string funcStr = faabric::util::funcToString(req); + bool isSingleHost = req->singlehost(); + + // Do snapshotting if not on a single host + faabric::Message& msg = req->mutable_messages()->at(0); + std::shared_ptr snap = nullptr; + if (!isSingleHost) { + snap = getMainThreadSnapshot(msg, true); + + // Get dirty regions since last batch of threads + std::span memView = getMemoryView(); + tracker->stopTracking(memView); + tracker->stopThreadLocalTracking(memView); + + // If this is the first batch, these dirty regions will be empty + std::vector dirtyRegions = tracker->getBothDirtyPages(memView); + + // Apply changes to snapshot + snap->fillGapsWithBytewiseRegions(); + std::vector updates = + snap->diffWithDirtyRegions(memView, dirtyRegions); + + if (updates.empty()) { + SPDLOG_DEBUG( + "No updates to main thread snapshot for {} over {} pages", + faabric::util::funcToString(msg, false), + dirtyRegions.size()); + } else { + SPDLOG_DEBUG("Updating main thread snapshot for {} with {} diffs", + faabric::util::funcToString(msg, false), + updates.size()); + snap->applyDiffs(updates); + } + + // Clear merge regions, not persisted between batches of threads + snap->clearMergeRegions(); + + // Now we have to add any merge regions we've been saving up for this + // next batch of threads + for (const auto& mr : mergeRegions) { + snap->addMergeRegion( + mr.offset, mr.length, mr.dataType, mr.operation); + } + } + + // Invoke threads and await + // TODO: for the time being, threads may execute for a long time so we + // are a bit more generous with the timeout + auto decision = faabric::planner::getPlannerClient().callFunctions(req); + auto& sch = faabric::scheduler::getScheduler(); + std::vector> results = sch.awaitThreadResults( + req, 10 * faabric::util::getSystemConfig().boundTimeout); + + // Perform snapshot updates if not on single host + if (!isSingleHost) { + // Add the diffs corresponding to this executor + auto diffs = mergeDirtyRegions(msg); + snap->queueDiffs(diffs); + + // Write queued changes to snapshot + int nWritten = snap->writeQueuedDiffs(); + + // Remap memory to snapshot if it's been updated + std::span memView = getMemoryView(); + if (nWritten > 0) { + setMemorySize(snap->getSize()); + snap->mapToMemory(memView); + } + + // Start tracking again + memView = getMemoryView(); + tracker->startTracking(memView); + tracker->startThreadLocalTracking(memView); + } + + return results; +} + std::shared_ptr DistTestExecutorFactory::createExecutor( faabric::Message& msg) { diff --git a/tests/dist/DistTestExecutor.h b/tests/dist/DistTestExecutor.h index fb7f0d7d4..87de3a5d5 100644 --- a/tests/dist/DistTestExecutor.h +++ b/tests/dist/DistTestExecutor.h @@ -1,7 +1,6 @@ #pragma once -#include -#include +#include #include #include @@ -9,7 +8,7 @@ namespace tests { #define DIST_TEST_EXECUTOR_MEMORY_SIZE (30 * faabric::util::HOST_PAGE_SIZE) -class DistTestExecutor final : public faabric::scheduler::Executor +class DistTestExecutor final : public faabric::executor::Executor { public: DistTestExecutor(faabric::Message& msg); @@ -27,6 +26,11 @@ class DistTestExecutor final : public faabric::scheduler::Executor std::span getDummyMemory(); + // Helper method to execute threads in a distributed test + std::vector> executeThreads( + std::shared_ptr req, + const std::vector& mergeRegions); + protected: void setMemorySize(size_t newSize) override; @@ -38,10 +42,10 @@ class DistTestExecutor final : public faabric::scheduler::Executor void setUpDummyMemory(size_t memSize); }; -class DistTestExecutorFactory : public faabric::scheduler::ExecutorFactory +class DistTestExecutorFactory : public faabric::executor::ExecutorFactory { protected: - std::shared_ptr createExecutor( + std::shared_ptr createExecutor( faabric::Message& msg) override; }; diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h index 6f2083694..a9f8c0ff3 100644 --- a/tests/dist/dist_test_fixtures.h +++ b/tests/dist/dist_test_fixtures.h @@ -5,7 +5,7 @@ #include "DistTestExecutor.h" -#include +#include #include #include @@ -36,7 +36,7 @@ class DistTestsFixture // Set up executor std::shared_ptr fac = std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + faabric::executor::setExecutorFactory(fac); } void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0) diff --git a/tests/dist/main.cpp b/tests/dist/main.cpp index 31028804c..ba503fa17 100644 --- a/tests/dist/main.cpp +++ b/tests/dist/main.cpp @@ -9,8 +9,8 @@ #include "faabric_utils.h" #include "init.h" +#include #include -#include #include #include @@ -23,7 +23,7 @@ int main(int argc, char* argv[]) faabric::util::initLogging(); tests::initDistTests(); - std::shared_ptr fac = + std::shared_ptr fac = std::make_shared(); // WARNING: all 0MQ sockets have to have gone *out of scope* before we shut diff --git a/tests/dist/mpi/mpi_native.cpp b/tests/dist/mpi/mpi_native.cpp index fc157e9be..1938533c8 100644 --- a/tests/dist/mpi/mpi_native.cpp +++ b/tests/dist/mpi/mpi_native.cpp @@ -1,10 +1,10 @@ #include "mpi_native.h" +#include #include #include #include #include -#include #include #include #include @@ -773,7 +773,7 @@ namespace tests::mpi { // run migration tests for MPI also in faabric void mpiMigrationPoint(int entrypointFuncArg) { - auto* call = &faabric::scheduler::ExecutorContext::get()->getMsg(); + auto* call = &faabric::executor::ExecutorContext::get()->getMsg(); auto& sch = faabric::scheduler::getScheduler(); // Detect if there is a pending migration for the current app @@ -821,7 +821,7 @@ void mpiMigrationPoint(int entrypointFuncArg) // chaining from the main host of the app, and // we are most likely migrating from a non-main host. Thus, we must // take and push the snapshot manually. - auto* exec = faabric::scheduler::ExecutorContext::get()->getExecutor(); + auto* exec = faabric::executor::ExecutorContext::get()->getExecutor(); auto snap = std::make_shared(exec->getMemoryView()); std::string snapKey = "migration_" + std::to_string(msg.id()); diff --git a/tests/dist/server.cpp b/tests/dist/server.cpp index a407d91d1..e5b3e30e4 100644 --- a/tests/dist/server.cpp +++ b/tests/dist/server.cpp @@ -2,12 +2,10 @@ #include "init.h" #include +#include #include -#include #include -using namespace faabric::scheduler; - int main() { faabric::util::initLogging(); @@ -20,26 +18,23 @@ int main() res.set_slots(slots); faabric::scheduler::getScheduler().setThisHostResources(res); - // WARNING: All 0MQ operations must be contained within their own scope so - // that all sockets are destructed before the context is closed. - { - SPDLOG_INFO("Starting distributed test server on worker"); - std::shared_ptr fac = - std::make_shared(); - faabric::runner::FaabricMain m(fac); - m.startBackground(); - - SPDLOG_INFO("---------------------------------"); - SPDLOG_INFO("Distributed test server started"); - SPDLOG_INFO("---------------------------------"); - - // Endpoint will block until killed - SPDLOG_INFO("Starting HTTP endpoint on worker"); - faabric::endpoint::FaabricEndpoint endpoint; - endpoint.start(faabric::endpoint::EndpointMode::SIGNAL); - - SPDLOG_INFO("Shutting down"); - m.shutdown(); - } + SPDLOG_INFO("Starting distributed test server on worker"); + std::shared_ptr fac = + std::make_shared(); + faabric::runner::FaabricMain m(fac); + m.startBackground(); + + SPDLOG_INFO("---------------------------------"); + SPDLOG_INFO("Distributed test server started"); + SPDLOG_INFO("---------------------------------"); + + // Endpoint will block until killed + SPDLOG_INFO("Starting HTTP endpoint on worker"); + faabric::endpoint::FaabricEndpoint endpoint; + endpoint.start(faabric::endpoint::EndpointMode::SIGNAL); + + SPDLOG_INFO("Shutting down"); + m.shutdown(); + return EXIT_SUCCESS; } diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/executor/test_executor.cpp similarity index 96% rename from tests/test/scheduler/test_executor.cpp rename to tests/test/executor/test_executor.cpp index 06853251a..6e0228bf9 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/executor/test_executor.cpp @@ -2,12 +2,10 @@ #include "fixtures.h" -#include - +#include +#include #include #include -#include -#include #include #include #include @@ -19,7 +17,9 @@ #include #include -using namespace faabric::scheduler; +#include + +using namespace faabric::executor; using namespace faabric::util; namespace tests { @@ -79,6 +79,7 @@ int32_t TestExecutor::executeTask( { faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); std::string funcStr = faabric::util::funcToString(msg, true); + auto& sch = faabric::scheduler::getScheduler(); bool isThread = reqOrig->type() == faabric::BatchExecuteRequest::THREADS; @@ -239,8 +240,7 @@ int32_t TestExecutor::executeTask( } if (msg.function() == "context-check") { - std::shared_ptr ctx = - faabric::scheduler::ExecutorContext::get(); + std::shared_ptr ctx = ExecutorContext::get(); if (ctx == nullptr) { SPDLOG_ERROR("Executor context is null"); return 999; @@ -398,7 +398,7 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(result.outputdata() == expected); // Flush - sch.flushLocally(); + getExecutorFactory()->flushHost(); } } @@ -478,7 +478,7 @@ TEST_CASE_METHOD(TestExecutorFixture, } // Check sent to other host if necessary - auto batchRequests = getBatchRequests(); + auto batchRequests = faabric::scheduler::getBatchRequests(); // Check the hosts match up REQUIRE(restoreCount == expectedRestoreCount); @@ -742,12 +742,9 @@ TEST_CASE_METHOD(TestExecutorFixture, faabric::Message msgA = faabric::util::messageFactory("foo", "bar"); faabric::Message msgB = faabric::util::messageFactory("foo", "bar"); - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr execA = - fac->createExecutor(msgA); - std::shared_ptr execB = - fac->createExecutor(msgB); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr execA = fac->createExecutor(msgA); + std::shared_ptr execB = fac->createExecutor(msgB); // Claim one REQUIRE(execA->tryClaim()); @@ -783,10 +780,8 @@ TEST_CASE_METHOD(TestExecutorFixture, localHost.set_usedslots(5); sch.setThisHostResources(localHost); - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = - fac->createExecutor(msg); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(msg); long millisA = exec->getMillisSinceLastExec(); @@ -1189,9 +1184,8 @@ TEST_CASE_METHOD(TestExecutorFixture, "Test executor restore", "[executor]") snap->copyInData(dataB, offsetB); // Create an executor - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = fac->createExecutor(m); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(m); // Restore from snapshot exec->restore(snapKey); @@ -1226,9 +1220,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::string snapKey = faabric::util::getMainThreadSnapshotKey(m); // Create an executor - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); - std::shared_ptr exec = fac->createExecutor(m); + std::shared_ptr fac = getExecutorFactory(); + std::shared_ptr exec = fac->createExecutor(m); // Get a pointer to the TestExecutor so we can override the max memory auto testExec = std::static_pointer_cast(exec); @@ -1290,7 +1283,7 @@ TEST_CASE_METHOD(TestExecutorFixture, auto& firstMsg = *req->mutable_messages(0); // Create an executor - auto fac = faabric::scheduler::getExecutorFactory(); + auto fac = getExecutorFactory(); auto exec = fac->createExecutor(firstMsg); // At the begining there are no chained messages @@ -1313,7 +1306,7 @@ TEST_CASE_METHOD(TestExecutorFixture, } TEST_CASE_METHOD(TestExecutorFixture, - "Test execute threads using top-level function in executor", + "Test executing threads manually", "[executor]") { int nThreads = 5; @@ -1331,19 +1324,14 @@ TEST_CASE_METHOD(TestExecutorFixture, // Set single-host to avoid any snapshot sending req->set_singlehosthint(true); - // Prepare executor - auto exec = std::make_shared(*req->mutable_messages(0)); - - // Execute directly calling the executor - auto results = exec->executeThreads(req, {}); + auto decision = faabric::planner::getPlannerClient().callFunctions(req); + auto results = faabric::scheduler::getScheduler().awaitThreadResults( + req, 10 * faabric::util::getSystemConfig().boundTimeout); // Check results REQUIRE(results.size() == req->messages_size()); for (const auto& [mid, res] : results) { REQUIRE(res == (mid / 100)); } - - // Shut down executor - exec->shutdown(); } } diff --git a/tests/test/scheduler/test_executor_context.cpp b/tests/test/executor/test_executor_context.cpp similarity index 95% rename from tests/test/scheduler/test_executor_context.cpp rename to tests/test/executor/test_executor_context.cpp index 72e192df7..58e2ea912 100644 --- a/tests/test/scheduler/test_executor_context.cpp +++ b/tests/test/executor/test_executor_context.cpp @@ -3,10 +3,10 @@ #include "faabric_utils.h" #include "fixtures.h" -#include +#include #include -using namespace faabric::scheduler; +using namespace faabric::executor; namespace tests { diff --git a/tests/test/scheduler/test_executor_reaping.cpp b/tests/test/executor/test_executor_reaping.cpp similarity index 90% rename from tests/test/scheduler/test_executor_reaping.cpp rename to tests/test/executor/test_executor_reaping.cpp index 778ed09d7..185ab45c8 100644 --- a/tests/test/scheduler/test_executor_reaping.cpp +++ b/tests/test/executor/test_executor_reaping.cpp @@ -7,7 +7,7 @@ #include #include -using namespace faabric::scheduler; +using namespace faabric::executor; namespace tests { @@ -23,9 +23,9 @@ class SchedulerReapingTestFixture res.set_slots(20); sch.setThisHostResources(res); - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + setExecutorFactory(fac); } ~SchedulerReapingTestFixture() {} diff --git a/tests/test/mpi/test_mpi_context.cpp b/tests/test/mpi/test_mpi_context.cpp index 9b7fda9ec..900289efe 100644 --- a/tests/test/mpi/test_mpi_context.cpp +++ b/tests/test/mpi/test_mpi_context.cpp @@ -112,9 +112,8 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Check joining world", "[mpi]") int worldId = cA.getWorldId(); waitForMpiMessages(reqA, worldSize); - Scheduler& sch = getScheduler(); // Set the function result to have access to the chained messages - sch.setFunctionResult(msgA); + plannerCli.setMessageResult(std::make_shared(msgA)); auto chainedMsgs = faabric::util::getChainedFunctions(msgA); REQUIRE(chainedMsgs.size() == worldSize - 1); diff --git a/tests/test/planner/test_planner_client_server.cpp b/tests/test/planner/test_planner_client_server.cpp index 00f4ac574..c6fb3a414 100644 --- a/tests/test/planner/test_planner_client_server.cpp +++ b/tests/test/planner/test_planner_client_server.cpp @@ -154,9 +154,9 @@ class PlannerClientServerExecTestFixture sch.shutdown(); sch.addHostToGlobalSet(); - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + faabric::executor::setExecutorFactory(fac); } ~PlannerClientServerExecTestFixture() diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp index 196fe1938..2e7148dfd 100644 --- a/tests/test/planner/test_planner_endpoint.cpp +++ b/tests/test/planner/test_planner_endpoint.cpp @@ -294,9 +294,9 @@ class PlannerEndpointExecTestFixture sch.shutdown(); sch.addHostToGlobalSet(); - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + faabric::executor::setExecutorFactory(fac); } ~PlannerEndpointExecTestFixture() diff --git a/tests/test/runner/test_main.cpp b/tests/test/runner/test_main.cpp index bce0361e3..0b7d32daa 100644 --- a/tests/test/runner/test_main.cpp +++ b/tests/test/runner/test_main.cpp @@ -3,9 +3,9 @@ #include "faabric_utils.h" #include "fixtures.h" +#include #include #include -#include #include #include #include @@ -20,16 +20,16 @@ class MainRunnerTestFixture : public SchedulerFixture public: MainRunnerTestFixture() { - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + faabric::executor::setExecutorFactory(fac); } }; TEST_CASE_METHOD(MainRunnerTestFixture, "Test main runner", "[runner]") { - std::shared_ptr fac = - faabric::scheduler::getExecutorFactory(); + std::shared_ptr fac = + faabric::executor::getExecutorFactory(); faabric::runner::FaabricMain m(fac); m.startBackground(); diff --git a/tests/test/scheduler/test_function_client_server.cpp b/tests/test/scheduler/test_function_client_server.cpp index ff370048e..04f511340 100644 --- a/tests/test/scheduler/test_function_client_server.cpp +++ b/tests/test/scheduler/test_function_client_server.cpp @@ -6,10 +6,10 @@ #include #include +#include #include #include #include -#include #include #include #include @@ -31,14 +31,14 @@ class FunctionClientServerTestFixture FunctionClientServerTestFixture() { executorFactory = - std::make_shared(); + std::make_shared(); setExecutorFactory(executorFactory); } ~FunctionClientServerTestFixture() { executorFactory->reset(); } protected: - std::shared_ptr executorFactory; + std::shared_ptr executorFactory; }; TEST_CASE_METHOD(ConfFixture, diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp index 2220935a5..98ccdba03 100644 --- a/tests/test/scheduler/test_function_migration.cpp +++ b/tests/test/scheduler/test_function_migration.cpp @@ -137,8 +137,10 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture, req->mutable_messages(1)->set_executedhost(LOCALHOST); } - sch.setFunctionResult(*req->mutable_messages(0)); - sch.setFunctionResult(*req->mutable_messages(1)); + plannerCli.setMessageResult( + std::make_shared(*req->mutable_messages(0))); + plannerCli.setMessageResult( + std::make_shared(*req->mutable_messages(1))); } TEST_CASE_METHOD(FunctionMigrationTestFixture, diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index fc2779be6..9a5794825 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -5,9 +5,9 @@ #include "fixtures.h" #include +#include #include #include -#include #include #include #include @@ -27,7 +27,7 @@ using namespace faabric::scheduler; namespace tests { -class SlowExecutor final : public Executor +class SlowExecutor final : public faabric::executor::Executor { public: SlowExecutor(faabric::Message& msg) @@ -72,10 +72,11 @@ class SlowExecutor final : public Executor } }; -class SlowExecutorFactory : public ExecutorFactory +class SlowExecutorFactory : public faabric::executor::ExecutorFactory { protected: - std::shared_ptr createExecutor(faabric::Message& msg) override + std::shared_ptr createExecutor( + faabric::Message& msg) override { return std::make_shared(msg); } @@ -90,15 +91,15 @@ class SlowExecutorTestFixture public: SlowExecutorTestFixture() { - std::shared_ptr fac = + std::shared_ptr fac = std::make_shared(); setExecutorFactory(fac); }; ~SlowExecutorTestFixture() { - std::shared_ptr fac = - std::make_shared(); + std::shared_ptr fac = + std::make_shared(); setExecutorFactory(fac); }; }; @@ -112,15 +113,15 @@ class DummyExecutorTestFixture public: DummyExecutorTestFixture() { - std::shared_ptr fac = - std::make_shared(); + std::shared_ptr fac = + std::make_shared(); setExecutorFactory(fac); }; ~DummyExecutorTestFixture() { - std::shared_ptr fac = - std::make_shared(); + std::shared_ptr fac = + std::make_shared(); setExecutorFactory(fac); }; }; @@ -384,11 +385,15 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, res.set_usedslots(1); sch.setThisHostResources(res); - sch.setFunctionResult(firstMsg); + plannerCli.setMessageResult(std::make_shared(firstMsg)); // Check retrieval method gets the same call out again faabric::Message resultMsg = plannerCli.getMessageResult(firstMsg, 1); + // Manually set the timestamp for the sent message so that the check + // matches (the timestamp is set in the planner client) + firstMsg.set_finishtimestamp(resultMsg.finishtimestamp()); + checkMessageEquality(firstMsg, resultMsg); } @@ -467,7 +472,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, msg.set_outputdata(expectedOutput); msg.set_returnvalue(1); msg.set_executedhost(expectedHost); - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); expectedReturnValue = 1; expectedType = faabric::Message_MessageType_CALL; @@ -481,7 +486,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, msg.set_outputdata(expectedOutput); msg.set_returnvalue(0); msg.set_executedhost(expectedHost); - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); expectedReturnValue = 0; expectedType = faabric::Message_MessageType_CALL; @@ -516,7 +521,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, // We need to set the function result in order to get the chained // functions. We can do so multiple times msg.set_executedhost(faabric::util::getSystemConfig().endpointHost); - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); // Check empty initially REQUIRE(faabric::util::getChainedFunctions(msg).empty()); @@ -528,7 +533,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, faabric::util::logChainedFunction(msg, chainedMsgA); std::set expected = { (unsigned int)chainedMsgA.id() }; - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); REQUIRE(faabric::util::getChainedFunctions(msg) == expected); // Log some more and check (update the message id again) @@ -540,7 +545,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, (unsigned int)chainedMsgB.id(), (unsigned int)chainedMsgC.id() }; - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); REQUIRE(faabric::util::getChainedFunctions(msg) == expected); } @@ -610,6 +615,9 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, msg.set_mainhost("otherHost"); msg.set_executedhost(faabric::util::getSystemConfig().endpointHost); + auto fac = faabric::executor::getExecutorFactory(); + auto exec = fac->createExecutor(msg); + // If we want to set a function result, the planner must see at least one // slot, and at least one used slot in this host faabric::HostResources res; @@ -637,7 +645,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, snapData)); } - sch.setThreadResult(msg, returnValue, snapKey, diffs); + exec->setThreadResult(msg, returnValue, snapKey, diffs); auto actualResults = faabric::snapshot::getThreadResults(); REQUIRE(actualResults.size() == 1); diff --git a/tests/test/snapshot/test_snapshot_client_server.cpp b/tests/test/snapshot/test_snapshot_client_server.cpp index c2f7d7d6b..4b523e82d 100644 --- a/tests/test/snapshot/test_snapshot_client_server.cpp +++ b/tests/test/snapshot/test_snapshot_client_server.cpp @@ -445,13 +445,13 @@ TEST_CASE_METHOD(SnapshotClientServerTestFixture, msgA.set_id(threadIdA); msgA.set_returnvalue(returnValueA); msgA.set_executedhost(faabric::util::getSystemConfig().endpointHost); - sch.setFunctionResult(msgA); + plannerCli.setMessageResult(std::make_shared(msgA)); Message msgB; msgB.set_appid(appIdB); msgB.set_id(threadIdB); msgB.set_returnvalue(returnValueB); msgB.set_executedhost(faabric::util::getSystemConfig().endpointHost); - sch.setFunctionResult(msgB); + plannerCli.setMessageResult(std::make_shared(msgB)); int rA = 0; int rB = 0; diff --git a/tests/test/util/test_exec_graph.cpp b/tests/test/util/test_exec_graph.cpp index bb39b079e..ca51e1736 100644 --- a/tests/test/util/test_exec_graph.cpp +++ b/tests/test/util/test_exec_graph.cpp @@ -55,14 +55,14 @@ TEST_CASE_METHOD(ExecGraphTestFixture, "Test execution graph", "[util]") logChainedFunction(msgC2, msgD); // Set all execution results - scheduler::Scheduler& sch = scheduler::getScheduler(); - sch.setFunctionResult(msgA); - sch.setFunctionResult(msgB1); - sch.setFunctionResult(msgB2); - sch.setFunctionResult(msgC1); - sch.setFunctionResult(msgC2); - sch.setFunctionResult(msgC3); - sch.setFunctionResult(msgD); + auto& plannerCli = faabric::planner::getPlannerClient(); + plannerCli.setMessageResult(std::make_shared(msgA)); + plannerCli.setMessageResult(std::make_shared(msgB1)); + plannerCli.setMessageResult(std::make_shared(msgB2)); + plannerCli.setMessageResult(std::make_shared(msgC1)); + plannerCli.setMessageResult(std::make_shared(msgC2)); + plannerCli.setMessageResult(std::make_shared(msgC3)); + plannerCli.setMessageResult(std::make_shared(msgD)); ExecGraph actual = getFunctionExecGraph(msgA); @@ -176,7 +176,7 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]") // it here SLEEP_MS(500); msg.set_executedhost(thisHost); - sch.setFunctionResult(msg); + plannerCli.setMessageResult(std::make_shared(msg)); // Wait for the MPI messages to finish plannerCli.getMessageResult(msg, 2000); diff --git a/tests/utils/CMakeLists.txt b/tests/utils/CMakeLists.txt index db78b086b..3ed662b76 100644 --- a/tests/utils/CMakeLists.txt +++ b/tests/utils/CMakeLists.txt @@ -7,7 +7,6 @@ add_library(faabric_test_utils message_utils.cpp planner_utils.cpp scheduling_utils.cpp - system_utils.cpp ) target_compile_options(faabric_test_utils PUBLIC -fno-omit-frame-pointer) diff --git a/tests/utils/DummyExecutor.cpp b/tests/utils/DummyExecutor.cpp index 1a3a21566..a708a2fca 100644 --- a/tests/utils/DummyExecutor.cpp +++ b/tests/utils/DummyExecutor.cpp @@ -1,14 +1,14 @@ #include "DummyExecutor.h" +#include #include -#include #include #include #include #define SHORT_SLEEP_MS 50 -namespace faabric::scheduler { +namespace faabric::executor { DummyExecutor::DummyExecutor(faabric::Message& msg) : Executor(msg) diff --git a/tests/utils/DummyExecutor.h b/tests/utils/DummyExecutor.h index c5a43c9fd..b544507c0 100644 --- a/tests/utils/DummyExecutor.h +++ b/tests/utils/DummyExecutor.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class DummyExecutor final : public Executor { diff --git a/tests/utils/DummyExecutorFactory.cpp b/tests/utils/DummyExecutorFactory.cpp index 3d49b513c..e3c8059e6 100644 --- a/tests/utils/DummyExecutorFactory.cpp +++ b/tests/utils/DummyExecutorFactory.cpp @@ -3,7 +3,7 @@ #include -namespace faabric::scheduler { +namespace faabric::executor { std::shared_ptr DummyExecutorFactory::createExecutor( faabric::Message& msg) diff --git a/tests/utils/DummyExecutorFactory.h b/tests/utils/DummyExecutorFactory.h index 10d95086c..c901da0b3 100644 --- a/tests/utils/DummyExecutorFactory.h +++ b/tests/utils/DummyExecutorFactory.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -namespace faabric::scheduler { +namespace faabric::executor { class DummyExecutorFactory : public ExecutorFactory { diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h index c6c69375d..6a4004f6e 100644 --- a/tests/utils/faabric_utils.h +++ b/tests/utils/faabric_utils.h @@ -61,7 +61,6 @@ using namespace faabric; CATCH_REGISTER_LISTENER(LogListener) namespace tests { -void cleanFaabric(); template void checkMessageMapEquality(T mapA, T mapB) diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 9c1f9b0d8..1af23f948 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include #include #include @@ -16,8 +18,6 @@ #include #include #include -#include -#include #include #include #include @@ -271,7 +271,7 @@ class ExecutorContextFixture public: ExecutorContextFixture() {} - ~ExecutorContextFixture() { faabric::scheduler::ExecutorContext::unset(); } + ~ExecutorContextFixture() { faabric::executor::ExecutorContext::unset(); } /** * Creates a batch request and sets up the associated context @@ -293,13 +293,13 @@ class ExecutorContextFixture */ void setUpContext(std::shared_ptr req) { - faabric::scheduler::ExecutorContext::set(nullptr, req, 0); + faabric::executor::ExecutorContext::set(nullptr, req, 0); } }; #define TEST_EXECUTOR_DEFAULT_MEMORY_SIZE (10 * faabric::util::HOST_PAGE_SIZE) -class TestExecutor final : public faabric::scheduler::Executor +class TestExecutor final : public faabric::executor::Executor { public: TestExecutor(faabric::Message& msg); @@ -324,10 +324,10 @@ class TestExecutor final : public faabric::scheduler::Executor std::shared_ptr reqOrig) override; }; -class TestExecutorFactory : public faabric::scheduler::ExecutorFactory +class TestExecutorFactory : public faabric::executor::ExecutorFactory { protected: - std::shared_ptr createExecutor( + std::shared_ptr createExecutor( faabric::Message& msg) override; }; @@ -399,9 +399,9 @@ class MpiBaseTestFixture , req(faabric::util::batchExecFactory(user, func, 1)) , msg(*req->mutable_messages(0)) { - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); + std::shared_ptr fac = + std::make_shared(); + faabric::executor::setExecutorFactory(fac); msg.set_mpiworldid(worldId); msg.set_mpiworldsize(worldSize); diff --git a/tests/utils/message_utils.cpp b/tests/utils/message_utils.cpp index 206c9e5aa..8879181ce 100644 --- a/tests/utils/message_utils.cpp +++ b/tests/utils/message_utils.cpp @@ -12,7 +12,6 @@ void checkMessageEquality(const faabric::Message& msgA, REQUIRE(msgA.user() == msgB.user()); REQUIRE(msgA.function() == msgB.function()); REQUIRE(msgA.executedhost() == msgB.executedhost()); - REQUIRE(msgA.finishtimestamp() == msgB.finishtimestamp()); REQUIRE(msgA.timestamp() == msgB.timestamp()); REQUIRE(msgA.snapshotkey() == msgB.snapshotkey()); diff --git a/tests/utils/system_utils.cpp b/tests/utils/system_utils.cpp deleted file mode 100644 index 665fdf5d1..000000000 --- a/tests/utils/system_utils.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include "DummyExecutorFactory.h" -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace tests { -void cleanFaabric() -{ - faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); - - // Clear out Redis - redis::Redis::getState().flushAll(); - redis::Redis::getQueue().flushAll(); - - // Clear out any cached state, do so for both modes - std::string& originalStateMode = conf.stateMode; - conf.stateMode = "inmemory"; - state::getGlobalState().forceClearAll(true); - conf.stateMode = "redis"; - state::getGlobalState().forceClearAll(true); - conf.stateMode = originalStateMode; - - // Reset scheduler - scheduler::Scheduler& sch = scheduler::getScheduler(); - sch.shutdown(); - sch.addHostToGlobalSet(); - - // Give scheduler enough resources - faabric::HostResources res; - res.set_slots(10); - sch.setThisHostResources(res); - - // Clear snapshots - faabric::snapshot::getSnapshotRegistry().clear(); - - // Reset system config - conf.reset(); - - // Set test mode back on and mock mode off - faabric::util::setTestMode(true); - faabric::util::setMockMode(false); - faabric::scheduler::clearMockRequests(); - faabric::snapshot::clearMockSnapshotRequests(); - - // Set up dummy executor factory - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); - - // Clear out MPI worlds - faabric::mpi::MpiWorldRegistry& mpiRegistry = - faabric::mpi::getMpiWorldRegistry(); - mpiRegistry.clear(); -} -}