Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue: 1050049 Move poll_os logic to the internal thread #445

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ Example:
VMA DETAILS: Rx Byte Min Limit 65536 [VMA_RX_BYTES_MIN]
VMA DETAILS: Rx Poll Loops 100000 [VMA_RX_POLL]
VMA DETAILS: Rx Poll Init Loops 0 [VMA_RX_POLL_INIT]
VMA DETAILS: Rx UDP Poll OS Ratio 100 [VMA_RX_UDP_POLL_OS_RATIO]
VMA DETAILS: HW TS Conversion 3 [VMA_HW_TS_CONVERSION]
VMA DETAILS: Rx Poll Yield Disabled [VMA_RX_POLL_YIELD]
VMA DETAILS: Rx Prefetch Bytes 256 [VMA_RX_PREFETCH_BYTES]
Expand Down Expand Up @@ -500,17 +499,6 @@ Once the first ADD_MEMBERSHIP is called the above VMA_RX_POLL takes effect.
Value range is similar to the above VMA_RX_POLL
Default value is 0

VMA_RX_UDP_POLL_OS_RATIO
The above parameter will define the ratio between VMA CQ poll and OS FD poll.
This will result in a single poll of the not-offloaded sockets every
VMA_RX_UDP_POLL_OS_RATIO offloaded socket (CQ) polls. No matter if the CQ poll
was a hit or miss. No matter if the socket is blocking or non-blocking.
When disabled, only offloaded sockets are polled.
This parameter replaces the two old parameters: VMA_RX_POLL_OS_RATIO and
VMA_RX_SKIP_OS
Disable with 0
Default value is 100

VMA_HW_TS_CONVERSION
The above parameter defines the time stamp conversion method.
Experimental verbs is required for converting the time stamp from hardware time (Hz)
Expand Down Expand Up @@ -635,9 +623,9 @@ Disable with 0
Default value is 10

VMA_SELECT_SKIP_OS
Similar to VMA_RX_SKIP_OS, but in select() or poll() this will force the VMA
to check the non offloaded fd even though an offloaded socket has ready
packets found while polling.
This will force the VMA to check the non offloaded fd even though an offloaded
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this change

socket has ready packets found while polling.
This will affect select() or poll().
Default value is 4

