Skip to content

Commit

Permalink
issue: 1050049 Move poll_os logic to the internal thread
Browse files Browse the repository at this point in the history
By default VMA checks the OS fd once per 100 CQ polls (VMA_RX_UDP_POLL_OS_RATIO)
in order to receive UDP packets with size > MTU.
This commit moves this logic to the internal thread which uses
epoll_wait() to indicate that non-offloaded data is available.
This commit removes VMA_RX_UDP_POLL_OS_RATIO parameter.

Signed-off-by: Liran Oz <[email protected]>
  • Loading branch information
Liran Oz committed Sep 17, 2017
1 parent d2c8f24 commit b7e0ca4
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 122 deletions.
20 changes: 4 additions & 16 deletions README.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Update: 13 Sep 2017
Update: 17 Sep 2017

Introduction
============
Expand Down Expand Up @@ -132,7 +132,6 @@ Example:
VMA DETAILS: Rx Byte Min Limit 65536 [VMA_RX_BYTES_MIN]
VMA DETAILS: Rx Poll Loops 100000 [VMA_RX_POLL]
VMA DETAILS: Rx Poll Init Loops 0 [VMA_RX_POLL_INIT]
VMA DETAILS: Rx UDP Poll OS Ratio 100 [VMA_RX_UDP_POLL_OS_RATIO]
VMA DETAILS: HW TS Conversion 3 [VMA_HW_TS_CONVERSION]
VMA DETAILS: Rx SW CSUM 1 [VMA_RX_SW_CSUM]
VMA DETAILS: Rx Poll Yield Disabled [VMA_RX_POLL_YIELD]
Expand Down Expand Up @@ -481,17 +480,6 @@ Once the first ADD_MEMBERSHIP is called the above VMA_RX_POLL takes effect.
Value range is similar to the above VMA_RX_POLL
Default value is 0

VMA_RX_UDP_POLL_OS_RATIO
The above param will define the ratio between VMA CQ poll and OS FD poll.
This will result in a single poll of the not-offloaded sockets every
VMA_RX_UDP_POLL_OS_RATIO offloaded socket (CQ) polls. No matter if the CQ poll
was a hit or miss. No matter if the socket is blocking or non-blocking.
When disabled, only offloaded sockets are polled.
This parameter replaces the two old parameters: VMA_RX_POLL_OS_RATIO and
VMA_RX_SKIP_OS
Disable with 0
Default value is 100

VMA_HW_TS_CONVERSION
The above param defines the time stamp conversion method.
Experimental verbs is required for converting the time stamp from hardware time (Hz)
Expand Down Expand Up @@ -616,9 +604,9 @@ Disable with 0
Default value is 10

VMA_SELECT_SKIP_OS
Similar to VMA_RX_SKIP_OS, but in select() or poll() this will force the VMA
to check the non offloaded fd even though an offloaded socket has ready
packets found while polling.
This will force the VMA to check the non-offloaded fd even though an offloaded
socket was found to have ready packets while polling.
This will affect select() or poll().
Default value is 4

