Skip to content

Commit

Permalink
Merge pull request flux-framework#5454 from garlick/nix_czmq_atexit
Browse files Browse the repository at this point in the history
broker: stop managing 0MQ sockets with czmq
  • Loading branch information
mergify[bot] authored Sep 18, 2023
2 parents 4cd93f0 + 7c21568 commit a00922d
Show file tree
Hide file tree
Showing 41 changed files with 368 additions and 221 deletions.
39 changes: 17 additions & 22 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
#include <sys/resource.h>
#include <argz.h>
#include <flux/core.h>
#include <czmq.h>
#undef streq // redefined by ccan/str/str.h below
#include <zmq.h>
#include <jansson.h>
#if HAVE_CALIPER
#include <caliper/cali.h>
Expand Down Expand Up @@ -250,27 +249,10 @@ int main (int argc, char *argv[])
|| sigaction (SIGTERM, NULL, &old_sigact_term) < 0)
log_err_exit ("error setting signal mask");

/* Initialize libczmq zsys class.
*
* zsys_init() creates a global 0MQ context and starts the 0MQ I/O thread.
* The context is implicitly shared by users of the zsock class within
* the broker, including shmem connector, overlay.c, and module.c.
* libczmq tracks 0MQ sockets created with zsock, and any left open are
* closed by an atexit() handler to prevent zmq_ctx_term() from hanging.
*
* If something goes wrong, such as unclosed sockets in the atexit handler,
* czmq sends messages to its log class, which we redirect to stderr here.
*
* Disable czmq's internal signal handlers for SIGINT and SIGTERM, since
* the broker will install its own.
*/
if (!zsys_init ()) {
log_err ("zsys_init");
if (!(ctx.zctx = zmq_ctx_new ())) {
log_err ("zmq_ctx_new");
goto cleanup;
}
zsys_set_logstream (stderr);
zsys_set_logident ("flux-broker");
zsys_handler_set (NULL);

/* Set up the flux reactor with support for child watchers.
* Associate an internal flux_t handle with the reactor.
Expand Down Expand Up @@ -318,6 +300,7 @@ int main (int argc, char *argv[])

if (!(ctx.overlay = overlay_create (ctx.h,
ctx.attrs,
ctx.zctx,
overlay_recv_cb,
&ctx))) {
log_err ("overlay_create");
Expand Down Expand Up @@ -532,7 +515,7 @@ int main (int argc, char *argv[])
*/
attr_destroy (ctx.attrs);

modhash_destroy (ctx.modhash);
int module_leaks = modhash_destroy (ctx.modhash);
zlist_destroy (&ctx.sigwatchers);
shutdown_destroy (ctx.shutdown);
state_machine_destroy (ctx.state_machine);
Expand All @@ -549,6 +532,17 @@ int main (int argc, char *argv[])
free (ctx.init_shell_cmd);
optparse_destroy (ctx.opts);

/* zmq_ctx_term() blocks if any 0mq sockets remain open, so skip it
* if any broker modules had to be canceled in modhash_destroy().
*/
if (module_leaks > 0) {
log_msg ("skipping 0MQ shutdown due to presumed module socket leak");
if (ctx.exit_rc == 0)
ctx.exit_rc = 1;
}
else
zmq_ctx_term (ctx.zctx);

return ctx.exit_rc;
}

