Skip to content

Commit

Permalink
Running heartbeat client in its own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
anutosh491 committed Mar 27, 2024
1 parent 1b030c0 commit 5915d81
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 5 deletions.
18 changes: 17 additions & 1 deletion src/xclient_zmq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace xeus
, m_shell_client(context, config.m_transport, config.m_ip, config.m_shell_port)
, m_control_client(context, config.m_transport, config.m_ip, config.m_control_port)
, m_iopub_client(context, config)
, m_heartbeat_client(context, config)
, p_messenger(context)
, m_error_handler(eh)
{
Expand Down Expand Up @@ -93,6 +94,11 @@ namespace xeus
m_iopub_listener = l;
}

void xclient_zmq_impl::register_heartbeat_listener(const listener& l)
{
m_heartbeat_listener = l;
}

void xclient_zmq_impl::connect()
{
p_messenger.connect();
Expand All @@ -118,6 +124,11 @@ namespace xeus
m_iopub_listener(std::move(msg));
}

void xclient_zmq_impl::notify_heartbeat_listener(xmessage msg)
{
m_heartbeat_listener(std::move(msg));
}

void xclient_zmq_impl::poll(long timeout)
{
zmq::multipart_t wire_msg;
Expand Down Expand Up @@ -166,14 +177,19 @@ namespace xeus
void xclient_zmq_impl::start()
{
start_iopub_thread();
// TODO : Introduce a client, xheartbeat_client that runs on its own thread, m_heartbeat_thread.
start_heartbeat_thread();
}

void xclient_zmq_impl::start_iopub_thread()
{
m_iopub_thread = std::move(xthread(&xiopub_client::run, p_iopub_client.get()));
}

void xclient_zmq_impl::start_heartbeat_thread()
{
m_heartbeat_thread = std::move(xthread(&xheartbeat_client::run, p_heartbeat_client.get()));
}

xmessage xclient_zmq_impl::deserialize(zmq::multipart_t& wire_msg) const
{
return xzmq_serializer::deserialize(wire_msg, *p_auth);
Expand Down
11 changes: 9 additions & 2 deletions src/xclient_zmq_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "xdealer_channel.hpp"
#include "xiopub_client.hpp"
#include "xheartbeat_client.hpp"
#include "xclient_messenger.hpp"

namespace xeus
Expand All @@ -31,6 +32,7 @@ namespace xeus
{
public:
using iopub_client_ptr = std::unique_ptr<xiopub_client>;
using heartbeat_client_ptr = std::unique_ptr<xheartbeat_client>;
using listener = std::function<void(xmessage)>;

xclient_zmq_impl(zmq::context_t& context,
Expand Down Expand Up @@ -60,8 +62,8 @@ namespace xeus
std::optional<xmessage> pop_iopub_message();
void register_iopub_listener(const listener& l);

// hearbeat channel
// TODO
// heartbeat channel
void register_heartbeat_listener(const listener& l);

// client messenger
void connect();
Expand All @@ -70,6 +72,7 @@ namespace xeus
void notify_shell_listener(xmessage msg);
void notify_control_listener(xmessage msg);
void notify_iopub_listener(xmessage msg);
void notify_heartbeat_listener(xmessage msg);

void wait_for_message();
void start();
Expand All @@ -86,6 +89,7 @@ namespace xeus
xdealer_channel m_shell_client;
xdealer_channel m_control_client;
xiopub_client m_iopub_client;
xheartbeat_client m_heartbeat_client;

xclient_messenger p_messenger;

Expand All @@ -94,10 +98,13 @@ namespace xeus
listener m_shell_listener;
listener m_control_listener;
listener m_iopub_listener;
listener m_heartbeat_listener;

iopub_client_ptr p_iopub_client;
heartbeat_client_ptr p_heartbeat_client;

xthread m_iopub_thread;
xthread m_heartbeat_thread;
};
}

Expand Down
23 changes: 22 additions & 1 deletion src/xheartbeat_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,25 @@
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/
****************************************************************************/

#include "xheartbeat_client.hpp"

#include "xeus-zmq/xmiddleware.hpp"

namespace xeus
{

xheartbeat_client::xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config)
: m_heartbeat(context, zmq::socket_type::sub)
, m_controller(context, zmq::socket_type::rep)
{
m_heartbeat.connect(get_end_point(config.m_transport, config.m_ip, config.m_hb_port));
init_socket(m_controller, get_controller_end_point("heartbeat"));
}

xheartbeat_client::~xheartbeat_client()
{
}
}
34 changes: 33 additions & 1 deletion src/xheartbeat_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,36 @@
* Distributed under the terms of the BSD 3-Clause License. *
* *
* The full license is in the file LICENSE, distributed with this software. *
****************************************************************************/
****************************************************************************/

#ifndef XEUS_HEARTBEAT_CLIENT_HPP
#define XEUS_HEARTBEAT_CLIENT_HPP

#include "zmq.hpp"

#include "xeus/xkernel_configuration.hpp"

namespace xeus
{
class xclient_zmq_impl;

class xheartbeat_client
{
public:

xheartbeat_client(zmq::context_t& context,
const xeus::xconfiguration& config);

~xheartbeat_client();

void run();

private:
zmq::socket_t m_iopub;
zmq::socket_t m_controller;

xclient_zmq_impl* p_client_impl;
};
}

#endif

0 comments on commit 5915d81

Please sign in to comment.