Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20589] Set real TCP non_blocking_send limitation (backport #4502) #4631

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ bool TCPChannelResource::check_socket_send_buffer(


size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
if (future_queue_size > size_t(parent_->configuration()->sendBufferSize))
// TCP actually allocates twice the size of the buffer requested.
if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize))
{
return false;
}
Expand Down
94 changes: 65 additions & 29 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1222,17 +1222,21 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server)
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, secure_non_blocking_send)
{
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info);

uint16_t port = g_default_port;
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
// Create a TCP Server transport
using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
TCPv4TransportDescriptor senderDescriptor;
senderDescriptor.add_listener_port(port);
senderDescriptor.apply_security = true;
senderDescriptor.sendBufferSize = msg_size;
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
senderDescriptor.tls_config.verify_file = "ca.crt";
senderDescriptor.tls_config.password = "fastddspwd";
senderDescriptor.tls_config.cert_chain_file = "fastdds.crt";
senderDescriptor.tls_config.private_key_file = "fastdds.key";
senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem";
senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE);
Expand All @@ -1249,8 +1253,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
Expand All @@ -1263,13 +1267,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
{
return preverified;
});
ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose)
{
return "fastddspwd";
});
ssl_context.use_certificate_chain_file("fastdds.crt");
ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem);
ssl_context.use_tmp_dh_file("dh_params.pem");
ssl_context.load_verify_file("ca.crt");

uint32_t options = 0;
options |= asio::ssl::context::default_workarounds;
Expand All @@ -1278,8 +1276,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
options |= asio::ssl::context::no_compression;
ssl_context.set_options(options);

// TCPChannelResourceSecure::connect() like connection
asio::io_service io_service;
auto ioServiceFunction = [&]()
{
#if ASIO_VERSION >= 101200
asio::executor_work_guard<asio::io_service::executor_type> work(io_service.get_executor());
#else
io_service::work work(io_service_);
#endif // if ASIO_VERSION >= 101200
io_service.run();
};
std::thread ioServiceThread(ioServiceFunction);

// TCPChannelResourceSecure::connect() like connection
asio::ip::tcp::resolver resolver(io_service);
auto endpoints = resolver.resolve(
IPLocator::ip_to_string(serverLoc),
Expand All @@ -1300,7 +1309,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
)
{
ASSERT_TRUE(!ec);
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server;
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client;
secure_socket->async_handshake(role,
[](const std::error_code& ec)
{
Expand All @@ -1316,24 +1325,40 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
a connection. This channel will not be present in the server's channel_resources_ map
as communication lacks most of the discovery messages using a raw socket as participant.
*/
// auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
auto sender_channel_resource =
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = 0;
bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec);
ASSERT_EQ(ec, asio::error_code());
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

secure_socket->lowest_layer().close(ec);
io_service.stop();
ioServiceThread.join();
}
#endif // ifndef _WIN32

Expand Down Expand Up @@ -1769,7 +1794,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv4Tests, non_blocking_send)
{
Expand All @@ -1790,8 +1815,8 @@ TEST_F(TCPv4Tests, non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
Expand Down Expand Up @@ -1836,15 +1861,26 @@ TEST_F(TCPv4Tests, non_blocking_send)

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
Expand Down
29 changes: 20 additions & 9 deletions test/unittest/transport/TCPv6Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness)
}

#ifndef _WIN32
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
// destination that does not read or does it so slowly.
TEST_F(TCPv6Tests, non_blocking_send)
{
Expand All @@ -264,8 +264,8 @@ TEST_F(TCPv6Tests, non_blocking_send)
// saturate the reception socket of the datareader. This saturation requires
// preventing the datareader from reading from the socket, what inevitably
// happens continuously if instantiating and connecting the receiver transport.
// Hence, a raw socket is opened and connected to the server. There won't be read
// calls on that socket.
// Hence, a raw socket is opened and connected to the server. Read calls on that
// socket are controlled.
Locator_t serverLoc;
serverLoc.kind = LOCATOR_KIND_TCPv6;
IPLocator::setIPv6(serverLoc, "::1");
Expand Down Expand Up @@ -310,15 +310,26 @@ TEST_F(TCPv6Tests, non_blocking_send)

// Prepare the message
asio::error_code ec;
std::vector<octet> message(msg_size, 0);
std::vector<octet> message(msg_size * 2, 0);
const octet* data = message.data();
size_t size = message.size();

// Send the message with no header
for (int i = 0; i < 5; i++)
{
sender_channel_resource->send(nullptr, 0, data, size, ec);
}
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
// it should be able to send a message of msg_size*2.
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, size);

// Now wait until the receive buffer is flushed (send buffer will be empty too)
std::vector<octet> buffer(size, 0);
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
ASSERT_EQ(bytes_read, size);

// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
message.resize(msg_size * 2 + 1);
data = message.data();
size = message.size();
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
ASSERT_EQ(bytes_sent, 0u);

socket.shutdown(asio::ip::tcp::socket::shutdown_both);
socket.cancel();
Expand Down
Loading