Skip to content

Commit

Permalink
Merge pull request #459 from cmoussa1/job.archive.purge
Browse files Browse the repository at this point in the history
database: add the ability to remove old job records from `jobs` table
  • Loading branch information
mergify[bot] authored Jun 27, 2024
2 parents 2aaf033 + d8382fa commit ab3f9e6
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 0 deletions.
30 changes: 30 additions & 0 deletions doc/guide/accounting-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,36 @@ The scripts should be run by :core:man1:`flux-cron`:
30 * * * * bash -c "flux account-fetch-job-records; flux account update-usage; flux account-update-fshare; flux account-priority-update"
Periodically fetching and storing job records in the flux-accounting database
can cause the DB to grow large in size. Since there comes a point where job
records become no longer useful to flux-accounting in terms of job usage and
fair-share calculation, you can run ``flux account scrub-old-jobs`` to
remove old job records. If no argument is passed to this command, it will
delete any job record that has completed more than 6 months ago. This can be
tuned by specifying the number of weeks to go back when determining which
records to remove. The example below will remove any job record more than 4
weeks old:

.. code-block:: console
$ flux account scrub-old-jobs 4
By default, the memory occupied by a SQLite database does not decrease when
records are ``DELETE``'d from the database. After scrubbing old job records
from the flux-accounting database, if space is still an issue, the ``VACUUM``
command will clean up the space previously occupied by those deleted records.
You can run this command by connecting to the flux-accounting database in a
SQLite shell:

.. code-block:: console
$ sqlite3 FluxAccounting.db
sqlite> VACUUM;
Note that running ``VACUUM`` can take minutes to run and also requires an
exclusive lock on the database; it will fail if the database has a pending SQL
statement or open transaction.

***********************
Database Administration
***********************
Expand Down
15 changes: 15 additions & 0 deletions src/bindings/python/fluxacct/accounting/job_archive_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,18 @@ def update_job_usage(acct_conn, pdhl=1):
acct_conn.commit()

return 0


# Scrub jobs from the flux-accounting "jobs" table by removing any
# record that is older than num_weeks old. If no number of weeks is
# specified, remove any record that is older than 6 months old.
def scrub_old_jobs(conn, num_weeks=26):
cur = conn.cursor()
# calculate total amount of time to go back (in terms of seconds)
# (there are 604,800 seconds in a week)
cutoff_time = time.time() - (num_weeks * 604800)

# fetch all jobs that finished before this time
select_stmt = "DELETE FROM jobs WHERE t_inactive < ?"
cur.execute(select_stmt, (cutoff_time,))
conn.commit()
15 changes: 15 additions & 0 deletions src/cmd/flux-account-service.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, flux_handle, conn):
"edit_queue",
"add_project",
"delete_project",
"scrub_old_jobs",
"shutdown_service",
]

Expand Down Expand Up @@ -483,6 +484,20 @@ def delete_project(self, handle, watcher, msg, arg):
msg, 0, f"a non-OSError exception was caught: {str(exc)}"
)

def scrub_old_jobs(self, handle, watcher, msg, arg):
try:
val = jobs.scrub_old_jobs(self.conn, msg.payload["num_weeks"])

payload = {"scrub_old_jobs": val}

handle.respond(msg, payload)
except KeyError as exc:
handle.respond_error(msg, 0, f"missing key in payload: {exc}")
except Exception as exc:
handle.respond_error(
msg, 0, f"a non-OSError exception was caught: {str(exc)}"
)


LOGGER = logging.getLogger("flux-uri")

Expand Down
25 changes: 25 additions & 0 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,24 @@ def add_delete_project_arg(subparsers):
)


def add_scrub_job_records_arg(subparsers):
subparser = subparsers.add_parser(
"scrub-old-jobs",
help="clean job-archive of old job records",
formatter_class=flux.util.help_formatter(),
)

subparser.set_defaults(func="scrub_old_jobs")
subparser.add_argument(
"num_weeks",
help="delete jobs that have finished more than NUM_WEEKS ago",
type=int,
nargs="?",
metavar="NUM_WEEKS",
default=26,
)


def add_arguments_to_parser(parser, subparsers):
add_path_arg(parser)
add_output_file_arg(parser)
Expand All @@ -505,6 +523,7 @@ def add_arguments_to_parser(parser, subparsers):
add_add_project_arg(subparsers)
add_view_project_arg(subparsers)
add_delete_project_arg(subparsers)
add_scrub_job_records_arg(subparsers)


def set_db_location(args):
Expand Down Expand Up @@ -670,6 +689,12 @@ def select_accounting_function(args, output_file, parser):
"project": args.project,
}
return_val = flux.Flux().rpc("accounting.delete_project", data).get()
elif args.func == "scrub_old_jobs":
data = {
"path": args.path,
"num_weeks": args.num_weeks,
}
return_val = flux.Flux().rpc("accounting.scrub_old_jobs", data).get()
else:
print(parser.print_usage())
return
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TESTSCRIPTS = \
t1032-mf-priority-update-bank.t \
t1033-mf-priority-update-job.t \
t1034-mf-priority-config.t \
t1035-flux-account-scrub-old-jobs.t \
t5000-valgrind.t \
python/t1000-example.py

