Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically Check For Migration Opportunities #214

Merged
merged 27 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
867f41f
proto: add field to set migration check period
csegarragonz Jan 7, 2022
4eef2d3
scheduler: add function migration thread that sleeps for a fixed amou…
csegarragonz Jan 7, 2022
0a971c8
scheduler: add logic for check for migration opportunities method
csegarragonz Jan 7, 2022
ff175df
scheduler: start/stop migration thread and fix race condition when re…
csegarragonz Jan 8, 2022
aa9fcc8
tests: add test for function migration thread and fix bug in thread s…
csegarragonz Jan 8, 2022
4efe4ca
util: add method to deep copy a faabric message + test
csegarragonz Jan 9, 2022
13560b3
function call server: add call to add a pending migration to remote h…
csegarragonz Jan 9, 2022
0b02aba
tests: add further testing
csegarragonz Jan 9, 2022
1b98e6b
mpi: add migration points, link with executor, and tests
csegarragonz Jan 10, 2022
6308962
tests: fix data race
csegarragonz Jan 10, 2022
5192100
pr: re-factor methods as suggested in the pr comments
csegarragonz Jan 10, 2022
b3b50d5
executor: throw exception to properly shutdown executing task
csegarragonz Jan 10, 2022
fe07c6f
executor: close the loop to alllow for function migration
csegarragonz Jan 10, 2022
a45cc00
scheduler: add UNDERFULL scheduling topology hint
csegarragonz Jan 10, 2022
c177e6a
Add migration exception, catch in executor
Shillaker Jan 12, 2022
f5fa773
Remove manual message copying
Shillaker Jan 12, 2022
ec38aad
Remove unnecessary migration APIs
Shillaker Jan 12, 2022
fefbe12
Make getMemoryView public
Shillaker Jan 12, 2022
07709f0
scheduler: get functions to migrate properly
csegarragonz Jan 12, 2022
6123721
migration working
csegarragonz Jan 12, 2022
446e90a
cleanup
csegarragonz Feb 2, 2022
c090a16
mpi: use initLocalQueues() instead of repeating the logic, remove ass…
csegarragonz Feb 3, 2022
51dcc95
mpi: remove unused getMpiMockedPendingMigrations
csegarragonz Feb 3, 2022
ba44bdd
scheduler: remove unused function declarations
csegarragonz Feb 3, 2022
28d143e
scheduler: factor out method to start the function migration thread i…
csegarragonz Feb 3, 2022
54ab7b4
mpi: use a boolean flag to indicate that app has been migrated, and c…
csegarragonz Feb 3, 2022
3bc2089
proto: make topologyHint a message field, add test for json serialisa…
csegarragonz Feb 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/faabric/scheduler/FunctionCallApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ enum FunctionCalls
ExecuteFunctions = 1,
Flush = 2,
Unregister = 3,
GetResources = 4
GetResources = 4,
PendingMigrations = 5
};
}
5 changes: 5 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ getBatchRequests();
std::vector<std::pair<std::string, faabric::EmptyRequest>>
getResourceRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getPendingMigrationsRequests();

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

Expand All @@ -42,6 +45,8 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient

faabric::HostResources getResources();

void sendPendingMigrations(std::shared_ptr<faabric::PendingMigrations> req);

void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);
Expand Down
4 changes: 4 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class FunctionCallServer final
const uint8_t* buffer,
size_t bufferSize);

std::unique_ptr<google::protobuf::Message> recvPendingMigrations(
const uint8_t* buffer,
size_t bufferSize);

void recvExecuteFunctions(const uint8_t* buffer, size_t bufferSize);

void recvUnregister(const uint8_t* buffer, size_t bufferSize);
Expand Down
26 changes: 26 additions & 0 deletions include/faabric/scheduler/FunctionMigrationThread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <condition_variable>
#include <mutex>
#include <thread>

namespace faabric::scheduler {
// Start a background thread that, every wake up period, will check if there
// are migration opportunities for in-flight apps that have opted in to
// being checked for migrations.
class FunctionMigrationThread
{
public:
void start(int wakeUpPeriodSecondsIn);

void stop();

int wakeUpPeriodSeconds;

private:
std::unique_ptr<std::thread> workThread = nullptr;
std::mutex mx;
std::condition_variable mustStopCv;
Shillaker marked this conversation as resolved.
Show resolved Hide resolved
std::atomic<bool> isShutdown;
};
}
6 changes: 6 additions & 0 deletions include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ class MpiWorld

void setMsgForRank(faabric::Message& msg);

/* Function Migration */

void prepareMigration(
int thisRank,
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);

private:
int id = -1;
int size = -1;
Expand Down
61 changes: 52 additions & 9 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/ExecGraph.h>
#include <faabric/scheduler/FunctionCallClient.h>
#include <faabric/scheduler/FunctionMigrationThread.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/snapshot/SnapshotClient.h>
#include <faabric/snapshot/SnapshotRegistry.h>
Expand All @@ -22,6 +23,18 @@

