diff --git a/.env b/.env
index 0ad33f079..853a712cc 100644
--- a/.env
+++ b/.env
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.12.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.12.0
+FAABRIC_VERSION=0.13.0
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.13.0
 COMPOSE_PROJECT_NAME=faabric-dev
 CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 7feb5c043..97a278381 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -20,7 +20,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -33,7 +33,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -47,7 +47,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -70,7 +70,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -110,7 +110,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -164,7 +164,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.12.0
+      image: faasm.azurecr.io/faabric:0.13.0
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
diff --git a/VERSION b/VERSION
index ac454c6a1..54d1a4f2a 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.12.0
+0.13.0
diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 97fdcd744..ddccb4e0d 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -102,9 +102,16 @@ class Executor
 
     std::set<unsigned int> getChainedMessageIds();
 
-  protected:
+    // This method merges all the thread-local dirty regions and returns a
+    // set of diffs. It must be called once per executor, after all other
+    // threads in the local batch have finished executing
+    std::vector<faabric::util::SnapshotDiff> mergeDirtyRegions(
+      const Message& msg,
+      const std::vector<char>& extraDirtyPages = {});
+    virtual void setMemorySize(size_t newSize);
+  protected:
     virtual size_t getMaxMemorySize();
 
     faabric::Message boundMessage;
diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index d3e131928..95c427460 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -482,6 +482,9 @@ std::shared_ptr<faabric::BatchExecuteRequestStatus> Planner::getBatchResults(
         for (auto msgResultPair : state.appResults.at(appId)) {
             *berStatus->add_messageresults() = *(msgResultPair.second);
         }
+
+        // Set the finished condition
+        berStatus->set_finished(!state.inFlightReqs.contains(appId));
     }
 
     return berStatus;
