Skip to content

Commit

Permalink
plugin: create new Association class
Browse files Browse the repository at this point in the history
Problem: The plugin uses its own bank_info struct to hold user/bank
information and has a number of methods for access and modification
of these structs. As the plugin's feature set has grown, so have the
requirements for the bank_info struct, resulting in a very large and
hard-to-parse piece of code.

Begin to clean up this plugin. Start by creating a new Association
class and place it in a separate file that gets compiled with the
plugin. Replace all instances of "struct bank_info" with the new
"Association" class type.
  • Loading branch information
cmoussa1 committed Jan 31, 2024
1 parent 510fd5c commit 1f29878
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 53 deletions.
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ noinst_HEADERS = \
fairness/reader/data_reader_db.hpp \
fairness/writer/data_writer_base.hpp \
fairness/writer/data_writer_db.hpp \
fairness/writer/data_writer_stdout.hpp
fairness/writer/data_writer_stdout.hpp \
plugins/accounting.hpp

fairness_libweighted_tree_la_SOURCES = \
fairness/account/account.cpp \
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
12 changes: 12 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#include "accounting.hpp"

36 changes: 36 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

// header file for the Accounting class

#ifndef ACCOUNTING_H
#define ACCOUNTING_H

#include <vector>
#include <string>
#include <map>
#include <iterator>

// all attributes are per-user/bank
class Association {
public:
std::string bank_name; // name of bank
double fairshare; // fair share value
int max_run_jobs; // max number of running jobs
int cur_run_jobs; // current number of running jobs
int max_active_jobs; // max number of active jobs
int cur_active_jobs; // current number of active jobs
std::vector<long int> held_jobs; // list of currently held job ID's
std::vector<std::string> queues; // list of accessible queues
int queue_factor; // priority factor associated with queue
int active; // active status
};

