From 1a85cd68349c204bc2c931b55c0ab66fe306490b Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 19 Mar 2024 12:33:24 -0700 Subject: [PATCH 1/3] plugin: add project validation Add project validation to the job.validate callback, which checks that if a project is specified by an association when a job is submitted, it: 1) is a valid project to submit jobs under, and 2) is a valid project for the association to submit jobs under. If either one of these checks fail, the job is rejected with a message including the project name. --- src/plugins/mf_priority.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 93c32dfd..7058daab 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -588,6 +588,7 @@ static int validate_cb (flux_plugin_t *p, int userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; flux_job_state_t state; int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0; double fairshare = 0.0; @@ -599,11 +600,12 @@ static int validate_cb (flux_plugin_t *p, flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s:i, s{s{s{s?s, s?s, s?s}}}}", "userid", &userid, "state", &state, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -639,6 +641,15 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", queue); + if (project != NULL) { + // a project was specified on job submission; validate it + if (get_project_info (project, a->projects, projects) < 0) + // the association specified a project that they do not belong to + // or that flux-accounting does not know about; reject the job + return flux_jobtap_reject_job (p, args, "project not valid for " + "user: %s", project); + } + cur_active_jobs = a->cur_active_jobs; max_active_jobs = a->max_active_jobs; From f145f1a8f0ebe91985076e394ed43cb204269974 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 19 Mar 2024 12:54:15 -0700 Subject: [PATCH 2/3] plugin: add jobspec-update of project name Problem: The priority plugin has no way to update jobspec with the project used for the job in the case where an association submits a job under a default project. Add a helper function to the plugin that adds the project name for an association's job to jobspec via jobspec-update under attributes.system.project. Add a jobspec-update of the association's default project name when the job is in the job.new callback. In the event where an association submits a job before any information is loaded to the priority plugin and their job is held in PRIORITY state, add a jobspec-update of the association's default project name in job.state.priority if they are submitting under a default project. --- src/plugins/mf_priority.cpp | 63 ++++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 7058daab..3b295227 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -140,6 +140,33 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid) } +/* + * Update the jobspec with the default project the association used to + * submit their job under. + */ +static int update_jobspec_project (flux_plugin_t *p, int userid, char *bank) +{ + Association *a = get_association (userid, bank, users, users_def_bank); + if (a == nullptr) + // association could not be found + return -1; + + // get association's default project + std::string project = a->def_project; + + if (!project.empty ()) { + // post jobspec-update event + if (flux_jobtap_jobspec_update_pack (p, + "{s:s}", + "attributes.system.project", + project.c_str ()) < 0) + return -1; + } + + return 0; +} + + /* * Create a special Association object for an association's job while the * plugin waits for flux-accounting data to be loaded. @@ -451,17 +478,19 @@ static int priority_cb (flux_plugin_t *p, int urgency, userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; int64_t priority; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s:i, s{s{s{s?s, s?s, s?s}}}}", "urgency", &urgency, "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -542,6 +571,18 @@ static int priority_cb (flux_plugin_t *p, "with bank name"); return -1; } + + if (project == NULL) { + // we also need to update the jobspec with the default project + // used to submit this job under + if (update_jobspec_project (p, userid, bank) < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "failed to update jobspec " + "with project name"); + return -1; + } + } } } @@ -683,16 +724,18 @@ static int new_cb (flux_plugin_t *p, int userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s{s{s{s?s, s?s, s?s}}}}", "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -750,6 +793,18 @@ static int new_cb (flux_plugin_t *p, return 0; } + if (project == NULL) { + // this job is meant to run under a default project, so update + // the jobspec with the project name + if (update_jobspec_project (p, userid, bank) < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "failed to update jobspec with " + "project name"); + return -1; + } + } + if (flux_jobtap_job_aux_set (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info", From 779da7889f8792765cb927527c8b4edab06f5972 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Thu, 11 Apr 2024 10:31:59 -0700 Subject: [PATCH 3/3] t: add tests for project validation/annotation Problem: There are no tests for project validation and annotation in the priority plugin. Add some tests. --- t/Makefile.am | 1 + t/t1040-mf-priority-projects.t | 169 +++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100755 t/t1040-mf-priority-projects.t diff --git a/t/Makefile.am b/t/Makefile.am index ffb17827..58b6baf2 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -39,6 +39,7 @@ TESTSCRIPTS = \ t1037-hierarchy-small-tie-db.t \ t1038-hierarchy-small-tie-all-db.t \ t1039-issue476.t \ + t1040-mf-priority-projects.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1040-mf-priority-projects.t b/t/t1040-mf-priority-projects.t new file mode 100755 index 00000000..93765473 --- /dev/null +++ b/t/t1040-mf-priority-projects.t @@ -0,0 +1,169 @@ +#!/bin/bash + +test_description='test validating and setting project names in priority plugin' + +. `dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root account1 1 +' + +test_expect_success 'add projects to the DB' ' + flux account add-project projectA && + flux account add-project projectB && + flux account add-project projectC +' + +test_expect_success 'submit jobs under two different users before plugin gets updated' ' + job_project_star=$(flux python ${SUBMIT_AS} 5001 hostname) && + job_project_A=$(flux python ${SUBMIT_AS} 5002 hostname) && + flux job wait-event -vt 60 $job_project_star depend && + flux job wait-event -vt 60 $job_project_A depend +' + +# If a user is added to the DB without specifying any projects, a default +# project "*" is added for the user automatically, and jobs submitted without +# specifying a project will fall under "*" - this is the case for the first +# added user in this test file. +# +# Every user who is added to the DB belongs to the "*" project, but will only +# run jobs under "*" if they do not already have another default project. +# +# If a user is added to the DB with a specified project name, any job submitted +# without specifying a project name will fall under that project name - this is +# the case for the second user in this test file. +test_expect_success 'add users to flux-accounting DB and to plugin; jobs transition to RUN' ' + flux account add-user --username=user1 --userid=5001 --bank=account1 && + flux account add-user \ + --username=user2 \ + --userid=5002 \ + --bank=account1 \ + --projects=projectA && + flux account-priority-update -p ${DB_PATH} && + flux job wait-event -vt 60 $job_project_star alloc && + flux job wait-event -vt 60 $job_project_A alloc +' + +test_expect_success 'check that first submitted job has project "*" listed in eventlog' ' + flux job info $job_project_star eventlog > eventlog.out && + grep "\"attributes.system.project\":\"\*\"" eventlog.out && + flux cancel $job_project_star +' + +test_expect_success 'check that second submitted job has project "projectA" listed in eventlog' ' + flux job info $job_project_A eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $job_project_A +' + +test_expect_success 'add a user with a list of projects to the DB' ' + flux account add-user \ + --username=user3 \ + --userid=5003 \ + --bank=account1 \ + --projects="projectA,projectB" +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'successfully submit a job under a valid project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectA hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid jobspec > jobspec.out && + grep "projectA" jobspec.out && + flux cancel $jobid +' + +test_expect_success 'submit a job under a project that does not exist' ' + test_must_fail flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectFOO \ + hostname > project_dne.out 2>&1 && + test_debug "cat project_dne.out" && + grep "project not valid for user: projectFOO" project_dne.out +' + +test_expect_success 'submit a job under a project that user does not belong to' ' + test_must_fail flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectC \ + hostname > project_invalid.out 2>&1 && + test_debug "cat project_invalid.out" && + grep "project not valid for user: projectC" project_invalid.out +' + +test_expect_success 'successfully submit a job under a default project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'successfully submit a job under a secondary project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectB hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid jobspec > jobspec.out && + grep "projectB" jobspec.out && + flux cancel $jobid +' + +test_expect_success 'update the default project for user and submit job under new default' ' + flux account edit-user user3 --default-project=projectB && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5003 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectB\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'add a user without specifying any projects (will add a default project of "*")' ' + flux account add-user --username=user4 --userid=5004 --bank=account1 && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5004 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"\*\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'add a new default project to the new user and update the plugin' ' + flux account edit-user user4 --projects=projectA --default-project=projectA && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5004 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done