Skip to content

Commit

Permalink
issue: 1117626 Move epoll poll_os logic to the internal thread
Browse files Browse the repository at this point in the history
This commit is not based on udp poll os commit (pr Mellanox#445).

Signed-off-by: Liran Oz <[email protected]>
  • Loading branch information
Liran Oz committed Sep 4, 2017
1 parent a18c0ad commit f0adf45
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 46 deletions.
22 changes: 13 additions & 9 deletions src/vma/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rdma/rdma_cma.h>
#include <vma/dev/net_device_table_mgr.h>
#include "vma/dev/ring_allocation_logic.h"
#include "vma/sock/fd_collection.h"
#include "vma/sock/sock-redirect.h" // calling orig_os_api.epoll()
#include "vma/util/verbs_extra.h"

Expand Down Expand Up @@ -395,11 +396,11 @@ void event_handler_manager::stop_thread()
m_epfd = -1;
}

void event_handler_manager::update_epfd(int fd, int operation)
void event_handler_manager::update_epfd(int fd, int operation, int events)
{
epoll_event ev = {0, {0}};

ev.events = EPOLLIN | EPOLLPRI;
ev.events = events;
ev.data.fd = fd;
BULLSEYE_EXCLUDE_BLOCK_START
if (orig_os_api.epoll_ctl(m_epfd, operation, fd, &ev) < 0) {
Expand Down Expand Up @@ -520,7 +521,7 @@ void event_handler_manager::priv_register_ibverbs_events(ibverbs_reg_info_t& inf

priv_prepare_ibverbs_async_event_queue(i);

update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
evh_logdbg("%d added to event_handler_map_t!", info.fd);
}
BULLSEYE_EXCLUDE_BLOCK_START
Expand Down Expand Up @@ -582,7 +583,7 @@ void event_handler_manager::priv_unregister_ibverbs_events(ibverbs_reg_info_t& i

i->second.ibverbs_ev.ev_map.erase(j);
if (n == 1) {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
m_event_handler_map.erase(i);
evh_logdbg("%d erased from event_handler_map_t!", info.fd);
}
Expand All @@ -607,7 +608,7 @@ void event_handler_manager::priv_register_rdma_cm_events(rdma_cm_reg_info_t& inf
/* cppcheck-suppress uninitStructMember */
m_event_handler_map[info.fd] = map_value;

update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
}
else {
BULLSEYE_EXCLUDE_BLOCK_START
Expand Down Expand Up @@ -651,7 +652,7 @@ void event_handler_manager::priv_unregister_rdma_cm_events(rdma_cm_reg_info_t& i
iter_fd->second.rdma_cm_ev.map_rdma_cm_id.erase(iter_id);
iter_fd->second.rdma_cm_ev.n_ref_count--;
if (iter_fd->second.rdma_cm_ev.n_ref_count == 0) {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
m_event_handler_map.erase(iter_fd);
evh_logdbg("Removed channel <%d %p>", info.fd, info.id);
}
Expand Down Expand Up @@ -679,7 +680,7 @@ void event_handler_manager::priv_register_command_events(command_reg_info_t& inf
/* coverity[uninit_use_in_call] */
/* cppcheck-suppress uninitStructMember */
m_event_handler_map[info.fd] = map_value;
update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
}

}
Expand All @@ -696,7 +697,7 @@ void event_handler_manager::priv_unregister_command_events(command_reg_info_t& i
evh_logdbg(" This fd (%d) no longer COMMAND type fd", info.fd);
}
else {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
}
}

Expand Down Expand Up @@ -954,7 +955,10 @@ void* event_handler_manager::thread_loop()

event_handler_map_t::iterator i = m_event_handler_map.find(fd);
if (i == m_event_handler_map.end()) {
evh_logdbg("No event handler (fd=%d)", fd);
// No event handler - this is probably a poll_os event!
if (!g_p_fd_collection->set_immediate_os_sample(fd)) {
evh_logdbg("No event handler (fd=%d)", fd);
}
continue;
}

Expand Down
9 changes: 5 additions & 4 deletions src/vma/event/event_handler_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ struct reg_action_t {

typedef std::deque<struct reg_action_t> reg_action_q_t;

enum {
enum ev_type {
EV_IBVERBS,
EV_RDMA_CM,
EV_COMMAND,
};


struct event_data_t {
int type;
ev_type type;
ibverbs_ev_t ibverbs_ev;
rdma_cm_ev_t rdma_cm_ev;
command_ev_t command_ev;
Expand Down Expand Up @@ -174,11 +174,13 @@ class event_handler_manager : public wakeup_pipe
void* thread_loop();
void stop_thread();

void update_epfd(int fd, int operation, int events);

private:
pthread_t m_event_handler_tid;
bool m_b_continue_running;
int m_cq_epfd;
int m_epfd;
int m_epfd;

// pipe for the event registration handling
reg_action_q_t m_reg_action_q;
Expand Down Expand Up @@ -209,7 +211,6 @@ class event_handler_manager : public wakeup_pipe
void process_ibverbs_event(event_handler_map_t::iterator &i);
void process_rdma_cm_event(event_handler_map_t::iterator &i);
int start_thread();
void update_epfd(int fd, int operation);

void event_channel_post_process_for_rdma_events(void* p_event);
void* event_channel_pre_process_for_rdma_events(void* p_event_channel_handle, void** p_event);
Expand Down
34 changes: 33 additions & 1 deletion src/vma/iomux/epfd_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ int epfd_info::remove_fd_from_epoll_os(int fd)
}

epfd_info::epfd_info(int epfd, int size) :
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"), m_sysvar_thread_mode(safe_mce_sys().thread_mode)
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"),
m_sysvar_thread_mode(safe_mce_sys().thread_mode), m_b_os_data_available(false)
{
__log_funcall("");
int max_sys_fd = get_sys_max_fd_num();
Expand Down Expand Up @@ -82,6 +83,9 @@ epfd_info::epfd_info(int epfd, int size) :

vma_stats_instance_create_epoll_block(m_epfd, &(m_stats->stats));

// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

wakeup_set_epoll_fd(m_epfd);
}

Expand Down Expand Up @@ -120,6 +124,9 @@ epfd_info::~epfd_info()
}
BULLSEYE_EXCLUDE_BLOCK_END
}

g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

unlock();

vma_stats_instance_remove_epoll_block(&m_stats->stats);
Expand Down Expand Up @@ -765,3 +772,28 @@ void epfd_info::statistics_print(vlog_levels_t log_level /* = VLOG_DEBUG */)
}
}
}

