From 92cf413a572d5473b51c6025d696405c4b6c45d8 Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Tue, 19 Dec 2023 11:52:39 +0000 Subject: [PATCH] dist-tests: fix dist-test that was relying on an incorrect notion of being singleHost --- tests/dist/dist_test_fixtures.h | 38 ++++++++++++---------- tests/dist/scheduler/functions.cpp | 29 +++++++++-------- tests/dist/scheduler/test_snapshots.cpp | 43 ++++++++++++++----------- 3 files changed, 61 insertions(+), 49 deletions(-) diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h index 1fd26597f..6f2083694 100644 --- a/tests/dist/dist_test_fixtures.h +++ b/tests/dist/dist_test_fixtures.h @@ -29,12 +29,33 @@ class DistTestsFixture sch.addHostToGlobalSet(getWorkerIP()); sch.removeHostFromGlobalSet(LOCALHOST); + // Give some resources to each host + updateLocalSlots(4, 0); + updateRemoteSlots(4, 0); + // Set up executor std::shared_ptr fac = std::make_shared(); faabric::scheduler::setExecutorFactory(fac); } + void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0) + { + faabric::HostResources localRes; + localRes.set_slots(newLocalSlots); + localRes.set_usedslots(newUsedLocalSlots); + sch.setThisHostResources(localRes); + } + + void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0) + { + faabric::HostResources remoteRes; + remoteRes.set_slots(newRemoteSlots); + remoteRes.set_usedslots(newRemoteUsedSlots); + sch.addHostToGlobalSet(workerIP, + std::make_shared(remoteRes)); + } + ~DistTestsFixture() = default; std::string getWorkerIP() @@ -65,23 +86,6 @@ class MpiDistTestsFixture : public DistTestsFixture int worldSize = 4; bool origIsMsgOrderingOn; - void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0) - { - faabric::HostResources localRes; - localRes.set_slots(newLocalSlots); - localRes.set_usedslots(newUsedLocalSlots); - sch.setThisHostResources(localRes); - } - - void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0) - { - faabric::HostResources remoteRes; - remoteRes.set_slots(newRemoteSlots); - remoteRes.set_usedslots(newRemoteUsedSlots); - sch.addHostToGlobalSet(workerIP, - std::make_shared(remoteRes)); - } - void setLocalSlots(int numLocalSlots, int worldSizeIn = 0) { if (worldSizeIn > 0) { diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp index aac7c7e2f..8a708b515 100644 --- a/tests/dist/scheduler/functions.cpp +++ b/tests/dist/scheduler/functions.cpp @@ -64,19 +64,22 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec, { faabric::Message& msg = req->mutable_messages()->at(msgIdx); - std::string msgInput = msg.inputdata(); - std::vector inputBytes = faabric::util::stringToBytes(msgInput); - std::vector otherData = { 1, 2, 3, 4 }; - - // Modify the executor's memory - int offsetA = 10; - int offsetB = HOST_PAGE_SIZE + 10; - std::memcpy(exec->getDummyMemory().data() + offsetA, - otherData.data(), - otherData.size()); - std::memcpy(exec->getDummyMemory().data() + offsetB, - inputBytes.data(), - inputBytes.size()); + if (msg.groupidx() > 0) { + std::string msgInput = msg.inputdata(); + std::vector inputBytes = + faabric::util::stringToBytes(msgInput); + std::vector otherData = { 1, 2, 3, 4 }; + + // Modify the executor's memory + int offsetA = 10; + int offsetB = HOST_PAGE_SIZE + 10; + std::memcpy(exec->getDummyMemory().data() + offsetA, + otherData.data(), + otherData.size()); + std::memcpy(exec->getDummyMemory().data() + offsetB, + inputBytes.data(), + inputBytes.size()); + } return 123; } diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp index d44a81aaf..b06f767db 100644 --- a/tests/dist/scheduler/test_snapshots.cpp +++ b/tests/dist/scheduler/test_snapshots.cpp @@ -27,38 +27,43 @@ TEST_CASE_METHOD(DistTestsFixture, std::string function = "fake-diffs"; std::vector inputData = { 0, 1, 2, 3, 4, 5, 6 }; - // Set up the message + // Set up the messages std::shared_ptr req = - faabric::util::batchExecFactory(user, function, 1); + faabric::util::batchExecFactory(user, function, 2); req->set_type(faabric::BatchExecuteRequest::THREADS); - // Set up some input data - faabric::Message& msg = req->mutable_messages()->at(0); - msg.set_inputdata(inputData.data(), inputData.size()); + for (int i = 0; i < req->messages_size(); i++) { + req->mutable_messages(i)->set_groupidx(i); + req->mutable_messages(i)->set_inputdata(inputData.data(), + inputData.size()); + } // Set up the main thread snapshot auto& reg = faabric::snapshot::getSnapshotRegistry(); size_t snapSize = DIST_TEST_EXECUTOR_MEMORY_SIZE; - std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg); + std::string snapshotKey = + faabric::util::getMainThreadSnapshotKey(req->messages(0)); auto snap = std::make_shared(snapSize); reg.registerSnapshot(snapshotKey, snap); - // Force the function to be executed remotely - faabric::HostResources res; - res.set_usedslots(1); - res.set_slots(1); - sch.setThisHostResources(res); - res.set_usedslots(0); - res.set_slots(4); - sch.addHostToGlobalSet(getWorkerIP(), std::make_shared(res)); + // Force the execution to span multiple hosts so that it triggers dirty + // tracking + std::vector expectedHosts = { getMasterIP(), getWorkerIP() }; + auto preloadDec = std::make_shared( + req->appid(), req->groupid()); + for (int i = 0; i < req->messages_size(); i++) { + preloadDec->addMessage(expectedHosts.at(i), 0, 0, i); + } - std::vector expectedHosts = { getWorkerIP() }; + plannerCli.preloadSchedulingDecision(preloadDec); auto decision = plannerCli.callFunctions(req); - std::vector executedHosts = decision.hosts; - REQUIRE(expectedHosts == executedHosts); + REQUIRE(expectedHosts == decision.hosts); - auto msgResult = plannerCli.getMessageResult(req->appid(), msg.id(), 500); - REQUIRE(msgResult.returnvalue() == 123); + for (const auto& msg : req->messages()) { + auto msgResult = + plannerCli.getMessageResult(req->appid(), msg.id(), 500); + REQUIRE(msgResult.returnvalue() == 123); + } // Write the diffs and check they've been applied REQUIRE(snap->getQueuedDiffsCount() == 2);