diff --git a/configure.ac b/configure.ac index 45165930..333da53f 100644 --- a/configure.ac +++ b/configure.ac @@ -169,9 +169,11 @@ echo " CXXFLAGS........... $CXXFLAGS FLUX..............: $FLUX FLUX_VERSION......: $($FLUX version | sed -n 's/libflux-core:\t*//p') - FLUX_CORE_CFLAGS.:. $FLUX_CORE_CFLAGS + FLUX_CORE_CFLAGS..: $FLUX_CORE_CFLAGS + FLUX_IDSET_CFLAGS.: $FLUX_IDSET_CFLAGS FLUX_CORE_LIBS....: $FLUX_CORE_LIBS FLUX_HOSTLIST_LIBS: $FLUX_HOSTLIST_LIBS + FLUX_IDSET_LIBS...: $FLUX_IDSET_LIBS LIBFLUX_VERSION...: $LIBFLUX_VERSION FLUX_PREFIX.......: $FLUX_PREFIX LDFLAGS...........: $LDFLAGS diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 35576e03..aae17b25 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -1,6 +1,6 @@ AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS) -AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS) +AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS) $(FLUX_IDSET_CFLAGS) AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared @@ -11,3 +11,4 @@ jobtap_LTLIBRARIES = mf_priority.la mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module +mf_priority_la_LIBADD = $(FLUX_IDSET_LIBS) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index e1825d04..308e14e0 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -17,6 +17,7 @@ extern "C" { #endif #include #include +#include #include } @@ -43,6 +44,9 @@ extern "C" { #define DEFAULT_QUEUE_WEIGHT 10000 #define DEFAULT_AGE_WEIGHT 1000 +// set up cores-per-node count for the system +size_t ncores_per_node = 0; + std::map> users; std::map queues; std::map users_def_bank; @@ -254,9 +258,11 @@ static int query_cb (flux_plugin_t *p, if (flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT, - "{s:O}", + "{s:O s:i}", "mf_priority_map", - accounting_data) < 0) + accounting_data, + "ncores_per_node", + ncores_per_node) < 0) flux_log_error (flux_jobtap_get_flux (p), "mf_priority: query_cb: flux_plugin_arg_pack: %s", flux_plugin_arg_strerror (args)); @@ -1184,6 +1190,49 @@ extern "C" int flux_plugin_init (flux_plugin_t *p) priority_weights["queue"] = DEFAULT_QUEUE_WEIGHT; priority_weights["age"] = DEFAULT_AGE_WEIGHT; + // initialize the plugin with total node and core counts + flux_t *h; + flux_future_t *f; + const char *core; + + h = flux_jobtap_get_flux (p); + // This synchronous call to fetch R from the KVS is needed in order to + // validate and enforce resource limits on jobs. The job manager will + // block here while waiting for R when the plugin is loaded but it *should* + // occur over a very short time. + if (!(f = flux_kvs_lookup (h, + NULL, + FLUX_KVS_WAITCREATE, + "resource.R"))) { + flux_log_error (h, "flux_kvs_lookup"); + return -1; + } + // Equal number of cores on all nodes in R is assumed here; thus, only + // the first entry is looked at + if (flux_kvs_lookup_get_unpack (f, + "{s{s[{s{s:s}}]}}", + "execution", + "R_lite", + "children", + "core", &core) < 0) { + flux_log_error (h, "flux_kvs_lookup_unpack"); + return -1; + } + + if (core == NULL) { + flux_log_error (h, + "mf_priority: could not get system " + "cores-per-node information"); + return -1; + } + + // calculate number of cores-per-node on system + idset* cores_decoded = idset_decode (core); + ncores_per_node = idset_count (cores_decoded); + + flux_future_destroy (f); + idset_destroy (cores_decoded); + return 0; } diff --git a/t/Makefile.am b/t/Makefile.am index a84e27e9..8274d920 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -43,6 +43,7 @@ TESTSCRIPTS = \ t1041-view-jobs-by-project.t \ t1042-issue508.t \ t1043-view-jobs-by-bank.t \ + t1044-mf-priority-resource-counting.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1044-mf-priority-resource-counting.t b/t/t1044-mf-priority-resource-counting.t new file mode 100755 index 00000000..8bb1fd8c --- /dev/null +++ b/t/t1044-mf-priority-resource-counting.t @@ -0,0 +1,31 @@ +#!/bin/bash + +test_description='test calculating and storing system core information in priority plugin' + +. `dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so + +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 4 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'check that cores-per-node count is correct' ' + flux jobtap query mf_priority.so > query.json && + test_debug "jq -S . ncores_per_node.test && + jq -e ".ncores_per_node == $(cat ncores_per_node.test)"