Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add external Association class to be used in plugin #412

Merged
merged 3 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ noinst_HEADERS = \
fairness/reader/data_reader_db.hpp \
fairness/writer/data_writer_base.hpp \
fairness/writer/data_writer_db.hpp \
fairness/writer/data_writer_stdout.hpp
fairness/writer/data_writer_stdout.hpp \
plugins/accounting.hpp

fairness_libweighted_tree_la_SOURCES = \
fairness/account/account.cpp \
Expand Down Expand Up @@ -46,7 +47,8 @@ fairness_libweighted_tree_la_CXXFLAGS = \
TESTS = \
weighted_tree_test01.t \
data_reader_db_test01.t \
data_writer_db_test01.t
data_writer_db_test01.t \
accounting_test01.t
check_PROGRAMS = $(TESTS)

TEST_EXTENSIONS = .t
Expand Down Expand Up @@ -93,6 +95,14 @@ data_writer_db_test01_t_LDADD = \
fairness/libweighted_tree.la \
common/libtap/libtap.la

accounting_test01_t_SOURCES = \
plugins/test/accounting_test01.cpp \
plugins/accounting.cpp \
plugins/accounting.hpp
accounting_test01_t_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir)
accounting_test01_t_LDADD = \
common/libtap/libtap.la

noinst_PROGRAMS = \
cmd/flux-account-update-fshare \
cmd/flux-account-shares
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ jobtapdir = \
$(fluxlibdir)/job-manager/plugins/

jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
12 changes: 12 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#include "accounting.hpp"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is accounting.cpp for if all the code is in accounting.hpp? Do you plan to implement methods on the Association class in this file later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was planning on adding any methods as well as standalone functions that relate to any Accounting objects in this file (I know of at least three at the moment).


