Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced all tabs by spaces within the C and C++ examples #886

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/C++/hwserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#else
#include <windows.h>

#define sleep(n) Sleep(n)
#define sleep(n) Sleep(n)
#endif

int main () {
Expand Down
216 changes: 108 additions & 108 deletions examples/C++/lbbroker2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,152 +15,152 @@
static void *
client_task(void *args)
{
zctx_t *ctx = zctx_new();
void *client = zsocket_new(ctx, ZMQ_REQ);
zctx_t *ctx = zctx_new();
void *client = zsocket_new(ctx, ZMQ_REQ);

#if (defined (WIN32))
zsocket_connect(client, "tcp://localhost:5672"); // frontend
zsocket_connect(client, "tcp://localhost:5672"); // frontend
#else
zsocket_connect(client, "ipc://frontend.ipc");
zsocket_connect(client, "ipc://frontend.ipc");
#endif

// Send request, get reply
zstr_send(client, "HELLO");
char *reply = zstr_recv(client);
if (reply) {
std::cout << "Client: " << reply << std::endl;
free(reply);
}
// Send request, get reply
zstr_send(client, "HELLO");
char *reply = zstr_recv(client);
if (reply) {
std::cout << "Client: " << reply << std::endl;
free(reply);
}

zctx_destroy(&ctx);
return NULL;
zctx_destroy(&ctx);
return NULL;
}

// Worker using REQ socket to do load-balancing
//
static void *
worker_task(void *args)
{
zctx_t *ctx = zctx_new();
void *worker = zsocket_new(ctx, ZMQ_REQ);
zctx_t *ctx = zctx_new();
void *worker = zsocket_new(ctx, ZMQ_REQ);

#if (defined (WIN32))
zsocket_connect(worker, "tcp://localhost:5673"); // backend
zsocket_connect(worker, "tcp://localhost:5673"); // backend
#else
zsocket_connect(worker, "ipc://backend.ipc");
zsocket_connect(worker, "ipc://backend.ipc");
#endif

// Tell broker we're ready for work
zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY));
zframe_send(&frame, worker, 0);

// Process messages as they arrive
while (1) {
zmsg_t *msg = zmsg_recv(worker);
if (!msg)
break; // Interrupted
zframe_print(zmsg_last(msg), "Worker: ");
zframe_reset(zmsg_last(msg), "OK", 2);
zmsg_send(&msg, worker);
}
zctx_destroy(&ctx);
return NULL;
// Tell broker we're ready for work
zframe_t *frame = zframe_new(WORKER_READY, strlen(WORKER_READY));
zframe_send(&frame, worker, 0);

// Process messages as they arrive
while (1) {
zmsg_t *msg = zmsg_recv(worker);
if (!msg)
break; // Interrupted
zframe_print(zmsg_last(msg), "Worker: ");
zframe_reset(zmsg_last(msg), "OK", 2);
zmsg_send(&msg, worker);
}
zctx_destroy(&ctx);
return NULL;
}

// .split main task
// Now we come to the main task. This has the identical functionality to
// the previous {{lbbroker}} broker example, but uses CZMQ to start child
// the previous {{lbbroker}} broker example, but uses CZMQ to start child
// threads, to hold the list of workers, and to read and send messages:

