Skip to content

Commit

Permalink
Merge pull request #415 from cmoussa1/issue#413
Browse files Browse the repository at this point in the history
plugin: improve callback for `job.validate`
  • Loading branch information
mergify[bot] authored Feb 7, 2024
2 parents 4b15ea5 + 41463a9 commit 8e84ac3
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 64 deletions.
25 changes: 25 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,28 @@

#include "accounting.hpp"

Association* get_association (int userid,
const char *bank,
std::map<int, std::map<std::string, Association>>
&users,
std::map<int, std::string> &users_def_bank)
{
auto it = users.find (userid);
if (it == users.end ())
// user could not be found
return nullptr;

std::string b;
if (bank != NULL)
b = bank;
else
// get the default bank of this user
b = users_def_bank[userid];

auto bank_it = it->second.find (b);
if (bank_it == it->second.end ())
// user does not have accounting information under the specified bank
return nullptr;

return &bank_it->second;
}
8 changes: 8 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ class Association {
int active; // active status
};

// get an Association object that points to user/bank in the users map;
// return nullptr on failure
Association* get_association (int userid,
const char *bank,
std::map<int, std::map<std::string, Association>>
&users,
std::map<int, std::string> &users_def_bank);

#endif // ACCOUNTING_H
109 changes: 49 additions & 60 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ int64_t priority_calculation (flux_plugin_t *p,
}


