Skip to content

Commit

Permalink
issue: 1050049 Move poll_os logic to the internal thread
Browse files Browse the repository at this point in the history
By default VMA checks the OS fd once per 100 CQ polls (VMA_RX_UDP_POLL_OS_RATIO)
in order to receive UDP packets with size > MTU.
This commit moves this logic to the internal thread, which uses
epoll_wait() to indicate that non-offloaded data is available.
This commit removes VMA_RX_UDP_POLL_OS_RATIO parameter.

Signed-off-by: Liran Oz <[email protected]>
  • Loading branch information
Liran Oz committed Aug 21, 2018
1 parent 1b98638 commit 310a745
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 113 deletions.
18 changes: 3 additions & 15 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ Example:
VMA DETAILS: Rx Byte Min Limit 65536 [VMA_RX_BYTES_MIN]
VMA DETAILS: Rx Poll Loops 100000 [VMA_RX_POLL]
VMA DETAILS: Rx Poll Init Loops 0 [VMA_RX_POLL_INIT]
VMA DETAILS: Rx UDP Poll OS Ratio 100 [VMA_RX_UDP_POLL_OS_RATIO]
VMA DETAILS: HW TS Conversion 3 [VMA_HW_TS_CONVERSION]
VMA DETAILS: Rx Poll Yield Disabled [VMA_RX_POLL_YIELD]
VMA DETAILS: Rx Prefetch Bytes 256 [VMA_RX_PREFETCH_BYTES]
Expand Down Expand Up @@ -500,17 +499,6 @@ Once the first ADD_MEMBERSHIP is called the above VMA_RX_POLL takes effect.
Value range is similar to the above VMA_RX_POLL
Default value is 0

VMA_RX_UDP_POLL_OS_RATIO
The above parameter will define the ratio between VMA CQ poll and OS FD poll.
This will result in a single poll of the not-offloaded sockets every
VMA_RX_UDP_POLL_OS_RATIO offloaded socket (CQ) polls. No matter if the CQ poll
was a hit or miss. No matter if the socket is blocking or non-blocking.
When disabled, only offloaded sockets are polled.
This parameter replaces the two old parameters: VMA_RX_POLL_OS_RATIO and
VMA_RX_SKIP_OS
Disable with 0
Default value is 100

VMA_HW_TS_CONVERSION
The above parameter defines the time stamp conversion method.
Experimental verbs is required for converting the time stamp from hardware time (Hz)
Expand Down Expand Up @@ -635,9 +623,9 @@ Disable with 0
Default value is 10

VMA_SELECT_SKIP_OS
Similar to VMA_RX_SKIP_OS, but in select() or poll() this will force the VMA
to check the non offloaded fd even though an offloaded socket has ready
packets found while polling.
This will force the VMA to check the non offloaded fd even though an offloaded
socket has ready packets found while polling.
This will affect select() or poll().
Default value is 4

