Skip to content

Commit

Permalink
batch-scheduler: fix isSingleHost check (#359)
Browse files Browse the repository at this point in the history
* batch-scheduler: fix isSingleHost check

* tests: add regression test

* gh: bump minor code version

* dist-tests: fix dist-test that was relying on an incorrect notion of being singleHost
  • Loading branch information
csegarragonz authored Dec 19, 2023
1 parent db0cc66 commit c1f6509
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FAABRIC_VERSION=0.11.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.11.0
FAABRIC_VERSION=0.12.0
FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.12.0
COMPOSE_PROJECT_NAME=faabric-dev
CONAN_CACHE_MOUNT_SOURCE=./conan-cache/
14 changes: 7 additions & 7 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Cancel Workflow Action
uses: styfle/cancel-workflow-action@0.11.0
uses: styfle/cancel-workflow-action@0.12.0

conan-cache:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -36,7 +36,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -50,7 +50,7 @@ jobs:
if: github.event.pull_request.draft == false
runs-on: ubuntu-latest
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand All @@ -73,7 +73,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down Expand Up @@ -167,7 +167,7 @@ jobs:
REDIS_QUEUE_HOST: redis
REDIS_STATE_HOST: redis
container:
image: faasm.azurecr.io/faabric:0.11.0
image: faasm.azurecr.io/faabric:0.12.0
credentials:
username: ${{ secrets.ACR_SERVICE_PRINCIPAL_ID }}
password: ${{ secrets.ACR_SERVICE_PRINCIPAL_PASSWORD }}
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.11.0
0.12.0
9 changes: 5 additions & 4 deletions src/batch-scheduler/SchedulingDecision.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ SchedulingDecision::SchedulingDecision(uint32_t appIdIn, int32_t groupIdIn)

// Reports whether this scheduling decision counts as single-host.
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// body below appears to interleave pre- and post-change lines (`conf` is
// declared twice and there are two return statements) — confirm against the
// repository which lines are current.
bool SchedulingDecision::isSingleHost()
{
auto& conf = faabric::util::getSystemConfig();

// Always return false if single-host optimisations are switched off
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
// TODO(thread-opt): remove this flag
if (conf.noSingleHostOptimisations == 1) {
return false;
}

// Variant 1: every entry in `hosts` must equal conf.endpointHost
std::string thisHost = conf.endpointHost;
return std::all_of(hosts.begin(), hosts.end(), [&](const std::string& s) {
return s == thisHost;
});
// Variant 2: `hosts` must contain exactly one distinct value
std::set<std::string> hostSet(hosts.begin(), hosts.end());
return hostSet.size() == 1;
}

void SchedulingDecision::addMessage(const std::string& host,
Expand Down
2 changes: 1 addition & 1 deletion src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
decision->print();
#endif

// 1. For a scale change request, we only need to update the hosts
// 1. For a new request, we only need to update the hosts
// with the new messages being scheduled
for (int i = 0; i < decision->hosts.size(); i++) {
claimHostSlots(state.hostMap.at(decision->hosts.at(i)));
Expand Down
2 changes: 2 additions & 0 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Executor::~Executor()
}
}

// TODO(thread-opt): get rid of this method here and move to
// PlannerClient::callFunctions()
std::vector<std::pair<uint32_t, int32_t>> Executor::executeThreads(
std::shared_ptr<faabric::BatchExecuteRequest> req,
const std::vector<faabric::util::SnapshotMergeRegion>& mergeRegions)
Expand Down
38 changes: 21 additions & 17 deletions tests/dist/dist_test_fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,33 @@ class DistTestsFixture
sch.addHostToGlobalSet(getWorkerIP());
sch.removeHostFromGlobalSet(LOCALHOST);

// Give some resources to each host
updateLocalSlots(4, 0);
updateRemoteSlots(4, 0);

// Set up executor
std::shared_ptr<tests::DistTestExecutorFactory> fac =
std::make_shared<tests::DistTestExecutorFactory>();
faabric::scheduler::setExecutorFactory(fac);
}

void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0)
{
faabric::HostResources localRes;
localRes.set_slots(newLocalSlots);
localRes.set_usedslots(newUsedLocalSlots);
sch.setThisHostResources(localRes);
}

void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0)
{
faabric::HostResources remoteRes;
remoteRes.set_slots(newRemoteSlots);
remoteRes.set_usedslots(newRemoteUsedSlots);
sch.addHostToGlobalSet(workerIP,
std::make_shared<HostResources>(remoteRes));
}

~DistTestsFixture() = default;

std::string getWorkerIP()
Expand Down Expand Up @@ -65,23 +86,6 @@ class MpiDistTestsFixture : public DistTestsFixture
int worldSize = 4;
bool origIsMsgOrderingOn;

void updateLocalSlots(int newLocalSlots, int newUsedLocalSlots = 0)
{
faabric::HostResources localRes;
localRes.set_slots(newLocalSlots);
localRes.set_usedslots(newUsedLocalSlots);
sch.setThisHostResources(localRes);
}

