planner: set finished flag when app is not in-flight anymore (#362)

* planner: set finished flag when app is not in-flight anymore

* gh: bump code version

* tests: remove obsolete test case

* planner: propagate the single-host hint (rather than deriving it from the scheduling decision)

* proto: add singleHostHint flag to BER

* executor: simplify dirty-tracking logic with the new fork-join model

* scheduler: override cpu count when setting the local resources

* tests: fix executeThreads call

* tests: set singlehosthint flag

* tests(tsan): remove data race in executor
csegarragonz authored Jan 16, 2024
1 parent 20bf6a1 commit 7fba9d8
Showing 13 changed files with 116 additions and 124 deletions.
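
The headline change is in how batch completion is reported: the planner now sets the finished flag on a batch's status whenever the app has left the in-flight map, instead of the HTTP endpoint re-deriving it from message counts. As a rough sketch of how a caller might consume the flag (the client-side getBatchResults call and the polling cadence here are illustrative assumptions, not part of this diff):

    // Poll the planner until the batch is no longer in-flight
    // (plannerCli is assumed to expose the planner's getBatchResults)
    auto status = plannerCli.getBatchResults(req);
    while (!status->finished()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        status = plannerCli.getBatchResults(req);
    }
    // All entries in status->messageresults() are now final
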
4 changes: 2 additions & 2 deletions .env
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.12.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.12.0
FAABRIC_VERSION=0.13.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.13.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
@@ -20,7 +20,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -33,7 +33,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -47,7 +47,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -70,7 +70,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -110,7 +110,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -164,7 +164,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.12.0
image: faasm.azurecr.io/faabric:0.13.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.12.0
0.13.0
9 changes: 8 additions & 1 deletion include/faabric/scheduler/Scheduler.h
@@ -102,9 +102,16 @@ class Executor

std::set<unsigned int> getChainedMessageIds();

protected:
// This method merges all the thread-local dirty regions and returns a
// set of diffs. It must be called exactly once per executor, after all
// other threads in the local batch have finished executing
std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(
const Message& msg,
const std::vector<char>& extraDirtyPages = {});

virtual void setMemorySize(size_t newSize);

protected:
virtual size_t getMaxMemorySize();

faabric::Message boundMessage;
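
Exposing mergeDirtyRegions as a protected member is what enables the fork-join simplification in Executor.cpp below: the last thread in the local batch merges dirty pages for the whole executor and hands back a set of diffs. A condensed sketch of the intended call pattern, assuming msg is a message in the batch and snap is the main-thread snapshot fetched from the registry, as in the executor code:

    // Last local thread in the batch: collapse this executor's dirty
    // pages into diffs, then queue and flush them to the snapshot
    auto diffs = mergeDirtyRegions(msg);
    snap->queueDiffs(diffs);
    int nWritten = snap->writeQueuedDiffs();
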
17 changes: 11 additions & 6 deletions src/planner/Planner.cpp
@@ -482,6 +482,9 @@ std::shared_ptr<faabric::BatchExecuteRequestStatus> Planner::getBatchResults(
for (auto msgResultPair : state.appResults.at(appId)) {
*berStatus->add_messageresults() = *(msgResultPair.second);
}

// Set the finished condition
berStatus->set_finished(!state.inFlightReqs.contains(appId));
}

return berStatus;
@@ -746,6 +749,7 @@ void Planner::dispatchSchedulingDecision(
hostRequests;

assert(req->messages_size() == decision->hosts.size());
bool isSingleHost = decision->isSingleHost();

// First we build all the BatchExecuteRequests for all the different hosts.
// We need to keep a map as the hosts may not be contiguous in the decision
@@ -765,18 +769,16 @@
hostRequests[thisHost]->set_type(req->type());
hostRequests[thisHost]->set_subtype(req->subtype());
hostRequests[thisHost]->set_contextdata(req->contextdata());

if (decision->isSingleHost()) {
hostRequests[thisHost]->set_singlehost(true);
}
hostRequests[thisHost]->set_singlehost(isSingleHost);
// Propagate the single host hint
hostRequests[thisHost]->set_singlehosthint(req->singlehosthint());
}

*hostRequests[thisHost]->add_messages() = msg;
}

bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
bool isSingleHost = req->singlehost();
if (isSingleHost && !decision->isSingleHost()) {
if (!isSingleHost && req->singlehosthint()) {
SPDLOG_ERROR(
"User provided single-host hint in BER, but decision is not!");
}
@@ -789,6 +791,9 @@

// In a THREADS request, before sending an execution request we need to
// push the main (caller) thread snapshot to all non-main hosts
// FIXME: ideally, we would do this from the caller thread, once we
// know the scheduling decision, while all other threads wait for the
// snapshot
if (isThreads && !isSingleHost) {
auto snapshotKey =
faabric::util::getMainThreadSnapshotKey(hostReq->messages(0));
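
The hunk above separates two fields that were previously conflated: singlehost records what the scheduling decision actually produced, while singlehosthint merely carries the user's request through to each per-host BER; a hint the decision could not honour is logged rather than failed. Condensed into one place (accessors as generated from the proto change below):

    // Decision-derived flag vs. user-provided hint
    hostReq->set_singlehost(decision->isSingleHost());   // what the planner decided
    hostReq->set_singlehosthint(req->singlehosthint());  // what the user asked for

    if (req->singlehosthint() && !decision->isSingleHost()) {
        SPDLOG_ERROR(
          "User provided single-host hint in BER, but decision is not single-host!");
    }
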
8 changes: 5 additions & 3 deletions src/planner/PlannerClient.cpp
@@ -315,14 +315,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
}

// To optimise for single-host shared memory, we can skip sending the
// snapshot to the planner by setting the singlehost flag
if (!req->singlehost()) {
// snapshot to the planner by setting the single host hint
// FIXME(async-snaps): ideally, snapshots would be synchronised
// _after_ the scheduling decision is made
if (!req->singlehosthint()) {
snapshotKey = faabric::util::getMainThreadSnapshotKey(firstMsg);
}
} else {
// In a single-host setting we can skip sending the snapshots to the
// planner
if (!req->singlehost()) {
if (!req->singlehosthint()) {
snapshotKey = req->messages(0).snapshotkey();
}
}
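
Client-side, the hint decides whether a snapshot key is resolved at all: a promised single-host execution skips snapshot propagation entirely. A sketch of the combined logic; the outer threads-vs-functions branch condition is an assumption inferred from the two snapshot-key sources visible in the hunk:

    std::string snapshotKey;
    if (!req->singlehosthint()) {
        // Only pay the snapshot cost when execution may span hosts
        bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
        snapshotKey = isThreads
                        ? faabric::util::getMainThreadSnapshotKey(firstMsg)
                        : req->messages(0).snapshotkey();
    }
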
7 changes: 0 additions & 7 deletions src/planner/PlannerEndpointHandler.cpp
@@ -262,13 +262,6 @@ void PlannerEndpointHandler::onRequest(

// Prepare the response
response.result(beast::http::status::ok);
// Work-out if it has finished using user-provided flags
if (faabric::util::getNumFinishedMessagesInBatch(actualBerStatus) ==
berStatus.expectednummessages()) {
actualBerStatus->set_finished(true);
} else {
actualBerStatus->set_finished(false);
}
response.body() = faabric::util::messageToJson(*actualBerStatus);

return ctx.sendFunction(std::move(response));
4 changes: 4 additions & 0 deletions src/proto/faabric.proto
@@ -50,6 +50,10 @@ message BatchExecuteRequest {
// Flag set by the scheduler when this batch is all executing on a single
// host
bool singleHost = 10;

// Hint set by the user to indicate that this execution should run
// entirely on a single host
bool singleHostHint = 11;
}

message BatchExecuteRequestStatus {
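
Protobuf lowercases the field name, so callers get the set_singlehosthint / singlehosthint accessor pair used throughout this commit. A minimal opt-in sketch, assuming faabric's usual batchExecFactory helper with an unchanged signature:

    // Build a 4-message batch and hint that it should stay on one host;
    // the planner still records the actual outcome in singlehost
    auto req = faabric::util::batchExecFactory("demo", "echo", 4);
    req->set_singlehosthint(true);
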
155 changes: 75 additions & 80 deletions src/scheduler/Executor.cpp
@@ -110,8 +110,7 @@ Executor::~Executor()
}
}

// TODO(thread-opt): get rid of this method here and move to
// PlannerClient::callFunctions()
// TODO(rm-executeThreads): get rid of this method here
std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
std::shared_ptr<faabric::BatchExecuteRequest> req,
const std::vector<faabric::util::SnapshotMergeRegion>& mergeRegions)
@@ -172,6 +171,10 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(

// Perform snapshot updates if not on single host
if (!isSingleHost) {
// Add the diffs corresponding to this executor
auto diffs = mergeDirtyRegions(msg);
snap->queueDiffs(diffs);

// Write queued changes to snapshot
int nWritten = snap->writeQueuedDiffs();

@@ -211,10 +214,9 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
// Update the last-executed time for this executor
lastExec = faabric::util::startTimer();

faabric::Message& firstMsg = req->mutable_messages()->at(0);
auto& firstMsg = req->mutable_messages()->at(0);
std::string thisHost = faabric::util::getSystemConfig().endpointHost;

bool isMaster = firstMsg.mainhost() == thisHost;
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
bool isSingleHost = req->singlehost();
std::string snapshotKey = firstMsg.snapshotkey();
@@ -269,43 +271,18 @@
for (int msgIdx : msgIdxs) {
const faabric::Message& msg = req->messages().at(msgIdx);

int threadPoolIdx = -1;
if (availablePoolThreads.empty()) {
// Here all threads are still executing, so we have to overload.
// If any tasks are blocking we risk a deadlock, and can no
// longer guarantee the application will finish. In general if
// we're on the main host and this is a thread, we should
// avoid the zeroth and first pool threads as they are likely to
// be the main thread and the zeroth in the communication group,
// so will be blocking.
if (isThreads && isMaster) {
if (threadPoolSize <= 2) {
SPDLOG_ERROR("Insufficient pool threads ({}) to "
"overload {} idx {}",
threadPoolSize,
funcStr,
msg.appidx());

throw std::runtime_error("Insufficient pool threads");
}

threadPoolIdx = (msg.appidx() % (threadPoolSize - 2)) + 2;
} else {
threadPoolIdx = msg.appidx() % threadPoolSize;
}
SPDLOG_ERROR("No available thread pool threads (size: {})",
threadPoolSize);
throw std::runtime_error("No available thread pool threads!");
}

SPDLOG_DEBUG("Overloaded app index {} to thread {}",
msg.appidx(),
threadPoolIdx);
} else {
// Take next from those that are available
threadPoolIdx = *availablePoolThreads.begin();
availablePoolThreads.erase(threadPoolIdx);
// Take next from those that are available
int threadPoolIdx = *availablePoolThreads.begin();
availablePoolThreads.erase(threadPoolIdx);

SPDLOG_TRACE("Assigned app index {} to thread {}",
msg.appidx(),
threadPoolIdx);
}
SPDLOG_TRACE(
"Assigned app index {} to thread {}", msg.appidx(), threadPoolIdx);

// Enqueue the task
threadTaskQueues[threadPoolIdx].enqueue(ExecutorTask(msgIdx, req));
@@ -524,54 +501,24 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
threadPoolIdx,
oldTaskCount - 1);

// Handle last-in-batch dirty tracking
std::string mainThreadSnapKey =
faabric::util::getMainThreadSnapshotKey(msg);
// Handle last-in-batch dirty tracking if we are the last thread in a
// non-single-host execution and are not on the main host (on the
// main host the zero-th thread is still executing)
auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg);
std::vector<faabric::util::SnapshotDiff> diffs;
if (isLastThreadInBatch && doDirtyTracking) {
// Stop non-thread-local tracking as we're the last in the batch
std::span<uint8_t> memView = getMemoryView();
tracker->stopTracking(memView);

// Merge all dirty regions
{
faabric::util::FullLock lock(threadExecutionMutex);

// Merge together regions from all threads
faabric::util::mergeManyDirtyPages(dirtyRegions,
threadLocalDirtyRegions);

// Clear thread-local dirty regions, no longer needed
threadLocalDirtyRegions.clear();

// Merge the globally tracked regions
std::vector<char> globalDirtyRegions =
tracker->getDirtyPages(memView);
faabric::util::mergeDirtyPages(dirtyRegions,
globalDirtyRegions);
}

// Fill snapshot gaps with overwrite regions first
auto snap = reg.getSnapshot(mainThreadSnapKey);
snap->fillGapsWithBytewiseRegions();

// Compare snapshot with all dirty regions for this executor
{
// Do the diffing
faabric::util::FullLock lock(threadExecutionMutex);
diffs = snap->diffWithDirtyRegions(memView, dirtyRegions);
dirtyRegions.clear();
}

// If last in batch on this host, clear the merge regions (only
// needed for doing the diffing on the current host)
SPDLOG_DEBUG("Clearing merge regions for {}", mainThreadSnapKey);
snap->clearMergeRegions();
// FIXME: thread 0 is not part of this batch locally, but still
// runs in the same executor
bool isRemoteThread =
task.req->messages(0).mainhost() != conf.endpointHost;
if (isLastThreadInBatch && doDirtyTracking && isRemoteThread) {
diffs = mergeDirtyRegions(msg);
}

// If this is not a threads request and last in its batch, it may be
// the main function (thread) in a threaded application, in which case
// we want to stop any tracking and delete the main thread snapshot
// TODO(rm-executeThreads): this should disappear when pthreads do
// not call executeThreads anymore
if (!isThreads && isLastThreadInExecutor) {
// Stop tracking memory
std::span<uint8_t> memView = getMemoryView();
@@ -732,6 +679,54 @@ const faabric::Message& Executor::getChainedMessage(int messageId)
return *(it->second);
}

std::vector<faabric::util::SnapshotDiff> Executor::mergeDirtyRegions(
const Message& msg,
const std::vector<char>& extraDirtyPages)
{
std::vector<faabric::util::SnapshotDiff> diffs;
auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg);

// Stop non-thread-local tracking as we're the last in the batch
std::span<uint8_t> memView = getMemoryView();
tracker->stopTracking(memView);

// Merge all dirty regions
{
faabric::util::FullLock lock(threadExecutionMutex);

// Merge together regions from all threads
faabric::util::mergeManyDirtyPages(dirtyRegions,
threadLocalDirtyRegions);

// Clear thread-local dirty regions, no longer needed
threadLocalDirtyRegions.clear();

// Merge the globally tracked regions
std::vector<char> globalDirtyRegions = tracker->getDirtyPages(memView);
faabric::util::mergeDirtyPages(dirtyRegions, globalDirtyRegions);
}

// Fill snapshot gaps with overwrite regions first
auto snap = reg.getSnapshot(mainThreadSnapKey);
snap->fillGapsWithBytewiseRegions();

// Compare snapshot with all dirty regions for this executor
{
// Do the diffing
faabric::util::FullLock lock(threadExecutionMutex);
diffs = snap->diffWithDirtyRegions(memView, dirtyRegions);
dirtyRegions.clear();
}

// If last in batch on this host, clear the merge regions (only
// needed for doing the diffing on the current host)
SPDLOG_DEBUG("Clearing merge regions for {}", mainThreadSnapKey);
snap->clearMergeRegions();

// FIXME: is it very expensive to return these diffs?
return diffs;
}

std::set<unsigned int> Executor::getChainedMessageIds()
{
faabric::util::UniqueLock lock(threadsMutex);
1 change: 1 addition & 0 deletions src/scheduler/Scheduler.cpp
@@ -458,6 +458,7 @@ size_t Scheduler::getCachedMessageCount()
void Scheduler::setThisHostResources(faabric::HostResources& res)
{
addHostToGlobalSet(thisHost, std::make_shared<faabric::HostResources>(res));
conf.overrideCpuCount = res.slots();
}

// --------------------------------------------
Expand Down