Skip to content

Commit

Permalink
Merge pull request #421 from cmoussa1/issue#419
Browse files Browse the repository at this point in the history
plugin: improve callback for `job.new`
  • Loading branch information
mergify[bot] authored Feb 20, 2024
2 parents 72b0c1a + b0995b8 commit b673799
Showing 1 changed file with 35 additions and 57 deletions.
92 changes: 35 additions & 57 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,21 +710,26 @@ static int validate_cb (flux_plugin_t *p,
}


/*
* Create an Association object to be associated with the job submitted.
* This object contains things like active and running jobs limits, the
* association's fair share value, and associated priorities with a
* passed-in queue.
*
* If an association is submitting a job under their default bank, update the
* jobspec for this job to contain the bank name as well.
*/
static int new_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
int userid;
char *bank = NULL;
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;
Association *b;

std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -740,72 +745,45 @@ static int new_cb (flux_plugin_t *p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));

if (b != NULL) {
max_run_jobs = b->max_run_jobs;
fairshare = b->fairshare;
cur_active_jobs = b->cur_active_jobs;
max_active_jobs = b->max_active_jobs;
} else {
// make sure user belongs to flux-accounting DB
it = users.find (userid);
if (it == users.end ()) {
// user does not exist in internal map yet, so create a bank_info
// struct that signifies it's going to be held in PRIORITY
if (b == nullptr) {
// Association object was not unpacked; perform lookup
b = get_association (userid, bank, users, users_def_bank);

if (b == nullptr) {
// the association could not be found in internal map, so create a
// special Association object that will hold the job in PRIORITY
add_missing_bank_info (p, h, userid);
return 0;
}

// make sure user belongs to bank they specified; if no bank was passed
// in, look up their default bank
if (bank != NULL) {
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ()) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.new: not a member of %s",
bank);
return -1;
}
} else {
bank = const_cast<char*> (users_def_bank[userid].c_str ());
bank_it = it->second.find (std::string (bank));
if (bank_it == it->second.end ()) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"job.new: user/default bank "
"entry does not exist");
return -1;
}
// update jobspec with default bank
if (bank == NULL) {
// this job is meant to run under the association's default bank;
// as a result, update the jobspec with their default bank name
if (update_jobspec_bank (p, userid) < 0) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0,
"failed to update jobspec "
"with bank name");
"mf_priority", 0,
"failed to update jobspec "
"with bank name");
return -1;
}
}

max_run_jobs = bank_it->second.max_run_jobs;
fairshare = bank_it->second.fairshare;
cur_active_jobs = bank_it->second.cur_active_jobs;
max_active_jobs = bank_it->second.max_active_jobs;

b = &bank_it->second;
}

// assign priority associated with validated queue to bank_info struct
// associated with job
// assign priority associated with validated queue
b->queue_factor = get_queue_info (queue, b->queues);

// if a user/bank has reached their max_active_jobs limit, subsequently
// submitted jobs will be rejected
max_run_jobs = b->max_run_jobs;
cur_active_jobs = b->cur_active_jobs;
max_active_jobs = b->max_active_jobs;

if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs)
// the association is already at their max_active_jobs limit;
// reject the job
return flux_jobtap_reject_job (p, args, "user has max active jobs");

// special case where the user/bank bank_info struct is set to NULL; used
// for testing the "if (b == NULL)" checks
if (max_run_jobs == -1) {
// special case where the object passed between callbacks is set to
// NULL; this is used for testing the "if (b == NULL)" checks
if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info",
Expand All @@ -816,14 +794,14 @@ static int new_cb (flux_plugin_t *p,
return 0;
}


if (flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info",
b,
NULL) < 0)
flux_log_error (h, "flux_jobtap_job_aux_set");

// increment the association's active jobs count
b->cur_active_jobs++;

return 0;
Expand Down

0 comments on commit b673799

Please sign in to comment.