From 064060b949bbf1c0481f502dd2a2554828353c6e Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 20 Feb 2024 08:26:51 -0800 Subject: [PATCH 1/2] plugin: improve job.new callback Problem: The callback for job.new still performs a manual lookup of an Association, but there now exists external functions defined in accounting.cpp that the callback should make use of. It is also lacking a function description and has some leftover variables that are unutilized. Clean up this function by using the new external functions that perform lookups of an association. Remove unused variables from the function. Add a function description at the top. --- src/plugins/mf_priority.cpp | 79 ++++++++++++++----------------------- 1 file changed, 29 insertions(+), 50 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index ff6da54a..72da46c1 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -710,6 +710,15 @@ static int validate_cb (flux_plugin_t *p, } +/* + * Create an Association object to be associated with the job submitted. + * This object contains things like active and running jobs limits, the + * association's fair share value, and associated priorities with a + * passed-in queue. + * + * If an association is submitting a job under their default bank, update the + * jobspec for this job to contain the bank name as well. + */ static int new_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, @@ -719,12 +728,8 @@ static int new_cb (flux_plugin_t *p, char *bank = NULL; char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; - double fairshare = 0.0; Association *b; - std::map>::iterator it; - std::map::iterator bank_it; - flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -740,43 +745,20 @@ static int new_cb (flux_plugin_t *p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); - if (b != NULL) { - max_run_jobs = b->max_run_jobs; - fairshare = b->fairshare; - cur_active_jobs = b->cur_active_jobs; - max_active_jobs = b->max_active_jobs; - } else { - // make sure user belongs to flux-accounting DB - it = users.find (userid); - if (it == users.end ()) { - // user does not exist in internal map yet, so create a bank_info - // struct that signifies it's going to be held in PRIORITY + if (b == nullptr) { + // Association object was not unpacked; perform lookup + b = get_association (userid, bank, users, users_def_bank); + + if (b == nullptr) { + // the association could not be found in internal map, so create a + // special Association object that will hold the job in PRIORITY add_missing_bank_info (p, h, userid); return 0; } - // make sure user belongs to bank they specified; if no bank was passed - // in, look up their default bank - if (bank != NULL) { - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) { - flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, - "mf_priority", 0, - "job.new: not a member of %s", - bank); - return -1; - } - } else { - bank = const_cast (users_def_bank[userid].c_str ()); - bank_it = it->second.find (std::string (bank)); - if (bank_it == it->second.end ()) { - flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, - "mf_priority", 0, - "job.new: user/default bank " - "entry does not exist"); - return -1; - } - // update jobspec with default bank + if (bank == NULL) { + // this job is meant to run under the association's default bank; + // as a result, update the jobspec with their default bank name if (update_jobspec_bank (p, userid) < 0) { flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority", 0, @@ -785,27 +767,23 @@ static int new_cb (flux_plugin_t *p, return -1; } } - - max_run_jobs = bank_it->second.max_run_jobs; - fairshare = bank_it->second.fairshare; - cur_active_jobs = bank_it->second.cur_active_jobs; - max_active_jobs = bank_it->second.max_active_jobs; - - b = &bank_it->second; } - // assign priority associated with validated queue to bank_info struct - // associated with job + // assign priority associated with validated queue b->queue_factor = get_queue_info (queue, b->queues); - // if a user/bank has reached their max_active_jobs limit, subsequently - // submitted jobs will be rejected + max_run_jobs = b->max_run_jobs; + cur_active_jobs = b->cur_active_jobs; + max_active_jobs = b->max_active_jobs; + if (max_active_jobs > 0 && cur_active_jobs >= max_active_jobs) + // the association is already at their max_active_jobs limit; + // reject the job return flux_jobtap_reject_job (p, args, "user has max active jobs"); - // special case where the user/bank bank_info struct is set to NULL; used - // for testing the "if (b == NULL)" checks if (max_run_jobs == -1) { + // special case where the object passed between callbacks is set to + // NULL; this is used for testing the "if (b == NULL)" checks if (flux_jobtap_job_aux_set (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info", @@ -824,6 +802,7 @@ static int new_cb (flux_plugin_t *p, NULL) < 0) flux_log_error (h, "flux_jobtap_job_aux_set"); + // increment the association's active jobs count b->cur_active_jobs++; return 0; From b0995b87034ccbda75ed5664acabb8ce6edac584 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 20 Feb 2024 08:27:30 -0800 Subject: [PATCH 2/2] plugin: cleanup whitespace Problem: The job.new callback has some extra whitespace and misaligned indention. Remove the extra whitespace and fix the misaligned indentation. --- src/plugins/mf_priority.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 72da46c1..9c501e1d 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -720,9 +720,9 @@ static int validate_cb (flux_plugin_t *p, * jobspec for this job to contain the bank name as well. */ static int new_cb (flux_plugin_t *p, - const char *topic, - flux_plugin_arg_t *args, - void *data) + const char *topic, + flux_plugin_arg_t *args, + void *data) { int userid; char *bank = NULL; @@ -761,9 +761,9 @@ static int new_cb (flux_plugin_t *p, // as a result, update the jobspec with their default bank name if (update_jobspec_bank (p, userid) < 0) { flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, - "mf_priority", 0, - "failed to update jobspec " - "with bank name"); + "mf_priority", 0, + "failed to update jobspec " + "with bank name"); return -1; } } @@ -794,7 +794,6 @@ static int new_cb (flux_plugin_t *p, return 0; } - if (flux_jobtap_job_aux_set (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info",