Skip to content

Commit

Permalink
dist-tests: dist tests passing commenting out remote threads
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed Aug 29, 2023
1 parent 8883465 commit 2330596
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 59 deletions.
17 changes: 11 additions & 6 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
std::string funcStr = faabric::util::funcToString(req);

// Set group ID, this will get overridden in there's a cached decision
/*
int groupId = faabric::util::generateGid();
for (auto& m : *req->mutable_messages()) {
m.set_groupid(groupId);
m.set_groupsize(req->messages_size());
}
*/

// Get the scheduling decision
// TODO: this is WRONG FIXME
auto decision = faabric::planner::getPlannerClient().callFunctions(req);
bool isSingleHost = decision.isSingleHost();

// TODO(remote-threads): we always run single-host when executing threads
/*
bool isSingleHost = true;
// Do snapshotting if not on a single host
faabric::Message& msg = req->mutable_messages()->at(0);
std::shared_ptr<faabric::util::SnapshotData> snap = nullptr;
Expand Down Expand Up @@ -171,12 +171,16 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
mr.offset, mr.length, mr.dataType, mr.operation);
}
}
*/

// Invoke threads and await
faabric::planner::getPlannerClient().callFunctions(req, decision);
auto decision = faabric::planner::getPlannerClient().callFunctions(req);
// If we await immediately after we call the functions, it may happen that
// the threads are not registered yet
std::vector<std::pair<uint32_t, int32_t>> results =
sch.awaitThreadResults(req);

/* TODO(remote-threads): currently threads are always single-host
// Perform snapshot updates if not on single host
if (!isSingleHost) {
// Write queued changes to snapshot
Expand All @@ -194,6 +198,7 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
tracker->startTracking(memView);
tracker->startThreadLocalTracking(memView);
}
*/

// Deregister the threads
sch.deregisterThreads(req);
Expand Down
12 changes: 7 additions & 5 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,9 @@ void Scheduler::registerThread(uint32_t msgId)
{
// Here we need to ensure the promise is registered locally so
// callers can start waiting
threadResults[msgId];
if (threadResults.find(msgId) == threadResults.end()) {
threadResults[msgId];
}
}

