Skip to content

Commit

Permalink
executor: move executor code to separate top-level directory (#365)
Browse files Browse the repository at this point in the history
* executor: move executor code to separate top-level directory

* tests: refactor tests to accomodate for change in source code structure

* dist-tests: fix server compilation
  • Loading branch information
csegarragonz authored Jan 16, 2024
1 parent 7fba9d8 commit 3483573
Show file tree
Hide file tree
Showing 45 changed files with 497 additions and 540 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ endfunction()

add_subdirectory(src/batch-scheduler)
add_subdirectory(src/endpoint)
add_subdirectory(src/executor)
add_subdirectory(src/flat)
add_subdirectory(src/mpi)
add_subdirectory(src/planner)
Expand All @@ -121,6 +122,7 @@ add_library(faabric
faabric.cpp
$<TARGET_OBJECTS:batch_scheduler_obj>
$<TARGET_OBJECTS:endpoint_obj>
$<TARGET_OBJECTS:executor_obj>
$<TARGET_OBJECTS:flat_obj>
$<TARGET_OBJECTS:mpi_obj>
$<TARGET_OBJECTS:planner_obj>
Expand Down
4 changes: 2 additions & 2 deletions examples/server.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <faabric/endpoint/FaabricEndpoint.h>
#include <faabric/executor/ExecutorFactory.h>
#include <faabric/runner/FaabricMain.h>
#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/util/logging.h>

using namespace faabric::scheduler;
using namespace faabric::executor;

class ExampleExecutor : public Executor
{
Expand Down
119 changes: 119 additions & 0 deletions include/faabric/executor/Executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#pragma once

#include <faabric/executor/ExecutorTask.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/snapshot/SnapshotRegistry.h>
#include <faabric/util/clock.h>
#include <faabric/util/exception.h>
#include <faabric/util/queue.h>
#include <faabric/util/snapshot.h>

namespace faabric::executor {

class ChainedCallException : public faabric::util::FaabricException
{
public:
explicit ChainedCallException(std::string message)
: FaabricException(std::move(message))
{}
};

class Executor
{
public:
std::string id;

explicit Executor(faabric::Message& msg);

// Must be marked virtual to permit proper calling of subclass destructors
virtual ~Executor();

void executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req);

virtual void shutdown();

virtual void reset(faabric::Message& msg);

virtual int32_t executeTask(
int threadPoolIdx,
int msgIdx,
std::shared_ptr<faabric::BatchExecuteRequest> req);

bool tryClaim();

void claim();

void releaseClaim();

std::shared_ptr<faabric::util::SnapshotData> getMainThreadSnapshot(
faabric::Message& msg,
bool createIfNotExists = false);

long getMillisSinceLastExec();

virtual std::span<uint8_t> getMemoryView();

virtual void restore(const std::string& snapshotKey);

faabric::Message& getBoundMessage();

bool isExecuting();

bool isShutdown() { return _isShutdown; }

void addChainedMessage(const faabric::Message& msg);

const faabric::Message& getChainedMessage(int messageId);

std::set<unsigned int> getChainedMessageIds();

std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(
const Message& msg,
const std::vector<char>& extraDirtyPages = {});

// FIXME: what is the right visibility?
void setThreadResult(faabric::Message& msg,
int32_t returnValue,
const std::string& key,
const std::vector<faabric::util::SnapshotDiff>& diffs);

virtual void setMemorySize(size_t newSize);

protected:
virtual size_t getMaxMemorySize();

faabric::Message boundMessage;

faabric::snapshot::SnapshotRegistry& reg;

std::shared_ptr<faabric::util::DirtyTracker> tracker;

uint32_t threadPoolSize = 0;

std::map<int, std::shared_ptr<faabric::Message>> chainedMessages;

private:
// ---- Accounting ----
std::atomic<bool> claimed = false;
std::atomic<bool> _isShutdown = false;
std::atomic<int> batchCounter = 0;
std::atomic<int> threadBatchCounter = 0;
faabric::util::TimePoint lastExec;

// ---- Application threads ----
std::shared_mutex threadExecutionMutex;
std::vector<char> dirtyRegions;
std::vector<std::vector<char>> threadLocalDirtyRegions;
void deleteMainThreadSnapshot(const faabric::Message& msg);

// ---- Function execution thread pool ----
std::mutex threadsMutex;
std::vector<std::shared_ptr<std::jthread>> threadPoolThreads;
std::set<int> availablePoolThreads;

std::vector<faabric::util::Queue<ExecutorTask>> threadTaskQueues;

void threadPoolThread(std::stop_token st, int threadPoolIdx);
};
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#pragma once

#include <faabric/executor/Executor.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/exception.h>

namespace faabric::scheduler {
namespace faabric::executor {

class ExecutorContextException : public faabric::util::FaabricException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include <faabric/scheduler/Scheduler.h>
#include <faabric/executor/Executor.h>

namespace faabric::scheduler {
namespace faabric::executor {

class ExecutorFactory
{
Expand Down
27 changes: 27 additions & 0 deletions include/faabric/executor/ExecutorTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <faabric/proto/faabric.pb.h>

namespace faabric::executor {

class ExecutorTask
{
public:
ExecutorTask() = default;

ExecutorTask(int messageIndexIn,
std::shared_ptr<BatchExecuteRequest> reqIn);

// Delete everything copy-related, default everything move-related
ExecutorTask(const ExecutorTask& other) = delete;

ExecutorTask& operator=(const ExecutorTask& other) = delete;

ExecutorTask(ExecutorTask&& other) = default;

ExecutorTask& operator=(ExecutorTask&& other) = default;

std::shared_ptr<BatchExecuteRequest> req;
int messageIndex = 0;
};
}
4 changes: 2 additions & 2 deletions include/faabric/runner/FaabricMain.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <faabric/scheduler/ExecutorFactory.h>
#include <faabric/executor/ExecutorFactory.h>
#include <faabric/scheduler/FunctionCallServer.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotServer.h>
Expand All @@ -12,7 +12,7 @@ namespace faabric::runner {
class FaabricMain
{
public:
FaabricMain(std::shared_ptr<faabric::scheduler::ExecutorFactory> fac);
FaabricMain(std::shared_ptr<faabric::executor::ExecutorFactory> fac);

void startBackground();

Expand Down
Loading

0 comments on commit 3483573

Please sign in to comment.