From ecf252a3af4c5ad7171177af1b84f6d308caf46e Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 5 Aug 2024 16:41:19 -0700 Subject: [PATCH] fetch-job-records: add integrity check for records Problem: There is a case where a job can have an R but not have entered RUN state, and thus, does not receive a "t_run" time. The "flux account-fetch-job-records" script does not verify that the timestamp key-value pairs exist before attempting to insert them into the "jobs" table in the flux-accounting DB, which can result in a KeyError. Add two integrity checks for job records fetched by this script. - When initially fetching jobs that checks that each key-value pair exists before adding it to a list of fetched job records, if a key-value pair is not valid, skip adding the job. - When attempting to insert a job record into the "jobs" table, wrap the INSERT in a try-except block in case a KeyError is raised. If so, just skip adding the job. --- src/cmd/flux-account-fetch-job-records.py | 54 ++++++++++++++++------- 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/src/cmd/flux-account-fetch-job-records.py b/src/cmd/flux-account-fetch-job-records.py index 2c47d27cd..242449183 100755 --- a/src/cmd/flux-account-fetch-job-records.py +++ b/src/cmd/flux-account-fetch-job-records.py @@ -90,6 +90,24 @@ def fetch_new_jobs(last_timestamp=0.0): if data["jobspec"] is not None: single_record["jobspec"] = data["jobspec"] + required_keys = [ + "userid", + "t_submit", + "t_run", + "t_inactive", + "ranks", + "id", + "R", + "jobspec", + ] + if not all( + key in single_record and single_record[key] is not None + for key in required_keys + ): + # job does not have all required fields to be added to jobs table + # in DB; skip this entry + continue + # append job to job_records list job_records.append(single_record) @@ -101,22 +119,26 @@ def insert_jobs_in_db(conn, job_records): cur = conn.cursor() for single_job in job_records: - cur.execute( - """ - INSERT OR IGNORE INTO jobs - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - single_job["id"], - single_job["userid"], - single_job["t_submit"], - single_job["t_run"], - single_job["t_inactive"], - single_job["ranks"], - single_job["R"], - single_job["jobspec"], - ), - ) + try: + cur.execute( + """ + INSERT OR IGNORE INTO jobs + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + single_job["id"], + single_job["userid"], + single_job["t_submit"], + single_job["t_run"], + single_job["t_inactive"], + single_job["ranks"], + single_job["R"], + single_job["jobspec"], + ), + ) + except KeyError: + # one of the key-value pairs is missing or invalid; skip the entry + continue conn.commit()