Skip to content

Commit

Permalink
Route callFunctions through the planner (#345)
Browse files Browse the repository at this point in the history
* skeleton for the planner work

* scheduler: fix compilation issues

* tests: fix compilation issues

* tests: planner, proto, redis, and state tests working

* tests: util tests running

* tests: some mpi tests working

* tests: mpi tests working

* planner: update endpoint to use new callbatch method

* tests: towards migration test working

* debugging migration tests

* tests: migration tests in the scheduler passing

* tests: scheduler tests passing

* nits: run clang-format

* git: bump minor version

* dist-tests: all mpi tests (minus migration) working

* dist-tests: mpi function migration tests running

* dist-tests: all mpi dist tests working

* mpi: remove obsolete migration check period

* dist-tests: dist tests passing commenting out remote threads

* nits: run clang format

* tests: fix failing json test

* tests: fix failing mpi test

* nits: run clang format

* planner: add client/server method to get scheduling decision and add tests

* tests: remove unnecessary mpi sleep between tests

* threads: remove register/deregister and use a common result management via planner (all tests passing locally without sanitisers)

* tests: fix typo in tests

* batch: allow different messages to have different function names

* tests: set enough resources for executor tests

* nit: run clang format

* tasks: overwrite cpu count in tests

* tasks: override cpu count in tests

* cleaup + fix env. variable setting

* set environment variable to a string

* tests: fix failing tests

* tests: add enough slots for executor reaping test

* scheduler: remove unnecessary accounting structures

* mpi: clean-up todos and comments

* tests: wait for mpi messages to be executed when counting the number of messages sent to avoid race conditions

* tests: more mpi message waiting

* mpi: remove comment

* planner: clean-up

* threads: re-introduce remote threads (most tests and dist-tests passing, still need to uncomment more)

* dist-tests: re-introduce all remote threads tests

* tests: fix executor tests failing after planner resource managing update

* tests: fix scheduler tests after planner changes

* tests: fix snapshot tests after changes

* tests: fix util tests

* migration: don't release slots when setting a migration message result, as we already release them when making the decision

* dist-tests: fix multiple mpi world migration too

* dist-tests: fix race condition in mpi dist test

* executor: add test using executeThreads

* planner-cli: remove callFunctions signature with hints

* scheduler: clean-up includes in main scheduler file

* snapshot: clean-up and make clear that we currently don't delete snapshots

* self-review: cleanup

* more self-review

* planner: add log level to print state

* scheduler: use defined timeouts instead of hardcoded numbers

* tests: remove race in mpi test

* planner: add info logging around migration opportunities

* dist-tests: make migration test less flaky
  • Loading branch information
csegarragonz authored Sep 6, 2023
1 parent a4e9d23 commit 9ccf899
Show file tree
Hide file tree
Showing 70 changed files with 2,100 additions and 3,512 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.6.1
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.6.1
FAABRIC_VERSION=0.7.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.7.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
12 changes: 6 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -36,7 +36,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -73,7 +73,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -167,7 +167,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.6.1
image: faasm.azurecr.io/faabric:0.7.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.1
0.7.0
9 changes: 5 additions & 4 deletions include/faabric/batch-scheduler/BatchScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@

#define DO_NOT_MIGRATE -98
#define DO_NOT_MIGRATE_DECISION \
SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
faabric::batch_scheduler::SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
#define NOT_ENOUGH_SLOTS -99
#define NOT_ENOUGH_SLOTS_DECISION \
SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)
faabric::batch_scheduler::SchedulingDecision(NOT_ENOUGH_SLOTS, \
NOT_ENOUGH_SLOTS)

namespace faabric::batch_scheduler {

Expand Down Expand Up @@ -70,7 +71,7 @@ class BatchScheduler
std::shared_ptr<faabric::BatchExecuteRequest> req);

virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) = 0;

Expand Down Expand Up @@ -111,7 +112,7 @@ class BatchScheduler
std::shared_ptr<SchedulingDecision> decisionB) = 0;

virtual std::vector<Host> getSortedHosts(
const HostMap& hostMap,
HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req,
const DecisionType& decisionType) = 0;
Expand Down
4 changes: 2 additions & 2 deletions include/faabric/batch-scheduler/BinPackScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BinPackScheduler final : public BatchScheduler
{
public:
std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) override;

Expand All @@ -21,7 +21,7 @@ class BinPackScheduler final : public BatchScheduler
std::shared_ptr<SchedulingDecision> decisionB) override;

std::vector<Host> getSortedHosts(
const HostMap& hostMap,
HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req,
const DecisionType& decisionType) override;
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/batch-scheduler/SchedulingDecision.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class SchedulingDecision

std::set<std::string> uniqueHosts();

void print();
void print(const std::string& logLevel = "debug");
};

}
7 changes: 1 addition & 6 deletions include/faabric/mpi/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,7 @@ class MpiWorld

/* Function Migration */

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
void prepareMigration(int thisRank);

private:
int id = -1;
Expand Down Expand Up @@ -267,8 +265,5 @@ class MpiWorld
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);