void Scheduler::setThreadResult(
Expand Down Expand Up @@ -649,11 +651,11 @@ std::vector<std::pair<uint32_t, int32_t>> Scheduler::awaitThreadResults(
int32_t Scheduler::awaitThreadResult(uint32_t messageId)
{
faabric::util::SharedLock lock(mx);
auto it = threadResults.find(messageId);
if (it == threadResults.end()) {
SPDLOG_ERROR("Thread {} not registered on this host", messageId);
throw std::runtime_error("Awaiting unregistered thread");
if (threadResults.find(messageId) == threadResults.end()) {
SPDLOG_WARN("Registering thread result {} before thread started", messageId);
threadResults[messageId];
}
auto it = threadResults.find(messageId);
lock.unlock();

return it->second.get_future().get();
Expand Down
3 changes: 0 additions & 3 deletions tests/dist/dist_test_fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ class MpiDistTestsFixture : public DistTestsFixture
int numRemoteSlots = worldSize - numLocalSlots;

if (numLocalSlots == numRemoteSlots) {
// localRes.set_slots(2 * numLocalSlots);
// localRes.set_usedslots(numLocalSlots);
// remoteRes.set_slots(numRemoteSlots);
updateLocalSlots(2 * numLocalSlots, numLocalSlots);
updateRemoteSlots(numRemoteSlots);
} else if (numLocalSlots > numRemoteSlots) {
Expand Down
6 changes: 3 additions & 3 deletions tests/dist/scheduler/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ int handleReductionFunction(tests::DistTestExecutor* exec,
auto req = faabric::util::batchExecFactory(
msg.user(), msg.function(), nThreads);
req->set_type(faabric::BatchExecuteRequest::THREADS);
// faabric::util::updateBatchExecAppId(req, msg.appid());

// Set app/ group info
for (int i = 0; i < nThreads; i++) {
auto& m = req->mutable_messages()->at(i);

// Set app/ group info
m.set_appid(msg.appid());
m.set_appidx(i);
m.set_groupidx(i);
}
Expand Down
15 changes: 8 additions & 7 deletions tests/dist/scheduler/test_exec_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ TEST_CASE_METHOD(DistTestsFixture,
"Test generating the execution graph",
"[funcs]")
{
// Set up this host's resources
int nLocalSlots = 2;
int nFuncs = 4;
faabric::HostResources res;
res.set_slots(nLocalSlots);
sch.setThisHostResources(res);

// Retry the test a number of times to catch the race-condition where
// we get the execution graph before all results have been published
int numRetries = 10;
for (int r = 0; r < numRetries; r++) {
// Set up both host's resources
int nLocalSlots = 2;
int nFuncs = 4;
faabric::HostResources res;
res.set_slots(nLocalSlots);
sch.setThisHostResources(res);
sch.addHostToGlobalSet(getWorkerIP(), std::make_shared<HostResources>(res));

// Set up the messages
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory("funcs", "simple", nFuncs);
Expand Down
18 changes: 11 additions & 7 deletions tests/dist/scheduler/test_funcs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ TEST_CASE_METHOD(DistTestsFixture,
"Test executing functions on multiple hosts",
"[funcs]")
{
// Set up this host's resources
std::string thisHost = conf.endpointHost;
std::string otherHost = getWorkerIP();

// Set up this host's resources (2 functions locally, 2 remotely)
int nLocalSlots = 2;
int nFuncs = 4;
faabric::HostResources res;
res.set_slots(nLocalSlots);
res.set_slots(nFuncs);
res.set_usedslots(nLocalSlots);
sch.setThisHostResources(res);

std::string thisHost = conf.endpointHost;
std::string otherHost = getWorkerIP();
res.set_slots(nLocalSlots);
res.set_usedslots(0);
sch.addHostToGlobalSet(otherHost, std::make_shared<HostResources>(res));

// Set up the messages
std::shared_ptr<faabric::BatchExecuteRequest> req =
Expand All @@ -50,11 +54,11 @@ TEST_CASE_METHOD(DistTestsFixture,
for (int i = 0; i < nLocalSlots; i++) {
faabric::Message& m = req->mutable_messages()->at(i);

plannerCli.getMessageResult(m, 1000);
auto resultMsg = plannerCli.getMessageResult(m, 1000);
std::string expected =
fmt::format("Function {} executed on host {}", m.id(), getMasterIP());

REQUIRE(m.outputdata() == expected);
REQUIRE(resultMsg.outputdata() == expected);
}

// Check functions executed on the other host
Expand Down
6 changes: 4 additions & 2 deletions tests/dist/scheduler/test_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace tests {

/* TODO(remote-threads): remote threads temporarily disabled
TEST_CASE_METHOD(DistTestsFixture,
"Check snapshots sent back from worker are queued",
"[snapshots][threads]")
Expand Down Expand Up @@ -107,6 +108,7 @@ TEST_CASE_METHOD(DistTestsFixture,
faabric::Message actualResult = plannerCli.getMessageResult(msg, 10000);
REQUIRE(actualResult.returnvalue() == 333);
}
*/

TEST_CASE_METHOD(DistTestsFixture,
"Check repeated reduction",
Expand All @@ -119,9 +121,9 @@ TEST_CASE_METHOD(DistTestsFixture,
faabric::util::batchExecFactory(user, function, 1);
faabric::Message& msg = req->mutable_messages()->at(0);

// Main function and one thread execute on this host, others on another
// TODO(remote-threads): all threads execute in one host
faabric::HostResources res;
res.set_slots(3);
res.set_slots(100);
sch.setThisHostResources(res);

std::vector<std::string> expectedHosts = { getMasterIP() };
Expand Down
7 changes: 4 additions & 3 deletions tests/dist/scheduler/test_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

namespace tests {

/* TODO(remote-threads): this test now runs on one single host
TEST_CASE_METHOD(DistTestsFixture,
"Test executing threads on multiple hosts",
"[snapshots][threads]")
{
// Set up this host's resources
int nLocalSlots = 2;
int nThreads = 4;
faabric::HostResources res;
res.set_slots(nLocalSlots);
res.set_slots(nThreads);
sch.setThisHostResources(res);
// Set up the message
Expand All @@ -52,7 +52,7 @@ TEST_CASE_METHOD(DistTestsFixture,
plannerCli.callFunctions(req);
// Check threads executed on this host
for (int i = 0; i < nLocalSlots; i++) {
for (int i = 0; i < nThreads; i++) {
faabric::Message& m = req->mutable_messages()->at(i);
int res = sch.awaitThreadResult(m.id());
REQUIRE(res == m.id() / 2);
Expand All @@ -65,4 +65,5 @@ TEST_CASE_METHOD(DistTestsFixture,
REQUIRE(res == m.id() / 2);
}
}
*/
}
12 changes: 5 additions & 7 deletions tests/dist/transport/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ int handleDistributedLock(tests::DistTestExecutor* exec,

if (msg.function() == "lock") {
int initialValue = 0;
int groupId = faabric::util::generateGid();

stateKv->set(BYTES(&initialValue));

// Create a nested request child of the parent request (parent-child
// indicated by same app id)
std::shared_ptr<faabric::BatchExecuteRequest> nestedReq =
faabric::util::batchExecFactory("ptp", "lock-worker", nWorkers);
faabric::util::updateBatchExecAppId(nestedReq, req->appid());
for (int i = 0; i < nWorkers; i++) {
faabric::Message& m = nestedReq->mutable_messages()->at(i);
m.set_groupid(groupId);
m.set_groupidx(i);
}

Expand All @@ -154,6 +155,8 @@ int handleDistributedLock(tests::DistTestExecutor* exec,
}
}

// Pull to make sure we have the latest version
stateKv->pull();
int finalValue = *(int*)stateKv->get();
int expectedValue = nWorkers * nLoops;
if (finalValue != expectedValue) {
Expand Down Expand Up @@ -255,10 +258,7 @@ class DistributedCoordinationTestRunner
auto& m = chainReq->mutable_messages()->at(i);

// Set app index and group data
m.set_appid(msg.appid());
m.set_appidx(i);

m.set_groupid(groupId);
m.set_groupidx(i);
m.set_groupsize(nChained);

Expand Down Expand Up @@ -324,8 +324,6 @@ class DistributedCoordinationTestRunner
faabric::state::State& state;

std::vector<std::string> stateKeys;

int groupId = 123;
};

int handleDistributedBarrier(tests::DistTestExecutor* exec,
Expand Down
9 changes: 6 additions & 3 deletions tests/dist/transport/test_coordination.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ namespace tests {

TEST_CASE_METHOD(DistTestsFixture, "Test distributed lock", "[ptp][transport]")
{
// Set up this host's resources
int nLocalSlots = 5;
// Set up the host resources. The distributed lock test will start 10 other
// functions (so we need 11 slots). We give each host 8 slots for an even
// distribution
int nSlotsPerHost = 8;
faabric::HostResources res;
res.set_slots(nLocalSlots);
res.set_slots(nSlotsPerHost);
sch.setThisHostResources(res);
sch.addHostToGlobalSet(getWorkerIP(), std::make_shared<HostResources>(res));

// Set up the request
std::shared_ptr<faabric::BatchExecuteRequest> req =
Expand Down
41 changes: 28 additions & 13 deletions tests/dist/transport/test_point_to_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,37 @@ class PointToPointDistTestFixture : public DistTestsFixture

void setSlotsAndNumFuncs(int nLocalSlotsIn, int nFuncsIn)
{
int nRemoteSlots = nFuncsIn - nLocalSlotsIn;
nLocalSlots = nLocalSlotsIn;
nFuncs = nFuncsIn;

// Set local resources
faabric::HostResources res;
res.set_slots(nLocalSlots);
sch.setThisHostResources(res);
faabric::HostResources localRes;
std::shared_ptr<faabric::HostResources> remoteRes = std::make_shared<HostResources>();

if (nLocalSlots == nRemoteSlots) {
localRes.set_slots(2 * nLocalSlots);
localRes.set_usedslots(nLocalSlots);
remoteRes->set_slots(nRemoteSlots);
} else if (nLocalSlots > nRemoteSlots) {
localRes.set_slots(nLocalSlots);
remoteRes->set_slots(nRemoteSlots);
} else {
SPDLOG_ERROR(
"Unfeasible PTP slots config (local: {} - remote: {})",
nLocalSlots,
nRemoteSlots);
throw std::runtime_error("Unfeasible slots configuration");
}

sch.setThisHostResources(localRes);
sch.addHostToGlobalSet(getWorkerIP(), remoteRes);
}

faabric::batch_scheduler::SchedulingDecision prepareRequestReturnDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req)
{
// Prepare expected decision
faabric::batch_scheduler::SchedulingDecision expectedDecision(appId,
groupId);
faabric::batch_scheduler::SchedulingDecision expectedDecision(req->appid(), req->groupid());
std::vector<std::string> expectedHosts(nFuncs, getWorkerIP());
for (int i = 0; i < nLocalSlots; i++) {
expectedHosts.at(i) = getMasterIP();
Expand All @@ -57,9 +73,9 @@ class PointToPointDistTestFixture : public DistTestsFixture
for (int i = 0; i < nFuncs; i++) {
faabric::Message& msg = req->mutable_messages()->at(i);

msg.set_appid(appId);
msg.set_appid(req->appid());
msg.set_appidx(i);
msg.set_groupid(groupId);
msg.set_groupid(req->groupid());
msg.set_groupidx(i);

// Add to expected decision
Expand Down Expand Up @@ -87,9 +103,6 @@ class PointToPointDistTestFixture : public DistTestsFixture
}

protected:
int appId = 222;
int groupId = 333;

int nLocalSlots;
int nFuncs;
};
Expand All @@ -98,7 +111,7 @@ TEST_CASE_METHOD(PointToPointDistTestFixture,
"Test point-to-point messaging on multiple hosts",
"[ptp][transport]")
{
setSlotsAndNumFuncs(1, 4);
setSlotsAndNumFuncs(2, 4);

// Set up batch request and scheduling decision
std::shared_ptr<faabric::BatchExecuteRequest> req =
Expand Down Expand Up @@ -140,11 +153,13 @@ TEST_CASE_METHOD(DistTestsFixture,
{
// Set up this host's resources, force execution across hosts
int nChainedFuncs = 4;
int nLocalSlots = 2;
int nLocalSlots = 3;

faabric::HostResources res;
res.set_slots(nLocalSlots);
sch.setThisHostResources(res);
res.set_slots(2);
sch.addHostToGlobalSet(getWorkerIP(), std::make_shared<HostResources>(res));

std::string function;
SECTION("Barrier") { function = "barrier"; }
Expand Down

0 comments on commit 2330596

Please sign in to comment.