void updateRemoteSlots(int newRemoteSlots, int newRemoteUsedSlots = 0)
{
faabric::HostResources remoteRes;
remoteRes.set_slots(newRemoteSlots);
remoteRes.set_usedslots(newRemoteUsedSlots);
sch.addHostToGlobalSet(workerIP,
std::make_shared<HostResources>(remoteRes));
}

void setLocalSlots(int numLocalSlots, int worldSizeIn = 0)
{
if (worldSizeIn > 0) {
Expand Down
29 changes: 16 additions & 13 deletions tests/dist/scheduler/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,22 @@ int handleFakeDiffsFunction(tests::DistTestExecutor* exec,
{
faabric::Message& msg = req->mutable_messages()->at(msgIdx);

std::string msgInput = msg.inputdata();
std::vector<uint8_t> inputBytes = faabric::util::stringToBytes(msgInput);
std::vector<uint8_t> otherData = { 1, 2, 3, 4 };

// Modify the executor's memory
int offsetA = 10;
int offsetB = HOST_PAGE_SIZE + 10;
std::memcpy(exec->getDummyMemory().data() + offsetA,
otherData.data(),
otherData.size());
std::memcpy(exec->getDummyMemory().data() + offsetB,
inputBytes.data(),
inputBytes.size());
if (msg.groupidx() > 0) {
std::string msgInput = msg.inputdata();
std::vector<uint8_t> inputBytes =
faabric::util::stringToBytes(msgInput);
std::vector<uint8_t> otherData = { 1, 2, 3, 4 };

// Modify the executor's memory
int offsetA = 10;
int offsetB = HOST_PAGE_SIZE + 10;
std::memcpy(exec->getDummyMemory().data() + offsetA,
otherData.data(),
otherData.size());
std::memcpy(exec->getDummyMemory().data() + offsetB,
inputBytes.data(),
inputBytes.size());
}

return 123;
}
Expand Down
43 changes: 24 additions & 19 deletions tests/dist/scheduler/test_snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,43 @@ TEST_CASE_METHOD(DistTestsFixture,
std::string function = "fake-diffs";
std::vector<uint8_t> inputData = { 0, 1, 2, 3, 4, 5, 6 };

// Set up the message
// Set up the messages
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory(user, function, 1);
faabric::util::batchExecFactory(user, function, 2);
req->set_type(faabric::BatchExecuteRequest::THREADS);

// Set up some input data
faabric::Message& msg = req->mutable_messages()->at(0);
msg.set_inputdata(inputData.data(), inputData.size());
for (int i = 0; i < req->messages_size(); i++) {
req->mutable_messages(i)->set_groupidx(i);
req->mutable_messages(i)->set_inputdata(inputData.data(),
inputData.size());
}

// Set up the main thread snapshot
auto& reg = faabric::snapshot::getSnapshotRegistry();
size_t snapSize = DIST_TEST_EXECUTOR_MEMORY_SIZE;
std::string snapshotKey = faabric::util::getMainThreadSnapshotKey(msg);
std::string snapshotKey =
faabric::util::getMainThreadSnapshotKey(req->messages(0));
auto snap = std::make_shared<faabric::util::SnapshotData>(snapSize);
reg.registerSnapshot(snapshotKey, snap);

// Force the function to be executed remotely
faabric::HostResources res;
res.set_usedslots(1);
res.set_slots(1);
sch.setThisHostResources(res);
res.set_usedslots(0);
res.set_slots(4);
sch.addHostToGlobalSet(getWorkerIP(), std::make_shared<HostResources>(res));
// Force the execution to span multiple hosts so that it triggers dirty
// tracking
std::vector<std::string> expectedHosts = { getMasterIP(), getWorkerIP() };
auto preloadDec = std::make_shared<batch_scheduler::SchedulingDecision>(
req->appid(), req->groupid());
for (int i = 0; i < req->messages_size(); i++) {
preloadDec->addMessage(expectedHosts.at(i), 0, 0, i);
}

std::vector<std::string> expectedHosts = { getWorkerIP() };
plannerCli.preloadSchedulingDecision(preloadDec);
auto decision = plannerCli.callFunctions(req);
std::vector<std::string> executedHosts = decision.hosts;
REQUIRE(expectedHosts == executedHosts);
REQUIRE(expectedHosts == decision.hosts);

auto msgResult = plannerCli.getMessageResult(req->appid(), msg.id(), 500);
REQUIRE(msgResult.returnvalue() == 123);
for (const auto& msg : req->messages()) {
auto msgResult =
plannerCli.getMessageResult(req->appid(), msg.id(), 500);
REQUIRE(msgResult.returnvalue() == 123);
}

// Write the diffs and check they've been applied
REQUIRE(snap->getQueuedDiffsCount() == 2);
Expand Down
2 changes: 1 addition & 1 deletion tests/test/batch-scheduler/test_scheduling_decisions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ TEST_CASE_METHOD(ConfFixture, "Test building scheduling decisions", "[util]")
hostB = "hostA";
hostC = "hostA";

expectSingleHost = false;
expectSingleHost = true;
expectedUniqueHosts = { "hostA" };
}

Expand Down

0 comments on commit c1f6509

Please sign in to comment.