Skip to content

Commit

Permalink
dist-tests: fix dist-test that was relying on an incorrect notion of …
Browse files Browse the repository at this point in the history
…being singleHost
  • Loading branch information
csegarragonz committed Dec 19, 2023
1 parent c491f82 commit 92cf413
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 49 deletions.
38 changes: 21 additions & 17 deletions tests/dist/dist_test_fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,33 @@ class DistTestsFixture
sch.addHostToGlobalSet(getWorkerIP());
sch.removeHostFromGlobalSet(LOCALHOST);

// Give some resources to each host
updateLocalSlots(4, 0);
updateRemoteSlots(4, 0);

// Set up executor
std::shared_ptr<tests::DistTestExecutorFactory> fac =
std::make_shared<tests::DistTestExecutorFactory>();
faabric::scheduler::setExecutorFactory(fac);
}

void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0)
{
faabric::HostResources localRes;
localRes.set_slots(newLocalSlots);
localRes.set_usedslots(newUsedLocalSlots);
sch.setThisHostResources(localRes);
}

void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0)
{
faabric::HostResources remoteRes;
remoteRes.set_slots(newRemoteSlots);
remoteRes.set_usedslots(newRemoteUsedSlots);
sch.addHostToGlobalSet(workerIP,
std::make_shared<HostResources>(remoteRes));
}

~DistTestsFixture() = default;

std::string getWorkerIP()
Expand Down Expand Up @@ -65,23 +86,6 @@ class MpiDistTestsFixture : public DistTestsFixture
int worldSize = 4;
bool origIsMsgOrderingOn;

void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0)
{
faabric::HostResources localRes;
localRes.set_slots(newLocalSlots);
localRes.set_usedslots(newUsedLocalSlots);
sch.setThisHostResources(localRes);
}

void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0)
{
faabric::HostResources remoteRes;
remoteRes.set_slots(newRemoteSlots);
remoteRes.set_usedslots(newRemoteUsedSlots);
sch.addHostToGlobalSet(workerIP,
std::make_shared<HostResources>(remoteRes));
}

void setLocalSlots(int numLocalSlots, int worldSizeIn = 0)
{
if (worldSizeIn > 0) {
Expand Down
29 changes: 16 additions & 13 deletions tests/dist/scheduler/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,22 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec,
{
faabric::Message& msg = req->mutable_messages()->at(msgIdx);

std::string msgInput = msg.inputdata();
std::vector<uint8_t> inputBytes = faabric::util::stringToBytes(msgInput);
std::vector<uint8_t> otherData = { 1, 2, 3, 4 };

// Modify the executor's memory
int offsetA = 10;
int offsetB = HOST_PAGE_SIZE + 10;
std::memcpy(exec->getDummyMemory().data() + offsetA,
otherData.data(),
otherData.size());
std::memcpy(exec->getDummyMemory().data() + offsetB,
inputBytes.data(),
inputBytes.size());
if (msg.groupidx() > 0) {
std::string msgInput = msg.inputdata();
std::vector<uint8_t> inputBytes =
faabric::util::stringToBytes(msgInput);
std::vector<uint8_t> otherData = { 1, 2, 3, 4 };

// Modify the executor's memory
int offsetA = 10;
int offsetB = HOST_PAGE_SIZE + 10;
std::memcpy(exec->getDummyMemory().data() + offsetA,
otherData.data(),
otherData.size());
std::memcpy(exec->getDummyMemory().data() + offsetB,
inputBytes.data(),
inputBytes.size());
}

return 123;
}
Expand Down
43 changes: 24 additions & 19 deletions tests/dist/scheduler/test_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,43 @@ TEST_CASE_METHOD(DistTestsFixture,
std::string function = "fake-diffs";
std::vector<uint8_t> inputData = { 0, 1, 2, 3, 4, 5, 6 };

// Set up the message
// Set up the messages
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory(user, function, 1);
faabric::util::batchExecFactory(user, function, 2);
req->set_type(faabric::BatchExecuteRequest::THREADS);

// Set up some input data
faabric::Message& msg = req->mutable_messages()->at(0);
msg.set_inputdata(inputData.data(), inputData.size());
for (int i = 0; i < req->messages_size(); i++) {
req->mutable_messages(i)->set_groupidx(i);
req->mutable_messages(i)->set_inputdata(inputData.data(),
inputData.size());
}

// Set up the main thread snapshot
auto& reg = faabric::snapshot::getSnapshotRegistry();
size_t snapSize = DIST_TEST_EXECUTOR_MEMORY_SIZE;
std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg);
std::string snapshotKey =
faabric::util::getMainThreadSnapshotKey(req->messages(0));
auto snap = std::make_shared<faabric::util::SnapshotData>(snapSize);
reg.registerSnapshot(snapshotKey, snap);

// Force the function to be executed remotely
faabric::HostResources res;
res.set_usedslots(1);
res.set_slots(1);
sch.setThisHostResources(res);
res.set_usedslots(0);
res.set_slots(4);
sch.addHostToGlobalSet(getWorkerIP(), std::make_shared<HostResources>(res));
// Force the execution to span multiple hosts so that it triggers dirty
// tracking
std::vector<std::string> expectedHosts = { getMasterIP(), getWorkerIP() };
auto preloadDec = std::make_shared<batch_scheduler::SchedulingDecision>(
req->appid(), req->groupid());
for (int i = 0; i < req->messages_size(); i++) {
preloadDec->addMessage(expectedHosts.at(i), 0, 0, i);
}

std::vector<std::string> expectedHosts = { getWorkerIP() };
plannerCli.preloadSchedulingDecision(preloadDec);
auto decision = plannerCli.callFunctions(req);
std::vector<std::string> executedHosts = decision.hosts;
REQUIRE(expectedHosts == executedHosts);
REQUIRE(expectedHosts == decision.hosts);

auto msgResult = plannerCli.getMessageResult(req->appid(), msg.id(), 500);
REQUIRE(msgResult.returnvalue() == 123);
for (const auto& msg : req->messages()) {
auto msgResult =
plannerCli.getMessageResult(req->appid(), msg.id(), 500);
REQUIRE(msgResult.returnvalue() == 123);
}

// Write the diffs and check they've been applied
REQUIRE(snap->getQueuedDiffsCount() == 2);
Expand Down

0 comments on commit 92cf413

Please sign in to comment.