VMA_PROGRESS_ENGINE_INTERVAL
Expand Down
5 changes: 0 additions & 5 deletions src/vma/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,6 @@ void print_vma_global_settings()
VLOG_PARAM_NUMBER("Rx Byte Min Limit", safe_mce_sys().rx_ready_byte_min_limit, MCE_DEFAULT_RX_BYTE_MIN_LIMIT, SYS_VAR_RX_BYTE_MIN_LIMIT);
VLOG_PARAM_NUMBER("Rx Poll Loops", safe_mce_sys().rx_poll_num, MCE_DEFAULT_RX_NUM_POLLS, SYS_VAR_RX_NUM_POLLS);
VLOG_PARAM_NUMBER("Rx Poll Init Loops", safe_mce_sys().rx_poll_num_init, MCE_DEFAULT_RX_NUM_POLLS_INIT, SYS_VAR_RX_NUM_POLLS_INIT);
if (safe_mce_sys().rx_udp_poll_os_ratio) {
VLOG_PARAM_NUMBER("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO);
} else {
VLOG_PARAM_STRING("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO, "Disabled");
}

VLOG_PARAM_NUMBER("HW TS Conversion", safe_mce_sys().hw_ts_conversion_mode, MCE_DEFAULT_HW_TS_CONVERSION_MODE, SYS_VAR_HW_TS_CONVERSION_MODE);

Expand Down
6 changes: 6 additions & 0 deletions src/vma/sock/fd_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ inline cls* fd_collection::get(int fd, cls **map_type)

inline bool fd_collection::set_immediate_os_sample(int fd)
{
socket_fd_api* sock;
epfd_info* epfd_fd;
ring_tap* p_ring;

Expand All @@ -250,6 +251,11 @@ inline bool fd_collection::set_immediate_os_sample(int fd)
return true;
}

if ((sock = get_sockfd(fd))) {
sock->set_immediate_os_sample();
return true;
}

if ((epfd_fd = get_epfd(fd))){
epfd_fd->set_os_data_available();
return true;
Expand Down
122 changes: 62 additions & 60 deletions src/vma/sock/sockinfo_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,20 @@ inline int sockinfo_udp::poll_os()
int ret;
uint64_t pending_data = 0;

m_rx_udp_poll_os_ratio_counter = 0;
ret = orig_os_api.ioctl(m_fd, FIONREAD, &pending_data);
if (unlikely(ret == -1)) {
if (pending_data > 0) {
return pending_data;
}

// No data is available
unset_immediate_os_sample();

if (ret < 0) {
m_p_socket_stats->counters.n_rx_os_errors++;
si_udp_logdbg("orig_os_api.ioctl returned with error in polling loop (errno=%d %m)", errno);
return -1;
}
if (pending_data > 0) {
m_p_socket_stats->counters.n_rx_poll_os_hit++;
return 1;
}
return 0;

return ret;
}

inline int sockinfo_udp::rx_wait(bool blocking)
Expand All @@ -152,31 +154,26 @@ inline int sockinfo_udp::rx_wait(bool blocking)
epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];
uint64_t poll_sn;

m_loops_timer.start();
m_loops_timer.start();

while (loops_to_go) {

// Multi-thread polling support - let other threads have a go on this CPU
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops++ % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
sched_yield();
}

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
ret = poll_os();
if ((ret == -1) || (ret == 1)) {
return ret;
}
if (m_b_os_data_available) {
// OS data might be available
return 1;
}

// Poll cq for offloaded ready packets ...
m_rx_udp_poll_os_ratio_counter++;
if (is_readable(&poll_sn)) {
m_p_socket_stats->counters.n_rx_poll_hit++;
return 0;
}

loops++;
if (!blocking || m_n_sysvar_rx_poll_num != -1) {
loops_to_go--;
}
Expand Down Expand Up @@ -294,7 +291,8 @@ inline int sockinfo_udp::rx_wait(bool blocking)

// Check if OS fd is ready for reading
if (fd == m_fd) {
m_rx_udp_poll_os_ratio_counter = 0;
// OS data might be available
set_immediate_os_sample();
return 1;
}

Expand Down Expand Up @@ -385,7 +383,6 @@ sockinfo_udp::sockinfo_udp(int fd):
,m_b_mc_tx_loop(safe_mce_sys().tx_mc_loopback_default) // default value is 'true'. User can change this with config parameter SYS_VAR_TX_MC_LOOPBACK
,m_n_mc_ttl(DEFAULT_MC_TTL)
,m_loops_to_go(safe_mce_sys().rx_poll_num_init) // Start up with a init polling loops value
,m_rx_udp_poll_os_ratio_counter(0)
,m_sock_offload(true)
,m_mc_num_grp_with_src_filter(0)
,m_port_map_lock("sockinfo_udp::m_ports_map_lock")
Expand All @@ -397,7 +394,6 @@ sockinfo_udp::sockinfo_udp(int fd):
,m_n_tsing_flags(0)
,m_tos(0)
,m_n_sysvar_rx_poll_yield_loops(safe_mce_sys().rx_poll_yield_loops)
,m_n_sysvar_rx_udp_poll_os_ratio(safe_mce_sys().rx_udp_poll_os_ratio)
,m_n_sysvar_rx_ready_byte_min_limit(safe_mce_sys().rx_ready_byte_min_limit)
,m_n_sysvar_rx_cq_drain_rate_nsec(safe_mce_sys().rx_cq_drain_rate_nsec)
,m_n_sysvar_rx_delta_tsc_between_cq_polls(safe_mce_sys().rx_delta_tsc_between_cq_polls)
Expand All @@ -406,6 +402,7 @@ sockinfo_udp::sockinfo_udp(int fd):
,m_sockopt_mapped(false)
,m_is_connected(false)
,m_multicast(false)
,m_b_os_data_available(false)
{
si_udp_logfunc("");

Expand All @@ -427,17 +424,17 @@ sockinfo_udp::sockinfo_udp(int fd):
rx_ready_byte_count_limit_update(n_so_rcvbuf_bytes);

epoll_event ev = {0, {0}};

ev.events = EPOLLIN;

// Add the user's orig fd to the rx epfd handle
ev.data.fd = m_fd;
ev.data.fd = m_fd; // Add the user's orig fd to the rx epfd handle

BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely(orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev)))
si_udp_logpanic("failed to add user's fd to internal epfd errno=%d (%m)", errno);
BULLSEYE_EXCLUDE_BLOCK_END

// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

si_udp_logfunc("done");
}