Expand Down
97 changes: 97 additions & 0 deletions t/scripts/insert_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3
###############################################################
# Copyright 2024 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import sqlite3
import sqlite3
import sys
import time


def main():
if len(sys.argv) < 2:
sys.exit(f"Usage: insert_jobs DATABASE_PATH")

db_uri = sys.argv[1]

try:
conn = sqlite3.connect(db_uri, uri=True)
cur = conn.cursor()
except sqlite3.OperationalError as exc:
print(f"Unable to open database file: {db_uri}", file=sys.stderr)
print(exc)
sys.exit(1)

userid = 9999
t_submit = t_run = 0
t_inactive_recent = time.time() # job that just finished
t_inactive_two_weeks = time.time() - (604861 * 2) # more than 2 weeks old
t_inactive_old = time.time() - (604861 * 27) # more than six months old
ranks = r = jobspec = ""
insert_stmt = "INSERT INTO jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?)"

cur.execute(
insert_stmt,
(
"1",
userid,
t_submit,
t_run,
t_inactive_recent,
ranks,
r,
jobspec,
),
)
cur.execute(
insert_stmt,
(
"2",
userid,
t_submit,
t_run,
t_inactive_two_weeks,
ranks,
r,
jobspec,
),
)
cur.execute(
insert_stmt,
(
"3",
userid,
t_submit,
t_run,
t_inactive_two_weeks,
ranks,
r,
jobspec,
),
)
cur.execute(
insert_stmt,
(
"4",
userid,
t_submit,
t_run,
t_inactive_old,
ranks,
r,
jobspec,
),
)

conn.commit()
conn.close()


if __name__ == "__main__":
main()
9 changes: 9 additions & 0 deletions t/t1026-flux-account-perms.t
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ test_expect_success 'delete-project should not be accessible by all users' '
)
'

test_expect_success 'scrub-old-jobs should not be accessible by all users' '
newid=$(($(id -u)+1)) &&
( export FLUX_HANDLE_ROLEMASK=0x2 &&
export FLUX_HANDLE_USERID=$newid &&
test_must_fail flux account scrub-old-jobs > no_access_scrub_old_jobs.out 2>&1 &&
grep "Request requires owner credentials" no_access_scrub_old_jobs.out
)
'

test_expect_success 'remove flux-accounting DB' '
rm $(pwd)/FluxAccountingTest.db
'
Expand Down
83 changes: 83 additions & 0 deletions t/t1035-flux-account-scrub-old-jobs.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/bin/bash

test_description='test removing old job records from the flux-accounting database'

. `dirname $0`/sharness.sh
DB_PATH=$(pwd)/FluxAccountingTest.db
QUERYCMD="flux python ${SHARNESS_TEST_SRCDIR}/scripts/query.py"
INSERT_JOBS="flux python ${SHARNESS_TEST_SRCDIR}/scripts/insert_jobs.py"

export TEST_UNDER_FLUX_NO_JOB_EXEC=y
export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1"
test_under_flux 1 job

flux setattr log-stderr-level 1

# get job records from jobs table
# arg1 - database path
get_job_records() {
local dbpath=$1
local i=0
local row_count=0
query="select count(*) from jobs;"

row_count=$(${QUERYCMD} -t 100 ${dbpath} "${query}" | awk -F' = ' '{print $2}')
echo $row_count
}

test_expect_success 'create flux-accounting DB' '
flux account -p $(pwd)/FluxAccountingTest.db create-db
'

test_expect_success 'start flux-accounting service' '
flux account-service -p ${DB_PATH} -t
'

# insert_jobs.py inserts three fake job records into the jobs table in the
# flux-accounting database. Four total job records are added to the jobs table:
#
# Two of the jobs have a simulated time of finishing just over two weeks ago.
# One of the jobs has a simulated time of finishing very recently.
# One of the jobs has a simulated time of finishing over six months ago.
test_expect_success 'populate DB with four job records' '
${INSERT_JOBS} ${DB_PATH}
'

test_expect_success 'ensure the jobs table has four records in it' '
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 4
'

test_expect_success 'do not pass an argument to scrub-old-jobs (should remove the oldest job)' '
flux account -p ${DB_PATH} scrub-old-jobs &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 3
'

# Passing 0 for num_weeks is saying "Remove all records older than 0 weeks
# old," or rather, remove all jobs in the table.
test_expect_success 'if we pass 0 for num_weeks, all jobs will be removed' '
flux account scrub-old-jobs 0 &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 0
'

# If num_weeks == 2, all jobs that have finished more than 2 weeks ago will be
# removed. In our testsuite, that should leave just the job that finished
# "recently".
test_expect_success 'only remove job records older than 2 weeks old' '
${INSERT_JOBS} ${DB_PATH} &&
flux account scrub-old-jobs 2 &&
get_job_records ${DB_PATH} > result.out &&
test $(cat result.out) -eq 1
'

test_expect_success 'remove flux-accounting DB' '
rm $(pwd)/FluxAccountingTest.db
'

test_expect_success 'shut down flux-accounting service' '
flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()"
'

test_done

0 comments on commit ab3f9e6

Please sign in to comment.