Expand Down Expand Up @@ -1214,6 +1208,7 @@ static int load_module (broker_ctx_t *ctx,
path = zlist_first (files);
}
if (!(p = module_create (ctx->h,
ctx->zctx,
overlay_get_uuid (ctx->overlay),
name,
path,
Expand Down
1 change: 1 addition & 0 deletions src/broker/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "src/common/libczmqcontainers/czmq_containers.h"

struct broker {
void *zctx;
flux_t *h;
flux_reactor_t *reactor;
optparse_t *opts;
Expand Down
7 changes: 6 additions & 1 deletion src/broker/modhash.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,29 @@ modhash_t *modhash_create (void)
return mh;
}

/* Destroy the module hash and report how many modules were still in it.
 *
 * Every module still present in mh->zh_byuuid is logged as not properly
 * shut down and is canceled with module_cancel(); a cancel failure is
 * logged too.  Returns the number of such modules, or 0 if mh is NULL or
 * nothing remained.  errno is saved on entry and restored before returning.
 */
void modhash_destroy (modhash_t *mh)
int modhash_destroy (modhash_t *mh)
{
int saved_errno = errno;
const char *uuid;
module_t *p;
int count = 0;

if (mh) {
if (mh->zh_byuuid) {
FOREACH_ZHASH (mh->zh_byuuid, uuid, p) {
log_msg ("broker module '%s' was not properly shut down",
module_get_name (p));
flux_error_t error;
if (module_cancel (p, &error) < 0)
log_msg ("%s: %s", module_get_name (p), error.text);
// counted for every leftover module, whether or not the cancel succeeded
count++;
}
zhash_destroy (&mh->zh_byuuid);
}
free (mh);
}
errno = saved_errno;
return count;
}

static json_t *modhash_entry_tojson (module_t *p,
Expand Down
3 changes: 2 additions & 1 deletion src/broker/modhash.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
typedef struct modhash modhash_t;

/* Hash-o-modules, keyed by uuid
* Destructor returns the number of modules that had to be canceled.
*/
modhash_t *modhash_create (void);
void modhash_destroy (modhash_t *mh);
int modhash_destroy (modhash_t *mh);

void modhash_add (modhash_t *mh, module_t *p);
void modhash_remove (modhash_t *mh, module_t *p);
Expand Down
36 changes: 28 additions & 8 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct broker_module {

double lastseen;

zsock_t *sock; /* broker end of PAIR socket */
void *zctx; /* zeromq context shared with broker */
void *sock; /* broker end of PAIR socket */
char endpoint[128];
struct flux_msg_cred cred; /* cred of connection */

uuid_t uuid; /* uuid for unique request sender identity */
Expand Down Expand Up @@ -151,7 +153,7 @@ static void *module_thread (void *arg)
module_t *p = arg;
sigset_t signal_set;
int errnum;
char uri[UUID_STR_LEN + 16];
char uri[128];
char **av = NULL;
int ac;
int mod_main_errno = 0;
Expand All @@ -162,7 +164,18 @@ static void *module_thread (void *arg)

/* Connect to broker socket, enable logging, register built-in services
*/
snprintf (uri, sizeof (uri), "shmem://%s", p->uuid_str);

/* N.B. inproc endpoints (in same process) must share a 0MQ context
* pointer which is passed via query argument to the local connector.
* (There was no convenient way to share that directly).
*
* At most 8 + 37 + 6 + 18 + 1 = 70 bytes are written into the 128 byte
* buffer, so the result cannot be truncated and the return value is ignored.
*/
(void)snprintf (uri,
sizeof (uri),
"shmem://%s&zctx=%p",
p->uuid_str,
p->zctx);
if (!(p->h = flux_open (uri, 0))) {
log_err ("flux_open %s", uri);
goto done;
Expand Down Expand Up @@ -276,6 +289,7 @@ static char *module_name_from_path (const char *s)
}

module_t *module_create (flux_t *h,
void *zctx,
const char *parent_uuid,
const char *name, // may be NULL
const char *path,
Expand Down Expand Up @@ -305,6 +319,7 @@ module_t *module_create (flux_t *h,
p->main = mod_main;
p->dso = dso;
p->rank = rank;
p->zctx = zctx;
if (!(p->conf = flux_conf_copy (flux_get_conf (h))))
goto cleanup;
if (!(p->parent_uuid_str = strdup (parent_uuid)))
Expand Down Expand Up @@ -349,16 +364,21 @@ module_t *module_create (flux_t *h,
uuid_generate (p->uuid);
uuid_unparse (p->uuid, p->uuid_str);

/* Broker end of PAIR socket is opened here.
/* Broker end of PAIR socket is opened here.
*/
if (!(p->sock = zsock_new_pair (NULL))) {
if (!(p->sock = zmq_socket (p->zctx, ZMQ_PAIR))) {
errprintf (error, "could not create zsock for %s", p->name);
goto cleanup;
}
zsock_set_unbounded (p->sock);
zsock_set_linger (p->sock, 5);
if (zsock_bind (p->sock, "inproc://%s", module_get_uuid (p)) < 0) {
errprintf (error, "zsock_bind inproc://%s", module_get_uuid (p));
// At most 9 + 37 + 1 = 47 bytes are written into the 128 byte buffer,
// so the result cannot be truncated and the return value is ignored.
(void)snprintf (p->endpoint,
sizeof (p->endpoint),
"inproc://%s",
module_get_uuid (p));
if (zmq_bind (p->sock, p->endpoint) < 0) {
errprintf (error, "zmq_bind %s: %s", p->endpoint, strerror (errno));
goto cleanup;
}
if (!(p->broker_w = zmqutil_watcher_create (flux_get_reactor (h),
Expand Down Expand Up @@ -554,7 +574,7 @@ void module_destroy (module_t *p)

flux_watcher_stop (p->broker_w);
flux_watcher_destroy (p->broker_w);
zsock_destroy (&p->sock);
zmq_close (p->sock);

#ifndef __SANITIZE_ADDRESS__
dlclose (p->dso);
Expand Down
1 change: 1 addition & 0 deletions src/broker/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef void (*modpoller_cb_f)(module_t *p, void *arg);
typedef void (*module_status_cb_f)(module_t *p, int prev_status, void *arg);

module_t *module_create (flux_t *h,
void *zctx,
const char *parent_uuid,
const char *name, // may be NULL
const char *path,
Expand Down
42 changes: 23 additions & 19 deletions src/broker/overlay.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ struct child {
};

struct parent {
zsock_t *zsock; // NULL on rank 0
void *zsock; // NULL on rank 0
char *uri;
flux_watcher_t *w;
int lastsent;
Expand Down Expand Up @@ -138,6 +138,7 @@ struct overlay_monitor {
};

struct overlay {
void *zctx;
zcert_t *cert;
struct zmqutil_zap *zap;
int enable_ipv6;
Expand All @@ -161,7 +162,7 @@ struct overlay {

struct parent parent;

zsock_t *bind_zsock; // NULL if no downstream peers
void *bind_zsock; // NULL if no downstream peers
char *bind_uri;
flux_watcher_t *bind_w;
struct child *children;
Expand Down Expand Up @@ -1035,7 +1036,7 @@ static void parent_cb (flux_reactor_t *r, flux_watcher_t *w,
"%s (rank %lu) sent disconnect control message",
flux_get_hostbyrank (ov->h, ov->parent.rank),
(unsigned long)ov->parent.rank);
(void)zsock_disconnect (ov->parent.zsock, "%s", ov->parent.uri);
(void)zmq_disconnect (ov->parent.zsock, ov->parent.uri);
ov->parent.offline = true;
rpc_track_purge (ov->parent.tracker, fail_parent_rpc, ov);
overlay_monitor_notify (ov, FLUX_NODEID_ANY);
Expand Down Expand Up @@ -1270,13 +1271,14 @@ int overlay_connect (struct overlay *ov)
errno = EINVAL;
return -1;
}
if (!(ov->parent.zsock = zsock_new_dealer (NULL)))
goto nomem;
if (!(ov->parent.zsock = zmq_socket (ov->zctx, ZMQ_DEALER)))
return -1;
/* The socket monitor is only used for logging.
* Setup may fail if libzmq is too old.
*/
if (ov->zmqdebug) {
ov->parent.monitor = zmqutil_monitor_create (ov->parent.zsock,
ov->parent.monitor = zmqutil_monitor_create (ov->zctx,
ov->parent.zsock,
ov->reactor,
parent_monitor_cb,
ov);
Expand All @@ -1296,8 +1298,8 @@ int overlay_connect (struct overlay *ov)
zsock_set_curve_serverkey (ov->parent.zsock, ov->parent.pubkey);
zsock_set_identity (ov->parent.zsock, ov->uuid);

if (zsock_connect (ov->parent.zsock, "%s", ov->parent.uri) < 0)
goto nomem;
if (zmq_connect (ov->parent.zsock, ov->parent.uri) < 0)
goto error;
if (!(ov->parent.w = zmqutil_watcher_create (ov->reactor,
ov->parent.zsock,
FLUX_POLLIN,
Expand All @@ -1309,8 +1311,7 @@ int overlay_connect (struct overlay *ov)
return -1;
}
return 0;
nomem:
errno = ENOMEM;
error:
return -1;
}

Expand All @@ -1330,21 +1331,22 @@ int overlay_bind (struct overlay *ov, const char *uri)
}

assert (ov->zap == NULL);
if (!(ov->zap = zmqutil_zap_create (ov->reactor))) {
if (!(ov->zap = zmqutil_zap_create (ov->zctx, ov->reactor))) {
log_err ("error creating ZAP server");
return -1;
}
zmqutil_zap_set_logger (ov->zap, zaplogger, ov);

if (!(ov->bind_zsock = zsock_new_router (NULL))) {
if (!(ov->bind_zsock = zmq_socket (ov->zctx, ZMQ_ROUTER))) {
log_err ("error creating zmq ROUTER socket");
return -1;
}
/* The socket monitor is only used for logging.
* Setup may fail if libzmq is too old.
*/
if (ov->zmqdebug) {
ov->bind_monitor = zmqutil_monitor_create (ov->bind_zsock,
ov->bind_monitor = zmqutil_monitor_create (ov->zctx,
ov->bind_zsock,
ov->reactor,
bind_monitor_cb,
ov);
Expand All @@ -1361,11 +1363,11 @@ int overlay_bind (struct overlay *ov, const char *uri)
zcert_apply (ov->cert, ov->bind_zsock);
zsock_set_curve_server (ov->bind_zsock, 1);

if (zsock_bind (ov->bind_zsock, "%s", uri) < 0) {
if (zmq_bind (ov->bind_zsock, uri) < 0) {
log_err ("error binding to %s", uri);
return -1;
}
/* Capture URI after zsock_bind() processing, so it reflects expanded
/* Capture URI after zmq_bind() processing, so it reflects expanded
* wildcards and normalized addresses.
*/
if (!(ov->bind_uri = zsock_last_endpoint (ov->bind_zsock))) {
Expand Down Expand Up @@ -1394,8 +1396,8 @@ int overlay_bind (struct overlay *ov, const char *uri)
void overlay_shutdown (struct overlay *overlay)
{
if (overlay->bind_zsock && overlay->bind_uri)
if (zsock_unbind (overlay->bind_zsock, "%s", overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zsock_unbind failed");
if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0)
flux_log (overlay->h, LOG_ERR, "zmq_unbind failed");
}

/* Call after overlay bootstrap (bind/connect),
Expand Down Expand Up @@ -2034,14 +2036,14 @@ void overlay_destroy (struct overlay *ov)
ov->status = SUBTREE_STATUS_OFFLINE;
overlay_control_parent (ov, CONTROL_STATUS, ov->status);

zsock_destroy (&ov->parent.zsock);
zmq_close (ov->parent.zsock);
free (ov->parent.uri);
flux_watcher_destroy (ov->parent.w);
free (ov->parent.pubkey);
zmqutil_monitor_destroy (ov->parent.monitor);

zsock_destroy (&ov->bind_zsock);
free (ov->bind_uri);
zmq_close (ov->bind_zsock);
flux_watcher_destroy (ov->bind_w);
zmqutil_monitor_destroy (ov->bind_monitor);

Expand Down Expand Up @@ -2102,6 +2104,7 @@ static const struct flux_msg_handler_spec htab[] = {

struct overlay *overlay_create (flux_t *h,
attr_t *attrs,
void *zctx,
overlay_recv_f cb,
void *arg)
{
Expand All @@ -2118,6 +2121,7 @@ struct overlay *overlay_create (flux_t *h,
ov->recv_cb = cb;
ov->recv_arg = arg;
ov->version = FLUX_CORE_VERSION_HEX;
ov->zctx = zctx;
uuid_generate (uuid);
uuid_unparse (uuid, ov->uuid);
if (!(ov->monitor_callbacks = zlist_new ()))
Expand Down
1 change: 1 addition & 0 deletions src/broker/overlay.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef void (*overlay_recv_f)(const flux_msg_t *msg,
*/
struct overlay *overlay_create (flux_t *h,
attr_t *attrs,
void *zctx,
overlay_recv_f cb,
void *arg);
void overlay_destroy (struct overlay *ov);
Expand Down
Loading

0 comments on commit a00922d

Please sign in to comment.