namespace faabric::scheduler {

class ExecutorMigratedException : public faabric::util::FaabricException
{
public:
explicit ExecutorMigratedException(std::string message)
: FaabricException(std::move(message))
{}
};

typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
InFlightPair;

class Scheduler;

Scheduler& getScheduler();
Expand Down Expand Up @@ -77,13 +90,13 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

virtual std::span<uint8_t> getMemoryView();

protected:
virtual void restore(const std::string& snapshotKey);

virtual void postFinish();

virtual std::span<uint8_t> getMemoryView();

virtual void setMemorySize(size_t newSize);

faabric::Message boundMessage;
Expand All @@ -96,6 +109,8 @@ class Executor

uint32_t threadPoolSize = 0;

void migrateFunction(const faabric::Message& msg, const std::string& host);

private:
std::atomic<bool> claimed = false;

Expand Down Expand Up @@ -220,6 +235,27 @@ class Scheduler

ExecGraph getFunctionExecGraph(unsigned int msgId);

// ----------------------------------
// Function Migration
// ----------------------------------
void checkForMigrationOpportunities();

std::shared_ptr<faabric::PendingMigrations> getPendingAppMigrations(
uint32_t appId);

void addPendingMigration(std::shared_ptr<faabric::PendingMigrations> msg);

void removePendingMigration(uint32_t appId);

// ----------------------------------
// Clients
// ----------------------------------
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::snapshot::SnapshotClient& getSnapshotClient(
Copy link
Collaborator Author

@csegarragonz csegarragonz Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the snapshot client to be a public member in faasm/faasm#565 before migrating a function. In there, we are doing a function chain from a function in a non-master host, to a function in the master host. Faabric won't register, take, and push snapshots in this case; we have to do it ourselves manually. Thus, we need access to the snapshot client from the scheduler instance.

We don't need the function call client to be a public member, but I was reluctant to split the declaration of the clients (happy to revert).

const std::string& otherHost);

private:
std::string thisHost;

Expand All @@ -244,13 +280,6 @@ class Scheduler

std::mutex localResultsMutex;

// ---- Clients ----
faabric::scheduler::FunctionCallClient& getFunctionCallClient(
const std::string& otherHost);

faabric::snapshot::SnapshotClient& getSnapshotClient(
const std::string& otherHost);

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots = 0;
Expand Down Expand Up @@ -290,6 +319,20 @@ class Scheduler

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;

// ---- Function migration ----
FunctionMigrationThread functionMigrationThread;
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
pendingMigrations;

std::vector<std::shared_ptr<faabric::PendingMigrations>>
doCheckForMigrationOpportunities(
faabric::util::MigrationStrategy migrationStrategy =
faabric::util::MigrationStrategy::BIN_PACK);

void broadcastPendingMigrations(
std::shared_ptr<faabric::PendingMigrations> pendingMigrations);
};

}
8 changes: 8 additions & 0 deletions include/faabric/util/func.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@