static int get_queue_info (
char *queue,
std::map<std::string, Association>::iterator bank_it)
static int get_queue_info (char *queue,
std::vector<std::string> permissible_queues)
{
std::map<std::string, struct queue_info>::iterator q_it;

Expand All @@ -159,10 +158,10 @@ static int get_queue_info (

// check #2) the queue passed in is a valid option to pass for user
std::vector<std::string>::iterator vect_it;
vect_it = std::find (bank_it->second.queues.begin (),
bank_it->second.queues.end (), queue);
vect_it = std::find (permissible_queues.begin (),
permissible_queues.end (), queue);

if (vect_it == bank_it->second.queues.end ())
if (vect_it == permissible_queues.end ())
return INVALID_QUEUE;
else
// add priority associated with the passed in queue to bank_info
Expand Down Expand Up @@ -690,7 +689,8 @@ static int priority_cb (flux_plugin_t *p,
}

// fetch priority associated with passed-in queue (or default queue)
bank_it->second.queue_factor = get_queue_info (queue, bank_it);
bank_it->second.queue_factor = get_queue_info (queue,
bank_it->second.queues);
if (check_queue_factor (p,
bank_it->second.queue_factor,
queue) < 0)
Expand Down Expand Up @@ -766,9 +766,21 @@ static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid)


/*
* Look up the userid of the submitted job in the multimap; if user is not found
* in the map, reject the job saying the user wasn't found in the
* flux-accounting database.
* Perform basic validation of a user/bank's submitted job. If a bank or
* queue is specified on submission, ensure that the user is allowed to
* submit a job under them. Check the active job limits for the user/bank
* on submission as well to make sure that they are under this limit when
* the job is submitted.
*
* This callback will also make sure that the user/bank belongs to
* the flux-accounting DB; there are two behaviors supported here:
*
* if the plugin has SOME data about users/banks and the user does not have
* an entry in the plugin, the job will be rejected.
*
* if the plugin has NO data about users/banks and the user does not have an
* entry in the plugin, the job will be held until data is received by the
* plugin.
*/
static int validate_cb (flux_plugin_t *p,
const char *topic,
Expand All @@ -782,11 +794,10 @@ static int validate_cb (flux_plugin_t *p,
int max_run_jobs, cur_active_jobs, max_active_jobs, queue_factor = 0;
double fairshare = 0.0;
bool only_dne_data;
Association *a;

std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;
std::map<std::string, Association>::iterator q_it;

// unpack the attributes of the user/bank's submitted job when it
// enters job.validate and place them into their respective variables
flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -798,67 +809,45 @@ static int validate_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

// make sure user belongs to flux-accounting DB; there are two behaviors
// supported in this plugin:
//
// if the plugin has SOME data about users/banks and the user does not
// have an entry in the plugin, the job will be rejected.
//
// if the plugin has NO data about users/banks and the user does not have
// an entry in the plugin, the job will be held until data is received by
// the plugin.
it = users.find (userid);
if (it == users.end ()) {
// check if the map only contains DNE entries
// perform a lookup in the users map of the unpacked user/bank
a = get_association (userid, bank, users, users_def_bank);

if (a == nullptr) {
// the assocation could not be found in the plugin's internal map,
// so perform a check to see if the map has any loaded
// flux-accounting data before rejecting the job
bool only_dne_data = check_map_for_dne_only ();

if (users.empty () || only_dne_data) {
add_missing_bank_info (p, h, userid);
return 0;
} else {
return flux_jobtap_reject_job (p, args,
"no bank found for user: %i", userid);
return flux_jobtap_reject_job (p,
args,
"cannot find user/bank or "
"user/default bank entry "
"for uid: %i", userid);
}
}

// make sure user belongs to bank they specified; if no bank was passed in,
// look up their default bank
if (bank != NULL) {
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return flux_jobtap_reject_job (p, args,
"user does not belong to specified bank");
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ())
return flux_jobtap_reject_job (p, args,
"user/default bank entry does not exist");
}

// if user/bank entry was disabled, reject job with a message saying the
// entry has been disabled
if (bank_it->second.active == 0)
if (a->active == 0)
// the association entry was disabled; reject the job
return flux_jobtap_reject_job (p, args, "user/bank entry has been "
"disabled from flux-accounting DB");

// validate the queue if one is passed in; if the user does not have access
// to the queue they specified, reject the job
queue_factor = get_queue_info (queue, bank_it);

if (queue_factor == INVALID_QUEUE)
if (get_queue_info (queue, a->queues) == INVALID_QUEUE)
// the user/bank specified a queue that they do not belong to;
// reject the job
return flux_jobtap_reject_job (p, args, "Queue not valid for user: %s",
queue);

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
max_active_jobs = bank_it->second.max_active_jobs;
cur_active_jobs = a->cur_active_jobs;
max_active_jobs = a->max_active_jobs;

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
if (state == FLUX_JOB_STATE_NEW) {
if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs)
// the association is already at their max_active_jobs limit;
// reject the job
return flux_jobtap_reject_job (p,
args,
"user has max active jobs");
Expand Down Expand Up @@ -954,7 +943,7 @@ static int new_cb (flux_plugin_t *p,

// assign priority associated with validated queue to bank_info struct
// associated with job
b->queue_factor = get_queue_info (queue, bank_it);
b->queue_factor = get_queue_info (queue, b->queues);

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
Expand Down Expand Up @@ -1131,7 +1120,7 @@ static int job_updated (flux_plugin_t *p,
// if the queue for the job has been updated, fetch the priority of the
// validated queue and assign it to the associated bank_info struct
if (queue != NULL) {
int queue_factor = get_queue_info (queue, bank_it);
int queue_factor = get_queue_info (queue, bank_it->second.queues);
b->queue_factor = queue_factor;
}

Expand Down Expand Up @@ -1186,7 +1175,7 @@ static int update_queue_cb (flux_plugin_t *p,

// validate the updated queue and make sure the user/bank has
// access to it; if not, reject the update
if (get_queue_info (queue, bank_it) == INVALID_QUEUE)
if (get_queue_info (queue, bank_it->second.queues) == INVALID_QUEUE)
return flux_jobtap_error (p,
args,
"mf_priority: queue not valid for user: %s",
Expand Down
39 changes: 38 additions & 1 deletion src/plugins/test/accounting_test01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,52 @@ static void test_direct_map_access (
}


// ensure an Assocation* is returned on success
static void test_get_association_success ()
{
// retrieve user_bank_info object
Association *user1 = get_association (1001,
const_cast<char *> ("bank_A"),
users,
users_def_bank);
ok (user1->bank_name == "bank_A",
"get_association () successfully returns a pointer to an Association");
}


// ensure nullptr is returned when a user cannot be found in the map
static void test_get_association_noexist ()
{
Association *user_foo = get_association (9999,
const_cast<char *> ("bank_A"),
users,
users_def_bank);
ok (user_foo == nullptr,
"get_association () fails when an association cannot be found");
}


// ensure nullptr is returned when a user does not have a default bank
static void test_get_association_no_default_bank ()
{
Association *user2 = get_association (1002, NULL, users, users_def_bank);
ok (user2 == nullptr,
"get_association () fails when a user does not have a default bank");
}


int main (int argc, char* argv[])
{
// declare the number of tests that we plan to run
plan (1);
plan (4);

// add users to the test map
initialize_map (users);

test_direct_map_access (users);
test_get_association_success ();
test_get_association_noexist ();
test_get_association_no_default_bank ();

// indicate we are done testing
done_testing ();
Expand Down
2 changes: 1 addition & 1 deletion t/t1001-mf-priority-basic.t
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ test_expect_success 'submit a job using default bank' '
test_expect_success 'submit a job using a bank the user does not belong to' '
test_must_fail flux submit --setattr=system.bank=account1 -n1 hostname > bad_bank.out 2>&1 &&
test_debug "cat bad_bank.out" &&
grep "user does not belong to specified bank" bad_bank.out
grep "cannot find user/bank or user/default bank entry for uid:" bad_bank.out
'

test_expect_success 'reject job when invalid bank format is passed in' '
Expand Down
2 changes: 1 addition & 1 deletion t/t1022-mf-priority-issue346.t
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ test_expect_success 'cancel job' '

test_expect_success 'submit a job to plugin while not having an entry in the plugin' '
test_must_fail flux python ${SUBMIT_AS} 1003 hostname > no_user_entry.out 2>&1 &&
grep "no bank found for user" no_user_entry.out
grep "cannot find user/bank or user/default bank entry for uid:" no_user_entry.out
'

test_expect_success 'shut down flux-accounting service' '
Expand Down
2 changes: 1 addition & 1 deletion t/t1028-mf-priority-issue385.t
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ test_expect_success 'check that jobs transition to RUN' '
test_expect_success 'submitting a job under invalid user while plugin has data fails' '
test_must_fail flux python ${SUBMIT_AS} 9999 hostname > invalid_user.out 2>&1 &&
test_debug "cat invalid_user.out" &&
grep "flux-job: no bank found for user: 9999" invalid_user.out
grep "cannot find user/bank or user/default bank entry for uid: 9999" invalid_user.out
'

test_expect_success 'cancel running jobs' '
Expand Down

0 comments on commit 8e84ac3

Please sign in to comment.