Skip to content

Commit

Permalink
Move message result management to PlannerClient (#340)
Browse files Browse the repository at this point in the history
* scheduler: move set/get messages to planner cli almost entirely

* wip: not working

* test working now

* remove debug warning

* nits: run clang format

* planner-cli: add method to clear the cache

* planner-cli: make method that queries the planner directly private

* gh: bump version to capture new planner sync/async behaviour

* tests: do not set the result for the same message more than once, to avoid undefined behaviour

* tests: add blocking time every time we set a function result to accommodate its async behaviour
  • Loading branch information
csegarragonz authored Jul 31, 2023
1 parent b81cdaa commit 9ece659
Show file tree
Hide file tree
Showing 28 changed files with 298 additions and 271 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.6.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.6.0
FAABRIC_VERSION=0.6.1
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.6.1
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -36,7 +36,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -73,7 +73,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -167,7 +167,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.0
image: faasm.azurecr.io/faabric:0.6.1
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.0
0.6.1
44 changes: 40 additions & 4 deletions include/faabric/planner/PlannerClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/PeriodicBackgroundThread.h>

#include <future>
#include <shared_mutex>

namespace faabric::planner {

// Promise whose value is a shared pointer to a faabric::Message
using MessageResultPromise = std::promise<std::shared_ptr<faabric::Message>>;
// Shared handle to a MessageResultPromise
using MessageResultPromisePtr = std::shared_ptr<MessageResultPromise>;

/* The planner's implementation of group membership requires clients to send
* keep-alive messages. Once started, this background thread will send these
* messages
Expand All @@ -25,6 +30,20 @@ class KeepAliveThread : public faabric::util::PeriodicBackgroundThread
std::shared_mutex keepAliveThreadMx;
};

/*
* Local state associated with the current host, used to cache results and
* avoid unnecessary interactions with the planner server.
*/
struct PlannerCache
{
// Cached message-result promises, keyed by a 32-bit unsigned integer.
// NOTE(review): the key is presumably the message id -- confirm against
// the PlannerClient implementation.
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults;
};

/*
* The planner client is used to communicate with the planner over the network.
* To minimise the number of open connections, we have one static instance
* of the client per-host. This means that the planner client is reentrant.
*/
class PlannerClient final : public faabric::transport::MessageEndpointClient
{
public:
Expand All @@ -34,6 +53,8 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient

void ping();

void clearCache();

// ------
// Host membership calls
// ------
Expand All @@ -51,15 +72,30 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient

void setMessageResult(std::shared_ptr<faabric::Message> msg);

std::shared_ptr<faabric::Message> getMessageResult(
void setMessageResultLocally(std::shared_ptr<faabric::Message> msg);

faabric::Message getMessageResult(int appId, int msgId, int timeoutMs);

faabric::Message getMessageResult(const faabric::Message& msg,
int timeoutMs);

private:
std::mutex plannerCacheMx;
PlannerCache cache;

faabric::Message doGetMessageResult(
std::shared_ptr<faabric::Message> msgPtr,
int timeoutMs);

// This method actually gets the message result from the planner (i.e.
// sends a request to the planner server)
std::shared_ptr<faabric::Message> getMessageResultFromPlanner(
std::shared_ptr<faabric::Message> msg);
};

// -----------------------------------
// Static setter/getters
// -----------------------------------

std::shared_ptr<faabric::planner::PlannerClient> getPlannerClient();

void clearPlannerClient();
PlannerClient& getPlannerClient();
}
7 changes: 4 additions & 3 deletions include/faabric/planner/PlannerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> doSyncRecv(
transport::Message& message) override;

// Asynchronous calls

void recvSetMessageResult(std::span<const uint8_t> buffer);

// Synchronous calls

std::unique_ptr<google::protobuf::Message> recvPing();
Expand All @@ -27,9 +31,6 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvRemoveHost(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvSetMessageResult(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetMessageResult(
std::span<const uint8_t> buffer);

Expand Down
20 changes: 3 additions & 17 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

#include <future>
#include <shared_mutex>

#define AVAILABLE_HOST_SET "available_hosts"
Expand All @@ -29,8 +28,6 @@ namespace faabric::scheduler {
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
InFlightPair;
typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise;
typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr;

class Scheduler;

Expand Down Expand Up @@ -228,14 +225,10 @@ class Scheduler
// ----------------------------------
// Message results
// ----------------------------------
void setFunctionResult(faabric::Message& msg);

void setMessageResultLocally(std::shared_ptr<faabric::Message> msg);

faabric::Message getFunctionResult(const faabric::Message& msg,
int timeoutMs);

faabric::Message getFunctionResult(int appId, int msgId, int timeoutMs);
// TODO(planner-scheduler): move this method to the planner client once
// the planner controls scheduling
void setFunctionResult(faabric::Message& msg);

void setThreadResult(const faabric::Message& msg,
int32_t returnValue,
Expand Down Expand Up @@ -340,13 +333,6 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> pushedSnapshotsMap;

// ---- Message results ----
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults;
std::mutex plannerResultsMutex;
faabric::Message doGetFunctionResult(
std::shared_ptr<faabric::Message> msgPtr,
int timeoutMs);

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots = 0;
Expand Down
Loading

0 comments on commit 9ece659

Please sign in to comment.