void epfd_info::set_immediate_os_sample()
{
lock();
m_b_os_data_available = true;
unlock();
}

void epfd_info::unset_immediate_os_sample()
{
lock();
// Reassign EPOLLIN event
m_b_os_data_available = false;
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
unlock();
}

bool epfd_info::is_os_data_available() {
lock();
bool ret = m_b_os_data_available;
m_b_os_data_available = false;
unlock();
return ret;
}

7 changes: 7 additions & 0 deletions src/vma/iomux/epfd_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,15 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake

void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);

// Instructing the socket to immediately sample/un-sample the OS in receive flow
void set_immediate_os_sample();
void unset_immediate_os_sample();

static inline size_t epfd_info_node_offset(void) {return NODE_OFFSET(epfd_info, epfd_info_node);}
list_node<epfd_info, epfd_info::epfd_info_node_offset> epfd_info_node;

bool is_os_data_available();

private:

const int m_epfd;
Expand All @@ -120,6 +126,7 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake
epoll_stats_t m_local_stats;
epoll_stats_t *m_stats;
int m_log_invalid_events;
bool m_b_os_data_available; // true when not offloaded data is available

int add_fd(int fd, epoll_event *event);
int del_fd(int fd, bool passthrough = false);
Expand Down
37 changes: 37 additions & 0 deletions src/vma/iomux/epoll_wait_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,43 @@ bool epoll_wait_call::handle_epoll_event(bool is_ready, uint32_t events, socket_

}

bool epoll_wait_call::handle_os_countdown(int &poll_os_countdown)
{
NOT_IN_USE(poll_os_countdown);

if (!m_epfd_info->is_os_data_available()) {
return false;
}

/*
* Poll OS when count down reaches zero. This honors CQ-OS ratio.
* This also handles the 0 ratio case - do not poll OS at all.
*/
bool cq_ready = wait_os(true);

m_epfd_info->unset_immediate_os_sample();

if (cq_ready) {
// This will empty the cqepfd
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
}
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
* below after calling check_all_offloaded_sockets();
* IMPORTANT : We cannot do an opposite with current code,
* means we cannot poll cq and then poll os (for epoll) - because poll os
* will delete ready offloaded fds.
*/
if (m_n_all_ready_fds) {
m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
ring_poll_and_process_element(&m_poll_sn, NULL);
check_all_offloaded_sockets(&m_poll_sn);
return true;
}

return false;
}