VMA_PROGRESS_ENGINE_INTERVAL
Expand Down
5 changes: 0 additions & 5 deletions src/vma/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,6 @@ void print_vma_global_settings()
VLOG_PARAM_NUMBER("Rx Byte Min Limit", safe_mce_sys().rx_ready_byte_min_limit, MCE_DEFAULT_RX_BYTE_MIN_LIMIT, SYS_VAR_RX_BYTE_MIN_LIMIT);
VLOG_PARAM_NUMBER("Rx Poll Loops", safe_mce_sys().rx_poll_num, MCE_DEFAULT_RX_NUM_POLLS, SYS_VAR_RX_NUM_POLLS);
VLOG_PARAM_NUMBER("Rx Poll Init Loops", safe_mce_sys().rx_poll_num_init, MCE_DEFAULT_RX_NUM_POLLS_INIT, SYS_VAR_RX_NUM_POLLS_INIT);
if (safe_mce_sys().rx_udp_poll_os_ratio) {
VLOG_PARAM_NUMBER("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO);
} else {
VLOG_PARAM_STRING("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO, "Disabled");
}

VLOG_PARAM_NUMBER("HW TS Conversion", safe_mce_sys().hw_ts_conversion_mode, MCE_DEFAULT_HW_TS_CONVERSION_MODE, SYS_VAR_HW_TS_CONVERSION_MODE);
VLOG_PARAM_NUMBER("Rx SW CSUM", safe_mce_sys().rx_sw_csum, MCE_DEFUALT_RX_SW_CSUM, SYS_VAR_RX_SW_CSUM);
Expand Down
6 changes: 6 additions & 0 deletions src/vma/sock/fd_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,16 @@ inline cls* fd_collection::get(int fd, cls **map_type)

inline bool fd_collection::set_immediate_os_sample(int fd)
{
socket_fd_api* sock;
epfd_info* epfd_fd;

auto_unlocker locker(*this);

if ((sock = get_sockfd(fd))){
sock->set_immediate_os_sample();
return true;
}

if ((epfd_fd = get_epfd(fd))){
epfd_fd->set_os_data_available();
return true;
Expand Down
136 changes: 70 additions & 66 deletions src/vma/sock/sockinfo_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,24 @@ inline void sockinfo_udp::reuse_buffer(mem_buf_desc_t *buff)
}
}

inline int sockinfo_udp::poll_os()
int sockinfo_udp::poll_and_arm_os()
{
int ret;
uint64_t pending_data = 0;

m_rx_udp_poll_os_ratio_counter = 0;
ret = orig_os_api.ioctl(m_fd, FIONREAD, &pending_data);
if (unlikely(ret == -1)) {
int ret = orig_os_api.ioctl(m_fd, FIONREAD, &pending_data);
if (pending_data > 0) {
return pending_data;
}

// No data is available
unset_immediate_os_sample();

if (ret < 0) {
m_p_socket_stats->counters.n_rx_os_errors++;
si_udp_logdbg("orig_os_api.ioctl returned with error in polling loop (errno=%d %m)", errno);
return -1;
}
if (pending_data > 0) {
m_p_socket_stats->counters.n_rx_poll_os_hit++;
return 1;
}
return 0;

return ret;
}

inline int sockinfo_udp::rx_wait(bool blocking)
Expand All @@ -145,31 +146,26 @@ inline int sockinfo_udp::rx_wait(bool blocking)
epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];
uint64_t poll_sn;

m_loops_timer.start();
m_loops_timer.start();

while (loops_to_go) {

// Multi-thread polling support - let other threads have a go on this CPU
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops++ % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
sched_yield();
}

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
ret = poll_os();
if ((ret == -1) || (ret == 1)) {
return ret;
}
if (m_b_os_data_available) {
// OS data might be available
return 1;
}

// Poll cq for offloaded ready packets ...
m_rx_udp_poll_os_ratio_counter++;
if (is_readable(&poll_sn)) {
m_p_socket_stats->counters.n_rx_poll_hit++;
return 0;
}

loops++;
if (!blocking || m_n_sysvar_rx_poll_num != -1) {
loops_to_go--;
}
Expand Down Expand Up @@ -287,7 +283,8 @@ inline int sockinfo_udp::rx_wait(bool blocking)

// Check if OS fd is ready for reading
if (fd == m_fd) {
m_rx_udp_poll_os_ratio_counter = 0;
// OS data might be available
set_immediate_os_sample();
return 1;
}

Expand Down Expand Up @@ -377,7 +374,6 @@ sockinfo_udp::sockinfo_udp(int fd) throw (vma_exception) :
,m_b_mc_tx_loop(safe_mce_sys().tx_mc_loopback_default) // default value is 'true'. User can change this with config parameter SYS_VAR_TX_MC_LOOPBACK
,m_n_mc_ttl(DEFAULT_MC_TTL)
,m_loops_to_go(safe_mce_sys().rx_poll_num_init) // Start up with a init polling loops value
,m_rx_udp_poll_os_ratio_counter(0)
,m_sock_offload(true)
,m_mc_num_grp_with_src_filter(0)
,m_port_map_lock("sockinfo_udp::m_ports_map_lock")
Expand All @@ -388,14 +384,14 @@ sockinfo_udp::sockinfo_udp(int fd) throw (vma_exception) :
,m_b_rcvtstampns(false)
,m_n_tsing_flags(0)
,m_n_sysvar_rx_poll_yield_loops(safe_mce_sys().rx_poll_yield_loops)
,m_n_sysvar_rx_udp_poll_os_ratio(safe_mce_sys().rx_udp_poll_os_ratio)
,m_n_sysvar_rx_ready_byte_min_limit(safe_mce_sys().rx_ready_byte_min_limit)
,m_n_sysvar_rx_cq_drain_rate_nsec(safe_mce_sys().rx_cq_drain_rate_nsec)
,m_n_sysvar_rx_delta_tsc_between_cq_polls(safe_mce_sys().rx_delta_tsc_between_cq_polls)
,m_reuseaddr(false)
,m_sockopt_mapped(false)
,m_is_connected(false)
,m_multicast(false)
,m_b_os_data_available(false)
{
si_udp_logfunc("");

Expand All @@ -417,17 +413,17 @@ sockinfo_udp::sockinfo_udp(int fd) throw (vma_exception) :
rx_ready_byte_count_limit_update(n_so_rcvbuf_bytes);

epoll_event ev = {0, {0}};

ev.events = EPOLLIN;

// Add the user's orig fd to the rx epfd handle
ev.data.fd = m_fd;
ev.data.fd = m_fd; // Add the user's orig fd to the rx epfd handle

BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely(orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev)))
si_udp_logpanic("failed to add user's fd to internal epfd errno=%d (%m)", errno);
BULLSEYE_EXCLUDE_BLOCK_END

// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

si_udp_logfunc("done");
}

