Skip to content

Commit

Permalink
Merge pull request #460 from cmoussa1/view.jobs.cleanup
Browse files Browse the repository at this point in the history
job archive interface: clean up a couple helper functions
  • Loading branch information
mergify[bot] authored Jul 1, 2024
2 parents ab3f9e6 + caee39f commit 6672f86
Showing 1 changed file with 35 additions and 58 deletions.
93 changes: 35 additions & 58 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ def add_job_records(rows):
job_records = []

for row in rows:
rset = ResourceSet(row[6]) # fetch R
try:
# attempt to create a ResourceSet from R
rset = ResourceSet(row[6])
nnodes = rset.nnodes
except (ValueError, TypeError):
# can't convert R to a ResourceSet object; skip it
continue

job_record = JobRecord(
row[0], # userid
Expand All @@ -132,7 +138,7 @@ def add_job_records(rows):
row[2], # t_submit
row[3], # t_run
row[4], # t_inactive
rset.nnodes, # nnodes
nnodes, # nnodes
row[6], # resources
)
job_records.append(job_record)
Expand All @@ -150,99 +156,70 @@ def check_jobspec(jobspec, bank):
)


# we are looking for jobs that were submitted under a secondary bank, so we'll
# only add jobs that have the same bank name attribute in the jobspec
def sec_bank_jobs(job_records, bank):
# Filter job records based on the specified bank. For a default bank,
# it includes jobs that either specify the default bank or do not
# specify any bank at all.
def filter_jobs_by_bank(job_records, bank, is_default_bank=False):
jobs = []
for job in job_records:
jobspec = json.loads(job[7])

if check_jobspec(jobspec, bank):
jobs.append(job)

return jobs


# we are looking for jobs that were submitted under a default bank, which has
# two cases: 1) the user submitted a job while specifying their default bank,
# or 2) the user submitted a job without specifying any bank at all
def def_bank_jobs(job_records, default_bank):
jobs = []
for job in job_records:
jobspec = json.loads(job[7])

if check_jobspec(jobspec, default_bank):
jobs.append(job)
elif "bank" not in jobspec["attributes"]["system"]:
elif is_default_bank and "bank" not in jobspec["attributes"]["system"]:
jobs.append(job)

return jobs


def get_job_records(conn, bank, default_bank, **kwargs):
job_records = []

# find out which args were passed and place them in a dict
valid_params = ("user", "after_start_time", "before_end_time", "jobid")
params = {}
params_list = []

valid_params = {"user", "after_start_time", "before_end_time", "jobid"}
params = {
key: val
for (key, val) in kwargs.items()
for key, val in kwargs.items()
if val is not None and key in valid_params
}

select_stmt = (
"SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs "
)
where_stmt = ""

def append_to_where(where_stmt, conditional):
if where_stmt != "":
return "{} AND {} ".format(where_stmt, conditional)

return "WHERE {}".format(conditional)
select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs"
where_clauses = []
params_list = []

# generate the SELECT statement based on the parameters passed in
if "user" in params:
params["user"] = get_uid(params["user"])
where_clauses.append("userid = ?")
params_list.append(params["user"])
where_stmt = append_to_where(where_stmt, "userid=? ")
if "after_start_time" in params:
where_clauses.append("t_run > ?")
params_list.append(params["after_start_time"])
where_stmt = append_to_where(where_stmt, "t_run > ? ")
if "before_end_time" in params:
where_clauses.append("t_inactive < ?")
params_list.append(params["before_end_time"])
where_stmt = append_to_where(where_stmt, "t_inactive < ? ")
if "jobid" in params:
where_clauses.append("id = ?")
params_list.append(params["jobid"])
where_stmt = append_to_where(where_stmt, "id=? ")

select_stmt += where_stmt
if where_clauses:
select_stmt += " WHERE " + " AND ".join(where_clauses)

cur = conn.cursor()
cur.execute(select_stmt, (*tuple(params_list),))
cur.execute(select_stmt, tuple(params_list))
result = cur.fetchall()
# if the length of dataframe is 0, that means no job records were found
# in the jobs table, so just return an empty list
if len(result) == 0:
return job_records

if not result:
return []

if bank is None and default_bank is None:
# special case for unit tests in test_job_archive_interface.py
job_records = add_job_records(result)

return job_records
return add_job_records(result)

if bank != default_bank:
jobs = sec_bank_jobs(result, bank)
else:
jobs = def_bank_jobs(result, default_bank)
# find out if we are fetching jobs from a user's default bank or under
# one of their secondary banks; this will determine how we filter the
# job records we've found
is_default_bank = bank == default_bank
jobs = filter_jobs_by_bank(result, bank, is_default_bank)

job_records = add_job_records(jobs)

return job_records
return add_job_records(jobs)


def output_job_records(conn, output_file, **kwargs):
Expand Down

0 comments on commit 6672f86

Please sign in to comment.