diff --git a/.env b/.env
index 460d83daa..6ec017429 100644
--- a/.env
+++ b/.env
@@ -1,4 +1,4 @@
-FAABRIC_VERSION=0.6.0
-FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.6.0
+FAABRIC_VERSION=0.6.1
+FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.6.1
 COMPOSE_PROJECT_NAME=faabric-dev
 CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 230f82e2d..edbd1e105 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -23,7 +23,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -36,7 +36,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -50,7 +50,7 @@ jobs:
     if: github.event.pull_request.draft == false
     runs-on: ubuntu-latest
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -73,7 +73,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -113,7 +113,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
@@ -167,7 +167,7 @@ jobs:
       REDIS_QUEUE_HOST: redis
       REDIS_STATE_HOST: redis
     container:
-      image: faasm.azurecr.io/faabric:0.6.0
+      image: faasm.azurecr.io/faabric:0.6.1
       credentials:
         username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
         password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
diff --git a/VERSION b/VERSION
index a918a2aa1..ee6cdce3c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.6.0
+0.6.1
diff --git a/include/faabric/planner/PlannerClient.h b/include/faabric/planner/PlannerClient.h
index 1a225d54c..0c04839aa 100644
--- a/include/faabric/planner/PlannerClient.h
+++ b/include/faabric/planner/PlannerClient.h
@@ -4,9 +4,14 @@
 #include
 #include
+#include <future>
 #include
 
 namespace faabric::planner {
+
+typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise;
+typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr;
+
 /* The planner's implementation of group membership requires clients to send
  * keep-alive messages. Once started, this background thread will send these
  * messages
@@ -25,6 +30,20 @@ class KeepAliveThread : public faabric::util::PeriodicBackgroundThread
     std::shared_mutex keepAliveThreadMx;
 };
 
+/*
+ * Local state associated with the current host, used to cache results and
+ * avoid unnecessary interactions with the planner server.
+ */
+struct PlannerCache
+{
+    std::unordered_map<int, MessageResultPromisePtr> plannerResults;
+};
+
+/*
+ * The planner client is used to communicate with the planner over the network.
+ * To minimise the number of open connections, we have one static instance of
+ * the client per host. This means that the planner client must be thread-safe.
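+ *
+ * Usage sketch (editor's illustration, not part of this changeset): every
+ * call site shares the one client via the static getter, e.g.:
+ *
+ *   faabric::planner::getPlannerClient().ping();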
+ */
 class PlannerClient final : public faabric::transport::MessageEndpointClient
 {
   public:
@@ -34,6 +53,8 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
 
     void ping();
 
+    void clearCache();
+
     // ------
     // Host membership calls
     // ------
@@ -51,7 +72,24 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
 
     void setMessageResult(std::shared_ptr<faabric::Message> msg);
 
-    std::shared_ptr<faabric::Message> getMessageResult(
+    void setMessageResultLocally(std::shared_ptr<faabric::Message> msg);
+
+    faabric::Message getMessageResult(int appId, int msgId, int timeoutMs);
+
+    faabric::Message getMessageResult(const faabric::Message& msg,
+                                      int timeoutMs);
+
+  private:
+    std::mutex plannerCacheMx;
+    PlannerCache cache;
+
+    faabric::Message doGetMessageResult(
+      std::shared_ptr<faabric::Message> msgPtr,
+      int timeoutMs);
+
+    // This method actually gets the message result from the planner (i.e.
+    // sends a request to the planner server)
+    std::shared_ptr<faabric::Message> getMessageResultFromPlanner(
       std::shared_ptr<faabric::Message> msg);
 };
 
@@ -59,7 +97,5 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
 // Static setter/getters
 // -----------------------------------
 
-std::shared_ptr<PlannerClient> getPlannerClient();
-
-void clearPlannerClient();
+PlannerClient& getPlannerClient();
 }
diff --git a/include/faabric/planner/PlannerServer.h b/include/faabric/planner/PlannerServer.h
index 8249ce582..ed9df956c 100644
--- a/include/faabric/planner/PlannerServer.h
+++ b/include/faabric/planner/PlannerServer.h
@@ -15,6 +15,10 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
     std::unique_ptr<google::protobuf::Message> doSyncRecv(
       transport::Message& message) override;
 
+    // Asynchronous calls
+
+    void recvSetMessageResult(std::span<uint8_t> buffer);
+
     // Synchronous calls
 
     std::unique_ptr<google::protobuf::Message> recvPing();
 
@@ -27,9 +31,6 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
     std::unique_ptr<google::protobuf::Message> recvRemoveHost(
      std::span<uint8_t> buffer);
 
-    std::unique_ptr<google::protobuf::Message> recvSetMessageResult(
-      std::span<uint8_t> buffer);
-
     std::unique_ptr<google::protobuf::Message> recvGetMessageResult(
       std::span<uint8_t> buffer);
diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h
index 0a383f304..34d69d55c 100644
--- a/include/faabric/scheduler/Scheduler.h
+++ b/include/faabric/scheduler/Scheduler.h
@@ -18,7 +18,6 @@
 #include
 #include
-#include <future>
 #include
 
 #define AVAILABLE_HOST_SET "available_hosts"
 
@@ -29,8 +28,6 @@ namespace faabric::scheduler {
 typedef std::pair<std::shared_ptr<faabric::BatchExecuteRequest>,
                   std::shared_ptr<faabric::util::SchedulingDecision>>
   InFlightPair;
-typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise;
-typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr;
 
 class Scheduler;
 
@@ -228,14 +225,10 @@ class Scheduler
     // ----------------------------------
     // Message results
     // ----------------------------------
-    void setFunctionResult(faabric::Message& msg);
-
-    void setMessageResultLocally(std::shared_ptr<faabric::Message> msg);
-
-    faabric::Message getFunctionResult(const faabric::Message& msg,
-                                       int timeoutMs);
-
-    faabric::Message getFunctionResult(int appId, int msgId, int timeoutMs);
+    // TODO(planner-scheduler): move this method to the planner client once
+    // the planner controls scheduling
+    void setFunctionResult(faabric::Message& msg);
 
     void setThreadResult(const faabric::Message& msg,
                          int32_t returnValue,
@@ -340,13 +333,6 @@ class Scheduler
     std::unordered_map<std::string, std::set<std::string>> pushedSnapshotsMap;
 
-    // ---- Message results ----
-    std::unordered_map<int, MessageResultPromisePtr> plannerResults;
-    std::mutex plannerResultsMutex;
-    faabric::Message doGetFunctionResult(
-      std::shared_ptr<faabric::Message> msgPtr,
-      int timeoutMs);
-
     // ---- Host resources and hosts ----
     faabric::HostResources thisHostResources;
     std::atomic<int32_t> thisHostUsedSlots = 0;
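The MessageResultPromise typedefs removed from Scheduler.h above are the same ones re-added to PlannerClient.h, so the promise-per-message mechanism is unchanged; only its owner moves. As a standalone sketch of that mechanism (illustrative C++ only, not faabric code; all names here are made up):

    #include <future>
    #include <memory>
    #include <unordered_map>

    // One promise per message id: a waiter blocks on the matching future,
    // and whoever receives the result fulfils the promise exactly once.
    // Locking and promise insertion are omitted for brevity
    std::unordered_map<int, std::shared_ptr<std::promise<int>>> results;

    int waitForResult(int msgId)
    {
        // get_future() may only be called once per promise, which is why the
        // real code treats two waiters on one message id as an error
        auto fut = results.at(msgId)->get_future();
        return fut.get();
    }

    void setResult(int msgId, int returnValue)
    {
        results.at(msgId)->set_value(returnValue);
    }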
diff --git a/src/planner/PlannerClient.cpp b/src/planner/PlannerClient.cpp
index 2cd02f548..eb25ea5aa 100644
--- a/src/planner/PlannerClient.cpp
+++ b/src/planner/PlannerClient.cpp
@@ -16,7 +16,7 @@ namespace faabric::planner {
 
 void KeepAliveThread::doWork()
 {
-    PlannerClient cli;
+    auto& cli = getPlannerClient();
 
     faabric::util::SharedLock lock(keepAliveThreadMx);
 
@@ -77,6 +77,13 @@ void PlannerClient::ping()
     SPDLOG_DEBUG("Succesfully pinged the planner server at {}", expectedIp);
 }
 
+void PlannerClient::clearCache()
+{
+    faabric::util::UniqueLock lock(plannerCacheMx);
+
+    cache.plannerResults.clear();
+}
+
 std::vector<Host> PlannerClient::getAvailableHosts()
 {
     EmptyRequest req;
@@ -116,11 +123,35 @@ void PlannerClient::removeHost(std::shared_ptr<RemoveHostRequest> req)
 
 void PlannerClient::setMessageResult(std::shared_ptr<faabric::Message> msg)
 {
-    faabric::EmptyResponse response;
-    syncSend(PlannerCalls::SetMessageResult, msg.get(), &response);
+    asyncSend(PlannerCalls::SetMessageResult, msg.get());
 }
 
-std::shared_ptr<faabric::Message> PlannerClient::getMessageResult(
+// This function sets a message result locally. It is invoked as a callback
+// from the planner to notify all hosts waiting for a message result that the
+// result is ready
+void PlannerClient::setMessageResultLocally(
+  std::shared_ptr<faabric::Message> msg)
+{
+    faabric::util::UniqueLock lock(plannerCacheMx);
+
+    // It may happen that the planner returns the message result before we
+    // have had time to prepare the promise. This should happen rarely as it
+    // is an unexpected race condition, which is why we emit a warning
+    if (cache.plannerResults.find(msg->id()) == cache.plannerResults.end()) {
+        SPDLOG_WARN(
+          "Setting message result before promise is set for (id: {}, app: {})",
+          msg->id(),
+          msg->appid());
+        cache.plannerResults.insert(
+          { msg->id(), std::make_shared<MessageResultPromise>() });
+    }
+
+    cache.plannerResults.at(msg->id())->set_value(msg);
+}
+
+// This internal method actually gets the message result from the
+// planner
+std::shared_ptr<faabric::Message> PlannerClient::getMessageResultFromPlanner(
+  std::shared_ptr<faabric::Message> msg)
 {
     faabric::Message responseMsg;
@@ -133,31 +164,107 @@ std::shared_ptr<faabric::Message> PlannerClient::getMessageResult(
     return std::make_shared<faabric::Message>(responseMsg);
 }
 
+faabric::Message PlannerClient::getMessageResult(const faabric::Message& msg,
+                                                 int timeoutMs)
+{
+    // Deliberately make a copy here so that we can set the masterhost when
+    // registering interest in the results
+    auto msgPtr = std::make_shared<faabric::Message>(msg);
+    msgPtr->set_masterhost(faabric::util::getSystemConfig().endpointHost);
+    return doGetMessageResult(msgPtr, timeoutMs);
+}
+
+faabric::Message PlannerClient::getMessageResult(int appId,
+                                                 int msgId,
+                                                 int timeoutMs)
+{
+    auto msgPtr = std::make_shared<faabric::Message>();
+    msgPtr->set_appid(appId);
+    msgPtr->set_id(msgId);
+    msgPtr->set_masterhost(faabric::util::getSystemConfig().endpointHost);
+    return doGetMessageResult(msgPtr, timeoutMs);
+}
+
+// This method gets the function result from the planner in a blocking fashion.
+// Even though the results are stored in the planner, we want to block in the
+// client (i.e. here) and not in the planner. This is to avoid consuming
+// planner threads. This method will first query the planner once
+// for the result. If it's not there, the planner will register this host's
+// interest, and send a function call setting the message result. In the
+// meantime, we wait on a promise
+faabric::Message PlannerClient::doGetMessageResult(
+  std::shared_ptr<faabric::Message> msgPtr,
+  int timeoutMs)
+{
+    int msgId = msgPtr->id();
+    auto resMsgPtr = getMessageResultFromPlanner(msgPtr);
+
+    // If the result is already there the first time we check, return it.
+    // Otherwise, we will have told the planner we want the result
+    if (resMsgPtr) {
+        return *resMsgPtr;
+    }
+
+    bool isBlocking = timeoutMs > 0;
+    // If the result is not in the planner, and we are not blocking, return
+    if (!isBlocking) {
+        faabric::Message msgResult;
+        msgResult.set_type(faabric::Message_MessageType_EMPTY);
+        return msgResult;
+    }
+
+    // If we are here, we need to wait for the planner to let us know that
+    // the message is ready. To do so, we need to set a promise at the message
+    // id. We do so immediately after returning, so that we don't race with
+    // the planner sending the result back
+    std::future<std::shared_ptr<faabric::Message>> fut;
+    {
+        faabric::util::UniqueLock lock(plannerCacheMx);
+
+        if (cache.plannerResults.find(msgId) == cache.plannerResults.end()) {
+            cache.plannerResults.insert(
+              { msgId, std::make_shared<MessageResultPromise>() });
+        }
+
+        // Note that it is deliberately an error for two threads to retrieve
+        // the future at the same time
+        fut = cache.plannerResults.at(msgId)->get_future();
+    }
+
+    while (true) {
+        faabric::Message msgResult;
+        auto status = fut.wait_for(std::chrono::milliseconds(timeoutMs));
+        if (status == std::future_status::timeout) {
+            msgResult.set_type(faabric::Message_MessageType_EMPTY);
+        } else {
+            // Acquire a lock to read the value of the promise to avoid data
+            // races
+            faabric::util::UniqueLock lock(plannerCacheMx);
+            msgResult = *fut.get();
+        }
+
+        {
+            // Remove the result promise irrespective of whether we timed out
+            // or not, as promises are single-use
+            faabric::util::UniqueLock lock(plannerCacheMx);
+            cache.plannerResults.erase(msgId);
+        }
+
+        return msgResult;
+    }
+}
+
 // -----------------------------------
 // Static setter/getters
 // -----------------------------------
 
-// Even though there's just one planner server, and thus there will only be
-// one client per instance, using a ConcurrentMap gives us the thread-safe
-// wrapper for free
-static faabric::util::
-  ConcurrentMap<std::string, std::shared_ptr<PlannerClient>>
-    plannerClient;
-
-std::shared_ptr<PlannerClient> getPlannerClient()
+PlannerClient& getPlannerClient()
 {
     auto plannerHost = faabric::util::getIPFromHostname(
       faabric::util::getSystemConfig().plannerHost);
 
-    auto client = plannerClient.get(plannerHost).value_or(nullptr);
-    if (client == nullptr) {
-        SPDLOG_DEBUG("Adding new planner client for {}", plannerHost);
-        client = plannerClient.tryEmplaceShared(plannerHost).second;
-    }
-    return client;
-}
-
-void clearPlannerClient()
-{
-    plannerClient.clear();
+    static PlannerClient plannerCli(plannerHost);
+
+    return plannerCli;
 }
 }
diff --git a/src/planner/PlannerServer.cpp b/src/planner/PlannerServer.cpp
index 3db751be9..cfc0bbe38 100644
--- a/src/planner/PlannerServer.cpp
+++ b/src/planner/PlannerServer.cpp
@@ -22,6 +22,10 @@ void PlannerServer::doAsyncRecv(transport::Message& message)
 {
     uint8_t header = message.getMessageCode();
     switch (header) {
+        case PlannerCalls::SetMessageResult: {
+            recvSetMessageResult(message.udata());
+            break;
+        }
         default: {
             // If we don't recognise the header, let the client fail, but don't
             // crash the planner
@@ -47,9 +51,6 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::doSyncRecv(
         case PlannerCalls::RemoveHost: {
             return recvRemoveHost(message.udata());
         }
-        case PlannerCalls::SetMessageResult: {
-            return recvSetMessageResult(message.udata());
-        }
         case PlannerCalls::GetMessageResult: {
             return recvGetMessageResult(message.udata());
         }
@@ -123,14 +124,10 @@ std::unique_ptr<google::protobuf::Message> PlannerServer::recvRemoveHost(
     return std::make_unique<faabric::EmptyResponse>();
 }
 
-std::unique_ptr<google::protobuf::Message> PlannerServer::recvSetMessageResult(
-  std::span<uint8_t> buffer)
+void PlannerServer::recvSetMessageResult(std::span<uint8_t> buffer)
 {
     PARSE_MSG(Message, buffer.data(), buffer.size());
 
     planner.setMessageResult(std::make_shared<faabric::Message>(parsedMsg));
-
-    return std::make_unique<faabric::EmptyResponse>();
 }
 
 std::unique_ptr<google::protobuf::Message> PlannerServer::recvGetMessageResult(
diff --git a/src/runner/FaabricMain.cpp b/src/runner/FaabricMain.cpp
index 436061a72..33912d9ca 100644
--- a/src/runner/FaabricMain.cpp
+++ b/src/runner/FaabricMain.cpp
@@ -46,8 +46,7 @@ void FaabricMain::startRunner()
     faabric::redis::Redis::getState().ping();
 
     // Ensure we can ping the planner
-    faabric::planner::PlannerClient cli;
-    cli.ping();
+    faabric::planner::getPlannerClient().ping();
 
     auto& sch = faabric::scheduler::getScheduler();
     sch.addHostToGlobalSet();
diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp
index e787907b2..9ae62c7d8 100644
--- a/src/scheduler/FunctionCallServer.cpp
+++ b/src/scheduler/FunctionCallServer.cpp
@@ -130,7 +130,7 @@ FunctionCallServer::recvPendingMigrations(std::span<uint8_t> buffer)
 void FunctionCallServer::recvSetMessageResult(std::span<uint8_t> buffer)
 {
     PARSE_MSG(faabric::Message, buffer.data(), buffer.size())
-    scheduler.setMessageResultLocally(
+    faabric::planner::getPlannerClient().setMessageResultLocally(
       std::make_shared<faabric::Message>(parsedMsg));
 }
 }
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index c197f9b9a..3fd3fcc73 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -65,7 +65,7 @@ Scheduler::~Scheduler()
 std::set<std::string> Scheduler::getAvailableHosts()
 {
     auto availableHosts =
-      faabric::planner::getPlannerClient()->getAvailableHosts();
+      faabric::planner::getPlannerClient().getAvailableHosts();
     std::set<std::string> availableHostsIps;
     for (const auto& host : availableHosts) {
         availableHostsIps.insert(host.ip());
@@ -95,8 +95,7 @@ void Scheduler::addHostToGlobalSet(
         req->mutable_host()->set_usedslots(0);
     }
 
-    int plannerTimeout =
-      faabric::planner::getPlannerClient()->registerHost(req);
+    int plannerTimeout = faabric::planner::getPlannerClient().registerHost(req);
 
     // Once the host is registered, set-up a periodic thread to send a heart-
     // beat to the planner. Note that this method may be called multiple times
@@ -125,7 +124,7 @@ void Scheduler::removeHostFromGlobalSet(const std::string& hostIp)
         req->mutable_host()->set_ip(hostIp);
     }
 
-    faabric::planner::getPlannerClient()->removeHost(req);
+    faabric::planner::getPlannerClient().removeHost(req);
 
     // Clear the keep alive thread
     if (isThisHost) {
@@ -160,10 +159,10 @@
     // Clear the point to point broker
     broker.clear();
 
-    // Clear the clients (do we need to do this here?
+    // Clear the clients
     clearFunctionCallClients();
     clearSnapshotClients();
-    faabric::planner::clearPlannerClient();
+    faabric::planner::getPlannerClient().clearCache();
 
     faabric::util::FullLock lock(mx);
 
@@ -1072,32 +1071,10 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
 
     // Let the planner know this function has finished execution. This will
     // wake any thread waiting on this result
-    faabric::planner::getPlannerClient()->setMessageResult(
+    faabric::planner::getPlannerClient().setMessageResult(
       std::make_shared<faabric::Message>(msg));
 }
 
-// This function sets a message result locally.
It is invoked as a callback -// from the planner to notify all hosts waiting for a message result that the -// result is ready -void Scheduler::setMessageResultLocally(std::shared_ptr msg) -{ - faabric::util::UniqueLock lock(plannerResultsMutex); - - // It may happen that the planner returns the message result before we - // have had time to prepare the promise. This should happen rarely as it - // is an unexpected race condition, thus why we emit a warning - if (plannerResults.find(msg->id()) == plannerResults.end()) { - SPDLOG_WARN( - "Setting message result before promise is set for (id: {}, app: {})", - msg->id(), - msg->appid()); - plannerResults.insert( - { msg->id(), std::make_shared() }); - } - - plannerResults.at(msg->id())->set_value(msg); -} - void Scheduler::registerThread(uint32_t msgId) { // Here we need to ensure the promise is registered locally so @@ -1222,97 +1199,6 @@ size_t Scheduler::getCachedMessageCount() return threadResultMessages.size(); } -faabric::Message Scheduler::getFunctionResult(const faabric::Message& msg, - int timeoutMs) -{ - // Deliberately make a copy here so that we can set the masterhost when - // registering interest in the results - auto msgPtr = std::make_shared(msg); - msgPtr->set_masterhost(thisHost); - return doGetFunctionResult(msgPtr, timeoutMs); -} - -faabric::Message Scheduler::getFunctionResult(int appId, - int msgId, - int timeoutMs) -{ - auto msgPtr = std::make_shared(); - msgPtr->set_appid(appId); - msgPtr->set_id(msgId); - msgPtr->set_masterhost(thisHost); - return doGetFunctionResult(msgPtr, timeoutMs); -} - -// This method gets the function result from the planner in a blocking fashion. -// Even though the results are stored in the planner, we want to block in the -// client (i.e. here) and not in the planner. This is to avoid consuming -// planner threads. This method will first query the planner once -// for the result. If its not there, the planner will register this host's -// interest, and send a function call setting the message result. In the -// meantime, we wait on a promise -faabric::Message Scheduler::doGetFunctionResult( - std::shared_ptr msgPtr, - int timeoutMs) -{ - int msgId = msgPtr->id(); - auto resMsgPtr = - faabric::planner::getPlannerClient()->getMessageResult(msgPtr); - - // If when we first check the message it is there, return. Otherwise, we - // will have told the planner we want the result - if (resMsgPtr) { - return *resMsgPtr; - } - - bool isBlocking = timeoutMs > 0; - // If the result is not in the planner, and we are not blocking, return - if (!isBlocking) { - faabric::Message msgResult; - msgResult.set_type(faabric::Message_MessageType_EMPTY); - return msgResult; - } - - // If we are here, we need to wait for the planner to let us know that - // the message is ready. To do so, we need to set a promise at the message - // id. 
We do so immediately after returning, so that we don't race with - // the planner sending the result back - std::future> fut; - { - faabric::util::UniqueLock lock(plannerResultsMutex); - - if (plannerResults.find(msgId) == plannerResults.end()) { - plannerResults.insert( - { msgId, std::make_shared() }); - } - - // Note that it is deliberately an error for two threads to retrieve - // the future at the same time - fut = plannerResults.at(msgId)->get_future(); - } - - while (true) { - faabric::Message msgResult; - auto status = fut.wait_for(std::chrono::milliseconds(timeoutMs)); - if (status == std::future_status::timeout) { - msgResult.set_type(faabric::Message_MessageType_EMPTY); - } else { - // Acquire a lock to read the value of the promise to avoid data - // races - faabric::util::UniqueLock lock(plannerResultsMutex); - msgResult = *fut.get(); - } - - { - // Remove the result promise irrespective of whether we timed out - // or not, as promises are single-use - faabric::util::UniqueLock lock(plannerResultsMutex); - plannerResults.erase(msgId); - } - - return msgResult; - } -} - faabric::HostResources Scheduler::getThisHostResources() { faabric::util::SharedLock lock(mx); diff --git a/src/util/ExecGraph.cpp b/src/util/ExecGraph.cpp index a3e99f1b5..3281efc21 100644 --- a/src/util/ExecGraph.cpp +++ b/src/util/ExecGraph.cpp @@ -36,7 +36,7 @@ ExecGraphNode getFunctionExecGraphNode(int appId, int msgId) // Get result without blocking, as we expect the results to have been // published already faabric::Message resultMsg = - faabric::scheduler::getScheduler().getFunctionResult(msg, 0); + faabric::planner::getPlannerClient().getMessageResult(msg, 0); if (resultMsg.type() == faabric::Message_MessageType_EMPTY) { SPDLOG_ERROR( "Message result in exec. graph not ready (msg: {} - app: {})", @@ -70,7 +70,7 @@ std::set getChainedFunctions(const faabric::Message& msg) { // Note that we can't get the chained functions until the result for the // parent message has been set - auto resultMsg = faabric::scheduler::getScheduler().getFunctionResult( + auto resultMsg = faabric::planner::getPlannerClient().getMessageResult( msg, EXEC_GRAPH_TIMEOUT_MS); std::set chainedIds( resultMsg.mutable_chainedmsgids()->begin(), diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h index 67a4aba55..f58de2927 100644 --- a/tests/dist/dist_test_fixtures.h +++ b/tests/dist/dist_test_fixtures.h @@ -136,7 +136,7 @@ class MpiDistTestsFixture : public DistTestsFixture bool skipExecGraphCheck = false) { faabric::Message& msg = req->mutable_messages()->at(0); - faabric::Message result = sch.getFunctionResult(msg, timeoutMs); + faabric::Message result = plannerCli.getMessageResult(msg, timeoutMs); REQUIRE(result.returnvalue() == 0); SLEEP_MS(1000); if (!skipExecGraphCheck) { @@ -152,7 +152,7 @@ class MpiDistTestsFixture : public DistTestsFixture int timeoutMs = 1000) { faabric::Message& msg = req->mutable_messages()->at(0); - faabric::Message result = sch.getFunctionResult(msg, timeoutMs); + faabric::Message result = plannerCli.getMessageResult(msg, timeoutMs); REQUIRE(result.returnvalue() == 0); SLEEP_MS(1000); auto execGraph = faabric::util::getFunctionExecGraph(msg); diff --git a/tests/dist/scheduler/test_exec_graph.cpp b/tests/dist/scheduler/test_exec_graph.cpp index ed6dca84d..367ba8552 100644 --- a/tests/dist/scheduler/test_exec_graph.cpp +++ b/tests/dist/scheduler/test_exec_graph.cpp @@ -42,7 +42,7 @@ TEST_CASE_METHOD(DistTestsFixture, // Wait for the result, and immediately after query for the 
execution // graph for (const auto msgId : msgIds) { - sch.getFunctionResult(appId, msgId, 1000); + plannerCli.getMessageResult(appId, msgId, 1000); } auto execGraph = faabric::util::getFunctionExecGraph(m); REQUIRE(countExecGraphNodes(execGraph) == nFuncs); diff --git a/tests/dist/scheduler/test_funcs.cpp b/tests/dist/scheduler/test_funcs.cpp index 7a7a83012..313eb0dd8 100644 --- a/tests/dist/scheduler/test_funcs.cpp +++ b/tests/dist/scheduler/test_funcs.cpp @@ -50,7 +50,7 @@ TEST_CASE_METHOD(DistTestsFixture, for (int i = 0; i < nLocalSlots; i++) { faabric::Message& m = req->mutable_messages()->at(i); - sch.getFunctionResult(m, 1000); + plannerCli.getMessageResult(m, 1000); std::string expected = fmt::format("Function {} executed on host {}", m.id(), getMasterIP()); @@ -60,7 +60,7 @@ TEST_CASE_METHOD(DistTestsFixture, // Check functions executed on the other host for (int i = nLocalSlots; i < nFuncs; i++) { faabric::Message& m = req->mutable_messages()->at(i); - faabric::Message result = sch.getFunctionResult(m, 1000); + faabric::Message result = plannerCli.getMessageResult(m, 1000); std::string expected = fmt::format("Function {} executed on host {}", m.id(), getWorkerIP()); diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index 9c4491188..acc21b8ba 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -104,7 +104,7 @@ TEST_CASE_METHOD(DistTestsFixture, std::vector executedHosts = decision.hosts; REQUIRE(expectedHosts == executedHosts); - faabric::Message actualResult = sch.getFunctionResult(msg, 10000); + faabric::Message actualResult = plannerCli.getMessageResult(msg, 10000); REQUIRE(actualResult.returnvalue() == 333); } @@ -129,7 +129,7 @@ TEST_CASE_METHOD(DistTestsFixture, std::vector executedHosts = decision.hosts; REQUIRE(expectedHosts == executedHosts); - faabric::Message actualResult = sch.getFunctionResult(msg, 10000); + faabric::Message actualResult = plannerCli.getMessageResult(msg, 10000); REQUIRE(actualResult.returnvalue() == 0); } } diff --git a/tests/dist/transport/functions.cpp b/tests/dist/transport/functions.cpp index 2f547fe98..16e5d166d 100644 --- a/tests/dist/transport/functions.cpp +++ b/tests/dist/transport/functions.cpp @@ -150,7 +150,8 @@ int handleDistributedLock(tests::DistTestExecutor* exec, // Await results bool success = true; for (const auto& msg : nestedReq->messages()) { - faabric::Message res = sch.getFunctionResult(msg, 30000); + faabric::Message res = + faabric::planner::getPlannerClient().getMessageResult(msg, 30000); if (res.returnvalue() != 0) { success = false; } @@ -272,7 +273,8 @@ class DistributedCoordinationTestRunner bool success = true; for (const auto& m : chainReq->messages()) { - faabric::Message result = sch.getFunctionResult(m, 10000); + faabric::Message result = + faabric::planner::getPlannerClient().getMessageResult(m, 10000); if (result.returnvalue() != 0) { SPDLOG_ERROR("Distributed coordination check call failed: {}", m.id()); diff --git a/tests/dist/transport/test_coordination.cpp b/tests/dist/transport/test_coordination.cpp index ae8f6e65b..7718892ad 100644 --- a/tests/dist/transport/test_coordination.cpp +++ b/tests/dist/transport/test_coordination.cpp @@ -28,7 +28,7 @@ TEST_CASE_METHOD(DistTestsFixture, "Test distributed lock", "[ptp][transport]") sch.callFunctions(req); faabric::Message& m = req->mutable_messages()->at(0); - faabric::Message result = sch.getFunctionResult(m, 30000); + faabric::Message result = 
plannerCli.getMessageResult(m, 30000); REQUIRE(result.returnvalue() == 0); } } diff --git a/tests/dist/transport/test_point_to_point.cpp b/tests/dist/transport/test_point_to_point.cpp index 40963dffd..32bd35b32 100644 --- a/tests/dist/transport/test_point_to_point.cpp +++ b/tests/dist/transport/test_point_to_point.cpp @@ -76,7 +76,7 @@ class PointToPointDistTestFixture : public DistTestsFixture for (int i = 0; i < nFuncs; i++) { faabric::Message& m = req->mutable_messages()->at(i); - sch.getFunctionResult(m, 2000); + plannerCli.getMessageResult(m, 2000); REQUIRE(m.returnvalue() == 0); } } @@ -160,7 +160,7 @@ TEST_CASE_METHOD(DistTestsFixture, REQUIRE(expectedHosts == executedHosts); // Get result - faabric::Message result = sch.getFunctionResult(m, 10000); + faabric::Message result = plannerCli.getMessageResult(m, 10000); REQUIRE(result.returnvalue() == 0); } } diff --git a/tests/test/planner/test_planner_client_server.cpp b/tests/test/planner/test_planner_client_server.cpp index 75920f23f..f75ba9a1b 100644 --- a/tests/test/planner/test_planner_client_server.cpp +++ b/tests/test/planner/test_planner_client_server.cpp @@ -15,7 +15,7 @@ TEST_CASE_METHOD(PlannerClientServerFixture, "Test sending ping to planner", "[planner]") { - PlannerClient cli(LOCALHOST); + auto& cli = getPlannerClient(); REQUIRE_NOTHROW(cli.ping()); } @@ -100,25 +100,28 @@ TEST_CASE_METHOD(PlannerClientServerFixture, faabric::util::messageFactory("foo", "bar")); // If we try to get the message result before setting it first, nothing - // happens - auto resultMsgPtr = plannerCli.getMessageResult(msgPtr); - REQUIRE(resultMsgPtr == nullptr); + // happens (note that we need to pass a 0 timeout to not block) + auto resultMsg = + plannerCli.getMessageResult(msgPtr->appid(), msgPtr->id(), 0); + REQUIRE(resultMsg.type() == faabric::Message_MessageType_EMPTY); - // If we set the message result, then we can get it + // If we set the message result, then we can get it (note that we read it + // from the mocked requests) int expectedReturnValue = 1337; msgPtr->set_returnvalue(expectedReturnValue); plannerCli.setMessageResult(msgPtr); - resultMsgPtr = plannerCli.getMessageResult(msgPtr); - REQUIRE(resultMsgPtr->id() == msgPtr->id()); - REQUIRE(resultMsgPtr->appid() == msgPtr->appid()); - REQUIRE(resultMsgPtr->returnvalue() == expectedReturnValue); + SLEEP_MS(500); - // Also, setting the message result triggers the planner to send a - // request to the host that tried to get the result before + // Read from mocked requests auto msgResults = faabric::scheduler::getMessageResults(); REQUIRE(msgResults.size() == 1); REQUIRE(msgResults.at(0).first == faabric::util::getSystemConfig().endpointHost); + REQUIRE(msgResults.at(0).second->type() != + faabric::Message_MessageType_EMPTY); + REQUIRE(msgResults.at(0).second->id() == msgPtr->id()); + REQUIRE(msgResults.at(0).second->appid() == msgPtr->appid()); + REQUIRE(msgResults.at(0).second->returnvalue() == expectedReturnValue); faabric::scheduler::clearMockRequests(); faabric::util::setMockMode(false); diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp index 7bd06db58..11bfd5ffc 100644 --- a/tests/test/planner/test_planner_endpoint.cpp +++ b/tests/test/planner/test_planner_endpoint.cpp @@ -112,7 +112,7 @@ TEST_CASE_METHOD(PlannerEndpointTestFixture, REQUIRE(result.second == expectedResponseBody); // Check that, initially, there are no available hosts - PlannerClient cli; + auto& cli = getPlannerClient(); std::vector availableHosts = 
cli.getAvailableHosts(); REQUIRE(availableHosts.empty()); @@ -237,7 +237,7 @@ TEST_CASE_METHOD(PlannerEndpointTestFixture, // Register the hosts auto regReq = std::make_shared(); - PlannerClient cli; + auto& cli = getPlannerClient(); for (auto host : expectedHostsResponse.hosts()) { *regReq->mutable_host() = host; cli.registerHost(regReq); @@ -323,7 +323,7 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, // Call a function first, and wait for the result sch.callFunctions(ber); - auto resultMsg = sch.getFunctionResult(appId, msgId, 1000); + auto resultMsg = getPlannerClient().getMessageResult(appId, msgId, 1000); SECTION("Success") { @@ -427,7 +427,7 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, expectedReturnCode); REQUIRE(result.second == expectedResponseBody); - auto msgResult = sch.getFunctionResult(appId, msgId, 1000); + auto msgResult = getPlannerClient().getMessageResult(appId, msgId, 1000); REQUIRE(msgResult.returnvalue() == 0); // If the request is succesful, check that the response has the fields @@ -460,7 +460,7 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture, beast::http::status::ok); // Make sure execution has finished and the result is available - auto msgResult = sch.getFunctionResult(appId, msgId, 1000); + auto msgResult = getPlannerClient().getMessageResult(appId, msgId, 1000); REQUIRE(msgResult.returnvalue() == 0); // Second, prepare an HTTP request to get the batch's execution status diff --git a/tests/test/runner/test_main.cpp b/tests/test/runner/test_main.cpp index 74e53076a..193555d50 100644 --- a/tests/test/runner/test_main.cpp +++ b/tests/test/runner/test_main.cpp @@ -58,7 +58,8 @@ TEST_CASE_METHOD(MainRunnerTestFixture, "Test main runner", "[runner]") std::string expected = fmt::format("DummyExecutor executed {}", msgId); faabric::Message res = - sch.getFunctionResult(appId, msgId, SHORT_TEST_TIMEOUT_MS); + faabric::planner::getPlannerClient().getMessageResult( + appId, msgId, SHORT_TEST_TIMEOUT_MS); REQUIRE(res.outputdata() == expected); } } diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index fa834ea5b..7eeededeb 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -149,16 +149,17 @@ int32_t TestExecutor::executeTask( Scheduler& sch = getScheduler(); sch.callFunctions(reqThis); sch.callFunctions(reqOther); + auto& plannerCli = faabric::planner::getPlannerClient(); for (const auto& m : reqThis->messages()) { faabric::Message res = - sch.getFunctionResult(m, SHORT_TEST_TIMEOUT_MS); + plannerCli.getMessageResult(m, SHORT_TEST_TIMEOUT_MS); assert(res.outputdata() == "chain-check-a successful"); } for (const auto& m : reqOther->messages()) { faabric::Message res = - sch.getFunctionResult(m, SHORT_TEST_TIMEOUT_MS); + plannerCli.getMessageResult(m, SHORT_TEST_TIMEOUT_MS); assert(res.outputdata() == "chain-check-b successful"); } @@ -358,7 +359,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector expectedHosts = { conf.endpointHost }; REQUIRE(actualHosts == expectedHosts); - faabric::Message result = sch.getFunctionResult(msg, SHORT_TEST_TIMEOUT_MS); + faabric::Message result = + plannerCli.getMessageResult(msg, SHORT_TEST_TIMEOUT_MS); std::string expected = fmt::format("Simple function {} executed", msg.id()); REQUIRE(result.outputdata() == expected); @@ -382,7 +384,7 @@ TEST_CASE_METHOD(TestExecutorFixture, executeWithTestExecutor(req, false); faabric::Message result = - sch.getFunctionResult(msg, SHORT_TEST_TIMEOUT_MS); + plannerCli.getMessageResult(msg, 
SHORT_TEST_TIMEOUT_MS); std::string expected = fmt::format("Simple function {} executed", msg.id()); REQUIRE(result.outputdata() == expected); @@ -404,7 +406,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector expectedHosts = { conf.endpointHost }; REQUIRE(actualHosts == expectedHosts); - faabric::Message result = sch.getFunctionResult(msg, SHORT_TEST_TIMEOUT_MS); + faabric::Message result = + plannerCli.getMessageResult(msg, SHORT_TEST_TIMEOUT_MS); REQUIRE(result.outputdata() == "All chain checks successful"); // Check that restore has not been called @@ -546,8 +549,7 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector expectedHosts = { conf.endpointHost }; REQUIRE(actualHosts == expectedHosts); - auto& sch = faabric::scheduler::getScheduler(); - faabric::Message res = sch.getFunctionResult(msg, 5000); + faabric::Message res = plannerCli.getMessageResult(msg, 5000); REQUIRE(res.returnvalue() == 0); } @@ -578,7 +580,8 @@ TEST_CASE_METHOD(TestExecutorFixture, std::vector expectedHosts = { conf.endpointHost }; REQUIRE(actualHosts == expectedHosts); - faabric::Message res = sch.getFunctionResult(msg, LONG_TEST_TIMEOUT_MS); + faabric::Message res = + plannerCli.getMessageResult(msg, LONG_TEST_TIMEOUT_MS); REQUIRE(res.returnvalue() == 0); sch.reset(); @@ -637,7 +640,7 @@ TEST_CASE_METHOD(TestExecutorFixture, "Test non-zero return code", "[executor]") executeWithTestExecutor(req, false); - faabric::Message res = sch.getFunctionResult(msg, 2000); + faabric::Message res = plannerCli.getMessageResult(msg, 2000); REQUIRE(res.returnvalue() == 1); } @@ -649,7 +652,7 @@ TEST_CASE_METHOD(TestExecutorFixture, "Test erroring function", "[executor]") executeWithTestExecutor(req, false); - faabric::Message res = sch.getFunctionResult(msg, 2000); + faabric::Message res = plannerCli.getMessageResult(msg, 2000); REQUIRE(res.returnvalue() == 1); std::string expectedErrorMsg = fmt::format( @@ -697,18 +700,18 @@ TEST_CASE_METHOD(TestExecutorFixture, sch.callFunctions(reqB); sch.callFunctions(reqC); - faabric::Message resA1 = - sch.getFunctionResult(reqA->messages().at(0), SHORT_TEST_TIMEOUT_MS); - faabric::Message resA2 = - sch.getFunctionResult(reqA->messages().at(1), SHORT_TEST_TIMEOUT_MS); + faabric::Message resA1 = plannerCli.getMessageResult(reqA->messages().at(0), + SHORT_TEST_TIMEOUT_MS); + faabric::Message resA2 = plannerCli.getMessageResult(reqA->messages().at(1), + SHORT_TEST_TIMEOUT_MS); - faabric::Message resB = - sch.getFunctionResult(reqB->messages().at(0), SHORT_TEST_TIMEOUT_MS); + faabric::Message resB = plannerCli.getMessageResult(reqB->messages().at(0), + SHORT_TEST_TIMEOUT_MS); - faabric::Message resC1 = - sch.getFunctionResult(reqC->messages().at(0), SHORT_TEST_TIMEOUT_MS); - faabric::Message resC2 = - sch.getFunctionResult(reqC->messages().at(1), SHORT_TEST_TIMEOUT_MS); + faabric::Message resC1 = plannerCli.getMessageResult(reqC->messages().at(0), + SHORT_TEST_TIMEOUT_MS); + faabric::Message resC2 = plannerCli.getMessageResult(reqC->messages().at(1), + SHORT_TEST_TIMEOUT_MS); REQUIRE(resA1.outputdata() == "Message A1"); REQUIRE(resA2.outputdata() == "Message A2"); @@ -785,7 +788,7 @@ TEST_CASE_METHOD(TestExecutorFixture, REQUIRE(millisC < millisB); // Wait for execution to finish - sch.getFunctionResult(msg, 2000); + plannerCli.getMessageResult(msg, 2000); exec->shutdown(); } @@ -1062,7 +1065,7 @@ TEST_CASE_METHOD(TestExecutorFixture, if (requestType == faabric::BatchExecuteRequest::THREADS) { sch.awaitThreadResult(msgId); } else { - sch.getFunctionResult(appId, msgId, 2000); + 
plannerCli.getMessageResult(appId, msgId, 2000); } } @@ -1117,7 +1120,7 @@ TEST_CASE_METHOD(TestExecutorFixture, for (int i = 0; i < nMessages; i++) { if (singleHosts[i] == thisHost) { faabric::Message res = - sch.getFunctionResult(appId, msgIds.at(i), 2000); + plannerCli.getMessageResult(appId, msgIds.at(i), 2000); // Check result as expected REQUIRE(res.returnvalue() == expectedResult); @@ -1138,7 +1141,7 @@ TEST_CASE_METHOD(TestExecutorFixture, for (int i = 0; i < nMessages; i++) { faabric::Message res = - sch.getFunctionResult(req->messages().at(i), 2000); + plannerCli.getMessageResult(req->messages().at(i), 2000); REQUIRE(res.returnvalue() == expectedResult); } diff --git a/tests/test/scheduler/test_function_client_server.cpp b/tests/test/scheduler/test_function_client_server.cpp index 4aa279b7f..00a5f4bf0 100644 --- a/tests/test/scheduler/test_function_client_server.cpp +++ b/tests/test/scheduler/test_function_client_server.cpp @@ -82,8 +82,8 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, sch.clearRecordedMessages(); // Wait for functions to finish - sch.getFunctionResult(msgA, 2000); - sch.getFunctionResult(msgB, 2000); + plannerCli.getMessageResult(msgA, 2000); + plannerCli.getMessageResult(msgB, 2000); // Check executors present REQUIRE(sch.getFunctionExecutorCount(msgA) == 1); @@ -118,7 +118,7 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, for (const auto& m : req->messages()) { // This timeout can be long as it shouldn't fail - sch.getFunctionResult(m, 5 * SHORT_TEST_TIMEOUT_MS); + plannerCli.getMessageResult(m, 5 * SHORT_TEST_TIMEOUT_MS); } // Check no other hosts have been registered @@ -241,7 +241,7 @@ TEST_CASE_METHOD(FunctionClientServerTestFixture, // This thread will block waiting for another thread to set the message // result std::jthread waiterThread{ [&] { - auto resultMsg = sch.getFunctionResult(msg, 2000); + auto resultMsg = plannerCli.getMessageResult(msg, 2000); actualReturnCode = resultMsg.returnvalue(); } }; diff --git a/tests/test/scheduler/test_function_migration.cpp b/tests/test/scheduler/test_function_migration.cpp index 296dd9375..37b096d2a 100644 --- a/tests/test/scheduler/test_function_migration.cpp +++ b/tests/test/scheduler/test_function_migration.cpp @@ -191,7 +191,8 @@ TEST_CASE_METHOD( checkPendingMigrationsExpectation( expectedMigrations, actualMigrations, hosts); - faabric::Message res = sch.getFunctionResult(appId, msgId, 2 * timeToSleep); + faabric::Message res = + plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); REQUIRE(res.returnvalue() == 0); // Check that after the result is set, the app can't be migrated no more @@ -243,7 +244,8 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture, checkPendingMigrationsExpectation( expectedMigrations, actualMigrations, hosts); - faabric::Message res = sch.getFunctionResult(appId, msgId, 2 * timeToSleep); + faabric::Message res = + plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); REQUIRE(res.returnvalue() == 0); // Check that after the result is set, the app can't be migrated no more @@ -298,7 +300,8 @@ TEST_CASE_METHOD( checkPendingMigrationsExpectation( expectedMigrations, actualMigrations, hosts); - faabric::Message res = sch.getFunctionResult(appId, msgId, 2 * timeToSleep); + faabric::Message res = + plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); REQUIRE(res.returnvalue() == 0); // Check that after the result is set, the app can't be migrated no more @@ -461,7 +464,8 @@ TEST_CASE_METHOD(FunctionMigrationTestFixture, checkPendingMigrationsExpectation( 
expectedMigrations, actualMigrations, hosts, true); - faabric::Message res = sch.getFunctionResult(appId, msgId, 2 * timeToSleep); + faabric::Message res = + plannerCli.getMessageResult(appId, msgId, 2 * timeToSleep); REQUIRE(res.returnvalue() == 0); // Clean up diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index d84a444b9..b11306620 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -323,7 +324,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, if (isThreads) { sch.awaitThreadResult(reqOneMsgIds.at(i)); } else { - sch.getFunctionResult(appId, reqOneMsgIds.at(i), 10000); + plannerCli.getMessageResult(appId, reqOneMsgIds.at(i), 10000); } } @@ -421,7 +422,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, if (isThreads) { sch.awaitThreadResult(reqTwoMsgIds.at(i)); } else { - sch.getFunctionResult(appId, reqTwoMsgIds.at(i), 10000); + plannerCli.getMessageResult(appId, reqTwoMsgIds.at(i), 10000); } } @@ -544,7 +545,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, if (execMode == faabric::BatchExecuteRequest::THREADS) { sch.awaitThreadResult(msg.id()); } else { - sch.getFunctionResult(msg, 10000); + plannerCli.getMessageResult(msg, 10000); } } } @@ -640,7 +641,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, sch.setFunctionResult(call); // Check retrieval method gets the same call out again - faabric::Message actualCall2 = sch.getFunctionResult(call, 1); + faabric::Message actualCall2 = plannerCli.getMessageResult(call, 1); checkMessageEquality(call, actualCall2); } @@ -659,6 +660,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, for (int i = 0; i < nWaiters; i++) { waiterThreads.emplace_back([nWaiterMessages] { Scheduler& sch = scheduler::getScheduler(); + auto& plannerCli = faabric::planner::getPlannerClient(); std::shared_ptr req = faabric::util::batchExecFactory("demo", "echo", nWaiterMessages); @@ -671,7 +673,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, // Invoke and await sch.callFunctions(req); for (auto msgId : msgIds) { - sch.getFunctionResult(appId, msgId, 5000); + plannerCli.getMessageResult(appId, msgId, 5000); } }); } @@ -729,7 +731,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, } // Check status when nothing has been written - const faabric::Message result = sch.getFunctionResult(msg, 0); + const faabric::Message result = plannerCli.getMessageResult(msg, 1000); REQUIRE(result.returnvalue() == expectedReturnValue); REQUIRE(result.type() == expectedType); @@ -754,14 +756,18 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, // Check empty initially REQUIRE(faabric::util::getChainedFunctions(msg).empty()); - // Log and check this shows up in the result + // Log and check this shows up in the result (change the message id as, + // technically, messages should be unique, so setting the result a second + // time for the same message is undefined behaviour) + msg.set_id(faabric::util::generateGid()); faabric::util::logChainedFunction(msg, chainedMsgA); std::set expected = { (unsigned int)chainedMsgA.id() }; sch.setFunctionResult(msg); REQUIRE(faabric::util::getChainedFunctions(msg) == expected); - // Log some more and check + // Log some more and check (update the message id again) + msg.set_id(faabric::util::generateGid()); faabric::util::logChainedFunction(msg, chainedMsgA); faabric::util::logChainedFunction(msg, chainedMsgB); faabric::util::logChainedFunction(msg, chainedMsgC); @@ -907,8 +913,8 @@ 
TEST_CASE_METHOD(DummyExecutorTestFixture, "Test executor reuse", "[scheduler]") // Execute a couple of functions sch.callFunctions(reqA); for (auto msgId : reqAMsgIds) { - faabric::Message res = - sch.getFunctionResult(msgA.appid(), msgId, SHORT_TEST_TIMEOUT_MS); + faabric::Message res = plannerCli.getMessageResult( + msgA.appid(), msgId, SHORT_TEST_TIMEOUT_MS); REQUIRE(res.returnvalue() == 0); } @@ -918,8 +924,8 @@ TEST_CASE_METHOD(DummyExecutorTestFixture, "Test executor reuse", "[scheduler]") // Execute a couple more functions sch.callFunctions(reqB); for (auto msgId : reqBMsgIds) { - faabric::Message res = - sch.getFunctionResult(msgB.appid(), msgId, SHORT_TEST_TIMEOUT_MS); + faabric::Message res = plannerCli.getMessageResult( + msgB.appid(), msgId, SHORT_TEST_TIMEOUT_MS); REQUIRE(res.returnvalue() == 0); } @@ -1058,7 +1064,7 @@ TEST_CASE_METHOD(DummyExecutorTestFixture, continue; } - sch.getFunctionResult(appId, msgIds.at(i), 10000); + plannerCli.getMessageResult(appId, msgIds.at(i), 10000); } } diff --git a/tests/test/util/test_exec_graph.cpp b/tests/test/util/test_exec_graph.cpp index 24139a54a..9b47e7f0f 100644 --- a/tests/test/util/test_exec_graph.cpp +++ b/tests/test/util/test_exec_graph.cpp @@ -154,15 +154,11 @@ TEST_CASE_METHOD(MpiBaseTestFixture, "Test MPI execution graph", "[scheduler]") ExecGraph expected{ .rootNode = nodeA }; - /* TODO: fix - for (const auto& id : sch.getChainedFunctions(msg.id())) { - sch.getFunctionResult(id, 500); - } - */ // Wait for the MPI messages to finish - sch.getFunctionResult(msg, 2000); + auto& plannerCli = faabric::planner::getPlannerClient(); + plannerCli.getMessageResult(msg, 2000); for (const auto& id : faabric::util::getChainedFunctions(msg)) { - sch.getFunctionResult(msg.appid(), id, 2000); + plannerCli.getMessageResult(msg.appid(), id, 2000); } ExecGraph actual = getFunctionExecGraph(msg); diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 201a3db09..4c8de9d66 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -114,7 +114,7 @@ class PlannerClientServerFixture { public: PlannerClientServerFixture() - : plannerCli(faabric::util::getSystemConfig().plannerHost) + : plannerCli(faabric::planner::getPlannerClient()) { plannerServer.start(); plannerCli.ping(); @@ -127,7 +127,7 @@ class PlannerClientServerFixture } protected: - faabric::planner::PlannerClient plannerCli; + faabric::planner::PlannerClient& plannerCli; faabric::planner::PlannerServer plannerServer; };
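Taken together, the change moves message results out of the Scheduler: results now live in the planner, and each host blocks on promises cached in its local PlannerClient. A minimal caller-side sketch of the new flow, stitched from calls that appear verbatim in this diff (the "demo"/"echo" function and the 1000ms timeout are borrowed from the tests; headers omitted):

    // The planner client is a function-local static, so every thread on this
    // host shares a single connection to the planner
    auto& plannerCli = faabric::planner::getPlannerClient();

    // Dispatch a batch through the scheduler, as before this change
    auto req = faabric::util::batchExecFactory("demo", "echo", 1);
    faabric::scheduler::getScheduler().callFunctions(req);

    // Wait for the result through the planner client rather than the
    // scheduler. A timeout of 0 polls the planner once and returns an
    // EMPTY-typed message if the result is not yet set; a positive timeout
    // registers this host's interest and blocks on a locally-cached promise
    // until the planner pushes the result back
    const faabric::Message& msg = req->messages().at(0);
    faabric::Message result = plannerCli.getMessageResult(msg, 1000);

Note the caveat carried over from the old Scheduler implementation: the promise cache supports a single waiter per message id, so two threads must not call getMessageResult for the same message concurrently.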