Skip to content

Commit

Permalink
Bugfix: Counts and Vocab refresh action(s)
Browse files Browse the repository at this point in the history
- Bugfix: Counts and Vocab refresh GH action: running out of memory. This was solved by removing a few unnecessary steps from our download process, where we thought we had to combine the parquet files and save them to CSV. Then, because we had a dynamic "chunking" operation which we weren't using in order to somehow save on memory, we needed to re-read the saved parquet file(s) and we were concatenating them in a single CSV. The end result was that we were using a lot of needless memory. Now, instead we are downloading the parquet files, converting to CSV on the fly for each part of the CSV, and appending to the output file.
- Update: To use premium GitHub 'large runners' to solve resources issue
- Bugfix: Now deletes old, backed up versions of remade tables at the end. Couldn't drop because some derived tables were still dependent.
- Bugfix: Vocab tables were being filtered based on if the concept or cset IDs appeared in other tables. This is no longer helpful (and is actually bad) now that we refresh via objects API.

Update: Generalized refresh of dataset groups
- Update: refresh_voc_and_counts.py -> refresh_dataset_group_tables.py: Generalized this script so that we can have more flexibility in which groups we run.
- Update: GH action: Also updated the GitHub action so that it runs these groups sequentially. That way, at least one group may successfully complete.
- Update: makefile: With goals for each action

More related updates
- Update: Dataset group refresh stability: Now this task, similar to the normal DB refresh, will check to see if there's currently a dervied table refresh active and, if so, wait until it is done before proceeding. This will help in the incident of multiple refreshes happing simultaneously, especially now since we are splitting the vocab and counts refreshes apart.

General
- Update: makefile: Added missing phony targets, and removed some unnecessary stuff.
  • Loading branch information
joeflack4 committed Nov 1, 2023
1 parent 1216861 commit efeb83e
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 226 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/refresh_counts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ on:
workflow_dispatch:
jobs:
refresh-counts-tables:
runs-on: ubuntu-latest
runs-on: BIDS-Premium-Action-Runners
# runs-on: ubuntu-latest
# fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
strategy:
fail-fast: false
Expand Down Expand Up @@ -41,5 +42,4 @@ jobs:
pip install -r requirements.txt
- name: Refresh counts
run: |
python backend/db/refresh_dataset_group_tables.py --dataset-group counts
run: make refresh-counts
6 changes: 3 additions & 3 deletions .github/workflows/refresh_voc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ on:
workflow_dispatch:
jobs:
refresh-vocab-tables:
runs-on: ubuntu-latest
runs-on: BIDS-Premium-Action-Runners
# runs-on: ubuntu-latest
# fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
strategy:
fail-fast: false
Expand Down Expand Up @@ -41,5 +42,4 @@ jobs:
pip install -r requirements.txt
- name: Refresh vocab
run: |
python backend/db/refresh_dataset_group_tables.py --dataset-group vocab
run: make refresh-vocab
16 changes: 10 additions & 6 deletions .github/workflows/refresh_voc_and_counts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ on:
workflow_dispatch:
jobs:
refresh-voc-and-counts:
runs-on: ubuntu-latest
runs-on: BIDS-Premium-Action-Runners
# runs-on: ubuntu-latest
# fail-fast: Can show why action unexpectedly stops: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
strategy:
fail-fast: false
steps:
- name: Checkout repository and submodules
uses: actions/checkout@v2
Expand All @@ -22,24 +26,24 @@ jobs:
run: |
echo "Commit hash: ${{ github.sha }}"
echo "Branch: ${{ github.ref }}"
- name: 'Create env file'
run: |
mkdir env
echo "${{ secrets.ENV_FILE }}" > env/.env
- name: Create and start virtual environment
run: |
python3 -m venv venv
source venv/bin/activate
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install --upgrade wheel
pip install --upgrade setuptools
pip install -r requirements.txt
- name: Refresh voc and counts
- name: Refresh counts
run: |
python backend/db/refresh_dataset_group_tables.py --dataset-group counts
- name: Refresh vocab
run: |
python backend/db/refresh_voc_and_counts.py
python backend/db/refresh_dataset_group_tables.py --dataset-group vocab
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
PROJECT_ROOT = os.path.join(BACKEND_DIR, '..')
sys.path.insert(0, str(PROJECT_ROOT))
from backend.db.utils import SCHEMA, check_db_status_var, get_db_connection, get_ddl_statements, load_csv, \
refresh_any_dependent_tables, \
run_sql
refresh_derived_tables, run_sql
from enclave_wrangler.config import DATASET_GROUPS_CONFIG
from enclave_wrangler.datasets import download_datasets, get_last_update_of_dataset


