Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: improve callback for job.new #421

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 35 additions & 57 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,21 +710,26 @@ static int validate_cb (flux_plugin_t *p,
}


/*
* Create an Association object to be associated with the job submitted.
* This object contains things like active and running jobs limits, the
* association's fair share value, and associated priorities with a
* passed-in queue.
*
* If an association is submitting a job under their default bank, update the
* jobspec for this job to contain the bank name as well.
*/
static int new_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;
Association *b;

std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -740,72 +745,45 @@ static int new_cb (flux_plugin_t *p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));

if (b != NULL) {
max_run_jobs = b->max_run_jobs;
fairshare = b->fairshare;
cur_active_jobs = b->cur_active_jobs;
max_active_jobs = b->max_active_jobs;
} else {
// make sure user belongs to flux-accounting DB
it = users.find (userid);
if (it == users.end ()) {
// user does not exist in internal map yet, so create a bank_info
// struct that signifies it's going to be held in PRIORITY
if (b == nullptr) {
// Association object was not unpacked; perform lookup
b = get_association (userid, bank, users, users_def_bank);

if (b == nullptr) {
// the association could not be found in internal map, so create a
// special Association object that will hold the job in PRIORITY
add_missing_bank_info (p, h, userid);
return 0;
}

// make sure user belongs to bank they specified; if no bank was passed
// in, look up their default bank
if (bank != NULL) {
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ()) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.new: not a member of %s",
bank);
return -1;
}
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ()) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.new: user/default bank "
"entry does not exist");
return -1;
}
// update jobspec with default bank
if (bank == NULL) {
// this job is meant to run under the association's default bank;
// as a result, update the jobspec with their default bank name
if (update_jobspec_bank (p, userid) < 0) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"failed to update jobspec "
"with bank name");
"mf_priority", 0,
"failed to update jobspec "
"with bank name");
return -1;
}
}

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
max_active_jobs = bank_it->second.max_active_jobs;

b = &bank_it->second;
}

// assign priority associated with validated queue to bank_info struct
// associated with job
// assign priority associated with validated queue
b->queue_factor = get_queue_info (queue, b->queues);

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
max_run_jobs = b->max_run_jobs;
cur_active_jobs = b->cur_active_jobs;
max_active_jobs = b->max_active_jobs;

if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs)
// the association is already at their max_active_jobs limit;
// reject the job
return flux_jobtap_reject_job (p, args, "user has max active jobs");

// special case where the user/bank bank_info struct is set to NULL; used
// for testing the "if (b == NULL)" checks
if (max_run_jobs == -1) {
// special case where the object passed between callbacks is set to
// NULL; this is used for testing the "if (b == NULL)" checks
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info",
Expand All @@ -816,14 +794,14 @@ static int new_cb (flux_plugin_t *p,
return 0;
}


if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info",
b,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");

// increment the association's active jobs count
b->cur_active_jobs++;

return 0;
Expand Down
Loading