Skip to content

Commit

Permalink
issue: 1117626 Move epoll's poll_os logic to the internal thread
Browse files Browse the repository at this point in the history
In order to maintain a high level of performance for non-offloaded
sockets, VMA frequently checks whether there are readable non-offloaded
sockets.
Those checks are made from the context of the application which hurts
the latency.
This commit moves those checks to the internal thread by registering
each epoll_fd to VMA internal thread's epoll_fd.
From now on, VMA_SELECT_POLL_OS_RATIO and VMA_SELECT_SKIP_OS will not
affect epoll_wait().

Signed-off-by: Liran Oz <[email protected]>
  • Loading branch information
Liran Oz committed Sep 17, 2017
1 parent fd6b082 commit d2c8f24
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 89 deletions.
14 changes: 7 additions & 7 deletions README.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Update: 03 Sep 2017
Update: 13 Sep 2017

Introduction
============
Expand Down Expand Up @@ -607,18 +607,18 @@ Default value is 0

VMA_SELECT_POLL_OS_RATIO
This will enable polling of the OS file descriptors while user thread calls
select(), poll() or epoll_wait() and the VMA is busy in the offloaded sockets
polling loop. This will result in a signle poll of the not-offloaded sockets
every VMA_SELECT_POLL_RATIO offlaoded sockets (CQ) polls.
select() or poll() and the VMA is busy in the offloaded sockets polling loop.
This will result in a signle poll of the not-offloaded sockets every
VMA_SELECT_POLL_RATIO offlaoded sockets (CQ) polls.
When disabled, only offlaoded sockets are polled.
(See VMA_SELECT_POLL for more info)
Disable with 0
Default value is 10

VMA_SELECT_SKIP_OS
Similar to VMA_RX_SKIP_OS, but in select(), poll() or epoll_wait() this will
force the VMA to check the non offloaded fd even though an offloaded socket
has ready packets found while polling.
Similar to VMA_RX_SKIP_OS, but in select() or poll() this will force the VMA
to check the non offloaded fd even though an offloaded socket has ready
packets found while polling.
Default value is 4

VMA_PROGRESS_ENGINE_INTERVAL
Expand Down
22 changes: 13 additions & 9 deletions src/vma/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rdma/rdma_cma.h>
#include <vma/dev/net_device_table_mgr.h>
#include "vma/dev/ring_allocation_logic.h"
#include "vma/sock/fd_collection.h"
#include "vma/sock/sock-redirect.h" // calling orig_os_api.epoll()
#include "vma/util/verbs_extra.h"

Expand Down Expand Up @@ -395,11 +396,11 @@ void event_handler_manager::stop_thread()
m_epfd = -1;
}

