Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add estimation of cores-per-node count on system during initialization #469

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ echo "
CXXFLAGS........... $CXXFLAGS
FLUX..............: $FLUX
FLUX_VERSION......: $($FLUX version | sed -n 's/libflux-core:\t*//p')
FLUX_CORE_CFLAGS.:. $FLUX_CORE_CFLAGS
FLUX_CORE_CFLAGS..: $FLUX_CORE_CFLAGS
FLUX_IDSET_CFLAGS.: $FLUX_IDSET_CFLAGS
FLUX_CORE_LIBS....: $FLUX_CORE_LIBS
FLUX_HOSTLIST_LIBS: $FLUX_HOSTLIST_LIBS
FLUX_IDSET_LIBS...: $FLUX_IDSET_LIBS
LIBFLUX_VERSION...: $LIBFLUX_VERSION
FLUX_PREFIX.......: $FLUX_PREFIX
LDFLAGS...........: $LDFLAGS
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS)

AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS)
AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS) $(FLUX_IDSET_CFLAGS)

AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared

Expand All @@ -11,3 +11,4 @@ jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
mf_priority_la_LIBADD = $(FLUX_IDSET_LIBS)
53 changes: 51 additions & 2 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#endif
#include <flux/core.h>
#include <flux/jobtap.h>
#include <flux/idset.h>
#include <jansson.h>
}

Expand All @@ -43,6 +44,9 @@ extern "C" {
#define DEFAULT_QUEUE_WEIGHT 10000
#define DEFAULT_AGE_WEIGHT 1000

// set up cores-per-node count for the system
size_t ncores_per_node = 0;

std::map<int, std::map<std::string, Association>> users;
std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;
Expand Down Expand Up @@ -254,9 +258,11 @@ static int query_cb (flux_plugin_t *p,

if (flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:O}",
"{s:O s:i}",
"mf_priority_map",
accounting_data) < 0)
accounting_data,
"ncores_per_node",
ncores_per_node) < 0)
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: query_cb: flux_plugin_arg_pack: %s",
flux_plugin_arg_strerror (args));
Expand Down Expand Up @@ -1180,6 +1186,49 @@ extern "C" int flux_plugin_init (flux_plugin_t *p)
priority_weights["queue"] = DEFAULT_QUEUE_WEIGHT;
priority_weights["age"] = DEFAULT_AGE_WEIGHT;

// initialize the plugin with total node and core counts
flux_t *h;
flux_future_t *f;
const char *core;

h = flux_jobtap_get_flux (p);
// This synchronous call to fetch R from the KVS is needed in order to
// validate and enforce resource limits on jobs. The job manager will
// block here while waiting for R when the plugin is loaded but it *should*
// occur over a very short time.
if (!(f = flux_kvs_lookup (h,
NULL,
FLUX_KVS_WAITCREATE,
"resource.R"))) {
flux_log_error (h, "flux_kvs_lookup");
return -1;
}
// Equal number of cores on all nodes in R is assumed here; thus, only
// the first entry is looked at
if (flux_kvs_lookup_get_unpack (f,
"{s{s[{s{s:s}}]}}",
"execution",
"R_lite",
"children",
"core", &core) < 0) {
Comment on lines +1209 to +1213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R may have multiple entries in the R_lite array. To handle possible heterogeneity, you could iterate each entry and use the maximum number of cores found.

Probably ok if this is just a first cut, though. If so, I'd put a comment stating that "equal number of cores on all nodes in R is assumed, so we only look at the first entry" or simimlar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for mentioning this, I was unaware that it could have multiple entries. If it has multiple entries, would it look like this?

{
  "version": 1,
  "execution": {
    "R_lite": [
      {
        "rank": "19-22",
        "children": {
          "core": "0-47",
        }
      },
      {
        "rank": "23-29",
        "children": {
          "core": "0-15",
        }
      }
    ]
  }
}

flux_log_error (h, "flux_kvs_lookup_unpack");
return -1;
}
Comment on lines +1208 to +1216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I would request that this synchronous get be replaced with an asynchronous flux_future_then(3) and the parsing of R handled in a callback. However, it seems like the result is required for validation of jobs, and you probably don't want to let some jobs through erroneously while waiting for resource.R, so perhaps this is actually the right solution. The job manager will block here while waiting for R when mf_priority.so is loaded, but it should be a very short time, and while this is occurring job management will also pause. Most of the time this will occur during job manager module load, which has other synchronous work anyway.

I'd at least suggest a comment here describing why a synchronous get is used in this case.

@garlick: any other thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FLUX_KVS_WATCH flag should be dropped. That's only used when you want to receive a response for every change to the key, and here the future is being destroyed after the first response.

IRL, we only load mf_priority.so in the system instance so resource.R should exist already and the KVS lookup should be fast. FLUX_KVS_WAITCREATE is probably is needed in test where the plugin is loaded in a test instance and resource.R is dynamically discovered though.

Yes a comment would be good since synchronous activities always raise eyebrows.


if (core == NULL) {
flux_log_error (h,
"mf_priority: could not get system "
"cores-per-node information");
return -1;
}

// calculate number of cores-per-node on system
idset* cores_decoded = idset_decode (core);
ncores_per_node = idset_count (cores_decoded);

flux_future_destroy (f);
idset_destroy (cores_decoded);

return 0;
}

Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ TESTSCRIPTS = \
t1038-hierarchy-small-tie-all-db.t \
t1039-issue476.t \
t1040-mf-priority-projects.t \
t1041-mf-priority-resource-counting.t \
t5000-valgrind.t \
python/t1000-example.py \
python/t1001_db.py \
Expand Down
31 changes: 31 additions & 0 deletions t/t1041-mf-priority-resource-counting.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/bash

test_description='test calculating and storing system core information in priority plugin'

. `dirname $0`/sharness.sh

mkdir -p conf.d

MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so

export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 4 job -o,--config-path=$(pwd)/conf.d

flux setattr log-stderr-level 1

test_expect_success 'load multi-factor priority plugin' '
flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY}
'

test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'check that cores-per-node count is correct' '
flux jobtap query mf_priority.so > query.json &&
test_debug "jq -S . <query.json" &&
flux resource R --include=0 | flux R decode --count=core > ncores_per_node.test &&
jq -e ".ncores_per_node == $(cat ncores_per_node.test)" <query.json
'

test_done
Loading