diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index a5576ed7..19c592d3 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -10,3 +10,28 @@ #include "accounting.hpp" +Association* get_association (int userid, + const char *bank, + std::map> + &users, + std::map &users_def_bank) +{ + auto it = users.find (userid); + if (it == users.end ()) + // user could not be found + return nullptr; + + std::string b; + if (bank != NULL) + b = bank; + else + // get the default bank of this user + b = users_def_bank[userid]; + + auto bank_it = it->second.find (b); + if (bank_it == it->second.end ()) + // user does not have accounting information under the specified bank + return nullptr; + + return &bank_it->second; +} diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 407bb96d..10dcda62 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -33,4 +33,12 @@ class Association { int active; // active status }; +// get an Association object that points to user/bank in the users map; +// return nullptr on failure +Association* get_association (int userid, + const char *bank, + std::map> + &users, + std::map &users_def_bank); + #endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index d14e8586..362019ca 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -140,9 +140,8 @@ int64_t priority_calculation (flux_plugin_t *p, } -static int get_queue_info ( - char *queue, - std::map::iterator bank_it) +static int get_queue_info (char *queue, + std::vector permissible_queues) { std::map::iterator q_it; @@ -159,10 +158,10 @@ static int get_queue_info ( // check #2) the queue passed in is a valid option to pass for user std::vector::iterator vect_it; - vect_it = std::find (bank_it->second.queues.begin (), - bank_it->second.queues.end (), queue); + vect_it = std::find (permissible_queues.begin (), + permissible_queues.end (), queue); - if (vect_it == bank_it->second.queues.end ()) + if (vect_it == permissible_queues.end ()) return INVALID_QUEUE; else // add priority associated with the passed in queue to bank_info @@ -690,7 +689,8 @@ static int priority_cb (flux_plugin_t *p, } // fetch priority associated with passed-in queue (or default queue) - bank_it->second.queue_factor = get_queue_info (queue, bank_it); + bank_it->second.queue_factor = get_queue_info (queue, + bank_it->second.queues); if (check_queue_factor (p, bank_it->second.queue_factor, queue) < 0) @@ -766,9 +766,21 @@ static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid) /* - * Look up the userid of the submitted job in the multimap; if user is not found - * in the map, reject the job saying the user wasn't found in the - * flux-accounting database. + * Perform basic validation of a user/bank's submitted job. If a bank or + * queue is specified on submission, ensure that the user is allowed to + * submit a job under them. Check the active job limits for the user/bank + * on submission as well to make sure that they are under this limit when + * the job is submitted. + * + * This callback will also make sure that the user/bank belongs to + * the flux-accounting DB; there are two behaviors supported here: + * + * if the plugin has SOME data about users/banks and the user does not have + * an entry in the plugin, the job will be rejected. + * + * if the plugin has NO data about users/banks and the user does not have an + * entry in the plugin, the job will be held until data is received by the + * plugin. */ static int validate_cb (flux_plugin_t *p, const char *topic, @@ -782,11 +794,10 @@ static int validate_cb (flux_plugin_t *p, int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0; double fairshare = 0.0; bool only_dne_data; + Association *a; - std::map>::iterator it; - std::map::iterator bank_it; - std::map::iterator q_it; - + // unpack the attributes of the user/bank's submitted job when it + // enters job.validate and place them into their respective variables flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -798,67 +809,45 @@ static int validate_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } - // make sure user belongs to flux-accounting DB; there are two behaviors - // supported in this plugin: - // - // if the plugin has SOME data about users/banks and the user does not - // have an entry in the plugin, the job will be rejected. - // - // if the plugin has NO data about users/banks and the user does not have - // an entry in the plugin, the job will be held until data is received by - // the plugin. - it = users.find (userid); - if (it == users.end ()) { - // check if the map only contains DNE entries + // perform a lookup in the users map of the unpacked user/bank + a = get_association (userid, bank, users, users_def_bank); + + if (a == nullptr) { + // the assocation could not be found in the plugin's internal map, + // so perform a check to see if the map has any loaded + // flux-accounting data before rejecting the job bool only_dne_data = check_map_for_dne_only (); if (users.empty () || only_dne_data) { add_missing_bank_info (p, h, userid); return 0; } else { - return flux_jobtap_reject_job (p, args, - "no bank found for user: %i", userid); + return flux_jobtap_reject_job (p, + args, + "cannot find user/bank or " + "user/default bank entry " + "for uid: %i", userid); } } - // make sure user belongs to bank they specified; if no bank was passed in, - // look up their default bank - if (bank != NULL) { - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) - return flux_jobtap_reject_job (p, args, - "user does not belong to specified bank"); - } else { - bank = const_cast (users_def_bank[userid].c_str ()); - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) - return flux_jobtap_reject_job (p, args, - "user/default bank entry does not exist"); - } - - // if user/bank entry was disabled, reject job with a message saying the - // entry has been disabled - if (bank_it->second.active == 0) + if (a->active == 0) + // the association entry was disabled; reject the job return flux_jobtap_reject_job (p, args, "user/bank entry has been " "disabled from flux-accounting DB"); - // validate the queue if one is passed in; if the user does not have access - // to the queue they specified, reject the job - queue_factor = get_queue_info (queue, bank_it); - - if (queue_factor == INVALID_QUEUE) + if (get_queue_info (queue, a->queues) == INVALID_QUEUE) + // the user/bank specified a queue that they do not belong to; + // reject the job return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s", queue); - max_run_jobs = bank_it->second.max_run_jobs; - fairshare = bank_it->second.fairshare; - cur_active_jobs = bank_it->second.cur_active_jobs; - max_active_jobs = bank_it->second.max_active_jobs; + cur_active_jobs = a->cur_active_jobs; + max_active_jobs = a->max_active_jobs; - // if a user/bank has reached their max_active_jobs limit, subsequently - // submitted jobs will be rejected if (state == FLUX_JOB_STATE_NEW) { if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs) + // the association is already at their max_active_jobs limit; + // reject the job return flux_jobtap_reject_job (p, args, "user has max active jobs"); @@ -954,7 +943,7 @@ static int new_cb (flux_plugin_t *p, // assign priority associated with validated queue to bank_info struct // associated with job - b->queue_factor = get_queue_info (queue, bank_it); + b->queue_factor = get_queue_info (queue, b->queues); // if a user/bank has reached their max_active_jobs limit, subsequently // submitted jobs will be rejected @@ -1131,7 +1120,7 @@ static int job_updated (flux_plugin_t *p, // if the queue for the job has been updated, fetch the priority of the // validated queue and assign it to the associated bank_info struct if (queue != NULL) { - int queue_factor = get_queue_info (queue, bank_it); + int queue_factor = get_queue_info (queue, bank_it->second.queues); b->queue_factor = queue_factor; } @@ -1186,7 +1175,7 @@ static int update_queue_cb (flux_plugin_t *p, // validate the updated queue and make sure the user/bank has // access to it; if not, reject the update - if (get_queue_info (queue, bank_it) == INVALID_QUEUE) + if (get_queue_info (queue, bank_it->second.queues) == INVALID_QUEUE) return flux_jobtap_error (p, args, "mf_priority: queue not valid for user: %s", diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index 650a69ff..32a74705 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -79,15 +79,52 @@ static void test_direct_map_access ( } +// ensure an Assocation* is returned on success +static void test_get_association_success () +{ + // retrieve user_bank_info object + Association *user1 = get_association (1001, + const_cast ("bank_A"), + users, + users_def_bank); + ok (user1->bank_name == "bank_A", + "get_association () successfully returns a pointer to an Association"); +} + + +// ensure nullptr is returned when a user cannot be found in the map +static void test_get_association_noexist () +{ + Association *user_foo = get_association (9999, + const_cast ("bank_A"), + users, + users_def_bank); + ok (user_foo == nullptr, + "get_association () fails when an association cannot be found"); +} + + +// ensure nullptr is returned when a user does not have a default bank +static void test_get_association_no_default_bank () +{ + Association *user2 = get_association (1002, NULL, users, users_def_bank); + ok (user2 == nullptr, + "get_association () fails when a user does not have a default bank"); +} + + int main (int argc, char* argv[]) { // declare the number of tests that we plan to run - plan (1); + plan (4); // add users to the test map initialize_map (users); test_direct_map_access (users); + test_get_association_success (); + test_get_association_noexist (); + test_get_association_no_default_bank (); // indicate we are done testing done_testing (); diff --git a/t/t1001-mf-priority-basic.t b/t/t1001-mf-priority-basic.t index 85a1f55e..5e526697 100755 --- a/t/t1001-mf-priority-basic.t +++ b/t/t1001-mf-priority-basic.t @@ -147,7 +147,7 @@ test_expect_success 'submit a job using default bank' ' test_expect_success 'submit a job using a bank the user does not belong to' ' test_must_fail flux submit --setattr=system.bank=account1 -n1 hostname > bad_bank.out 2>&1 && test_debug "cat bad_bank.out" && - grep "user does not belong to specified bank" bad_bank.out + grep "cannot find user/bank or user/default bank entry for uid:" bad_bank.out ' test_expect_success 'reject job when invalid bank format is passed in' ' diff --git a/t/t1022-mf-priority-issue346.t b/t/t1022-mf-priority-issue346.t index 5443c0f4..4ff68436 100755 --- a/t/t1022-mf-priority-issue346.t +++ b/t/t1022-mf-priority-issue346.t @@ -66,7 +66,7 @@ test_expect_success 'cancel job' ' test_expect_success 'submit a job to plugin while not having an entry in the plugin' ' test_must_fail flux python ${SUBMIT_AS} 1003 hostname > no_user_entry.out 2>&1 && - grep "no bank found for user" no_user_entry.out + grep "cannot find user/bank or user/default bank entry for uid:" no_user_entry.out ' test_expect_success 'shut down flux-accounting service' ' diff --git a/t/t1028-mf-priority-issue385.t b/t/t1028-mf-priority-issue385.t index 6b2ccf90..151a4b0e 100755 --- a/t/t1028-mf-priority-issue385.t +++ b/t/t1028-mf-priority-issue385.t @@ -54,7 +54,7 @@ test_expect_success 'check that jobs transition to RUN' ' test_expect_success 'submitting a job under invalid user while plugin has data fails' ' test_must_fail flux python ${SUBMIT_AS} 9999 hostname > invalid_user.out 2>&1 && test_debug "cat invalid_user.out" && - grep "flux-job: no bank found for user: 9999" invalid_user.out + grep "cannot find user/bank or user/default bank entry for uid: 9999" invalid_user.out ' test_expect_success 'cancel running jobs' '