int epoll_wait_call::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array/* = NULL*/)
{
return m_epfd_info->ring_poll_and_process_element(p_poll_sn, pv_fd_ready_array);
Expand Down
2 changes: 2 additions & 0 deletions src/vma/iomux/epoll_wait_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class epoll_wait_call : public io_mux_call

virtual int ring_wait_for_notification_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array = NULL);

virtual bool handle_os_countdown(int &poll_os_countdown);

private:
bool _wait(int timeout);

Expand Down
69 changes: 37 additions & 32 deletions src/vma/iomux/io_mux_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,45 @@ void io_mux_call::check_offloaded_rsockets(uint64_t *p_poll_sn)
//return false;
}

bool io_mux_call::handle_os_countdown(int &poll_os_countdown)
{
/*
* Poll OS when count down reaches zero. This honors CQ-OS ratio.
* This also handles the 0 ratio case - do not poll OS at all.
*/
if (poll_os_countdown-- == 0 && m_n_sysvar_select_poll_os_ratio > 0) {
bool cq_ready = wait_os(true);
if (cq_ready) {
// This will empty the cqepfd
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
}
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
* below after calling check_all_offloaded_sockets();
* IMPORTANT : We cannot do an opposite with current code,
* means we cannot poll cq and then poll os (for epoll) - because poll os
* will delete ready offloaded fds.
*/
if (m_n_all_ready_fds) {

m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
ring_poll_and_process_element(&m_poll_sn, NULL);
check_all_offloaded_sockets(&m_poll_sn);
return true;
}
poll_os_countdown = m_n_sysvar_select_poll_os_ratio - 1;
}

return false;
}

void io_mux_call::polling_loops()
{
int poll_counter;
int check_timer_countdown;
int check_timer_countdown = 1; // Poll once before checking the time
int poll_os_countdown;
bool multiple_polling_loops, finite_polling;
timeval before_polling_timer = TIMEVAL_INITIALIZER, after_polling_timer = TIMEVAL_INITIALIZER, delta;
int delta_time; // in usec

prepare_to_poll();

Expand All @@ -262,9 +293,6 @@ void io_mux_call::polling_loops()
TAKE_T_POLL_START;
ZERO_POLL_COUNT;
#endif

// Poll once before checking the time
check_timer_countdown = 1;

/*
* Give OS priority in 1 of SELECT_SKIP_OS times
Expand Down Expand Up @@ -308,31 +336,9 @@ void io_mux_call::polling_loops()
poll_os_countdown, m_n_sysvar_select_poll_os_ratio, check_timer_countdown, *m_p_num_all_offloaded_fds,
m_n_all_ready_fds, m_n_ready_rfds, m_n_ready_wfds, m_n_ready_efds, multiple_polling_loops);

/*
* Poll OS when count down reaches zero. This honors CQ-OS ratio.
* This also handles the 0 ratio case - do not poll OS at all.
*/
if (poll_os_countdown-- == 0 && m_n_sysvar_select_poll_os_ratio > 0) {
bool cq_ready = wait_os(true);
if (cq_ready) {
// This will empty the cqepfd
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
}
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
* below after calling check_all_offloaded_sockets();
* IMPORTANT : We cannot do an opposite with current code,
* means we cannot poll cq and then poll os (for epoll) - because poll os
* will delete ready offloaded fds.
*/
if (m_n_all_ready_fds) {

m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
ring_poll_and_process_element(&m_poll_sn, NULL);
check_all_offloaded_sockets(&m_poll_sn);
break;
}
poll_os_countdown = m_n_sysvar_select_poll_os_ratio - 1;
// TODO explain break
if (handle_os_countdown(poll_os_countdown)) {
break;
}

/*
Expand Down Expand Up @@ -385,8 +391,7 @@ void io_mux_call::polling_loops()

//calc accumulated polling time
tv_sub(&after_polling_timer, &before_polling_timer, &delta);
delta_time=tv_to_usec(&delta);
g_polling_time_usec += delta_time ;
g_polling_time_usec += tv_to_usec(&delta);

zero_polling_cpu(after_polling_timer);
}
Expand Down
2 changes: 2 additions & 0 deletions src/vma/iomux/io_mux_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class io_mux_call

virtual int ring_wait_for_notification_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array = NULL);

virtual bool handle_os_countdown(int &poll_os_countdown);

/// Pointer to an array of all offloaded fd's
int *m_p_all_offloaded_fds;
offloaded_mode_t *m_p_offloaded_modes;
Expand Down
Loading

0 comments on commit f0adf45

Please sign in to comment.