Skip to content

Commit

Permalink
Merge pull request #434 from cmoussa1/projects-plugin
Browse files Browse the repository at this point in the history
plugin: add project information to Association information in plugin
  • Loading branch information
mergify[bot] authored Mar 27, 2024
2 parents 3864ed5 + bf90b45 commit ffc9383
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 66 deletions.
17 changes: 16 additions & 1 deletion src/cmd/flux-account-priority-update.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ def bulk_update(path):
data = {}
bulk_user_data = []
bulk_q_data = []
bulk_proj_data = []

# fetch all rows from association_table (will print out tuples)
for row in cur.execute(
"""SELECT userid, bank, default_bank,
fairshare, max_running_jobs, max_active_jobs,
queues, active FROM association_table"""
queues, active, projects, default_project
FROM association_table"""
):
# create a JSON payload with the results of the query
single_user_data = {
Expand All @@ -88,6 +90,8 @@ def bulk_update(path):
"max_active_jobs": int(row[5]),
"queues": str(row[6]),
"active": int(row[7]),
"projects": str(row[8]),
"def_project": str(row[9]),
}
bulk_user_data.append(single_user_data)

Expand All @@ -111,6 +115,17 @@ def bulk_update(path):

flux.Flux().rpc("job-manager.mf_priority.rec_q_update", json.dumps(data)).get()

# fetch all rows from project_table
for row in cur.execute("SELECT project FROM project_table"):
# create a JSON payload with the results of the query
single_project = {
"project": str(row[0]),
}
bulk_proj_data.append(single_project)

data = {"data": bulk_proj_data}
flux.Flux().rpc("job-manager.mf_priority.rec_proj_update", data).get()

flux.Flux().rpc("job-manager.mf_priority.reprioritize")

# close DB connection
Expand Down
20 changes: 20 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,23 @@ bool check_map_for_dne_only (std::map<int, std::map<std::string, Association>>

return true;
}


int get_project_info (const char *project,
std::vector<std::string> &permissible_projects,
std::vector<std::string> projects)
{
auto it = std::find (projects.begin (), projects.end (), project);
if (it == projects.end ())
// project is unknown to flux-accounting
return UNKNOWN_PROJECT;

it = std::find (permissible_projects.begin (),
permissible_projects.end (),
project);
if (it == permissible_projects.end ())
// association doesn't have access to submit jobs under this project
return INVALID_PROJECT;

return 0;
}
33 changes: 23 additions & 10 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ extern "C" {
class Association {
public:
// attributes
std::string bank_name; // name of bank
double fairshare; // fair share value
int max_run_jobs; // max number of running jobs
int cur_run_jobs; // current number of running jobs
int max_active_jobs; // max number of active jobs
int cur_active_jobs; // current number of active jobs
std::vector<long int> held_jobs; // list of currently held job ID's
std::vector<std::string> queues; // list of accessible queues
int queue_factor; // priority factor associated with queue
int active; // active status
std::string bank_name; // name of bank
double fairshare; // fair share value
int max_run_jobs; // max number of running jobs
int cur_run_jobs; // current number of running jobs
int max_active_jobs; // max number of active jobs
int cur_active_jobs; // current number of active jobs
std::vector<long int> held_jobs; // list of currently held job ID's
std::vector<std::string> queues; // list of accessible queues
int queue_factor; // priority factor associated with queue
int active; // active status
std::vector<std::string> projects; // list of accessible projects
std::string def_project; // default project

// methods
json_t* to_json () const; // convert object to JSON string
Expand All @@ -56,6 +58,12 @@ class Association {
#define NO_QUEUE_SPECIFIED 0
#define INVALID_QUEUE -6

// - UNKNOWN_PROJECT: a project that flux-accounting doesn't know about
// - INVALID_PROJECT: a project that the association doesn't have permission
// to charge jobs under
#define UNKNOWN_PROJECT -6
#define INVALID_PROJECT -7

// min_nodes_per_job, max_nodes_per_job, and max_time_per_job are not
// currently used or enforced in this plugin, so their values have no
// effect in queue limit enforcement.
Expand Down Expand Up @@ -96,4 +104,9 @@ bool check_map_for_dne_only (std::map<int, std::map<std::string, Association>>
&users,
std::map<int, std::string> &users_def_bank);

// validate a potentially passed-in project by an association
int get_project_info (const char *project,
std::vector<std::string> &permissible_projects,
std::vector<std::string> projects);

#endif // ACCOUNTING_H
69 changes: 63 additions & 6 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern "C" {
std::map<int, std::map<std::string, Association>> users;
std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;
std::vector<std::string> projects;

/******************************************************************************
* *
Expand Down Expand Up @@ -205,7 +206,7 @@ static void rec_update_cb (flux_t *h,
const flux_msg_t *msg,
void *arg)
{
char *bank, *def_bank, *assoc_queues = NULL;
char *bank, *def_bank, *assoc_queues, *projects, *def_project = NULL;
int uid, max_running_jobs, max_active_jobs = 0;
double fshare = 0.0;
json_t *data, *jtemp = NULL;
Expand All @@ -232,15 +233,18 @@ static void rec_update_cb (flux_t *h,
json_t *el = json_array_get(data, i);

if (json_unpack_ex (el, &error, 0,
"{s:i, s:s, s:s, s:F, s:i, s:i, s:s, s:i}",
"{s:i, s:s, s:s, s:F, s:i,"
" s:i, s:s, s:i, s:s, s:s}",
"userid", &uid,
"bank", &bank,
"def_bank", &def_bank,
"fairshare", &fshare,
"max_running_jobs", &max_running_jobs,
"max_active_jobs", &max_active_jobs,
"queues", &assoc_queues,
"active", &active) < 0)
"active", &active,
"projects", &projects,
"def_project", &def_project) < 0)
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);

Association *b;
Expand All @@ -251,10 +255,14 @@ static void rec_update_cb (flux_t *h,
b->max_run_jobs = max_running_jobs;
b->max_active_jobs = max_active_jobs;
b->active = active;
b->def_project = def_project;

// split queues comma-delimited string and add it to b->queues vector
b->queues.clear ();
split_string_and_push_back (assoc_queues, b->queues);
// do the same thing for the association's projects
b->projects.clear ();
split_string_and_push_back (projects, b->projects);

users_def_bank[uid] = def_bank;
}
Expand Down Expand Up @@ -323,6 +331,53 @@ static void rec_q_cb (flux_t *h,
}


/*
* Unpack a payload from an external bulk update service and place it in the
* "projects" vector.
*/
static void rec_proj_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
char *project = NULL;
json_t *data, *jtemp = NULL;
json_error_t error;
int num_data = 0;
size_t index;
json_t *el;

if (flux_request_unpack (msg, NULL, "{s:o}", "data", &data) < 0) {
flux_log_error (h, "failed to unpack custom_priority.trigger msg");
goto error;
}

if (!data || !json_is_array (data)) {
flux_log (h, LOG_ERR, "mf_priority: invalid project info payload");
goto error;
}
num_data = json_array_size (data);

// clear the projects vector
projects.clear ();

json_array_foreach (data, index, el) {
if (json_unpack_ex (el, &error, 0, "{s:s}", "project", &project) < 0) {
flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text);
goto error;
}
projects.push_back (project);
}

if (flux_respond (h, msg, NULL) < 0)
flux_log_error (h, "flux_respond");

return;
error:
flux_respond_error (h, msg, errno, flux_msg_last_error (msg));
}


static void reprior_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
Expand Down Expand Up @@ -1005,9 +1060,11 @@ static const struct flux_plugin_handler tab[] = {
extern "C" int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "mf_priority", tab) < 0
|| flux_jobtap_service_register (p, "rec_update", rec_update_cb, p)
|| flux_jobtap_service_register (p, "reprioritize", reprior_cb, p)
|| flux_jobtap_service_register (p, "rec_q_update", rec_q_cb, p) < 0)
|| flux_jobtap_service_register (p, "rec_update", rec_update_cb, p) < 0
|| flux_jobtap_service_register (p, "reprioritize", reprior_cb, p) < 0
|| flux_jobtap_service_register (p, "rec_q_update", rec_q_cb, p) < 0
|| flux_jobtap_service_register (p, "rec_proj_update", rec_proj_cb, p)
< 0)
return -1;

return 0;
Expand Down
79 changes: 74 additions & 5 deletions src/plugins/test/accounting_test01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ std::map<int, std::map<std::string, Association>> users;
std::map<int, std::string> users_def_bank;
// define a test queues map
std::map<std::string, Queue> queues;
// define a vector of chargeable projects
std::vector<std::string> projects;


/*
Expand All @@ -50,7 +52,9 @@ void add_user_to_map (
a.held_jobs,
a.queues,
a.queue_factor,
a.active
a.active,
a.projects,
a.def_project
};
}

Expand All @@ -61,8 +65,8 @@ void add_user_to_map (
void initialize_map (
std::map<int, std::map<std::string, Association>> &users)
{
Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1};
Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1};
Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1, {"*"}, "*"};
Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, {}, 0, 1, {"*"}, "*"};

add_user_to_map (users, 1001, "bank_A", user1);
users_def_bank[1001] = "bank_A";
Expand All @@ -82,6 +86,17 @@ void initialize_queues () {
}


/*
* helper function to add test projects to the projects vector
*/
void initialize_projects () {
projects.push_back ("*");
projects.push_back ("A");
projects.push_back ("B");
projects.push_back ("C");
}


// ensure we can access a user/bank in the users map
static void test_direct_map_access (
std::map<int, std::map<std::string, Association>> &users)
Expand Down Expand Up @@ -189,6 +204,54 @@ static void test_get_queue_info_invalid_queue ()
}


// ensure user has access to a default project
static void test_get_project_info_success_default ()
{
Association a = users[1001]["bank_A"];
const char *p = "*";
int result = get_project_info (p, a.projects, projects);

ok (result == 0, "association has access to default project");
}


// ensure we can access projects that we add to an association
static void test_get_project_info_success_specified ()
{
Association a = users[1001]["bank_A"];
a.projects = {"*", "A"};
const char *p = "A";

int result = get_project_info (p, a.projects, projects);

ok (result == 0, "association has access to a specified project");
}


// ensure UNKNOWN_PROJECT is returned when an unrecognized project is passed in
static void test_get_project_info_unknown_project ()
{
Association a = users[1001]["bank_A"];
const char *p = "foo";
int result = get_project_info (p, a.projects, projects);

ok (result == UNKNOWN_PROJECT,
"UNKNOWN_PROJECT is returned when an unrecognized project is passed in");
}


// ensure INVALID_PROJECT is returned when an invalid project is passed in
static void test_get_project_info_invalid_project ()
{
Association a = users[1001]["bank_A"];
const char *p = "B";
int result = get_project_info (p, a.projects, projects);

ok (result == INVALID_PROJECT,
"INVALID_PROJECT is returned when an inaccessible project is passed in");
}


// ensure false is returned because we have valid flux-accounting data in map
static void test_check_map_dne_false ()
{
Expand All @@ -204,7 +267,7 @@ static void test_check_map_dne_true ()
users.clear ();
users_def_bank.clear ();

Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, {}, 0, 1};
Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, {}, 0, 1, {"*"}, "*"};
add_user_to_map (users, 9999, "DNE", tmp_user);
users_def_bank[9999] = "DNE";

Expand All @@ -217,12 +280,14 @@ static void test_check_map_dne_true ()
int main (int argc, char* argv[])
{
// declare the number of tests that we plan to run
plan (11);
plan (15);

// add users to the test map
initialize_map (users);
// add queues to the test queues map
initialize_queues ();
// add projects to the test projects vector
initialize_projects ();

test_direct_map_access (users);
test_get_association_success ();
Expand All @@ -233,6 +298,10 @@ int main (int argc, char* argv[])
test_get_queue_info_no_queue_specified ();
test_get_queue_info_unknown_queue ();
test_get_queue_info_invalid_queue ();
test_get_project_info_success_default ();
test_get_project_info_success_specified ();
test_get_project_info_unknown_project ();
test_get_project_info_invalid_project ();
test_check_map_dne_false ();
test_check_map_dne_true ();

Expand Down
Loading

0 comments on commit ffc9383

Please sign in to comment.