Skip to content

Commit

Permalink
Merge pull request #357 from cmoussa1/add.local.job-archive
Browse files: browse the repository at this point in the history
flux-accounting: add a local job-archive
  • Loading branch information
mergify[bot] authored May 9, 2024
2 parents e5f409b + 035a0b5 commit f44be60
Show file tree
Hide file tree
Showing 17 changed files with 348 additions and 231 deletions.
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,5 @@ dist_fluxcmd_SCRIPTS = \
cmd/flux-account-pop-db.py \
cmd/flux-account-export-db.py \
cmd/flux-account-update-db.py \
cmd/flux-account-service.py
cmd/flux-account-service.py \
cmd/flux-account-fetch-job-records.py
18 changes: 18 additions & 0 deletions src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,22 @@ def create_db(
conn.execute("INSERT INTO project_table (project) VALUES ('*')")
conn.commit()

# Jobs Table
# stores job records for associations
logging.info("Creating jobs table in DB...")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
id char(16) PRIMARY KEY NOT NULL,
userid integer NOT NULL,
t_submit real NOT NULL,
t_run real NOT NULL,
t_inactive real NOT NULL,
ranks text NOT NULL,
R text NOT NULL,
jobspec text NOT NULL
);"""
)
logging.info("Created jobs table successfully")

conn.close()
38 changes: 19 additions & 19 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,25 +398,25 @@ def get_curr_usg_bin(acct_conn, user, bank):
return float(row[0])


def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank):
def calc_usage_factor(conn, pdhl, user, bank, default_bank):

# hl_period is the length of one usage bin in seconds: pdhl weeks at 604800 seconds per week
hl_period = pdhl * 604800

acct_cur = acct_conn.cursor()
cur = conn.cursor()

# fetch timestamp of the end of the current half-life period
s_end_hl = """
SELECT end_half_life_period FROM t_half_life_period_table WHERE cluster='cluster'
"""
acct_cur.execute(s_end_hl)
row = acct_cur.fetchone()
cur.execute(s_end_hl)
row = cur.fetchone()
end_hl = row[0]

# get jobs that have completed since the last seen completed job
last_j_ts = get_last_job_ts(acct_conn, user, bank)
last_j_ts = get_last_job_ts(conn, user, bank)
user_jobs = get_job_records(
jobs_conn,
conn,
bank,
default_bank,
user=user,
Expand All @@ -436,38 +436,38 @@ def calc_usage_factor(jobs_conn, acct_conn, pdhl, user, bank, default_bank):
last_t_inactive = user_jobs[-1].t_inactive
usg_current = sum(per_job_factors)

update_t_inactive(acct_conn, last_t_inactive, user, bank)
update_t_inactive(conn, last_t_inactive, user, bank)

if len(user_jobs) == 0 and (float(end_hl) > (time.time() - hl_period)):
# no new jobs in the current half-life period
usg_past = fetch_usg_bins(acct_conn, user, bank)
usg_past = fetch_usg_bins(conn, user, bank)

usg_historical = sum(usg_past)
elif len(user_jobs) == 0 and (float(end_hl) < (time.time() - hl_period)):
# no new jobs in the new half-life period
usg_historical = apply_decay_factor(0.5, acct_conn, user, bank)
usg_historical = apply_decay_factor(0.5, conn, user, bank)

update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)
elif (last_t_inactive - float(end_hl)) < hl_period:
# found new jobs in the current half-life period
usg_current += get_curr_usg_bin(acct_conn, user, bank)
usg_current += get_curr_usg_bin(conn, user, bank)

# usg_past = the stored usage bins; the bins after the first are the older usage factors summed below
usg_past = fetch_usg_bins(acct_conn, user, bank)
usg_past = fetch_usg_bins(conn, user, bank)

usg_historical = usg_current + sum(usg_past[1:])

update_curr_usg_col(acct_conn, usg_current, user, bank)
update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_curr_usg_col(conn, usg_current, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)
else:
# found new jobs in the new half-life period

# apply decay factor to past usage periods of a user's jobs
usg_past = apply_decay_factor(0.5, acct_conn, user, bank)
usg_past = apply_decay_factor(0.5, conn, user, bank)
usg_historical = usg_current + usg_past

update_curr_usg_col(acct_conn, usg_historical, user, bank)
update_hist_usg_col(acct_conn, usg_historical, user, bank)
update_curr_usg_col(conn, usg_historical, user, bank)
update_hist_usg_col(conn, usg_historical, user, bank)

return usg_historical

Expand Down Expand Up @@ -557,15 +557,15 @@ def calc_parent_bank_usage(acct_conn, cur, bank, total_usage=0.0):
return total_usage


def update_job_usage(acct_conn, jobs_conn, pdhl=1):
def update_job_usage(acct_conn, pdhl=1):
s_assoc = "SELECT username, bank, default_bank FROM association_table"
cur = acct_conn.cursor()
cur.execute(s_assoc)
result = cur.fetchall()

# update the job usage for every user in the association_table
for row in result:
calc_usage_factor(jobs_conn, acct_conn, pdhl, row[0], row[1], row[2])
calc_usage_factor(acct_conn, pdhl, row[0], row[1], row[2])

# find the root bank in the flux-accounting database
s_root_bank = "SELECT bank FROM bank_table WHERE parent_bank=''"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def test_01_tables_exist(self):
"t_half_life_period_table",
"queue_table",
"project_table",
"jobs",
]
self.assertEqual(list_of_tables, expected)

Expand Down
Loading

0 comments on commit f44be60

Please sign in to comment.