void event_handler_manager::update_epfd(int fd, int operation)
void event_handler_manager::update_epfd(int fd, int operation, int events)
{
epoll_event ev = {0, {0}};

ev.events = EPOLLIN | EPOLLPRI;
ev.events = events;
ev.data.fd = fd;
BULLSEYE_EXCLUDE_BLOCK_START
if (orig_os_api.epoll_ctl(m_epfd, operation, fd, &ev) < 0) {
Expand Down Expand Up @@ -520,7 +521,7 @@ void event_handler_manager::priv_register_ibverbs_events(ibverbs_reg_info_t& inf

priv_prepare_ibverbs_async_event_queue(i);

update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
evh_logdbg("%d added to event_handler_map_t!", info.fd);
}
BULLSEYE_EXCLUDE_BLOCK_START
Expand Down Expand Up @@ -582,7 +583,7 @@ void event_handler_manager::priv_unregister_ibverbs_events(ibverbs_reg_info_t& i

i->second.ibverbs_ev.ev_map.erase(j);
if (n == 1) {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
m_event_handler_map.erase(i);
evh_logdbg("%d erased from event_handler_map_t!", info.fd);
}
Expand All @@ -607,7 +608,7 @@ void event_handler_manager::priv_register_rdma_cm_events(rdma_cm_reg_info_t& inf
/* cppcheck-suppress uninitStructMember */
m_event_handler_map[info.fd] = map_value;

update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
}
else {
BULLSEYE_EXCLUDE_BLOCK_START
Expand Down Expand Up @@ -651,7 +652,7 @@ void event_handler_manager::priv_unregister_rdma_cm_events(rdma_cm_reg_info_t& i
iter_fd->second.rdma_cm_ev.map_rdma_cm_id.erase(iter_id);
iter_fd->second.rdma_cm_ev.n_ref_count--;
if (iter_fd->second.rdma_cm_ev.n_ref_count == 0) {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
m_event_handler_map.erase(iter_fd);
evh_logdbg("Removed channel <%d %p>", info.fd, info.id);
}
Expand Down Expand Up @@ -679,7 +680,7 @@ void event_handler_manager::priv_register_command_events(command_reg_info_t& inf
/* coverity[uninit_use_in_call] */
/* cppcheck-suppress uninitStructMember */
m_event_handler_map[info.fd] = map_value;
update_epfd(info.fd, EPOLL_CTL_ADD);
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
}

}
Expand All @@ -696,7 +697,7 @@ void event_handler_manager::priv_unregister_command_events(command_reg_info_t& i
evh_logdbg(" This fd (%d) no longer COMMAND type fd", info.fd);
}
else {
update_epfd(info.fd, EPOLL_CTL_DEL);
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
}
}

Expand Down Expand Up @@ -954,7 +955,10 @@ void* event_handler_manager::thread_loop()

event_handler_map_t::iterator i = m_event_handler_map.find(fd);
if (i == m_event_handler_map.end()) {
evh_logdbg("No event handler (fd=%d)", fd);
// No event handler - this is probably a poll_os event!
if (!g_p_fd_collection->set_immediate_os_sample(fd)) {
evh_logdbg("No event handler (fd=%d)", fd);
}
continue;
}

Expand Down
9 changes: 5 additions & 4 deletions src/vma/event/event_handler_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ struct reg_action_t {

typedef std::deque<struct reg_action_t> reg_action_q_t;

enum {
enum ev_type {
EV_IBVERBS,
EV_RDMA_CM,
EV_COMMAND,
};


struct event_data_t {
int type;
ev_type type;
ibverbs_ev_t ibverbs_ev;
rdma_cm_ev_t rdma_cm_ev;
command_ev_t command_ev;
Expand Down Expand Up @@ -174,11 +174,13 @@ class event_handler_manager : public wakeup_pipe
void* thread_loop();
void stop_thread();

void update_epfd(int fd, int operation, int events);

private:
pthread_t m_event_handler_tid;
bool m_b_continue_running;
int m_cq_epfd;
int m_epfd;
int m_epfd;

// pipe for the event registration handling
reg_action_q_t m_reg_action_q;
Expand Down Expand Up @@ -209,7 +211,6 @@ class event_handler_manager : public wakeup_pipe
void process_ibverbs_event(event_handler_map_t::iterator &i);
void process_rdma_cm_event(event_handler_map_t::iterator &i);
int start_thread();
void update_epfd(int fd, int operation);

void event_channel_post_process_for_rdma_events(void* p_event);
void* event_channel_pre_process_for_rdma_events(void* p_event_channel_handle, void** p_event);
Expand Down
33 changes: 32 additions & 1 deletion src/vma/iomux/epfd_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ int epfd_info::remove_fd_from_epoll_os(int fd)
}

epfd_info::epfd_info(int epfd, int size) :
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"), m_sysvar_thread_mode(safe_mce_sys().thread_mode)
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"),
m_lock_poll_os("epfd_lock_poll_os"), m_sysvar_thread_mode(safe_mce_sys().thread_mode),
m_b_os_data_available(false)
{
__log_funcall("");
int max_sys_fd = get_sys_max_fd_num();
Expand Down Expand Up @@ -82,6 +84,9 @@ epfd_info::epfd_info(int epfd, int size) :

vma_stats_instance_create_epoll_block(m_epfd, &(m_stats->stats));

// Register this socket to read nonoffloaded data
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

wakeup_set_epoll_fd(m_epfd);
}

Expand Down Expand Up @@ -120,6 +125,9 @@ epfd_info::~epfd_info()
}
BULLSEYE_EXCLUDE_BLOCK_END
}

g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);

unlock();

vma_stats_instance_remove_epoll_block(&m_stats->stats);
Expand Down Expand Up @@ -765,3 +773,26 @@ void epfd_info::statistics_print(vlog_levels_t log_level /* = VLOG_DEBUG */)
}
}
}

void epfd_info::set_os_data_available()
{
auto_unlocker locker(m_lock_poll_os);
m_b_os_data_available = true;
}

void epfd_info::register_to_internal_thread()
{
auto_unlocker locker(m_lock_poll_os);
m_b_os_data_available = false;

// Reassign EPOLLIN event
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
}

