tests: towards migration test working
csegarragonz committed Aug 7, 2023
1 parent 9349fcf commit ea9d236
Showing 18 changed files with 253 additions and 1,182 deletions.
4 changes: 1 addition & 3 deletions include/faabric/mpi/MpiWorld.h
@@ -201,9 +201,7 @@ class MpiWorld

/* Function Migration */

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
void prepareMigration(int thisRank);

private:
int id = -1;
5 changes: 1 addition & 4 deletions include/faabric/scheduler/FunctionCallApi.h
@@ -6,9 +6,6 @@ enum FunctionCalls
NoFunctionCall = 0,
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4,
PendingMigrations = 5,
SetMessageResult = 6,
SetMessageResult = 3,
};
}
15 changes: 0 additions & 15 deletions include/faabric/scheduler/FunctionCallClient.h
@@ -20,15 +20,6 @@ std::vector<
std::pair<std::string, std::shared_ptr<faabric::BatchExecuteRequest>>>
getBatchRequests();

std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getPendingMigrationsRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>>
getMessageResults();

@@ -52,14 +43,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient

void sendFlush();

faabric::HostResources getResources();

void sendPendingMigrations(std::shared_ptr<faabric::PendingMigrations> req);

void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);

void setMessageResult(std::shared_ptr<faabric::Message> msg);
};

8 changes: 0 additions & 8 deletions include/faabric/scheduler/FunctionCallServer.h
@@ -23,16 +23,8 @@ class FunctionCallServer final
std::unique_ptr<google::protobuf::Message> recvFlush(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetResources(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvPendingMigrations(
std::span<const uint8_t> buffer);

void recvExecuteFunctions(std::span<const uint8_t> buffer);

void recvUnregister(std::span<const uint8_t> buffer);

void recvSetMessageResult(std::span<const uint8_t> buffer);
};
}
64 changes: 3 additions & 61 deletions include/faabric/scheduler/Scheduler.h
@@ -26,10 +26,6 @@

namespace faabric::scheduler {

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision>>
InFlightPair;

class Scheduler;

Scheduler& getScheduler();
@@ -158,17 +154,6 @@ class Executor
void threadPoolThread(std::stop_token st, int threadPoolIdx);
};

/**
* Background thread that periodically checks if there are migration
* opportunities for in-flight apps that have opted in to being checked for
* migrations.
*/
class FunctionMigrationThread : public faabric::util::PeriodicBackgroundThread
{
public:
void doWork() override;
};

/**
* Background thread that periodically checks to see if any executors have
* become stale (i.e. not handled any requests in a given timeout). If any are
@@ -277,14 +262,6 @@ class Scheduler

void removeHostFromGlobalSet(const std::string& host);

void removeRegisteredHost(const std::string& host,
const std::string& user,
const std::string& function);

void addRegisteredHost(const std::string& host,
const std::string& user,
const std::string& function);

faabric::HostResources getThisHostResources();

void setThisHostResources(faabric::HostResources& res);
@@ -304,14 +281,9 @@
// ----------------------------------
// Function Migration
// ----------------------------------
void checkForMigrationOpportunities();

std::shared_ptr<faabric::PendingMigrations> getPendingAppMigrations(
uint32_t appId);

void addPendingMigration(std::shared_ptr<faabric::PendingMigrations> msg);

void removePendingMigration(uint32_t appId);
std::shared_ptr<faabric::PendingMigration> checkForMigrationOpportunities(
faabric::Message& msg,
int overwriteNewGroupId = 0);

private:
std::string thisHost;
@@ -341,8 +313,6 @@

void updateHostResources();

faabric::HostResources getHostResources(const std::string& host);

// ---- Planner----
faabric::planner::KeepAliveThread keepAliveThread;

@@ -353,16 +323,6 @@

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::batch_scheduler::SchedulingDecision doSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

faabric::batch_scheduler::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::batch_scheduler::SchedulingDecision& decision,
faabric::util::FullLock& lock,
faabric::batch_scheduler::SchedulingTopologyHint topologyHint);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
faabric::util::FullLock& schedulerLock);
@@ -379,24 +339,6 @@

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;

// ---- Function migration ----
FunctionMigrationThread functionMigrationThread;
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
pendingMigrations;

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::batch_scheduler::MigrationStrategy migrationStrategy =
faabric::batch_scheduler::MigrationStrategy::BIN_PACK);

void broadcastPendingMigrations(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

void doStartFunctionMigrationThread(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::batch_scheduler::SchedulingDecision& decision);
};

}
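The migration check is now per-message and synchronous: instead of a background FunctionMigrationThread broadcasting PendingMigrations to registered hosts, a rank asks the scheduler directly whether a better placement exists. A minimal call-site sketch based on the new declaration above; the helper function, its arguments, and the thisHost string are illustrative, not part of this commit:

#include <faabric/scheduler/Scheduler.h>

#include <memory>
#include <string>

// Hypothetical helper (not in this commit): the calling rank asks the
// scheduler whether a better placement exists for its message. A null
// result means the batch scheduler found no improvement.
static bool shouldMigrate(faabric::Message& msg, const std::string& thisHost)
{
    auto& sch = faabric::scheduler::getScheduler();
    std::shared_ptr<faabric::PendingMigration> migration =
      sch.checkForMigrationOpportunities(msg);

    return migration != nullptr && migration->dsthost() != thisHost;
}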
45 changes: 4 additions & 41 deletions src/mpi/MpiWorld.cpp
@@ -1602,9 +1602,7 @@ void MpiWorld::checkRanksRange(int sendRank, int recvRank)
}
}

void MpiWorld::prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations)
void MpiWorld::prepareMigration(int thisRank)
{
// Check that there are no pending asynchronous messages to send and receive
for (auto umb : unackedMessageBuffers) {
@@ -1629,45 +1627,10 @@
"Migrating with pending async messages is not supported");
}

// Update local records
// Update local records
if (thisRank == localLeader) {
for (int i = 0; i < pendingMigrations->migrations_size(); i++) {
auto m = pendingMigrations->mutable_migrations()->at(i);
assert(hostForRank.at(m.msg().mpirank()) == m.srchost());

// Update the host for this rank. We only update the positions of
// the to-be migrated ranks, avoiding race conditions with not-
// migrated ranks
hostForRank.at(m.msg().mpirank()) = m.dsthost();

// Update the ranks for host. This structure is used when doing
// collective communications by all ranks. At this point, all non-
// leader ranks will be hitting a barrier, for which they don't
// need the ranks for host map, therefore it is safe to modify it
if (m.dsthost() == thisHost && m.msg().mpirank() < localLeader) {
SPDLOG_WARN("Changing local leader {} -> {}",
localLeader,
m.msg().mpirank());
localLeader = m.msg().mpirank();
ranksForHost[m.dsthost()].insert(
ranksForHost[m.dsthost()].begin(), m.msg().mpirank());
}

ranksForHost[m.dsthost()].push_back(m.msg().mpirank());
ranksForHost[m.srchost()].erase(
std::remove(ranksForHost[m.srchost()].begin(),
ranksForHost[m.srchost()].end(),
m.msg().mpirank()),
ranksForHost[m.srchost()].end());

if (ranksForHost[m.srchost()].empty()) {
ranksForHost.erase(m.srchost());
}

// This could be made more efficient as the broker method acquires
// a full lock every time
broker.updateHostForIdx(id, m.msg().mpirank(), m.dsthost());
}
// TODO: we may be able to just initLocalRemote here?
initLocalRemoteLeaders();

// Set the migration flag
hasBeenMigrated = true;
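With the pendingMigrations argument dropped, a call site only passes the calling rank and the world rebuilds its rank/host bookkeeping itself via initLocalRemoteLeaders(). A minimal sketch, assuming an existing MpiWorld reference named world and the calling rank's Message named msg (both illustrative, not from this commit):

// The rank-to-host remapping that used to be driven by the broadcast
// PendingMigrations object is now derived internally, so the caller only
// identifies itself by rank.
world.prepareMigration(msg.mpirank());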
19 changes: 15 additions & 4 deletions src/planner/Planner.cpp
@@ -315,14 +315,27 @@
faabric::util::FullLock lock(plannerMx);

auto batchScheduler = faabric::batch_scheduler::getBatchScheduler();
auto decisionType =
batchScheduler->getDecisionType(state.inFlightReqs, req);

// First, make scheduling decision
#ifndef NDEBUG
#endif
// TODO: remove this copy once we finalise on one Host abstraction
auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap);
auto decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);

// For a DIST_CHANGE decision (i.e. migration) we want to try to improve
// on the old decision (rather than the request we have just been sent)
// TODO: alternatively we could make sure the ExecutorContext is always set?
std::shared_ptr<faabric::batch_scheduler::SchedulingDecision> decision;
if (decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE) {
auto oldReq = state.inFlightReqs.at(appId).first;
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, oldReq);
} else {
decision = batchScheduler->makeSchedulingDecision(
hostMapCopy, state.inFlightReqs, req);
}
#ifndef NDEBUG
// Here we make sure the state hasn't changed (we pass const, but they are
// const pointers, so the pointed-to state could still change)
@@ -354,8 +367,6 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// 1. Update the host-map to reflect the new host occupation
// 2. Update the in-flight map to include the new request
// 3. Send the PTP mappings to all the hosts involved
auto decisionType =
batchScheduler->getDecisionType(state.inFlightReqs, req);
auto& broker = faabric::transport::getPointToPointBroker();
switch (decisionType) {
case faabric::batch_scheduler::DecisionType::NEW: {
23 changes: 4 additions & 19 deletions src/proto/faabric.proto
@@ -46,16 +46,6 @@ message BatchExecuteRequest {
// Arbitrary context for this batch
int32 subType = 8;
bytes contextData = 9;

// Flag set by the scheduler when this batch is all executing on a single
// host
bool singleHost = 10;

// TODO(planner-schedule): remove me
// Temporary flag to indicate the workers that the BER comes from the
// planner (i.e. proxy-ed through planner) and so it has not been scheduled
// yet. Whenever the planner does scheduling we will be able to remove this
bool comesFromPlanner = 11;
}

message BatchExecuteRequestStatus {
@@ -239,15 +229,10 @@ message PointToPointMappings {
// FUNCTION MIGRATIONS
// ---------------------------------------------

message PendingMigrations {
message PendingMigration {
int32 appId = 1;
int32 groupId = 2;

message PendingMigration {
Message msg = 1;
string srcHost = 2;
string dstHost = 3;
}

repeated PendingMigration migrations = 3;
int32 groupIdx = 3;
string srcHost = 4;
string dstHost = 5;
}
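The nested PendingMigrations wrapper is gone; each migration is now a flat, per-rank message. A sketch of how it might be populated with the standard protoc-generated C++ accessors, assuming the generated header lives at faabric/proto/faabric.pb.h; the builder helper and its arguments are illustrative, not part of this commit:

#include <faabric/proto/faabric.pb.h>

#include <string>

// Hypothetical builder for a single migrating rank: app/group identifiers
// are copied from the rank's Message, and the source/destination hosts come
// from the scheduling decision.
faabric::PendingMigration buildMigration(const faabric::Message& msg,
                                         const std::string& srcHost,
                                         const std::string& dstHost)
{
    faabric::PendingMigration migration;
    migration.set_appid(msg.appid());
    migration.set_groupid(msg.groupid());
    migration.set_groupidx(msg.groupidx());
    migration.set_srchost(srcHost);
    migration.set_dsthost(dstHost);

    return migration;
}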
23 changes: 13 additions & 10 deletions src/scheduler/Executor.cpp
@@ -205,12 +205,11 @@ void Executor::executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req)
{
const std::string funcStr = faabric::util::funcToString(req);
SPDLOG_TRACE("{} executing {}/{} tasks of {} (single-host={})",
SPDLOG_TRACE("{} executing {}/{} tasks of {}",
id,
msgIdxs.size(),
req->messages_size(),
funcStr,
req->singlehost());
funcStr);

// Note that this lock is specific to this executor, so will only block
// when multiple threads are trying to schedule tasks. This will only
@@ -227,13 +226,16 @@

bool isMaster = firstMsg.mainhost() == thisHost;
bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
bool isSingleHost = req->singlehost();
// TODO: isSingleHost should always be true for threads now
// bool isSingleHost = req->singlehost();
std::string snapshotKey = firstMsg.snapshotkey();

// Threads on a single host don't need to do anything with snapshots, as
// they all share a single executor. Threads not on a single host need to
// restore from the main thread snapshot. Non-threads need to restore from
// a snapshot if they are given a snapshot key.
/*
* TODO: remove?
if (isThreads && !isSingleHost) {
// Check we get a valid memory view
std::span<uint8_t> memView = getMemoryView();
@@ -255,17 +257,17 @@
// Prepare list of lists for dirty pages from each thread
threadLocalDirtyRegions.resize(req->messages_size());
} else if (!isThreads && !firstMsg.snapshotkey().empty()) {
*/
if (!isThreads && !firstMsg.snapshotkey().empty()) {
// Restore from snapshot if provided
std::string snapshotKey = firstMsg.snapshotkey();
SPDLOG_DEBUG("Restoring {} from snapshot {}", funcStr, snapshotKey);
restore(snapshotKey);
} else {
SPDLOG_TRACE("Not restoring {}. threads={}, key={}, single={}",
SPDLOG_TRACE("Not restoring {}. threads={}, key={}",
funcStr,
isThreads,
snapshotKey,
isSingleHost);
snapshotKey);
}

// Initialise batch counter
@@ -423,10 +425,11 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx)
task.req->mutable_messages()->at(task.messageIndex);

// Start dirty tracking if executing threads across hosts
bool isSingleHost = task.req->singlehost();
// bool isSingleHost = task.req->singlehost();
bool isThreads =
task.req->type() == faabric::BatchExecuteRequest::THREADS;
bool doDirtyTracking = isThreads && !isSingleHost;
// TODO: is dirty tracking ever needed now?
bool doDirtyTracking = false; // isThreads && !isSingleHost;
if (doDirtyTracking) {
// If tracking is thread local, start here as it will happen for
// each thread