From 4ed973b585d979f8fc1d6eab010fed74747796f3 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 16 Sep 2024 11:29:19 -0700 Subject: [PATCH 1/4] view-job-records: add "project" filter option Problem: There is no way to filter jobs by project when looking at job records in flux-accounting's "jobs" table. Add a "project" column to the jobs table that will store an optional project name for each job. Update the schema version of the DB with the addition of the column name. Add an option to filter jobs by "project" when adding job records to the "jobs" table and while looking at job records with the view-job-records command. --- .../python/fluxacct/accounting/__init__.py.in | 2 +- .../python/fluxacct/accounting/create_db.py | 3 ++- .../accounting/jobs_table_subcommands.py | 23 +++++++++++++++---- src/cmd/flux-account-fetch-job-records.py | 18 ++++++++++++++- src/cmd/flux-account-service.py | 1 + src/cmd/flux-account.py | 6 +++++ 6 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/__init__.py.in b/src/bindings/python/fluxacct/accounting/__init__.py.in index be5aa4236..51dfab19e 100644 --- a/src/bindings/python/fluxacct/accounting/__init__.py.in +++ b/src/bindings/python/fluxacct/accounting/__init__.py.in @@ -1,5 +1,5 @@ db_dir = "@X_LOCALSTATEDIR@/lib/flux/" db_path = "@X_LOCALSTATEDIR@/lib/flux/FluxAccounting.db" -db_schema_version = 22 +db_schema_version = 23 __all__ = ["db_dir", "db_path", "db_schema_version"] diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py index a54ba53dd..15c28b532 100755 --- a/src/bindings/python/fluxacct/accounting/create_db.py +++ b/src/bindings/python/fluxacct/accounting/create_db.py @@ -212,7 +212,8 @@ def create_db( t_inactive real NOT NULL, ranks text NOT NULL, R text NOT NULL, - jobspec text NOT NULL + jobspec text NOT NULL, + project text );""" ) logging.info("Created jobs table successfully") diff --git a/src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py b/src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py index dff19881c..bfbafd765 100644 --- a/src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py +++ b/src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py @@ -35,7 +35,9 @@ class JobRecord: A record of an individual job. """ - def __init__(self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources): + def __init__( + self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources, project + ): self.userid = userid self.username = get_username(userid) self.jobid = jobid @@ -44,6 +46,7 @@ def __init__(self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources self.t_inactive = t_inactive self.nnodes = nnodes self.resources = resources + self.project = project @property def elapsed(self): @@ -69,6 +72,7 @@ def write_records_to_file(job_records, output_file): "T_Inactive", "Nodes", "R", + "Project", ) ) for record in job_records: @@ -82,6 +86,7 @@ def write_records_to_file(job_records, output_file): str(record.t_inactive), str(record.nnodes), str(record.resources), + str(record.project), ) ) @@ -93,7 +98,7 @@ def convert_to_str(job_records): """ job_record_str = [] job_record_str.append( - "{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10}".format( + "{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format( "UserID", "Username", "JobID", @@ -101,11 +106,12 @@ def convert_to_str(job_records): "T_Run", "T_Inactive", "Nodes", + "Project", ) ) for record in job_records: job_record_str.append( - "{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10}".format( + "{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format( record.userid, record.username, record.jobid, @@ -113,6 +119,7 @@ def convert_to_str(job_records): record.t_run, record.t_inactive, record.nnodes, + record.project, ) ) @@ -143,6 +150,7 @@ def convert_to_obj(rows): t_inactive=row[4], nnodes=job_nnodes, resources=row[6], + project=row[8] if row[8] is not None else "", ) job_records.append(job_record) @@ -212,14 +220,16 @@ def get_jobs(conn, **kwargs): jobs are found, an empty list is returned. """ # find out which args were passed and place them in a dict - valid_params = {"user", "after_start_time", "before_end_time", "jobid"} + valid_params = {"user", "after_start_time", "before_end_time", "jobid", "project"} params = { key: val for key, val in kwargs.items() if val is not None and key in valid_params } - select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs" + select_stmt = ( + "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project FROM jobs" + ) where_clauses = [] params_list = [] @@ -236,6 +246,9 @@ def get_jobs(conn, **kwargs): if "jobid" in params: where_clauses.append("id = ?") params_list.append(params["jobid"]) + if "project" in params: + where_clauses.append("project = ?") + params_list.append(params["project"]) if where_clauses: select_stmt += " WHERE " + " AND ".join(where_clauses) diff --git a/src/cmd/flux-account-fetch-job-records.py b/src/cmd/flux-account-fetch-job-records.py index 748d97a9f..f03412150 100755 --- a/src/cmd/flux-account-fetch-job-records.py +++ b/src/cmd/flux-account-fetch-job-records.py @@ -14,6 +14,7 @@ import sys import argparse import sqlite3 +import json import flux import flux.job @@ -89,6 +90,17 @@ def fetch_new_jobs(last_timestamp=0.0): single_record["R"] = data["R"] if data["jobspec"] is not None: single_record["jobspec"] = data["jobspec"] + try: + jobspec = json.loads(single_record["jobspec"]) + # using .get() here ensures no KeyError is raised if + # "attributes" or "project" are missing; will set + # single_record["project"] to None if it can't be found + accounting_attributes = jobspec.get("attributes", {}).get("system", {}) + single_record["project"] = accounting_attributes.get("project") + except json.JSONDecodeError as exc: + # the job does not have a project in jobspec, so don't add it + # to the job dictionary + continue required_keys = [ "userid", @@ -123,7 +135,8 @@ def insert_jobs_in_db(conn, job_records): cur.execute( """ INSERT OR IGNORE INTO jobs - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + (id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( single_job["id"], @@ -134,6 +147,9 @@ def insert_jobs_in_db(conn, job_records): single_job["ranks"], single_job["R"], single_job["jobspec"], + single_job["project"] + if single_job.get("project") is not None + else "", ), ) except KeyError: diff --git a/src/cmd/flux-account-service.py b/src/cmd/flux-account-service.py index 93be3f91f..e448354ee 100755 --- a/src/cmd/flux-account-service.py +++ b/src/cmd/flux-account-service.py @@ -357,6 +357,7 @@ def view_job_records(self, handle, watcher, msg, arg): user=msg.payload["user"], before_end_time=msg.payload["before_end_time"], after_start_time=msg.payload["after_start_time"], + project=msg.payload["project"], ) payload = {"view_job_records": val} diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py index a421b485f..63a5ef9ac 100755 --- a/src/cmd/flux-account.py +++ b/src/cmd/flux-account.py @@ -230,6 +230,11 @@ def add_view_job_records_arg(subparsers): help="end time", metavar="END TIME", ) + subparser_view_job_records.add_argument( + "--project", + help="project", + metavar="PROJECT", + ) def add_create_db_arg(subparsers): @@ -696,6 +701,7 @@ def select_accounting_function(args, output_file, parser): "user": args.user, "before_end_time": args.before_end_time, "after_start_time": args.after_start_time, + "project": args.project, } return_val = flux.Flux().rpc("accounting.view_job_records", data).get() # the return value of view-job-records without From f0affa0838785549c670b1b51506cf1d711c6db1 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 16 Sep 2024 11:54:02 -0700 Subject: [PATCH 2/4] t: add flux-accounting DB v0.37.0 to testsuite Problem: A new column is added to the "jobs" table, but the test file that tests updating previous versions of flux-accounting DBs does not contain the schema version of the DB before this addition. Add the flux-accounting DB from v0.37.0 to the set of DBs tested in an update in t1017-update-db.t. --- t/expected/test_dbs/FluxAccountingv0-37-0.db | Bin 0 -> 61440 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 t/expected/test_dbs/FluxAccountingv0-37-0.db diff --git a/t/expected/test_dbs/FluxAccountingv0-37-0.db b/t/expected/test_dbs/FluxAccountingv0-37-0.db new file mode 100644 index 0000000000000000000000000000000000000000..14207b921d4fc996bb77e1b14145f1e2a7cf3970 GIT binary patch literal 61440 zcmeI*&2QUe90zc_N$R9+<_)Qf08MxrQkt&Xn)PK&)5Jo_q-t4PN_&{lWVxx^c}twm zcF=a37)2Tq64xF13%DWez?m!D5SM8eZioY1n2;#X^Vm=FIJVR7g-Q!wtEu0A&+~kq z$44ag^=EO1VjBb?009U<00Izz00bZa0SG_<0{@M`SSn8f%j!=O`N9MN2tWV= z5P$##AOHafKmY;|fWT`aa7I3s&E`&|R5h1XWI3HzRnK(2wre`l-anPf`%nFbWR&}R zdwY|UBV<)p|B%QRCI~ zbUYXxWh?!I@@DF1c_sCYR3f{?N%>k_!!iUQ009U<00M_D@cCOQrEv13{Kb9t z$v)lB_~|aC7gnp4wW_vO`Jh(Sy4$sh5v^xpJFQK&)v`Tv+jO**)x~R-)$7{T>UFKM zzP7x$MB-hmF0D<)p3WnB=3S33^ZC;9nzpoFt3}T=>Ynwf=^sZ5o+y_~+K1IEmG#=1 zRwkR1&)hK_)An>)bB&kD&nyt-57+R?GH3bSbi7DAZMv-7!5@CpGZ{T21D+VQo{u zbJL|xlX@4Do7ik^>Ru3toh3%=v^5+rIhxq1>>S>+bcIKwy(>(~(b}%**oNOoc7kWw z_jr#B5yZv*(Sm{QpAc8K`MJ57*>1e#Xl(R#57e}ms@tZyWwaZ?n5I!*Dw;UpSry}2 zn$QdM_~j*SVR`9FZE;~uTdl6tDht(a=S8nC@hV}(@k@oS@J^?fi!dd_ZW)%t8o-Z^ zk2c45(|K~Vrg2wy+O}=k+d8SItHn`-Vs)d@$o(khq7<|^{~GX1i!JUv4cwi0<>{7w z#hs5Y&Teh;o0}F}+`01X`Puho=4btK4_aLJv}n0^+Ge|3D5r`gMgUKJDw0SG_<0y~aPj}lJO)HdGE@p)C2xQ(b2+08z zj--n(5X7>3^P8VLh(nUryJ?Jml@c|y!g<{XlRIrcE#kw^Znvb6NZ^M zR`+?BA&3+viBCCC?i~KSfEYy>COkVf_VEGcI+q(RES^eoWvREmnO+uFMs}}dmBOi0 z@tJzIhJ>ip3A2gGMY|*&|+dr5ig4g z*45oCV_ItIvxcG+E?trz+-Bp+({CD$ExloFvG!Z^u@PgVVJ2?l< z@?hQ>cznbZgX3b7w@rI9Qd!}C&m)j4aHMlbD_tEPF6bAMt~xSs{Re^cE%Fx7a`*)p zzg^i~$tZ;j7vzT*nNj>v@mq&OZt=&&sPTkxk*#AUZVZ{-o(hEUJW3Hm_Q6gji> EFUgC%761SM literal 0 HcmV?d00001 From 105329b6943ec0ed6ec68d4366b7e1438bea5a69 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 16 Sep 2024 11:35:33 -0700 Subject: [PATCH 3/4] t: update helper script, expected output Problem: There is a helper script that inserts test job records into the "jobs" table that needs its INSERT statement to be updated to account for the addition of the "project" column. There is also an expected output file that needs to be updated to also account for the new "Project" column in the output of the view-job-records command. Update both of these in accordance with the addition of the "project" column to the "jobs" table in the flux-accounting DB. --- t/expected/job_usage/no_jobs.expected | 2 +- t/scripts/insert_jobs.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/t/expected/job_usage/no_jobs.expected b/t/expected/job_usage/no_jobs.expected index 06096cf80..a78b142da 100644 --- a/t/expected/job_usage/no_jobs.expected +++ b/t/expected/job_usage/no_jobs.expected @@ -1 +1 @@ -UserID Username JobID T_Submit T_Run T_Inactive Nodes +UserID Username JobID T_Submit T_Run T_Inactive Nodes Project diff --git a/t/scripts/insert_jobs.py b/t/scripts/insert_jobs.py index 8457ec842..92e266299 100644 --- a/t/scripts/insert_jobs.py +++ b/t/scripts/insert_jobs.py @@ -34,7 +34,11 @@ def main(): t_inactive_two_weeks = time.time() - (604861 * 2) # more than 2 weeks old t_inactive_old = time.time() - (604861 * 27) # more than six months old ranks = r = jobspec = "" - insert_stmt = "INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?)" + insert_stmt = """ + INSERT INTO jobs + (id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """ cur.execute( insert_stmt, From 989ac6a85f58f73d8af53957428e1dece0e7e5f1 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 16 Sep 2024 11:38:07 -0700 Subject: [PATCH 4/4] t: add tests for filtering jobs by project Problem: There are no tests that ensure filtering jobs by project works. Add some basic tests. --- t/Makefile.am | 1 + t/t1041-view-jobs-by-project.t | 95 ++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100755 t/t1041-view-jobs-by-project.t diff --git a/t/Makefile.am b/t/Makefile.am index 58b6baf28..392e0f7ce 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -40,6 +40,7 @@ TESTSCRIPTS = \ t1038-hierarchy-small-tie-all-db.t \ t1039-issue476.t \ t1040-mf-priority-projects.t \ + t1041-view-jobs-by-project.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1041-view-jobs-by-project.t b/t/t1041-view-jobs-by-project.t new file mode 100755 index 000000000..7e9a201e4 --- /dev/null +++ b/t/t1041-view-jobs-by-project.t @@ -0,0 +1,95 @@ +#!/bin/bash + +test_description='test viewing and filtering job records by project' + +. `dirname $0`/sharness.sh +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job + +flux setattr log-stderr-level 1 + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root account1 1 +' + +test_expect_success 'add projects to the DB' ' + flux account add-project projectA && + flux account add-project projectB +' + +test_expect_success 'add a user with a list of projects to the DB' ' + flux account add-user \ + --username=user1 \ + --userid=5001 \ + --bank=account1 \ + --projects="projectA,projectB" +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'submit 2 jobs under projectA' ' + job1=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectA hostname) && + flux job wait-event -f json $job1 priority && + job2=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectA hostname) && + flux job wait-event -f json $job2 priority && + flux cancel $job1 && + flux cancel $job2 +' + +test_expect_success 'submit 2 jobs under projectB' ' + job1=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectB hostname) && + flux job wait-event -f json $job1 priority && + job2=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectB hostname) && + flux job wait-event -f json $job2 priority && + flux cancel $job1 && + flux cancel $job2 +' + +test_expect_success 'run fetch-job-records script' ' + flux account-fetch-job-records -p ${DB_PATH} +' + +test_expect_success 'look at all jobs (will show 4 records)' ' + flux account view-job-records > all_jobs.out && + test $(grep -c "project" all_jobs.out) -eq 4 +' + +test_expect_success 'filter jobs by projectA (will show 2 records)' ' + flux account view-job-records --project=projectA > projectA_jobs.out && + test $(grep -c "projectA" projectA_jobs.out) -eq 2 +' + +test_expect_success 'filter jobs by projectB (will show 2 records)' ' + flux account view-job-records --project=projectB > projectB_jobs.out && + test $(grep -c "projectB" projectB_jobs.out) -eq 2 +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done