/* Function migration */
bool hasBeenMigrated = false;
};
}
29 changes: 28 additions & 1 deletion include/faabric/planner/Planner.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/planner/PlannerState.h>
#include <faabric/planner/planner.pb.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/snapshot/SnapshotRegistry.h>

#include <shared_mutex>

Expand Down Expand Up @@ -31,7 +33,7 @@ class Planner
void printConfig() const;

// ----------
// Util
// Util public API
// ----------

bool reset();
Expand Down Expand Up @@ -64,6 +66,12 @@ class Planner
std::shared_ptr<faabric::BatchExecuteRequestStatus> getBatchResults(
int32_t appId);

std::shared_ptr<faabric::batch_scheduler::SchedulingDecision>
getSchedulingDecision(std::shared_ptr<BatchExecuteRequest> req);

std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> callBatch(
std::shared_ptr<BatchExecuteRequest> req);

private:
// There's a singleton instance of the planner running, but it must allow
// concurrent requests
Expand All @@ -72,12 +80,31 @@ class Planner
PlannerState state;
PlannerConfig config;

// Snapshot registry to distribute snapshots in THREADS requests
faabric::snapshot::SnapshotRegistry& snapshotRegistry;

// ----------
// Util private API
// ----------

void flushHosts();

void flushExecutors();

// ----------
// Host membership private API
// ----------

// Check if a host's registration timestamp has expired
bool isHostExpired(std::shared_ptr<Host> host, long epochTimeMs = 0);

// ----------
// Request scheduling private API
// ----------

void dispatchSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision);
};

Planner& getPlanner();
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/planner/PlannerApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ enum PlannerCalls
// Scheduling calls
SetMessageResult = 8,
GetMessageResult = 9,
GetSchedulingDecision = 10,
CallBatch = 11,
};
}
17 changes: 15 additions & 2 deletions include/faabric/planner/PlannerClient.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/planner/planner.pb.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/util/PeriodicBackgroundThread.h>

Expand Down Expand Up @@ -31,12 +33,14 @@ class KeepAliveThread : public faabric::util::PeriodicBackgroundThread
};

/*
* Local state associated with the current host, used to cache results and
* avoid unnecessary interactions with the planner server.
* Local state associated with the current host, used to store useful state
* like cached results to unnecessary interactions with the planner server.
*/
struct PlannerCache
{
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults;
// Keeps track of the snapshots that have been pushed to the planner
std::set<std::string> pushedSnapshots;
};

/*
Expand Down Expand Up @@ -79,10 +83,19 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient
faabric::Message getMessageResult(const faabric::Message& msg,
int timeoutMs);

faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req);

faabric::batch_scheduler::SchedulingDecision getSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

private:
std::mutex plannerCacheMx;
PlannerCache cache;

// Snapshot client for the planner snapshot server
std::shared_ptr<faabric::snapshot::SnapshotClient> snapshotClient;

faabric::Message doGetMessageResult(
std::shared_ptr<faabric::Message> msgPtr,
int timeoutMs);
Expand Down
6 changes: 6 additions & 0 deletions include/faabric/planner/PlannerServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvGetMessageResult(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetSchedulingDecision(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvCallBatch(
std::span<const uint8_t> buffer);

private:
faabric::planner::Planner& planner;
};
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/planner/PlannerState.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <faabric/batch-scheduler/BatchScheduler.h>
#include <faabric/planner/planner.pb.h>
#include <faabric/proto/faabric.pb.h>

Expand All @@ -23,5 +24,8 @@ struct PlannerState
// Map holding the hosts that have registered interest in getting an app
// result
std::map<int, std::vector<std::string>> appResultWaiters;

// Map keeping track of the requests that are in-flight
faabric::batch_scheduler::InFlightReqs inFlightReqs;
};
}
5 changes: 1 addition & 4 deletions include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ enum FunctionCalls
NoFunctionCall = 0,
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4,
PendingMigrations = 5,
SetMessageResult = 6,
SetMessageResult = 3,
};
}
18 changes: 0 additions & 18 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,9 @@ std::vector<
std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>>
getBatchRequests();

std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getPendingMigrationsRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>>
getMessageResults();

void queueResourceResponse(const std::string& host,
faabric::HostResources& res);

void clearMockRequests();

// -----------------------------------
Expand All @@ -52,14 +40,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient

void sendFlush();

faabric::HostResources getResources();

void sendPendingMigrations(std::shared_ptr<faabric::PendingMigrations> req);

void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);

void setMessageResult(std::shared_ptr<faabric::Message> msg);
};

Expand Down
8 changes: 0 additions & 8 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,8 @@ class FunctionCallServer final
std::unique_ptr<google::protobuf::Message> recvFlush(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetResources(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPendingMigrations(
std::span<const uint8_t> buffer);

void recvExecuteFunctions(std::span<const uint8_t> buffer);

void recvUnregister(std::span<const uint8_t> buffer);

void recvSetMessageResult(std::span<const uint8_t> buffer);
};
}
Loading

0 comments on commit 9ccf899

Please sign in to comment.