From 6b3063dc87c7fd8b4d3b7fa5599559831a66a23e Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Sat, 20 Apr 2024 16:54:37 +0000 Subject: [PATCH] planner: implement compaction planner policy --- .env | 4 +- .github/workflows/tests.yml | 12 +- VERSION | 2 +- .../faabric/batch-scheduler/BatchScheduler.h | 6 +- .../batch-scheduler/CompactScheduler.h | 35 ++ include/faabric/planner/Planner.h | 2 + src/batch-scheduler/BatchScheduler.cpp | 12 + src/batch-scheduler/CMakeLists.txt | 1 + src/batch-scheduler/CompactScheduler.cpp | 367 ++++++++++++ src/planner/Planner.cpp | 8 + src/planner/PlannerEndpointHandler.cpp | 21 +- src/planner/planner.proto | 2 + tests/dist/mpi/test_multiple_mpi_worlds.cpp | 65 +- .../test_compact_scheduler.cpp | 558 ++++++++++++++++++ tests/test/planner/test_planner_endpoint.cpp | 40 ++ tests/utils/faabric_utils.h | 2 + tests/utils/planner_utils.cpp | 13 + 17 files changed, 1136 insertions(+), 14 deletions(-) create mode 100644 include/faabric/batch-scheduler/CompactScheduler.h create mode 100644 src/batch-scheduler/CompactScheduler.cpp create mode 100644 tests/test/batch-scheduler/test_compact_scheduler.cpp diff --git a/.env b/.env index a133f1791..a17db033a 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.17.0 -FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.17.0 +FAABRIC_VERSION=0.18.0 +FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.18.0 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7a41087ad..ecac5e483 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,7 +20,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 env: DEPLOYMENT_TYPE: gha-ci steps: @@ -34,7 +34,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 steps: - name: "Check out code" uses: actions/checkout@v4 @@ -45,7 +45,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 steps: - name: "Check out code" uses: actions/checkout@v4 @@ -65,7 +65,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 options: --privileged services: redis: @@ -104,7 +104,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 options: --privileged services: redis: @@ -156,7 +156,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.17.0 + image: faasm.azurecr.io/faabric:0.18.0 services: redis: image: redis diff --git a/VERSION b/VERSION index c5523bd09..66333910a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.17.0 +0.18.0 diff --git a/include/faabric/batch-scheduler/BatchScheduler.h b/include/faabric/batch-scheduler/BatchScheduler.h index 30453ddde..fd9974e7c 100644 --- a/include/faabric/batch-scheduler/BatchScheduler.h +++ b/include/faabric/batch-scheduler/BatchScheduler.h @@ -84,9 +84,11 @@ class BatchScheduler static int numSlots(const Host& host) { return host->slots; } + static int numUsedSlots(const Host& host) { return host->usedSlots; } + static int numSlotsAvailable(const Host& host) { - return std::max(0, numSlots(host) - host->usedSlots); + return std::max(0, numSlots(host) - numUsedSlots(host)); } static void claimSlots(Host& host, int numSlotsToClaim) @@ -120,4 +122,6 @@ class BatchScheduler std::shared_ptr getBatchScheduler(); void resetBatchScheduler(); + +void resetBatchScheduler(const std::string& newMode); } diff --git a/include/faabric/batch-scheduler/CompactScheduler.h b/include/faabric/batch-scheduler/CompactScheduler.h new file mode 100644 index 000000000..ef3bc4089 --- /dev/null +++ b/include/faabric/batch-scheduler/CompactScheduler.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include + +namespace faabric::batch_scheduler { + +// This batch scheduler behaves in the same way than BinPack for NEW and +// SCALE_CHANGE requests, but for DIST_CHANGE requests it tries to compact +// to the fewest number of VMs. +class CompactScheduler final : public BatchScheduler +{ + public: + std::shared_ptr makeSchedulingDecision( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) override; + + private: + bool isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) override; + + bool isFirstDecisionBetter(HostMap& hostMap, + std::shared_ptr decisionA, + std::shared_ptr decisionB); + + std::vector getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) override; +}; +} diff --git a/include/faabric/planner/Planner.h b/include/faabric/planner/Planner.h index db8f9defb..0c28aeeee 100644 --- a/include/faabric/planner/Planner.h +++ b/include/faabric/planner/Planner.h @@ -33,6 +33,8 @@ class Planner void printConfig() const; + void setPolicy(const std::string& newPolicy); + // ---------- // Util public API // ---------- diff --git a/src/batch-scheduler/BatchScheduler.cpp b/src/batch-scheduler/BatchScheduler.cpp index 71bbf699b..58dc27d35 100644 --- a/src/batch-scheduler/BatchScheduler.cpp +++ b/src/batch-scheduler/BatchScheduler.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -20,6 +21,8 @@ std::shared_ptr getBatchScheduler() if (mode == "bin-pack") { batchScheduler = std::make_shared(); + } else if (mode == "compact") { + batchScheduler = std::make_shared(); } else { SPDLOG_ERROR("Unrecognised batch scheduler mode: {}", mode); throw std::runtime_error("Unrecognised batch scheduler mode"); @@ -33,6 +36,15 @@ void resetBatchScheduler() batchScheduler = nullptr; } +void resetBatchScheduler(const std::string& newMode) +{ + resetBatchScheduler(); + + faabric::util::getSystemConfig().batchSchedulerMode = newMode; + + getBatchScheduler(); +} + DecisionType BatchScheduler::getDecisionType( const InFlightReqs& inFlightReqs, std::shared_ptr req) diff --git a/src/batch-scheduler/CMakeLists.txt b/src/batch-scheduler/CMakeLists.txt index 3e2a346e1..7a73ddcfa 100644 --- a/src/batch-scheduler/CMakeLists.txt +++ b/src/batch-scheduler/CMakeLists.txt @@ -6,6 +6,7 @@ faabric_lib(scheduling_util faabric_lib(batch_scheduler BatchScheduler.cpp BinPackScheduler.cpp + CompactScheduler.cpp ) target_link_libraries(batch_scheduler PRIVATE diff --git a/src/batch-scheduler/CompactScheduler.cpp b/src/batch-scheduler/CompactScheduler.cpp new file mode 100644 index 000000000..0af5f7427 --- /dev/null +++ b/src/batch-scheduler/CompactScheduler.cpp @@ -0,0 +1,367 @@ +#include +#include +#include +#include + +namespace faabric::batch_scheduler { + +static std::map getHostFreqCount( + std::shared_ptr decision) +{ + std::map hostFreqCount; + for (auto host : decision->hosts) { + hostFreqCount[host] += 1; + } + + return hostFreqCount; +} + +// Given a new decision that improves on an old decision (i.e. to migrate), we +// want to make sure that we minimise the number of migration requests we send. +// This is, we want to keep as many host-message scheduling in the old decision +// as possible, and also have the overall locality of the new decision (i.e. +// the host-message histogram) +// NOTE: keep in mind that the newDecision has the right host histogram, but +// the messages may be completely out-of-order +static std::shared_ptr minimiseNumOfMigrations( + std::shared_ptr newDecision, + std::shared_ptr oldDecision) +{ + auto decision = std::make_shared(oldDecision->appId, + oldDecision->groupId); + + // We want to maintain the new decision's host-message histogram + auto hostFreqCount = getHostFreqCount(newDecision); + + // Helper function to find the next host in the histogram with slots + auto nextHostWithSlots = [&hostFreqCount]() -> std::string { + for (auto [ip, slots] : hostFreqCount) { + if (slots > 0) { + return ip; + } + } + + // Unreachable (in this context) + throw std::runtime_error("No next host with slots found!"); + }; + + assert(newDecision->hosts.size() == oldDecision->hosts.size()); + + // First we try to allocate to each message the same host they used to have + for (int i = 0; i < oldDecision->hosts.size(); i++) { + auto oldHost = oldDecision->hosts.at(i); + + if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) { + decision->addMessageInPosition(i, + oldHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + oldDecision->mpiPorts.at(i)); + + hostFreqCount.at(oldHost) -= 1; + } + } + + // Second we allocate the rest + for (int i = 0; i < oldDecision->hosts.size(); i++) { + if (decision->nFunctions <= i || decision->hosts.at(i).empty()) { + + auto nextHost = nextHostWithSlots(); + decision->addMessageInPosition(i, + nextHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + -1); + + hostFreqCount.at(nextHost) -= 1; + } + } + + // Assert that we have preserved the new decision's host-message histogram + // (use the pre-processor macro as we assert repeatedly in the loop, so we + // want to avoid having an empty loop in non-debug mode) +#ifndef NDEBUG + for (auto [host, freq] : hostFreqCount) { + assert(freq == 0); + } +#endif + + return decision; +} + +bool CompactScheduler::isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) +{ + throw std::runtime_error("Method not supported for COMPACT scheduler"); +} + +HostMap deepCopyHostMap(const HostMap& hostMap) +{ + HostMap newHostMap; + + for (const auto& [ip, host] : hostMap) { + newHostMap[ip] = + std::make_shared(host->ip, host->slots, host->usedSlots); + } + + return newHostMap; +} + +// For the Compact scheduler, a decision is better than another one if the +// total number of empty hosts has increased +bool CompactScheduler::isFirstDecisionBetter( + HostMap& hostMap, + std::shared_ptr newDecision, + std::shared_ptr oldDecision) +{ + auto getNumFreeHosts = [](const HostMap& hostMap) -> int { + int numFreeHosts = 0; + + for (const auto& [ip, host] : hostMap) { + if (host->usedSlots == 0) { + numFreeHosts++; + } + } + + return numFreeHosts; + }; + + auto updateHostMapWithDecision = + [](const HostMap& hostMap, + std::shared_ptr decision, + const std::string& opr) -> HostMap { + // Be explicit about copying the host maps here + HostMap newHostMap = deepCopyHostMap(hostMap); + + for (const auto& hostIp : decision->hosts) { + if (opr == "add") { + newHostMap.at(hostIp)->usedSlots++; + } else if (opr == "subtract") { + newHostMap.at(hostIp)->usedSlots--; + } + } + + return newHostMap; + }; + + // Here we compare the number of free hosts in the original decision and + // in the new one. Note that, as part of getSortedHosts, we have already + // subtrated the old decision from the host map, so we need to add it again + + auto originalHostMap = + updateHostMapWithDecision(hostMap, oldDecision, "add"); + int numFreeHostsBefore = getNumFreeHosts(originalHostMap); + + // Update the host map by removing the old decision and adding the new one + + auto newHostMap = updateHostMapWithDecision(hostMap, newDecision, "add"); + int numFreeHostsAfter = getNumFreeHosts(newHostMap); + + // The first (new) decision is better if it has MORE free hosts + return numFreeHostsAfter > numFreeHostsBefore; +} + +std::vector CompactScheduler::getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) +{ + std::vector sortedHosts; + for (auto [ip, host] : hostMap) { + sortedHosts.push_back(host); + } + + std::shared_ptr oldDecision = nullptr; + std::map hostFreqCount; + if (decisionType != DecisionType::NEW) { + oldDecision = inFlightReqs.at(req->appid()).second; + hostFreqCount = getHostFreqCount(oldDecision); + } + + auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool { + // The Compact scheduler sorts hosts by number of available slots + int nAvailableA = numSlotsAvailable(hostA); + int nAvailableB = numSlotsAvailable(hostB); + if (nAvailableA != nAvailableB) { + return nAvailableA > nAvailableB; + } + + // In case of a tie, it will pick larger hosts first + int nSlotsA = numSlots(hostA); + int nSlotsB = numSlots(hostB); + if (nSlotsA != nSlotsB) { + return nSlotsA > nSlotsB; + } + + // Lastly, in case of a tie, return the largest host alphabetically + return getIp(hostA) > getIp(hostB); + }; + + auto isFirstHostLargerWithFreq = [&](auto hostA, auto hostB) -> bool { + // When updating an existing scheduling decision (SCALE_CHANGE or + // DIST_CHANGE), the BinPack scheduler takes into consideration the + // existing host-message histogram (i.e. how many messages for this app + // does each host _already_ run) + + int numInHostA = hostFreqCount.contains(getIp(hostA)) + ? hostFreqCount.at(getIp(hostA)) + : 0; + int numInHostB = hostFreqCount.contains(getIp(hostB)) + ? hostFreqCount.at(getIp(hostB)) + : 0; + + // If at least one of the hosts has messages for this request, return + // the host with the more messages for this request (note that it is + // possible that this host has no available slots at all, in this case + // we will just pack 0 messages here but we still want to sort it first + // nontheless) + if (numInHostA != numInHostB) { + return numInHostA > numInHostB; + } + + // In case of a tie, use the same criteria than NEW + return isFirstHostLarger(hostA, hostB); + }; + + auto isFirstHostFuller = [&](const Host& hostA, const Host& hostB) -> bool { + // In a DIST_CHANGE decision we want to globally minimise the + // number of free VMs (i.e. COMPACT), so we sort the hosts in + // increasing order of fullness and, in case of a tie, prefer hosts + // that are already running messages for this app + int nUsedA = numUsedSlots(hostA); + int nUsedB = numUsedSlots(hostB); + if (nUsedA != nUsedB) { + return nUsedA > nUsedB; + } + + // In case of a tie in free slots, prefer hosts already running + // messages for this app + // return isFirstHostLargerWithFreq(hostA, hostB); + return isFirstHostLarger(hostA, hostB); + }; + + switch (decisionType) { + case DecisionType::NEW: { + // For a NEW decision type, the Compact scheduler just sorts the + // hosts in decreasing order of capacity, and bin-packs messages + // to hosts in this order + std::sort( + sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger); + break; + } + case DecisionType::SCALE_CHANGE: { + // If we are changing the scale of a running app (i.e. via chaining + // or thread/process forking) we want to prioritise co-locating + // as much as possible. This means that we will sort first by the + // frequency of messages of the running app, and second with the + // same criteria than NEW + // IMPORTANT: a SCALE_CHANGE request with 4 messages means that we + // want to add 4 NEW messages to the running app (not that the new + // total count is 4) + std::sort(sortedHosts.begin(), + sortedHosts.end(), + isFirstHostLargerWithFreq); + break; + } + case DecisionType::DIST_CHANGE: { + // When migrating, we want to know if the provided for app (which + // is already in-flight) can be improved according to the compact + // scheduling logic. This is equivalent to saying that the global + // number of free hosts increases + auto oldDecision = inFlightReqs.at(req->appid()).second; + auto hostFreqCount = getHostFreqCount(oldDecision); + + // To decide on a migration opportunity, is like having another + // shot at re-scheduling the app from scratch. Thus, we remove + // the current slots we occupy, and try to fill in holes in the + // existing host map + + // First remove the slots the app occupies to have a fresh new + // shot at the scheduling + for (auto host : sortedHosts) { + if (hostFreqCount.contains(getIp(host))) { + freeSlots(host, hostFreqCount.at(getIp(host))); + } + } + + std::sort( + sortedHosts.begin(), sortedHosts.end(), isFirstHostFuller); + + break; + } + default: { + SPDLOG_ERROR("Unrecognised decision type: {}", decisionType); + throw std::runtime_error("Unrecognised decision type"); + } + } + + return sortedHosts; +} + +// The BinPack's scheduler decision algorithm is very simple. It first sorts +// hosts (i.e. bins) in a specific order (depending on the scheduling type), +// and then starts filling bins from begining to end, until it runs out of +// messages to schedule +std::shared_ptr CompactScheduler::makeSchedulingDecision( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) +{ + auto decision = std::make_shared(req->appid(), 0); + + // Get the sorted list of hosts + auto decisionType = getDecisionType(inFlightReqs, req); + auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType); + + // Assign slots from the list (i.e. bin-pack) + auto itr = sortedHosts.begin(); + int numLeftToSchedule = req->messages_size(); + int msgIdx = 0; + while (itr < sortedHosts.end()) { + // Calculate how many slots can we assign to this host (assign as many + // as possible) + int numOnThisHost = + std::min(numLeftToSchedule, numSlotsAvailable(*itr)); + for (int i = 0; i < numOnThisHost; i++) { + decision->addMessage(getIp(*itr), req->messages(msgIdx)); + msgIdx++; + } + + // Update the number of messages left to schedule + numLeftToSchedule -= numOnThisHost; + + // If there are no more messages to schedule, we are done + if (numLeftToSchedule == 0) { + break; + } + + // Otherwise, it means that we have exhausted this host, and need to + // check in the next one + itr++; + } + + // If we still have enough slots to schedule, we are out of slots + if (numLeftToSchedule > 0) { + return std::make_shared(NOT_ENOUGH_SLOTS_DECISION); + } + + // In case of a DIST_CHANGE decision (i.e. migration), we want to make sure + // that the new decision is better than the previous one + if (decisionType == DecisionType::DIST_CHANGE) { + auto oldDecision = inFlightReqs.at(req->appid()).second; + if (isFirstDecisionBetter(hostMap, decision, oldDecision)) { + // If we are sending a better migration, make sure that we minimise + // the number of migrations to be done + return minimiseNumOfMigrations(decision, oldDecision); + } + + return std::make_shared(DO_NOT_MIGRATE_DECISION); + } + + return decision; +} +} diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index 577d5bc2c..09a88cae7 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -132,6 +132,14 @@ void Planner::printConfig() const SPDLOG_INFO("HTTP_SERVER_THREADS {}", config.numthreadshttpserver()); } +void Planner::setPolicy(const std::string& newPolicy) +{ + // Acquire lock to prevent any changes in state whilst we change the policy + faabric::util::FullLock lock(plannerMx); + + faabric::batch_scheduler::resetBatchScheduler(newPolicy); +} + bool Planner::reset() { SPDLOG_INFO("Resetting planner"); diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp index a31fb92b1..6edee2a6b 100644 --- a/src/planner/PlannerEndpointHandler.cpp +++ b/src/planner/PlannerEndpointHandler.cpp @@ -267,7 +267,6 @@ void PlannerEndpointHandler::onRequest( return ctx.sendFunction(std::move(response)); } case faabric::planner::HttpMessage_Type_PRELOAD_SCHEDULING_DECISION: { - // foo bar // in: BatchExecuteRequest // out: none SPDLOG_DEBUG( @@ -308,6 +307,26 @@ void PlannerEndpointHandler::onRequest( return ctx.sendFunction(std::move(response)); } + case faabric::planner::HttpMessage_Type_SET_POLICY: { + SPDLOG_DEBUG("Planner received SET_POLICY request"); + + std::string newPolicy = msg.payloadjson(); + + try { + faabric::planner::getPlanner().setPolicy(newPolicy); + } catch (std::exception& e) { + response.result(beast::http::status::bad_request); + response.body() = + std::string("Unrecognised policy name: " + newPolicy); + return ctx.sendFunction(std::move(response)); + } + + // Prepare the response + response.result(beast::http::status::ok); + response.body() = std::string("Policy set correctly"); + + return ctx.sendFunction(std::move(response)); + } default: { SPDLOG_ERROR("Unrecognised message type {}", msg.type()); response.result(beast::http::status::bad_request); diff --git a/src/planner/planner.proto b/src/planner/planner.proto index 60ac6e4c4..af6b00b1d 100644 --- a/src/planner/planner.proto +++ b/src/planner/planner.proto @@ -45,6 +45,7 @@ message HttpMessage { EXECUTE_BATCH = 10; EXECUTE_BATCH_STATUS = 11; PRELOAD_SCHEDULING_DECISION = 12; + SET_POLICY = 13; } Type type = 1 [json_name = "http_type"]; @@ -55,6 +56,7 @@ message HttpMessage { // - EXECUTE_BATCH_STATUS: where the body is a BER too // - PRELOAD_SCHEDULING_DECISION: where the body is a // faabric::BatchExecuteRequest + // - SET_POLICY: where the body is a string with the new planner policy string payloadJson = 2 [json_name = "payload"]; } diff --git a/tests/dist/mpi/test_multiple_mpi_worlds.cpp b/tests/dist/mpi/test_multiple_mpi_worlds.cpp index 0f07510ed..93b9387ef 100644 --- a/tests/dist/mpi/test_multiple_mpi_worlds.cpp +++ b/tests/dist/mpi/test_multiple_mpi_worlds.cpp @@ -2,7 +2,6 @@ #include "dist_test_fixtures.h" #include "faabric_utils.h" -#include "init.h" #include "mpi/mpi_native.h" #include @@ -62,7 +61,7 @@ TEST_CASE_METHOD(MpiDistTestsFixture, } TEST_CASE_METHOD(MpiDistTestsFixture, - "Test MPI migration with two MPI worlds", + "Test MPI migration with two MPI worlds (bin-pack)", "[mpi]") { int worldSize = 4; @@ -126,7 +125,67 @@ TEST_CASE_METHOD(MpiDistTestsFixture, } TEST_CASE_METHOD(MpiDistTestsFixture, - "Test migrating two MPI applications in parallel", + "Test MPI migration with two MPI worlds (compact)", + "[mpi]") +{ + updatePlannerPolicy("compact"); + + int worldSize = 4; + + // Prepare both requests: + // - The first will do work, sleep for five seconds, and do work again + // - The second will do work and check for migration opportunities + auto req1 = setRequest("alltoall-sleep"); + auto req2 = setRequest("migration"); + auto& msg = req2->mutable_messages()->at(0); + msg.set_inputdata(std::to_string(NUM_MIGRATION_LOOPS)); + + updateLocalSlots(8); + updateRemoteSlots(8); + + std::vector hostsBefore1 = { + getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP() + }; + std::vector hostsBefore2; + + SECTION("Migrate main rank") + { + hostsBefore2 = { getWorkerIP(), getWorkerIP(), getWorkerIP(), getWorkerIP() }; + } + + SECTION("Don't migrate main rank") + { + hostsBefore2 = { getMasterIP(), getMasterIP(), getWorkerIP(), getWorkerIP() }; + } + + std::vector hostsAfterMigration = std::vector(worldSize, getMasterIP()); + + // Preload decisions to force sub-optimal scheduling + auto preloadDec1 = std::make_shared( + req1->appid(), req1->groupid()); + auto preloadDec2 = std::make_shared( + req2->appid(), req2->groupid()); + for (int i = 0; i < worldSize; i++) { + preloadDec1->addMessage(hostsBefore1.at(i), 0, 0, i); + preloadDec2->addMessage(hostsBefore2.at(i), 0, 0, i); + } + plannerCli.preloadSchedulingDecision(preloadDec1); + plannerCli.preloadSchedulingDecision(preloadDec2); + + plannerCli.callFunctions(req1); + auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1); + REQUIRE(hostsBefore1 == actualHostsBefore1); + + plannerCli.callFunctions(req2); + auto actualHostsBefore2 = waitForMpiMessagesInFlight(req2); + REQUIRE(hostsBefore2 == actualHostsBefore2); + + checkAllocationAndResult(req1, hostsBefore1); + checkAllocationAndResult(req2, hostsAfterMigration); +} + +TEST_CASE_METHOD(MpiDistTestsFixture, + "Test migrating two MPI applications in parallel (bin-pack)", "[mpi]") { // Set the slots for the first request: 2 locally and 2 remote diff --git a/tests/test/batch-scheduler/test_compact_scheduler.cpp b/tests/test/batch-scheduler/test_compact_scheduler.cpp new file mode 100644 index 000000000..fbb293ded --- /dev/null +++ b/tests/test/batch-scheduler/test_compact_scheduler.cpp @@ -0,0 +1,558 @@ +#include + +#include "fixtures.h" + +#include +#include + +using namespace faabric::batch_scheduler; + +namespace tests { + +class CompactSchedulerTestFixture : public BatchSchedulerFixture +{ + public: + CompactSchedulerTestFixture() + { + conf.batchSchedulerMode = "compact"; + batchScheduler = getBatchScheduler(); + } +}; + +TEST_CASE_METHOD(CompactSchedulerTestFixture, + "Test scheduling of new requests with Compact", + "[batch-scheduler]") +{ + // To mock new requests (i.e. DecisionType::NEW), we always set the + // InFlightReqs map to an empty map + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 1, 1 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + SECTION("Scheduling is exactly one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" }); + } + + // The bin-pack scheduler will pick hosts with larger empty slots first + SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar" }); + } + + SECTION("Scheduling spans exactly two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // In particular, it will prioritise hosts with overall less capacity if + // they have more free resources + SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 4 }, { 0, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "bar" }); + } + + // In case of a tie in free resources, the Compact scheduler will pick + // hosts with larger overall capacity first + SECTION("Scheduling spans two hosts with same free resources") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // If there's still a tie, the Compact scheduler will solve the tie by + // sorting the hosts alphabetically (from larger to smaller) + SECTION("Scheduling spans two hosts with same free resources and size") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + SECTION("Scheduling spans an arbitrarily large number of hosts") + { + config.hostMap = buildHostMap({ "foo", "bar", "baz", "bip", "bup" }, + { 4, 6, 2, 3, 1 }, + { 0, 2, 2, 2, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 10); + config.expectedDecision = buildExpectedDecision(ber, + { "bar", + "bar", + "bar", + "bar", + "foo", + "foo", + "foo", + "foo", + "bip", + "bup" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +TEST_CASE_METHOD(CompactSchedulerTestFixture, + "Test scheduling of scale-change requests with Compact", + "[batch-scheduler]") +{ + // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we + // need to have one in-flight request in the map with the same app id + // (and not of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + // The configs in this test must be read as follows: + // - the host map's used slots contains the current distribution for the app + // (i.e. the number of used slots matches the number in in-flight reqs) + // - the host map's slots contain the total slots + // - the ber contains the NEW messages we are going to add + // - the expected decision includes the expected scheduling decision for + // the new messages + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + // When scheduling a SCALE_CHANGE request, we always try to colocate as + // much as possible + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + // We prefer hosts with less capacity if they are already running requests + // for the same app + SECTION("Scheduling fits in one host and prefers known hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar" }); + } + + // Like with `NEW` requests, we can also spill to other hosts + SECTION("Scheduling spans more than one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" }); + } + + // If two hosts are already executing the app, we pick the one that is + // running the largest number of messages + SECTION("Scheduler prefers hosts with more running messages") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 1); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "foo" }); + config.expectedDecision = buildExpectedDecision(ber, { "bar" }); + } + + // Again, when picking a new host to spill to, we priorities hosts that + // are already running requests for this app + SECTION("Scheduling always picks known hosts first") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 5); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" }); + } + + // Sometimes the preferred hosts just don't have slots. They will be sorted + // first but the scheduler will skip them when bin-packing + SECTION("Scheduler ignores preferred but full hosts") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "foo", "foo" }); + } + + // In case of a tie of the number of runing messages, we revert to `NEW`- + // like tie breaking + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (free slots)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (size)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "baz", "foo" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (alphabetical)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "bar", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +TEST_CASE_METHOD(CompactSchedulerTestFixture, + "Test scheduling of dist-change requests with Compact", + "[batch-scheduler]") +{ + // To mock a dist-change request (i.e. DecisionType::DIST_CHANGE), we + // need to have one in-flight request in the map with the same app id, the + // same size (and of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + // The configs in this test must be read as follows: + // - the host map's used slots contains the current distribution for the app + // - the host map's slots contain the total slots, there is a migration + // opportunity if we can improve the current distribution + // - we repeat the distribtution when building the in-flight requests (but + // also the host names) + + SECTION("Compact returns nothing if there's no opportunity to migrate " + "(single host)") + { + config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 2); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("Compact returns nothing if there's no opportunity to migrate " + "(multiple hosts)") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 2 }, { 4, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 5); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 5, { "foo", "foo", "foo", "foo", "bar" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("Compact detects opportunities to free one host") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 4, 4, 4 }, { 2, 2, 4 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "baz", "baz", "baz", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" }); + } + + SECTION("Compact detects opportunities to free many hosts") + { + config.hostMap = buildHostMap( + { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" }); + } + + SECTION("Compact detects opportunities to free many hosts") + { + config.hostMap = buildHostMap( + { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 4, 2, 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "lol", "lol" }); + } + + SECTION("Compact detects opportunities to consolidate to one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 2 }, { 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" }); + } + + SECTION( + "In case of a tie, it uses the same tie-break than NEW (free slots)") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 5 }, { 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar", "bar" }); + } + + SECTION( + "In case of a tie, it uses the same tie-break than NEW (total capacity)") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 5 }, { 2, 3 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar", "bar" }); + } + + SECTION("In case of a tie, it uses the same tie-break than NEW (larger " + "alphabetically)") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 4 }, { 2, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" }); + } + + SECTION("Compact prefers hosts running more messages") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 3, 2, 1 }, + { 2, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "bar", "foo" }); + } + + SECTION("Compact prefers hosts running more slots (even if less messages)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 3, 5, 1 }, + { 2, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar", "bar" }); + } + + SECTION("Compact always prefers consolidating to fewer hosts") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 3, 5, 1 }, + { 1, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "foo", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar" }); + } + + SECTION( + "If it cannot increase the number of free VMs, Compact does nothing") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + }, + { 4, 4 }, + { 3, 3 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, 6, { "foo", "foo", "foo", "bar", "bar", "bar" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION( + "When migrating, Compact will minimise the number of messages to migrate") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + "bat", + }, + { 2, 2, 1, 1 }, + { 1, 1, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "bar", "baz", "bat" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "bar", "bar", "foo" }); + } + + SECTION("Compact will ignore empty hosts") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 4, 4 }, + { 4, 4, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = + buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" }); + config.expectedDecision = DO_NOT_MIGRATE_DECISION; + } + + SECTION("Compact will minimise the number of messages to migrate") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 9); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, + 9, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" }); + config.expectedDecision = buildExpectedDecision( + ber, + { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" }); + } + + SECTION("Compact will minimise the number of messages to migrate (ii)") + { + config.hostMap = + buildHostMap({ "foo", "bar", "baz" }, { 5, 3, 2 }, { 2, 3, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION); + config.inFlightReqs = buildInFlightReqs( + ber, 7, { "bar", "bar", "bar", "baz", "baz", "foo", "foo" }); + config.expectedDecision = buildExpectedDecision( + ber, { "bar", "bar", "foo", "foo", "foo", "foo", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} +} diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp index f5ca28b12..62578f67d 100644 --- a/tests/test/planner/test_planner_endpoint.cpp +++ b/tests/test/planner/test_planner_endpoint.cpp @@ -723,4 +723,44 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, // Wait for BER to finish waitForBerToFinish(ber); } + +TEST_CASE_METHOD(PlannerEndpointExecTestFixture, + "Test setting the planner policy", + "[planner]") +{ + // First, prepare an HTTP request to execute a batch + HttpMessage msg; + msg.set_type(HttpMessage_Type_SET_POLICY); + + std::string policy; + + SECTION("Valid request (bin-pack)") + { + policy = "bin-pack"; + expectedReturnCode = beast::http::status::ok; + expectedResponseBody = "Policy set correctly"; + } + + SECTION("Valid request (compact)") + { + policy = "compact"; + expectedReturnCode = beast::http::status::ok; + expectedResponseBody = "Policy set correctly"; + } + + SECTION("Invalid request") + { + policy = "foo-bar"; + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = "Unrecognised policy name: " + policy; + } + + msg.set_payloadjson(policy); + msgJsonStr = faabric::util::messageToJson(msg); + + std::pair result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); +} } diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h index 6a4004f6e..2e7c777c7 100644 --- a/tests/utils/faabric_utils.h +++ b/tests/utils/faabric_utils.h @@ -96,4 +96,6 @@ std::pair postToUrl(const std::string& host, void flushPlannerWorkers(); void resetPlanner(); + +void updatePlannerPolicy(const std::string& newPolicy); } diff --git a/tests/utils/planner_utils.cpp b/tests/utils/planner_utils.cpp index 1f370024d..2956f06ad 100644 --- a/tests/utils/planner_utils.cpp +++ b/tests/utils/planner_utils.cpp @@ -16,6 +16,19 @@ void resetPlanner() assert(result.first == 200); } +void updatePlannerPolicy(const std::string& newPolicy) +{ + faabric::planner::HttpMessage msg; + msg.set_type(faabric::planner::HttpMessage_Type_SET_POLICY); + msg.set_payloadjson(newPolicy); + std::string jsonStr = faabric::util::messageToJson(msg); + + faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); + std::pair result = + postToUrl(conf.plannerHost, conf.plannerPort, jsonStr); + assert(result.first == 200); +} + void flushPlannerWorkers() { faabric::planner::HttpMessage msg;