36 changes: 36 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/************************************************************\
* Copyright 2024 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

// header file for the Accounting class

#ifndef ACCOUNTING_H
#define ACCOUNTING_H

#include <vector>
#include <string>
#include <map>
#include <iterator>

// all attributes are per-user/bank
class Association {
public:
std::string bank_name; // name of bank
double fairshare; // fair share value
int max_run_jobs; // max number of running jobs
int cur_run_jobs; // current number of running jobs
int max_active_jobs; // max number of active jobs
int cur_active_jobs; // current number of active jobs
std::vector<long int> held_jobs; // list of currently held job ID's
std::vector<std::string> queues; // list of accessible queues
int queue_factor; // priority factor associated with queue
int active; // active status
};

#endif // ACCOUNTING_H
93 changes: 43 additions & 50 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ extern "C" {
#include <vector>
#include <sstream>

// custom bank_info class file
#include "accounting.hpp"

// the plugin does not know about the association who submitted a job and will
// assign default values to the association until it receives information from
// flux-accounting
Expand Down Expand Up @@ -58,24 +61,12 @@ enum bank_info_codes {
BANK_NO_DEFAULT
};

typedef std::pair<bank_info_codes, std::map<std::string, struct bank_info>::iterator> bank_info_result;
typedef std::pair<bank_info_codes, std::map<std::string, Association>::iterator> bank_info_result;

std::map<int, std::map<std::string, struct bank_info>> users;
std::map<int, std::map<std::string, Association>> users;
std::map<std::string, struct queue_info> queues;
std::map<int, std::string> users_def_bank;

struct bank_info {
double fairshare;
int max_run_jobs;
int cur_run_jobs;
int max_active_jobs;
int cur_active_jobs;
std::vector<long int> held_jobs;
std::vector<std::string> queues;
int queue_factor;
int active;
};

// min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not
// currently used or enforced in this plugin, so their values have no
// effect in queue limit enforcement.
Expand Down Expand Up @@ -112,7 +103,7 @@ int64_t priority_calculation (flux_plugin_t *p,
double fshare_factor = 0.0, priority = 0.0;
int queue_factor = 0;
int fshare_weight, queue_weight;
struct bank_info *b;
Association *b;

fshare_weight = 100000;
queue_weight = 10000;
Expand All @@ -123,7 +114,7 @@ int64_t priority_calculation (flux_plugin_t *p,
if (urgency == FLUX_JOB_URGENCY_EXPEDITE)
return FLUX_JOB_PRIORITY_MAX;

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -151,7 +142,7 @@ int64_t priority_calculation (flux_plugin_t *p,

static int get_queue_info (
char *queue,
std::map<std::string, struct bank_info>::iterator bank_it)
std::map<std::string, Association>::iterator bank_it)
{
std::map<std::string, struct queue_info>::iterator q_it;

Expand Down Expand Up @@ -183,7 +174,7 @@ static int get_queue_info (
}


static void split_string (char *queues, struct bank_info *b)
static void split_string (char *queues, Association *b)
{
std::stringstream s_stream;

Expand Down Expand Up @@ -217,7 +208,7 @@ int check_queue_factor (flux_plugin_t *p,
* Add held job IDs to a JSON array to be added to a bank_info JSON object.
*/
static json_t *add_held_jobs (
const std::pair<std::string, struct bank_info> &b)
const std::pair<std::string, Association> &b)
{
json_t *held_jobs = NULL;

Expand Down Expand Up @@ -248,7 +239,7 @@ static json_t *add_held_jobs (
* Create a JSON object for a bank that a user belongs to.
*/
static json_t *pack_bank_info_object (
const std::pair<std::string, struct bank_info> &b)
const std::pair<std::string, Association> &b)
{
json_t *bank_info, *held_jobs = NULL;

Expand Down Expand Up @@ -281,7 +272,7 @@ static json_t *pack_bank_info_object (
*/
static json_t *banks_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, struct bank_info>> &u)
std::pair<int, std::map<std::string, Association>> &u)
{
json_t *bank_info, *banks = NULL;

Expand Down Expand Up @@ -313,7 +304,7 @@ static json_t *banks_to_json (
*/
static json_t *user_to_json (
flux_plugin_t *p,
std::pair<int, std::map<std::string, struct bank_info>> u)
std::pair<int, std::map<std::string, Association>> u)
{
json_t *user = json_object (); // JSON object for one user
json_t *userid, *banks = NULL;
Expand Down Expand Up @@ -372,7 +363,7 @@ static bool check_map_for_dne_only ()
static int update_jobspec_bank (flux_plugin_t *p, int userid)
{
char *bank = NULL;
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<int, std::map<std::string, Association>>::iterator it;

it = users.find (userid);
if (it == users.end ()) {
Expand Down Expand Up @@ -400,8 +391,8 @@ static int update_jobspec_bank (flux_plugin_t *p, int userid)
// associated with the submitted job
static bank_info_result get_bank_info (int userid, char *bank)
{
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

it = users.find (userid);
if (it == users.end ()) {
Expand Down Expand Up @@ -521,9 +512,10 @@ static void rec_update_cb (flux_t *h,
"active", &active) < 0)
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);

struct bank_info *b;
Association *b;
b = &users[uid][bank];

b->bank_name = bank;
b->fairshare = fshare;
b->max_run_jobs = max_running_jobs;
b->max_active_jobs = max_active_jobs;
Expand Down Expand Up @@ -631,7 +623,7 @@ static int priority_cb (flux_plugin_t *p,
char *bank = NULL;
char *queue = NULL;
int64_t priority;
struct bank_info *b;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -648,7 +640,7 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand All @@ -660,8 +652,8 @@ static int priority_cb (flux_plugin_t *p,
return -1;
}

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

if (b->max_run_jobs == BANK_INFO_MISSING) {
// try to look up user again
Expand Down Expand Up @@ -750,11 +742,12 @@ static int priority_cb (flux_plugin_t *p,

static void add_missing_bank_info (flux_plugin_t *p, flux_t *h, int userid)
{
struct bank_info *b;
Association *b;

b = &users[userid]["DNE"];
users_def_bank[userid] = "DNE";

b->bank_name = "DNE";
b->fairshare = 0.1;
b->max_run_jobs = BANK_INFO_MISSING;
b->cur_run_jobs = 0;
Expand Down Expand Up @@ -790,9 +783,9 @@ static int validate_cb (flux_plugin_t *p,
double fairshare = 0.0;
bool only_dne_data;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, struct queue_info>::iterator q_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;
std::map<std::string, Association>::iterator q_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand Down Expand Up @@ -885,10 +878,10 @@ static int new_cb (flux_plugin_t *p,
char *queue = NULL;
int max_run_jobs, cur_active_jobs, max_active_jobs = 0;
double fairshare = 0.0;
struct bank_info *b;
Association *b;

std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -900,7 +893,7 @@ static int new_cb (flux_plugin_t *p,
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1002,7 +995,7 @@ static int depend_cb (flux_plugin_t *p,
{
int userid;
long int id;
struct bank_info *b;
Association *b;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -1016,7 +1009,7 @@ static int depend_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1054,9 +1047,9 @@ static int run_cb (flux_plugin_t *p,
void *data)
{
int userid;
struct bank_info *b;
Association *b;

b = static_cast<bank_info *>
b = static_cast<Association *>
(flux_jobtap_job_aux_get (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1085,11 +1078,11 @@ static int job_updated (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
struct bank_info *b;
Association *b;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -1101,7 +1094,7 @@ static int job_updated (flux_plugin_t *p,
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab bank_info struct for user/bank (if any)
b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down Expand Up @@ -1156,7 +1149,7 @@ static int update_queue_cb (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
std::map<std::string, Association>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;
Expand Down Expand Up @@ -1210,9 +1203,9 @@ static int inactive_cb (flux_plugin_t *p,
void *data)
{
int userid;
struct bank_info *b;
std::map<int, std::map<std::string, struct bank_info>>::iterator it;
std::map<std::string, struct bank_info>::iterator bank_it;
Association *b;
std::map<int, std::map<std::string, Association>>::iterator it;
std::map<std::string, Association>::iterator bank_it;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
Expand All @@ -1226,7 +1219,7 @@ static int inactive_cb (flux_plugin_t *p,
return -1;
}

b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
b = static_cast<Association *> (flux_jobtap_job_aux_get (
p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority:bank_info"));
Expand Down
Loading
Loading