VMA_PROGRESS_ENGINE_INTERVAL
Expand Down
5 changes: 0 additions & 5 deletions src/vma/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,6 @@ void print_vma_global_settings()
VLOG_PARAM_NUMBER("Rx Byte Min Limit", safe_mce_sys().rx_ready_byte_min_limit, MCE_DEFAULT_RX_BYTE_MIN_LIMIT, SYS_VAR_RX_BYTE_MIN_LIMIT);
VLOG_PARAM_NUMBER("Rx Poll Loops", safe_mce_sys().rx_poll_num, MCE_DEFAULT_RX_NUM_POLLS, SYS_VAR_RX_NUM_POLLS);
VLOG_PARAM_NUMBER("Rx Poll Init Loops", safe_mce_sys().rx_poll_num_init, MCE_DEFAULT_RX_NUM_POLLS_INIT, SYS_VAR_RX_NUM_POLLS_INIT);
if (safe_mce_sys().rx_udp_poll_os_ratio) {
VLOG_PARAM_NUMBER("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO);
} else {
VLOG_PARAM_STRING("Rx UDP Poll OS Ratio", safe_mce_sys().rx_udp_poll_os_ratio, MCE_DEFAULT_RX_UDP_POLL_OS_RATIO, SYS_VAR_RX_UDP_POLL_OS_RATIO, "Disabled");
}

VLOG_PARAM_NUMBER("HW TS Conversion", safe_mce_sys().hw_ts_conversion_mode, MCE_DEFAULT_HW_TS_CONVERSION_MODE, SYS_VAR_HW_TS_CONVERSION_MODE);

Expand Down
6 changes: 6 additions & 0 deletions src/vma/sock/fd_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ inline cls* fd_collection::get(int fd, cls **map_type)

inline bool fd_collection::set_immediate_os_sample(int fd)
{
socket_fd_api* sock;
epfd_info* epfd_fd;
ring_tap* p_ring;

Expand All @@ -250,6 +251,11 @@ inline bool fd_collection::set_immediate_os_sample(int fd)
return true;
}

if ((sock = get_sockfd(fd))) {
sock->set_immediate_os_sample();
return true;
}

if ((epfd_fd = get_epfd(fd))){
epfd_fd->set_os_data_available();
return true;
Expand Down
122 changes: 62 additions & 60 deletions src/vma/sock/sockinfo_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,20 @@ inline int sockinfo_udp::poll_os()
int ret;
uint64_t pending_data = 0;

m_rx_udp_poll_os_ratio_counter = 0;
ret = orig_os_api.ioctl(m_fd, FIONREAD, &pending_data);
if (unlikely(ret == -1)) {
if (pending_data > 0) {
return pending_data;
}

// No data is available
unset_immediate_os_sample();

if (ret < 0) {
m_p_socket_stats->counters.n_rx_os_errors++;
si_udp_logdbg("orig_os_api.ioctl returned with error in polling loop (errno=%d %m)", errno);
return -1;
}
if (pending_data > 0) {
m_p_socket_stats->counters.n_rx_poll_os_hit++;
return 1;
}
return 0;

return ret;
}

inline int sockinfo_udp::rx_wait(bool blocking)
Expand All @@ -122,31 +124,26 @@ inline int sockinfo_udp::rx_wait(bool blocking)
epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];
uint64_t poll_sn;

m_loops_timer.start();
m_loops_timer.start();

while (loops_to_go) {

// Multi-thread polling support - let other threads have a go on this CPU
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
if ((m_n_sysvar_rx_poll_yield_loops > 0) && ((loops++ % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1))) {
sched_yield();
}

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
ret = poll_os();
if ((ret == -1) || (ret == 1)) {
return ret;
}
if (m_b_os_data_available) {
// OS data might be available
return 1;
}

// Poll cq for offloaded ready packets ...
m_rx_udp_poll_os_ratio_counter++;
if (is_readable(&poll_sn)) {
m_p_socket_stats->counters.n_rx_poll_hit++;
return 0;
}

loops++;
if (!blocking || m_n_sysvar_rx_poll_num != -1) {
loops_to_go--;
}
Expand Down Expand Up @@ -264,7 +261,8 @@ inline int sockinfo_udp::rx_wait(bool blocking)

// Check if OS fd is ready for reading
if (fd == m_fd) {
m_rx_udp_poll_os_ratio_counter = 0;
// OS data might be available
set_immediate_os_sample();
return 1;
}

Expand Down Expand Up @@ -334,15 +332,13 @@ sockinfo_udp::sockinfo_udp(int fd):
,m_b_mc_tx_loop(safe_mce_sys().tx_mc_loopback_default) // default value is 'true'. User can change this with config parameter SYS_VAR_TX_MC_LOOPBACK
,m_n_mc_ttl(DEFAULT_MC_TTL)
,m_loops_to_go(safe_mce_sys().rx_poll_num_init) // Start up with a init polling loops value
,m_rx_udp_poll_os_ratio_counter(0)
,m_sock_offload(true)
,m_mc_num_grp_with_src_filter(0)
,m_port_map_lock("sockinfo_udp::m_ports_map_lock")
,m_port_map_index(0)
,m_p_last_dst_entry(NULL)
,m_tos(0)
,m_n_sysvar_rx_poll_yield_loops(safe_mce_sys().rx_poll_yield_loops)
,m_n_sysvar_rx_udp_poll_os_ratio(safe_mce_sys().rx_udp_poll_os_ratio)
,m_n_sysvar_rx_ready_byte_min_limit(safe_mce_sys().rx_ready_byte_min_limit)
,m_n_sysvar_rx_cq_drain_rate_nsec(safe_mce_sys().rx_cq_drain_rate_nsec)
,m_n_sysvar_rx_delta_tsc_between_cq_polls(safe_mce_sys().rx_delta_tsc_between_cq_polls)
Expand All @@ -351,6 +347,7 @@ sockinfo_udp::sockinfo_udp(int fd):
,m_sockopt_mapped(false)
,m_is_connected(false)
,m_multicast(false)
,m_b_os_data_available(false)
{
si_udp_logfunc("");

Expand All @@ -372,17 +369,17 @@ sockinfo_udp::sockinfo_udp(int fd):
rx_ready_byte_count_limit_update(n_so_rcvbuf_bytes);

epoll_event ev = {0, {0}};

ev.events = EPOLLIN;

// Add the user's orig fd to the rx epfd handle
ev.data.fd = m_fd;
ev.data.fd = m_fd; // Add the user's orig fd to the rx epfd handle

BULLSEYE_EXCLUDE_BLOCK_START
if (unlikely(orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev)))
si_udp_logpanic("failed to add user's fd to internal epfd errno=%d (%m)", errno);
BULLSEYE_EXCLUDE_BLOCK_END

// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

si_udp_logfunc("done");
}

Expand Down Expand Up @@ -1356,30 +1353,25 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,

return_reuse_buffers_postponed();

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
if ((m_n_sysvar_rx_udp_poll_os_ratio > 0) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
// Check for nonoffloaded data if m_b_os_data_available is true
if (m_b_os_data_available) {
ret = poll_os();
if (ret == -1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
goto out;
}
if (ret == 1) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
if (ret > 0) {
goto os;
}
if (unlikely(ret < 0)) {
goto out;
}
}

// Drop lock to not starve other threads
m_lock_rcv.unlock();

// First check if we have a packet in the ready list
if ((m_n_rx_pkt_ready_list_count > 0 && m_n_sysvar_rx_cq_drain_rate_nsec == MCE_RX_CQ_DRAIN_RATE_DISABLED)
|| is_readable(&poll_sn)) {
/* coverity[double_lock] TODO: RM#1049980 */
m_lock_rcv.lock();
m_rx_udp_poll_os_ratio_counter++;
if (m_n_rx_pkt_ready_list_count > 0) {
// Found a ready packet in the list
if (__msg) handle_cmsg(__msg);
Expand Down Expand Up @@ -1409,20 +1401,28 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
m_lock_rcv.unlock();
goto wait;
}
}
else if (unlikely(rx_wait_ret < 0)) {
} else if (unlikely(rx_wait_ret < 0)) {
// Got < 0, means an error occurred
ret = rx_wait_ret;
goto out;
} // else - packet in OS

// Else, check for nonoffloaded data - rx_wait() returned 1.
ret = poll_os();
if (ret == 0) {
m_lock_rcv.unlock();
goto wait;
}
if (unlikely(ret < 0)) {
goto out;
}

/*
* If we got here, either the socket is not offloaded or rx_wait() returned 1.
*/
os:
if (in_flags & MSG_VMA_ZCOPY_FORCE) {
// Enable the next non-blocked read to check the OS
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
errno = EIO;
ret = -1;
goto out;
Expand All @@ -1436,10 +1436,9 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
ret = socket_fd_api::rx_os(call_type, p_iov, sz_iov, in_flags, __from, __fromlen, __msg);
*p_flags = in_flags;
save_stats_rx_os(ret);
if (ret > 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
if (ret <= 0) {
// Do not poll the os fd on the next rx() call.
unset_immediate_os_sample();
}

out:
Expand Down Expand Up @@ -1480,16 +1479,17 @@ void sockinfo_udp::handle_ip_pktinfo(struct cmsg_state * cm_state)
insert_cmsg(cm_state, IPPROTO_IP, IP_PKTINFO, &in_pktinfo, sizeof(struct in_pktinfo));
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::set_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
m_b_os_data_available = true;
}

// This function is relevant only for non-blocking socket
void sockinfo_udp::unset_immediate_os_sample()
{
m_rx_udp_poll_os_ratio_counter = 0;
m_b_os_data_available = false;

// Reassign EPOLLIN event
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
}

bool sockinfo_udp::is_readable(uint64_t *p_poll_sn, fd_array_t* p_fd_ready_array)
Expand Down Expand Up @@ -1809,7 +1809,6 @@ int sockinfo_udp::rx_verify_available_data()
if (ret >= 0) {
// This will cause the next non-blocked read to check the OS again.
// We do this only after a successful read.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
ret = pending_data;
}
} else if (errno == EAGAIN) {
Expand Down Expand Up @@ -2142,12 +2141,9 @@ void sockinfo_udp::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ri
si_udp_logdbg("");
sockinfo::rx_add_ring_cb(flow_key, p_ring, is_migration);

//Now that we got at least 1 CQ attached enable the skip os mechanism.
m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;

// Now that we got at least 1 CQ attached start polling the CQs
if (m_b_blocking) {
m_loops_to_go = m_n_sysvar_rx_poll_num;
m_loops_to_go = m_n_sysvar_rx_poll_num;
}
else {
m_loops_to_go = 1; // Force single CQ poll in case of non-blocking socket
Expand Down Expand Up @@ -2610,24 +2606,30 @@ int sockinfo_udp::get_socket_tx_ring_fd(struct sockaddr *to, socklen_t tolen)
return res;
}

mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list(){
mem_buf_desc_t* sockinfo_udp::get_front_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.front();
}

size_t sockinfo_udp::get_size_m_rx_pkt_ready_list(){
size_t sockinfo_udp::get_size_m_rx_pkt_ready_list()
{
return m_rx_pkt_ready_list.size();
}

void sockinfo_udp::pop_front_m_rx_pkt_ready_list(){
void sockinfo_udp::pop_front_m_rx_pkt_ready_list()
{
m_rx_pkt_ready_list.pop_front();
}

void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff){
void sockinfo_udp::push_back_m_rx_pkt_ready_list(mem_buf_desc_t* buff)
{
m_rx_pkt_ready_list.push_back(buff);
}

bool sockinfo_udp::prepare_to_close(bool process_shutdown) {
bool sockinfo_udp::prepare_to_close(bool process_shutdown)
{
m_lock_rcv.lock();
g_p_event_handler_manager->update_epfd(m_fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
do_wakeup();
m_lock_rcv.unlock();
NOT_IN_USE(process_shutdown);
Expand Down
Loading