Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flux-accounting: add a local job-archive #357

Merged
merged 12 commits into from
May 9, 2024
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,5 @@ dist_fluxcmd_SCRIPTS = \
cmd/flux-account-pop-db.py \
cmd/flux-account-export-db.py \
cmd/flux-account-update-db.py \
cmd/flux-account-service.py
cmd/flux-account-service.py \
cmd/flux-account-fetch-job-records.py
18 changes: 18 additions & 0 deletions src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,22 @@ def create_db(
conn.execute("INSERT INTO project_table (project) VALUES ('*')")
conn.commit()

# Jobs Table
# stores job records for associations
logging.info("Creating jobs table in DB...")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
id char(16) PRIMARY KEY NOT NULL,
userid integer NOT NULL,
t_submit real NOT NULL,
t_run real NOT NULL,
t_inactive real NOT NULL,
ranks text NOT NULL,
R text NOT NULL,
jobspec text NOT NULL
);"""
)
logging.info("Created jobs table successfully")

conn.close()
38 changes: 19 additions & 19 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,25 +398,25 @@ def get_curr_usg_bin(acct_conn, user, bank):
return float(row[0])


def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank):
def calc_usage_factor(conn, pdhl, user, bank, default_bank):

# hl_period represents the number of seconds that represent one usage bin
hl_period = pdhl * 604800

acct_cur = acct_conn.cursor()
cur = conn.cursor()

# fetch timestamp of the end of the current half-life period
s_end_hl = """
SELECT end_half_life_period FROM t_half_life_period_table WHERE cluster='cluster'
"""
acct_cur.execute(s_end_hl)
row = acct_cur.fetchone()
cur.execute(s_end_hl)
row = cur.fetchone()
end_hl = row[0]

# get jobs that have completed since the last seen completed job
last_j_ts = get_last_job_ts(acct_conn, user, bank)
last_j_ts = get_last_job_ts(conn, user, bank)
user_jobs = get_job_records(
jobs_conn,
conn,
bank,
default_bank,
user=user,
Expand All @@ -436,38 +436,38 @@ def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank):
last_t_inactive = user_jobs[-1].t_inactive
usg_current = sum(per_job_factors)

update_t_inactive(acct_conn, last_t_inactive, user, bank)
update_t_inactive(conn, last_t_inactive, user, bank)

if len(user_jobs) == 0 and (float(end_hl) > (time.time() - hl_period)):
# no new jobs in the current half-life period
usg_past = fetch_usg_bins(acct_conn, user, bank)
usg_past = fetch_usg_bins(conn, user, bank)

usg_historical = sum(usg_past)
elif len(user_jobs) == 0 and (float(end_hl) < (time.time() - hl_period)):
# no new jobs in the new half-life period
usg_historical = apply_decay_factor(0.5, acct_conn, user, bank)
usg_historical = apply_decay_factor(0.5, conn, user, bank)

update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)
elif (last_t_inactive - float(end_hl)) < hl_period:
# found new jobs in the current half-life period
usg_current += get_curr_usg_bin(acct_conn, user, bank)
usg_current += get_curr_usg_bin(conn, user, bank)

# usage_user_past = sum of the older usage factors
usg_past = fetch_usg_bins(acct_conn, user, bank)
usg_past = fetch_usg_bins(conn, user, bank)

usg_historical = usg_current + sum(usg_past[1:])

update_curr_usg_col(acct_conn, usg_current, user, bank)
update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_curr_usg_col(conn, usg_current, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)
else:
# found new jobs in the new half-life period

# apply decay factor to past usage periods of a user's jobs
usg_past = apply_decay_factor(0.5, acct_conn, user, bank)
usg_past = apply_decay_factor(0.5, conn, user, bank)
usg_historical = usg_current + usg_past

update_curr_usg_col(acct_conn, usg_historical, user, bank)
update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_curr_usg_col(conn, usg_historical, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)

return usg_historical

Expand Down Expand Up @@ -557,15 +557,15 @@ def calc_parent_bank_usage(acct_conn, cur, bank, total_usage=0.0):
return total_usage


def update_job_usage(acct_conn, jobs_conn, pdhl=1):
def update_job_usage(acct_conn, pdhl=1):
s_assoc = "SELECT username, bank, default_bank FROM association_table"
cur = acct_conn.cursor()
cur.execute(s_assoc)
result = cur.fetchall()

# update the job usage for every user in the association_table
for row in result:
calc_usage_factor(jobs_conn, acct_conn, pdhl, row[0], row[1], row[2])
calc_usage_factor(acct_conn, pdhl, row[0], row[1], row[2])

# find the root bank in the flux-accounting database
s_root_bank = "SELECT bank FROM bank_table WHERE parent_bank=''"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_01_tables_exist(self):
"t_half_life_period_table",
"queue_table",
"project_table",
"jobs",
]
self.assertEqual(list_of_tables, expected)

Expand Down
Loading
Loading