Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add write-messages-to-file option. #134

Open
wants to merge 1 commit into
base: sockperf_v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ const uint32_t TEST_FIRST_CONNECTION_FIRST_PACKET_TTL_THRESHOLD_MSEC = 50;
#define MAX_ACTIVE_FD_NUM \
1024 /* maximum number of active connection to the single TCP addr:port \
*/
#define DEFAULT_BUF_ALIGNMENT 512
#define DEFAULT_FILE_FLAGS_OPT 0
#ifdef USING_VMA_EXTRA_API
#define MAX_VMA_COMPS 1024 /* maximum size for the VMA completions array for VMA Poll */
#endif
Expand Down Expand Up @@ -221,7 +223,10 @@ enum {
OPT_DUMMY_SEND, // 41
OPT_RATE_LIMIT, // 42
OPT_UC_REUSEADDR, // 43
OPT_FULL_RTT // 44
OPT_FULL_RTT, // 44
OPT_WRITE_MSG_FILE_PATH, // 45
OPT_WRITE_MSG_FILE_FLAGS, // 46
OPT_WRITE_MSG_BUF_ALIGN // 47
};

static const char *const round_trip_str[] = { "latency", "rtt" };
Expand Down Expand Up @@ -679,6 +684,10 @@ struct user_params_t {
uint32_t dummy_mps; // client side only
TicksDuration dummySendCycleDuration; // client side only
uint32_t rate_limit;
bool b_write_msg_to_file;
char write_msg_filepath[MAX_PATH_LENGTH];
uint32_t write_msg_file_flags_opt;
uint32_t write_msg_buf_alignment;
};

struct mutable_params_t {};
Expand Down
91 changes: 90 additions & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ void ServerBase::cleanupAfterLoop() {
//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCalcGaps>
Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::Server(int _fd_min, int _fd_max, int _fd_num)
: ServerBase(m_ioHandler), m_ioHandler(_fd_min, _fd_max, _fd_num) {}
: ServerBase(m_ioHandler), m_ioHandler(_fd_min, _fd_max, _fd_num),
b_write_msg_file_init(false),
write_msg_file(NULL),
write_msg_fd(0), open_fd_default_flags(O_CREAT | O_RDWR | O_TRUNC),
write_msg_buf(NULL), write_msg_buf_aligned(NULL) {}

//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCalcGaps>
Expand Down Expand Up @@ -318,6 +322,11 @@ int Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_accept(int ifd) {
inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), active_ifd);
}
do_accept = true;

if (s_user_params.b_write_msg_to_file && !b_write_msg_file_init) {
setup_write_msg_to_file();
}

break;
}
}
Expand Down Expand Up @@ -351,11 +360,91 @@ int Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_accept(int ifd) {
}
}
}
} else if (g_fds_array[ifd]->sock_type == SOCK_DGRAM) {
if (s_user_params.b_write_msg_to_file && !b_write_msg_file_init) {
setup_write_msg_to_file();
}
}

return active_ifd;
}

//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCalcGaps>
void Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::setup_write_msg_to_file() {

/* 0 - write to file through a file stream
* 1 - open_fd_default_flags | O_DSYNC
* 2 - open_fd_default_flags | O_DIRECT | O_DSYNC */
switch (s_user_params.write_msg_file_flags_opt)
{
case 0:
write_msg_file = fopen(s_user_params.write_msg_filepath, "w+");
if (write_msg_file == NULL) {
log_err("Failed to open file stream for file: %s.", s_user_params.write_msg_filepath);
exit_with_log("Failed to enable writing messages to file. Exiting sockperf.", SOCKPERF_ERR_FATAL);
}
break;
case 1:
open_fd_align_buffer(O_DSYNC);
break;
case 2:
open_fd_align_buffer(O_DSYNC | O_DIRECT);
break;
default:
exit_with_log("Unrecognized file sync option provided. Exiting sockperf.", SOCKPERF_ERR_BAD_ARGUMENT);
break;
}

b_write_msg_file_init = true;
}

//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCalcGaps>
void Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::cleanup_write_msg_to_file() {

if (write_msg_file != NULL) {
if (fclose(write_msg_file) != 0) {
log_err("Failed to close file stream for file: %s.", s_user_params.write_msg_filepath);
}
write_msg_file = NULL;
} else {
if (write_msg_fd > 0) {
if (close(write_msg_fd) != 0) {
log_err("Failed to close file descriptor for file: %s.", s_user_params.write_msg_filepath);
}
}
free(write_msg_buf);
}

b_write_msg_file_init = false;
}

/* To write to a file opened with O_DIRECT userspace buffer needs to be aligned
* by a multiple of 512 bytes and the buffer length needs to be a multiple of 512 bytes. */
#define DIRECT_IO_BUF_ALIGN(PTR, ALIGNVAL) (((uintptr_t) (PTR) + ((ALIGNVAL) - 1)) & ~((uintptr_t) ((ALIGNVAL) - 1)))