int main(void)
{
zctx_t *ctx = zctx_new();
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
void *backend = zsocket_new(ctx, ZMQ_ROUTER);
zctx_t *ctx = zctx_new();
void *frontend = zsocket_new(ctx, ZMQ_ROUTER);
void *backend = zsocket_new(ctx, ZMQ_ROUTER);

// IPC doesn't yet work on MS Windows.
// IPC doesn't yet work on MS Windows.
#if (defined (WIN32))
zsocket_bind(frontend, "tcp://*:5672");
zsocket_bind(backend, "tcp://*:5673");
zsocket_bind(frontend, "tcp://*:5672");
zsocket_bind(backend, "tcp://*:5673");
#else
zsocket_bind(frontend, "ipc://frontend.ipc");
zsocket_bind(backend, "ipc://backend.ipc");
zsocket_bind(frontend, "ipc://frontend.ipc");
zsocket_bind(backend, "ipc://backend.ipc");
#endif

int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(worker_task, NULL);

// Queue of available workers
zlist_t *workers = zlist_new();

// .split main load-balancer loop
// Here is the main loop for the load balancer. It works the same way
// as the previous example, but is a lot shorter because CZMQ gives
// us an API that does more with fewer calls:
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items[0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // Interrupted
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new(client_task, NULL);
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new(worker_task, NULL);

// Queue of available workers
zlist_t *workers = zlist_new();

// .split main load-balancer loop
// Here is the main loop for the load balancer. It works the same way
// as the previous example, but is a lot shorter because CZMQ gives
// us an API that does more with fewer calls:
while (1) {
zmq_pollitem_t items[] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll(items, zlist_size(workers) ? 2 : 1, -1);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items[0].revents & ZMQ_POLLIN) {
// Use worker identity for load-balancing
zmsg_t *msg = zmsg_recv(backend);
if (!msg)
break; // Interrupted

#if 0
// zmsg_unwrap is DEPRECATED as over-engineered, poor style
zframe_t *identity = zmsg_unwrap(msg);
// zmsg_unwrap is DEPRECATED as over-engineered, poor style
zframe_t *identity = zmsg_unwrap(msg);
#else
zframe_t *identity = zmsg_pop(msg);
zframe_t *delimiter = zmsg_pop(msg);
zframe_destroy(&delimiter);
zframe_t *identity = zmsg_pop(msg);
zframe_t *delimiter = zmsg_pop(msg);
zframe_destroy(&delimiter);
#endif

zlist_append(workers, identity);

// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) {
zmsg_destroy(&msg);
} else {
zmsg_send(&msg, frontend);
if (--client_nbr == 0)
break; // Exit after N messages
}
}
if (items[1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv(frontend);
if (msg) {
zlist_append(workers, identity);

// Forward message to client if it's not a READY
zframe_t *frame = zmsg_first(msg);
if (memcmp(zframe_data(frame), WORKER_READY, strlen(WORKER_READY)) == 0) {
zmsg_destroy(&msg);
} else {
zmsg_send(&msg, frontend);
if (--client_nbr == 0)
break; // Exit after N messages
}
}
if (items[1].revents & ZMQ_POLLIN) {
// Get client request, route to first available worker
zmsg_t *msg = zmsg_recv(frontend);
if (msg) {
#if 0
// zmsg_wrap is DEPRECATED as unsafe
zmsg_wrap(msg, (zframe_t *)zlist_pop(workers));
// zmsg_wrap is DEPRECATED as unsafe
zmsg_wrap(msg, (zframe_t *)zlist_pop(workers));
#else
zmsg_pushmem(msg, NULL, 0); // delimiter
zmsg_push(msg, (zframe_t *)zlist_pop(workers));
zmsg_pushmem(msg, NULL, 0); // delimiter
zmsg_push(msg, (zframe_t *)zlist_pop(workers));
#endif

zmsg_send(&msg, backend);
}
}
}
// When we're done, clean up properly
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *)zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return 0;
zmsg_send(&msg, backend);
}
}
}
// When we're done, clean up properly
while (zlist_size(workers)) {
zframe_t *frame = (zframe_t *)zlist_pop(workers);
zframe_destroy(&frame);
}
zlist_destroy(&workers);
zctx_destroy(&ctx);
return 0;
}
12 changes: 5 additions & 7 deletions examples/C++/mdbroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class broker {
m_socket->bind(m_endpoint.c_str());
s_console ("I: MDP broker/0.1.1 is active at %s", endpoint.c_str());
}

private:

// ---------------------------------------------------------------------
Expand All @@ -114,9 +114,9 @@ class broker {
{
if ((*wrk)->m_expiry <= now)
toCull.push_back(*wrk);
}
}
for (std::deque<worker*>::iterator wrk = toCull.begin(); wrk != toCull.end(); ++wrk)
{
{
if (m_verbose) {
s_console ("I: deleting expired worker: %s",
(*wrk)->m_identity.c_str());
Expand Down Expand Up @@ -168,7 +168,7 @@ class broker {
if ((*next)->m_expiry > (*wrk)->m_expiry)
wrk = next;
}

zmsg *msg = srv->m_requests.front();
srv->m_requests.pop_front();
worker_send (*wrk, (char*)MDPW_REQUEST, "", msg);
Expand Down Expand Up @@ -382,7 +382,7 @@ class broker {
service_dispatch (srv, msg);
}
}

public:

// Get and process messages forever or until interrupted
Expand Down Expand Up @@ -470,5 +470,3 @@ int main (int argc, char *argv [])

return 0;
}


4 changes: 2 additions & 2 deletions examples/C++/mspoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ int main (int argc, char *argv[])

// Connect to weather server
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

// Initialize poll set
Expand All @@ -28,7 +28,7 @@ int main (int argc, char *argv[])
while (1) {
zmq::message_t message;
zmq::poll (&items [0], 2, -1);

if (items [0].revents & ZMQ_POLLIN) {
receiver.recv(&message);
// Process task
Expand Down
6 changes: 3 additions & 3 deletions examples/C++/msreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ int main (int argc, char *argv[])
// Process messages from both sockets
// We prioritize traffic from the task ventilator
while (1) {

// Process any waiting tasks
bool rc;
do {
Expand All @@ -33,7 +33,7 @@ int main (int argc, char *argv[])
// process task
}
} while(rc == true);

// Process any waiting weather updates
do {
zmq::message_t update;
Expand All @@ -42,7 +42,7 @@ int main (int argc, char *argv[])

}
} while(rc == true);

// No activity, so sleep for 1 msec
s_sleep(1);
}
Expand Down
Loading