Expand Down Expand Up @@ -1426,30 +1423,25 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,

return_reuse_buffers_postponed();

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
// Check for nonoffloaded data if m_b_os_data_available is true
if (m_b_os_data_available) {
ret = poll_os();
if (ret == -1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
goto out;
}
if (ret == 1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
if (ret > 0) {
goto os;
}
if (unlikely(ret < 0)) {
goto out;
}
}

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// First check if we have a packet in the ready list
if ((m_n_rx_pkt_ready_list_count > 0 && m_n_sysvar_rx_cq_drain_rate_nsec == MCE_RX_CQ_DRAIN_RATE_DISABLED)
|| is_readable(&poll_sn)) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
m_rx_udp_poll_os_ratio_counter++;
if (m_n_rx_pkt_ready_list_count > 0) {
// Found a ready packet in the list
if (__msg) handle_cmsg(__msg);
Expand Down Expand Up @@ -1479,20 +1471,28 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
m_lock_rcv.unlock();
goto wait;
}
}
else if (unlikely(rx_wait_ret < 0)) {
} else if (unlikely(rx_wait_ret < 0)) {
// Got < 0, means an error occurred
ret = rx_wait_ret;
goto out;
} // else - packet in OS

// Else, check for nonoffloaded data - rx_wait() returned 1.
ret = poll_os();
if (ret == 0) {
m_lock_rcv.unlock();
goto wait;
}
if (unlikely(ret < 0)) {
goto out;
}

/*
* If we got here, either the socket is not offloaded or rx_wait() returned 1.
*/
os:
if (in_flags & MSG_VMA_ZCOPY_FORCE) {
// Enable the next non-blocked read to check the OS
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
errno = EIO;
ret = -1;
goto out;
Expand All @@ -1506,10 +1506,9 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, in_flags, __from, __fromlen, __msg);
*p_flags = in_flags;
save_stats_rx_os(ret);
if (ret > 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
if (ret <= 0) {
// Do not poll the os fd on the next rx() call.
unset_immediate_os_sample();
}

out:
Expand Down Expand Up @@ -1644,16 +1643,17 @@ void sockinfo_udp::handle_cmsg(struct msghdr * msg)
cm_state.mhdr->msg_controllen = cm_state.cmsg_bytes_consumed;
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::set_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
m_b_os_data_available = true;
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::unset_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = 0;
m_b_os_data_available = false;

// Reassign EPOLLIN event
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
}

bool sockinfo_udp::is_readable(uint64_t *p_poll_sn, fd_array_t* p_fd_ready_array)
Expand Down Expand Up @@ -1973,7 +1973,6 @@ int sockinfo_udp::rx_verify_available_data()
if (ret >= 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
ret = pending_data;
}
} else if (errno == EAGAIN) {
Expand Down Expand Up @@ -2328,12 +2327,9 @@ void sockinfo_udp::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ri
si_udp_logdbg("");
sockinfo::rx_add_ring_cb(flow_key, p_ring, is_migration);

//Now that we got at least 1 CQ attached enable the skip os mechanism.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;

// Now that we got at least 1 CQ attached start polling the CQs
if (m_b_blocking) {
m_loops_to_go = m_n_sysvar_rx_poll_num;
m_loops_to_go = m_n_sysvar_rx_poll_num;
}
else {
m_loops_to_go = 1; // Force single CQ poll in case of non-blocking socket
Expand Down Expand Up @@ -2787,24 +2783,30 @@ int sockinfo_udp::get_socket_tx_ring_fd(struct sockaddr *to, socklen_t tolen)
return res;
}

mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list(){
mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.front();
}

size_t sockinfo_udp::get_size_m_rx_pkt_ready_list(){
size_t sockinfo_udp::get_size_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.size();
}

void sockinfo_udp::pop_front_m_rx_pkt_ready_list(){
void sockinfo_udp::pop_front_m_rx_pkt_ready_list()
{
m_rx_pkt_ready_list.pop_front();
}

void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff){
void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff)
{
m_rx_pkt_ready_list.push_back(buff);
}

bool sockinfo_udp::prepare_to_close(bool process_shutdown) {
bool sockinfo_udp::prepare_to_close(bool process_shutdown)
{
m_lock_rcv.lock();
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
do_wakeup();
m_lock_rcv.unlock();
NOT_IN_USE(process_shutdown);
Expand Down
Loading

0 comments on commit 310a745

Please sign in to comment.