def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA):
def refresh_voc_and_counts(dataset_group: List[str], skip_downloads: bool = False, schema=SCHEMA):
"""Refresh vocabulary and counts tables."""
print('Refreshing vocabulary and counts tables.')
for group_name, config in DATASET_GROUPS_CONFIG.items():
selected_configs = {k: v for k, v in DATASET_GROUPS_CONFIG.items() if k in dataset_group}
for group_name, config in selected_configs.items():
print(f'\nRefreshing {group_name} tables...')
# Check if tables are already up to date
last_updated_us: str = check_db_status_var(config['last_updated_termhub_var'])
Expand All @@ -54,24 +54,30 @@ def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA):
load_csv(con, table, replace_rule='do not replace', schema=SCHEMA, optional_suffix='_new')
run_sql(con, f'ALTER TABLE IF EXISTS {schema}.{table} RENAME TO {table}_old;')
run_sql(con, f'ALTER TABLE {schema}.{table}_new RENAME TO {table};')
run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;')

t1 = datetime.now()
print(f' done in {(t1 - t0).seconds} seconds')
# todo: set variable for 'last updated' for each table (look at load())
# - consider: check if table already updated sooner than last_updated_them. if so, skip. and add a param to CLI for this
print('Creating indexes')
statements: List[str] = get_ddl_statements(schema, ['indexes'], 'flat')
statements: List[str] = get_ddl_statements(schema, ['indexes'], return_type='flat')
for statement in statements:
run_sql(con, statement)
# print('Recreating derived tables') # printed w/in refresh_derived_tables()
refresh_any_dependent_tables(con, config['tables'])

print('Recreating derived tables')
refresh_derived_tables(con, config['tables'], schema)
print('Deleting old, temporarily backed up versions of tables')
for table in config['tables']:
run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;')
print('Done')


def cli():
"""Command line interface"""
parser = ArgumentParser(prog='Refresh vocabulary and counts tables.')
parser.add_argument(
'-d', '--dataset-group', nargs='+', choices=list(DATASET_GROUPS_CONFIG.keys()),
default=list(DATASET_GROUPS_CONFIG.keys()),
help='Names of dataset/table groups to refresh.')
parser.add_argument(
'-s', '--skip-downloads', action='store_true',
help='Use if you have already downloaded updated files.')
Expand Down
3 changes: 2 additions & 1 deletion backend/db/refresh_from_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def cli():
parser.add_argument(
'-l', '--use-local-db', action='store_true', default=False, help='Use local database instead of server.')
parser.add_argument(
'-z', '--run-final-ddl-only', action='store_true', default=False, help='Only run indexes_and_derived_tables (ddl.jinja.sql).')
'-z', '--run-final-ddl-only', action='store_true', default=False,
help='Only run indexes_and_derived_tables (ddl.jinja.sql).')
reset_and_update_db(**vars(parser.parse_args()))


Expand Down
4 changes: 2 additions & 2 deletions backend/db/resolve_fetch_failures_0_members.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from enclave_wrangler.objects_api import concept_set_members__from_csets_and_members_to_db, \
fetch_cset_and_member_objects
from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, select_failed_fetches, \
refresh_termhub_core_cset_derived_tables
refresh_derived_tables

DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API."