namespace faabric::util {

class FunctionMigratedException : public faabric::util::FaabricException
{
public:
explicit FunctionMigratedException(std::string message)
: FaabricException(std::move(message))
{}
};

std::string funcToString(const faabric::Message& msg, bool includeId);

std::string funcToString(
Expand Down
19 changes: 18 additions & 1 deletion include/faabric/util/scheduling.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,29 @@ class SchedulingDecision
// requests in a batch.
// - NORMAL: bin-packs requests to slots in hosts starting from the master
// host, and overloadds the master if it runs out of resources.
// - FORCE_LOCAL: force local execution irrespective of the available
// resources.
// - NEVER_ALONE: never allocates a single (non-master) request to a host
// without other requests of the batch.
// - UNDERFULL: schedule up to 50% of the master hosts' capacity to force
// migration opportunities to appear.
enum SchedulingTopologyHint
{
NORMAL,
FORCE_LOCAL,
NEVER_ALONE
NEVER_ALONE,
UNDERFULL,
};

// Migration strategies help the scheduler decide wether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
// running. Bin-pack batches in increasing order to hosts in
// decreasing order.
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
enum MigrationStrategy
{
BIN_PACK,
EMPTY_HOSTS
};
}
21 changes: 21 additions & 0 deletions src/proto/faabric.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ message BatchExecuteRequest {
FUNCTIONS = 0;
THREADS = 1;
PROCESSES = 2;
MIGRATION = 3;
}

BatchExecuteType type = 2;
Expand Down Expand Up @@ -164,6 +165,9 @@ message Message {
bool recordExecGraph = 41;
map<string, int32> intExecGraphDetails = 42;
map<string, string> execGraphDetails = 43;

// Function migration
int32 migrationCheckPeriod = 44;
}

// ---------------------------------------------
Expand Down Expand Up @@ -242,3 +246,20 @@ message PointToPointMappings {

repeated PointToPointMapping mappings = 3;
}

// ---------------------------------------------
// FUNCTION MIGRATIONS
// ---------------------------------------------

message PendingMigrations {
int32 appId = 1;
int32 groupId = 2;

message PendingMigration {
Message msg = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT this is the message that's sent to worker hosts to tell it to migrate a function across hosts. If that's right, doesn't that host already have the messge, so we just need the message ID here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All functions in the app will read the same PendingMigrations message. They:

  • Iterate over the PendingMigration repeated field (migrations).
  • If the message corresponds to their message id, they themselves must trigger their own migration.
  • To do so, they need to know where they are headed dstHost.

Maybe the srcHost field is not strictly needed, but it can be asserted/makes printing and logging easier.

string srcHost = 2;
string dstHost = 3;
}

repeated PendingMigration migrations = 3;
}
1 change: 1 addition & 0 deletions src/scheduler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ faabric_lib(scheduler
Executor.cpp
FunctionCallClient.cpp
FunctionCallServer.cpp
FunctionMigrationThread.cpp
MpiContext.cpp
MpiMessageBuffer.cpp
MpiWorld.cpp
Expand Down
27 changes: 27 additions & 0 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "faabric/scheduler/MpiWorldRegistry.h"
csegarragonz marked this conversation as resolved.
Show resolved Hide resolved
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/snapshot/SnapshotRegistry.h>
Expand Down Expand Up @@ -528,9 +529,29 @@ void Executor::threadPoolThread(int threadPoolIdx)

// Execute the task
int32_t returnValue;
bool migrated = false;
try {
returnValue =
executeTask(threadPoolIdx, task.messageIndex, task.req);
} catch (const faabric::util::FunctionMigratedException& ex) {
SPDLOG_DEBUG(
"Task {} migrated, shutting down executor {}", msg.id(), id);

// Note that when a task has been migrated, we need to perform all
// the normal executor shutdown, but we must NOT set the result for
// the call.
migrated = true;
selfShutdown = true;
returnValue = -99;

// MPI migration
if (msg.ismpi()) {
// TODO - delete the pending migration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this TODO now?

Copy link
Collaborator Author

@csegarragonz csegarragonz Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, faabric does not know when the migration (app-wise) has finished. In fact, faabric does not even know when a migration is taking place. Faabric only is aware that some functions throw exceptions because they have been migrated.

The workaround (hack) I have implemented is the following:

  • When we call MpiWorld::prepareMigration the local leader sets a boolean flag in the MPI world (hasBeenMigrated).
  • Everytime we hit a barrier, after all ranks have hit the barrier (and before they have left it). The local leader checks for the hasBeenMigrated flag.
  • If true remove the pending migration from the map.

This uses the fact that faasm will call prepareMigration before migrating, and will call MPI_Barrier after.
Unfortunately, this can't really be tested from within faabric (I have tested it using the distributed test in faasm).

I can't think of a not ad-hoc way of doing this due to the lack of information in faabric.

auto& mpiWorld =
faabric::scheduler::getMpiWorldRegistry().getWorld(
msg.mpiworldid());
mpiWorld.destroy();
}
} catch (const std::exception& ex) {
returnValue = 1;

Expand Down Expand Up @@ -667,6 +688,12 @@ void Executor::threadPoolThread(int threadPoolIdx)
// executor.
sch.vacateSlot();

// If the function has been migrated, we drop out here and shut down the
// executor
if (migrated) {
break;
}

// Finally set the result of the task, this will allow anything
// waiting on its result to continue execution, therefore must be
// done once the executor has been reset, otherwise the executor may
Expand Down
31 changes: 31 additions & 0 deletions src/scheduler/FunctionCallClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ static std::unordered_map<std::string,
faabric::util::Queue<faabric::HostResources>>
queuedResourceResponses;

static std::vector<
std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
pendingMigrationsRequests;

static std::vector<std::pair<std::string, faabric::UnregisterRequest>>
unregisterRequests;

Expand Down Expand Up @@ -57,6 +61,13 @@ std::vector<std::pair<std::string, faabric::EmptyRequest>> getResourceRequests()
return resourceRequests;
}

std::vector<std::pair<std::string, std::shared_ptr<faabric::PendingMigrations>>>
getPendingMigrationsRequests()
{
faabric::util::UniqueLock lock(mockMutex);
return pendingMigrationsRequests;
}

std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests()
{
Expand All @@ -76,6 +87,7 @@ void clearMockRequests()
functionCalls.clear();
batchMessages.clear();
resourceRequests.clear();
pendingMigrationsRequests.clear();
unregisterRequests.clear();

for (auto& p : queuedResourceResponses) {
Expand Down Expand Up @@ -128,6 +140,25 @@ faabric::HostResources FunctionCallClient::getResources()
return response;
}

// This function call is used by the master host of an application to let know
// other hosts running functions of the same application that a migration
// opportunity has been found.
void FunctionCallClient::sendPendingMigrations(
std::shared_ptr<faabric::PendingMigrations> req)
{
faabric::PendingMigrations request;
faabric::EmptyResponse response;

if (faabric::util::isMockMode()) {
faabric::util::UniqueLock lock(mockMutex);
pendingMigrationsRequests.emplace_back(host, req);
} else {
syncSend(faabric::scheduler::FunctionCalls::PendingMigrations,
req.get(),
&response);
}
}

void FunctionCallClient::executeFunctions(
const std::shared_ptr<faabric::BatchExecuteRequest> req)
{
Expand Down
Loading