//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCalcGaps>
void Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::open_fd_align_buffer(int flags) {

write_msg_fd = open(s_user_params.write_msg_filepath, open_fd_default_flags | flags, S_IRUSR | S_IWUSR);

if (write_msg_fd > 0) {
/* At server startup actual message sizes used by sockperf client are unknown,
* MAX_PAYLOAD_SIZE is used to ensure client is able to send messages of any supported size. */
write_msg_buf = (uint8_t*)MALLOC(MAX_PAYLOAD_SIZE + s_user_params.write_msg_buf_alignment);
if (write_msg_buf == NULL) {
exit_with_log("Failed to allocate memory for write_msg_buf.", SOCKPERF_ERR_NO_MEMORY);
}
write_msg_buf_aligned = (uint8_t*)DIRECT_IO_BUF_ALIGN(write_msg_buf, s_user_params.write_msg_buf_alignment);
} else {
log_err("Failed to open file: %s.", s_user_params.write_msg_filepath);
exit_with_log("Failed to enable writing messages to file. Exiting sockperf.", SOCKPERF_ERR_FATAL);
}
}
#undef DIRECT_IO_BUF_ALIGN

//------------------------------------------------------------------------------
template <class IoType, class SwitchActivityInfo, class SwitchCheckGaps>
void server_handler(int _fd_min, int _fd_max, int _fd_num) {
Expand Down
45 changes: 45 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ class Server : public ServerBase {
int server_accept(int ifd);

private:
bool b_write_msg_file_init;
FILE *write_msg_file;
int write_msg_fd;
int open_fd_default_flags;
uint8_t *write_msg_buf;
uint8_t *write_msg_buf_aligned;
void setup_write_msg_to_file();
void cleanup_write_msg_to_file();
void open_fd_align_buffer(int flags);

SwitchActivityInfo m_switchActivityInfo;
SwitchCalcGaps m_switchCalcGaps;
};
Expand Down Expand Up @@ -186,6 +196,7 @@ inline bool Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_receive_t
if (ret == RET_SOCKET_SHUTDOWN) {
if (l_fds_ifd->sock_type == SOCK_STREAM) {
close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd);
cleanup_write_msg_to_file();
}
return (do_update);
}
Expand Down Expand Up @@ -223,6 +234,7 @@ inline bool Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_receive_t
print_log("Message received was larger than expected, message ignored.", l_fds_ifd);

close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd);
cleanup_write_msg_to_file();
return (do_update);
}

Expand Down Expand Up @@ -255,6 +267,38 @@ inline bool Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_receive_t
hexdump(m_pMsgReply->getBuf(), MsgHeader::EFFECTIVE_SIZE);
#endif /* LOG_TRACE_MSG_IN */

/* if requested persist each message to file */
if (s_user_params.b_write_msg_to_file) {

uint8_t *msg = m_pMsgReply->getBuf();
int msg_len = m_pMsgReply->getLength();

if (write_msg_file) {
assert(s_user_params.write_msg_file_flags_opt == DEFAULT_FILE_FLAGS_OPT);

size_t wb = fwrite(msg, sizeof(uint8_t), msg_len, write_msg_file);

if (wb != (size_t)msg_len) {
log_err("Failed to write complete message to file stream - wrote %zu of %d bytes.", wb, msg_len);
exit_with_log("Failed to write message to file stream.", SOCKPERF_ERR_FATAL);
}

if (fflush(write_msg_file) != 0) {
exit_with_log("Failed to flush file stream.", SOCKPERF_ERR_FATAL);
}
} else {
assert(s_user_params.write_msg_file_flags_opt != DEFAULT_FILE_FLAGS_OPT);

memcpy(write_msg_buf_aligned, msg, msg_len);
ssize_t wb = write(write_msg_fd, write_msg_buf_aligned, msg_len);

if (wb != msg_len) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, cast msg_len to size_t as done in if (write_msg_file){ ... } above.

log_err("Failed to write complete message to file descriptor - wrote %zu of %d bytes.", wb, msg_len);
exit_with_log("Failed to write message to file descriptor.", SOCKPERF_ERR_FATAL);
}
}
}