Expand Down Expand Up @@ -78,7 +78,7 @@ def resolve_fetch_failures_0_members(
if success_cases:
with get_db_connection(schema=schema, local=use_local_db) as con:
concept_set_members__from_csets_and_members_to_db(con, csets_and_members)
refresh_termhub_core_cset_derived_tables(con)
refresh_derived_tables(con)

# Report success
if success_cases:
Expand Down
4 changes: 2 additions & 2 deletions backend/db/resolve_fetch_failures_excess_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
PROJECT_ROOT = os.path.join(BACKEND_DIR, '..')
sys.path.insert(0, str(PROJECT_ROOT))
from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, insert_from_dicts, \
refresh_termhub_core_cset_derived_tables, \
refresh_derived_tables, \
select_failed_fetches
from enclave_wrangler.datasets import CSV_TRANSFORM_DIR, download_datasets
from enclave_wrangler.utils import was_file_modified_within_threshold
Expand Down Expand Up @@ -74,7 +74,7 @@ def resolve_fetch_failures_excess_items(use_local_db=False, cached_dataset_thres
if rows:
insert_from_dicts(con, dataset, rows)
solved_failures.append(failure)
refresh_termhub_core_cset_derived_tables(con)
refresh_derived_tables(con)

# Update fetch_audit status
fetch_status_set_success(solved_failures)
Expand Down
24 changes: 10 additions & 14 deletions backend/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ def refresh_any_dependent_tables(con: Connection, independent_tables: List[str]
derived_tables: List[str] = get_dependent_tables_queue(independent_tables)
if not derived_tables:
print(f'No derived tables found for: {", ".join(independent_tables)}')
refresh_derived_tables(con, derived_tables, schema)
refresh_derived_tables_exec(con, derived_tables, schema)


def refresh_derived_tables(
def refresh_derived_tables_exec(
con: Connection, derived_tables_queue: List[str] = CORE_CSET_DEPENDENT_TABLES, schema=SCHEMA
):
"""Refresh TermHub core cset derived tables
Expand Down Expand Up @@ -157,7 +157,10 @@ def refresh_derived_tables(
# todo: move this somewhere else, possibly load.py or db_refresh.py
# todo: what to do if this process fails? any way to roll back? should we?
# todo: currently has no way of passing 'local' down to db status var funcs
def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, polling_interval_seconds: int = 30):
def refresh_derived_tables(
con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA,
polling_interval_seconds: int = 30
):
"""Refresh TermHub core cset derived tables: wrapper function
Handles simultaneous requests and try/except for worker function: refresh_termhub_core_cset_derived_tables_exec()"""
Expand All @@ -168,18 +171,17 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol
if (datetime.now() - t0).total_seconds() >= 2 * 60 * 60: # 2 hours
raise RuntimeError('Timed out after waiting 2 hours for other active derived table refresh to complete.')
elif check_db_status_var('derived_tables_refresh_status') == 'active':
msg = f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' \
if i == 0 else '- trying again'
print(msg)
print(f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' \
if i == 1 else '- trying again')
time.sleep(polling_interval_seconds)
else:
try:
update_db_status_var('derived_tables_refresh_status', 'active')
# The following two calls yield equivalent results as of 2023/08/08. I've commented out
# refresh_derived_tables() in case anything goes wrong with refresh_any_dependent_tables(), since that
# is based on a heuristic currently, and if anything goes wrong, we may want to switch back. -joeflack4
# refresh_derived_tables(con, CORE_CSET_DEPENDENT_TABLES, schema)
refresh_any_dependent_tables(con, CORE_CSET_TABLES, schema)
# refresh_derived_tables_exec(con, CORE_CSET_DEPENDENT_TABLES, schema)
refresh_any_dependent_tables(con, independent_tables, schema)
finally:
update_db_status_var('derived_tables_refresh_status', 'inactive')
break
Expand Down Expand Up @@ -214,12 +216,6 @@ def set_search_path(dbapi_connection, connection_record):
return engine.connect()


def chunk_list(input_list: List, chunk_size) -> List[List]:
"""Split a list into chunks"""
for i in range(0, len(input_list), chunk_size):
yield input_list[i:i + chunk_size]


def current_datetime(time_zone=['UTC/GMT', 'EST/EDT'][1]) -> str:
"""Get current datetime in ISO format as a string."""
if time_zone == 'UTC/GMT':
Expand Down
Loading

0 comments on commit efeb83e

Please sign in to comment.