diff --git a/src/client/xclient_messenger.cpp b/src/client/xclient_messenger.cpp index b113ac7..2d6deda 100644 --- a/src/client/xclient_messenger.cpp +++ b/src/client/xclient_messenger.cpp @@ -47,4 +47,4 @@ namespace xeus m_heartbeat_controller.send(stop_msg, zmq::send_flags::none); (void)m_heartbeat_controller.recv(response); } -} \ No newline at end of file +} diff --git a/src/client/xclient_zmq_impl.cpp b/src/client/xclient_zmq_impl.cpp index 9190762..f6c6db6 100644 --- a/src/client/xclient_zmq_impl.cpp +++ b/src/client/xclient_zmq_impl.cpp @@ -36,7 +36,11 @@ namespace xeus // Has to be in the cpp because incomplete // types are used in unique_ptr in the header - xclient_zmq_impl::~xclient_zmq_impl() = default; + xclient_zmq_impl::~xclient_zmq_impl() + { + m_iopub_thread.join(); + m_heartbeat_thread.join(); + } void xclient_zmq_impl::send_on_shell(xmessage msg) { diff --git a/src/client/xheartbeat_client.cpp b/src/client/xheartbeat_client.cpp index f84e3e9..b7a7327 100644 --- a/src/client/xheartbeat_client.cpp +++ b/src/client/xheartbeat_client.cpp @@ -7,6 +7,8 @@ * The full license is in the file LICENSE, distributed with this software. * ****************************************************************************/ +#include + #include "xheartbeat_client.hpp" #include "xclient_zmq_impl.hpp" #include "../common/xmiddleware_impl.hpp" @@ -23,6 +25,7 @@ namespace xeus , m_max_retry(max_retry) , m_heartbeat_timeout(timeout) , m_heartbeat_end_point("") + , m_request_stop(false) { m_heartbeat_end_point = get_end_point(config.m_transport, config.m_ip, config.m_hb_port); m_heartbeat.connect(m_heartbeat_end_point); @@ -42,9 +45,35 @@ namespace xeus bool xheartbeat_client::wait_for_answer(long timeout) { - m_heartbeat.set(zmq::sockopt::linger, static_cast(timeout)); - zmq::message_t response; - return m_heartbeat.recv(response).has_value(); + zmq::pollitem_t items[] = { + { m_heartbeat, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 } + }; + + zmq::poll(&items[0], 2, std::chrono::milliseconds(timeout)); + try + { + if (items[0].revents & ZMQ_POLLIN) + { + zmq::multipart_t wire_msg; + wire_msg.recv(m_heartbeat); + } + + if (items[1].revents & ZMQ_POLLIN) + { + // stop message + zmq::multipart_t wire_msg; + wire_msg.recv(m_controller); + wire_msg.send(m_controller); + m_request_stop = true; + } + + return true; + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } + return false; } void xheartbeat_client::register_kernel_status_listener(const kernel_status_listener& l) @@ -59,10 +88,9 @@ namespace xeus void xheartbeat_client::run() { - bool stop = false; std::size_t retry_count = 0; - while(!stop) + while(!m_request_stop) { send_heartbeat_message(); if(!wait_for_answer(m_heartbeat_timeout)) @@ -74,7 +102,7 @@ namespace xeus else { notify_kernel_dead(true); - stop = true; + break; } } else diff --git a/src/client/xheartbeat_client.hpp b/src/client/xheartbeat_client.hpp index f2500b0..0d1c228 100644 --- a/src/client/xheartbeat_client.hpp +++ b/src/client/xheartbeat_client.hpp @@ -48,7 +48,8 @@ namespace xeus const long m_heartbeat_timeout; std::string m_heartbeat_end_point; + bool m_request_stop; }; } -#endif \ No newline at end of file +#endif diff --git a/src/client/xiopub_client.cpp b/src/client/xiopub_client.cpp index 00158ef..5127b6f 100644 --- a/src/client/xiopub_client.cpp +++ b/src/client/xiopub_client.cpp @@ -59,7 +59,6 @@ namespace xeus void xiopub_client::run() { - zmq::multipart_t wire_msg; zmq::pollitem_t items[] = { { m_iopub, 0, ZMQ_POLLIN, 0 }, { m_controller, 0, ZMQ_POLLIN, 0 } }; @@ -71,6 +70,7 @@ namespace xeus { if (items[0].revents & ZMQ_POLLIN) { + zmq::multipart_t wire_msg; wire_msg.recv(m_iopub); xpub_message msg = p_client_impl->deserialize_iopub(wire_msg); { @@ -80,15 +80,11 @@ namespace xeus } if (items[1].revents & ZMQ_POLLIN) { + // stop message + zmq::multipart_t wire_msg; wire_msg.recv(m_controller); - if (wire_msg.size() > 0) - { - std::string received_msg = wire_msg.at(0).to_string(); - if (received_msg == "stop") - { - break; - } - } + wire_msg.send(m_controller); + break; } } catch (std::exception& e)