#endif // ACCOUNTING_H
92 changes: 41 additions & 51 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ extern "C" {
#include <vector>
#include <sstream>

// custom bank_info class file
#include "accounting.hpp"

// the plugin does not know about the association who submitted a job and will
// assign default values to the association until it receives information from
// flux-accounting
Expand Down Expand Up @@ -58,25 +61,12 @@ enum bank_info_codes {
BANK_NO_DEFAULT
};

typedef std::pair<bank_info_codes, std::map<std::string, struct bank_info>::iterator> bank_info_result;
typedef std::pair<bank_info_codes, std::map<std::string, Association>::iterator> bank_info_result;

std::map<int, std::map<std::string, struct bank_info>> users;
std::map<int, std::map<std::string, Association>> users;
std::map<std::string, struct queue_info> queues;
std::map<int, std::string> users_def_bank;

struct bank_info {
std::string bank_name;
double fairshare;
int max_run_jobs;
int cur_run_jobs;
int max_active_jobs;
int cur_active_jobs;
std::vector<long int> held_jobs;
std::vector<std::string> queues;
int queue_factor;
int active;
};

// min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not
// currently used or enforced in this plugin, so their values have no
// effect in queue limit enforcement.
Expand Down Expand Up @@ -113,7 +103,7 @@ int64_t priority_calculation (flux_plugin_t *p,
double fshare_factor = 0.0, priority = 0.0;
int queue_factor = 0;
int fshare_weight, queue_weight;
struct bank_info *b;
Association *b;

fshare_weight = 100000;
queue_weight = 10000;
Expand All @@ -124,7 +114,7 @@ int64_t priority_calculation (flux_plugin_t *p,
if (urgency == FLUX_JOB_URGENCY_EXPEDITE)
return FLUX_JOB_PRIORITY_MAX;

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -152,7 +142,7 @@ int64_t priority_calculation (flux_plugin_t *p,

static int get_queue_info (
char *queue,
std::map<std::string, struct bank_info>::iterator bank_it)
std::map<std::string, Association>::iterator bank_it)
{
std::map<std::string, struct queue_info>::iterator q_it;

Expand Down Expand Up @@ -184,7 +174,7 @@ static int get_queue_info (
}


static void split_string (char *queues, struct bank_info *b)
static void split_string (char *queues, Association *b)
{
std::stringstream s_stream;

Expand Down Expand Up @@ -218,7 +208,7 @@ int check_queue_factor (flux_plugin_t *p,
* Add held job IDs to a JSON array to be added to a bank_info JSON object.
*/
static json_t *add_held_jobs (
const std::pair<std::string, struct bank_info> &b)
const std::pair<std::string, Association> &b)
{
json_t *held_jobs = NULL;

Expand Down Expand Up @@ -249,7 +239,7 @@ static json_t *add_held_jobs (
* Create a JSON object for a bank that a user belongs to.
*/
static json_t *pack_bank_info_object (
const std::pair<std::string, struct bank_info> &b)
const std::pair<std::string, Association> &b)
{
json_t *bank_info, *held_jobs = NULL;

Expand Down Expand Up @@ -282,7 +272,7 @@ static json_t *pack_bank_info_object (
*/
static json_t *banks_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, struct bank_info>> &u)
std::pair<int, std::map<std::string, Association>> &u)
{
json_t *bank_info, *banks = NULL;

Expand Down Expand Up @@ -314,7 +304,7 @@ static json_t *banks_to_json (
*/
static json_t *user_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, struct bank_info>> u)
std::pair<int, std::map<std::string, Association>> u)
{
json_t *user = json_object (); // JSON object for one user
json_t *userid, *banks = NULL;
Expand Down Expand Up @@ -373,7 +363,7 @@ static bool check_map_for_dne_only ()
static int update_jobspec_bank (flux_plugin_t *p, int userid)
{
char *bank = NULL;
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<int, std::map<std::string, Association>>::iterator it;

it = users.find (userid);
if (it == users.end ()) {
Expand Down Expand Up @@ -401,8 +391,8 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid)
// associated with the submitted job
static bank_info_result get_bank_info (int userid, char *bank)
{
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

it = users.find (userid);
if (it == users.end ()) {
Expand Down Expand Up @@ -522,7 +512,7 @@ static void rec_update_cb (flux_t *h,
"active", &active) < 0)
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);

struct bank_info *b;
Association *b;
b = &users[uid][bank];

b->bank_name = bank;
Expand Down Expand Up @@ -633,7 +623,7 @@ static int priority_cb (flux_plugin_t *p,
char *bank = NULL;
char *queue = NULL;
int64_t priority;
struct bank_info *b;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -650,7 +640,7 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand All @@ -662,8 +652,8 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

if (b->max_run_jobs == BANK_INFO_MISSING) {
// try to look up user again
Expand Down Expand Up @@ -752,7 +742,7 @@ static int priority_cb (flux_plugin_t *p,

static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid)
{
struct bank_info *b;
Association *b;

b = &users[userid]["DNE"];
users_def_bank[userid] = "DNE";
Expand Down Expand Up @@ -793,9 +783,9 @@ static int validate_cb (flux_plugin_t *p,
double fairshare = 0.0;
bool only_dne_data;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, struct queue_info>::iterator q_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;
std::map<std::string, Association>::iterator q_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand Down Expand Up @@ -888,10 +878,10 @@ static int new_cb (flux_plugin_t *p,
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;
struct bank_info *b;
Association *b;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -903,7 +893,7 @@ static int new_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1005,7 +995,7 @@ static int depend_cb (flux_plugin_t *p,
{
int userid;
long int id;
struct bank_info *b;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -1019,7 +1009,7 @@ static int depend_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1057,9 +1047,9 @@ static int run_cb (flux_plugin_t *p,
void *data)
{
int userid;
struct bank_info *b;
Association *b;

b = static_cast<bank_info *>
b = static_cast<Association *>
(flux_jobtap_job_aux_get (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1088,11 +1078,11 @@ static int job_updated (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
struct bank_info *b;
Association *b;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -1104,7 +1094,7 @@ static int job_updated (flux_plugin_t *p,
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab bank_info struct for user/bank (if any)
b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1159,7 +1149,7 @@ static int update_queue_cb (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
Expand Down Expand Up @@ -1213,9 +1203,9 @@ static int inactive_cb (flux_plugin_t *p,
void *data)
{
int userid;
struct bank_info *b;
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
Association *b;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -1229,7 +1219,7 @@ static int inactive_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down

0 comments on commit 1f29878

Please sign in to comment.