Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add callback specific for validating an updated queue #399

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 74 additions & 9 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,10 @@ static int run_cb (flux_plugin_t *p,
}


/*
* apply an update on a job with regard to its queue once it has been
* validated.
*/
static int job_updated (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
Expand All @@ -1087,12 +1091,12 @@ static int job_updated (flux_plugin_t *p,

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s{s{s{s?s, s?s}}}}",
"{s:i, s{s{s{s?s}}}, s:{s?s}}",
"userid", &userid,
"jobspec", "attributes", "system",
"bank", &bank, "queue", &queue) < 0) {
return flux_jobtap_reject_job (p, args, "unable to unpack bank arg");
}
"jobspec", "attributes", "system", "bank", &bank,
"updates",
"attributes.system.queue", &queue) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// grab bank_info struct for user/bank (if any)
b = static_cast<bank_info *> (flux_jobtap_job_aux_get (
Expand Down Expand Up @@ -1129,10 +1133,70 @@ static int job_updated (flux_plugin_t *p,
bank_it = lookup_result.second;
}

// fetch the priority of the validated queue; assign it to the bank_info
// struct associated with the job
int queue_factor = get_queue_info (queue, bank_it);
b->queue_factor = queue_factor;
// if the queue for the job has been updated, fetch the priority of the
// validated queue and assign it to the associated bank_info struct
if (queue != NULL) {
int queue_factor = get_queue_info (queue, bank_it);
b->queue_factor = queue_factor;
}

return 0;
}


/*
* check for an updated queue and validate it for a user/bank; if the
* user/bank does not have access to the queue they are trying to update
* their job for, reject the update and keep the job in its current queue.
*/
static int update_queue_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
std::map<std::string, struct bank_info>::iterator bank_it;
int userid;
char *bank = NULL;
char *queue = NULL;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:s, s{s{s{s?s}}}}",
"userid", &userid,
"value", &queue,
"jobspec", "attributes", "system", "bank",
&bank) < 0)
return flux_jobtap_error (p, args, "unable to unpack plugin args");

// look up user/bank info based on unpacked information
bank_info_result lookup_result = get_bank_info (userid, bank);

if (lookup_result.first == BANK_USER_NOT_FOUND) {
return flux_jobtap_error (p,
args,
"mf_priority: cannot find info for user ",
userid);
} else if (lookup_result.first == BANK_INVALID) {
return flux_jobtap_error (p,
args,
"mf_priority: not a member of %s",
bank);
} else if (lookup_result.first == BANK_NO_DEFAULT) {
return flux_jobtap_error (p,
args,
"mf_priority: user/default bank entry does "
"not exist");
} else if (lookup_result.first == BANK_SUCCESS) {
bank_it = lookup_result.second;

// validate the updated queue and make sure the user/bank has
// access to it; if not, reject the update
if (get_queue_info (queue, bank_it) == INVALID_QUEUE)
return flux_jobtap_error (p,
args,
"mf_priority: queue not valid for user: %s",
queue);
}

return 0;
}
Expand Down Expand Up @@ -1210,6 +1274,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.update", job_updated, NULL},
{ "job.state.run", run_cb, NULL},
{ "plugin.query", query_cb, NULL},
{ "job.update.attributes.system.queue", update_queue_cb, NULL },
{ 0 },
};

Expand Down
34 changes: 32 additions & 2 deletions t/t1030-mf-priority-update-queue.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ test_expect_success 'load multi-factor priority plugin' '

test_expect_success 'add some banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root A 1
flux account add-bank --parent-bank=root A 1 &&
flux account add-bank --parent-bank=root B 1
'

test_expect_success 'add some queues to the DB' '
Expand All @@ -44,6 +45,10 @@ test_expect_success 'add a user to the DB' '
flux account add-user --username=user5001 \
--userid=5001 \
--bank=A \
--queues="bronze,silver" &&
flux account add-user --username=user5001 \
--userid=5001 \
--bank=B \
--queues="bronze,silver"
'

Expand Down Expand Up @@ -78,13 +83,38 @@ test_expect_success 'update of queue of pending job works' '
test_expect_success 'updating a job using a queue the user does not belong to fails' '
test_must_fail flux update $jobid1 queue=gold > unavail_queue.out 2>&1 &&
test_debug "cat unavail_queue.out" &&
grep "ERROR: Queue not valid for user: gold" unavail_queue.out
grep "ERROR: mf_priority: queue not valid for user: gold" unavail_queue.out
'

test_expect_success 'cancel job' '
flux job cancel $jobid1
'

test_expect_success 'submit job for testing under non-default bank' '
jobid2=$(flux python ${SUBMIT_AS} 5001 --setattr=bank=B --queue=bronze sleep 30) &&
flux job wait-event -f json $jobid2 priority \
| jq '.context.priority' > job2_bronze.test &&
grep 1050000 job2_bronze.test
'

test_expect_success 'update of queue of pending job under a non-default bank works' '
flux update $jobid2 queue=silver &&
flux job wait-event -f json $jobid2 priority &&
flux job eventlog $jobid2 > eventlog.out &&
grep "attributes.system.queue=\"silver\"" eventlog.out &&
grep 2050000 eventlog.out
'

test_expect_success 'updating a job under non-default bank using a queue the user does not belong to fails' '
test_must_fail flux update $jobid2 queue=gold > unavail_queue.out 2>&1 &&
test_debug "cat unavail_queue.out" &&
grep "ERROR: mf_priority: queue not valid for user: gold" unavail_queue.out
'

test_expect_success 'cancel job' '
flux job cancel $jobid2
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'
Expand Down
Loading