Skip to content

Commit

Permalink
Merge pull request #490 from cmoussa1/issue#352
Browse files Browse the repository at this point in the history
`view-job-records`: add `--project` filter option
  • Loading branch information
mergify[bot] authored Sep 30, 2024
2 parents a3a9038 + 989ac6a commit 921b4ef
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/bindings/python/fluxacct/accounting/__init__.py.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
db_dir = "@X_LOCALSTATEDIR@/lib/flux/"
db_path = "@X_LOCALSTATEDIR@/lib/flux/FluxAccounting.db"
db_schema_version = 22
db_schema_version = 23

__all__ = ["db_dir", "db_path", "db_schema_version"]
3 changes: 2 additions & 1 deletion src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def create_db(
t_inactive real NOT NULL,
ranks text NOT NULL,
R text NOT NULL,
jobspec text NOT NULL
jobspec text NOT NULL,
project text
);"""
)
logging.info("Created jobs table successfully")
Expand Down
23 changes: 18 additions & 5 deletions src/bindings/python/fluxacct/accounting/jobs_table_subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class JobRecord:
A record of an individual job.
"""

def __init__(self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources):
def __init__(
self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources, project
):
self.userid = userid
self.username = get_username(userid)
self.jobid = jobid
Expand All @@ -44,6 +46,7 @@ def __init__(self, userid, jobid, t_submit, t_run, t_inactive, nnodes, resources
self.t_inactive = t_inactive
self.nnodes = nnodes
self.resources = resources
self.project = project

@property
def elapsed(self):
Expand All @@ -69,6 +72,7 @@ def write_records_to_file(job_records, output_file):
"T_Inactive",
"Nodes",
"R",
"Project",
)
)
for record in job_records:
Expand All @@ -82,6 +86,7 @@ def write_records_to_file(job_records, output_file):
str(record.t_inactive),
str(record.nnodes),
str(record.resources),
str(record.project),
)
)

Expand All @@ -93,26 +98,28 @@ def convert_to_str(job_records):
"""
job_record_str = []
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
"UserID",
"Username",
"JobID",
"T_Submit",
"T_Run",
"T_Inactive",
"Nodes",
"Project",
)
)
for record in job_records:
job_record_str.append(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10}".format(
"{:<10} {:<10} {:<20} {:<20} {:<20} {:<20} {:<10} {:<20}".format(
record.userid,
record.username,
record.jobid,
record.t_submit,
record.t_run,
record.t_inactive,
record.nnodes,
record.project,
)
)

Expand Down Expand Up @@ -143,6 +150,7 @@ def convert_to_obj(rows):
t_inactive=row[4],
nnodes=job_nnodes,
resources=row[6],
project=row[8] if row[8] is not None else "",
)
job_records.append(job_record)

Expand Down Expand Up @@ -212,14 +220,16 @@ def get_jobs(conn, **kwargs):
jobs are found, an empty list is returned.
"""
# find out which args were passed and place them in a dict
valid_params = {"user", "after_start_time", "before_end_time", "jobid"}
valid_params = {"user", "after_start_time", "before_end_time", "jobid", "project"}
params = {
key: val
for key, val in kwargs.items()
if val is not None and key in valid_params
}

select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs"
select_stmt = (
"SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec,project FROM jobs"
)
where_clauses = []
params_list = []

Expand All @@ -236,6 +246,9 @@ def get_jobs(conn, **kwargs):
if "jobid" in params:
where_clauses.append("id = ?")
params_list.append(params["jobid"])
if "project" in params:
where_clauses.append("project = ?")
params_list.append(params["project"])

if where_clauses:
select_stmt += " WHERE " + " AND ".join(where_clauses)
Expand Down
18 changes: 17 additions & 1 deletion src/cmd/flux-account-fetch-job-records.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sys
import argparse
import sqlite3
import json

import flux
import flux.job
Expand Down Expand Up @@ -89,6 +90,17 @@ def fetch_new_jobs(last_timestamp=0.0):
single_record["R"] = data["R"]
if data["jobspec"] is not None:
single_record["jobspec"] = data["jobspec"]
try:
jobspec = json.loads(single_record["jobspec"])
# using .get() here ensures no KeyError is raised if
# "attributes" or "project" are missing; will set
# single_record["project"] to None if it can't be found
accounting_attributes = jobspec.get("attributes", {}).get("system", {})
single_record["project"] = accounting_attributes.get("project")
except json.JSONDecodeError as exc:
# the jobspec could not be decoded as JSON at all (not merely missing a
# project) — NOTE(review): this `continue` skips the ENTIRE job record,
# not just the project field; confirm that is the intended behavior
continue

