Skip to content

Commit

Permalink
batch-scheduler: move scheduling decisions to batch scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Aug 2, 2023
1 parent 07da690 commit e22b97a
Show file tree
Hide file tree
Showing 38 changed files with 404 additions and 363 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ add_library(faabric
$<TARGET_OBJECTS:redis_obj>
$<TARGET_OBJECTS:runner_obj>
$<TARGET_OBJECTS:scheduler_obj>
$<TARGET_OBJECTS:scheduling_util_obj>
$<TARGET_OBJECTS:snapshot_obj>
$<TARGET_OBJECTS:state_obj>
$<TARGET_OBJECTS:transport_obj>
Expand Down
18 changes: 7 additions & 11 deletions include/faabric/batch-scheduler/BatchScheduler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/util/batch.h>
#include <faabric/util/scheduling.h>

#include <string>

#define DO_NOT_MIGRATE -98
#define DO_NOT_MIGRATE_DECISION \
faabric::util::SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
SchedulingDecision(DO_NOT_MIGRATE, DO_NOT_MIGRATE)
#define NOT_ENOUGH_SLOTS -99
#define NOT_ENOUGH_SLOTS_DECISION \
faabric::util::SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)
SchedulingDecision(NOT_ENOUGH_SLOTS, NOT_ENOUGH_SLOTS)

namespace faabric::batch_scheduler {

// TODO: move BatchExecuteRequest here

// TODO: move SchedulingDecision here?
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
std::shared_ptr<SchedulingDecision>>
InFlightPair;

typedef std::map<int32_t, InFlightPair> InFlightReqs;
Expand Down Expand Up @@ -72,8 +69,7 @@ class BatchScheduler
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req);

virtual std::shared_ptr<faabric::util::SchedulingDecision>
makeSchedulingDecision(
virtual std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) = 0;
Expand Down Expand Up @@ -111,8 +107,8 @@ class BatchScheduler
// ----------

virtual bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) = 0;
std::shared_ptr<SchedulingDecision> decisionA,
std::shared_ptr<SchedulingDecision> decisionB) = 0;

virtual std::vector<Host> getSortedHosts(
const HostMap& hostMap,
Expand Down
8 changes: 4 additions & 4 deletions include/faabric/batch-scheduler/BinPackScheduler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
#pragma once

#include <faabric/batch-scheduler/BatchScheduler.h>
#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/util/batch.h>
#include <faabric/util/scheduling.h>
#include <string>

namespace faabric::batch_scheduler {

class BinPackScheduler final : public BatchScheduler
{
public:
std::shared_ptr<faabric::util::SchedulingDecision> makeSchedulingDecision(
std::shared_ptr<SchedulingDecision> makeSchedulingDecision(
const HostMap& hostMap,
const InFlightReqs& inFlightReqs,
std::shared_ptr<faabric::BatchExecuteRequest> req) override;

private:
bool isFirstDecisionBetter(
std::shared_ptr<faabric::util::SchedulingDecision> decisionA,
std::shared_ptr<faabric::util::SchedulingDecision> decisionB) override;
std::shared_ptr<SchedulingDecision> decisionA,
std::shared_ptr<SchedulingDecision> decisionB) override;

std::vector<Host> getSortedHosts(
const HostMap& hostMap,
Expand Down
52 changes: 52 additions & 0 deletions include/faabric/batch-scheduler/DecisionCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>

#include <string>
#include <vector>

namespace faabric::batch_scheduler {
/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that was used.
*/
class CachedDecision
{
public:
CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);

std::vector<std::string> getHosts() { return hosts; }

int getGroupId() const { return groupId; }

private:
std::vector<std::string> hosts;
int groupId = 0;
};

/**
* Repository for cached scheduling decisions. Object is not thread safe as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

void addCachedDecision(std::shared_ptr<BatchExecuteRequest> req,
SchedulingDecision& decision);

void clear();

private:
std::string getCacheKey(std::shared_ptr<BatchExecuteRequest> req);

std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};

DecisionCache& getSchedulingDecisionCache();
}
Original file line number Diff line number Diff line change
@@ -1,58 +1,8 @@
#pragma once

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/locks.h>

namespace faabric::util {

class SchedulingDecision
{
public:
static SchedulingDecision fromPointToPointMappings(
faabric::PointToPointMappings& mappings);

SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn);

uint32_t appId = 0;

int32_t groupId = 0;

int32_t nFunctions = 0;

std::vector<std::string> hosts;

std::vector<int32_t> messageIds;

std::vector<int32_t> appIdxs;

std::vector<int32_t> groupIdxs;

std::string returnHost;

/**
* Work out if this decision is all on this host. If the decision is
* completely on *another* host, we still count it as not being on a single
* host, as this host will be the master.
*
* Will always return false if single host optimisations are switched off.
*/
bool isSingleHost();

void addMessage(const std::string& host, const faabric::Message& msg);

void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);

void addMessage(const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx);
};

namespace faabric::batch_scheduler {
// Scheduling topology hints help the scheduler decide which host to assign new
// requests in a batch.
// - NONE: bin-packs requests to slots in hosts starting from the master
Expand Down Expand Up @@ -92,59 +42,60 @@ const std::unordered_map<SchedulingTopologyHint, std::string>
{ SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
};

/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that was used.
*/
class CachedDecision
// TODO(planner-schedule): remove these strategies
// Migration strategies help the scheduler decide wether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
};

class SchedulingDecision
{
public:
CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);
static SchedulingDecision fromPointToPointMappings(
faabric::PointToPointMappings& mappings);

std::vector<std::string> getHosts() { return hosts; }
SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn);

int getGroupId() const { return groupId; }
uint32_t appId = 0;

int32_t groupId = 0;

int32_t nFunctions = 0;

private:
std::vector<std::string> hosts;
int groupId = 0;
};