Expand Down Expand Up @@ -1415,30 +1411,25 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,

return_reuse_buffers_postponed();

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
ret = poll_os();
if (ret == -1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
goto out;
}
if (ret == 1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
// Check for nonoffloaded data if m_b_os_data_available is true
if (m_b_os_data_available) {
ret = poll_and_arm_os();
if (ret > 0) {
goto os;
}
if (unlikely(ret < 0)) {
goto out;
}
}

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// First check if we have a packet in the ready list
if ((m_n_rx_pkt_ready_list_count > 0 && m_n_sysvar_rx_cq_drain_rate_nsec == MCE_RX_CQ_DRAIN_RATE_DISABLED)
|| is_readable(&poll_sn)) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
m_rx_udp_poll_os_ratio_counter++;
if (m_n_rx_pkt_ready_list_count > 0) {
// Found a ready packet in the list
if (__msg) handle_cmsg(__msg);
Expand All @@ -1464,24 +1455,33 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
if (__msg) handle_cmsg(__msg);
ret = dequeue_packet(p_iov, sz_iov, (sockaddr_in *)__from, __fromlen, in_flags, &out_flags);
goto out;
} else {
m_lock_rcv.unlock();
goto wait;
}
m_lock_rcv.unlock();
goto wait;
}
else if (unlikely(rx_wait_ret < 0)) {

if (unlikely(rx_wait_ret < 0)) {
// Got < 0, means an error occurred
ret = rx_wait_ret;
goto out;
} // else - packet in OS
}

// Else, check for nonoffloaded data - rx_wait() returned 1.
ret = poll_and_arm_os();
if (ret == 0) {
m_lock_rcv.unlock();
goto wait;
}
if (unlikely(ret < 0)) {
goto out;
}

/*
* If we got here, either the socket is not offloaded or rx_wait() returned 1.
*/
os:
if (in_flags & MSG_VMA_ZCOPY_FORCE) {
// Enable the next non-blocked read to check the OS
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
errno = EIO;
ret = -1;
goto out;
Expand All @@ -1495,10 +1495,9 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, in_flags, __from, __fromlen, __msg);
*p_flags = in_flags;
save_stats_rx_os(ret);
if (ret > 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
if (ret <= 0) {
// Do not poll the os fd on the next rx() call.
unset_immediate_os_sample();
}

out:
Expand Down Expand Up @@ -1633,16 +1632,16 @@ void sockinfo_udp::handle_cmsg(struct msghdr * msg)
cm_state.mhdr->msg_controllen = cm_state.cmsg_bytes_consumed;
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::set_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
m_b_os_data_available = true;
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::unset_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = 0;
// Reassign EPOLLIN event
m_b_os_data_available = false;
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
}

bool sockinfo_udp::is_readable(uint64_t *p_poll_sn, fd_array_t* p_fd_ready_array)
Expand Down Expand Up @@ -1952,7 +1951,6 @@ int sockinfo_udp::rx_verify_available_data()
if (ret >= 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
ret = pending_data;
}
}
Expand Down Expand Up @@ -2297,12 +2295,12 @@ void sockinfo_udp::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ri
si_udp_logdbg("");
sockinfo::rx_add_ring_cb(flow_key, p_ring, is_migration);

//Now that we got at least 1 CQ attached enable the skip os mechanism.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
// Now that we got at least 1 CQ attached enable poll os for available data.
set_immediate_os_sample();

// Now that we got at least 1 CQ attached start polling the CQs
if (m_b_blocking) {
m_loops_to_go = m_n_sysvar_rx_poll_num;
m_loops_to_go = m_n_sysvar_rx_poll_num;
}
else {
m_loops_to_go = 1; // Force single CQ poll in case of non-blocking socket
Expand Down Expand Up @@ -2742,24 +2740,30 @@ size_t sockinfo_udp::handle_msg_trunc(size_t total_rx, size_t payload_size, int
return total_rx;
}

mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list(){
mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.front();
}

size_t sockinfo_udp::get_size_m_rx_pkt_ready_list(){
size_t sockinfo_udp::get_size_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.size();
}

void sockinfo_udp::pop_front_m_rx_pkt_ready_list(){
void sockinfo_udp::pop_front_m_rx_pkt_ready_list()
{
m_rx_pkt_ready_list.pop_front();
}

void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff){
void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff)
{
m_rx_pkt_ready_list.push_back(buff);
}

bool sockinfo_udp::prepare_to_close(bool process_shutdown) {
bool sockinfo_udp::prepare_to_close(bool process_shutdown)
{
m_lock_rcv.lock();
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
do_wakeup();
m_lock_rcv.unlock();
NOT_IN_USE(process_shutdown);
Expand Down
Loading

0 comments on commit b7e0ca4

Please sign in to comment.