@@ -746,6 +749,7 @@ void Planner::dispatchSchedulingDecision(
       hostRequests;
 
     assert(req->messages_size() == decision->hosts.size());
+    bool isSingleHost = decision->isSingleHost();
 
     // First we build all the BatchExecuteRequests for all the different hosts.
    // We need to keep a map as the hosts may not be contiguous in the decision
@@ -765,18 +769,16 @@ void Planner::dispatchSchedulingDecision(
             hostRequests[thisHost]->set_type(req->type());
             hostRequests[thisHost]->set_subtype(req->subtype());
             hostRequests[thisHost]->set_contextdata(req->contextdata());
-
-            if (decision->isSingleHost()) {
-                hostRequests[thisHost]->set_singlehost(true);
-            }
+            hostRequests[thisHost]->set_singlehost(isSingleHost);
+            // Propagate the single host hint
+            hostRequests[thisHost]->set_singlehosthint(req->singlehosthint());
         }
 
         *hostRequests[thisHost]->add_messages() = msg;
     }
 
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
-    bool isSingleHost = req->singlehost();
-    if (isSingleHost && !decision->isSingleHost()) {
+    if (!isSingleHost && req->singlehosthint()) {
         SPDLOG_ERROR(
           "User provided single-host hint in BER, but decision is not!");
     }
@@ -789,6 +791,9 @@
 
         // In a THREADS request, before sending an execution request we need to
         // push the main (caller) thread snapshot to all non-main hosts
+        // FIXME: ideally, we would do this from the caller thread, once we
+        // know the scheduling decision, and all other threads would be
+        // waiting for the snapshot
         if (isThreads && !isSingleHost) {
             auto snapshotKey =
               faabric::util::getMainThreadSnapshotKey(hostReq->messages(0));
diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp
index e8d32e4a1..4300dc3f8 100644
--- a/src/planner/PlannerClient.cpp
+++ b/src/planner/PlannerClient.cpp
@@ -315,14 +315,16 @@ faabric::batch_scheduler::SchedulingDecision PlannerClient::callFunctions(
         }
 
         // To optimise for single-host shared memory, we can skip sending the
-        // snapshot to the planner by setting the singlehost flag
-        if (!req->singlehost()) {
+        // snapshot to the planner by setting the single host hint
+        // FIXME(async-snaps): ideally, snapshots would be synchronised
+        // _after_ the scheduling decision is made
+        if (!req->singlehosthint()) {
             snapshotKey = faabric::util::getMainThreadSnapshotKey(firstMsg);
         }
     } else {
         // In a single-host setting we can skip sending the snapshots to the
         // planner
-        if (!req->singlehost()) {
+        if (!req->singlehosthint()) {
             snapshotKey = req->messages(0).snapshotkey();
         }
     }
diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp
index 9d622686a..a31fb92b1 100644
--- a/src/planner/PlannerEndpointHandler.cpp
+++ b/src/planner/PlannerEndpointHandler.cpp
@@ -262,13 +262,6 @@ void PlannerEndpointHandler::onRequest(
             // Prepare the response
             response.result(beast::http::status::ok);
 
-            // Work-out if it has finished using user-provided flags
-            if (faabric::util::getNumFinishedMessagesInBatch(actualBerStatus) ==
-                berStatus.expectednummessages()) {
-                actualBerStatus->set_finished(true);
-            } else {
-                actualBerStatus->set_finished(false);
-            }
             response.body() = faabric::util::messageToJson(*actualBerStatus);
 
             return ctx.sendFunction(std::move(response));
diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto
index 5bbe6fab6..001e642cb 100644
--- a/src/proto/faabric.proto
+++ b/src/proto/faabric.proto
@@ -50,6 +50,10 @@ message BatchExecuteRequest {
     // Flag set by the scheduler when this batch is all executing on a single
     // host
     bool singleHost = 10;
+
+    // Hint set by the user to indicate that this execution should all be in
+    // a single host
+    bool singleHostHint = 11;
 }
 
 message BatchExecuteRequestStatus {
diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp
index e3f5edd77..f01e14f34 100644
--- a/src/scheduler/Executor.cpp
+++ b/src/scheduler/Executor.cpp
@@ -110,8 +110,7 @@ Executor::~Executor()
     }
 }
 
-// TODO(thread-opt): get rid of this method here and move to
-// PlannerClient::callFunctions()
+// TODO(rm-executeThreads): get rid of this method here
 std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
   std::shared_ptr<faabric::BatchExecuteRequest> req,
   const std::vector<faabric::util::SnapshotMergeRegion>& mergeRegions)
@@ -172,6 +171,10 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
 
     // Perform snapshot updates if not on single host
     if (!isSingleHost) {
+        // Add the diffs corresponding to this executor
+        auto diffs = mergeDirtyRegions(msg);
+        snap->queueDiffs(diffs);
+
         // Write queued changes to snapshot
         int nWritten = snap->writeQueuedDiffs();
 
@@ -211,10 +214,9 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
     // Update the last-executed time for this executor
     lastExec = faabric::util::startTimer();
 
-    faabric::Message& firstMsg = req->mutable_messages()->at(0);
+    auto& firstMsg = req->mutable_messages()->at(0);
     std::string thisHost = faabric::util::getSystemConfig().endpointHost;
 
-    bool isMaster = firstMsg.mainhost() == thisHost;
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
     bool isSingleHost = req->singlehost();
     std::string snapshotKey = firstMsg.snapshotkey();
@@ -269,43 +271,18 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
     for (int msgIdx : msgIdxs) {
         const faabric::Message& msg = req->messages().at(msgIdx);
 
-        int threadPoolIdx = -1;
         if (availablePoolThreads.empty()) {
-            // Here all threads are still executing, so we have to overload.
-            // If any tasks are blocking we risk a deadlock, and can no
-            // longer guarantee the application will finish. In general if
-            // we're on the main host and this is a thread, we should
-            // avoid the zeroth and first pool threads as they are likely to
-            // be the main thread and the zeroth in the communication group,
-            // so will be blocking.
-            if (isThreads && isMaster) {
-                if (threadPoolSize <= 2) {
-                    SPDLOG_ERROR("Insufficient pool threads ({}) to "
-                                 "overload {} idx {}",
-                                 threadPoolSize,
-                                 funcStr,
-                                 msg.appidx());
-
-                    throw std::runtime_error("Insufficient pool threads");
-                }
-
-                threadPoolIdx = (msg.appidx() % (threadPoolSize - 2)) + 2;
-            } else {
-                threadPoolIdx = msg.appidx() % threadPoolSize;
-            }
+            SPDLOG_ERROR("No available thread pool threads (size: {})",
+                         threadPoolSize);
+            throw std::runtime_error("No available thread pool threads!");
+        }
 
-            SPDLOG_DEBUG("Overloaded app index {} to thread {}",
-                         msg.appidx(),
-                         threadPoolIdx);
-        } else {
-            // Take next from those that are available
-            threadPoolIdx = *availablePoolThreads.begin();
-            availablePoolThreads.erase(threadPoolIdx);
+        // Take next from those that are available
+        int threadPoolIdx = *availablePoolThreads.begin();
+        availablePoolThreads.erase(threadPoolIdx);
 
-            SPDLOG_TRACE("Assigned app index {} to thread {}",
-                         msg.appidx(),
-                         threadPoolIdx);
-        }
+        SPDLOG_TRACE(
+          "Assigned app index {} to thread {}", msg.appidx(), threadPoolIdx);
 
         // Enqueue the task
         threadTaskQueues[threadPoolIdx].enqueue(ExecutorTask(msgIdx, req));
@@ -524,54 +501,24 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
                      threadPoolIdx,
                      oldTaskCount - 1);
 
-        // Handle last-in-batch dirty tracking
-        std::string mainThreadSnapKey =
-          faabric::util::getMainThreadSnapshotKey(msg);
+        // Handle last-in-batch dirty tracking if we are the last thread in a
+        // non-single-host execution, and are not on the main host (on the
+        // main host we still have the zeroth thread executing)
+        auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg);
         std::vector<faabric::util::SnapshotDiff> diffs;
-        if (isLastThreadInBatch && doDirtyTracking) {
-            // Stop non-thread-local tracking as we're the last in the batch
-            std::span<uint8_t> memView = getMemoryView();
-            tracker->stopTracking(memView);
-
-            // Merge all dirty regions
-            {
-                faabric::util::FullLock lock(threadExecutionMutex);
-
-                // Merge together regions from all threads
-                faabric::util::mergeManyDirtyPages(dirtyRegions,
-                                                   threadLocalDirtyRegions);
-
-                // Clear thread-local dirty regions, no longer needed
-                threadLocalDirtyRegions.clear();
-
-                // Merge the globally tracked regions
-                std::vector<char> globalDirtyRegions =
-                  tracker->getDirtyPages(memView);
-                faabric::util::mergeDirtyPages(dirtyRegions,
-                                               globalDirtyRegions);
-            }
-
-            // Fill snapshot gaps with overwrite regions first
-            auto snap = reg.getSnapshot(mainThreadSnapKey);
-            snap->fillGapsWithBytewiseRegions();
-
-            // Compare snapshot with all dirty regions for this executor
-            {
-                // Do the diffing
-                faabric::util::FullLock lock(threadExecutionMutex);
-                diffs = snap->diffWithDirtyRegions(memView, dirtyRegions);
-                dirtyRegions.clear();
-            }
-
-            // If last in batch on this host, clear the merge regions (only
-            // needed for doing the diffing on the current host)
-            SPDLOG_DEBUG("Clearing merge regions for {}", mainThreadSnapKey);
-            snap->clearMergeRegions();
+        // FIXME: thread 0 locally is not part of this batch, but is still
+        // in the same executor
+        bool isRemoteThread =
+          task.req->messages(0).mainhost() != conf.endpointHost;
+        if (isLastThreadInBatch && doDirtyTracking && isRemoteThread) {
+            diffs = mergeDirtyRegions(msg);
         }
 
         // If this is not a threads request and last in its batch, it may be
         // the main function (thread) in a threaded application, in which case
         // we want to stop any tracking and delete the main thread snapshot
+        // TODO(rm-executeThreads): this should disappear when pthreads do
+        // not call executeThreads anymore
         if (!isThreads && isLastThreadInExecutor) {
             // Stop tracking memory
             std::span<uint8_t> memView = getMemoryView();
@@ -732,6 +679,54 @@ const faabric::Message& Executor::getChainedMessage(int messageId)
     return *(it->second);
 }
 
+std::vector<faabric::util::SnapshotDiff> Executor::mergeDirtyRegions(
+  const Message& msg,
+  const std::vector<char>& extraDirtyPages)
+{
+    std::vector<faabric::util::SnapshotDiff> diffs;
+    auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg);
+
+    // Stop non-thread-local tracking as we're the last in the batch
+    std::span<uint8_t> memView = getMemoryView();
+    tracker->stopTracking(memView);
+
+    // Merge all dirty regions
+    {
+        faabric::util::FullLock lock(threadExecutionMutex);
+
+        // Merge together regions from all threads
+        faabric::util::mergeManyDirtyPages(dirtyRegions,
+                                           threadLocalDirtyRegions);
+
+        // Clear thread-local dirty regions, no longer needed
+        threadLocalDirtyRegions.clear();
+
+        // Merge the globally tracked regions
+        std::vector<char> globalDirtyRegions = tracker->getDirtyPages(memView);
+        faabric::util::mergeDirtyPages(dirtyRegions, globalDirtyRegions);
+    }
+
+    // Fill snapshot gaps with overwrite regions first
+    auto snap = reg.getSnapshot(mainThreadSnapKey);
+    snap->fillGapsWithBytewiseRegions();
+
+    // Compare snapshot with all dirty regions for this executor
+    {
+        // Do the diffing
+        faabric::util::FullLock lock(threadExecutionMutex);
+        diffs = snap->diffWithDirtyRegions(memView, dirtyRegions);
+        dirtyRegions.clear();
+    }
+
+    // If last in batch on this host, clear the merge regions (only
+    // needed for doing the diffing on the current host)
+    SPDLOG_DEBUG("Clearing merge regions for {}", mainThreadSnapKey);
+    snap->clearMergeRegions();
+
+    // FIXME: is it very expensive to return these diffs?
+    return diffs;
+}
+
 std::set<unsigned int> Executor::getChainedMessageIds()
 {
     faabric::util::UniqueLock lock(threadsMutex);
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index b4e8ddcdf..e070d15d0 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -458,6 +458,7 @@ size_t Scheduler::getCachedMessageCount()
 void Scheduler::setThisHostResources(faabric::HostResources& res)
 {
     addHostToGlobalSet(thisHost, std::make_shared<faabric::HostResources>(res));
+    conf.overrideCpuCount = res.slots();
 }
 
 // --------------------------------------------
diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp
index e64a4a177..196fe1938 100644
--- a/tests/test/planner/test_planner_endpoint.cpp
+++ b/tests/test/planner/test_planner_endpoint.cpp
@@ -544,21 +544,6 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
         msg.set_payloadjson(faabric::util::messageToJson(*otherBerStatus));
     }
 
-    // If the request JSON payload contains a BER status for an in-flight BER,
-    // the request will succeed. Depending on the messages we tell the planner
-    // we are expecting, it will either succeed or not
-    SECTION("Success, but not finished")
-    {
-        expectedReturnCode = beast::http::status::ok;
-        auto expectedBerStatus = faabric::util::batchExecStatusFactory(appId);
-        expectedBerStatus->set_finished(false);
-        *expectedBerStatus->add_messageresults() = msgResult;
-        expectedResponseBody = faabric::util::messageToJson(*expectedBerStatus);
-        // Change the expected number of messages
-        berStatus->set_expectednummessages(2);
-        msg.set_payloadjson(faabric::util::messageToJson(*berStatus));
-    }
-
     // Post the EXECUTE_BATCH_STATUS request:
     msgJsonStr = faabric::util::messageToJson(msg);
     result = doPost(msgJsonStr);
diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp
index bc518ea2f..06853251a 100644
--- a/tests/test/scheduler/test_executor.cpp
+++ b/tests/test/scheduler/test_executor.cpp
@@ -1329,7 +1329,7 @@ TEST_CASE_METHOD(TestExecutorFixture,
       faabric::util::batchExecFactory("dummy", "blah", nThreads);
     req->set_type(faabric::BatchExecuteRequest::THREADS);
     // Set single-host to avoid any snapshot sending
-    req->set_singlehost(true);
+    req->set_singlehosthint(true);
 
     // Prepare executor
    auto exec = std::make_shared<TestExecutor>(*req->mutable_messages(0));
diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp
index d83a3888b..fc2779be6 100644
--- a/tests/test/scheduler/test_scheduler.cpp
+++ b/tests/test/scheduler/test_scheduler.cpp
@@ -241,7 +241,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     reqOne->set_contextdata(expectedContextData);
 
     // Set the singlehost flag to avoid sending snapshots to the planner
-    reqOne->set_singlehost(true);
+    reqOne->set_singlehosthint(true);
 
     auto actualDecisionOne = plannerCli.callFunctions(reqOne);
 
@@ -297,7 +297,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
     reqTwo->set_type(execMode);
 
     // Set the singlehost flag to avoid sending snapshots to the planner
-    reqTwo->set_singlehost(true);
+    reqTwo->set_singlehosthint(true);
 
     // Schedule the functions
     auto actualDecisionTwo = plannerCli.callFunctions(reqTwo);