/**
* Repository for cached scheduling decisions. Object is not thread safe as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);
std::vector<int32_t> messageIds;

void addCachedDecision(std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);
std::vector<int32_t> appIdxs;

void clear();
std::vector<int32_t> groupIdxs;

private:
std::string getCacheKey(std::shared_ptr<faabric::BatchExecuteRequest> req);
std::string returnHost;

std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};
/**
* Work out if this decision is all on this host. If the decision is
* completely on *another* host, we still count it as not being on a single
* host, as this host will be the master.
*
* Will always return false if single host optimisations are switched off.
*/
bool isSingleHost();

DecisionCache& getSchedulingDecisionCache();
void addMessage(const std::string& host, const faabric::Message& msg);

// Migration strategies help the scheduler decide wether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);

void addMessage(const std::string& host,
int32_t messageId,
int32_t appIdx,
int32_t groupIdx);
};

}
32 changes: 16 additions & 16 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/planner/PlannerClient.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/FunctionCallClient.h>
Expand All @@ -14,7 +15,6 @@
#include <faabric/util/dirty.h>
#include <faabric/util/memory.h>
#include <faabric/util/queue.h>
#include <faabric/util/scheduling.h>
#include <faabric/util/snapshot.h>
#include <faabric/util/timing.h>

Expand All @@ -26,7 +26,7 @@
namespace faabric::scheduler {

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision>>
InFlightPair;

class Scheduler;
Expand Down Expand Up @@ -186,17 +186,17 @@ class Scheduler

~Scheduler();

faabric::util::SchedulingDecision makeSchedulingDecision(
faabric::batch_scheduler::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint =
faabric::util::SchedulingTopologyHint::NONE);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint =
faabric::batch_scheduler::SchedulingTopologyHint::NONE);

faabric::util::SchedulingDecision callFunctions(
faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req);

faabric::util::SchedulingDecision callFunctions(
faabric::batch_scheduler::SchedulingDecision callFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& hint);
faabric::batch_scheduler::SchedulingDecision& hint);

void reset();

Expand Down Expand Up @@ -351,15 +351,15 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::util::SchedulingDecision doSchedulingDecision(
faabric::batch_scheduler::SchedulingDecision doSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
faabric::batch_scheduler::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
faabric::batch_scheduler::SchedulingDecision& decision,
faabric::util::FullLock& lock,
faabric::util::SchedulingTopologyHint topologyHint);
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
Expand All @@ -386,15 +386,15 @@ class Scheduler

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);
faabric::batch_scheduler::MigrationStrategy migrationStrategy =
faabric::batch_scheduler::MigrationStrategy::BIN_PACK);

void broadcastPendingMigrations(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

void doStartFunctionMigrationThread(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);
faabric::batch_scheduler::SchedulingDecision& decision);
};

}
Loading

0 comments on commit e22b97a

Please sign in to comment.