Skip to content

Commit

Permalink
use sndmore flag
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Aug 13, 2024
1 parent 4c0698a commit 44a64d2
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/zmq/ZMQBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ kage::Result<bool> ZMQProxy::forwardOutput(hg_id_t rpc_id, const char* input, si
zmq::message_t header_msg{&header, sizeof(header)};
zmq::message_t input_msg{input, input_size};

m_pub_socket.send(header_msg, zmq::send_flags::none);
m_pub_socket.send(header_msg, zmq::send_flags::sndmore);
m_pub_socket.send(input_msg, zmq::send_flags::none);

context.ev.wait();
Expand Down Expand Up @@ -154,35 +154,35 @@ void ZMQProxy::runPollingLoop() {

// Check if the socket has an incoming message
if (items[0].revents & ZMQ_POLLIN) {
zmq::message_t header_msg;
zmq::message_t data_msg;
zmq::message_t msg;

// Receive message from the other endpoint
m_sub_socket.recv(header_msg, zmq::recv_flags::none);
m_sub_socket.recv(data_msg, zmq::recv_flags::none);
m_sub_socket.recv(msg, zmq::recv_flags::none);
MessageHeader header;
memcpy(&header, msg.data(), sizeof(header));

MessageHeader* header = static_cast<MessageHeader*>(header_msg.data());
const char* data = static_cast<const char*>(data_msg.data());
size_t data_size = data_msg.size();
m_sub_socket.recv(msg, zmq::recv_flags::none);
const char* data = static_cast<const char*>(msg.data());
size_t data_size = msg.size();

if(header->is_forward) {
if(header.is_forward) {
// Received a "forward" request from other endpoint
auto output_cb = [this, header](const char* output, size_t output_size) {
auto output_cb = [this, &header](const char* output, size_t output_size) {
// We are supposed to "echo" the header with "is_forward" set to false,
// alontg with our output data.
header->is_forward = false;
header.is_forward = false;

zmq::message_t header_msg{header, sizeof(*header)};
zmq::message_t header_msg{&header, sizeof(header)};
zmq::message_t output_msg{output, output_size};
m_pub_socket.send(header_msg, zmq::send_flags::none);
m_pub_socket.send(header_msg, zmq::send_flags::sndmore);
m_pub_socket.send(output_msg, zmq::send_flags::none);

// FIXME: the above call is blocking, maybe find a way to make it not block
};
m_input_proxy.forwardInput(header->rpc_id, data, data_size, output_cb);
m_input_proxy.forwardInput(header.rpc_id, data, data_size, output_cb);
} else {
// Received the response for an RPC we have forwarded
MessageContext* sender_ctx = header->sender_ctx;
MessageContext* sender_ctx = header.sender_ctx;
sender_ctx->callback(data, data_size);
sender_ctx->ev.set_value();
}
Expand Down

0 comments on commit 44a64d2

Please sign in to comment.