diff --git a/src/scheduler/Executor.cpp b/src/scheduler/Executor.cpp
index b0608f121..ed86aa35a 100644
--- a/src/scheduler/Executor.cpp
+++ b/src/scheduler/Executor.cpp
@@ -119,17 +119,17 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
     std::string funcStr = faabric::util::funcToString(req);
 
     // Set group ID, this will get overridden if there's a cached decision
+    /*
     int groupId = faabric::util::generateGid();
     for (auto& m : *req->mutable_messages()) {
         m.set_groupid(groupId);
         m.set_groupsize(req->messages_size());
     }
+    */
 
-    // Get the scheduling decision
-    // TODO: this is WRONG FIXME
-    auto decision = faabric::planner::getPlannerClient().callFunctions(req);
-    bool isSingleHost = decision.isSingleHost();
-
+    // TODO(remote-threads): we always run single-host when executing threads
+    /*
+    bool isSingleHost = true;
     // Do snapshotting if not on a single host
     faabric::Message& msg = req->mutable_messages()->at(0);
     std::shared_ptr<faabric::util::SnapshotData> snap = nullptr;
@@ -171,12 +171,16 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
               mr.offset, mr.length, mr.dataType, mr.operation);
         }
     }
+    */
 
     // Invoke threads and await
-    faabric::planner::getPlannerClient().callFunctions(req, decision);
+    auto decision = faabric::planner::getPlannerClient().callFunctions(req);
+    // If we await immediately after we call the functions, it may happen that
+    // the threads are not registered yet
     std::vector<std::pair<uint32_t, int32_t>> results =
       sch.awaitThreadResults(req);
 
+    /* TODO(remote-threads): currently threads are always single-host
     // Perform snapshot updates if not on single host
     if (!isSingleHost) {
         // Write queued changes to snapshot
@@ -194,6 +198,7 @@ std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
         tracker->startTracking(memView);
         tracker->startThreadLocalTracking(memView);
     }
+    */
 
     // Deregister the threads
     sch.deregisterThreads(req);
diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp
index df7b7459a..8db2662f8 100644
--- a/src/scheduler/Scheduler.cpp
+++ b/src/scheduler/Scheduler.cpp
@@ -576,7 +576,9 @@ void Scheduler::registerThread(uint32_t msgId)
 {
     // Here we need to ensure the promise is registered locally so
     // callers can start waiting
-    threadResults[msgId];
+    if (threadResults.find(msgId) == threadResults.end()) {
+        threadResults[msgId];
+    }
 }
 
 void Scheduler::setThreadResult(
@@ -649,11 +651,11 @@ std::vector<std::pair<uint32_t, int32_t>> Scheduler::awaitThreadResults(
 int32_t Scheduler::awaitThreadResult(uint32_t messageId)
 {
     faabric::util::SharedLock lock(mx);
-    auto it = threadResults.find(messageId);
-    if (it == threadResults.end()) {
-        SPDLOG_ERROR("Thread {} not registered on this host", messageId);
-        throw std::runtime_error("Awaiting unregistered thread");
+    if (threadResults.find(messageId) == threadResults.end()) {
+        SPDLOG_WARN("Registering thread result {} before thread started",
+                    messageId);
+        threadResults[messageId];
     }
+    auto it = threadResults.find(messageId);
     lock.unlock();
 
     return it->second.get_future().get();
diff --git a/tests/dist/dist_test_fixtures.h b/tests/dist/dist_test_fixtures.h
index aa6304b41..a46145ee4 100644
--- a/tests/dist/dist_test_fixtures.h
+++ b/tests/dist/dist_test_fixtures.h
@@ -87,9 +87,6 @@ class MpiDistTestsFixture : public DistTestsFixture
         int numRemoteSlots = worldSize - numLocalSlots;
 
         if (numLocalSlots == numRemoteSlots) {
-            // localRes.set_slots(2 * numLocalSlots);
-            // localRes.set_usedslots(numLocalSlots);
-            // remoteRes.set_slots(numRemoteSlots);
             updateLocalSlots(2 * numLocalSlots, numLocalSlots);
             updateRemoteSlots(numRemoteSlots);
         } else if (numLocalSlots > numRemoteSlots) {
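The `Scheduler.cpp` hunks above hinge on `threadResults` being a map of promises: `operator[]` default-constructs the promise, so the entry can be created by whichever side arrives first, and an early awaiter simply blocks on the future. Below is a minimal standalone sketch of that pattern; the class and method names (`ThreadResultMap`, `await`, `set`) are illustrative, not faabric's actual API.

```cpp
#include <cstdint>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>

// Promise-backed result map: operator[] default-constructs the promise, so
// registration and awaiting can happen in either order.
class ThreadResultMap
{
  public:
    int32_t await(uint32_t msgId)
    {
        std::future<int32_t> fut;
        {
            std::lock_guard<std::mutex> guard(mx);
            fut = results[msgId].get_future();
        }
        // Block outside the lock so the setter cannot be starved
        return fut.get();
    }

    void set(uint32_t msgId, int32_t returnValue)
    {
        std::lock_guard<std::mutex> guard(mx);
        results[msgId].set_value(returnValue);
    }

  private:
    std::mutex mx;
    std::unordered_map<uint32_t, std::promise<int32_t>> results;
};

int main()
{
    ThreadResultMap map;

    // The awaiter may run before the result (or even the entry) exists; it
    // just blocks on the future until set() is called
    std::thread setter([&map]() { map.set(1234, 42); });
    std::cout << "Result: " << map.await(1234) << std::endl;

    setter.join();
    return 0;
}
```

One caveat the sketch glosses over: the patched `awaitThreadResult` inserts into the map while holding a `SharedLock`, which is safe only if all writers are serialised elsewhere; the sketch uses a plain exclusive mutex instead.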
diff --git a/tests/dist/scheduler/functions.cpp b/tests/dist/scheduler/functions.cpp
index 176e95360..1f9a9214d 100644
--- a/tests/dist/scheduler/functions.cpp
+++ b/tests/dist/scheduler/functions.cpp
@@ -229,11 +229,11 @@ int handleReductionFunction(tests::DistTestExecutor* exec,
     auto req = faabric::util::batchExecFactory(
       msg.user(), msg.function(), nThreads);
     req->set_type(faabric::BatchExecuteRequest::THREADS);
+    // faabric::util::updateBatchExecAppId(req, msg.appid());
+
+    // Set app/ group info
     for (int i = 0; i < nThreads; i++) {
         auto& m = req->mutable_messages()->at(i);
-
-        // Set app/ group info
-        m.set_appid(msg.appid());
         m.set_appidx(i);
         m.set_groupidx(i);
     }
diff --git a/tests/dist/scheduler/test_exec_graph.cpp b/tests/dist/scheduler/test_exec_graph.cpp
index 8b6435e8d..7d4aba620 100644
--- a/tests/dist/scheduler/test_exec_graph.cpp
+++ b/tests/dist/scheduler/test_exec_graph.cpp
@@ -10,17 +10,18 @@ TEST_CASE_METHOD(DistTestsFixture,
                  "Test generating the execution graph",
                  "[funcs]")
 {
-    // Set up this host's resources
-    int nLocalSlots = 2;
-    int nFuncs = 4;
-    faabric::HostResources res;
-    res.set_slots(nLocalSlots);
-    sch.setThisHostResources(res);
-
     // Retry the test a number of times to catch the race-condition where
     // we get the execution graph before all results have been published
     int numRetries = 10;
     for (int r = 0; r < numRetries; r++) {
+        // Set up both hosts' resources
+        int nLocalSlots = 2;
+        int nFuncs = 4;
+        faabric::HostResources res;
+        res.set_slots(nLocalSlots);
+        sch.setThisHostResources(res);
+        sch.addHostToGlobalSet(getWorkerIP(),
+                               std::make_shared<faabric::HostResources>(res));
+
         // Set up the messages
         std::shared_ptr<faabric::BatchExecuteRequest> req =
           faabric::util::batchExecFactory("funcs", "simple", nFuncs);
diff --git a/tests/dist/scheduler/test_funcs.cpp b/tests/dist/scheduler/test_funcs.cpp
index e71e4e65b..1c38a400b 100644
--- a/tests/dist/scheduler/test_funcs.cpp
+++ b/tests/dist/scheduler/test_funcs.cpp
@@ -17,15 +17,19 @@ TEST_CASE_METHOD(DistTestsFixture,
                  "Test executing functions on multiple hosts",
                  "[funcs]")
 {
-    // Set up this host's resources
+    std::string thisHost = conf.endpointHost;
+    std::string otherHost = getWorkerIP();
+
+    // Set up this host's resources (2 functions locally, 2 remotely)
     int nLocalSlots = 2;
     int nFuncs = 4;
     faabric::HostResources res;
-    res.set_slots(nLocalSlots);
+    res.set_slots(nFuncs);
+    res.set_usedslots(nLocalSlots);
     sch.setThisHostResources(res);
-
-    std::string thisHost = conf.endpointHost;
-    std::string otherHost = getWorkerIP();
+    res.set_slots(nLocalSlots);
+    res.set_usedslots(0);
+    sch.addHostToGlobalSet(otherHost,
+                           std::make_shared<faabric::HostResources>(res));
 
     // Set up the messages
     std::shared_ptr<faabric::BatchExecuteRequest> req =
@@ -50,11 +54,11 @@ TEST_CASE_METHOD(DistTestsFixture,
 
     for (int i = 0; i < nLocalSlots; i++) {
         faabric::Message& m = req->mutable_messages()->at(i);
 
-        plannerCli.getMessageResult(m, 1000);
+        auto resultMsg = plannerCli.getMessageResult(m, 1000);
         std::string expected =
           fmt::format("Function {} executed on host {}", m.id(), getMasterIP());
-        REQUIRE(m.outputdata() == expected);
+        REQUIRE(resultMsg.outputdata() == expected);
     }
 
     // Check functions executed on the other host
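The same two-host slot pattern recurs throughout the tests above: the local host advertises `nFuncs` slots with `nLocalSlots` marked as used, so the planner fills the free local slots first and overflows the remainder to the worker. A sketch of that setup as a helper, assuming the `Scheduler` API shown in the hunks (`setThisHostResources` / `addHostToGlobalSet`); the helper name itself is hypothetical.

```cpp
#include <memory>
#include <string>

#include <faabric/scheduler/Scheduler.h>

// Hypothetical helper capturing the recurring two-host setup in these tests.
// With nFuncs slots and nLocalSlots already used locally, a batch of nFuncs
// functions is split: the free local slots take the first messages, the
// worker takes the overflow.
void setUpTwoHostSlots(faabric::scheduler::Scheduler& sch,
                       const std::string& workerIp,
                       int nLocalSlots,
                       int nFuncs)
{
    faabric::HostResources localRes;
    localRes.set_slots(nFuncs);
    localRes.set_usedslots(nLocalSlots);
    sch.setThisHostResources(localRes);

    auto remoteRes = std::make_shared<faabric::HostResources>();
    remoteRes->set_slots(nFuncs - nLocalSlots);
    sch.addHostToGlobalSet(workerIp, remoteRes);
}
```

With `nLocalSlots = 2` and `nFuncs = 4` this reproduces the "2 functions locally, 2 remotely" split that `test_funcs.cpp` asserts on.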
diff --git a/tests/dist/scheduler/test_snapshots.cpp b/tests/dist/scheduler/test_snapshots.cpp
index 8b25ca96e..4180b8fa3 100644
--- a/tests/dist/scheduler/test_snapshots.cpp
+++ b/tests/dist/scheduler/test_snapshots.cpp
@@ -19,6 +19,7 @@
 
 namespace tests {
 
+/* TODO(remote-threads): remote threads temporarily disabled
 TEST_CASE_METHOD(DistTestsFixture,
                  "Check snapshots sent back from worker are queued",
                  "[snapshots][threads]")
@@ -107,6 +108,7 @@ TEST_CASE_METHOD(DistTestsFixture,
     faabric::Message actualResult = plannerCli.getMessageResult(msg, 10000);
     REQUIRE(actualResult.returnvalue() == 333);
 }
+*/
 
 TEST_CASE_METHOD(DistTestsFixture,
                  "Check repeated reduction",
@@ -119,9 +121,9 @@ TEST_CASE_METHOD(DistTestsFixture,
       faabric::util::batchExecFactory(user, function, 1);
     faabric::Message& msg = req->mutable_messages()->at(0);
 
-    // Main function and one thread execute on this host, others on another
+    // TODO(remote-threads): all threads execute in one host
     faabric::HostResources res;
-    res.set_slots(3);
+    res.set_slots(100);
    sch.setThisHostResources(res);
 
     std::vector<std::string> expectedHosts = { getMasterIP() };
diff --git a/tests/dist/scheduler/test_threads.cpp b/tests/dist/scheduler/test_threads.cpp
index ebc546ef0..e4c65c7fe 100644
--- a/tests/dist/scheduler/test_threads.cpp
+++ b/tests/dist/scheduler/test_threads.cpp
@@ -17,15 +17,15 @@
 
 namespace tests {
 
+/* TODO(remote-threads): this test now runs on one single host
 TEST_CASE_METHOD(DistTestsFixture,
                  "Test executing threads on multiple hosts",
                  "[snapshots][threads]")
 {
     // Set up this host's resources
-    int nLocalSlots = 2;
     int nThreads = 4;
     faabric::HostResources res;
-    res.set_slots(nLocalSlots);
+    res.set_slots(nThreads);
     sch.setThisHostResources(res);
 
     // Set up the message
@@ -52,7 +52,7 @@ TEST_CASE_METHOD(DistTestsFixture,
     plannerCli.callFunctions(req);
 
     // Check threads executed on this host
-    for (int i = 0; i < nLocalSlots; i++) {
+    for (int i = 0; i < nThreads; i++) {
         faabric::Message& m = req->mutable_messages()->at(i);
         int res = sch.awaitThreadResult(m.id());
         REQUIRE(res == m.id() / 2);
@@ -65,4 +65,5 @@ TEST_CASE_METHOD(DistTestsFixture,
         REQUIRE(res == m.id() / 2);
     }
 }
+*/
 }
diff --git a/tests/dist/transport/functions.cpp b/tests/dist/transport/functions.cpp
index cfeae3c95..8c99b05b2 100644
--- a/tests/dist/transport/functions.cpp
+++ b/tests/dist/transport/functions.cpp
@@ -130,15 +130,16 @@ int handleDistributedLock(tests::DistTestExecutor* exec,
 
     if (msg.function() == "lock") {
         int initialValue = 0;
-        int groupId = faabric::util::generateGid();
 
         stateKv->set(BYTES(&initialValue));
 
+        // Create a nested request child of the parent request (parent-child
+        // indicated by same app id)
         std::shared_ptr<faabric::BatchExecuteRequest> nestedReq =
           faabric::util::batchExecFactory("ptp", "lock-worker", nWorkers);
+        faabric::util::updateBatchExecAppId(nestedReq, req->appid());
         for (int i = 0; i < nWorkers; i++) {
             faabric::Message& m = nestedReq->mutable_messages()->at(i);
-            m.set_groupid(groupId);
             m.set_groupidx(i);
         }
@@ -154,6 +155,8 @@ int handleDistributedLock(tests::DistTestExecutor* exec,
         }
     }
 
+    // Pull to make sure we have the latest version
+    stateKv->pull();
     int finalValue = *(int*)stateKv->get();
     int expectedValue = nWorkers * nLoops;
     if (finalValue != expectedValue) {
@@ -255,10 +258,7 @@ class DistributedCoordinationTestRunner
             auto& m = chainReq->mutable_messages()->at(i);
 
             // Set app index and group data
-            m.set_appid(msg.appid());
             m.set_appidx(i);
-
-            m.set_groupid(groupId);
             m.set_groupidx(i);
             m.set_groupsize(nChained);
@@ -324,8 +324,6 @@ class DistributedCoordinationTestRunner
 
     faabric::state::State& state;
 
     std::vector<std::string> stateKeys;
-
-    int groupId = 123;
 };
 
 int handleDistributedBarrier(tests::DistTestExecutor* exec,
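Several hunks above replace manually generated group ids (and per-message `set_appid` loops) with `faabric::util::updateBatchExecAppId`, relying on the planner to derive group information from the app id. If that helper is unfamiliar, its expected behaviour is to stamp one app id onto the batch and every message in it; the sketch below shows the presumed shape, not the verbatim implementation.

```cpp
#include <memory>

#include <faabric/proto/faabric.pb.h>

// Presumed shape of faabric::util::updateBatchExecAppId: keep the batch and
// all of its messages under one app id, which is what marks a nested request
// as a child of its parent now that callers no longer generate group ids.
void updateBatchExecAppId(std::shared_ptr<faabric::BatchExecuteRequest> ber,
                          int newAppId)
{
    ber->set_appid(newAppId);
    for (int i = 0; i < ber->messages_size(); i++) {
        ber->mutable_messages(i)->set_appid(newAppId);
    }
}
```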
diff --git a/tests/dist/transport/test_coordination.cpp b/tests/dist/transport/test_coordination.cpp
index 642ad2d3b..567c87889 100644
--- a/tests/dist/transport/test_coordination.cpp
+++ b/tests/dist/transport/test_coordination.cpp
@@ -14,11 +14,14 @@
 
 namespace tests {
 
 TEST_CASE_METHOD(DistTestsFixture, "Test distributed lock", "[ptp][transport]")
 {
-    // Set up this host's resources
-    int nLocalSlots = 5;
+    // Set up the host resources. The distributed lock test will start 10 other
+    // functions (so we need 11 slots). We give each host 8 slots for an even
+    // distribution
+    int nSlotsPerHost = 8;
     faabric::HostResources res;
-    res.set_slots(nLocalSlots);
+    res.set_slots(nSlotsPerHost);
     sch.setThisHostResources(res);
+    sch.addHostToGlobalSet(getWorkerIP(),
+                           std::make_shared<faabric::HostResources>(res));
 
     // Set up the request
     std::shared_ptr<faabric::BatchExecuteRequest> req =
diff --git a/tests/dist/transport/test_point_to_point.cpp b/tests/dist/transport/test_point_to_point.cpp
index 69c81d1eb..5353aa463 100644
--- a/tests/dist/transport/test_point_to_point.cpp
+++ b/tests/dist/transport/test_point_to_point.cpp
@@ -33,21 +33,37 @@ class PointToPointDistTestFixture : public DistTestsFixture
 
     void setSlotsAndNumFuncs(int nLocalSlotsIn, int nFuncsIn)
     {
+        int nRemoteSlots = nFuncsIn - nLocalSlotsIn;
         nLocalSlots = nLocalSlotsIn;
         nFuncs = nFuncsIn;
 
-        // Set local resources
-        faabric::HostResources res;
-        res.set_slots(nLocalSlots);
-        sch.setThisHostResources(res);
+        faabric::HostResources localRes;
+        std::shared_ptr<faabric::HostResources> remoteRes =
+          std::make_shared<faabric::HostResources>();
+
+        if (nLocalSlots == nRemoteSlots) {
+            localRes.set_slots(2 * nLocalSlots);
+            localRes.set_usedslots(nLocalSlots);
+            remoteRes->set_slots(nRemoteSlots);
+        } else if (nLocalSlots > nRemoteSlots) {
+            localRes.set_slots(nLocalSlots);
+            remoteRes->set_slots(nRemoteSlots);
+        } else {
+            SPDLOG_ERROR(
+              "Unfeasible PTP slots config (local: {} - remote: {})",
+              nLocalSlots,
+              nRemoteSlots);
+            throw std::runtime_error("Unfeasible slots configuration");
+        }
+
+        sch.setThisHostResources(localRes);
+        sch.addHostToGlobalSet(getWorkerIP(), remoteRes);
     }
 
     faabric::batch_scheduler::SchedulingDecision prepareRequestReturnDecision(
       std::shared_ptr<faabric::BatchExecuteRequest> req)
     {
         // Prepare expected decision
-        faabric::batch_scheduler::SchedulingDecision expectedDecision(appId,
-                                                                      groupId);
+        faabric::batch_scheduler::SchedulingDecision expectedDecision(
+          req->appid(), req->groupid());
 
         std::vector<std::string> expectedHosts(nFuncs, getWorkerIP());
         for (int i = 0; i < nLocalSlots; i++) {
             expectedHosts.at(i) = getMasterIP();
@@ -57,9 +73,9 @@ class PointToPointDistTestFixture : public DistTestsFixture
 
         for (int i = 0; i < nFuncs; i++) {
             faabric::Message& msg = req->mutable_messages()->at(i);
-            msg.set_appid(appId);
+            msg.set_appid(req->appid());
             msg.set_appidx(i);
-            msg.set_groupid(groupId);
+            msg.set_groupid(req->groupid());
             msg.set_groupidx(i);
 
             // Add to expected decision
@@ -87,9 +103,6 @@ class PointToPointDistTestFixture : public DistTestsFixture
     }
 
   protected:
-    int appId = 222;
-    int groupId = 333;
-
     int nLocalSlots;
     int nFuncs;
 };
@@ -98,7 +111,7 @@ TEST_CASE_METHOD(PointToPointDistTestFixture,
                  "Test point-to-point messaging on multiple hosts",
                  "[ptp][transport]")
 {
-    setSlotsAndNumFuncs(1, 4);
+    setSlotsAndNumFuncs(2, 4);
 
     // Set up batch request and scheduling decision
     std::shared_ptr<faabric::BatchExecuteRequest> req =
@@ -140,11 +153,13 @@ TEST_CASE_METHOD(DistTestsFixture,
 {
     // Set up this host's resources, force execution across hosts
     int nChainedFuncs = 4;
-    int nLocalSlots = 2;
+    int nLocalSlots = 3;
 
     faabric::HostResources res;
     res.set_slots(nLocalSlots);
     sch.setThisHostResources(res);
+    res.set_slots(2);
+    sch.addHostToGlobalSet(getWorkerIP(),
+                           std::make_shared<faabric::HostResources>(res));
 
     std::string function;
     SECTION("Barrier") { function = "barrier"; }
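To make the fixture's slot arithmetic concrete: the updated test calls `setSlotsAndNumFuncs(2, 4)`, so `nRemoteSlots = 4 - 2 = 2` and the equal-split branch runs, giving the local host 4 slots with 2 used (2 free) and the worker 2 slots. The sketch below restates that arithmetic as a standalone function for checking configurations by hand; the host names are placeholders, and the assumption that the planner fills free local slots before overflowing to the worker is taken from the expected decisions built in `prepareRequestReturnDecision` above.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Standalone re-statement of the fixture's placement logic, assuming free
// local slots are filled before overflowing to the worker.
std::vector<std::string> expectedPlacement(int nLocalSlots, int nFuncs)
{
    int nRemoteSlots = nFuncs - nLocalSlots;
    if (nLocalSlots < nRemoteSlots) {
        throw std::runtime_error("Unfeasible slots configuration");
    }

    // First nLocalSlots messages stay on the master, the rest overflow
    std::vector<std::string> hosts(nFuncs, "worker");
    for (int i = 0; i < nLocalSlots; i++) {
        hosts[i] = "master";
    }
    return hosts;
}

// expectedPlacement(2, 4) == { "master", "master", "worker", "worker" },
// matching the decision the updated point-to-point test checks against.
```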