diff --git a/.env b/.env index 53da71de5..8b1fb5863 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.4.5 -FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.4.5 +FAABRIC_VERSION=0.4.6 +FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.4.6 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 88b0433d6..39dc34bd5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -23,7 +23,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -36,7 +36,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -50,7 +50,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -73,7 +73,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -113,7 +113,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} @@ -167,7 +167,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.4.5 + image: faasm.azurecr.io/faabric:0.4.6 credentials: username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }} password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }} diff --git a/VERSION b/VERSION index 0bfccb080..ef52a6480 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.4.5 +0.4.6 diff --git a/include/faabric/planner/Planner.h b/include/faabric/planner/Planner.h index b5e18dc0a..c96667746 100644 --- a/include/faabric/planner/Planner.h +++ b/include/faabric/planner/Planner.h @@ -2,6 +2,7 @@ #include #include +#include #include @@ -59,6 +60,10 @@ class Planner std::shared_ptr getMessageResult( std::shared_ptr msg); + // Get all the results recorded for one batch + std::shared_ptr getBatchResults( + int32_t appId); + private: // There's a singleton instance of the planner running, but it must allow // concurrent requests diff --git a/include/faabric/util/batch.h b/include/faabric/util/batch.h index 813ac62a9..01080e952 100644 --- a/include/faabric/util/batch.h +++ b/include/faabric/util/batch.h @@ -3,6 +3,11 @@ #include namespace faabric::util { + +// ---------- +// Batch Execute Requests (BER) +// ---------- + std::shared_ptr batchExecFactory(); std::shared_ptr batchExecFactory( @@ -11,4 +16,14 @@ std::shared_ptr batchExecFactory( int count = 1); bool isBatchExecRequestValid(std::shared_ptr ber); + +// ---------- +// Batch Execute Requests' Status +// ---------- + +std::shared_ptr batchExecStatusFactory( + int32_t appId); + +std::shared_ptr batchExecStatusFactory( + std::shared_ptr ber); } diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp index 04d9ca573..a9387fa6c 100644 --- a/src/endpoint/FaabricEndpointHandler.cpp +++ b/src/endpoint/FaabricEndpointHandler.cpp @@ -31,116 +31,11 @@ void FaabricEndpointHandler::onRequest( // Text response type response.set(header::content_type, "text/plain"); - PROF_START(endpointRoundTrip) - - // Parse message from JSON in request - const std::string& requestStr = request.body(); - - // Handle JSON - if (requestStr.empty()) { - SPDLOG_ERROR("Faabric handler received empty request"); - response.result(beast::http::status::bad_request); - response.body() = std::string("Empty request"); - } else { - auto req = faabric::util::batchExecFactory(); - req->set_type(req->FUNCTIONS); - faabric::Message& msg = *req->add_messages(); - faabric::util::jsonToMessage(requestStr, &msg); - faabric::scheduler::Scheduler& sched = - faabric::scheduler::getScheduler(); - - if (msg.isstatusrequest()) { - SPDLOG_DEBUG("Processing status request"); - const faabric::Message result = sched.getFunctionResult(msg, 0); - - if (result.type() == faabric::Message_MessageType_EMPTY) { - response.result(beast::http::status::ok); - response.body() = std::string("RUNNING"); - } else if (result.returnvalue() == 0) { - response.result(beast::http::status::ok); - response.body() = faabric::util::messageToJson(result); - } else { - response.result(beast::http::status::internal_server_error); - response.body() = "FAILED: " + result.outputdata(); - } - } else { - executeFunction( - std::move(ctx), std::move(response), std::move(req), 0); - return; - } - } - - PROF_END(endpointRoundTrip) + // TODO: for the moment we keep the endpoint handler, but we are not meant + // to receive any requests here. Eventually we will delete it + SPDLOG_ERROR("Faabric handler received empty request"); + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty request"); ctx.sendFunction(std::move(response)); } - -void FaabricEndpointHandler::executeFunction( - HttpRequestContext&& ctx, - faabric::util::BeastHttpResponse&& response, - std::shared_ptr ber, - size_t messageIndex) -{ - auto& conf = faabric::util::getSystemConfig(); - - // Set app ID, message ID and master host on the first message of the BER - // TODO: eventually do it on the BER itself - faabric::util::setMessageId(*ber->mutable_messages(0)); - ber->mutable_messages(0)->set_masterhost(conf.endpointHost); - int appId = ber->messages(0).appid(); - int msgId = ber->messages(0).id(); - assert(appId != 0); - assert(msgId != 0); - - if (ber->messages(0).user().empty()) { - response.result(beast::http::status::bad_request); - response.body() = std::string("Empty user"); - return ctx.sendFunction(std::move(response)); - } - - if (ber->messages(0).function().empty()) { - response.result(beast::http::status::bad_request); - response.body() = std::string("Empty function"); - return ctx.sendFunction(std::move(response)); - } - - // This is set to false by the scheduler if the function ends up being sent - // elsewhere - if (!ber->messages(0).isasync()) { - ber->mutable_messages(0)->set_executeslocally(true); - } - - auto tid = gettid(); - const std::string funcStr = - faabric::util::funcToString(ber->messages(0), true); - SPDLOG_DEBUG("Worker HTTP thread {} scheduling {}", tid, funcStr); - - // Schedule it - faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); - sch.callFunctions(ber); - - // Await result on global bus (may have been executed on a different worker) - if (ber->messages(0).isasync()) { - response.result(beast::http::status::ok); - response.body() = faabric::util::messageToJson(ber->messages(0)); - return ctx.sendFunction(std::move(response)); - } - - // TODO: temporarily make this HTTP call block one server thread. - // Eventually. we will route all HTTP requests through the planner instead - // of the worker, so we will be able to remove this blocking call - SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr); - auto result = - sch.getFunctionResult(appId, msgId, conf.globalMessageTimeout); - - beast::http::status statusCode = - (result.returnvalue() == 0) ? beast::http::status::ok - : beast::http::status::internal_server_error; - response.result(statusCode); - SPDLOG_DEBUG("Worker thread {} result {}", - gettid(), - faabric::util::funcToString(result, true)); - - response.body() = result.outputdata(); - return ctx.sendFunction(std::move(response)); -} } diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index b2b7c5443..768525a9d 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -1,5 +1,7 @@ #include +#include #include +#include #include #include #include @@ -267,6 +269,27 @@ std::shared_ptr Planner::getMessageResult( return nullptr; } +std::shared_ptr Planner::getBatchResults( + int32_t appId) +{ + auto berStatus = faabric::util::batchExecStatusFactory(appId); + + // Acquire a read lock to copy all the results we have for this batch + { + faabric::util::SharedLock lock(plannerMx); + + if (!state.appResults.contains(appId)) { + return nullptr; + } + + for (auto msgResultPair : state.appResults.at(appId)) { + *berStatus->add_messageresults() = *(msgResultPair.second); + } + } + + return berStatus; +} + Planner& getPlanner() { static Planner planner; diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp index 4d9affe83..02a21400b 100644 --- a/src/planner/PlannerEndpointHandler.cpp +++ b/src/planner/PlannerEndpointHandler.cpp @@ -2,7 +2,9 @@ #include #include #include +#include #include +#include #include #include @@ -10,6 +12,11 @@ namespace faabric::planner { using header = beast::http::field; +// TODO(schedule): this atomic variable is used to temporarily select which +// host to forward an execute request to. This is because the planner still +// does not schedule resources to hosts, just acts as a proxy. +static std::atomic nextHostIdx = 0; + void PlannerEndpointHandler::onRequest( faabric::endpoint::HttpRequestContext&& ctx, faabric::util::BeastHttpRequest&& request) @@ -29,7 +36,7 @@ void PlannerEndpointHandler::onRequest( response.set(header::content_type, "text/plain"); // Request body contains a string that is formatted as a JSON - const std::string& requestStr = request.body(); + std::string requestStr = request.body(); // Handle JSON if (requestStr.empty()) { @@ -113,6 +120,7 @@ void PlannerEndpointHandler::onRequest( response.body() = std::string("Bad JSON in request body"); return ctx.sendFunction(std::move(response)); } + auto execGraph = faabric::util::getFunctionExecGraph(payloadMsg); // An empty exec graph has one node with all fields null-ed if (execGraph.rootNode.msg.id() == 0) { @@ -125,6 +133,95 @@ void PlannerEndpointHandler::onRequest( } return ctx.sendFunction(std::move(response)); } + case faabric::planner::HttpMessage_Type_EXECUTE_BATCH: { + // in: BatchExecuteRequest + // out: BatchExecuteRequestStatus + // Parse the message payload + SPDLOG_DEBUG("Planner received EXECUTE_BATCH request"); + faabric::BatchExecuteRequest rawBer; + try { + faabric::util::jsonToMessage(msg.payloadjson(), &rawBer); + } catch (faabric::util::JsonSerialisationException e) { + response.result(beast::http::status::bad_request); + response.body() = std::string("Bad JSON in body's payload"); + return ctx.sendFunction(std::move(response)); + } + auto ber = std::make_shared(rawBer); + + // Sanity check the BER + if (!faabric::util::isBatchExecRequestValid(ber)) { + response.result(beast::http::status::bad_request); + response.body() = "Bad BatchExecRequest"; + return ctx.sendFunction(std::move(response)); + } + ber->set_comesfromplanner(true); + + // Schedule and execute the BER + // TODO: make scheduling decision here + // FIXME: for the moment, just forward randomly to one node. Note + // that choosing the node randomly may yield to uneven load + // distributions + auto availableHosts = + faabric::planner::getPlanner().getAvailableHosts(); + if (availableHosts.empty()) { + SPDLOG_ERROR("Planner doesn't have any registered hosts to" + " schedule EXECUTE_BATCH request to!"); + response.result(beast::http::status::internal_server_error); + response.body() = std::string("No available hosts"); + return ctx.sendFunction(std::move(response)); + } + // Note that hostIdx++ is an atomic increment + int hostIdx = nextHostIdx++ % availableHosts.size(); + faabric::scheduler::getScheduler() + .getFunctionCallClient(availableHosts.at(hostIdx)->ip()) + ->executeFunctions(ber); + + // Prepare the response + response.result(beast::http::status::ok); + auto berStatus = faabric::util::batchExecStatusFactory(ber); + response.body() = faabric::util::messageToJson(*berStatus); + + return ctx.sendFunction(std::move(response)); + } + case faabric::planner::HttpMessage_Type_EXECUTE_BATCH_STATUS: { + // in: BatchExecuteRequestStatus + // out: BatchExecuteRequestStatus + // Parse the message payload + SPDLOG_DEBUG("Planner received EXECUTE_BATCH_STATUS request"); + faabric::BatchExecuteRequestStatus berStatus; + try { + faabric::util::jsonToMessage(msg.payloadjson(), &berStatus); + } catch (faabric::util::JsonSerialisationException e) { + response.result(beast::http::status::bad_request); + response.body() = std::string("Bad JSON in request body"); + return ctx.sendFunction(std::move(response)); + } + + // Work-out how many message results we have for the requested BER + auto actualBerStatus = + faabric::planner::getPlanner().getBatchResults(berStatus.appid()); + + // If the result is null, it means that the app id is not + // registered in the results map. This is an error + if (actualBerStatus == nullptr) { + response.result(beast::http::status::internal_server_error); + response.body() = std::string("App not registered in results"); + return ctx.sendFunction(std::move(response)); + } + + // Prepare the response + response.result(beast::http::status::ok); + // Work-out if it has finished using user-provided flags + if (actualBerStatus->messageresults_size() == + berStatus.expectednummessages()) { + actualBerStatus->set_finished(true); + } else { + actualBerStatus->set_finished(false); + } + response.body() = faabric::util::messageToJson(*actualBerStatus); + + return ctx.sendFunction(std::move(response)); + } default: { SPDLOG_ERROR("Unrecognised message type {}", msg.type()); response.result(beast::http::status::bad_request); diff --git a/src/planner/planner.proto b/src/planner/planner.proto index 8c06c06b0..a18a6812b 100644 --- a/src/planner/planner.proto +++ b/src/planner/planner.proto @@ -39,12 +39,16 @@ message HttpMessage { FLUSH_EXECUTORS = 3; GET_CONFIG = 4; GET_EXEC_GRAPH = 5; + EXECUTE_BATCH = 6; + EXECUTE_BATCH_STATUS = 7; } Type type = 1 [json_name = "http_type"]; // Payload parameter to populate with the message body if necessary. The // body is only needed for: // - GET_EXEC_GRAPH: where the body is a faabric::Message + // - EXECUTE_BATCH: where the body is a faabric::BatchExecuteRequest + // - EXECUTE_BATCH_STATUS: where the body is a BER too string payloadJson = 2 [json_name = "payload"]; } diff --git a/src/proto/faabric.proto b/src/proto/faabric.proto index e1ea71eb0..6a501b225 100644 --- a/src/proto/faabric.proto +++ b/src/proto/faabric.proto @@ -19,7 +19,8 @@ message EmptyRequest { // --------------------------------------------- message BatchExecuteRequest { - int32 id = 1; + // Each BatchExecuteRequest has a unique app id + int32 appId = 1; enum BatchExecuteType { FUNCTIONS = 0; @@ -42,6 +43,26 @@ message BatchExecuteRequest { // Flag set by the scheduler when this batch is all executing on a single // host bool singleHost = 8; + + // TODO(planner-schedule): remove me + // Temporary flag to indicate the workers that the BER comes from the + // planner (i.e. proxy-ed through planner) and so it has not been scheduled + // yet. Whenever the planner does scheduling we will be able to remove this + bool comesFromPlanner = 9; +} + +message BatchExecuteRequestStatus { + // Each BatchExecuteRequest has a unique app id + int32 appId = 1; + + bool finished = 2; + + repeated Message messageResults = 3; + + // Batches can dynamically change size throughout execution. This parameter + // is a hint provided by the client letting faabric know how many message + // results we expect for this BER + int32 expectedNumMessages = 4; } message HostResources { @@ -65,6 +86,8 @@ message FunctionStatusResponse { message Message { int32 id = 1; + // The app id indicates which BatchExecuteRequest this message belongs to. + // Each BER has a unique app id int32 appId = 2; int32 appIdx = 3; string masterHost = 4; @@ -96,9 +119,7 @@ message Message { string executedHost = 18; int64 finishTimestamp = 19 [json_name = "finish_ts"]; - bool isAsync = 20 [json_name = "async"]; bool isPython = 21 [json_name = "python"]; - bool isStatusRequest = 22 [json_name = "status"]; string pythonUser = 24 [json_name = "py_user"]; string pythonFunction = 25 [json_name = "py_func"]; diff --git a/src/scheduler/FunctionCallServer.cpp b/src/scheduler/FunctionCallServer.cpp index 27bd1c1e1..e787907b2 100644 --- a/src/scheduler/FunctionCallServer.cpp +++ b/src/scheduler/FunctionCallServer.cpp @@ -78,8 +78,17 @@ void FunctionCallServer::recvExecuteFunctions(std::span buffer) PARSE_MSG(faabric::BatchExecuteRequest, buffer.data(), buffer.size()) // This host has now been told to execute these functions no matter what - // TODO - avoid this copy - parsedMsg.mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL"); + // TODO(planner-schedule): this if is only here because, temporarily, the + // planner doesn't take any scheduling decisions + if (!parsedMsg.comesfromplanner()) { + parsedMsg.mutable_messages()->at(0).set_topologyhint("FORCE_LOCAL"); + } else { + // This flags were set by the old endpoint, we temporarily set them here + parsedMsg.mutable_messages()->at(0).set_timestamp( + faabric::util::getGlobalClock().epochMillis()); + parsedMsg.mutable_messages()->at(0).set_masterhost( + faabric::util::getSystemConfig().endpointHost); + } scheduler.callFunctions( std::make_shared(parsedMsg)); } diff --git a/src/util/batch.cpp b/src/util/batch.cpp index 3699f2338..f9fa4d83f 100644 --- a/src/util/batch.cpp +++ b/src/util/batch.cpp @@ -6,7 +6,7 @@ namespace faabric::util { std::shared_ptr batchExecFactory() { auto req = std::make_shared(); - req->set_id(generateGid()); + req->set_appid(generateGid()); return req; } @@ -18,7 +18,7 @@ std::shared_ptr batchExecFactory( auto req = batchExecFactory(); // Force the messages to have the same app ID than the BER - int appId = req->id(); + int appId = req->appid(); for (int i = 0; i < count; i++) { *req->add_messages() = messageFactory(user, function); req->mutable_messages()->at(i).set_appid(appId); @@ -34,7 +34,7 @@ bool isBatchExecRequestValid(std::shared_ptr ber) } // An empty BER (thus invalid) will have 0 messages and an id of 0 - if (ber->messages_size() <= 0 && ber->id() == 0) { + if (ber->messages_size() <= 0 && ber->appid() == 0) { return false; } @@ -42,6 +42,17 @@ bool isBatchExecRequestValid(std::shared_ptr ber) std::string func = ber->messages(0).function(); int appId = ber->messages(0).appid(); + // If the user or func are empty, the BER is invalid + if (user.empty() || func.empty()) { + return false; + } + + // The BER and all messages must have the same appid + if (ber->appid() != appId) { + return false; + } + + // All messages in the BER must have the same app id, user, and function for (int i = 1; i < ber->messages_size(); i++) { auto msg = ber->messages(i); if (msg.user() != user || msg.function() != func || @@ -52,4 +63,20 @@ bool isBatchExecRequestValid(std::shared_ptr ber) return true; } + +std::shared_ptr batchExecStatusFactory( + int32_t appId) +{ + auto berStatus = std::make_shared(); + berStatus->set_appid(appId); + berStatus->set_finished(false); + + return berStatus; +} + +std::shared_ptr batchExecStatusFactory( + std::shared_ptr ber) +{ + return batchExecStatusFactory(ber->appid()); +} } diff --git a/tests/test/endpoint/test_endpoint.cpp b/tests/test/endpoint/test_endpoint.cpp new file mode 100644 index 000000000..68d6ec0d2 --- /dev/null +++ b/tests/test/endpoint/test_endpoint.cpp @@ -0,0 +1,140 @@ +#include + +#include "DummyExecutor.h" +#include "DummyExecutorFactory.h" +#include "faabric_utils.h" +#include "fixtures.h" + +#include +#include +#include + +#include + +static int portIn = 9090; + +namespace tests { + +using namespace faabric::endpoint; + +/* + * Very simple endpoint handler to test the endpoint functionality + */ +class TestEndpointHandler final + : public faabric::endpoint::HttpRequestHandler + , public std::enable_shared_from_this +{ + public: + void onRequest(faabric::endpoint::HttpRequestContext&& ctx, + faabric::util::BeastHttpRequest&& request) override + { + std::string requestStr = request.body(); + + faabric::util::BeastHttpResponse response; + + if (requestStr.empty()) { + response.result(beast::http::status::bad_request); + response.body() = std::string("Empty request"); + } else if (requestStr == "ping") { + response.result(beast::http::status::ok); + response.body() = std::string("pong"); + } else { + response.result(beast::http::status::bad_request); + response.body() = std::string("Bad request body"); + } + + return ctx.sendFunction(std::move(response)); + } +}; + +class EndpointTestFixture +{ + public: + EndpointTestFixture() + : host(LOCALHOST) + , port(++portIn) + , endpoint(port, 4, std::make_shared()) + { + endpoint.start(faabric::endpoint::EndpointMode::BG_THREAD); + } + + ~EndpointTestFixture() { endpoint.stop(); } + + protected: + std::string host; + int port; + FaabricEndpoint endpoint; + + // Test case state + boost::beast::http::status expectedReturnCode; + std::string expectedResponseBody; + + std::pair doPost(const std::string& body) + { + return postToUrl(host, port, body); + } +}; + +void* doWork(void* arg) +{ + FaabricEndpoint endpoint(9080, 4, std::make_shared()); + endpoint.start(EndpointMode::SIGNAL); + + SPDLOG_INFO("Exitting.."); + + endpoint.stop(); + + pthread_exit(0); +} + +TEST_CASE("Test starting an endpoint in signal mode", "[endpoint]") +{ + // Use pthreads to be able to signal the thread correctly + pthread_t ptid; + + pthread_create(&ptid, nullptr, &doWork, nullptr); + + // Send a post request to make sure endpoint is running + std::pair result = postToUrl(LOCALHOST, 9080, "ping"); + REQUIRE(boost::beast::http::int_to_status(result.first) == + boost::beast::http::status::ok); + REQUIRE(result.second == "pong"); + + pthread_kill(ptid, SIGINT); + + pthread_join(ptid, nullptr); +} + +TEST_CASE_METHOD(EndpointTestFixture, + "Test posting a request to the endpoint", + "[endpoint]") +{ + std::string requestBody; + + SECTION("Valid request") + { + requestBody = "ping"; + expectedReturnCode = boost::beast::http::status::ok; + expectedResponseBody = "pong"; + } + + SECTION("Empty request") + { + requestBody = ""; + expectedReturnCode = boost::beast::http::status::bad_request; + expectedResponseBody = "Empty request"; + } + + SECTION("Invalid request") + { + requestBody = "pong"; + expectedReturnCode = boost::beast::http::status::bad_request; + expectedResponseBody = "Bad request body"; + } + + std::pair result = doPost(requestBody); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); +} +} diff --git a/tests/test/endpoint/test_endpoint_api.cpp b/tests/test/endpoint/test_endpoint_api.cpp deleted file mode 100644 index 1123bbbdb..000000000 --- a/tests/test/endpoint/test_endpoint_api.cpp +++ /dev/null @@ -1,208 +0,0 @@ -#include - -#include "faabric_utils.h" -#include "fixtures.h" - -#include -#include -#include -#include -#include -#include - -using namespace faabric::scheduler; - -namespace tests { - -// This is a bit gnarly, we get "Address already in use" errors if we try to use -// the same port for each case, so we need to switch it every time. -static int port = 8080; - -#define ASYNC_EXEC_SLEEP_TIME 2000 - -class EndpointApiTestExecutor final : public Executor -{ - public: - EndpointApiTestExecutor(faabric::Message& msg) - : Executor(msg) - {} - - ~EndpointApiTestExecutor() {} - - int32_t executeTask( - int threadPoolIdx, - int msgIdx, - std::shared_ptr reqOrig) override - { - faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); - - int returnVal = 0; - if (msg.function() == "valid") { - msg.set_outputdata( - fmt::format("Endpoint API test executed {}", msg.id())); - - } else if (msg.function() == "error") { - returnVal = 1; - msg.set_outputdata(fmt::format( - "Endpoint API returning {} for {}", returnVal, msg.id())); - } else if (msg.isasync()) { - returnVal = 0; - - SLEEP_MS(ASYNC_EXEC_SLEEP_TIME); - - msg.set_outputdata( - fmt::format("Finished async message {}", msg.id())); - } else { - throw std::runtime_error("Endpoint API error"); - } - - return returnVal; - } -}; - -class EndpointApiTestExecutorFactory : public ExecutorFactory -{ - protected: - std::shared_ptr createExecutor(faabric::Message& msg) override - { - return std::make_shared(msg); - } -}; - -class EndpointApiTestFixture - : public FunctionCallClientServerFixture - , public SchedulerFixture -{ - public: - EndpointApiTestFixture() - { - executorFactory = std::make_shared(); - setExecutorFactory(executorFactory); - } - - ~EndpointApiTestFixture() = default; - - protected: - std::shared_ptr executorFactory; -}; - -TEST_CASE_METHOD(EndpointApiTestFixture, - "Test requests to endpoint", - "[endpoint]") -{ - port++; - - faabric::endpoint::FaabricEndpoint endpoint( - port, 2, std::make_shared()); - - endpoint.start(faabric::endpoint::EndpointMode::BG_THREAD); - - // Wait for the server to start - SLEEP_MS(2000); - - std::string body; - int expectedReturnCode = 200; - std::string expectedResponseBody; - - SECTION("Empty request") - { - expectedReturnCode = 400; - expectedResponseBody = "Empty request"; - } - - SECTION("Valid request") - { - faabric::Message msg = faabric::util::messageFactory("foo", "valid"); - body = faabric::util::messageToJson(msg); - expectedReturnCode = 200; - expectedResponseBody = - fmt::format("Endpoint API test executed {}", msg.id()); - } - - SECTION("Error request") - { - faabric::Message msg = faabric::util::messageFactory("foo", "error"); - body = faabric::util::messageToJson(msg); - expectedReturnCode = 500; - expectedResponseBody = - fmt::format("Endpoint API returning 1 for {}", msg.id()); - } - - SECTION("Invalid function") - { - faabric::Message msg = faabric::util::messageFactory("foo", "junk"); - body = faabric::util::messageToJson(msg); - expectedReturnCode = 500; - expectedResponseBody = fmt::format( - "Task {} threw exception. What: Endpoint API error", msg.id()); - } - - std::pair result = postToUrl(LOCALHOST, port, body); - REQUIRE(result.first == expectedReturnCode); - REQUIRE(result.second == expectedResponseBody); - - endpoint.stop(); -} - -TEST_CASE_METHOD(EndpointApiTestFixture, - "Test status requests to endpoint", - "[endpoint]") -{ - port++; - faabric::endpoint::FaabricEndpoint endpoint( - port, 2, std::make_shared()); - - endpoint.start(faabric::endpoint::EndpointMode::BG_THREAD); - - // Wait for the server to start - SLEEP_MS(2000); - - // Make the initial invocation - faabric::Message msg = faabric::util::messageFactory("foo", "blah"); - msg.set_isasync(true); - std::string body = faabric::util::messageToJson(msg); - - std::pair result = postToUrl(LOCALHOST, port, body); - faabric::Message response; - faabric::util::jsonToMessage(result.second, &response); - - REQUIRE(result.first == 200); - REQUIRE(response.id() == msg.id()); - - // Make a status request, should still be running - faabric::Message statusMsg; - statusMsg.set_user("foo"); - statusMsg.set_function("blah"); - statusMsg.set_id(msg.id()); - statusMsg.set_appid(msg.appid()); - statusMsg.set_isstatusrequest(true); - - std::string statusBody = faabric::util::messageToJson(statusMsg); - - std::pair statusResult = - postToUrl(LOCALHOST, port, statusBody); - - REQUIRE(statusResult.first == 200); - REQUIRE(statusResult.second == "RUNNING"); - - // Unfortunately awaiting the result here will erase it from the system - // state when it does return, hence it will no longer be available to get - // via the status request. Therefore we just have to sleep. - SLEEP_MS(ASYNC_EXEC_SLEEP_TIME + 2000); - - std::pair statusResultAfter = - postToUrl(LOCALHOST, port, statusBody); - - // Check we got a response, and that it's not still running - REQUIRE(statusResultAfter.first == 200); - REQUIRE(statusResultAfter.second != "RUNNING"); - - faabric::Message resultMsg; - faabric::util::jsonToMessage(statusResultAfter.second, &resultMsg); - REQUIRE(resultMsg.returnvalue() == 0); - REQUIRE(resultMsg.outputdata() == - fmt::format("Finished async message {}", msg.id())); - - endpoint.stop(); -} -} diff --git a/tests/test/endpoint/test_handler.cpp b/tests/test/endpoint/test_handler.cpp deleted file mode 100644 index 75b1de316..000000000 --- a/tests/test/endpoint/test_handler.cpp +++ /dev/null @@ -1,181 +0,0 @@ -#include - -#include "DummyExecutor.h" -#include "DummyExecutorFactory.h" -#include "faabric_utils.h" -#include "fixtures.h" - -#include -#include -#include - -namespace tests { - -class EndpointHandlerTestFixture - : public FunctionCallClientServerFixture - , public SchedulerFixture -{ - protected: - // Taking in a shared_ptr by reference to ensure the handler was constructed - // with std::make_shared - static std::pair synchronouslyHandleFunction( - std::shared_ptr& handler, - std::string requestStr) - { - asio::io_context ioc(1); - asio::strand strand = asio::make_strand(ioc); - faabric::util::BeastHttpResponse response; - faabric::util::BeastHttpRequest req(beast::http::verb::get, "/", 10); - req.body() = requestStr; - faabric::endpoint::HttpRequestContext ctx{ - ioc, - strand, - [&](faabric::util::BeastHttpResponse&& resp) { - response = std::move(resp); - } - }; - handler->onRequest(std::move(ctx), std::move(req)); - ioc.run(); - return std::make_pair(response.result_int(), response.body()); - } -}; - -TEST_CASE_METHOD(EndpointHandlerTestFixture, - "Test valid calls to endpoint", - "[endpoint]") -{ - // Must be async to avoid needing a result - faabric::Message call = faabric::util::messageFactory("foo", "bar"); - call.set_isasync(true); - std::string user = "foo"; - std::string function = "bar"; - std::string actualInput; - - SECTION("With input") - { - actualInput = "foobar"; - call.set_inputdata(actualInput); - } - - SECTION("No input") {} - - call.set_user(user); - call.set_function(function); - - const std::string& requestStr = faabric::util::messageToJson(call); - - // Handle the function - std::shared_ptr handler = - std::make_shared(); - std::pair response = - synchronouslyHandleFunction(handler, requestStr); - - REQUIRE(response.first == 200); - faabric::Message responseMsg; - faabric::util::jsonToMessage(response.second, &responseMsg); - - // Check actual call has right details including the ID returned to the - // caller - std::vector msgs = sch.getRecordedMessagesAll(); - REQUIRE(msgs.size() == 1); - faabric::Message actualCall = msgs.at(0); - REQUIRE(actualCall.user() == call.user()); - REQUIRE(actualCall.function() == call.function()); - REQUIRE(actualCall.id() == responseMsg.id()); - REQUIRE(actualCall.inputdata() == actualInput); - - // Wait for the result - actualCall.set_appid(responseMsg.appid()); - sch.getFunctionResult(actualCall, 2000); -} - -TEST_CASE_METHOD(EndpointHandlerTestFixture, - "Test empty invocation", - "[endpoint]") -{ - std::shared_ptr handler = - std::make_shared(); - std::pair actual = - synchronouslyHandleFunction(handler, ""); - - REQUIRE(actual.first == 400); - REQUIRE(actual.second == "Empty request"); -} - -TEST_CASE_METHOD(EndpointHandlerTestFixture, - "Test empty JSON invocation", - "[endpoint]") -{ - faabric::Message call; - call.set_isasync(true); - - std::string expected; - - SECTION("Empty user") - { - expected = "Empty user"; - call.set_function("echo"); - } - - SECTION("Empty function") - { - expected = "Empty function"; - call.set_user("demo"); - } - - std::shared_ptr handler = - std::make_shared(); - const std::string& requestStr = faabric::util::messageToJson(call); - std::pair actual = - synchronouslyHandleFunction(handler, requestStr); - - REQUIRE(actual.first == 400); - REQUIRE(actual.second == expected); -} - -TEST_CASE_METHOD(EndpointHandlerTestFixture, - "Check getting function status from endpoint", - "[endpoint]") -{ - // Create a message - faabric::Message msg = faabric::util::messageFactory("demo", "echo"); - - int expectedReturnCode = 200; - std::string expectedOutput; - - SECTION("Running") { expectedOutput = "RUNNING"; } - - SECTION("Failure") - { - std::string errorMsg = "I have failed"; - msg.set_outputdata(errorMsg); - msg.set_returnvalue(1); - sch.setFunctionResult(msg); - - expectedReturnCode = 500; - - expectedOutput = "FAILED: " + errorMsg; - } - - SECTION("Success") - { - std::string errorMsg = "I have succeeded"; - msg.set_outputdata(errorMsg); - msg.set_returnvalue(0); - sch.setFunctionResult(msg); - - expectedOutput = faabric::util::messageToJson(msg); - } - - msg.set_isstatusrequest(true); - - std::shared_ptr handler = - std::make_shared(); - const std::string& requestStr = faabric::util::messageToJson(msg); - std::pair actual = - synchronouslyHandleFunction(handler, requestStr); - - REQUIRE(actual.first == expectedReturnCode); - REQUIRE(actual.second == expectedOutput); -} -} diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp index 903747997..f163165de 100644 --- a/tests/test/planner/test_planner_endpoint.cpp +++ b/tests/test/planner/test_planner_endpoint.cpp @@ -14,12 +14,12 @@ using namespace faabric::planner; namespace tests { -class FaabricPlannerEndpointTestFixture +class PlannerEndpointTestFixture : public ConfFixture , public PlannerClientServerFixture { public: - FaabricPlannerEndpointTestFixture() + PlannerEndpointTestFixture() : host(LOCALHOST) , port(conf.plannerPort) , endpoint( @@ -29,9 +29,14 @@ class FaabricPlannerEndpointTestFixture { conf.plannerHost = LOCALHOST; endpoint.start(faabric::endpoint::EndpointMode::BG_THREAD); + resetPlanner(); } - ~FaabricPlannerEndpointTestFixture() { endpoint.stop(); } + ~PlannerEndpointTestFixture() + { + resetPlanner(); + endpoint.stop(); + } protected: std::string host; @@ -49,9 +54,7 @@ class FaabricPlannerEndpointTestFixture } }; -TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, - "Test planner reset", - "[planner]") +TEST_CASE_METHOD(PlannerEndpointTestFixture, "Test planner reset", "[planner]") { expectedReturnCode = boost::beast::http::status::ok; expectedResponseBody = "Planner fully reset!"; @@ -90,7 +93,7 @@ TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, REQUIRE(availableHosts.empty()); } -TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, +TEST_CASE_METHOD(PlannerEndpointTestFixture, "Test flushing available hosts", "[planner]") { @@ -133,7 +136,7 @@ TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, REQUIRE(availableHosts.empty()); } -TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, +TEST_CASE_METHOD(PlannerEndpointTestFixture, "Test flushing executors", "[planner]") { @@ -182,7 +185,7 @@ TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, faabric::scheduler::getScheduler().shutdown(); } -TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, +TEST_CASE_METHOD(PlannerEndpointTestFixture, "Test getting the planner config", "[planner]") { @@ -206,7 +209,33 @@ TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, REQUIRE(config.numthreadshttpserver() > 0); } -TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, +class PlannerEndpointExecTestFixture + : public PlannerEndpointTestFixture + , public FunctionCallClientServerFixture +{ + public: + PlannerEndpointExecTestFixture() + : sch(faabric::scheduler::getScheduler()) + { + sch.shutdown(); + sch.addHostToGlobalSet(); + + std::shared_ptr fac = + std::make_shared(); + faabric::scheduler::setExecutorFactory(fac); + } + + ~PlannerEndpointExecTestFixture() + { + sch.shutdown(); + sch.addHostToGlobalSet(); + } + + protected: + faabric::scheduler::Scheduler& sch; +}; + +TEST_CASE_METHOD(PlannerEndpointExecTestFixture, "Check getting execution graph from endpoint", "[planner]") { @@ -218,40 +247,205 @@ TEST_CASE_METHOD(FaabricPlannerEndpointTestFixture, int msgId = ber->messages(0).id(); msg.set_payloadjson(faabric::util::messageToJson(ber->messages(0))); - // Prepare the system to execute functions - faabric::scheduler::FunctionCallServer functionCallServer; - functionCallServer.start(); - std::shared_ptr fac = - std::make_shared(); - faabric::scheduler::setExecutorFactory(fac); - // Call a function first, and wait for the result - auto& sch = faabric::scheduler::getScheduler(); sch.callFunctions(ber); auto resultMsg = sch.getFunctionResult(appId, msgId, 1000); - // Set expectation - SECTION("Success") { expectedReturnCode = boost::beast::http::status::ok; } + SECTION("Success") + { + expectedReturnCode = boost::beast::http::status::ok; + faabric::util::ExecGraphNode rootNode = { .msg = resultMsg }; + faabric::util::ExecGraph expectedGraph{ .rootNode = rootNode }; + expectedResponseBody = faabric::util::execGraphToJson(expectedGraph); + } + + // If we can't find the exec. graph, the endpoint will return an error SECTION("Failure") { - expectedReturnCode = beast::http::status::internal_server_error; ber->mutable_messages(0)->set_appid(1337); msg.set_payloadjson(faabric::util::messageToJson(ber->messages(0))); + expectedReturnCode = beast::http::status::internal_server_error; + expectedResponseBody = "Failed getting exec. graph!"; + } + + // The GET_EXEC_GRAPH request requires a serialised Message as + // payload, otherwise it will return an error + SECTION("Bad request payload") + { + msg.set_payloadjson("foo bar"); + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = "Bad JSON in request body"; } - faabric::util::ExecGraphNode rootNode = { .msg = resultMsg }; - faabric::util::ExecGraph expectedGraph{ .rootNode = rootNode }; // Send an HTTP request to get the execution graph msgJsonStr = faabric::util::messageToJson(msg); std::pair result = doPost(msgJsonStr); REQUIRE(boost::beast::http::int_to_status(result.first) == expectedReturnCode); - if (expectedReturnCode == boost::beast::http::status::ok) { - REQUIRE(result.second == faabric::util::execGraphToJson(expectedGraph)); + REQUIRE(result.second == expectedResponseBody); +} + +TEST_CASE_METHOD(PlannerEndpointExecTestFixture, + "Check executing a function through the endpoint", + "[planner]") +{ + // Prepare HTTP request + HttpMessage msg; + msg.set_type(HttpMessage_Type_EXECUTE_BATCH); + auto ber = faabric::util::batchExecFactory("foo", "bar", 1); + int appId = ber->appid(); + int msgId = ber->messages(0).id(); + msg.set_payloadjson(faabric::util::messageToJson(*ber)); + + SECTION("Success") + { + expectedReturnCode = beast::http::status::ok; + auto expectedBerStatus = faabric::util::batchExecStatusFactory(appId); + expectedResponseBody = faabric::util::messageToJson(*expectedBerStatus); + } + + // The EXECUTE_BATCH request requires a serialised BatchExecuteRequest as + // payload, otherwise it will return an error + SECTION("Bad request payload") + { + msg.set_payloadjson("foo bar"); + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = "Bad JSON in body's payload"; + } + + // Trying to execute a function without any registered hosts should yield + // an error + SECTION("No registered hosts") + { + expectedReturnCode = beast::http::status::internal_server_error; + expectedResponseBody = "No available hosts"; + // Remove all registered hosts + resetPlanner(); } - // Shutdown - functionCallServer.stop(); - sch.shutdown(); + // IF the BER does not pass the sanity checks, the endpoint will error + SECTION("Bad BER body") + { + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = "Bad BatchExecRequest"; + + SECTION("Bad ber id") { ber->set_appid(1337); } + + SECTION("App ID mismatch") + { + ber->mutable_messages(0)->set_appid(1337); + } + + SECTION("Empty user") { ber->mutable_messages(0)->set_user(""); } + + SECTION("Empty function") + { + ber->mutable_messages(0)->set_function(""); + } + + msg.set_payloadjson(faabric::util::messageToJson(*ber)); + } + + // Post the message that will trigger a function execution + msgJsonStr = faabric::util::messageToJson(msg); + std::pair result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); + + auto msgResult = sch.getFunctionResult(appId, msgId, 1000); + REQUIRE(msgResult.returnvalue() == 0); + + // If the request is succesful, check that the response has the fields + // we expect + if (expectedReturnCode == beast::http::status::ok) { + REQUIRE(msgResult.timestamp() > 0); + REQUIRE(msgResult.finishtimestamp() > 0); + REQUIRE(!msgResult.executedhost().empty()); + REQUIRE(!msgResult.masterhost().empty()); + } +} + +TEST_CASE_METHOD(PlannerEndpointExecTestFixture, + "Check getting the execution status through the endpoint", + "[planner]") +{ + // First, prepare an HTTP request to execute a batch + int numMessages = 1; + HttpMessage msg; + msg.set_type(HttpMessage_Type_EXECUTE_BATCH); + auto ber = faabric::util::batchExecFactory("foo", "bar", numMessages); + int appId = ber->appid(); + int msgId = ber->messages(0).id(); + msg.set_payloadjson(faabric::util::messageToJson(*ber)); + + // Execute the batch + msgJsonStr = faabric::util::messageToJson(msg); + std::pair result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + beast::http::status::ok); + + // Make sure execution has finished and the result is available + auto msgResult = sch.getFunctionResult(appId, msgId, 1000); + REQUIRE(msgResult.returnvalue() == 0); + + // Second, prepare an HTTP request to get the batch's execution status + msg.set_type(HttpMessage_Type_EXECUTE_BATCH_STATUS); + // An EXECUTE_BATCH_STATUS request needs to provide a serialised + // BatchExecuteRequestStatus in the request's JSON payload + auto berStatus = faabric::util::batchExecStatusFactory(appId); + berStatus->set_expectednummessages(numMessages); + msg.set_payloadjson(faabric::util::messageToJson(*berStatus)); + + // An EXECUTE_BATCH_STATUS request expects a BatchExecuteRequestStatus + // in the response body + SECTION("Success") + { + expectedReturnCode = beast::http::status::ok; + auto expectedBerStatus = faabric::util::batchExecStatusFactory(appId); + expectedBerStatus->set_finished(true); + *expectedBerStatus->add_messageresults() = msgResult; + expectedResponseBody = faabric::util::messageToJson(*expectedBerStatus); + } + + // If the request JSON payload is not a BER, the endpoint will error + SECTION("Malformed request body") + { + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = "Bad JSON in request body"; + msg.set_payloadjson("foo"); + } + + // If the request JSON payload contains a BER status for a non-existant + // BER (i.e. appid not registered), the endpoint will also error out + SECTION("Unregistered app id") + { + expectedReturnCode = beast::http::status::internal_server_error; + expectedResponseBody = "App not registered in results"; + auto otherBerStatus = faabric::util::batchExecStatusFactory(1337); + msg.set_payloadjson(faabric::util::messageToJson(*otherBerStatus)); + } + + // If the request JSON payload contains a BER status for an in-flight BER, + // the request will succeed. Depending on the messages we tell the planner + // we are expecting, it will either succeed or not + SECTION("Success, but not finished") + { + expectedReturnCode = beast::http::status::ok; + auto expectedBerStatus = faabric::util::batchExecStatusFactory(appId); + expectedBerStatus->set_finished(false); + *expectedBerStatus->add_messageresults() = msgResult; + expectedResponseBody = faabric::util::messageToJson(*expectedBerStatus); + // Change the expected number of messages + berStatus->set_expectednummessages(2); + msg.set_payloadjson(faabric::util::messageToJson(*berStatus)); + } + + // Post the EXECUTE_BATCH_STATUS request: + msgJsonStr = faabric::util::messageToJson(msg); + result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); } } diff --git a/tests/test/proto/test_proto.cpp b/tests/test/proto/test_proto.cpp index 55d5e5602..21bf6a7c2 100644 --- a/tests/test/proto/test_proto.cpp +++ b/tests/test/proto/test_proto.cpp @@ -47,9 +47,7 @@ TEST_CASE("Test protobuf classes", "[proto]") funcCall.set_inputdata(inputData.data(), 100); funcCall.set_outputdata(outputData); - funcCall.set_isasync(true); funcCall.set_ispython(true); - funcCall.set_isstatusrequest(true); funcCall.set_type(faabric::Message_MessageType_KILL); @@ -76,9 +74,7 @@ TEST_CASE("Test protobuf classes", "[proto]") REQUIRE(pyFunc == newFuncCall.pythonfunction()); REQUIRE(pyEntry == newFuncCall.pythonentry()); - REQUIRE(newFuncCall.isasync()); REQUIRE(newFuncCall.ispython()); - REQUIRE(newFuncCall.isstatusrequest()); REQUIRE(cmdline == newFuncCall.cmdline()); diff --git a/tests/test/scheduler/test_executor.cpp b/tests/test/scheduler/test_executor.cpp index 69c365d69..fa834ea5b 100644 --- a/tests/test/scheduler/test_executor.cpp +++ b/tests/test/scheduler/test_executor.cpp @@ -245,10 +245,10 @@ int32_t TestExecutor::executeTask( return 999; } - if (ctx->getBatchRequest()->id() != reqOrig->id()) { + if (ctx->getBatchRequest()->appid() != reqOrig->appid()) { SPDLOG_ERROR("Context request does not match ({} != {})", - ctx->getBatchRequest()->id(), - reqOrig->id()); + ctx->getBatchRequest()->appid(), + reqOrig->appid()); return 999; } diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 63202f3db..d84a444b9 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -793,7 +793,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, auto actualReqs = faabric::scheduler::getBatchRequests(); REQUIRE(actualReqs.size() == 1); REQUIRE(actualReqs.at(0).first == otherHost); - REQUIRE(actualReqs.at(0).second->id() == req->id()); + REQUIRE(actualReqs.at(0).second->appid() == req->appid()); } TEST_CASE_METHOD(SlowExecutorTestFixture, diff --git a/tests/test/util/test_batch.cpp b/tests/test/util/test_batch.cpp index e7dff5034..c5d6c6f4d 100644 --- a/tests/test/util/test_batch.cpp +++ b/tests/test/util/test_batch.cpp @@ -13,7 +13,7 @@ TEST_CASE("Test batch exec factory", "[util]") REQUIRE(req->messages().size() == nMessages); - REQUIRE(req->id() > 0); + REQUIRE(req->appid() > 0); // Expect all messages to have the same app ID by default int appId = req->messages().at(0).appid(); @@ -54,6 +54,20 @@ TEST_CASE("Test batch. exec request sanity checks") ber->mutable_messages(1)->set_appid(1337); } + // An empty user deems a BER invalid + SECTION("Empty user") + { + isBerValid = false; + ber->mutable_messages(0)->set_user(""); + } + + // An empty function deems a BER invalid + SECTION("Empty function") + { + isBerValid = false; + ber->mutable_messages(0)->set_function(""); + } + // A user mismatch between the messages deems a BER invalid SECTION("User mismatch") { @@ -73,4 +87,31 @@ TEST_CASE("Test batch. exec request sanity checks") REQUIRE(isBerValid == isBatchExecRequestValid(ber)); } + +TEST_CASE("Test BER status factory") +{ + int appId; + std::shared_ptr berStatus = nullptr; + + // A BER status can be constructed with an appId + SECTION("Constructor with app id") + { + appId = 1337; + berStatus = faabric::util::batchExecStatusFactory(appId); + } + + // A BER status can also be constructed from a BER + SECTION("Constructor from a BER") + { + auto ber = faabric::util::batchExecFactory("foo", "bar", 1); + appId = ber->appid(); + berStatus = faabric::util::batchExecStatusFactory(ber); + } + + // It will have the same appId + REQUIRE(berStatus->appid() == appId); + + // And the finished flag will be set to false + REQUIRE(berStatus->finished() == false); +} } diff --git a/tests/test/util/test_json.cpp b/tests/test/util/test_json.cpp index 0fc65f9f6..9f4103690 100644 --- a/tests/test/util/test_json.cpp +++ b/tests/test/util/test_json.cpp @@ -23,9 +23,7 @@ class JsonTestFixture msg.set_pythonfunction("py func"); msg.set_pythonentry("py entry"); - msg.set_isasync(true); msg.set_ispython(true); - msg.set_isstatusrequest(true); msg.set_ismpi(true); msg.set_mpiworldid(1234); @@ -80,9 +78,7 @@ TEST_CASE_METHOD(JsonTestFixture, "Test JSON contains required keys", "[util]") // test ensures that the keywords we use elsewhere are generated as part // of the serialisation process std::vector requiredKeys = { "input_data", - "async", "python", - "status", "py_user", "py_func", "mpi", diff --git a/tests/utils/message_utils.cpp b/tests/utils/message_utils.cpp index 9aec6dd15..cd9c7a082 100644 --- a/tests/utils/message_utils.cpp +++ b/tests/utils/message_utils.cpp @@ -21,9 +21,7 @@ void checkMessageEquality(const faabric::Message& msgA, REQUIRE(msgA.pythonuser() == msgB.pythonuser()); REQUIRE(msgA.pythonfunction() == msgB.pythonfunction()); REQUIRE(msgA.pythonentry() == msgB.pythonentry()); - REQUIRE(msgA.isasync() == msgB.isasync()); REQUIRE(msgA.ispython() == msgB.ispython()); - REQUIRE(msgA.isstatusrequest() == msgB.isstatusrequest()); REQUIRE(msgA.returnvalue() == msgB.returnvalue());