Skip to content

Commit

Permalink
Route invoke requests through planner (#330)
Browse files Browse the repository at this point in the history
* wip

* nits: run clang-format and fix basic endpoint test

* tests: add more coverage

* utils: move batch utils to different source file

* util: fix compilation errors after moving batchExec to a different header file

* tests: add more test coverage

* more groundwork

* tests: improve coverage

* tests: small fix w/ scheduler

* scheduler: clear clients upon scheduler reset

* scheduler: also clear-up this host used slots

* dist-tests: actually use separate planner

* planner: move logging to trace

* gh: bump code version

* tmp fixes

* nits

* endpoint: add a test for endpoint

* scheduler: make sure we set the timestamp for the first BER message too
  • Loading branch information
csegarragonz authored Jul 18, 2023
1 parent 9b90d3f commit fa0d07d
Show file tree
Hide file tree
Showing 22 changed files with 632 additions and 560 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.4.5
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.4.5
FAABRIC_VERSION=0.4.6
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.4.6
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -36,7 +36,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -73,7 +73,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -167,7 +167,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.4.5
image: faasm.azurecr.io/faabric:0.4.6
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.4.5
0.4.6
5 changes: 5 additions & 0 deletions include/faabric/planner/Planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <faabric/planner/PlannerState.h>
#include <faabric/planner/planner.pb.h>
#include <faabric/proto/faabric.pb.h>

#include <shared_mutex>

Expand Down Expand Up @@ -59,6 +60,10 @@ class Planner
std::shared_ptr<faabric::Message> getMessageResult(
std::shared_ptr<faabric::Message> msg);

// Get all the results recorded for one batch
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(
int32_t appId);

private:
// There's a singleton instance of the planner running, but it must allow
// concurrent requests
Expand Down
15 changes: 15 additions & 0 deletions include/faabric/util/batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
#include <faabric/proto/faabric.pb.h>

namespace faabric::util {

// ----------
// Batch Execute Requests (BER)
// ----------

std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory();

std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(
Expand All @@ -11,4 +16,14 @@ std::shared_ptr<faabric::BatchExecuteRequest> batchExecFactory(
int count = 1);

bool isBatchExecRequestValid(std::shared_ptr<faabric::BatchExecuteRequest> ber);

// ----------
// Batch Execute Requests' Status
// ----------

std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(
int32_t appId);

std::shared_ptr<faabric::BatchExecuteRequestStatus> batchExecStatusFactory(
std::shared_ptr<faabric::BatchExecuteRequest> ber);
}
115 changes: 5 additions & 110 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,116 +31,11 @@ void FaabricEndpointHandler::onRequest(
// Text response type
response.set(header::content_type, "text/plain");

PROF_START(endpointRoundTrip)

// Parse message from JSON in request
const std::string& requestStr = request.body();

// Handle JSON
if (requestStr.empty()) {
SPDLOG_ERROR("Faabric handler received empty request");
response.result(beast::http::status::bad_request);
response.body() = std::string("Empty request");
} else {
auto req = faabric::util::batchExecFactory();
req->set_type(req->FUNCTIONS);
faabric::Message& msg = *req->add_messages();
faabric::util::jsonToMessage(requestStr, &msg);
faabric::scheduler::Scheduler& sched =
faabric::scheduler::getScheduler();

if (msg.isstatusrequest()) {
SPDLOG_DEBUG("Processing status request");
const faabric::Message result = sched.getFunctionResult(msg, 0);

if (result.type() == faabric::Message_MessageType_EMPTY) {
response.result(beast::http::status::ok);
response.body() = std::string("RUNNING");
} else if (result.returnvalue() == 0) {
response.result(beast::http::status::ok);
response.body() = faabric::util::messageToJson(result);
} else {
response.result(beast::http::status::internal_server_error);
response.body() = "FAILED: " + result.outputdata();
}
} else {
executeFunction(
std::move(ctx), std::move(response), std::move(req), 0);
return;
}
}

PROF_END(endpointRoundTrip)
// TODO: for the moment we keep the endpoint handler, but we are not meant
// to receive any requests here. Eventually we will delete it
SPDLOG_ERROR("Faabric handler received empty request");
response.result(beast::http::status::bad_request);
response.body() = std::string("Empty request");
ctx.sendFunction(std::move(response));
}

void FaabricEndpointHandler::executeFunction(
HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& response,
std::shared_ptr<faabric::BatchExecuteRequest> ber,
size_t messageIndex)
{
auto& conf = faabric::util::getSystemConfig();

// Set app ID, message ID and master host on the first message of the BER
// TODO: eventually do it on the BER itself
faabric::util::setMessageId(*ber->mutable_messages(0));
ber->mutable_messages(0)->set_masterhost(conf.endpointHost);
int appId = ber->messages(0).appid();
int msgId = ber->messages(0).id();
assert(appId != 0);
assert(msgId != 0);

if (ber->messages(0).user().empty()) {
response.result(beast::http::status::bad_request);
response.body() = std::string("Empty user");
return ctx.sendFunction(std::move(response));
}

if (ber->messages(0).function().empty()) {
response.result(beast::http::status::bad_request);
response.body() = std::string("Empty function");
return ctx.sendFunction(std::move(response));
}

// This is set to false by the scheduler if the function ends up being sent
// elsewhere
if (!ber->messages(0).isasync()) {
ber->mutable_messages(0)->set_executeslocally(true);
}

auto tid = gettid();
const std::string funcStr =
faabric::util::funcToString(ber->messages(0), true);
SPDLOG_DEBUG("Worker HTTP thread {} scheduling {}", tid, funcStr);

// Schedule it
faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler();
sch.callFunctions(ber);

// Await result on global bus (may have been executed on a different worker)
if (ber->messages(0).isasync()) {
response.result(beast::http::status::ok);
response.body() = faabric::util::messageToJson(ber->messages(0));
return ctx.sendFunction(std::move(response));
}

// TODO: temporarily make this HTTP call block one server thread.
// Eventually. we will route all HTTP requests through the planner instead
// of the worker, so we will be able to remove this blocking call
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);
auto result =
sch.getFunctionResult(appId, msgId, conf.globalMessageTimeout);

beast::http::status statusCode =
(result.returnvalue() == 0) ? beast::http::status::ok
: beast::http::status::internal_server_error;
response.result(statusCode);
SPDLOG_DEBUG("Worker thread {} result {}",
gettid(),
faabric::util::funcToString(result, true));

response.body() = result.outputdata();
return ctx.sendFunction(std::move(response));
}
}
23 changes: 23 additions & 0 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <faabric/planner/Planner.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/batch.h>
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
Expand Down Expand Up @@ -267,6 +269,27 @@ std::shared_ptr<faabric::Message> Planner::getMessageResult(
return nullptr;
}

std::shared_ptr<faabric::BatchExecuteRequestStatus> Planner::getBatchResults(
int32_t appId)
{
auto berStatus = faabric::util::batchExecStatusFactory(appId);

// Acquire a read lock to copy all the results we have for this batch
{
faabric::util::SharedLock lock(plannerMx);

if (!state.appResults.contains(appId)) {
return nullptr;
}

for (auto msgResultPair : state.appResults.at(appId)) {
*berStatus->add_messageresults() = *(msgResultPair.second);
}
}

return berStatus;
}

Planner& getPlanner()
{
static Planner planner;
Expand Down
Loading

0 comments on commit fa0d07d

Please sign in to comment.