Skip to content

Commit

Permalink
transport(tcp): set send/recv buffer sizes (#426)
Browse files Browse the repository at this point in the history
In this PR we add support to set the send and receive buffer sizes.
OpenMPI does not set them, but we were failing when sending some large
messages. In addition, and similarly to OpenMPI we need to appropriately
set the kernel parameters for TCP.
  • Loading branch information
csegarragonz authored Apr 17, 2024
1 parent d1c0f96 commit da5fee2
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 0 deletions.
3 changes: 3 additions & 0 deletions include/faabric/transport/tcp/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
namespace faabric::transport::tcp {

const int SocketTimeoutMs = 5000;
// We get this value from OpenMPI's recommended TCP settings (FAQ 9):
// https://www.open-mpi.org/faq/?category=tcp
const size_t SocketBufferSizeBytes = 16777216;

class Socket
{
Expand Down
9 changes: 9 additions & 0 deletions include/faabric/transport/tcp/SocketOptions.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <cstddef>

namespace faabric::transport::tcp {
void setReuseAddr(int connFd);
void setNoDelay(int connFd);
Expand All @@ -16,4 +18,11 @@ void setBusyPolling(int connFd);
// Set timeout for blocking sockets
void setRecvTimeoutMs(int connFd, int timeoutMs);
void setSendTimeoutMs(int connFd, int timeoutMs);

// Set send/recv buffer sizes (important to guarantee MPI progress). Note that
// this options can never exceed the values set in net.core.{r,w}mem_max. To
// this extent, this functions must be used in conjunction with the adequate
// TCP configuration
void setRecvBufferSize(int connFd, size_t bufferSizeBytes);
void setSendBufferSize(int connFd, size_t bufferSizeBytes);
}
5 changes: 5 additions & 0 deletions src/transport/tcp/RecvSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ void RecvSocket::listen()
throw std::runtime_error("Socket error binding to fd");
}

// Set the receive buffer size on the listening socket so that ACCEPT-ed
// sockets inherit it, and the right buffer size is used to negotiate the
// TCP window
setRecvBufferSize(connFd, SocketBufferSizeBytes);

ret = ::listen(connFd, 1024);
if (ret) {
SPDLOG_ERROR("Error listening to {}:{} (fd: {}): {} (ret: {})",
Expand Down
4 changes: 4 additions & 0 deletions src/transport/tcp/SendSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ SendSocket::SendSocket(const std::string& host, int port)

void SendSocket::setSocketOptions(int connFd)
{
setSendBufferSize(connFd, SocketBufferSizeBytes);
setNoDelay(connFd);
setQuickAck(connFd);

#ifdef FAABRIC_USE_SPINLOCK
setNonBlocking(connFd);
#else
Expand Down Expand Up @@ -111,5 +113,7 @@ void SendSocket::sendOne(const uint8_t* buffer, size_t bufferSize)
buffer += nSent;
totalNumSent += nSent;
}

assert(totalNumSent == bufferSize);
}
}
28 changes: 28 additions & 0 deletions src/transport/tcp/SocketOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,32 @@ void setSendTimeoutMs(int connFd, int timeoutMs)
throw std::runtime_error("Error setting send timeout");
}
}

void setRecvBufferSize(int connFd, size_t bufferSizeBytes)
{
int ret = ::setsockopt(
connFd, SOL_SOCKET, SO_RCVBUF, &bufferSizeBytes, sizeof(bufferSizeBytes));
if (ret == -1) {
SPDLOG_ERROR(
"Error setting recv buffer size for socket {}: {} (no: {})",
connFd,
std::strerror(errno),
errno);
throw std::runtime_error("Error setting recv buffer size");
}
}

void setSendBufferSize(int connFd, size_t bufferSizeBytes)
{
int ret = ::setsockopt(
connFd, SOL_SOCKET, SO_SNDBUF, &bufferSizeBytes, sizeof(bufferSizeBytes));
if (ret == -1) {
SPDLOG_ERROR(
"Error setting send buffer size for socket {}: {} (no: {})",
connFd,
std::strerror(errno),
errno);
throw std::runtime_error("Error setting send buffer size");
}
}
}
6 changes: 6 additions & 0 deletions tests/test/transport/test_tcp_sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ TEST_CASE("Test setting socket options", "[transport]")
setBlocking(conn);
setRecvTimeoutMs(conn, SocketTimeoutMs);
setSendTimeoutMs(conn, SocketTimeoutMs);
setRecvBufferSize(conn, SocketBufferSizeBytes);
setSendBufferSize(conn, SocketBufferSizeBytes);

REQUIRE(!isNonBlocking(conn));

Expand All @@ -91,6 +93,8 @@ TEST_CASE("Test setting socket options", "[transport]")
REQUIRE_THROWS(setBlocking(conn));
REQUIRE_THROWS(setRecvTimeoutMs(conn, SocketTimeoutMs));
REQUIRE_THROWS(setSendTimeoutMs(conn, SocketTimeoutMs));
REQUIRE_THROWS(setRecvBufferSize(conn, SocketBufferSizeBytes));
REQUIRE_THROWS(setSendBufferSize(conn, SocketBufferSizeBytes));
}

TEST_CASE("Test send/recv one message using raw TCP sockets", "[transport]")
Expand Down Expand Up @@ -142,6 +146,8 @@ TEST_CASE("Test send/recv one message using raw TCP sockets", "[transport]")
REQUIRE_THROWS(
dst.recvOne(conn, BYTES(actual.data()), sizeof(int) * actual.size()));
} else {
setRecvBufferSize(conn, SocketBufferSizeBytes);

dst.recvOne(conn, BYTES(actual.data()), sizeof(int) * actual.size());

REQUIRE(actual == msg);
Expand Down

0 comments on commit da5fee2

Please sign in to comment.