diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 93c32dfd..3b295227 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -140,6 +140,33 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid) } +/* + * Update the jobspec with the default project the association used to + * submit their job under. + */ +static int update_jobspec_project (flux_plugin_t *p, int userid, char *bank) +{ + Association *a = get_association (userid, bank, users, users_def_bank); + if (a == nullptr) + // association could not be found + return -1; + + // get association's default project + std::string project = a->def_project; + + if (!project.empty ()) { + // post jobspec-update event + if (flux_jobtap_jobspec_update_pack (p, + "{s:s}", + "attributes.system.project", + project.c_str ()) < 0) + return -1; + } + + return 0; +} + + /* * Create a special Association object for an association's job while the * plugin waits for flux-accounting data to be loaded. @@ -451,17 +478,19 @@ static int priority_cb (flux_plugin_t *p, int urgency, userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; int64_t priority; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s:i, s{s{s{s?s, s?s, s?s}}}}", "urgency", &urgency, "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -542,6 +571,18 @@ static int priority_cb (flux_plugin_t *p, "with bank name"); return -1; } + + if (project == NULL) { + // we also need to update the jobspec with the default project + // used to submit this job under + if (update_jobspec_project (p, userid, bank) < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "failed to update jobspec " + "with project name"); + return -1; + } + } } } @@ -588,6 +629,7 @@ static int validate_cb (flux_plugin_t *p, int userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; flux_job_state_t state; int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0; double fairshare = 0.0; @@ -599,11 +641,12 @@ static int validate_cb (flux_plugin_t *p, flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s:i, s{s{s{s?s, s?s, s?s}}}}", "userid", &userid, "state", &state, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -639,6 +682,15 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", queue); + if (project != NULL) { + // a project was specified on job submission; validate it + if (get_project_info (project, a->projects, projects) < 0) + // the association specified a project that they do not belong to + // or that flux-accounting does not know about; reject the job + return flux_jobtap_reject_job (p, args, "project not valid for " + "user: %s", project); + } + cur_active_jobs = a->cur_active_jobs; max_active_jobs = a->max_active_jobs; @@ -672,16 +724,18 @@ static int new_cb (flux_plugin_t *p, int userid; char *bank = NULL; char *queue = NULL; + const char *project = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s{s{s{s?s, s?s}}}}", + "{s:i, s{s{s{s?s, s?s, s?s}}}}", "userid", &userid, "jobspec", "attributes", "system", - "bank", &bank, "queue", &queue) < 0) { + "bank", &bank, "queue", &queue, + "project", &project) < 0) { return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } @@ -739,6 +793,18 @@ static int new_cb (flux_plugin_t *p, return 0; } + if (project == NULL) { + // this job is meant to run under a default project, so update + // the jobspec with the project name + if (update_jobspec_project (p, userid, bank) < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, + "failed to update jobspec with " + "project name"); + return -1; + } + } + if (flux_jobtap_job_aux_set (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info", diff --git a/t/Makefile.am b/t/Makefile.am index ffb17827..58b6baf2 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -39,6 +39,7 @@ TESTSCRIPTS = \ t1037-hierarchy-small-tie-db.t \ t1038-hierarchy-small-tie-all-db.t \ t1039-issue476.t \ + t1040-mf-priority-projects.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1040-mf-priority-projects.t b/t/t1040-mf-priority-projects.t new file mode 100755 index 00000000..93765473 --- /dev/null +++ b/t/t1040-mf-priority-projects.t @@ -0,0 +1,169 @@ +#!/bin/bash + +test_description='test validating and setting project names in priority plugin' + +. `dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root account1 1 +' + +test_expect_success 'add projects to the DB' ' + flux account add-project projectA && + flux account add-project projectB && + flux account add-project projectC +' + +test_expect_success 'submit jobs under two different users before plugin gets updated' ' + job_project_star=$(flux python ${SUBMIT_AS} 5001 hostname) && + job_project_A=$(flux python ${SUBMIT_AS} 5002 hostname) && + flux job wait-event -vt 60 $job_project_star depend && + flux job wait-event -vt 60 $job_project_A depend +' + +# If a user is added to the DB without specifying any projects, a default +# project "*" is added for the user automatically, and jobs submitted without +# specifying a project will fall under "*" - this is the case for the first +# added user in this test file. +# +# Every user who is added to the DB belongs to the "*" project, but will only +# run jobs under "*" if they do not already have another default project. +# +# If a user is added to the DB with a specified project name, any job submitted +# without specifying a project name will fall under that project name - this is +# the case for the second user in this test file. +test_expect_success 'add users to flux-accounting DB and to plugin; jobs transition to RUN' ' + flux account add-user --username=user1 --userid=5001 --bank=account1 && + flux account add-user \ + --username=user2 \ + --userid=5002 \ + --bank=account1 \ + --projects=projectA && + flux account-priority-update -p ${DB_PATH} && + flux job wait-event -vt 60 $job_project_star alloc && + flux job wait-event -vt 60 $job_project_A alloc +' + +test_expect_success 'check that first submitted job has project "*" listed in eventlog' ' + flux job info $job_project_star eventlog > eventlog.out && + grep "\"attributes.system.project\":\"\*\"" eventlog.out && + flux cancel $job_project_star +' + +test_expect_success 'check that second submitted job has project "projectA" listed in eventlog' ' + flux job info $job_project_A eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $job_project_A +' + +test_expect_success 'add a user with a list of projects to the DB' ' + flux account add-user \ + --username=user3 \ + --userid=5003 \ + --bank=account1 \ + --projects="projectA,projectB" +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'successfully submit a job under a valid project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectA hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid jobspec > jobspec.out && + grep "projectA" jobspec.out && + flux cancel $jobid +' + +test_expect_success 'submit a job under a project that does not exist' ' + test_must_fail flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectFOO \ + hostname > project_dne.out 2>&1 && + test_debug "cat project_dne.out" && + grep "project not valid for user: projectFOO" project_dne.out +' + +test_expect_success 'submit a job under a project that user does not belong to' ' + test_must_fail flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectC \ + hostname > project_invalid.out 2>&1 && + test_debug "cat project_invalid.out" && + grep "project not valid for user: projectC" project_invalid.out +' + +test_expect_success 'successfully submit a job under a default project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'successfully submit a job under a secondary project' ' + jobid=$(flux python ${SUBMIT_AS} 5003 --setattr=system.project=projectB hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid jobspec > jobspec.out && + grep "projectB" jobspec.out && + flux cancel $jobid +' + +test_expect_success 'update the default project for user and submit job under new default' ' + flux account edit-user user3 --default-project=projectB && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5003 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectB\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'add a user without specifying any projects (will add a default project of "*")' ' + flux account add-user --username=user4 --userid=5004 --bank=account1 && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5004 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"\*\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'add a new default project to the new user and update the plugin' ' + flux account edit-user user4 --projects=projectA --default-project=projectA && + flux account-priority-update -p ${DB_PATH} && + jobid=$(flux python ${SUBMIT_AS} 5004 hostname) && + flux job wait-event -f json $jobid priority && + flux job info $jobid eventlog > eventlog.out && + grep "\"attributes.system.project\":\"projectA\"" eventlog.out && + flux cancel $jobid +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done