Skip to content

Commit

Permalink
Merge pull request #274 from lf-lang/real-time-sockets
Browse files Browse the repository at this point in the history
Use real-time sockets in federated execution
  • Loading branch information
erlingrj authored Sep 28, 2023
2 parents 64dbf26 + 664e4ca commit eb96e25
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 54 deletions.
1 change: 1 addition & 0 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ add_executable(

IF(CMAKE_BUILD_TYPE MATCHES DEBUG)
# Set the LOG_LEVEL to 4 to get DEBUG messages
message("-- Building RTI with DEBUG messages enabled")
target_compile_definitions(RTI PUBLIC LOG_LEVEL=4)
ENDIF(CMAKE_BUILD_TYPE MATCHES DEBUG)

Expand Down
38 changes: 2 additions & 36 deletions core/federated/RTI/rti_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int socket_descriptor = -1;
if (socket_type == TCP) {
socket_descriptor = socket(AF_INET, SOCK_STREAM, 0);
socket_descriptor = create_real_time_tcp_socket_errexit();
} else if (socket_type == UDP) {
socket_descriptor = socket(AF_INET, SOCK_DGRAM, 0);
socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Set the appropriate timeout time
timeout_time = (struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
Expand Down Expand Up @@ -1598,40 +1598,6 @@ void wait_for_federates(int socket_descriptor) {
// duplicated packets intended for this program.
close(socket_descriptor);

/* NOTE: Below is a song and dance that is apparently not needed.
The above shutdown and close appear to do the job.
// Apparently, closing the socket will not necessarily
// cause the respond_to_erroneous_connections accept() call to return,
// so instead, we connect here so that it can check the _f_rti->all_federates_exited
// variable.
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int tmp_socket = socket(AF_INET , SOCK_STREAM , 0);
// If creating the socket fails, assume the thread has already exited.
if (tmp_socket >= 0) {
struct hostent *server = gethostbyname("localhost");
if (server != NULL) {
// Server file descriptor.
struct sockaddr_in server_fd;
// Zero out the server_fd struct.
bzero((char *) &server_fd, sizeof(server_fd));
// Set up the server_fd fields.
server_fd.sin_family = AF_INET; // IPv4
bcopy((char *)server->h_addr,
(char *)&server_fd.sin_addr.s_addr,
server->h_length);
// Convert the port number from host byte order to network byte order.
server_fd.sin_port = htons(_f_rti->final_port_TCP);
connect(
tmp_socket,
(struct sockaddr *)&server_fd,
sizeof(server_fd));
close(tmp_socket);
}
}
*/

if (_f_rti->socket_descriptor_UDP > 0) {
if (shutdown(_f_rti->socket_descriptor_UDP, SHUT_RDWR)) {
LF_PRINT_LOG("On shut down UDP socket, received reply: %s", strerror(errno));
Expand Down
2 changes: 1 addition & 1 deletion core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ uint16_t setup_clock_synchronization_with_rti() {
uint16_t port_to_return = UINT16_MAX;
#ifdef _LF_CLOCK_SYNC_ON
// Initialize the UDP socket
_lf_rti_socket_UDP = socket(AF_INET, SOCK_DGRAM, 0);
_lf_rti_socket_UDP = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Initialize the necessary information for the UDP address
struct sockaddr_in federate_UDP_addr;
federate_UDP_addr.sin_family = AF_INET;
Expand Down
18 changes: 5 additions & 13 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <arpa/inet.h> // inet_ntop & inet_pton
#include <netdb.h> // Defines getaddrinfo(), freeaddrinfo() and struct addrinfo.
#include <netinet/in.h> // Defines struct sockaddr_in

#include <regex.h>
#include <strings.h> // Defines bzero().
#include <sys/socket.h>
Expand Down Expand Up @@ -157,10 +158,7 @@ void create_server(int specified_port) {
}
LF_PRINT_DEBUG("Creating a socket server on port %d.", port);
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int socket_descriptor = socket(AF_INET, SOCK_STREAM, 0);
if (socket_descriptor < 0) {
lf_print_error_and_exit("Failed to obtain a socket server.");
}
int socket_descriptor = create_real_time_tcp_socket_errexit();

// Server file descriptor.
struct sockaddr_in server_fd;
Expand Down Expand Up @@ -800,10 +798,7 @@ void connect_to_federate(uint16_t remote_federate_id) {
int socket_id = -1;
while (result < 0) {
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
socket_id = socket(AF_INET, SOCK_STREAM, 0);
if (socket_id < 0) {
lf_print_error_and_exit("Failed to create socket to federate %d.", remote_federate_id);
}
socket_id = create_real_time_tcp_socket_errexit();

// Server file descriptor.
struct sockaddr_in server_fd;
Expand Down Expand Up @@ -1038,11 +1033,8 @@ void connect_to_rti(const char* hostname, int port) {
lf_print_error_and_exit("No host for RTI matching given hostname: %s", hostname);
}

// Create a socket matching hints criteria
_fed.socket_TCP_RTI = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (_fed.socket_TCP_RTI < 0) {
lf_print_error_and_exit("Failed to create socket to RTI.");
}
// Create a socket
_fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit();

result = connect(_fed.socket_TCP_RTI, res->ai_addr, res->ai_addrlen);
if (result == 0) {
Expand Down
38 changes: 34 additions & 4 deletions core/federated/net_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <math.h> // For sqrtl() and powl
#include <stdarg.h> // Defines va_list
#include <math.h> // For sqrtl() and powl
#include <stdarg.h> // Defines va_list
#include <stdio.h>
#include <stdlib.h>
#include <string.h> // Defines memcpy()
#include <time.h> // Defines nanosleep()
#include <string.h> // Defines memcpy()
#include <time.h> // Defines nanosleep()
#include <netinet/in.h> // IPPROTO_TCP, IPPROTO_UDP
#include <netinet/tcp.h> // TCP_NODELAY

#include "net_util.h"
#include "util.h"
Expand All @@ -54,6 +56,34 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/** Number of nanoseconds to sleep before retrying a socket read. */
#define SOCKET_READ_RETRY_INTERVAL 1000000

int create_real_time_tcp_socket_errexit() {
int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0) {
lf_print_error_and_exit("Could not open TCP socket. Err=%d", sock);
}
// Disable Nagle's algorithm which bundles together small TCP messages to
// reduce network traffic
// TODO: Re-consider if we should do this, and whether disabling delayed ACKs
// is enough.
int flag = 1;
int result = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int));

if (result < 0) {
lf_print_error_and_exit("Failed to disable Nagle algorithm on socket server.");
}

// Disable delayed ACKs. Only possible on Linux
#if defined(PLATFORM_Linux)
result = setsockopt(sock, IPPROTO_TCP, TCP_QUICKACK, &flag, sizeof(int));

if (result < 0) {
lf_print_error_and_exit("Failed to disable Nagle algorithm on socket server.");
}
#endif

return sock;
}

ssize_t read_from_socket_errexit(
int socket,
size_t num_bytes,
Expand Down
10 changes: 10 additions & 0 deletions include/core/federated/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ int host_is_big_endian(void);

#ifdef FEDERATED


/**
* @brief Create an IPv4 TCP socket with Nagle's algorithm disabled
* (TCP_NODELAY) and Delayed ACKs disabled (TCP_QUICKACK). Exits application
* on any error.
*
* @return int
*/
int create_real_time_tcp_socket_errexit();

/**
* Read the specified number of bytes from the specified socket into the
* specified buffer. If a disconnect or an EOF occurs during this
Expand Down

0 comments on commit eb96e25

Please sign in to comment.