required_keys = [
"userid",
Expand Down Expand Up @@ -123,7 +135,8 @@ def insert_jobs_in_db(conn, job_records):
cur.execute(
"""
INSERT OR IGNORE INTO jobs
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec, project)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
single_job["id"],
Expand All @@ -134,6 +147,9 @@ def insert_jobs_in_db(conn, job_records):
single_job["ranks"],
single_job["R"],
single_job["jobspec"],
single_job["project"]
if single_job.get("project") is not None
else "",
),
)
except KeyError:
Expand Down
1 change: 1 addition & 0 deletions src/cmd/flux-account-service.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ def view_job_records(self, handle, watcher, msg, arg):
user=msg.payload["user"],
before_end_time=msg.payload["before_end_time"],
after_start_time=msg.payload["after_start_time"],
project=msg.payload["project"],
)

payload = {"view_job_records": val}
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ def add_view_job_records_arg(subparsers):
help="end time",
metavar="END TIME",
)
subparser_view_job_records.add_argument(
"--project",
help="project",
metavar="PROJECT",
)


def add_create_db_arg(subparsers):
Expand Down Expand Up @@ -696,6 +701,7 @@ def select_accounting_function(args, output_file, parser):
"user": args.user,
"before_end_time": args.before_end_time,
"after_start_time": args.after_start_time,
"project": args.project,
}
return_val = flux.Flux().rpc("accounting.view_job_records", data).get()
# the return value of view-job-records without
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ TESTSCRIPTS = \
t1038-hierarchy-small-tie-all-db.t \
t1039-issue476.t \
t1040-mf-priority-projects.t \
t1041-view-jobs-by-project.t \
t5000-valgrind.t \
python/t1000-example.py \
python/t1001_db.py \
Expand Down
2 changes: 1 addition & 1 deletion t/expected/job_usage/no_jobs.expected
Original file line number Diff line number Diff line change
@@ -1 +1 @@
UserID Username JobID T_Submit T_Run T_Inactive Nodes
UserID Username JobID T_Submit T_Run T_Inactive Nodes Project
Binary file added t/expected/test_dbs/FluxAccountingv0-37-0.db
Binary file not shown.
6 changes: 5 additions & 1 deletion t/scripts/insert_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def main():
t_inactive_two_weeks = time.time() - (604861 * 2) # more than 2 weeks old
t_inactive_old = time.time() - (604861 * 27) # more than six months old
ranks = r = jobspec = ""
insert_stmt = "INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
insert_stmt = """
INSERT INTO jobs
(id, userid, t_submit, t_run, t_inactive, ranks, R, jobspec)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""

cur.execute(
insert_stmt,
Expand Down
95 changes: 95 additions & 0 deletions t/t1041-view-jobs-by-project.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/bin/bash

test_description='test viewing and filtering job records by project'

. `dirname $0`/sharness.sh
MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so
SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py
DB_PATH=$(pwd)/FluxAccountingTest.db

# Run under a one-broker test instance. TEST_UNDER_FLUX_NO_JOB_EXEC=y means
# submitted jobs are never actually executed, so they can simply be canceled
# after their priority event is observed.
export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job

flux setattr log-stderr-level 1

test_expect_success 'load multi-factor priority plugin' '
flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY}
'

test_expect_success 'check that mf_priority plugin is loaded' '
flux jobtap list | grep mf_priority
'

test_expect_success 'create flux-accounting DB' '
flux account -p ${DB_PATH} create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

test_expect_success 'add banks to the DB' '
flux account add-bank root 1 &&
flux account add-bank --parent-bank=root account1 1
'

test_expect_success 'add projects to the DB' '
flux account add-project projectA &&
flux account add-project projectB
'

# user1 is authorized to submit under either project, so jobs can later be
# filtered by each project name independently
test_expect_success 'add a user with a list of projects to the DB' '
flux account add-user \
--username=user1 \
--userid=5001 \
--bank=account1 \
--projects="projectA,projectB"
'

test_expect_success 'send flux-accounting DB information to the plugin' '
flux account-priority-update -p ${DB_PATH}
'

# waiting for the priority event ensures the plugin has accepted the job
# (and its project attribute) before it is canceled
test_expect_success 'submit 2 jobs under projectA' '
job1=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectA hostname) &&
flux job wait-event -f json $job1 priority &&
job2=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectA hostname) &&
flux job wait-event -f json $job2 priority &&
flux cancel $job1 &&
flux cancel $job2
'

test_expect_success 'submit 2 jobs under projectB' '
job1=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectB hostname) &&
flux job wait-event -f json $job1 priority &&
job2=$(flux python ${SUBMIT_AS} 5001 --setattr=system.project=projectB hostname) &&
flux job wait-event -f json $job2 priority &&
flux cancel $job1 &&
flux cancel $job2
'

# pull the now-inactive jobs out of the flux job archive into the
# flux-accounting DB jobs table
test_expect_success 'run fetch-job-records script' '
flux account-fetch-job-records -p ${DB_PATH}
'

# grep is case-sensitive: the lowercase "project" pattern matches the four
# record lines (projectA/projectB) but not the capitalized "Project" header
test_expect_success 'look at all jobs (will show 4 records)' '
flux account view-job-records > all_jobs.out &&
test $(grep -c "project" all_jobs.out) -eq 4
'

test_expect_success 'filter jobs by projectA (will show 2 records)' '
flux account view-job-records --project=projectA > projectA_jobs.out &&
test $(grep -c "projectA" projectA_jobs.out) -eq 2
'

test_expect_success 'filter jobs by projectB (will show 2 records)' '
flux account view-job-records --project=projectB > projectB_jobs.out &&
test $(grep -c "projectB" projectB_jobs.out) -eq 2
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done

0 comments on commit 921b4ef

Please sign in to comment.