if (g_b_exit) return (!do_update);
if (!m_pMsgReply->isClient()) {
/* 6: shift to start of cycle buffer in case receiving buffer is empty and
Expand Down Expand Up @@ -322,6 +366,7 @@ inline bool Server<IoType, SwitchActivityInfo, SwitchCalcGaps>::server_receive_t
if (unlikely(ret == RET_SOCKET_SHUTDOWN)) {
if (l_fds_ifd->sock_type == SOCK_STREAM) {
close_ifd(l_fds_ifd->next_fd, ifd, l_fds_ifd);
cleanup_write_msg_to_file();
}
return (do_update);
}
Expand Down
72 changes: 72 additions & 0 deletions src/sockperf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,23 @@ static int proc_mode_server(int id, int argc, const char **argv) {
"Set maximum message size that the server can receive <size> bytes (default 65507)." },
{ 'g', AOPT_NOARG, aopt_set_literal('g'),
aopt_set_string("gap-detection"), "Enable gap-detection." },
{ OPT_WRITE_MSG_FILE_PATH,
AOPT_ARG,
aopt_set_literal('o'),
aopt_set_string("output-file"),
"Specify path to a file for sockperf server to write received messages to." },
{ OPT_WRITE_MSG_FILE_FLAGS,
AOPT_ARG,
aopt_set_literal(0),
aopt_set_string("write-flags"),
"Flags to use to open the file, where received messages will be written to (default 0). "
" Accepted values: 0: No flags, use file stream; 1: D_SYNC; 2: (O_DIRECT | O_DSYNC) " },
{ OPT_WRITE_MSG_BUF_ALIGN,
AOPT_ARG,
aopt_set_literal(0),
aopt_set_string("write-align"),
"Buffer alignment, required for O_DIRECT (default 512). "
"Specified value needs to be a power of two, greater or equal to 512." },
{ 0, AOPT_NOARG, aopt_set_literal(0), aopt_set_string(NULL), NULL }
};

Expand Down Expand Up @@ -1466,6 +1483,57 @@ static int proc_mode_server(int id, int argc, const char **argv) {
if (!rc && aopt_check(server_obj, 'g')) {
s_user_params.b_server_detect_gaps = true;
}

if (!rc && aopt_check(server_obj, OPT_WRITE_MSG_FILE_PATH)) {
/* Writing messages to a file is currently only supported for a single-threaded sockperf server */
if (s_user_params.mthread_server || aopt_check(server_obj, OPT_THREADS_NUM)) {
log_msg("Writing messages to file (option '-o <filepath>') is not supported for multi-threaded server");
rc = SOCKPERF_ERR_UNSUPPORTED;
} else {
const char *optarg = aopt_value(server_obj, OPT_WRITE_MSG_FILE_PATH);
if (optarg) {
strncpy(s_user_params.write_msg_filepath, optarg, MAX_ARGV_SIZE);
s_user_params.write_msg_filepath[MAX_PATH_LENGTH - 1] = '\0';

s_user_params.b_write_msg_to_file = true;
} else {
log_msg("'-%c' Valid path to output file is required, when writing messages to file", 'o');
rc = SOCKPERF_ERR_BAD_ARGUMENT;
}
}
}

if (!rc && aopt_check(server_obj, OPT_WRITE_MSG_FILE_FLAGS)) {
const char *optarg = aopt_value(server_obj, OPT_WRITE_MSG_FILE_FLAGS);
long int value = strtol(optarg, NULL, 0);

/* 0 - write to file through a file stream
* 1 - open_fd_default_flags | O_DSYNC
* 2 - open_fd_default_flags | O_DIRECT | O_DSYNC */
if (value < 0 || value > 2) {
log_err("Invalid sync flag value: %s . Setting the value to default: %d", optarg, DEFAULT_FILE_FLAGS_OPT);
s_user_params.write_msg_file_flags_opt = DEFAULT_FILE_FLAGS_OPT;
} else {
s_user_params.write_msg_file_flags_opt = value;

/* When not using a file stream to write messages to file, check that Direct IO (O_DIRECT)
* requirements for userspace buffer alignment are met. */
if (value != DEFAULT_FILE_FLAGS_OPT) {
if (aopt_check(server_obj, OPT_WRITE_MSG_BUF_ALIGN)) {
const char *optarg = aopt_value(server_obj, OPT_WRITE_MSG_BUF_ALIGN);
long int alignment = strtol(optarg, NULL, 0);

if (alignment >= DEFAULT_BUF_ALIGNMENT && ((alignment & (alignment - 1)) == 0)) {
Copy link
Contributor

@ChrisCoe ChrisCoe Feb 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. Maybe add a small comment here that we are checking for powers of 2? Fine either way.

s_user_params.write_msg_buf_alignment = alignment;
} else {
log_msg("Invalid buffer alignment value: %s . Setting the value to default: %d bytes",
optarg, DEFAULT_BUF_ALIGNMENT);
/* The default alignment has already been set at args init, no need to assign again. */
}
}
}
}
}
}

if (rc) {
Expand Down Expand Up @@ -2221,6 +2289,10 @@ void set_defaults() {
s_user_params.dummy_mps = 0;
memset(s_user_params.feedfile_name, 0, sizeof(s_user_params.feedfile_name));
s_user_params.tos = 0x00;
s_user_params.b_write_msg_to_file = false;
memset(s_user_params.write_msg_filepath, 0, sizeof(s_user_params.write_msg_filepath));
s_user_params.write_msg_file_flags_opt = 0;
s_user_params.write_msg_buf_alignment = DEFAULT_BUF_ALIGNMENT;
}

//------------------------------------------------------------------------------
Expand Down