From c1f65090d4947e28d654e1007024e0f121841d52 Mon Sep 17 00:00:00 2001 From: Carlos Date: Tue, 19 Dec 2023 12:21:43 +0000 Subject: [PATCH] batch-scheduler: fix isSingleHost check (#359) * batch-scheduler: fix isSingleHost check * tests: add regression test * gh: bump minor code version * dist-tests: fix dist-test that was relying on an incorrect notion of being singleHost --- .env | 4 +- .github/workflows/tests.yml | 14 +++--- VERSION | 2 +- src/batch-scheduler/SchedulingDecision.cpp | 9 ++-- src/planner/Planner.cpp | 2 +- src/scheduler/Executor.cpp | 2 + tests/dist/dist_test_fixtures.h | 38 ++++++++-------- tests/dist/scheduler/functions.cpp | 29 +++++++------ tests/dist/scheduler/test_snapshots.cpp | 43 +++++++++++-------- .../test_scheduling_decisions.cpp | 2 +- 10 files changed, 80 insertions(+), 65 deletions(-) diff --git a/.env b/.env index 51ab05bbf..0ad33f079 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.11.0 -FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.11.0 +FAABRIC_VERSION=0.12.0 +FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.12.0 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b689ad097..9fb4b0061 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -17,13 +17,13 @@ jobs: runs-on: ubuntu-latest steps: - name: Cancel Workflow Action - uses: styfle/cancel-workflow-action@0.11.0 + uses: styfle/cancel-workflow-action@0.12.0 conan-cache: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -36,7 +36,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -50,7 +50,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -73,7 +73,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -113,7 +113,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -167,7 +167,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.11.0 + image: faasm.azurecr.io/faabric:0.12.0 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} diff --git a/VERSION b/VERSION index d9df1bbc0..ac454c6a1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.11.0 +0.12.0 diff --git a/src/batch-scheduler/SchedulingDecision.cpp b/src/batch-scheduler/SchedulingDecision.cpp index 342451db8..5439068f6 100644 --- a/src/batch-scheduler/SchedulingDecision.cpp +++ b/src/batch-scheduler/SchedulingDecision.cpp @@ -11,16 +11,17 @@ SchedulingDecision::SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn) bool SchedulingDecision::isSingleHost() { + auto& conf = faabric::util::getSystemConfig(); + // Always return false if single-host optimisations are switched off - faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); + // TODO(thread-opt): remove this flag if (conf.noSingleHostOptimisations == 1) { return false; } std::string thisHost = conf.endpointHost; - return std::all_of(hosts.begin(), hosts.end(), [&](const std::string& s) { - return s == thisHost; - }); + std::set hostSet(hosts.begin(), hosts.end()); + return hostSet.size() == 1; } void SchedulingDecision::addMessage(const std::string& host, diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index afa739722..d3e131928 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -612,7 +612,7 @@ Planner::callBatch(std::shared_ptr req) decision->print(); #endif - // 1. For a scale change request, we only need to update the hosts + // 1. For a new request, we only need to update the hosts // with the new messages being scheduled for (int i = 0; i < decision->hosts.size(); i++) { claimHostSlots(state.hostMap.at(decision->hosts.at(i))); diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp index ef9720269..4ce659f9f 100644 --- a/src/scheduler/Executor.cpp +++ b/src/scheduler/Executor.cpp @@ -110,6 +110,8 @@ Executor::~Executor() } } +// TODO(thread-opt): get rid of this method here and move to +// PlannerClient::callFunctions() std::vector> Executor::executeThreads( std::shared_ptr req, const std::vector& mergeRegions) diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h index 1fd26597f..6f2083694 100644 --- a/tests/dist/dist_test_fixtures.h +++ b/tests/dist/dist_test_fixtures.h @@ -29,12 +29,33 @@ class DistTestsFixture sch.addHostToGlobalSet(getWorkerIP()); sch.removeHostFromGlobalSet(LOCALHOST); + // Give some resources to each host + updateLocalSlots(4, 0); + updateRemoteSlots(4, 0); + // Set up executor std::shared_ptr fac = std::make_shared(); faabric::scheduler::setExecutorFactory(fac); } + void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0) + { + faabric::HostResources localRes; + localRes.set_slots(newLocalSlots); + localRes.set_usedslots(newUsedLocalSlots); + sch.setThisHostResources(localRes); + } + + void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0) + { + faabric::HostResources remoteRes; + remoteRes.set_slots(newRemoteSlots); + remoteRes.set_usedslots(newRemoteUsedSlots); + sch.addHostToGlobalSet(workerIP, + std::make_shared(remoteRes)); + } + ~DistTestsFixture() = default; std::string getWorkerIP() @@ -65,23 +86,6 @@ class MpiDistTestsFixture : public DistTestsFixture int worldSize = 4; bool origIsMsgOrderingOn; - void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0) - { - faabric::HostResources localRes; - localRes.set_slots(newLocalSlots); - localRes.set_usedslots(newUsedLocalSlots); - sch.setThisHostResources(localRes); - } - - void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0) - { - faabric::HostResources remoteRes; - remoteRes.set_slots(newRemoteSlots); - remoteRes.set_usedslots(newRemoteUsedSlots); - sch.addHostToGlobalSet(workerIP, - std::make_shared(remoteRes)); - } - void setLocalSlots(int numLocalSlots, int worldSizeIn = 0) { if (worldSizeIn > 0) { diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index aac7c7e2f..8a708b515 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -64,19 +64,22 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec, { faabric::Message& msg = req->mutable_messages()->at(msgIdx); - std::string msgInput = msg.inputdata(); - std::vector inputBytes = faabric::util::stringToBytes(msgInput); - std::vector otherData = { 1, 2, 3, 4 }; - - // Modify the executor's memory - int offsetA = 10; - int offsetB = HOST_PAGE_SIZE + 10; - std::memcpy(exec->getDummyMemory().data() + offsetA, - otherData.data(), - otherData.size()); - std::memcpy(exec->getDummyMemory().data() + offsetB, - inputBytes.data(), - inputBytes.size()); + if (msg.groupidx() > 0) { + std::string msgInput = msg.inputdata(); + std::vector inputBytes = + faabric::util::stringToBytes(msgInput); + std::vector otherData = { 1, 2, 3, 4 }; + + // Modify the executor's memory + int offsetA = 10; + int offsetB = HOST_PAGE_SIZE + 10; + std::memcpy(exec->getDummyMemory().data() + offsetA, + otherData.data(), + otherData.size()); + std::memcpy(exec->getDummyMemory().data() + offsetB, + inputBytes.data(), + inputBytes.size()); + } return 123; } diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index d44a81aaf..b06f767db 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -27,38 +27,43 @@ TEST_CASE_METHOD(DistTestsFixture, std::string function = "fake-diffs"; std::vector inputData = { 0, 1, 2, 3, 4, 5, 6 }; - // Set up the message + // Set up the messages std::shared_ptr req = - faabric::util::batchExecFactory(user, function, 1); + faabric::util::batchExecFactory(user, function, 2); req->set_type(faabric::BatchExecuteRequest::THREADS); - // Set up some input data - faabric::Message& msg = req->mutable_messages()->at(0); - msg.set_inputdata(inputData.data(), inputData.size()); + for (int i = 0; i < req->messages_size(); i++) { + req->mutable_messages(i)->set_groupidx(i); + req->mutable_messages(i)->set_inputdata(inputData.data(), + inputData.size()); + } // Set up the main thread snapshot auto& reg = faabric::snapshot::getSnapshotRegistry(); size_t snapSize = DIST_TEST_EXECUTOR_MEMORY_SIZE; - std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg); + std::string snapshotKey = + faabric::util::getMainThreadSnapshotKey(req->messages(0)); auto snap = std::make_shared(snapSize); reg.registerSnapshot(snapshotKey, snap); - // Force the function to be executed remotely - faabric::HostResources res; - res.set_usedslots(1); - res.set_slots(1); - sch.setThisHostResources(res); - res.set_usedslots(0); - res.set_slots(4); - sch.addHostToGlobalSet(getWorkerIP(), std::make_shared(res)); + // Force the execution to span multiple hosts so that it triggers dirty + // tracking + std::vector expectedHosts = { getMasterIP(), getWorkerIP() }; + auto preloadDec = std::make_shared( + req->appid(), req->groupid()); + for (int i = 0; i < req->messages_size(); i++) { + preloadDec->addMessage(expectedHosts.at(i), 0, 0, i); + } - std::vector expectedHosts = { getWorkerIP() }; + plannerCli.preloadSchedulingDecision(preloadDec); auto decision = plannerCli.callFunctions(req); - std::vector executedHosts = decision.hosts; - REQUIRE(expectedHosts == executedHosts); + REQUIRE(expectedHosts == decision.hosts); - auto msgResult = plannerCli.getMessageResult(req->appid(), msg.id(), 500); - REQUIRE(msgResult.returnvalue() == 123); + for (const auto& msg : req->messages()) { + auto msgResult = + plannerCli.getMessageResult(req->appid(), msg.id(), 500); + REQUIRE(msgResult.returnvalue() == 123); + } // Write the diffs and check they've been applied REQUIRE(snap->getQueuedDiffsCount() == 2); diff --git a/tests/test/batch-scheduler/test_scheduling_decisions.cpp b/tests/test/batch-scheduler/test_scheduling_decisions.cpp index a58ca52ca..710c590c8 100644 --- a/tests/test/batch-scheduler/test_scheduling_decisions.cpp +++ b/tests/test/batch-scheduler/test_scheduling_decisions.cpp @@ -30,7 +30,7 @@ TEST_CASE_METHOD(ConfFixture, "Test building scheduling decisions", "[util]") hostB = "hostA"; hostC = "hostA"; - expectSingleHost = false; + expectSingleHost = true; expectedUniqueHosts = { "hostA" }; }