From 2ab4a29e84b2a593c023c80f37b37a15963dfb19 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 23 May 2023 14:51:00 -0700 Subject: [PATCH 01/12] create_db: add "jobs" table Add a new table to the flux-accounting DB: jobs, which will store jobs fetched by flux-accounting periodically to be used for job-usage and fair-share calculation as well as for viewing by users/admins. --- .../python/fluxacct/accounting/create_db.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py index 0c8bfea6..a54ba53d 100755 --- a/src/bindings/python/fluxacct/accounting/create_db.py +++ b/src/bindings/python/fluxacct/accounting/create_db.py @@ -199,4 +199,22 @@ def create_db( conn.execute("INSERT INTO project_table (project) VALUES ('*')") conn.commit() + # Jobs Table + # stores job records for associations + logging.info("Creating jobs table in DB...") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS jobs ( + id char(16) PRIMARY KEY NOT NULL, + userid integer NOT NULL, + t_submit real NOT NULL, + t_run real NOT NULL, + t_inactive real NOT NULL, + ranks text NOT NULL, + R text NOT NULL, + jobspec text NOT NULL + );""" + ) + logging.info("Created jobs table successfully") + conn.close() From f073dd2f8d09e5a0314deb50996a0826dac10bab Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:17:42 -0700 Subject: [PATCH 02/12] flux-account: add new fetch-job-records.py script Add a new Python script to the flux-accounting command suite that fetches jobs from Flux using the job-list and job-info interfaces and inserts them into a table in the flux-accounting DB. --- src/Makefile.am | 3 +- src/cmd/flux-account-fetch-job-records.py | 202 ++++++++++++++++++++++ 2 files changed, 204 insertions(+), 1 deletion(-) create mode 100755 src/cmd/flux-account-fetch-job-records.py diff --git a/src/Makefile.am b/src/Makefile.am index 79359668..5fc17dd1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -142,4 +142,5 @@ dist_fluxcmd_SCRIPTS = \ cmd/flux-account-pop-db.py \ cmd/flux-account-export-db.py \ cmd/flux-account-update-db.py \ - cmd/flux-account-service.py + cmd/flux-account-service.py \ + cmd/flux-account-fetch-job-records.py diff --git a/src/cmd/flux-account-fetch-job-records.py b/src/cmd/flux-account-fetch-job-records.py new file mode 100755 index 00000000..2c47d27c --- /dev/null +++ b/src/cmd/flux-account-fetch-job-records.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 + +############################################################### +# Copyright 2023 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. 
+# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### + +import os +import sys +import argparse +import sqlite3 + +import flux +import flux.job +import fluxacct.accounting + + +def set_db_loc(args): + path = args.path if args.path else fluxacct.accounting.db_path + + return path + + +# try to open database file; will exit with 1 if database file not found +def est_sqlite_conn(path): + if not os.path.isfile(path): + print(f"Database file does not exist: {path}", file=sys.stderr) + sys.exit(1) + + db_uri = "file:" + path + "?mode=rw" + try: + conn = sqlite3.connect(db_uri, uri=True) + # set foreign keys constraint + conn.execute("PRAGMA foreign_keys = 1") + except sqlite3.OperationalError as exc: + print(f"Unable to open database file: {db_uri}", file=sys.stderr) + print(f"Exception: {exc}") + sys.exit(1) + + return conn + + +def get_jobs(rpc_handle): + try: + jobs = rpc_handle.get_jobs() + return jobs + except EnvironmentError as exc: + print("{}: {}".format("rpc", exc.strerror), file=sys.stderr) + sys.exit(1) + + +# fetch new jobs using Flux's job-list and job-info interfaces; +# create job records for each newly seen job +def fetch_new_jobs(last_timestamp=0.0): + handle = flux.Flux() + + # attributes needed using job-list + custom_attrs = ["userid", "t_submit", "t_run", "t_inactive", "ranks"] + + # construct and send RPC + rpc_handle = flux.job.job_list_inactive( + handle, attrs=custom_attrs, since=last_timestamp + ) + jobs = get_jobs(rpc_handle) + + # job_records is a list of dictionaries where each dictionary contains + # information about a single job record + job_records = [] + for single_job in jobs: + single_record = {} + # get attributes from job-list + for attr in single_job: + single_record[attr] = single_job[attr] + + # attributes needed using job-info + data = flux.job.job_kvs_lookup( + handle, single_job["id"], keys=["R", "jobspec"], decode=False + ) + + if data is None: + # this job never ran; don't add it to a user's list of job records + continue + if data["R"] is not None: + single_record["R"] = data["R"] + if data["jobspec"] is not None: + single_record["jobspec"] = data["jobspec"] + + # append job to job_records list + job_records.append(single_record) + + return job_records + + +# insert newly seen jobs into the "jobs" table in the flux-accounting DB +def insert_jobs_in_db(conn, job_records): + cur = conn.cursor() + + for single_job in job_records: + cur.execute( + """ + INSERT OR IGNORE INTO jobs + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + single_job["id"], + single_job["userid"], + single_job["t_submit"], + single_job["t_run"], + single_job["t_inactive"], + single_job["ranks"], + single_job["R"], + single_job["jobspec"], + ), + ) + + conn.commit() + + +# connect to flux-core's job-archive DB, fetch all records from its jobs table, +# and populate them into the jobs table of the flux-accounting DB +def copy_db_contents(old_cur, cur, conn): + select_stmt = """ + SELECT id,userid,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs + """ + insert_stmt = """ + INSERT OR IGNORE INTO jobs + (id,userid,t_submit,t_run,t_inactive,ranks,R,jobspec) + VALUES (?,?,?,?,?,?,?,?) 
+ """ + + old_cur.execute(select_stmt) + result = old_cur.fetchall() + + if result: + for row in result: + if row[6] == "": + # this job never ran; skip it + continue + cur.execute( + insert_stmt, + ( + row[0], + row[1], + row[2], + row[3], + row[4], + row[5], + row[6], + row[7], + ), + ) + + conn.commit() + + +def main(): + parser = argparse.ArgumentParser( + description=""" + Description: Fetch new job records using Flux's job-list and job-info + interfaces and insert them into a table in the flux-accounting DB. + """ + ) + + parser.add_argument( + "-p", "--path", dest="path", help="specify location of database file" + ) + parser.add_argument( + "-c", "--copy", dest="copy", help="copy contents from a job-archive DB" + ) + args = parser.parse_args() + + path = set_db_loc(args) + conn = est_sqlite_conn(path) + cur = conn.cursor() + + if args.copy: + # copy the contents from one job-archive DB to this one + old_archive_conn = est_sqlite_conn(args.copy) + old_cur = old_archive_conn.cursor() + copy_db_contents(old_cur, cur, conn) + + # get the timestamp of the last seen job + timestamp = 0.0 + cur.execute("SELECT MAX(t_inactive) FROM jobs") + timestamp_arr = cur.fetchall() + + if timestamp_arr[0][0]: + timestamp = timestamp_arr[0][0] + + job_records = [] + job_records = fetch_new_jobs(timestamp) + + insert_jobs_in_db(conn, job_records) + + +if __name__ == "__main__": + main() From d18bb19c81f623457376011e1521721eb1d1f662 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:20:00 -0700 Subject: [PATCH 03/12] update-usage: remove job-archive DB path argument Problem: The update-usage command takes in a positional argument for the path to flux-core's job-archive DB. Now that flux-accounting has implemented its own job-archive within its own database, it no longer needs to look for jobs in the job-archive DB. Remove the positional argument that specifies a path to flux-core's job-archive DB in the update-usage command. --- src/cmd/flux-account.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py index 59a513a0..a6d9f84f 100755 --- a/src/cmd/flux-account.py +++ b/src/cmd/flux-account.py @@ -342,11 +342,6 @@ def add_update_usage_arg(subparsers): formatter_class=flux.util.help_formatter(), ) subparser_update_usage.set_defaults(func="update_usage") - subparser_update_usage.add_argument( - "job_archive_db_path", - help="job-archive DB location", - metavar="JOB-ARCHIVE_DB_PATH", - ) subparser_update_usage.add_argument( "--priority-decay-half-life", default=1, @@ -622,7 +617,6 @@ def select_accounting_function(args, output_file, parser): elif args.func == "update_usage": data = { "path": args.path, - "job_archive_db_path": args.job_archive_db_path, "priority_decay_half_life": args.priority_decay_half_life, } return_val = flux.Flux().rpc("accounting.update_usage", data).get() From 5e331281163fe18fed158e78e3e1064411c75f21 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:22:45 -0700 Subject: [PATCH 04/12] flux-account-service: rm job-archive DB path arg Problem: The view-job-records and update-usage commands both take arguments that specify where flux-core's job-archive DB is located in order to look for job-records. Now that flux-accounting has implemented its own job-archive, these commands can just look in the flux-accounting DB for these records. Remove the job-archive DB path argument from both of these commands in flux-account-service.py. 
--- src/cmd/flux-account-service.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/cmd/flux-account-service.py b/src/cmd/flux-account-service.py index 3240359e..da40ce15 100755 --- a/src/cmd/flux-account-service.py +++ b/src/cmd/flux-account-service.py @@ -321,11 +321,8 @@ def edit_bank(self, handle, watcher, msg, arg): # pylint: disable=no-self-use def view_job_records(self, handle, watcher, msg, arg): try: - # connect to job-archive DB - jobs_conn = establish_sqlite_connection(msg.payload["path"]) - val = jobs.output_job_records( - jobs_conn, + self.conn, msg.payload["output_file"], jobid=msg.payload["jobid"], user=msg.payload["user"], @@ -335,7 +332,6 @@ def view_job_records(self, handle, watcher, msg, arg): payload = {"view_job_records": val} - jobs_conn.close() handle.respond(msg, payload) except KeyError as exc: handle.respond_error(msg, 0, f"missing key in payload: {exc}") @@ -346,11 +342,8 @@ def view_job_records(self, handle, watcher, msg, arg): def update_usage(self, handle, watcher, msg, arg): try: - jobs_conn = establish_sqlite_connection(msg.payload["job_archive_db_path"]) - val = jobs.update_job_usage( self.conn, - jobs_conn, msg.payload["priority_decay_half_life"], ) From 9be82878edbd1afdec9c4ea08190d1f1db78f03c Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:26:32 -0700 Subject: [PATCH 05/12] job-archive-interface: rm param for job-archive DB Problem: The calc_usage_factor() and update_job_usage() functions both take an argument for a SQLite connection to flux-core's job-archive DB. Now that flux-accounting has implemented its own job-archive locally in its own DB, these two functions no longer need to connect to the job-archive DB. Remove the SQLite connection parameter from both the calc_usage_factor() and update_job_usage() functions that connect to flux-core's job-archive DB, and instead just use one SQLite connection to the flux-accounting DB. 
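
A minimal sketch of the updated call site, assuming the module is
imported as "jobs" (as in the unit tests):

    import sqlite3
    from fluxacct.accounting import job_archive_interface as jobs

    # one connection to the flux-accounting DB now serves both the
    # association tables and the jobs table
    conn = sqlite3.connect("FluxAccountingUsers.db")
    jobs.update_job_usage(conn, pdhl=1)
    conn.close()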
--- .../accounting/job_archive_interface.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/job_archive_interface.py b/src/bindings/python/fluxacct/accounting/job_archive_interface.py index b1176319..e09e1017 100755 --- a/src/bindings/python/fluxacct/accounting/job_archive_interface.py +++ b/src/bindings/python/fluxacct/accounting/job_archive_interface.py @@ -398,25 +398,25 @@ def get_curr_usg_bin(acct_conn, user, bank): return float(row[0]) -def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank): +def calc_usage_factor(conn, pdhl, user, bank, default_bank): # hl_period represents the number of seconds that represent one usage bin hl_period = pdhl * 604800 - acct_cur = acct_conn.cursor() + cur = conn.cursor() # fetch timestamp of the end of the current half-life period s_end_hl = """ SELECT end_half_life_period FROM t_half_life_period_table WHERE cluster='cluster' """ - acct_cur.execute(s_end_hl) - row = acct_cur.fetchone() + cur.execute(s_end_hl) + row = cur.fetchone() end_hl = row[0] # get jobs that have completed since the last seen completed job - last_j_ts = get_last_job_ts(acct_conn, user, bank) + last_j_ts = get_last_job_ts(conn, user, bank) user_jobs = get_job_records( - jobs_conn, + conn, bank, default_bank, user=user, @@ -436,38 +436,38 @@ def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank): last_t_inactive = user_jobs[-1].t_inactive usg_current = sum(per_job_factors) - update_t_inactive(acct_conn, last_t_inactive, user, bank) + update_t_inactive(conn, last_t_inactive, user, bank) if len(user_jobs) == 0 and (float(end_hl) > (time.time() - hl_period)): # no new jobs in the current half-life period - usg_past = fetch_usg_bins(acct_conn, user, bank) + usg_past = fetch_usg_bins(conn, user, bank) usg_historical = sum(usg_past) elif len(user_jobs) == 0 and (float(end_hl) < (time.time() - hl_period)): # no new jobs in the new half-life period - usg_historical = apply_decay_factor(0.5, acct_conn, user, bank) + usg_historical = apply_decay_factor(0.5, conn, user, bank) - update_hist_usg_col(acct_conn, usg_historical, user, bank) + update_hist_usg_col(conn, usg_historical, user, bank) elif (last_t_inactive - float(end_hl)) < hl_period: # found new jobs in the current half-life period - usg_current += get_curr_usg_bin(acct_conn, user, bank) + usg_current += get_curr_usg_bin(conn, user, bank) # usage_user_past = sum of the older usage factors - usg_past = fetch_usg_bins(acct_conn, user, bank) + usg_past = fetch_usg_bins(conn, user, bank) usg_historical = usg_current + sum(usg_past[1:]) - update_curr_usg_col(acct_conn, usg_current, user, bank) - update_hist_usg_col(acct_conn, usg_historical, user, bank) + update_curr_usg_col(conn, usg_current, user, bank) + update_hist_usg_col(conn, usg_historical, user, bank) else: # found new jobs in the new half-life period # apply decay factor to past usage periods of a user's jobs - usg_past = apply_decay_factor(0.5, acct_conn, user, bank) + usg_past = apply_decay_factor(0.5, conn, user, bank) usg_historical = usg_current + usg_past - update_curr_usg_col(acct_conn, usg_historical, user, bank) - update_hist_usg_col(acct_conn, usg_historical, user, bank) + update_curr_usg_col(conn, usg_historical, user, bank) + update_hist_usg_col(conn, usg_historical, user, bank) return usg_historical @@ -557,7 +557,7 @@ def calc_parent_bank_usage(acct_conn, cur, bank, total_usage=0.0): return total_usage -def update_job_usage(acct_conn, jobs_conn, 
pdhl=1): +def update_job_usage(acct_conn, pdhl=1): s_assoc = "SELECT username, bank, default_bank FROM association_table" cur = acct_conn.cursor() cur.execute(s_assoc) @@ -565,7 +565,7 @@ def update_job_usage(acct_conn, jobs_conn, pdhl=1): # update the job usage for every user in the association_table for row in result: - calc_usage_factor(jobs_conn, acct_conn, pdhl, row[0], row[1], row[2]) + calc_usage_factor(acct_conn, pdhl, row[0], row[1], row[2]) # find the root bank in the flux-accounting database s_root_bank = "SELECT bank FROM bank_table WHERE parent_bank=''" From c128a41342394c4a00ce80ec6ece5cad099205aa Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:29:48 -0700 Subject: [PATCH 06/12] test: add new table to expected table list Add "jobs" as an expected table to the list of expected tables in the unit tests for create_db.py. --- src/bindings/python/fluxacct/accounting/test/test_create_db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bindings/python/fluxacct/accounting/test/test_create_db.py b/src/bindings/python/fluxacct/accounting/test/test_create_db.py index 12bd929e..ed04c104 100755 --- a/src/bindings/python/fluxacct/accounting/test/test_create_db.py +++ b/src/bindings/python/fluxacct/accounting/test/test_create_db.py @@ -55,6 +55,7 @@ def test_01_tables_exist(self): "t_half_life_period_table", "queue_table", "project_table", + "jobs", ] self.assertEqual(list_of_tables, expected) From 9e37a05587258e383a0dc15c988ae58cf4cb6f85 Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:31:41 -0700 Subject: [PATCH 07/12] test: remove SQLite conn to job-archive DB Problem: The unit tests for test_job_archive_interface.py create and use a test job-archive DB. Now that flux-accounting has implemented its own job-archive, the test should just use one connection to the flux-accounting DB's "jobs" table. Remove the creation of the test job-archive DB. Remove the SQLite connection to the test job-archive DB. Restructure the INSERT SQLite statement that inserts fake job records into the jobs table to match the schema of the new flux-accounting "jobs" table. 
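
For reference, the restructured INSERT statement follows the column
order of the new "jobs" table defined in create_db.py:

    INSERT INTO jobs (id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)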
--- .../test/test_job_archive_interface.py | 106 ++++++------------ 1 file changed, 36 insertions(+), 70 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py index 0251a6f4..5d443ae7 100755 --- a/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py +++ b/src/bindings/python/fluxacct/accounting/test/test_job_archive_interface.py @@ -24,10 +24,9 @@ class TestAccountingCLI(unittest.TestCase): - # create accounting, job-archive databases + # create accounting database @classmethod def setUpClass(self): - global jobs_conn global acct_conn global cur @@ -35,24 +34,6 @@ def setUpClass(self): global op op = "job_records.csv" - jobs_conn = sqlite3.connect("file:jobs.db?mode:rwc", uri=True) - jobs_conn.execute( - """ - CREATE TABLE IF NOT EXISTS jobs ( - id char(16) NOT NULL, - userid int NOT NULL, - ranks text NOT NULL, - t_submit real NOT NULL, - t_run real NOT NULL, - t_cleanup real NOT NULL, - t_inactive real NOT NULL, - eventlog text NOT NULL, - jobspec text NOT NULL, - R text NOT NULL, - PRIMARY KEY (id) - );""" - ) - c.create_db("FluxAccountingUsers.db") try: acct_conn = sqlite3.connect("file:FluxAccountingUsers.db?mode=rw", uri=True) @@ -85,7 +66,7 @@ def setUpClass(self): interval = 0 # add to job timestamps to diversify job-archive records @mock.patch("time.time", mock.MagicMock(return_value=9000000)) - def populate_job_archive_db(jobs_conn, userid, bank, ranks, nodes, num_entries): + def populate_job_archive_db(acct_conn, userid, bank, ranks, nodes, num_entries): nonlocal jobid nonlocal interval t_inactive_delta = 2000 @@ -115,37 +96,33 @@ def populate_job_archive_db(jobs_conn, userid, bank, ranks, nodes, num_entries): for i in range(num_entries): try: - jobs_conn.execute( + acct_conn.execute( """ INSERT INTO jobs ( id, userid, - ranks, t_submit, t_run, - t_cleanup, t_inactive, - eventlog, - jobspec, - R + ranks, + R, + jobspec ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( jobid, userid, - ranks, (time.time() + interval) - 2000, (time.time() + interval), - (time.time() + interval) + 1000, (time.time() + interval) + t_inactive_delta, - "eventlog", - '{ "attributes": { "system": { "bank": "' + bank + '"} } }', + ranks, R_input, + '{ "attributes": { "system": { "bank": "' + bank + '"} } }', ), ) # commit changes - jobs_conn.commit() + acct_conn.commit() # make sure entry is unique except sqlite3.IntegrityError as integrity_error: print(integrity_error) @@ -155,35 +132,35 @@ def populate_job_archive_db(jobs_conn, userid, bank, ranks, nodes, num_entries): t_inactive_delta += 100 # populate the job-archive DB with fake job entries - populate_job_archive_db(jobs_conn, 1001, "C", "0", "fluke[0]", 2) + populate_job_archive_db(acct_conn, 1001, "C", "0", "fluke[0]", 2) - populate_job_archive_db(jobs_conn, 1002, "C", "0-1", "fluke[0-1]", 3) - populate_job_archive_db(jobs_conn, 1002, "C", "0", "fluke[0]", 2) + populate_job_archive_db(acct_conn, 1002, "C", "0-1", "fluke[0-1]", 3) + populate_job_archive_db(acct_conn, 1002, "C", "0", "fluke[0]", 2) - populate_job_archive_db(jobs_conn, 1003, "D", "0-2", "fluke[0-2]", 3) + populate_job_archive_db(acct_conn, 1003, "D", "0-2", "fluke[0-2]", 3) - populate_job_archive_db(jobs_conn, 1004, "D", "0-3", "fluke[0-3]", 4) - populate_job_archive_db(jobs_conn, 1004, "D", "0", "fluke[0]", 4) + populate_job_archive_db(acct_conn, 1004, "D", "0-3", "fluke[0-3]", 4) + populate_job_archive_db(acct_conn, 1004, "D", "0", "fluke[0]", 4) # passing a valid jobid should return # its job information def test_01_with_jobid_valid(self): my_dict = {"jobid": 102} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) print(job_records) self.assertEqual(len(job_records), 2) # passing a bad jobid should return no records def test_02_with_jobid_failure(self): my_dict = {"jobid": 000} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 1) # passing a timestamp before the first job to # start should return all of the jobs def test_03_after_start_time_all(self): my_dict = {"after_start_time": 0} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 19) # passing a timestamp after all of the start time @@ -191,7 +168,7 @@ def test_03_after_start_time_all(self): @mock.patch("time.time", mock.MagicMock(return_value=11000000)) def test_04_after_start_time_none(self): my_dict = {"after_start_time": time.time()} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 1) # passing a timestamp before the end time of the @@ -199,21 +176,21 @@ def test_04_after_start_time_none(self): @mock.patch("time.time", mock.MagicMock(return_value=11000000)) def test_05_before_end_time_all(self): my_dict = {"before_end_time": time.time()} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 19) # passing a timestamp before the end time of # the first completed jobs should return no jobs def test_06_before_end_time_none(self): my_dict = {"before_end_time": 0} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = 
jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 1) # passing a user not in the jobs table # should return no jobs def test_07_by_user_failure(self): my_dict = {"user": "9999"} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 1) # view_jobs_run_by_username() interacts with a @@ -221,7 +198,7 @@ def test_07_by_user_failure(self): # just pass the userid def test_08_by_user_success(self): my_dict = {"user": "1001"} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 3) # passing a combination of params should further @@ -229,14 +206,14 @@ def test_08_by_user_success(self): @mock.patch("time.time", mock.MagicMock(return_value=9000500)) def test_09_multiple_params(self): my_dict = {"user": "1001", "after_start_time": time.time()} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 2) # passing no parameters will result in a generic query # returning all results def test_10_no_options_passed(self): my_dict = {} - job_records = jobs.output_job_records(jobs_conn, op, **my_dict) + job_records = jobs.output_job_records(acct_conn, op, **my_dict) self.assertEqual(len(job_records), 19) # users that have run a lot of jobs should have a larger usage factor @@ -255,7 +232,6 @@ def test_11_calc_usage_factor_many_jobs(self): acct_conn.commit() usage_factor = jobs.calc_usage_factor( - jobs_conn, acct_conn, pdhl=1, user=user, @@ -281,7 +257,6 @@ def test_12_calc_usage_factor_few_jobs(self): acct_conn.commit() usage_factor = jobs.calc_usage_factor( - jobs_conn, acct_conn, pdhl=1, user=user, @@ -299,7 +274,6 @@ def test_13_update_t_inactive_success(self): self.assertEqual(ts_old, 0.0) usage_factor = jobs.calc_usage_factor( - jobs_conn, acct_conn, pdhl=1, user="1003", @@ -335,44 +309,39 @@ def test_15_append_jobs_in_diff_half_life_period(self): bank = "C" try: - jobs_conn.execute( + acct_conn.execute( """ INSERT INTO jobs ( id, userid, - ranks, t_submit, t_run, - t_cleanup, t_inactive, - eventlog, - jobspec, - R + ranks, + R, + jobspec ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( "200", "1001", - "0", time.time() + 100, time.time() + 300, - time.time() + 400, time.time() + 500, - "eventlog", - '{ "attributes": { "system": { "bank": "C"} } }', + "0", '{"version":1,"execution": {"R_lite":[{"rank":"0","children": {"core": "0"}}]}}', + '{ "attributes": { "system": { "bank": "C"} } }', ), ) # commit changes - jobs_conn.commit() + acct_conn.commit() # make sure entry is unique except sqlite3.IntegrityError as integrity_error: print(integrity_error) # re-calculate usage factor for user1001 usage_factor = jobs.calc_usage_factor( - jobs_conn, acct_conn, pdhl=1, user=user, @@ -389,7 +358,6 @@ def test_16_recalculate_usage_after_half_life_period(self): bank = "C" usage_factor = jobs.calc_usage_factor( - jobs_conn, acct_conn, pdhl=1, user=user, @@ -412,7 +380,7 @@ def test_17_update_job_usage_same_half_life_period(self): job_usage = cur.fetchone()[0] self.assertEqual(job_usage, 17044.0) - jobs.update_job_usage(acct_conn, jobs_conn, pdhl=1) + jobs.update_job_usage(acct_conn, pdhl=1) cur.execute(s_stmt) job_usage = cur.fetchone()[0] @@ -467,7 +435,7 @@ def test_20_update_job_usage_next_half_life_period(self): self.assertEqual(job_usage, 17044.0) - jobs.update_job_usage(acct_conn, jobs_conn, pdhl=1) + jobs.update_job_usage(acct_conn, pdhl=1) cur.execute(s_stmt) job_usage = cur.fetchone()[0] @@ -476,8 +444,6 @@ def test_20_update_job_usage_next_half_life_period(self): # remove database and log file @classmethod def tearDownClass(self): - jobs_conn.close() - os.remove("jobs.db") os.remove("job_records.csv") os.remove("FluxAccountingUsers.db") From 11cc4b6a77720316e484c3cbe4a524443dd9917f Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:34:56 -0700 Subject: [PATCH 08/12] t: remove t1010-job-usage.t Problem: t1010-job-usage.t creates a fake job-archive DB to test the update-usage command for flux-accounting. Now that flux-accounting has implemented its own job-archive, this sharness test file is not necessarily needed, especially because the next sharness test file, t1011-job-archive.db, actually tests fetching job records and updating usage using flux-accounting's job-archive. Remove t1010-job-usage.t from the test suite. 
--- t/Makefile.am | 4 -- t/expected/job_usage/post_update1.expected | 12 ---- t/expected/job_usage/post_update2.expected | 12 ---- t/expected/job_usage/pre_update.expected | 12 ---- t/t1010-update-usage.t | 82 ---------------------- 5 files changed, 122 deletions(-) delete mode 100644 t/expected/job_usage/post_update1.expected delete mode 100644 t/expected/job_usage/post_update2.expected delete mode 100644 t/expected/job_usage/pre_update.expected delete mode 100755 t/t1010-update-usage.t diff --git a/t/Makefile.am b/t/Makefile.am index 13ec66be..dd6e7aad 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -11,7 +11,6 @@ TESTSCRIPTS = \ t1007-flux-account-users.t \ t1008-mf-priority-update.t \ t1009-pop-db.t \ - t1010-update-usage.t \ t1011-job-archive-interface.t \ t1012-mf-priority-load.t \ t1013-mf-priority-queues.t \ @@ -98,9 +97,6 @@ EXTRA_DIST= \ expected/flux_account/F_bank_users.expected \ expected/pop_db/db_hierarchy_base.expected \ expected/pop_db/db_hierarchy_new_users.expected \ - expected/job_usage/pre_update.expected \ - expected/job_usage/post_update1.expected \ - expected/job_usage/post_update2.expected \ expected/plugin_state/internal_state_1.expected \ expected/plugin_state/internal_state_3.expected \ expected/plugin_state/internal_state_3.expected \ diff --git a/t/expected/job_usage/post_update1.expected b/t/expected/job_usage/post_update1.expected deleted file mode 100644 index e611c1d6..00000000 --- a/t/expected/job_usage/post_update1.expected +++ /dev/null @@ -1,12 +0,0 @@ -Account Username RawShares RawUsage Fairshare -root 1 24000 - account1 1 16000 - account1 5011 1 4000 0.428571 - account1 5012 1 6000 0.285714 - account1 5013 1 6000 0.285714 - account2 1 8000 - account2 5021 1 8000 0.571429 - account2 5022 1 0 0.714286 - account3 1 0 - account3 5031 1 0 1 - account3 5032 1 0 1 diff --git a/t/expected/job_usage/post_update2.expected b/t/expected/job_usage/post_update2.expected deleted file mode 100644 index e611c1d6..00000000 --- a/t/expected/job_usage/post_update2.expected +++ /dev/null @@ -1,12 +0,0 @@ -Account Username RawShares RawUsage Fairshare -root 1 24000 - account1 1 16000 - account1 5011 1 4000 0.428571 - account1 5012 1 6000 0.285714 - account1 5013 1 6000 0.285714 - account2 1 8000 - account2 5021 1 8000 0.571429 - account2 5022 1 0 0.714286 - account3 1 0 - account3 5031 1 0 1 - account3 5032 1 0 1 diff --git a/t/expected/job_usage/pre_update.expected b/t/expected/job_usage/pre_update.expected deleted file mode 100644 index 1f711270..00000000 --- a/t/expected/job_usage/pre_update.expected +++ /dev/null @@ -1,12 +0,0 @@ -Account Username RawShares RawUsage Fairshare -root 1 0 - account1 1 0 - account1 5011 1 0 0.5 - account1 5012 1 0 0.5 - account1 5013 1 0 0.5 - account2 1 0 - account2 5021 1 0 0.5 - account2 5022 1 0 0.5 - account3 1 0 - account3 5031 1 0 0.5 - account3 5032 1 0 0.5 diff --git a/t/t1010-update-usage.t b/t/t1010-update-usage.t deleted file mode 100755 index 77e9d883..00000000 --- a/t/t1010-update-usage.t +++ /dev/null @@ -1,82 +0,0 @@ -#!/bin/bash - -test_description='Test flux account update-usage command with user and job data' - -. 
`dirname $0`/sharness.sh -DB_PATH=$(pwd)/FluxAccountingTest.db -CREATE_TEST_DB=${SHARNESS_TEST_SRCDIR}/scripts/create_test_db.py -UPDATE_USAGE_COL=${SHARNESS_TEST_SRCDIR}/scripts/update_usage_column.py -CREATE_JOB_ARCHIVE=${SHARNESS_TEST_SRCDIR}/scripts/create_job_archive_db.py - -export TEST_UNDER_FLUX_NO_JOB_EXEC=y -export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" -test_under_flux 1 job - -flux setattr log-stderr-level 1 - -test_expect_success 'create flux-accounting DB' ' - flux account -p $(pwd)/FluxAccountingTest.db create-db -' - -test_expect_success 'start flux-accounting service' ' - flux account-service -p ${DB_PATH} -t -' - -test_expect_success 'add some banks to the DB' ' - flux account add-bank root 1 && - flux account add-bank --parent-bank=root account1 1 && - flux account add-bank --parent-bank=root account2 1 && - flux account add-bank --parent-bank=root account3 1 -' - -test_expect_success 'add some users to the DB' ' - flux account add-user --username=5011 --userid=5011 --bank=account1 --shares=1 && - flux account add-user --username=5012 --userid=5012 --bank=account1 --shares=1 && - flux account add-user --username=5013 --userid=5013 --bank=account1 --shares=1 && - flux account add-user --username=5021 --userid=5021 --bank=account2 --shares=1 && - flux account add-user --username=5022 --userid=5022 --bank=account2 --shares=1 && - flux account add-user --username=5031 --userid=5031 --bank=account3 --shares=1 && - flux account add-user --username=5032 --userid=5032 --bank=account3 --shares=1 -' - -test_expect_success 'create sample job-archive DB' ' - flux python ${CREATE_JOB_ARCHIVE} -' - -test_expect_success 'update-usage raises a usage error when passing a bad type for priority-decay-half-life' ' - test_must_fail flux account update-usage job-archive.sqlite \ - --priority-decay-half-life foo > bad_arg.out 2>&1 && - test_debug "cat bad_arg.out" && - grep "flux-account.py update-usage: error: argument --priority-decay-half-life: invalid int value:" bad_arg.out -' - -test_expect_success 'create & compare hierarchy output from FluxAccountingTest.db: pre-usage update' ' - flux account-shares -p $(pwd)/FluxAccountingTest.db > pre_update.test && - test_cmp ${SHARNESS_TEST_SRCDIR}/expected/job_usage/pre_update.expected pre_update.test -' - -test_expect_success 'run update-usage and update-fshare commands' ' - flux account update-usage job-archive.sqlite && - flux account-update-fshare -p ${DB_PATH} -' - -test_expect_success 'create & compare hierarchy output from FluxAccountingTest.db: post-usage update 1' ' - flux account-shares -p $(pwd)/FluxAccountingTest.db > post_update1.test && - test_cmp ${SHARNESS_TEST_SRCDIR}/expected/job_usage/post_update2.expected post_update1.test -' - -test_expect_success 'run update-usage and update-fshare commands with an optional arg' ' - flux account update-usage job-archive.sqlite --priority-decay-half-life 1 && - flux account-update-fshare -p ${DB_PATH} -' - -test_expect_success 'create & compare hierarchy output from FluxAccountingTest.db: post-usage update 2' ' - flux account-shares -p $(pwd)/FluxAccountingTest.db > post_update2.test && - test_cmp ${SHARNESS_TEST_SRCDIR}/expected/job_usage/post_update2.expected post_update2.test -' - -test_expect_success 'shut down flux-accounting service' ' - flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" -' - -test_done From 7f6945c02997861738ac55b77e7d96eee21e91ce Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Tue, 18 Jul 2023 12:45:43 -0700 Subject: 
[PATCH 09/12] t: remove job-archive path in update-usage test Remove the argument to flux-core's job-archive DB when testing the update-usage command in t1026-flux-account-perms.t. --- t/t1026-flux-account-perms.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/t1026-flux-account-perms.t b/t/t1026-flux-account-perms.t index 29b9515f..4654573c 100755 --- a/t/t1026-flux-account-perms.t +++ b/t/t1026-flux-account-perms.t @@ -78,7 +78,7 @@ test_expect_success 'update-usage should not be accessible by all users' ' newid=$(($(id -u)+1)) && ( export FLUX_HANDLE_ROLEMASK=0x2 && export FLUX_HANDLE_USERID=$newid && - test_must_fail flux account update-usage path_to_db.db > no_access_update-usage.out 2>&1 && + test_must_fail flux account update-usage > no_access_update-usage.out 2>&1 && grep "Request requires owner credentials" no_access_update-usage.out ) ' From 96ac0a5c67e004535fa11b6f14119128589fc3fe Mon Sep 17 00:00:00 2001 From: Christopher Moussa Date: Wed, 24 May 2023 09:40:09 -0700 Subject: [PATCH 10/12] t: use fetch-job-records script in test Problem: t1011-job-archive-interface.py doesn't make use of the new flux-accounting Python script that fetches new job records and adds it to the jobs table in the flux-accounting DB. Add tests that call this Python script. --- t/t1011-job-archive-interface.t | 44 ++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/t/t1011-job-archive-interface.t b/t/t1011-job-archive-interface.t index 298fdae1..7fa04aa7 100755 --- a/t/t1011-job-archive-interface.t +++ b/t/t1011-job-archive-interface.t @@ -35,6 +35,13 @@ wait_db() { return 0 } +# select job records from flux-accounting DB +select_job_records() { + local dbpath=$1 + query="SELECT * FROM jobs;" + ${QUERYCMD} -t 100 ${dbpath} "${query}" +} + test_expect_success 'create flux-accounting DB' ' flux account -p $(pwd)/FluxAccountingTest.db create-db ' @@ -43,6 +50,10 @@ test_expect_success 'start flux-accounting service' ' flux account-service -p ${DB_PATH} -t ' +test_expect_success 'run fetch-job-records script with no jobs in jobs_table' ' + flux account-fetch-job-records -p ${DB_PATH} +' + test_expect_success 'add some banks to the DB' ' flux account add-bank root 1 && flux account add-bank --parent-bank=root account1 1 && @@ -72,6 +83,23 @@ test_expect_success 'load job-archive module' ' flux module load job-archive ' +test_expect_success 'submit some jobs so they populate flux-core job-archive' ' + jobid1=$(flux submit -N 1 hostname) && + jobid2=$(flux submit -N 1 hostname) && + jobid3=$(flux submit -N 2 hostname) && + jobid4=$(flux submit -N 1 hostname) && + wait_db $jobid1 ${ARCHIVEDB} && + wait_db $jobid2 ${ARCHIVEDB} && + wait_db $jobid3 ${ARCHIVEDB} && + wait_db $jobid4 ${ARCHIVEDB} +' + +test_expect_success 'call --copy argument to populate jobs table from job-archive DB' ' + flux account-fetch-job-records --copy ${ARCHIVEDB} -p ${DB_PATH} && + select_job_records ${DB_PATH} > records.out && + grep "hostname" records.out +' + test_expect_success 'submit some sleep 1 jobs under one user' ' jobid1=$(flux submit -N 1 sleep 1) && jobid2=$(flux submit -N 1 sleep 1) && @@ -81,16 +109,20 @@ test_expect_success 'submit some sleep 1 jobs under one user' ' wait_db $jobid3 ${ARCHIVEDB} ' +test_expect_success 'run fetch-job-records script' ' + flux account-fetch-job-records -p ${DB_PATH} +' + test_expect_success 'view job records for a user' ' - flux account -p ${ARCHIVEDB} view-job-records --user $username + flux account -p ${DB_PATH} 
view-job-records --user $username ' test_expect_success 'view job records for a user and direct it to a file' ' - flux account -p ${ARCHIVEDB} --output-file $(pwd)/test.txt view-job-records --user $username + flux account -p ${DB_PATH} --output-file $(pwd)/test.txt view-job-records --user $username ' test_expect_success 'run update-usage and update-fshare commands' ' - flux account update-usage ${ARCHIVEDB} && + flux account -p ${DB_PATH} update-usage && flux account-update-fshare -p ${DB_PATH} ' @@ -108,8 +140,12 @@ test_expect_success 'submit some sleep 1 jobs under the secondary bank of the sa wait_db $jobid3 ${ARCHIVEDB} ' +test_expect_success 'run custom job-list script' ' + flux account-fetch-job-records -p ${DB_PATH} +' + test_expect_success 'run update-usage and update-fshare commands' ' - flux account update-usage ${ARCHIVEDB} && + flux account -p ${DB_PATH} update-usage && flux account-update-fshare -p ${DB_PATH} ' From 319c6dcad0eb517f164b512f257b9283590c2997 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 8 Apr 2024 11:37:42 -0700 Subject: [PATCH 11/12] t1011: add new tests for canceled jobs Problem: t1011-job-archive-interface does not have any tests that ensure a user's job-usage value does not get affected by a job that never ran. Add a basic set of tests in t1011-job-archive-interface.t that submits a job which gets canceled and ensures that the record does not get added to flux-accounting's jobs table and thus does not affect a user's job-usage value. --- t/Makefile.am | 1 + t/expected/job_usage/no_jobs.expected | 1 + t/t1011-job-archive-interface.t | 25 +++++++++++++++++++++++++ 3 files changed, 27 insertions(+) create mode 100644 t/expected/job_usage/no_jobs.expected diff --git a/t/Makefile.am b/t/Makefile.am index dd6e7aad..26d66e0a 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -97,6 +97,7 @@ EXTRA_DIST= \ expected/flux_account/F_bank_users.expected \ expected/pop_db/db_hierarchy_base.expected \ expected/pop_db/db_hierarchy_new_users.expected \ + expected/job_usage/no_jobs.expected \ expected/plugin_state/internal_state_1.expected \ expected/plugin_state/internal_state_3.expected \ expected/plugin_state/internal_state_3.expected \ diff --git a/t/expected/job_usage/no_jobs.expected b/t/expected/job_usage/no_jobs.expected new file mode 100644 index 00000000..06096cf8 --- /dev/null +++ b/t/expected/job_usage/no_jobs.expected @@ -0,0 +1 @@ +UserID Username JobID T_Submit T_Run T_Inactive Nodes diff --git a/t/t1011-job-archive-interface.t b/t/t1011-job-archive-interface.t index 7fa04aa7..c87d5e4a 100755 --- a/t/t1011-job-archive-interface.t +++ b/t/t1011-job-archive-interface.t @@ -8,6 +8,7 @@ DB_PATH=$(pwd)/FluxAccountingTest.db ARCHIVEDIR=`pwd` ARCHIVEDB="${ARCHIVEDIR}/jobarchive.db" QUERYCMD="flux python ${SHARNESS_TEST_SRCDIR}/scripts/query.py" +NO_JOBS=${SHARNESS_TEST_SRCDIR}/expected/job_usage/no_jobs.expected export FLUX_CONF_DIR=$(pwd) test_under_flux 4 job @@ -83,6 +84,30 @@ test_expect_success 'load job-archive module' ' flux module load job-archive ' +test_expect_success 'submit a job that does not run' ' + job=$(flux submit --urgency=0 sleep 60) && + flux job wait-event -vt 10 $job priority && + flux cancel $job && + wait_db $job ${ARCHIVEDB} +' + +test_expect_success 'run scripts to update job usage and fair-share' ' + flux account-fetch-job-records --copy ${ARCHIVEDB} -p ${DB_PATH} && + flux account -p ${DB_PATH} update-usage && + flux account-update-fshare -p ${DB_PATH} +' + +test_expect_success 'check that usage does not get affected by canceled jobs' 
'
+	flux account view-user --json $username > user.json &&
+	test_debug "jq -S . <user.json" &&
+	flux account -p ${DB_PATH} view-job-records --user $username > no_jobs.test &&
+	test_cmp ${NO_JOBS} no_jobs.test
+'
+
 test_expect_success 'submit some jobs so they populate flux-core job-archive' '
 	jobid1=$(flux submit -N 1 hostname) &&
 	jobid2=$(flux submit -N 1 hostname) &&

From 035a0b5df17b4a3d900b80373383e7ef32950292 Mon Sep 17 00:00:00 2001
From: Christopher Moussa
Date: Wed, 24 May 2023 09:41:27 -0700
Subject: [PATCH 12/12] t: add new tables to expected table list

Problem: There is a test that looks for an expected list of tables in a
flux-accounting DB, but there is no "jobs" table.

Add the new table as an expected table in t1017-update-db.t.
---
 t/t1017-update-db.t | 1 +
 1 file changed, 1 insertion(+)

diff --git a/t/t1017-update-db.t b/t/t1017-update-db.t
index 88d40906..974aeaf7 100755
--- a/t/t1017-update-db.t
+++ b/t/t1017-update-db.t
@@ -59,6 +59,7 @@ test_expect_success 'get all the tables of the old DB and check that new table w
 	job_usage_factor_table
 	t_half_life_period_table
 	project_table
+	jobs
 	organization
 	queue_table
 EOF