Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpi: manually heap-allocate payloads for local messages #378

Merged
merged 8 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@

namespace faabric::util {

// We provide our own namespaced definitions for malloc/free to control the
// memory allocator we use. For the moment, we just defer to off-the-shelve
// malloc implementations.
inline void* malloc(std::size_t size)
{
return std::malloc(size);
}

inline void free(void* ptr)
{
return std::free(ptr);
}

/*
* Merges all the dirty page flags from the list of vectors into the first
* vector in place.
Expand Down
3 changes: 3 additions & 0 deletions leak-sanitizer-ignorelist.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# For local MPI messages we send malloc-ed pointers through in-memory queues,
# what makes LSAN unhappy
leak:MpiWorld::send
33 changes: 27 additions & 6 deletions src/mpi/MpiWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <faabric/util/environment.h>
#include <faabric/util/gids.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/testing.h>

// Each MPI rank runs in a separate thread, thus we use TLS to maintain the
Expand Down Expand Up @@ -516,9 +517,7 @@
m->set_messagetype(messageType);

// Set up message data
if (count > 0 && buffer != nullptr) {
m->set_buffer(buffer, dataType->size * count);
}
bool mustSendData = count > 0 && buffer != nullptr;

// Mock the message sending in tests
if (faabric::util::isMockMode()) {
Expand All @@ -528,10 +527,21 @@

// Dispatch the message locally or globally
if (isLocal) {
if (mustSendData) {
void* bufferPtr = faabric::util::malloc(count * dataType->size);
std::memcpy(bufferPtr, buffer, count * dataType->size);

m->set_bufferptr((uint64_t)bufferPtr);
}

SPDLOG_TRACE(
"MPI - send {} -> {} ({})", sendRank, recvRank, messageType);
getLocalQueue(sendRank, recvRank)->enqueue(std::move(m));
} else {
if (mustSendData) {
m->set_buffer(buffer, dataType->size * count);

Check warning on line 542 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L542

Added line #L542 was not covered by tests
}

SPDLOG_TRACE(
"MPI - send remote {} -> {} ({})", sendRank, recvRank, messageType);
sendRemoteMpiMessage(otherHost, sendRank, recvRank, m);
Expand Down Expand Up @@ -596,10 +606,21 @@
assert(m->messagetype() == messageType);
assert(m->count() <= count);

// TODO - avoid copy here
// Copy message data
const std::string otherHost = getHostForRank(m->destination());

Check warning on line 609 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L609

Added line #L609 was not covered by tests
bool isLocal =
getHostForRank(m->destination()) == getHostForRank(m->sender());

if (m->count() > 0) {
std::move(m->buffer().begin(), m->buffer().end(), buffer);
if (isLocal) {
// Make sure we do not overflow the recepient buffer
auto bytesToCopy = std::min<size_t>(m->count() * dataType->size,
count * dataType->size);
std::memcpy(buffer, (void*)m->bufferptr(), bytesToCopy);
faabric::util::free((void*)m->bufferptr());
} else {
// TODO - avoid copy here
std::move(m->buffer().begin(), m->buffer().end(), buffer);

Check warning on line 622 in src/mpi/MpiWorld.cpp

View check run for this annotation

Codecov / codecov/patch

src/mpi/MpiWorld.cpp#L622

Added line #L622 was not covered by tests
}
}

// Set status values if required
Expand Down
7 changes: 6 additions & 1 deletion src/mpi/mpi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ message MPIMessage {
int32 destination = 5;
int32 type = 6;
int32 count = 7;
bytes buffer = 8;

// For remote messaging
optional bytes buffer = 8;

// For local messaging
optional int64 bufferPtr = 9;
}
5 changes: 1 addition & 4 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ bool Planner::registerHost(const Host& hostIn, bool overwrite)

void Planner::removeHost(const Host& hostIn)
{
SPDLOG_DEBUG("Planner received request to remove host {}", hostIn.ip());
SPDLOG_INFO("Planner received request to remove host {}", hostIn.ip());

// We could acquire first a read lock to see if the host is in the host
// map, and then acquire a write lock to remove it, but we don't do it
Expand Down Expand Up @@ -289,9 +289,6 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
msg->groupidx());

// Release the slot only once
if (!state.hostMap.contains(msg->executedhost())) {
SPDLOG_ERROR("Host Map does not contain: {}", msg->executedhost());
}
assert(state.hostMap.contains(msg->executedhost()));
if (!state.appResults[appId].contains(msgId)) {
releaseHostSlots(state.hostMap.at(msg->executedhost()));
Expand Down
5 changes: 4 additions & 1 deletion tasks/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
"REDIS_QUEUE_HOST": "redis",
"REDIS_STATE_HOST": "redis",
"TERM": "xterm-256color",
"ASAN_OPTIONS": "verbosity=1:halt_on_error=1",
"ASAN_OPTIONS": "verbosity=1:halt_on_error=1:",
"LSAN_OPTIONS": "suppressions={}/leak-sanitizer-ignorelist.txt".format(
PROJ_ROOT
),
"TSAN_OPTIONS": " ".join(
[
"verbosity=1 halt_on_error=1",
Expand Down
9 changes: 6 additions & 3 deletions tests/dist/mpi/mpi_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <faabric/util/compare.h>
#include <faabric/util/config.h>
#include <faabric/util/logging.h>
#include <faabric/util/memory.h>

using namespace faabric::mpi;

Expand Down Expand Up @@ -508,7 +509,7 @@ int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void* baseptr)
throw std::runtime_error("Non-null info not supported");
}

*((void**)baseptr) = malloc(size);
*((void**)baseptr) = faabric::util::malloc(size);

return MPI_SUCCESS;
}
Expand Down Expand Up @@ -641,7 +642,8 @@ int MPI_Isend(const void* buf,
SPDLOG_TRACE("MPI - MPI_Isend {} -> {}", executingContext.getRank(), dest);

MpiWorld& world = getExecutingWorld();
(*request) = (faabric_request_t*)malloc(sizeof(faabric_request_t));
(*request) =
(faabric_request_t*)faabric::util::malloc(sizeof(faabric_request_t));
int requestId = world.isend(
executingContext.getRank(), dest, (uint8_t*)buf, datatype, count);
(*request)->id = requestId;
Expand All @@ -661,7 +663,8 @@ int MPI_Irecv(void* buf,
"MPI - MPI_Irecv {} <- {}", executingContext.getRank(), source);

MpiWorld& world = getExecutingWorld();
(*request) = (faabric_request_t*)malloc(sizeof(faabric_request_t));
(*request) =
(faabric_request_t*)faabric::util::malloc(sizeof(faabric_request_t));
int requestId = world.irecv(
source, executingContext.getRank(), (uint8_t*)buf, datatype, count);
(*request)->id = requestId;
Expand Down
2 changes: 2 additions & 0 deletions tests/test/endpoint/test_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ void* doWork(void* arg)
pthread_exit(0);
}

/* 26/02/2024 - FIXME(flaky): This test is failing often in GHA
TEST_CASE("Test starting an endpoint in signal mode", "[endpoint]")
{
// Use pthreads to be able to signal the thread correctly
Expand All @@ -104,6 +105,7 @@ TEST_CASE("Test starting an endpoint in signal mode", "[endpoint]")
pthread_join(ptid, nullptr);
}
*/

TEST_CASE_METHOD(EndpointTestFixture,
"Test posting a request to the endpoint",
Expand Down
15 changes: 0 additions & 15 deletions tests/test/mpi/test_mpi_world.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,21 +242,6 @@ TEST_CASE_METHOD(MpiTestFixture, "Test send and recv on same host", "[mpi]")
world.send(
rankA1, rankA2, BYTES(messageData.data()), MPI_INT, messageData.size());

SECTION("Test queueing")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not apply as the message buffer is not in the queue any more.

{
// Check the message itself is on the right queue
REQUIRE(world.getLocalQueueSize(rankA1, rankA2) == 1);
REQUIRE(world.getLocalQueueSize(rankA2, rankA1) == 0);
REQUIRE(world.getLocalQueueSize(rankA1, 0) == 0);
REQUIRE(world.getLocalQueueSize(rankA2, 0) == 0);

// Check message content
const std::shared_ptr<InMemoryMpiQueue>& queueA2 =
world.getLocalQueue(rankA1, rankA2);
MPIMessage actualMessage = *(queueA2->dequeue());
checkMessage(actualMessage, worldId, rankA1, rankA2, messageData);
}

SECTION("Test recv")
{
// Receive the message
Expand Down
20 changes: 10 additions & 10 deletions tests/test/scheduler/test_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,25 +611,25 @@ TEST_CASE_METHOD(SlowExecutorTestFixture,
{
faabric::util::setMockMode(true);

const std::string otherHost = "otherHost";
faabric::Message msg = faabric::util::messageFactory("foo", "bar");
msg.set_mainhost(otherHost);
msg.set_executedhost(faabric::util::getSystemConfig().endpointHost);

auto fac = faabric::executor::getExecutorFactory();
auto exec = fac->createExecutor(msg);

// If we want to set a function result, the planner must see at least one
// slot, and at least one used slot in this host. Both for the task
// executed in "otherHost" (executed as part of createExecutor) as well
// as the one we are setting the result for
faabric::HostResources res;
res.set_slots(1);
res.set_usedslots(1);
res.set_slots(2);
res.set_usedslots(2);
sch.setThisHostResources(res);
// Resources for the background task
const std::string otherHost = "otherHost";
sch.addHostToGlobalSet(otherHost, std::make_shared<HostResources>(res));

faabric::Message msg = faabric::util::messageFactory("foo", "bar");
msg.set_mainhost(otherHost);
msg.set_executedhost(faabric::util::getSystemConfig().endpointHost);

auto fac = faabric::executor::getExecutorFactory();
auto exec = fac->createExecutor(msg);

// Set the thread result
int returnValue = 123;
std::string snapKey;
Expand Down