wip: test transport layer improvements #395

Closed
wants to merge 3 commits into from
10 changes: 9 additions & 1 deletion .github/workflows/tests.yml
@@ -119,12 +119,16 @@ jobs:
run: ./bin/inv_wrapper.sh dev.cc faabric_tests
- name: "Run tests"
run: ./bin/inv_wrapper.sh tests
timeout-minutes: 10

dist-tests:
if: github.event.pull_request.draft == false
needs: [conan-cache]
runs-on: ubuntu-latest
runs-on: self-hosted
env:
# Make a unique per-job cluster name, so that different instances can
# run in parallel
COMPOSE_PROJECT_NAME: faabric-gha-${{ github.job }}-${{ github.run_id }}-${{ github.run_attempt }}
CONAN_CACHE_MOUNT_SOURCE: ~/.conan/
steps:
# --- Code update ---
@@ -136,9 +140,13 @@ jobs:
run: ./dist-test/build.sh
- name: "Run the distributed tests"
run: ./dist-test/run.sh
timeout-minutes: 10
- name: "Print planner logs"
if: always()
run: docker compose logs planner
- name: "Chown all files to avoid docker-related root-owned files"
if: always()
run: sudo chown -R $(id -u):$(id -g) .

examples:
if: github.event.pull_request.draft == false
49 changes: 49 additions & 0 deletions include/faabric/mpi/MpiMessage.h
@@ -0,0 +1,49 @@
#pragma once

#include <cstdint>
#include <vector>

namespace faabric::mpi {

enum MpiMessageType : int32_t
{
NORMAL = 0,
BARRIER_JOIN = 1,
BARRIER_DONE = 2,
SCATTER = 3,
GATHER = 4,
ALLGATHER = 5,
REDUCE = 6,
SCAN = 7,
ALLREDUCE = 8,
ALLTOALL = 9,
SENDRECV = 10,
BROADCAST = 11,
};

struct MpiMessage
{
int32_t id;
int32_t worldId;
int32_t sendRank;
int32_t recvRank;
int32_t typeSize;
int32_t count;
MpiMessageType messageType;
void* buffer;
};

inline size_t payloadSize(const MpiMessage& msg)
{
return msg.typeSize * msg.count;
}

inline size_t msgSize(const MpiMessage& msg)
{
return sizeof(MpiMessage) + payloadSize(msg);
}

void serializeMpiMsg(std::vector<uint8_t>& buffer, const MpiMessage& msg);

void parseMpiMsg(const std::vector<uint8_t>& bytes, MpiMessage* msg);
}
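
Note: the header above only declares the (de)serialization helpers; their .cpp implementation is not part of this hunk. The following is a sketch only, assuming the wire layout implied by msgSize() (fixed-size header followed by payloadSize(msg) bytes) and assuming the parser allocates the payload with malloc rather than pointing into the input buffer.

#include <faabric/mpi/MpiMessage.h>

#include <cstdlib>
#include <cstring>
#include <vector>

namespace faabric::mpi {

// Sketch: copy the fixed-size header, then the payload bytes
void serializeMpiMsg(std::vector<uint8_t>& buffer, const MpiMessage& msg)
{
    buffer.resize(msgSize(msg));
    std::memcpy(buffer.data(), &msg, sizeof(MpiMessage));
    if (payloadSize(msg) > 0 && msg.buffer != nullptr) {
        std::memcpy(buffer.data() + sizeof(MpiMessage),
                    msg.buffer,
                    payloadSize(msg));
    }
}

// Sketch: restore the header, then (assumption) malloc and copy the payload,
// handing ownership of the allocation to the caller
void parseMpiMsg(const std::vector<uint8_t>& bytes, MpiMessage* msg)
{
    // This also copies the stale wire-side pointer, overwritten just below
    std::memcpy(msg, bytes.data(), sizeof(MpiMessage));

    size_t payloadSz = payloadSize(*msg);
    if (payloadSz > 0) {
        msg->buffer = std::malloc(payloadSz);
        std::memcpy(msg->buffer,
                    bytes.data() + sizeof(MpiMessage),
                    payloadSz);
    } else {
        msg->buffer = nullptr;
    }
}
}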
12 changes: 8 additions & 4 deletions include/faabric/mpi/MpiMessageBuffer.h
@@ -1,8 +1,9 @@
#include <faabric/mpi/MpiMessage.h>
#include <faabric/mpi/mpi.h>
#include <faabric/mpi/mpi.pb.h>

#include <iterator>
#include <list>
#include <memory>

namespace faabric::mpi {
/* The MPI message buffer (MMB) keeps track of the asynchronous
@@ -25,17 +26,20 @@ class MpiMessageBuffer
{
public:
int requestId = -1;
std::shared_ptr<MPIMessage> msg = nullptr;
std::shared_ptr<MpiMessage> msg = nullptr;
int sendRank = -1;
int recvRank = -1;
uint8_t* buffer = nullptr;
faabric_datatype_t* dataType = nullptr;
int count = -1;
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL;
MpiMessageType messageType = MpiMessageType::NORMAL;

bool isAcknowledged() { return msg != nullptr; }

void acknowledge(std::shared_ptr<MPIMessage> msgIn) { msg = msgIn; }
void acknowledge(const MpiMessage& msgIn)
{
msg = std::make_shared<MpiMessage>(msgIn);
}
};

/* Interface to query the buffer size */
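
For context on the acknowledge() change above, a hypothetical caller-side helper; the pending-message class is kept generic via a template parameter since only its members are visible in this hunk, and this snippet is not part of the PR.

#include <faabric/mpi/MpiMessage.h>

// Once the matching MpiMessage has been popped off the in-memory queue,
// acknowledge the pending async entry. acknowledge() now copies the plain
// struct into a shared_ptr instead of storing a std::shared_ptr<MPIMessage>
// protobuf object.
template<typename TPendingMsg>
void acknowledgeIfNeeded(TPendingMsg& pending,
                         const faabric::mpi::MpiMessage& arrived)
{
    if (!pending.isAcknowledged()) {
        pending.acknowledge(arrived);
    }
}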
42 changes: 23 additions & 19 deletions include/faabric/mpi/MpiWorld.h
@@ -1,8 +1,8 @@
#pragma once

#include <faabric/mpi/MpiMessage.h>
#include <faabric/mpi/MpiMessageBuffer.h>
#include <faabric/mpi/mpi.h>
#include <faabric/mpi/mpi.pb.h>
#include <faabric/proto/faabric.pb.h>
#include <faabric/scheduler/InMemoryMessageQueue.h>
#include <faabric/transport/PointToPointBroker.h>
@@ -26,10 +26,9 @@ namespace faabric::mpi {
// -----------------------------------
// MPITOPTP - mocking at the MPI level won't be needed when using the PTP broker
// as the broker already has mocking capabilities
std::vector<std::shared_ptr<MPIMessage>> getMpiMockedMessages(int sendRank);
std::vector<MpiMessage> getMpiMockedMessages(int sendRank);

typedef faabric::util::FixedCapacityQueue<std::shared_ptr<MPIMessage>>
InMemoryMpiQueue;
typedef faabric::util::SpinLockQueue<MpiMessage> InMemoryMpiQueue;

class MpiWorld
{
@@ -73,36 +72,36 @@ class MpiWorld
const uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

int isend(int sendRank,
int recvRank,
const uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void broadcast(int rootRank,
int thisRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void recv(int sendRank,
int recvRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

int irecv(int sendRank,
int recvRank,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);

void awaitAsyncRequest(int requestId);

@@ -185,8 +184,6 @@ class MpiWorld

std::shared_ptr<InMemoryMpiQueue> getLocalQueue(int sendRank, int recvRank);

long getLocalQueueSize(int sendRank, int recvRank);

void overrideHost(const std::string& newHost);

double getWTime();
@@ -240,29 +237,36 @@ class MpiWorld
void sendRemoteMpiMessage(std::string dstHost,
int sendRank,
int recvRank,
const std::shared_ptr<MPIMessage>& msg);
const MpiMessage& msg);

std::shared_ptr<MPIMessage> recvRemoteMpiMessage(int sendRank,
int recvRank);
MpiMessage recvRemoteMpiMessage(int sendRank, int recvRank);

// Support for asynchronous communications
std::shared_ptr<MpiMessageBuffer> getUnackedMessageBuffer(int sendRank,
int recvRank);

std::shared_ptr<MPIMessage> recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);
MpiMessage recvBatchReturnLast(int sendRank,
int recvRank,
int batchSize = 0);

/* Helper methods */

void checkRanksRange(int sendRank, int recvRank);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::shared_ptr<MPIMessage>& m,
void doRecv(const MpiMessage& m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MpiMessageType messageType = MpiMessageType::NORMAL);

// Abstraction of the bulk of the recv work, shared among various functions
void doRecv(std::unique_ptr<MpiMessage> m,
uint8_t* buffer,
faabric_datatype_t* dataType,
int count,
MPI_Status* status,
MPIMessage::MPIMessageType messageType = MPIMessage::NORMAL);
MpiMessageType messageType = MpiMessageType::NORMAL);
};
}
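
To illustrate the refactored signatures, a test-style sketch of a two-rank exchange inside one world. The use of MPI_INT from faabric/mpi/mpi.h and of a null MPI_Status are assumptions based on the existing faabric MPI layer, not something shown in this diff.

#include <faabric/mpi/MpiWorld.h>

// Rank 0 sends one integer to rank 1 as a NORMAL message, using the plain
// MpiMessageType enum instead of the old protobuf MPIMessage::MPIMessageType
void pingOnce(faabric::mpi::MpiWorld& world)
{
    int sendValue = 42;
    int recvValue = 0;

    world.send(0,
               1,
               reinterpret_cast<const uint8_t*>(&sendValue),
               MPI_INT,
               1,
               faabric::mpi::MpiMessageType::NORMAL);

    // A null status is assumed to be accepted, as in the existing recv code
    world.recv(0,
               1,
               reinterpret_cast<uint8_t*>(&recvValue),
               MPI_INT,
               1,
               nullptr,
               faabric::mpi::MpiMessageType::NORMAL);
}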
21 changes: 6 additions & 15 deletions include/faabric/transport/PointToPointBroker.h
@@ -2,6 +2,7 @@

#include <faabric/batch-scheduler/SchedulingDecision.h>
#include <faabric/transport/PointToPointClient.h>
#include <faabric/transport/PointToPointMessage.h>
#include <faabric/util/config.h>
#include <faabric/util/locks.h>

@@ -120,27 +121,16 @@ class PointToPointBroker

void updateHostForIdx(int groupId, int groupIdx, std::string newHost);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
std::string hostHint,
bool mustOrderMsg = false);

void sendMessage(int groupId,
int sendIdx,
int recvIdx,
const uint8_t* buffer,
size_t bufferSize,
void sendMessage(const PointToPointMessage& msg,
bool mustOrderMsg = false,
int sequenceNum = NO_SEQUENCE_NUM,
std::string hostHint = "");

std::vector<uint8_t> recvMessage(int groupId,
int sendIdx,
int recvIdx,
bool mustOrderMsg = false);
void recvMessage(PointToPointMessage& msg, bool mustOrderMsg = false);

void clearGroup(int groupId);

@@ -163,7 +153,8 @@

std::shared_ptr<faabric::util::FlagWaiter> getGroupFlag(int groupId);

Message doRecvMessage(int groupId, int sendIdx, int recvIdx);
// Returns the message response code and the sequence number
std::pair<MessageResponseCode, int> doRecvMessage(PointToPointMessage& msg);

void initSequenceCounters(int groupId);

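
A hedged usage sketch of the struct-based broker API above. getPointToPointBroker() is assumed from the existing transport code and is not part of this diff; the manual free of the received payload follows the malloc/free note in PointToPointMessage.h.

#include <faabric/transport/PointToPointBroker.h>
#include <faabric/transport/PointToPointMessage.h>

#include <cstdlib>
#include <vector>

// The caller now fills in a PointToPointMessage rather than passing
// groupId/sendIdx/recvIdx and a raw buffer as separate arguments
void ptpRoundTrip(int appId, int groupId)
{
    auto& broker = faabric::transport::getPointToPointBroker();

    std::vector<uint8_t> payload = { 1, 2, 3 };

    faabric::transport::PointToPointMessage sendMsg = {};
    sendMsg.appId = appId;
    sendMsg.groupId = groupId;
    sendMsg.sendIdx = 0;
    sendMsg.recvIdx = 1;
    sendMsg.dataSize = payload.size();
    sendMsg.dataPtr = payload.data();
    broker.sendMessage(sendMsg);

    // The receiver sets the addressing fields; the broker fills dataSize and
    // dataPtr, which (per the struct's malloc/free comment) the caller is
    // assumed to free
    faabric::transport::PointToPointMessage recvMsg = {};
    recvMsg.appId = appId;
    recvMsg.groupId = groupId;
    recvMsg.sendIdx = 0;
    recvMsg.recvIdx = 1;
    broker.recvMessage(recvMsg);
    std::free(recvMsg.dataPtr);
}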
7 changes: 4 additions & 3 deletions include/faabric/transport/PointToPointClient.h
@@ -3,18 +3,19 @@
#include <faabric/proto/faabric.pb.h>
#include <faabric/transport/MessageEndpointClient.h>
#include <faabric/transport/PointToPointCall.h>
#include <faabric/transport/PointToPointMessage.h>

namespace faabric::transport {

std::vector<std::pair<std::string, faabric::PointToPointMappings>>
getSentMappings();

std::vector<std::pair<std::string, faabric::PointToPointMessage>>
std::vector<std::pair<std::string, PointToPointMessage>>
getSentPointToPointMessages();

std::vector<std::tuple<std::string,
faabric::transport::PointToPointCall,
faabric::PointToPointMessage>>
PointToPointMessage>>
getSentLockMessages();

void clearSentMessages();
@@ -26,7 +27,7 @@ class PointToPointClient : public faabric::transport::MessageEndpointClient

void sendMappings(faabric::PointToPointMappings& mappings);

void sendMessage(faabric::PointToPointMessage& msg,
void sendMessage(const PointToPointMessage& msg,
int sequenceNum = NO_SEQUENCE_NUM);

void groupLock(int appId,
45 changes: 45 additions & 0 deletions include/faabric/transport/PointToPointMessage.h
@@ -0,0 +1,45 @@
#pragma once

#include <cstdint>
#include <span>

namespace faabric::transport {

/* Simple fixed-size C-struct to capture the state of a PTP message moving
* through Faabric.
*
* We require the struct to be fixed-size, with no unique pointers, so that we
* can use high-throughput ring-buffers to send the messages around. This also
* means that we manually malloc/free the data pointer. The message size is:
* 4 * int32_t = 4 * 4 bytes = 16 bytes
* 1 * size_t = 1 * 8 bytes = 8 bytes
* 1 * void* = 1 * 8 bytes = 8 bytes
* total = 32 bytes = 4 * 8 so the struct is naturally 8 byte-aligned
*/
struct PointToPointMessage
{
int32_t appId;
int32_t groupId;
int32_t sendIdx;
int32_t recvIdx;
size_t dataSize;
void* dataPtr;
};
static_assert((sizeof(PointToPointMessage) % 8) == 0,
"PTP message mus be 8-aligned!");

// The wire format for a PTP message is very simple: the fixed-size struct,
// followed by dataSize bytes containing the payload.
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg);

// This parsing function mallocs space for the message payload. This is to
// keep the PTP message at fixed-size, and be able to efficiently move it
// around in-memory queues
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg);

// Alternative signature for parsing PTP messages for when the caller can
// provide an already-allocated buffer to write into
void parsePtpMsg(std::span<const uint8_t> bytes,
PointToPointMessage* msg,
std::span<uint8_t> preAllocBuffer);
}
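
A minimal sketch of two of the declared functions, assuming exactly the wire format documented above (fixed-size struct followed by dataSize payload bytes); the real implementations live in the accompanying .cpp file, which is not shown in this excerpt, and the pre-allocated-buffer parse overload is omitted here.

#include <faabric/transport/PointToPointMessage.h>

#include <cstdlib>
#include <cstring>

namespace faabric::transport {

// Sketch: the caller is assumed to have sized buffer to
// sizeof(PointToPointMessage) + msg.dataSize
void serializePtpMsg(std::span<uint8_t> buffer, const PointToPointMessage& msg)
{
    std::memcpy(buffer.data(), &msg, sizeof(PointToPointMessage));
    if (msg.dataSize > 0 && msg.dataPtr != nullptr) {
        std::memcpy(buffer.data() + sizeof(PointToPointMessage),
                    msg.dataPtr,
                    msg.dataSize);
    }
}

// Sketch of the malloc-ing variant: per the comment above, ownership of the
// allocated dataPtr passes to the caller
void parsePtpMsg(std::span<const uint8_t> bytes, PointToPointMessage* msg)
{
    std::memcpy(msg, bytes.data(), sizeof(PointToPointMessage));
    if (msg->dataSize > 0) {
        msg->dataPtr = std::malloc(msg->dataSize);
        std::memcpy(msg->dataPtr,
                    bytes.data() + sizeof(PointToPointMessage),
                    msg->dataSize);
    } else {
        msg->dataPtr = nullptr;
    }
}
}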