diff --git a/src/bindings/python/fluxacct/accounting/job_archive_interface.py b/src/bindings/python/fluxacct/accounting/job_archive_interface.py index bd38882b..ca1f34f2 100755 --- a/src/bindings/python/fluxacct/accounting/job_archive_interface.py +++ b/src/bindings/python/fluxacct/accounting/job_archive_interface.py @@ -123,7 +123,13 @@ def add_job_records(rows): job_records = [] for row in rows: - rset = ResourceSet(row[6]) # fetch R + try: + # attempt to create a ResourceSet from R + rset = ResourceSet(row[6]) + nnodes = rset.nnodes + except (ValueError, TypeError): + # can't convert R to a ResourceSet object; skip it + continue job_record = JobRecord( row[0], # userid @@ -132,7 +138,7 @@ def add_job_records(rows): row[2], # t_submit row[3], # t_run row[4], # t_inactive - rset.nnodes, # nnodes + nnodes, # nnodes row[6], # resources ) job_records.append(job_record) @@ -150,99 +156,70 @@ def check_jobspec(jobspec, bank): ) -# we are looking for jobs that were submitted under a secondary bank, so we'll -# only add jobs that have the same bank name attribute in the jobspec -def sec_bank_jobs(job_records, bank): +# Filter job records based on the specified bank. For a default bank, +# it includes jobs that either specify the default bank or do not +# specify any bank at all. +def filter_jobs_by_bank(job_records, bank, is_default_bank=False): jobs = [] for job in job_records: jobspec = json.loads(job[7]) if check_jobspec(jobspec, bank): jobs.append(job) - - return jobs - - -# we are looking for jobs that were submitted under a default bank, which has -# two cases: 1) the user submitted a job while specifying their default bank, -# or 2) the user submitted a job without specifying any bank at all -def def_bank_jobs(job_records, default_bank): - jobs = [] - for job in job_records: - jobspec = json.loads(job[7]) - - if check_jobspec(jobspec, default_bank): - jobs.append(job) - elif "bank" not in jobspec["attributes"]["system"]: + elif is_default_bank and "bank" not in jobspec["attributes"]["system"]: jobs.append(job) return jobs def get_job_records(conn, bank, default_bank, **kwargs): - job_records = [] - # find out which args were passed and place them in a dict - valid_params = ("user", "after_start_time", "before_end_time", "jobid") - params = {} - params_list = [] - + valid_params = {"user", "after_start_time", "before_end_time", "jobid"} params = { key: val - for (key, val) in kwargs.items() + for key, val in kwargs.items() if val is not None and key in valid_params } - select_stmt = ( - "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs " - ) - where_stmt = "" - - def append_to_where(where_stmt, conditional): - if where_stmt != "": - return "{} AND {} ".format(where_stmt, conditional) - - return "WHERE {}".format(conditional) + select_stmt = "SELECT userid,id,t_submit,t_run,t_inactive,ranks,R,jobspec FROM jobs" + where_clauses = [] + params_list = [] - # generate the SELECT statement based on the parameters passed in if "user" in params: params["user"] = get_uid(params["user"]) + where_clauses.append("userid = ?") params_list.append(params["user"]) - where_stmt = append_to_where(where_stmt, "userid=? ") if "after_start_time" in params: + where_clauses.append("t_run > ?") params_list.append(params["after_start_time"]) - where_stmt = append_to_where(where_stmt, "t_run > ? ") if "before_end_time" in params: + where_clauses.append("t_inactive < ?") params_list.append(params["before_end_time"]) - where_stmt = append_to_where(where_stmt, "t_inactive < ? ") if "jobid" in params: + where_clauses.append("id = ?") params_list.append(params["jobid"]) - where_stmt = append_to_where(where_stmt, "id=? ") - select_stmt += where_stmt + if where_clauses: + select_stmt += " WHERE " + " AND ".join(where_clauses) cur = conn.cursor() - cur.execute(select_stmt, (*tuple(params_list),)) + cur.execute(select_stmt, tuple(params_list)) result = cur.fetchall() - # if the length of dataframe is 0, that means no job records were found - # in the jobs table, so just return an empty list - if len(result) == 0: - return job_records + + if not result: + return [] if bank is None and default_bank is None: # special case for unit tests in test_job_archive_interface.py - job_records = add_job_records(result) - - return job_records + return add_job_records(result) - if bank != default_bank: - jobs = sec_bank_jobs(result, bank) - else: - jobs = def_bank_jobs(result, default_bank) + # find out if we are fetching jobs from a user's default bank or under + # one of their secondary banks; this will determine how we filter the + # job records we've found + is_default_bank = bank == default_bank + jobs = filter_jobs_by_bank(result, bank, is_default_bank) - job_records = add_job_records(jobs) - - return job_records + return add_job_records(jobs) def output_job_records(conn, output_file, **kwargs):