From 5efb12eebb7c1953cc6df7cd2dc13f3151f65c91 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 15 Sep 2023 08:45:37 -0700 Subject: [PATCH 01/12] testsuite: ensure test module is unloaded Problem: some tests test start an instance with a test module but fail to unload it, which will cause the broker to fail once it begins detecting unloaded modules on exit. Change test scripts to unload test modules. --- t/t0003-module.t | 7 ++++++- t/t0012-content-sqlite.t | 8 ++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/t/t0003-module.t b/t/t0003-module.t index 7c695d915280..9d4ef9635241 100755 --- a/t/t0003-module.t +++ b/t/t0003-module.t @@ -249,7 +249,12 @@ test_expect_success 'module with version ext can be loaded by name' ' mkdir -p testmoddir && cp $testmod testmoddir/testmod.so.0.0.0 && FLUX_MODULE_PATH_PREPEND=$(pwd)/testmoddir flux start \ - bash -c "flux module load testmod && flux module list -l" \ + bash -c \ + "flux module load testmod; \ + rc=\$?; \ + flux module list -l; \ + flux module remove -f testmod; \ + exit \$rc" \ >modlist.out && grep testmod.so.0.0.0 modlist.out ' diff --git a/t/t0012-content-sqlite.t b/t/t0012-content-sqlite.t index 405f4dda4cf4..cf10529bad14 100755 --- a/t/t0012-content-sqlite.t +++ b/t/t0012-content-sqlite.t @@ -357,8 +357,12 @@ load_module_xfail() { flux start -o,-Sbroker.rc1_path=,-Sbroker.rc3_path= \ -o,-Sstatedir=$(pwd) bash -c \ - "flux module load content && \ - flux module load content-sqlite" + "flux module load content; \ + flux module load content-sqlite; \ + rc=\$?; \ + flux module remove -f content-sqlite; \ + flux module remove -f content; \ + exit \$rc" } # FWIW https://www.sqlite.org/fileformat.html From b49df3ba845cc75d78a5856ee8c8ce8c45a231b2 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 14 Sep 2023 06:56:08 -0700 Subject: [PATCH 02/12] libzmqutil/zap: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. Add a zctx parameter to the constructor and update broker user and unit test. --- src/broker/overlay.c | 2 +- src/common/libzmqutil/test/zap.c | 4 ++-- src/common/libzmqutil/zap.c | 14 +++++++------- src/common/libzmqutil/zap.h | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/broker/overlay.c b/src/broker/overlay.c index 86c18bd18fbf..2c62d8fbf7ca 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -1330,7 +1330,7 @@ int overlay_bind (struct overlay *ov, const char *uri) } assert (ov->zap == NULL); - if (!(ov->zap = zmqutil_zap_create (ov->reactor))) { + if (!(ov->zap = zmqutil_zap_create (ov->zctx, ov->reactor))) { log_err ("error creating ZAP server"); return -1; } diff --git a/src/common/libzmqutil/test/zap.c b/src/common/libzmqutil/test/zap.c index 91e284a86a50..ab427236a13c 100644 --- a/src/common/libzmqutil/test/zap.c +++ b/src/common/libzmqutil/test/zap.c @@ -20,8 +20,8 @@ void test_badargs (void) { errno = 0; - ok (zmqutil_zap_create (NULL) == NULL && errno == EINVAL, - "zmqutil_zap_create reactor=NULL fails with EINVAL"); + ok (zmqutil_zap_create (NULL, NULL) == NULL && errno == EINVAL, + "zmqutil_zap_create zctx=NULL reactor=NULL fails with EINVAL"); lives_ok ({zmqutil_zap_destroy (NULL);}, "zmqutil_zap_destroy zap=NULL doesn't crash"); diff --git a/src/common/libzmqutil/zap.c b/src/common/libzmqutil/zap.c index 2897fa22e4a5..eec48aabc9ab 100644 --- a/src/common/libzmqutil/zap.c +++ b/src/common/libzmqutil/zap.c @@ -24,7 +24,7 @@ struct zmqutil_zap { zcertstore_t *certstore; - zsock_t *sock; + void *sock; flux_watcher_t *w; zaplog_f logger; void *logger_arg; @@ -192,8 +192,8 @@ void zmqutil_zap_destroy (struct zmqutil_zap *zap) int saved_errno = errno; flux_watcher_destroy (zap->w); if (zap->sock) { - zsock_unbind (zap->sock, ZAP_ENDPOINT); - zsock_destroy (&zap->sock); + zmq_unbind (zap->sock, ZAP_ENDPOINT); + zmq_close (zap->sock); } zcertstore_destroy (&zap->certstore); free (zap); @@ -201,11 +201,11 @@ void zmqutil_zap_destroy (struct zmqutil_zap *zap) } } -struct zmqutil_zap *zmqutil_zap_create (flux_reactor_t *r) +struct zmqutil_zap *zmqutil_zap_create (void *zctx, flux_reactor_t *r) { struct zmqutil_zap *zap; - if (!r) { + if (!r || !zctx) { errno = EINVAL; return NULL; } @@ -213,9 +213,9 @@ struct zmqutil_zap *zmqutil_zap_create (flux_reactor_t *r) return NULL; if (!(zap->certstore = zcertstore_new (NULL))) goto error; - if (!(zap->sock = zsock_new (ZMQ_REP))) + if (!(zap->sock = zmq_socket (zctx, ZMQ_REP))) goto error; - if (zsock_bind (zap->sock, ZAP_ENDPOINT) < 0) + if (zmq_bind (zap->sock, ZAP_ENDPOINT) < 0) goto error; if (!(zap->w = zmqutil_watcher_create (r, zap->sock, diff --git a/src/common/libzmqutil/zap.h b/src/common/libzmqutil/zap.h index bdb699722dcb..c3f16cee9d8a 100644 --- a/src/common/libzmqutil/zap.h +++ b/src/common/libzmqutil/zap.h @@ -23,7 +23,7 @@ int zmqutil_zap_authorize (struct zmqutil_zap *zap, void zmqutil_zap_set_logger (struct zmqutil_zap *zap, zaplog_f fun, void *arg); void zmqutil_zap_destroy (struct zmqutil_zap *zap); -struct zmqutil_zap *zmqutil_zap_create (flux_reactor_t *r); +struct zmqutil_zap *zmqutil_zap_create (void *zctx, flux_reactor_t *r); /* * vi:tabstop=4 shiftwidth=4 expandtab From 9bee39dff46557b735d0d7d9d4ab1ae337011140 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Thu, 14 Sep 2023 07:26:56 -0700 Subject: [PATCH 03/12] libzmqutil/monitor: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. --- src/broker/overlay.c | 6 ++++-- src/common/libzmqutil/monitor.c | 18 ++++++++++-------- src/common/libzmqutil/monitor.h | 3 ++- src/common/libzmqutil/test/monitor.c | 2 +- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/broker/overlay.c b/src/broker/overlay.c index 2c62d8fbf7ca..ace3cd1a2246 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -1276,7 +1276,8 @@ int overlay_connect (struct overlay *ov) * Setup may fail if libzmq is too old. */ if (ov->zmqdebug) { - ov->parent.monitor = zmqutil_monitor_create (ov->parent.zsock, + ov->parent.monitor = zmqutil_monitor_create (ov->zctx, + ov->parent.zsock, ov->reactor, parent_monitor_cb, ov); @@ -1344,7 +1345,8 @@ int overlay_bind (struct overlay *ov, const char *uri) * Setup may fail if libzmq is too old. */ if (ov->zmqdebug) { - ov->bind_monitor = zmqutil_monitor_create (ov->bind_zsock, + ov->bind_monitor = zmqutil_monitor_create (ov->zctx, + ov->bind_zsock, ov->reactor, bind_monitor_cb, ov); diff --git a/src/common/libzmqutil/monitor.c b/src/common/libzmqutil/monitor.c index 8da1d22732bf..a2182c163bf0 100644 --- a/src/common/libzmqutil/monitor.c +++ b/src/common/libzmqutil/monitor.c @@ -28,7 +28,7 @@ /* N.B. 4.1.4 has a bad bug */ struct zmqutil_monitor { - zsock_t *sock; + void *sock; char endpoint[UUID_STR_LEN + 64]; flux_watcher_t *w; zmqutil_monitor_f fun; @@ -237,15 +237,16 @@ void zmqutil_monitor_destroy (struct zmqutil_monitor *mon) flux_watcher_destroy (mon->w); if (mon->sock) { monitor_purge (mon); - zsock_disconnect (mon->sock, "%s", mon->endpoint); - zsock_destroy (&mon->sock); + (void)zmq_disconnect (mon->sock, mon->endpoint); + (void)zmq_close (mon->sock); } free (mon); errno = saved_errno; } } -struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, +struct zmqutil_monitor *zmqutil_monitor_create (void *zctx, + void *sock, flux_reactor_t *r, zmqutil_monitor_f fun, void *arg) @@ -254,7 +255,7 @@ struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, uuid_t uuid; char uuid_str[UUID_STR_LEN]; - if (!sock || !r) { + if (!zctx || !sock || !r) { errno = EINVAL; return NULL; } @@ -275,8 +276,8 @@ struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, if (zmq_socket_monitor (zsock_resolve (sock), mon->endpoint, ZMQ_EVENT_ALL) < 0 - || !(mon->sock = zsock_new (ZMQ_PAIR)) - || zsock_connect (mon->sock, "%s", mon->endpoint) < 0 + || !(mon->sock = zmq_socket (zctx, ZMQ_PAIR)) + || zmq_connect (mon->sock, mon->endpoint) < 0 || !(mon->w = zmqutil_watcher_create (r, mon->sock, FLUX_POLLIN, @@ -294,7 +295,8 @@ struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, #else /* Monitoring is disabled due to libzmq being too old. */ -struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, +struct zmqutil_monitor *zmqutil_monitor_create (void *zctx, + void *sock, flux_reactor_t *r, zmqutil_monitor_f fun, void *arg) diff --git a/src/common/libzmqutil/monitor.h b/src/common/libzmqutil/monitor.h index 2db5ee6eb2f2..2c34991b06c6 100644 --- a/src/common/libzmqutil/monitor.h +++ b/src/common/libzmqutil/monitor.h @@ -29,7 +29,8 @@ typedef void (*zmqutil_monitor_f)(struct zmqutil_monitor *mon, void *arg); * after close/destroy. * N.B. this will fail if an old/buggy version of libzmq is used. */ -struct zmqutil_monitor *zmqutil_monitor_create (zsock_t *sock, +struct zmqutil_monitor *zmqutil_monitor_create (void *zctx, + void *sock, flux_reactor_t *r, zmqutil_monitor_f fun, void *arg); diff --git a/src/common/libzmqutil/test/monitor.c b/src/common/libzmqutil/test/monitor.c index 4103ef9de8e7..46c4b16ad77b 100644 --- a/src/common/libzmqutil/test/monitor.c +++ b/src/common/libzmqutil/test/monitor.c @@ -22,7 +22,7 @@ void test_badargs (void) /* Note: these are stubbed for older libzmq (e.g. centos 7), * so checking for errno == EINVAL is not going to happen there. */ - ok (zmqutil_monitor_create (NULL, NULL, NULL, NULL) == NULL, + ok (zmqutil_monitor_create (NULL, NULL, NULL, NULL, NULL) == NULL, "zmqutil_monitor_create sock=NULL fails"); lives_ok({zmqutil_monitor_destroy (NULL);}, From 7d6d138c453c612ea5fdc664d041c8542a5bbb86 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2023 07:09:55 -0700 Subject: [PATCH 04/12] connectors/shmem: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. Since the global zmq context can be directly accessed as the return value of zsys_init(), capture it and pass it as a URI query parameter to the shmem:// connector ("&zctx=%p"). Note: Due to zsock_t "polymorphism", zsock_t and raw zeromq socket handles can be used interchangeably in many of the czmq classes. Update unit test. --- src/broker/broker.c | 3 ++- src/broker/broker.h | 1 + src/broker/module.c | 18 ++++++++++++++++-- src/broker/module.h | 1 + src/connectors/shmem/shmem.c | 30 +++++++++++++++++++++--------- t/Makefile.am | 4 ++-- t/shmem/backtoback.c | 14 +++++++++++--- 7 files changed, 54 insertions(+), 17 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 9ba9c0c4d2cb..38d4b378e592 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -264,7 +264,7 @@ int main (int argc, char *argv[]) * Disable czmq's internal signal handlers for SIGINT and SIGTERM, since * the broker will install its own. */ - if (!zsys_init ()) { + if (!(ctx.zctx = zsys_init ())) { log_err ("zsys_init"); goto cleanup; } @@ -1214,6 +1214,7 @@ static int load_module (broker_ctx_t *ctx, path = zlist_first (files); } if (!(p = module_create (ctx->h, + ctx->zctx, overlay_get_uuid (ctx->overlay), name, path, diff --git a/src/broker/broker.h b/src/broker/broker.h index 41812b6f8c1e..fec3bc817a37 100644 --- a/src/broker/broker.h +++ b/src/broker/broker.h @@ -16,6 +16,7 @@ #include "src/common/libczmqcontainers/czmq_containers.h" struct broker { + void *zctx; flux_t *h; flux_reactor_t *reactor; optparse_t *opts; diff --git a/src/broker/module.c b/src/broker/module.c index 1b9af42ece0e..279e9894a4d3 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -43,6 +43,7 @@ struct broker_module { double lastseen; + void *zctx; /* zeromq context shared with broker */ zsock_t *sock; /* broker end of PAIR socket */ struct flux_msg_cred cred; /* cred of connection */ @@ -151,7 +152,7 @@ static void *module_thread (void *arg) module_t *p = arg; sigset_t signal_set; int errnum; - char uri[UUID_STR_LEN + 16]; + char uri[128]; char **av = NULL; int ac; int mod_main_errno = 0; @@ -162,7 +163,18 @@ static void *module_thread (void *arg) /* Connect to broker socket, enable logging, register built-in services */ - snprintf (uri, sizeof (uri), "shmem://%s", p->uuid_str); + + /* N.B. inproc endpoints (in same process) must share a 0MQ context + * pointer which is passed via query argument to the local connector. + * (There was no convenient way to share that directly). + * + * copying 8 + 37 + 6 + 18 + 1 = 70 bytes into 128 byte buffer cannot fail + */ + (void)snprintf (uri, + sizeof (uri), + "shmem://%s&zctx=%p", + p->uuid_str, + p->zctx); if (!(p->h = flux_open (uri, 0))) { log_err ("flux_open %s", uri); goto done; @@ -276,6 +288,7 @@ static char *module_name_from_path (const char *s) } module_t *module_create (flux_t *h, + void *zctx, const char *parent_uuid, const char *name, // may be NULL const char *path, @@ -305,6 +318,7 @@ module_t *module_create (flux_t *h, p->main = mod_main; p->dso = dso; p->rank = rank; + p->zctx = zctx; if (!(p->conf = flux_conf_copy (flux_get_conf (h)))) goto cleanup; if (!(p->parent_uuid_str = strdup (parent_uuid))) diff --git a/src/broker/module.h b/src/broker/module.h index 4f46c9a57934..006c42ae9114 100644 --- a/src/broker/module.h +++ b/src/broker/module.h @@ -26,6 +26,7 @@ typedef void (*modpoller_cb_f)(module_t *p, void *arg); typedef void (*module_status_cb_f)(module_t *p, int prev_status, void *arg); module_t *module_create (flux_t *h, + void *zctx, const char *parent_uuid, const char *name, // may be NULL const char *path, diff --git a/src/connectors/shmem/shmem.c b/src/connectors/shmem/shmem.c index 19125d04daab..011430ab872d 100644 --- a/src/connectors/shmem/shmem.c +++ b/src/connectors/shmem/shmem.c @@ -11,9 +11,9 @@ /* Note: * This connector creates a 0MQ inproc socket that communicates with another * inproc socket in the same process (normally the flux broker). Pairs of - * inproc sockets must share a common 0MQ context. This connector uses the - * libczmq zsock class, which hides creation/sharing of the 0MQ context; - * therefore, the other inproc socket should be created with zsock also. + * inproc sockets must share a common 0MQ context. The context is passed + * in as a URI query option, e.g. "shmem://NAME&zctx=%p", where NAME is + * the unique socket name used to match two endpoints. */ #if HAVE_CONFIG_H @@ -21,6 +21,7 @@ #endif #include #include +#undef streq // redefined by ccan/str/str.h below #include #if HAVE_CALIPER #include @@ -28,13 +29,16 @@ #include #include "src/common/libzmqutil/msg_zsock.h" +#include "ccan/str/str.h" typedef struct { - zsock_t *sock; + void *zctx; + void *sock; char *uuid; flux_t *h; char *argz; size_t argz_len; + char endpoint[128]; } shmem_ctx_t; static const struct flux_handle_ops handle_ops; @@ -75,7 +79,7 @@ static flux_msg_t *op_recv (void *impl, int flags) shmem_ctx_t *ctx = impl; zmq_pollitem_t zp = { .events = ZMQ_POLLIN, - .socket = zsock_resolve (ctx->sock), + .socket = ctx->sock, .revents = 0, .fd = -1, }; @@ -97,7 +101,8 @@ static flux_msg_t *op_recv (void *impl, int flags) static void op_fini (void *impl) { shmem_ctx_t *ctx = impl; - zsock_destroy (&ctx->sock); + + (void)zmq_close (ctx->sock); free (ctx->argz); free (ctx); } @@ -140,21 +145,28 @@ flux_t *connector_init (const char *path, int flags, flux_error_t *errp) bind_socket = 1; else if (streq (item, "connect")) bind_socket = 0; + else if (strstarts (item, "zctx=")) { + if (sscanf (item + 5, "%p", &ctx->zctx) != 1) { + errno = EINVAL; + goto error; + } + } else { errno = EINVAL; goto error; } } - if (!(ctx->sock = zsock_new_pair (NULL))) + if (!(ctx->sock = zmq_socket (ctx->zctx, ZMQ_PAIR))) goto error; zsock_set_unbounded (ctx->sock); zsock_set_linger (ctx->sock, 5); + snprintf (ctx->endpoint, sizeof (ctx->endpoint), "inproc://%s", ctx->uuid); if (bind_socket) { - if (zsock_bind (ctx->sock, "inproc://%s", ctx->uuid) < 0) + if (zmq_bind (ctx->sock, ctx->endpoint) < 0) goto error; } else { - if (zsock_connect (ctx->sock, "inproc://%s", ctx->uuid) < 0) + if (zmq_connect (ctx->sock, ctx->endpoint) < 0) goto error; } if (!(ctx->h = flux_handle_create (ctx, &handle_ops, flags))) diff --git a/t/Makefile.am b/t/Makefile.am index cfa3476b8483..a2ae52c4a78b 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -519,9 +519,9 @@ test_cppflags = \ $(AM_CPPFLAGS) shmem_backtoback_t_SOURCES = shmem/backtoback.c -shmem_backtoback_t_CPPFLAGS = $(test_cppflags) +shmem_backtoback_t_CPPFLAGS = $(test_cppflags) $(ZMQ_CFLAGS) shmem_backtoback_t_LDADD = $(test_ldadd) -shmem_backtoback_t_LDFLAGS = $(test_ldflags) +shmem_backtoback_t_LDFLAGS = $(test_ldflags) $(ZMQ_LIBS) loop_logstderr_SOURCES = loop/logstderr.c loop_logstderr_CPPFLAGS = $(test_cppflags) diff --git a/t/shmem/backtoback.c b/t/shmem/backtoback.c index 4bdacb21928a..9668b7793408 100644 --- a/t/shmem/backtoback.c +++ b/t/shmem/backtoback.c @@ -13,21 +13,27 @@ #endif #include #include +#include -#include "src/common/libutil/xzmalloc.h" #include "src/common/libtap/tap.h" int main (int argc, char *argv[]) { flux_t *h_cli, *h_srv; + void *zctx; + char uri[256]; flux_msg_t *msg; int type; plan (NO_PLAN); - ok ((h_srv = flux_open ("shmem://test&bind", 0)) != NULL, + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create 0MQ context"); + snprintf (uri, sizeof (uri), "shmem://test&bind&zctx=%p", zctx); + ok ((h_srv = flux_open (uri, 0)) != NULL, "created server handle"); - ok ((h_cli = flux_open ("shmem://test&connect", 0)) != NULL, + snprintf (uri, sizeof (uri), "shmem://test&connect&zctx=%p", zctx); + ok ((h_cli = flux_open (uri, 0)) != NULL, "created client handle"); if (!h_cli || !h_srv) BAIL_OUT ("can't continue without client or server handle"); @@ -59,6 +65,8 @@ int main (int argc, char *argv[]) flux_close (h_cli); flux_close (h_srv); + zmq_ctx_term (zctx); + done_testing(); return (0); } From 8d7c082e110b5314ff77d1e5d7e318a1f1bd4953 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2023 12:28:24 -0700 Subject: [PATCH 05/12] broker/module: fix whitespace Problem: There is a misaligned comment block. Fix whitespace. --- src/broker/module.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/broker/module.c b/src/broker/module.c index 279e9894a4d3..22c6e248e695 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -363,7 +363,7 @@ module_t *module_create (flux_t *h, uuid_generate (p->uuid); uuid_unparse (p->uuid, p->uuid_str); - /* Broker end of PAIR socket is opened here. + /* Broker end of PAIR socket is opened here. */ if (!(p->sock = zsock_new_pair (NULL))) { errprintf (error, "could not create zsock for %s", p->name); From 015731955635258e326a54f37bd4adec82387a29 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2023 13:01:41 -0700 Subject: [PATCH 06/12] broker/module: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. --- src/broker/module.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/broker/module.c b/src/broker/module.c index 22c6e248e695..4967420cbc71 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -44,7 +44,8 @@ struct broker_module { double lastseen; void *zctx; /* zeromq context shared with broker */ - zsock_t *sock; /* broker end of PAIR socket */ + void *sock; /* broker end of PAIR socket */ + char endpoint[128]; struct flux_msg_cred cred; /* cred of connection */ uuid_t uuid; /* uuid for unique request sender identity */ @@ -365,14 +366,18 @@ module_t *module_create (flux_t *h, /* Broker end of PAIR socket is opened here. */ - if (!(p->sock = zsock_new_pair (NULL))) { + if (!(p->sock = zmq_socket (p->zctx, ZMQ_PAIR))) { errprintf (error, "could not create zsock for %s", p->name); goto cleanup; } zsock_set_unbounded (p->sock); zsock_set_linger (p->sock, 5); - if (zsock_bind (p->sock, "inproc://%s", module_get_uuid (p)) < 0) { - errprintf (error, "zsock_bind inproc://%s", module_get_uuid (p)); + snprintf (p->endpoint, + sizeof (p->endpoint), + "inproc://%s", + module_get_uuid (p)); + if (zmq_bind (p->sock, p->endpoint) < 0) { + errprintf (error, "zmq_bind %s: %s", p->endpoint, strerror (errno)); goto cleanup; } if (!(p->broker_w = zmqutil_watcher_create (flux_get_reactor (h), @@ -568,7 +573,7 @@ void module_destroy (module_t *p) flux_watcher_stop (p->broker_w); flux_watcher_destroy (p->broker_w); - zsock_destroy (&p->sock); + zmq_close (p->sock); #ifndef __SANITIZE_ADDRESS__ dlclose (p->dso); From 59eee08cc4e148489db96e4f7e3f446c1b47116d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 13 Sep 2023 13:28:30 -0700 Subject: [PATCH 07/12] broker/overlay: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. Update unit test to match. --- src/broker/broker.c | 1 + src/broker/overlay.c | 34 ++++++++++++++++--------------- src/broker/overlay.h | 1 + src/broker/test/overlay.c | 43 ++++++++++++++++++++++----------------- 4 files changed, 44 insertions(+), 35 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 38d4b378e592..1ae49b5bf640 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -318,6 +318,7 @@ int main (int argc, char *argv[]) if (!(ctx.overlay = overlay_create (ctx.h, ctx.attrs, + ctx.zctx, overlay_recv_cb, &ctx))) { log_err ("overlay_create"); diff --git a/src/broker/overlay.c b/src/broker/overlay.c index ace3cd1a2246..43317b995bcc 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -108,7 +108,7 @@ struct child { }; struct parent { - zsock_t *zsock; // NULL on rank 0 + void *zsock; // NULL on rank 0 char *uri; flux_watcher_t *w; int lastsent; @@ -138,6 +138,7 @@ struct overlay_monitor { }; struct overlay { + void *zctx; zcert_t *cert; struct zmqutil_zap *zap; int enable_ipv6; @@ -161,7 +162,7 @@ struct overlay { struct parent parent; - zsock_t *bind_zsock; // NULL if no downstream peers + void *bind_zsock; // NULL if no downstream peers char *bind_uri; flux_watcher_t *bind_w; struct child *children; @@ -1035,7 +1036,7 @@ static void parent_cb (flux_reactor_t *r, flux_watcher_t *w, "%s (rank %lu) sent disconnect control message", flux_get_hostbyrank (ov->h, ov->parent.rank), (unsigned long)ov->parent.rank); - (void)zsock_disconnect (ov->parent.zsock, "%s", ov->parent.uri); + (void)zmq_disconnect (ov->parent.zsock, ov->parent.uri); ov->parent.offline = true; rpc_track_purge (ov->parent.tracker, fail_parent_rpc, ov); overlay_monitor_notify (ov, FLUX_NODEID_ANY); @@ -1270,8 +1271,8 @@ int overlay_connect (struct overlay *ov) errno = EINVAL; return -1; } - if (!(ov->parent.zsock = zsock_new_dealer (NULL))) - goto nomem; + if (!(ov->parent.zsock = zmq_socket (ov->zctx, ZMQ_DEALER))) + return -1; /* The socket monitor is only used for logging. * Setup may fail if libzmq is too old. */ @@ -1297,8 +1298,8 @@ int overlay_connect (struct overlay *ov) zsock_set_curve_serverkey (ov->parent.zsock, ov->parent.pubkey); zsock_set_identity (ov->parent.zsock, ov->uuid); - if (zsock_connect (ov->parent.zsock, "%s", ov->parent.uri) < 0) - goto nomem; + if (zmq_connect (ov->parent.zsock, ov->parent.uri) < 0) + goto error; if (!(ov->parent.w = zmqutil_watcher_create (ov->reactor, ov->parent.zsock, FLUX_POLLIN, @@ -1310,8 +1311,7 @@ int overlay_connect (struct overlay *ov) return -1; } return 0; -nomem: - errno = ENOMEM; +error: return -1; } @@ -1337,7 +1337,7 @@ int overlay_bind (struct overlay *ov, const char *uri) } zmqutil_zap_set_logger (ov->zap, zaplogger, ov); - if (!(ov->bind_zsock = zsock_new_router (NULL))) { + if (!(ov->bind_zsock = zmq_socket (ov->zctx, ZMQ_ROUTER))) { log_err ("error creating zmq ROUTER socket"); return -1; } @@ -1363,11 +1363,11 @@ int overlay_bind (struct overlay *ov, const char *uri) zcert_apply (ov->cert, ov->bind_zsock); zsock_set_curve_server (ov->bind_zsock, 1); - if (zsock_bind (ov->bind_zsock, "%s", uri) < 0) { + if (zmq_bind (ov->bind_zsock, uri) < 0) { log_err ("error binding to %s", uri); return -1; } - /* Capture URI after zsock_bind() processing, so it reflects expanded + /* Capture URI after zmq_bind() processing, so it reflects expanded * wildcards and normalized addresses. */ if (!(ov->bind_uri = zsock_last_endpoint (ov->bind_zsock))) { @@ -1396,8 +1396,8 @@ int overlay_bind (struct overlay *ov, const char *uri) void overlay_shutdown (struct overlay *overlay) { if (overlay->bind_zsock && overlay->bind_uri) - if (zsock_unbind (overlay->bind_zsock, "%s", overlay->bind_uri) < 0) - flux_log (overlay->h, LOG_ERR, "zsock_unbind failed"); + if (zmq_unbind (overlay->bind_zsock, overlay->bind_uri) < 0) + flux_log (overlay->h, LOG_ERR, "zmq_unbind failed"); } /* Call after overlay bootstrap (bind/connect), @@ -2036,14 +2036,14 @@ void overlay_destroy (struct overlay *ov) ov->status = SUBTREE_STATUS_OFFLINE; overlay_control_parent (ov, CONTROL_STATUS, ov->status); - zsock_destroy (&ov->parent.zsock); + zmq_close (ov->parent.zsock); free (ov->parent.uri); flux_watcher_destroy (ov->parent.w); free (ov->parent.pubkey); zmqutil_monitor_destroy (ov->parent.monitor); - zsock_destroy (&ov->bind_zsock); free (ov->bind_uri); + zmq_close (ov->bind_zsock); flux_watcher_destroy (ov->bind_w); zmqutil_monitor_destroy (ov->bind_monitor); @@ -2104,6 +2104,7 @@ static const struct flux_msg_handler_spec htab[] = { struct overlay *overlay_create (flux_t *h, attr_t *attrs, + void *zctx, overlay_recv_f cb, void *arg) { @@ -2120,6 +2121,7 @@ struct overlay *overlay_create (flux_t *h, ov->recv_cb = cb; ov->recv_arg = arg; ov->version = FLUX_CORE_VERSION_HEX; + ov->zctx = zctx; uuid_generate (uuid); uuid_unparse (uuid, ov->uuid); if (!(ov->monitor_callbacks = zlist_new ())) diff --git a/src/broker/overlay.h b/src/broker/overlay.h index de4ecfad94a3..2c7e3dff1edb 100644 --- a/src/broker/overlay.h +++ b/src/broker/overlay.h @@ -35,6 +35,7 @@ typedef void (*overlay_recv_f)(const flux_msg_t *msg, */ struct overlay *overlay_create (flux_t *h, attr_t *attrs, + void *zctx, overlay_recv_f cb, void *arg); void overlay_destroy (struct overlay *ov); diff --git a/src/broker/test/overlay.c b/src/broker/test/overlay.c index 46cc919645cc..b94dabad9964 100644 --- a/src/broker/test/overlay.c +++ b/src/broker/test/overlay.c @@ -29,6 +29,7 @@ #include "src/broker/topology.h" static zlist_t *logs; +void *zctx; struct context { struct overlay *ov; @@ -107,7 +108,7 @@ struct context *ctx_create (flux_t *h, ctx->size = size; ctx->rank = rank; snprintf (ctx->name, sizeof (ctx->name), "%s-%d", name, rank); - if (!(ctx->ov = overlay_create (h, ctx->attrs, cb, ctx))) + if (!(ctx->ov = overlay_create (h, ctx->attrs, zctx, cb, ctx))) BAIL_OUT ("overlay_create"); if (!(ctx->uuid = overlay_get_uuid (ctx->ov))) BAIL_OUT ("overlay_get_uuid failed"); @@ -276,8 +277,8 @@ void trio (flux_t *h) const flux_msg_t *rmsg; flux_msg_t *msg; const char *topic; - zsock_t *zsock_none; - zsock_t *zsock_curve; + void *zsock_none; + void *zsock_curve; zcert_t *cert; const char *sender; @@ -458,19 +459,21 @@ void trio (flux_t *h) /* 1) No security */ - if (!(zsock_none = zsock_new_dealer (NULL))) - BAIL_OUT ("zsock_new_dealer failed"); + if (!(zsock_none = zmq_socket (zctx, ZMQ_DEALER))) + BAIL_OUT ("zmq_socket failed"); + zsock_set_linger (zsock_none, 5); zsock_set_identity (zsock_none, "2"); - ok (zsock_connect (zsock_none, "%s", parent_uri) == 0, - "none-2: zsock_connect %s (no security) works", parent_uri); + ok (zmq_connect (zsock_none, parent_uri) == 0, + "none-2: zmq_connect %s (no security) works", parent_uri); ok (zmqutil_msg_send (zsock_none, msg) == 0, "none-2: zsock_msg_sendzsock works"); /* 2) Curve, and correct server public key, but client public key * was not authorized */ - if (!(zsock_curve = zsock_new_dealer (NULL))) - BAIL_OUT ("zsock_new_dealer failed"); + if (!(zsock_curve = zmq_socket (zctx, ZMQ_DEALER))) + BAIL_OUT ("zmq_socket failed"); + zsock_set_linger (zsock_curve, 5); if (!(cert = zcert_new ())) BAIL_OUT ("zcert_new failed"); zsock_set_zap_domain (zsock_curve, "flux"); @@ -478,8 +481,8 @@ void trio (flux_t *h) zsock_set_curve_serverkey (zsock_curve, server_pubkey); zsock_set_identity (zsock_curve, "2"); zcert_destroy (&cert); - ok (zsock_connect (zsock_curve, "%s", parent_uri) == 0, - "curve-2: zsock_connect %s works", parent_uri); + ok (zmq_connect (zsock_curve, parent_uri) == 0, + "curve-2: zmq_connect %s works", parent_uri); ok (zmqutil_msg_send (zsock_curve, msg) == 0, "curve-2: zmqutil_msg_send works"); @@ -490,8 +493,8 @@ void trio (flux_t *h) "%s: no messages received within 1.0s", ctx[0]->name); flux_msg_decref (msg); - zsock_destroy (&zsock_none); - zsock_destroy (&zsock_curve); + zmq_close (zsock_none); + zmq_close (zsock_curve); ctx_destroy (ctx[1]); ctx_destroy (ctx[0]); @@ -648,16 +651,17 @@ void wrongness (flux_t *h) if (!(attrs = attr_create ())) BAIL_OUT ("attr_create failed"); errno = 0; - ok (overlay_create (NULL, attrs, NULL, NULL) == NULL && errno == EINVAL, + ok (overlay_create (NULL, attrs, zctx, NULL, NULL) == NULL + && errno == EINVAL, "overlay_create h=NULL fails with EINVAL"); errno = 0; - ok (overlay_create (h, NULL, NULL, NULL) == NULL && errno == EINVAL, + ok (overlay_create (h, NULL, zctx, NULL, NULL) == NULL && errno == EINVAL, "overlay_create attrs=NULL fails with EINVAL"); attr_destroy (attrs); if (!(attrs = attr_create ())) BAIL_OUT ("attr_create failed"); - if (!(ov = overlay_create (h, attrs, NULL, NULL))) + if (!(ov = overlay_create (h, attrs, zctx, NULL, NULL))) BAIL_OUT ("overlay_create failed"); errno = 0; @@ -697,9 +701,8 @@ int main (int argc, char *argv[]) plan (NO_PLAN); - if (!zsys_init ()) - BAIL_OUT ("zsys_init failed"); - zsys_set_linger (5); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("zmq_ctx_new failed"); if (!(logs = zlist_new ())) BAIL_OUT ("zlist_new failed"); @@ -726,6 +729,8 @@ int main (int argc, char *argv[]) flux_close (h); zlist_destroy (&logs); + zmq_ctx_term (zctx); + done_testing (); } From e4720c5bbd1e6d517ee46a4ddad32f1279423650 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 15 Sep 2023 08:23:51 -0700 Subject: [PATCH 08/12] broker/modhash: have destructor return #canceled Problem: zmq_ctx_term() blocks when there are unclosed zeromq sockets, but modhash_destroy() does not return any indication that modules were canceled and hence probably leaked sockets. Instead of void, have modhash_destroy() return a count of canceled modules. Also, log the offending module names to stderr: flux-broker: broker module 'foo' was not properly shut down an improvement over: E: (flux-broker) 23-09-15 21:22:51 [488322] dangling 'PAIR' socket created at shmem/shmem.c:148 --- src/broker/modhash.c | 7 ++++++- src/broker/modhash.h | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/broker/modhash.c b/src/broker/modhash.c index 7a0eb6246be7..cdc64fe6dfdc 100644 --- a/src/broker/modhash.c +++ b/src/broker/modhash.c @@ -73,24 +73,29 @@ modhash_t *modhash_create (void) return mh; } -void modhash_destroy (modhash_t *mh) +int modhash_destroy (modhash_t *mh) { int saved_errno = errno; const char *uuid; module_t *p; + int count = 0; if (mh) { if (mh->zh_byuuid) { FOREACH_ZHASH (mh->zh_byuuid, uuid, p) { + log_msg ("broker module '%s' was not properly shut down", + module_get_name (p)); flux_error_t error; if (module_cancel (p, &error) < 0) log_msg ("%s: %s", module_get_name (p), error.text); + count++; } zhash_destroy (&mh->zh_byuuid); } free (mh); } errno = saved_errno; + return count; } static json_t *modhash_entry_tojson (module_t *p, diff --git a/src/broker/modhash.h b/src/broker/modhash.h index 8c7ccf852c06..69605fe12d74 100644 --- a/src/broker/modhash.h +++ b/src/broker/modhash.h @@ -22,9 +22,10 @@ typedef struct modhash modhash_t; /* Hash-o-modules, keyed by uuid + * Destructor returns the number of modules that had to be canceled. */ modhash_t *modhash_create (void); -void modhash_destroy (modhash_t *mh); +int modhash_destroy (modhash_t *mh); void modhash_add (modhash_t *mh, module_t *p); void modhash_remove (modhash_t *mh, module_t *p); From 38b57f73070a147778bfdde509bd30cd2b339203 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 15 Sep 2023 07:55:03 -0700 Subject: [PATCH 09/12] broker: get a zeromq context directly from libzmq Problem: Now that all zeromq sockets in the broker are allocated with zmq_socket() instead of zsock_new(), zsys is no longer invoked internally by czmq to track unclosed sockets, and thus does not need to be initialized. Get the zeromq context from zmq_ctx_new() instead of zsys_init(). Instead of zsys_shutdown() (via atexit(3)), call zmq_ctx_term(). However, don't call it if any modules had to be canceled, as it blocks when 0MQ sockets are leaked. The broker now exits with a return code of 1 when sockets are leaked. --- src/broker/broker.c | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 1ae49b5bf640..0345fd25078a 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -19,8 +19,7 @@ #include #include #include -#include -#undef streq // redefined by ccan/str/str.h below +#include #include #if HAVE_CALIPER #include @@ -250,27 +249,10 @@ int main (int argc, char *argv[]) || sigaction (SIGTERM, NULL, &old_sigact_term) < 0) log_err_exit ("error setting signal mask"); - /* Initialize libczmq zsys class. - * - * zsys_init() creates a global 0MQ context and starts the 0MQ I/O thread. - * The context is implicitly shared by users of the zsock class within - * the broker, including shmem connector, overlay.c, and module.c. - * libczmq tracks 0MQ sockets created with zsock, and any left open are - * closed by an atexit() handler to prevent zmq_ctx_term() from hanging. - * - * If something goes wrong, such as unclosed sockets in the atexit handler, - * czmq sends messages to its log class, which we redirect to stderr here. - * - * Disable czmq's internal signal handlers for SIGINT and SIGTERM, since - * the broker will install its own. - */ - if (!(ctx.zctx = zsys_init ())) { - log_err ("zsys_init"); + if (!(ctx.zctx = zmq_ctx_new ())) { + log_err ("zmq_ctx_new"); goto cleanup; } - zsys_set_logstream (stderr); - zsys_set_logident ("flux-broker"); - zsys_handler_set (NULL); /* Set up the flux reactor with support for child watchers. * Associate an internal flux_t handle with the reactor. @@ -533,7 +515,7 @@ int main (int argc, char *argv[]) */ attr_destroy (ctx.attrs); - modhash_destroy (ctx.modhash); + int module_leaks = modhash_destroy (ctx.modhash); zlist_destroy (&ctx.sigwatchers); shutdown_destroy (ctx.shutdown); state_machine_destroy (ctx.state_machine); @@ -550,6 +532,17 @@ int main (int argc, char *argv[]) free (ctx.init_shell_cmd); optparse_destroy (ctx.opts); + /* zmq_ctx_term() blocks if any 0mq sockets remain open, so skip it + * if any broker modules had to be canceled in modhash_destroy(). + */ + if (module_leaks > 0) { + log_msg ("skipping 0MQ shutdown due to presumed module socket leak"); + if (ctx.exit_rc == 0) + ctx.exit_rc = 1; + } + else + zmq_ctx_term (ctx.zctx); + return ctx.exit_rc; } From 83e26d06beb3f88e3a86eb8e1e2512eeff200f60 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 15 Sep 2023 11:06:20 -0700 Subject: [PATCH 10/12] testsuite: use zmq_socket() not zsock_new() Problem: creating sockets through czmq zsock_new() causes them to use a global context and be tracked by czmq which has proven problematic in production quality code. Use the libzmq API directly to avoid socket tracking. The libtestutil test server's interface changed: - the test no longer calls test_server_environment_init() - the test must pass a zeromq context to test_server_create() Although it increases boiler plate for users of the test server, czmq's implicit creation of an I/O thread and installation of an atexit(3) handler may confuse matters when debugging a test failure. --- src/common/libflux/test/attr.c | 10 +++-- src/common/libflux/test/event.c | 9 ++++- src/common/libflux/test/log.c | 8 +++- src/common/libflux/test/rpc.c | 17 +++++--- src/common/libflux/test/rpc_chained.c | 9 ++++- src/common/librouter/test/router.c | 10 ++++- src/common/librouter/test/servhash.c | 10 +++-- src/common/librouter/test/usock_echo.c | 9 ++++- src/common/librouter/test/usock_emfile.c | 9 ++++- src/common/librouter/test/usock_epipe.c | 9 ++++- src/common/libsubprocess/test/iochan.c | 8 +++- src/common/libsubprocess/test/iostress.c | 8 +++- src/common/libsubprocess/test/rcmdsrv.c | 4 +- src/common/libsubprocess/test/rcmdsrv.h | 2 +- src/common/libsubprocess/test/remote.c | 9 ++++- src/common/libterminus/test/pty.c | 20 ++++++---- src/common/libterminus/test/terminus.c | 21 ++++++---- src/common/libtestutil/util.c | 49 ++++++++++++------------ src/common/libtestutil/util.h | 15 ++------ src/common/libzmqutil/test/msg_zsock.c | 37 ++++++++---------- src/common/libzmqutil/test/reactor.c | 25 ++++++------ 21 files changed, 185 insertions(+), 113 deletions(-) diff --git a/src/common/libflux/test/attr.c b/src/common/libflux/test/attr.c index 02c3d439531b..ce981de50f71 100644 --- a/src/common/libflux/test/attr.c +++ b/src/common/libflux/test/attr.c @@ -14,6 +14,7 @@ #include #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libczmqcontainers/czmq_containers.h" @@ -185,12 +186,13 @@ int main (int argc, char *argv[]) flux_t *h; const char *value; const char *value2; + void *zctx; plan (NO_PLAN); - test_server_environment_init ("attr-test"); - - if (!(h = test_server_create (0, test_server, NULL))) + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + if (!(h = test_server_create (zctx, 0, test_server, NULL))) BAIL_OUT ("test_server_create failed"); /* get ENOENT */ @@ -405,6 +407,8 @@ int main (int argc, char *argv[]) test_server_stop (h); flux_close (h); + zmq_ctx_term (zctx); + done_testing(); return (0); } diff --git a/src/common/libflux/test/event.c b/src/common/libflux/test/event.c index 5decc38808f2..67e4adca87b8 100644 --- a/src/common/libflux/test/event.c +++ b/src/common/libflux/test/event.c @@ -12,6 +12,7 @@ #include "config.h" #endif #include +#include #include "src/common/libtap/tap.h" #include "src/common/libtestutil/util.h" #include "ccan/str/str.h" @@ -218,8 +219,12 @@ void test_subscribe_rpc (void) { flux_t *h; flux_future_t *f; + void *zctx; - if (!(h = test_server_create (0, test_server, NULL))) + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + + if (!(h = test_server_create (zctx, 0, test_server, NULL))) BAIL_OUT ("test_server_create: %s", strerror (errno)); ok (flux_event_subscribe (h, "fubar") == 0, @@ -275,6 +280,8 @@ void test_subscribe_rpc (void) if (test_server_stop (h) < 0) BAIL_OUT ("error stopping test server: %s", strerror (errno)); flux_close (h); + + zmq_ctx_term (zctx); } void test_subscribe_nosub (void) diff --git a/src/common/libflux/test/log.c b/src/common/libflux/test/log.c index eb1d67d3f671..74cb47611eee 100644 --- a/src/common/libflux/test/log.c +++ b/src/common/libflux/test/log.c @@ -13,6 +13,7 @@ #endif #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libtestutil/util.h" @@ -20,12 +21,14 @@ int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - test_server_environment_init ("log-test"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); - if (!(h = test_server_create (0, NULL, NULL))) + if (!(h = test_server_create (zctx, 0, NULL, NULL))) BAIL_OUT ("could not create test server"); if (flux_attr_set_cacheonly (h, "rank", "0") < 0) BAIL_OUT ("flux_attr_set_cacheonly failed"); @@ -45,6 +48,7 @@ int main (int argc, char *argv[]) test_server_stop (h); flux_close (h); + zmq_ctx_term (zctx); done_testing(); return (0); } diff --git a/src/common/libflux/test/rpc.c b/src/common/libflux/test/rpc.c index 3893941cf2eb..952a8bb4a13a 100644 --- a/src/common/libflux/test/rpc.c +++ b/src/common/libflux/test/rpc.c @@ -12,6 +12,7 @@ #include "config.h" #endif #include +#include #include #include "src/common/libtap/tap.h" @@ -914,18 +915,18 @@ static int fake_server_reactor (flux_t *h, void *arg) return flux_reactor_run (flux_get_reactor (h), 0); } -void test_fake_server (void) +void test_fake_server (void *zctx) { flux_t *h; - ok ((h = test_server_create (0, fake_server, NULL)) != NULL, + ok ((h = test_server_create (zctx, 0, fake_server, NULL)) != NULL, "test_server_create (recv loop)"); ok (test_server_stop (h) == 0, "test_server_stop worked"); flux_close (h); diag ("completed test with server recv loop"); - ok ((h = test_server_create (0, fake_server_reactor, NULL)) != NULL, + ok ((h = test_server_create (zctx, 0, fake_server_reactor, NULL)) != NULL, "test_server_create (reactor)"); ok ((test_server_stop (h)) == 0, "test_server_stop worked"); @@ -960,14 +961,16 @@ static int comms_err (flux_t *h, void *arg) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - test_server_environment_init ("rpc-test"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); - test_fake_server (); + test_fake_server (zctx); - h = test_server_create (0, test_server, NULL); + h = test_server_create (zctx, 0, test_server, NULL); ok (h != NULL, "created test server thread"); if (!h) @@ -998,6 +1001,8 @@ int main (int argc, char *argv[]) "stopped test server thread"); flux_close (h); // destroys test server + zmq_ctx_term (zctx); + done_testing(); return (0); } diff --git a/src/common/libflux/test/rpc_chained.c b/src/common/libflux/test/rpc_chained.c index dcdcca62b5b9..1f93fa47b03a 100644 --- a/src/common/libflux/test/rpc_chained.c +++ b/src/common/libflux/test/rpc_chained.c @@ -12,6 +12,7 @@ #include "config.h" #endif #include +#include #include "src/common/libtap/tap.h" #include "src/common/libtestutil/util.h" #include "ccan/str/str.h" @@ -345,12 +346,14 @@ void test_or_then_error_string (flux_t *h) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - test_server_environment_init ("rpc-chained-test"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); - h = test_server_create (0, test_server, NULL); + h = test_server_create (zctx, 0, test_server, NULL); ok (h != NULL, "created test server thread"); if (!h) @@ -370,6 +373,8 @@ int main (int argc, char *argv[]) "stopped test server thread"); flux_close (h); // destroys test server + zmq_ctx_term (zctx); + done_testing(); return (0); } diff --git a/src/common/librouter/test/router.c b/src/common/librouter/test/router.c index cad965fcd13a..1faf0d7df81b 100644 --- a/src/common/librouter/test/router.c +++ b/src/common/librouter/test/router.c @@ -12,6 +12,7 @@ #include "config.h" #endif #include +#include #include "src/common/libtap/tap.h" #include "src/common/libtestutil/util.h" @@ -266,13 +267,16 @@ void test_error (flux_t *h) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + diag ("starting test server"); - test_server_environment_init ("test_router"); - if (!(h = test_server_create (FLUX_O_TEST_NOSUB, server_cb, NULL))) + if (!(h = test_server_create (zctx, FLUX_O_TEST_NOSUB, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_basic (h); @@ -283,6 +287,8 @@ int main (int argc, char *argv[]) BAIL_OUT ("test_server_stop failed"); flux_close (h); + zmq_ctx_term (zctx); + done_testing (); return 0; diff --git a/src/common/librouter/test/servhash.c b/src/common/librouter/test/servhash.c index 3548412595e8..4d51ffe342ab 100644 --- a/src/common/librouter/test/servhash.c +++ b/src/common/librouter/test/servhash.c @@ -12,6 +12,7 @@ #include "config.h" #endif #include +#include #include "src/common/libtap/tap.h" #include "src/common/libczmqcontainers/czmq_containers.h" @@ -265,13 +266,16 @@ void test_basic (flux_t *h) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + diag ("starting test server"); - test_server_environment_init ("test_router"); - if (!(h = test_server_create (0, server_cb, NULL))) + if (!(h = test_server_create (zctx, 0, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_basic (h); @@ -281,7 +285,7 @@ int main (int argc, char *argv[]) if (test_server_stop (h) < 0) BAIL_OUT ("test_server_stop failed"); flux_close (h); - + zmq_ctx_term (zctx); done_testing (); return 0; diff --git a/src/common/librouter/test/usock_echo.c b/src/common/librouter/test/usock_echo.c index edd796352ee3..b3453c96b2d6 100644 --- a/src/common/librouter/test/usock_echo.c +++ b/src/common/librouter/test/usock_echo.c @@ -13,6 +13,7 @@ #endif #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libutil/unlink_recursive.h" @@ -291,17 +292,20 @@ static void test_async_stream (flux_t *h, int size, int count) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("cannot create zeromq context"); + tmpdir_create (); signal (SIGPIPE, SIG_IGN); diag ("starting test server"); - test_server_environment_init ("usock_server"); - if (!(h = test_server_create (0, server_cb, NULL))) + if (!(h = test_server_create (zctx, 0, server_cb, NULL))) BAIL_OUT ("test_server_create failed"); test_early_disconnect (h); @@ -317,6 +321,7 @@ int main (int argc, char *argv[]) flux_close (h); tmpdir_destroy (); + zmq_ctx_term (zctx); done_testing (); return 0; diff --git a/src/common/librouter/test/usock_emfile.c b/src/common/librouter/test/usock_emfile.c index 5514251cb282..0208b252a192 100644 --- a/src/common/librouter/test/usock_emfile.c +++ b/src/common/librouter/test/usock_emfile.c @@ -15,6 +15,7 @@ #include #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libutil/unlink_recursive.h" @@ -258,9 +259,13 @@ int main (int argc, char *argv[]) struct test_params tp = {0}; struct client_args cargs[2]; flux_t *h; + void *zctx; plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + tmpdir_create (); if (snprintf (sockpath, sizeof (sockpath), @@ -271,9 +276,8 @@ int main (int argc, char *argv[]) signal (SIGPIPE, SIG_IGN); diag ("starting test server"); - test_server_environment_init ("usock_server"); - if (!(h = test_server_create (0, server_cb, &tp))) + if (!(h = test_server_create (zctx, 0, server_cb, &tp))) BAIL_OUT ("test_server_create failed"); wait_for_server (); @@ -341,6 +345,7 @@ int main (int argc, char *argv[]) tmpdir_destroy (); diag ("fds_inuse = %d", fds_inuse ()); + zmq_ctx_term (zctx); done_testing (); return 0; } diff --git a/src/common/librouter/test/usock_epipe.c b/src/common/librouter/test/usock_epipe.c index 155196a60124..2b1ad97753de 100644 --- a/src/common/librouter/test/usock_epipe.c +++ b/src/common/librouter/test/usock_epipe.c @@ -14,6 +14,7 @@ #include #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libutil/unlink_recursive.h" @@ -217,17 +218,20 @@ int main (int argc, char *argv[]) { struct test_params tp = {0}; flux_t *h; + void *zctx; plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("cannot create zeromq context"); + tmpdir_create (); signal (SIGPIPE, SIG_IGN); diag ("starting test server"); - test_server_environment_init ("usock_server"); - if (!(h = test_server_create (0, server_cb, &tp))) + if (!(h = test_server_create (zctx, 0, server_cb, &tp))) BAIL_OUT ("test_server_create failed"); test_send_and_exit (h, 1); @@ -252,6 +256,7 @@ int main (int argc, char *argv[]) flux_close (h); tmpdir_destroy (); + zmq_ctx_term (zctx); done_testing (); return 0; diff --git a/src/common/libsubprocess/test/iochan.c b/src/common/libsubprocess/test/iochan.c index 55a743650068..1daead937216 100644 --- a/src/common/libsubprocess/test/iochan.c +++ b/src/common/libsubprocess/test/iochan.c @@ -14,6 +14,7 @@ #include // environ def #include #include +#include #include "ccan/array_size/array_size.h" #include "ccan/str/str.h" @@ -252,10 +253,14 @@ bool iochan_run_check (flux_t *h, const char *name, int count) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - h = rcmdsrv_create ("rexec"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + + h = rcmdsrv_create (zctx, "rexec"); ok (iochan_run_check (h, "simple", linesize * 100), "simple check worked"); @@ -267,6 +272,7 @@ int main (int argc, char *argv[]) "refcount check worked"); test_server_stop (h); flux_close (h); + zmq_ctx_term (zctx); done_testing (); return 0; diff --git a/src/common/libsubprocess/test/iostress.c b/src/common/libsubprocess/test/iostress.c index 02259392b428..bb125b0d29c0 100644 --- a/src/common/libsubprocess/test/iostress.c +++ b/src/common/libsubprocess/test/iostress.c @@ -14,6 +14,7 @@ #include // environ def #include #include +#include #include "ccan/array_size/array_size.h" #include "ccan/str/str.h" @@ -278,10 +279,14 @@ bool iostress_run_check (flux_t *h, int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - h = rcmdsrv_create ("rexec"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + + h = rcmdsrv_create (zctx, "rexec"); ok (iostress_run_check (h, "balanced", false, 0, 0, 8, 8, 80), "balanced worked"); @@ -304,6 +309,7 @@ int main (int argc, char *argv[]) test_server_stop (h); flux_close (h); + zmq_ctx_term (zctx); done_testing (); return 0; } diff --git a/src/common/libsubprocess/test/rcmdsrv.c b/src/common/libsubprocess/test/rcmdsrv.c index 530767510b57..2dc988dd84ba 100644 --- a/src/common/libsubprocess/test/rcmdsrv.c +++ b/src/common/libsubprocess/test/rcmdsrv.c @@ -74,7 +74,7 @@ static int test_server (flux_t *h, void *arg) return rc; } -flux_t *rcmdsrv_create (const char *service_name) +flux_t *rcmdsrv_create (void *zctx, const char *service_name) { flux_t *h; @@ -83,7 +83,7 @@ flux_t *rcmdsrv_create (const char *service_name) signal (SIGPIPE, SIG_IGN); // N.B. test reactor is created with FLUX_REACTOR_SIGCHLD flag - if (!(h = test_server_create (0, test_server, (char *)service_name))) + if (!(h = test_server_create (zctx, 0, test_server, (char *)service_name))) BAIL_OUT ("test_server_create failed"); return h; diff --git a/src/common/libsubprocess/test/rcmdsrv.h b/src/common/libsubprocess/test/rcmdsrv.h index 9027d5c190f9..bf5713afe5cb 100644 --- a/src/common/libsubprocess/test/rcmdsrv.h +++ b/src/common/libsubprocess/test/rcmdsrv.h @@ -14,7 +14,7 @@ /* Start subprocess server. Returns one end of back-to-back flux_t test * handle. Call test_server_stop (h) when done to join with server thread. */ -flux_t *rcmdsrv_create (const char *service_name); +flux_t *rcmdsrv_create (void *zctx, const char *service_name); /* llog-compatible logger */ diff --git a/src/common/libsubprocess/test/remote.c b/src/common/libsubprocess/test/remote.c index 2c858e3f14f7..301e45f1af01 100644 --- a/src/common/libsubprocess/test/remote.c +++ b/src/common/libsubprocess/test/remote.c @@ -14,6 +14,7 @@ #include // environ def #include #include +#include #include "ccan/array_size/array_size.h" #include "ccan/str/str.h" @@ -342,10 +343,14 @@ void sigstop_test (flux_t *h) int main (int argc, char *argv[]) { flux_t *h; + void *zctx; plan (NO_PLAN); - h = rcmdsrv_create (SERVER_NAME); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + + h = rcmdsrv_create (zctx, SERVER_NAME); simple_test (h); sigstop_test (h); @@ -353,6 +358,8 @@ int main (int argc, char *argv[]) test_server_stop (h); flux_close (h); + zmq_ctx_term (zctx); + done_testing (); return 0; } diff --git a/src/common/libterminus/test/pty.c b/src/common/libterminus/test/pty.c index 8d12abc6543b..12fa686672e8 100644 --- a/src/common/libterminus/test/pty.c +++ b/src/common/libterminus/test/pty.c @@ -17,6 +17,7 @@ #include #include +#include #include "src/common/libterminus/pty.h" #include "src/common/libtap/tap.h" @@ -174,9 +175,9 @@ static int pty_server (flux_t *h, void *arg) return rc; } -static void test_basic_protocol (void) +static void test_basic_protocol (void *zctx) { - flux_t *h = test_server_create (0, pty_server, NULL); + flux_t *h = test_server_create (zctx, 0, pty_server, NULL); flux_future_t *f = NULL; flux_future_t *f_attach = NULL; @@ -393,9 +394,9 @@ static void pty_exit_cb (struct flux_pty_client *c, void *arg) flux_reactor_stop (flux_get_reactor (h)); } -static void test_client() +static void test_client (void *zctx) { - flux_t *h = test_server_create (0, pty_server, NULL); + flux_t *h = test_server_create (zctx, 0, pty_server, NULL); flux_future_t *f = NULL; int rc; int flags = FLUX_PTY_CLIENT_ATTACH_SYNC @@ -481,14 +482,19 @@ void test_monitor () int main (int argc, char *argv[]) { + void *zctx; + plan (NO_PLAN); - test_server_environment_init ("pty"); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + test_invalid_args (); test_empty_server (); - test_basic_protocol (); - test_client (); + test_basic_protocol (zctx); + test_client (zctx); test_monitor (); + zmq_ctx_term (zctx); done_testing (); return 0; } diff --git a/src/common/libterminus/test/terminus.c b/src/common/libterminus/test/terminus.c index 2d92f29f844f..9bcd2051b527 100644 --- a/src/common/libterminus/test/terminus.c +++ b/src/common/libterminus/test/terminus.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "src/common/libterminus/terminus.h" @@ -86,11 +87,11 @@ static int terminus_server (flux_t *h, void *arg) return rc; } -static void test_kill_server_empty (void) +static void test_kill_server_empty (void *zctx) { int rc; flux_future_t *f = NULL; - flux_t *h = test_server_create (0, terminus_server, NULL); + flux_t *h = test_server_create (zctx, 0, terminus_server, NULL); /* kill-server */ @@ -116,12 +117,12 @@ static void test_kill_server_empty (void) flux_close (h); } -static void test_protocol (void) +static void test_protocol (void *zctx) { int rc; json_t *o = NULL; flux_future_t *f = NULL; - flux_t *h = test_server_create (0, terminus_server, NULL); + flux_t *h = test_server_create (zctx, 0, terminus_server, NULL); const char *service = NULL; const char *name = NULL; @@ -470,16 +471,22 @@ void test_open_close_session (void) int main (int argc, char *argv[]) { + void *zctx; + plan (NO_PLAN); /* Make sure SHELL is set in environment. */ if (getenv ("SHELL") == NULL) setenv ("SHELL", "/bin/sh", 1); - test_server_environment_init ("terminus"); + + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + test_invalid_args (); - test_kill_server_empty (); - test_protocol (); + test_kill_server_empty (zctx); + test_protocol (zctx); test_open_close_session (); + zmq_ctx_term (zctx); done_testing (); return 0; } diff --git a/src/common/libtestutil/util.c b/src/common/libtestutil/util.c index 1330c1d23ced..734e4f32d1d1 100644 --- a/src/common/libtestutil/util.c +++ b/src/common/libtestutil/util.c @@ -26,8 +26,8 @@ #define UUID_STR_LEN 37 // defined in later libuuid headers #endif - struct test_server { + void *zctx; flux_t *c; flux_t *s; flux_msg_handler_t *shutdown_mh; @@ -40,8 +40,10 @@ struct test_server { char uuid_str[UUID_STR_LEN]; }; -static flux_t *test_connector_create (const char *shmem_name, - bool server, int flags); +static flux_t *test_connector_create (void *zctx, + const char *shmem_name, + bool server, + int flags); void shutdown_cb (flux_t *h, flux_msg_handler_t *mh, @@ -122,7 +124,7 @@ static void test_server_destroy (struct test_server *a) } } -flux_t *test_server_create (int cflags, test_server_f cb, void *arg) +flux_t *test_server_create (void *zctx, int cflags, test_server_f cb, void *arg) { int e; struct test_server *a; @@ -132,6 +134,7 @@ flux_t *test_server_create (int cflags, test_server_f cb, void *arg) BAIL_OUT ("calloc"); a->cb = cb; a->arg = arg; + a->zctx = zctx; uuid_generate (a->uuid); uuid_unparse (a->uuid, a->uuid_str); @@ -141,9 +144,9 @@ flux_t *test_server_create (int cflags, test_server_f cb, void *arg) /* Create back-to-back wired flux_t handles */ - if (!(a->s = test_connector_create (a->uuid_str, true, sflags))) + if (!(a->s = test_connector_create (a->zctx, a->uuid_str, true, sflags))) BAIL_OUT ("test_connector_create server"); - if (!(a->c = test_connector_create (a->uuid_str, false, cflags))) + if (!(a->c = test_connector_create (a->zctx, a->uuid_str, false, cflags))) BAIL_OUT ("test_connector_create client"); /* If no callback, register watcher for all messages so we can log them. @@ -176,20 +179,11 @@ flux_t *test_server_create (int cflags, test_server_f cb, void *arg) return a->c; } -void test_server_environment_init (const char *test_name) -{ - zsys_init (); - zsys_set_logstream (stderr); - zsys_set_logident (test_name); - zsys_handler_set (NULL); - zsys_set_linger (5); // msec -} - /* Test connector implementation */ struct test_connector { - zsock_t *sock; + void *sock; flux_t *h; struct flux_msg_cred cred; }; @@ -274,7 +268,7 @@ static void test_connector_fini (void *impl) { struct test_connector *tcon = impl; - zsock_destroy (&tcon->sock); + zmq_close (tcon->sock); free (tcon); } @@ -288,25 +282,30 @@ static const struct flux_handle_ops handle_ops = { .impl_destroy = test_connector_fini, }; -static flux_t *test_connector_create (const char *shmem_name, - bool server, int flags) +static flux_t *test_connector_create (void *zctx, + const char *shmem_name, + bool server, + int flags) { struct test_connector *tcon; + char uri[256]; if (!(tcon = calloc (1, sizeof (*tcon)))) BAIL_OUT ("calloc"); tcon->cred.userid = getuid (); tcon->cred.rolemask = FLUX_ROLE_OWNER; - if (!(tcon->sock = zsock_new_pair (NULL))) - BAIL_OUT ("zsock_new_pair"); + if (!(tcon->sock = zmq_socket (zctx, ZMQ_PAIR))) + BAIL_OUT ("zmq_socket failed"); zsock_set_unbounded (tcon->sock); + zsock_set_linger (tcon->sock, 5); + snprintf (uri, sizeof (uri), "inproc://%s", shmem_name); if (server) { - if (zsock_bind (tcon->sock, "inproc://%s", shmem_name) < 0) - BAIL_OUT ("zsock_bind %s", shmem_name); + if (zmq_bind (tcon->sock, uri) < 0) + BAIL_OUT ("zmq_bind %s", uri); } else { - if (zsock_connect (tcon->sock, "inproc://%s", shmem_name) < 0) - BAIL_OUT ("zsock_connect %s", shmem_name); + if (zmq_connect (tcon->sock, uri) < 0) + BAIL_OUT ("zmq_connect %s", uri); } if (!(tcon->h = flux_handle_create (tcon, &handle_ops, flags))) BAIL_OUT ("flux_handle_create"); diff --git a/src/common/libtestutil/util.h b/src/common/libtestutil/util.h index 26a313ff391e..9d260c1a9a61 100644 --- a/src/common/libtestutil/util.h +++ b/src/common/libtestutil/util.h @@ -21,22 +21,18 @@ * 3) broker attributes (such as rank and size) are unavailable * 4) message nodeid is ignored * - * Unit tests that use the test server should call - * test_server_environment_init() once prior to creating the first server - * to initialize czmq's runtime. - * * If callback is NULL, a default callback is run that logs each * message received with diag(). */ typedef int (*test_server_f)(flux_t *h, void *arg); -flux_t *test_server_create (int flags, test_server_f cb, void *arg); +flux_t *test_server_create (void *zctx, + int flags, + test_server_f cb, + void *arg); int test_server_stop (flux_t *c); -void test_server_environment_init (const char *test_name); - - /* Create a loopback connector for testing. * The net effect is much the same as flux_open("loop://") except * the implementation is self contained here. Close with flux_close(). @@ -44,8 +40,5 @@ void test_server_environment_init (const char *test_name); * Like loop://, this support test manipulation of credentials: * flux_opt_set (h, FLUX_OPT_TESTING_USERID, &userid, sizeof (userid); * flux_opt_set (h, FLUX_OPT_TESTING_ROLEMASK, &rolemask, sizeof (rolemask)) - * - * N.B. No need to call test_server_environment_init() if this is the - * only component used from this module. */ flux_t *loopback_create (int flags); diff --git a/src/common/libzmqutil/test/msg_zsock.c b/src/common/libzmqutil/test/msg_zsock.c index 7fa17d94f29b..75ae998f860a 100644 --- a/src/common/libzmqutil/test/msg_zsock.c +++ b/src/common/libzmqutil/test/msg_zsock.c @@ -20,29 +20,25 @@ #include "src/common/libzmqutil/msg_zsock.h" #include "src/common/libtap/tap.h" +static void *zctx; + void check_sendzsock (void) { - zsock_t *zsock[2] = { NULL, NULL }; + void *zsock[2] = { NULL, NULL }; flux_msg_t *any, *msg, *msg2; const char *topic; int type; const char *uri = "inproc://test"; - /* zsys boiler plate: - * appears to be needed to avoid atexit assertions when lives_ok() - * macro (which calls fork()) is used. - */ - zsys_init (); - zsys_set_logstream (stderr); - zsys_set_logident ("test_message.t"); - zsys_handler_set (NULL); - zsys_set_linger (5); // msec - - ok ((zsock[0] = zsock_new_pair (NULL)) != NULL - && zsock_bind (zsock[0], "%s", uri) == 0 - && (zsock[1] = zsock_new_pair (uri)) != NULL, + ok ((zsock[0] = zmq_socket (zctx, ZMQ_PAIR)) != NULL + && zmq_bind (zsock[0], uri) == 0 + && (zsock[1] = zmq_socket (zctx, ZMQ_PAIR)) != NULL + && zmq_connect( zsock[1], uri) == 0, "got inproc socket pair"); + zsock_set_linger (zsock[0], 5); + zsock_set_linger (zsock[1], 5); + if (!(any = flux_msg_create (FLUX_MSGTYPE_ANY))) BAIL_OUT ("flux_msg_create failed"); @@ -87,20 +83,21 @@ void check_sendzsock (void) flux_msg_destroy (msg2); flux_msg_destroy (msg); - zsock_destroy (&zsock[0]); - zsock_destroy (&zsock[1]); - - /* zsys boiler plate - see note above - */ - zsys_shutdown(); + zmq_close (zsock[0]); + zmq_close (zsock[1]); } int main (int argc, char *argv[]) { plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("could not create zeromq context"); + check_sendzsock (); + zmq_ctx_term (zctx); + done_testing(); return (0); } diff --git a/src/common/libzmqutil/test/reactor.c b/src/common/libzmqutil/test/reactor.c index a7a95d6c498a..88ef60ec74d9 100644 --- a/src/common/libzmqutil/test/reactor.c +++ b/src/common/libzmqutil/test/reactor.c @@ -23,6 +23,7 @@ #include "src/common/libzmqutil/reactor.h" static const size_t zmqwriter_msgcount = 1024; +static void *zctx; static void zmqwriter (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg) @@ -83,18 +84,15 @@ static void zmqreader (flux_reactor_t *r, flux_watcher_t *w, static void test_zmq (flux_reactor_t *reactor) { - zsock_t *zs[2]; + void *zs[2]; flux_watcher_t *r, *w; const char *uri = "inproc://test_zmq"; - zsys_set_logstream (stderr); - zsys_handler_set (NULL); - - zs[0] = zsock_new_pair (NULL); - zs[1] = zsock_new_pair (NULL); + zs[0] = zmq_socket (zctx, ZMQ_PAIR); + zs[1] = zmq_socket (zctx, ZMQ_PAIR); ok (zs[0] && zs[1] - && zsock_bind (zs[0], "%s", uri) == 0 - && zsock_connect (zs[1], "%s", uri) == 0, + && zmq_bind (zs[0], uri) == 0 + && zmq_connect (zs[1], uri) == 0, "zmq: connected ZMQ_PAIR sockets over inproc"); r = zmqutil_watcher_create (reactor, zs[0], FLUX_POLLIN, zmqreader, NULL); w = zmqutil_watcher_create (reactor, zs[1], FLUX_POLLOUT, zmqwriter, NULL); @@ -109,10 +107,8 @@ static void test_zmq (flux_reactor_t *reactor) flux_watcher_destroy (r); flux_watcher_destroy (w); - zsock_destroy (&zs[0]); - zsock_destroy (&zs[1]); - - zsys_shutdown (); + zmq_close (zs[0]); + zmq_close (zs[1]); } int main (int argc, char *argv[]) @@ -121,6 +117,9 @@ int main (int argc, char *argv[]) plan (NO_PLAN); + if (!(zctx = zmq_ctx_new ())) + BAIL_OUT ("cannot create zmq context"); + ok ((reactor = flux_reactor_create (0)) != NULL, "created reactor"); if (!reactor) @@ -130,6 +129,8 @@ int main (int argc, char *argv[]) flux_reactor_destroy (reactor); + zmq_ctx_term (zctx); + done_testing(); return (0); } From df719c7927bc34f546871273b431c6dfb1a9ed90 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 15 Sep 2023 13:20:59 -0700 Subject: [PATCH 11/12] testsuite: cover module unload error Problem: the broker fails when a module is not unloaded, but there are no tests that explicitly check for that. Add a test to t0003-module.t and ensure that the error message is appropriate and helpful. --- t/t0003-module.t | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/t/t0003-module.t b/t/t0003-module.t index 9d4ef9635241..05b84fdd02f0 100755 --- a/t/t0003-module.t +++ b/t/t0003-module.t @@ -278,6 +278,16 @@ test_expect_success 'module: configuration object is cached' ' test_expect_success 'module: remove testmod if loaded' ' flux module remove -f testmod ' - +test_expect_success 'module: load without unload causes broker failure' ' + test_must_fail flux start \ + -o,-Sbroker.rc1_path=,-Sbroker.rc3_path= \ + flux module load content 2>nounload.err +' +test_expect_success 'module: socket leak is called out' ' + grep "socket leak" nounload.err +' +test_expect_success 'module: leaked module name is called out' ' + grep ".content. was not properly shut down" nounload.err +' test_done From 7c21568bcd0fdf27420043a4ea11bb4d7a125164 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 18 Sep 2023 09:34:59 -0700 Subject: [PATCH 12/12] broker/module: comment unchecked snprintf Problem: an snprintf() return value is not checked. Cast to void and add a comment explaining why it cannot fail. --- src/broker/module.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/broker/module.c b/src/broker/module.c index 4967420cbc71..c08be1ea496b 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -372,10 +372,11 @@ module_t *module_create (flux_t *h, } zsock_set_unbounded (p->sock); zsock_set_linger (p->sock, 5); - snprintf (p->endpoint, - sizeof (p->endpoint), - "inproc://%s", - module_get_uuid (p)); + // copying 9 + 37 + 1 = 47 bytes into 128 byte buffer cannot fail + (void)snprintf (p->endpoint, + sizeof (p->endpoint), + "inproc://%s", + module_get_uuid (p)); if (zmq_bind (p->sock, p->endpoint) < 0) { errprintf (error, "zmq_bind %s: %s", p->endpoint, strerror (errno)); goto cleanup;