diff --git a/src/Makefile.am b/src/Makefile.am index 0de6c18e..4b452022 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,7 +17,8 @@ noinst_HEADERS = \ fairness/reader/data_reader_db.hpp \ fairness/writer/data_writer_base.hpp \ fairness/writer/data_writer_db.hpp \ - fairness/writer/data_writer_stdout.hpp + fairness/writer/data_writer_stdout.hpp \ + plugins/accounting.hpp fairness_libweighted_tree_la_SOURCES = \ fairness/account/account.cpp \ diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index f4ac138c..35576e03 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -8,5 +8,6 @@ jobtapdir = \ $(fluxlibdir)/job-manager/plugins/ jobtap_LTLIBRARIES = mf_priority.la -mf_priority_la_SOURCES = mf_priority.cpp +mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp +mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp new file mode 100644 index 00000000..a5576ed7 --- /dev/null +++ b/src/plugins/accounting.cpp @@ -0,0 +1,12 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#include "accounting.hpp" + diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp new file mode 100644 index 00000000..407bb96d --- /dev/null +++ b/src/plugins/accounting.hpp @@ -0,0 +1,36 @@ +/************************************************************\ + * Copyright 2024 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +// header file for the Accounting class + +#ifndef ACCOUNTING_H +#define ACCOUNTING_H + +#include +#include +#include +#include + +// all attributes are per-user/bank +class Association { +public: + std::string bank_name; // name of bank + double fairshare; // fair share value + int max_run_jobs; // max number of running jobs + int cur_run_jobs; // current number of running jobs + int max_active_jobs; // max number of active jobs + int cur_active_jobs; // current number of active jobs + std::vector held_jobs; // list of currently held job ID's + std::vector queues; // list of accessible queues + int queue_factor; // priority factor associated with queue + int active; // active status +}; + +#endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index d0800f73..d14e8586 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -29,6 +29,9 @@ extern "C" { #include #include +// custom bank_info class file +#include "accounting.hpp" + // the plugin does not know about the association who submitted a job and will // assign default values to the association until it receives information from // flux-accounting @@ -58,25 +61,12 @@ enum bank_info_codes { BANK_NO_DEFAULT }; -typedef std::pair::iterator> bank_info_result; +typedef std::pair::iterator> bank_info_result; -std::map> users; +std::map> users; std::map queues; std::map users_def_bank; -struct bank_info { - std::string bank_name; - double fairshare; - int max_run_jobs; - int cur_run_jobs; - int max_active_jobs; - int cur_active_jobs; - std::vector held_jobs; - std::vector queues; - int queue_factor; - int active; -}; - // min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not // currently used or enforced in this plugin, so their values have no // effect in queue limit enforcement. @@ -113,7 +103,7 @@ int64_t priority_calculation (flux_plugin_t *p, double fshare_factor = 0.0, priority = 0.0; int queue_factor = 0; int fshare_weight, queue_weight; - struct bank_info *b; + Association *b; fshare_weight = 100000; queue_weight = 10000; @@ -124,7 +114,7 @@ int64_t priority_calculation (flux_plugin_t *p, if (urgency == FLUX_JOB_URGENCY_EXPEDITE) return FLUX_JOB_PRIORITY_MAX; - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -152,7 +142,7 @@ int64_t priority_calculation (flux_plugin_t *p, static int get_queue_info ( char *queue, - std::map::iterator bank_it) + std::map::iterator bank_it) { std::map::iterator q_it; @@ -184,7 +174,7 @@ static int get_queue_info ( } -static void split_string (char *queues, struct bank_info *b) +static void split_string (char *queues, Association *b) { std::stringstream s_stream; @@ -218,7 +208,7 @@ int check_queue_factor (flux_plugin_t *p, * Add held job IDs to a JSON array to be added to a bank_info JSON object. */ static json_t *add_held_jobs ( - const std::pair &b) + const std::pair &b) { json_t *held_jobs = NULL; @@ -249,7 +239,7 @@ static json_t *add_held_jobs ( * Create a JSON object for a bank that a user belongs to. */ static json_t *pack_bank_info_object ( - const std::pair &b) + const std::pair &b) { json_t *bank_info, *held_jobs = NULL; @@ -282,7 +272,7 @@ static json_t *pack_bank_info_object ( */ static json_t *banks_to_json ( flux_plugin_t *p, - std::pair> &u) + std::pair> &u) { json_t *bank_info, *banks = NULL; @@ -314,7 +304,7 @@ static json_t *banks_to_json ( */ static json_t *user_to_json ( flux_plugin_t *p, - std::pair> u) + std::pair> u) { json_t *user = json_object (); // JSON object for one user json_t *userid, *banks = NULL; @@ -373,7 +363,7 @@ static bool check_map_for_dne_only () static int update_jobspec_bank (flux_plugin_t *p, int userid) { char *bank = NULL; - std::map>::iterator it; + std::map>::iterator it; it = users.find (userid); if (it == users.end ()) { @@ -401,8 +391,8 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid) // associated with the submitted job static bank_info_result get_bank_info (int userid, char *bank) { - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; it = users.find (userid); if (it == users.end ()) { @@ -522,7 +512,7 @@ static void rec_update_cb (flux_t *h, "active", &active) < 0) flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text); - struct bank_info *b; + Association *b; b = &users[uid][bank]; b->bank_name = bank; @@ -633,7 +623,7 @@ static int priority_cb (flux_plugin_t *p, char *bank = NULL; char *queue = NULL; int64_t priority; - struct bank_info *b; + Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -650,7 +640,7 @@ static int priority_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -662,8 +652,8 @@ static int priority_cb (flux_plugin_t *p, return -1; } - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; if (b->max_run_jobs == BANK_INFO_MISSING) { // try to look up user again @@ -752,7 +742,7 @@ static int priority_cb (flux_plugin_t *p, static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid) { - struct bank_info *b; + Association *b; b = &users[userid]["DNE"]; users_def_bank[userid] = "DNE"; @@ -793,9 +783,9 @@ static int validate_cb (flux_plugin_t *p, double fairshare = 0.0; bool only_dne_data; - std::map>::iterator it; - std::map::iterator bank_it; - std::map::iterator q_it; + std::map>::iterator it; + std::map::iterator bank_it; + std::map::iterator q_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -888,10 +878,10 @@ static int new_cb (flux_plugin_t *p, char *queue = NULL; int max_run_jobs, cur_active_jobs, max_active_jobs = 0; double fairshare = 0.0; - struct bank_info *b; + Association *b; - std::map>::iterator it; - std::map::iterator bank_it; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -903,7 +893,7 @@ static int new_cb (flux_plugin_t *p, return flux_jobtap_reject_job (p, args, "unable to unpack bank arg"); } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1005,7 +995,7 @@ static int depend_cb (flux_plugin_t *p, { int userid; long int id; - struct bank_info *b; + Association *b; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1019,7 +1009,7 @@ static int depend_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1057,9 +1047,9 @@ static int run_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; + Association *b; - b = static_cast + b = static_cast (flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1088,11 +1078,11 @@ static int job_updated (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *queue = NULL; - struct bank_info *b; + Association *b; if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -1104,7 +1094,7 @@ static int job_updated (flux_plugin_t *p, return flux_jobtap_error (p, args, "unable to unpack plugin args"); // grab bank_info struct for user/bank (if any) - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info")); @@ -1159,7 +1149,7 @@ static int update_queue_cb (flux_plugin_t *p, flux_plugin_arg_t *args, void *data) { - std::map::iterator bank_it; + std::map::iterator bank_it; int userid; char *bank = NULL; char *queue = NULL; @@ -1213,9 +1203,9 @@ static int inactive_cb (flux_plugin_t *p, void *data) { int userid; - struct bank_info *b; - std::map>::iterator it; - std::map::iterator bank_it; + Association *b; + std::map>::iterator it; + std::map::iterator bank_it; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, @@ -1229,7 +1219,7 @@ static int inactive_cb (flux_plugin_t *p, return -1; } - b = static_cast (flux_jobtap_job_aux_get ( + b = static_cast (flux_jobtap_job_aux_get ( p, FLUX_JOBTAP_CURRENT_JOB, "mf_priority:bank_info"));