bool epfd_info::get_and_unset_os_data_available()
{
auto_unlocker locker(m_lock_poll_os);
bool ret = m_b_os_data_available;
m_b_os_data_available = false;
return ret;
}
15 changes: 15 additions & 0 deletions src/vma/iomux/epfd_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake

void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);

// Called from the internal thread to mark that non offloaded data is available.
void set_os_data_available();

// Register this epfd to the internal thread, Called after non offloaded data has been received.
void register_to_internal_thread();

// Thread safe function which returns true if non offloaded data is available.
// Will also set m_b_os_data_available to false.
bool get_and_unset_os_data_available();

// Returns true if non offloaded data is available.
inline bool get_os_data_available() {return m_b_os_data_available;}

static inline size_t epfd_info_node_offset(void) {return NODE_OFFSET(epfd_info, epfd_info_node);}
list_node<epfd_info, epfd_info::epfd_info_node_offset> epfd_info_node;

Expand All @@ -115,11 +128,13 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake
fd_info_list_t m_fd_offloaded_list;
ring_map_t m_ring_map;
lock_mutex_recursive m_ring_map_lock;
lock_spin m_lock_poll_os;
const thread_mode_t m_sysvar_thread_mode;
ready_cq_fd_q_t m_ready_cq_fd_q;
epoll_stats_t m_local_stats;
epoll_stats_t *m_stats;
int m_log_invalid_events;
bool m_b_os_data_available; // true when non offloaded data is available

int add_fd(int fd, epoll_event *event);
int del_fd(int fd, bool passthrough = false);
Expand Down
44 changes: 38 additions & 6 deletions src/vma/iomux/epoll_wait_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ epoll_wait_call::~epoll_wait_call()
{
}

void epoll_wait_call::prepare_to_poll()
{
// Empty
}

void epoll_wait_call::prepare_to_block()
{
// Empty
Expand Down Expand Up @@ -335,8 +330,9 @@ bool epoll_wait_call::check_all_offloaded_sockets(uint64_t *p_poll_sn)
return m_n_all_ready_fds;
}

bool epoll_wait_call::immidiate_return()
bool epoll_wait_call::immidiate_return(int &poll_os_countdown)
{
NOT_IN_USE(poll_os_countdown);
return false;
}

Expand Down Expand Up @@ -364,6 +360,42 @@ bool epoll_wait_call::handle_epoll_event(bool is_ready, uint32_t events, socket_

}

bool epoll_wait_call::handle_os_countdown(int &poll_os_countdown)
{
NOT_IN_USE(poll_os_countdown);

if (!m_epfd_info->get_os_data_available() || !m_epfd_info->get_and_unset_os_data_available()) {
return false;
}

/*
* Poll OS when the internal thread found non offloaded data.
*/
bool cq_ready = wait_os(true);

m_epfd_info->register_to_internal_thread();

if (cq_ready) {
// This will empty the cqepfd
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
}
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
* below after calling check_all_offloaded_sockets();
* IMPORTANT : We cannot do an opposite with current code,
* means we cannot poll cq and then poll os (for epoll) - because poll os
* will delete ready offloaded fds.
*/
if (m_n_all_ready_fds) {
m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
ring_poll_and_process_element(&m_poll_sn, NULL);
check_all_offloaded_sockets(&m_poll_sn);
return true;
}

return false;
}

int epoll_wait_call::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array/* = NULL*/)
{
return m_epfd_info->ring_poll_and_process_element(p_poll_sn, pv_fd_ready_array);
Expand Down
7 changes: 3 additions & 4 deletions src/vma/iomux/epoll_wait_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ class epoll_wait_call : public io_mux_call
/// @override
virtual void set_offloaded_rfd_ready(int fd_index);
virtual void set_offloaded_wfd_ready(int fd_index);

/// @override
virtual void prepare_to_poll();

/// @override
virtual void prepare_to_block();
Expand Down Expand Up @@ -95,7 +92,7 @@ class epoll_wait_call : public io_mux_call
virtual void unlock();

/// @override
virtual bool immidiate_return();
virtual bool immidiate_return(int &poll_os_countdown);

/// @override
virtual bool check_all_offloaded_sockets(uint64_t *p_poll_sn);
Expand All @@ -113,6 +110,8 @@ class epoll_wait_call : public io_mux_call

virtual int ring_wait_for_notification_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array = NULL);

virtual bool handle_os_countdown(int &poll_os_countdown);

private:
bool _wait(int timeout);

Expand Down
Loading

0 comments on commit d2c8f24

Please sign in to comment.