From 314d69aaaa6ccfe1c5e27d0d245453fcf4abc03d Mon Sep 17 00:00:00 2001
From: joeflack4
Date: Thu, 28 Sep 2023 20:37:51 -0400
Subject: [PATCH] Bugfix: Counts and Vocab refresh action(s)

- Bugfix: Counts and Vocab refresh GH action: running out of memory. Solved by removing unnecessary steps from our download process: we thought we had to combine the parquet files and save them to CSV, and a dynamic "chunking" operation (added to somehow save memory, but not actually being used) then forced us to re-read the saved parquet file(s) and concatenate them into a single CSV. The end result was a lot of needless memory use. Now we download the parquet files and, for each part, convert it to CSV on the fly and append it to the output file (see the sketch appended after the patch).
- Update: Use premium GitHub 'large runners' to address the resource issue.
- Bugfix: Now deletes the old, backed-up versions of remade tables at the end. They couldn't be dropped earlier because some derived tables still depended on them (see the swap/drop sketch appended after the patch).
- Bugfix: Vocab tables were being filtered based on whether the concept or cset IDs appeared in other tables. This is no longer helpful (and is actually harmful) now that we refresh via the objects API.

Update: Generalized refresh of dataset groups
- Update: refresh_voc_and_counts.py -> refresh_dataset_group_tables.py: Generalized this script so that we have more flexibility in which groups we run.
- Update: GH action: Also updated the GitHub action so that it runs these groups sequentially; that way, at least one group may complete successfully.
- Update: makefile: Added goals for each action.

More related updates
- Update: Dataset group refresh stability: This task, like the normal DB refresh, now checks whether a derived table refresh is currently active and, if so, waits until it is done before proceeding (see the polling sketch appended after the patch). This helps in the event of multiple refreshes happening simultaneously, especially now that the vocab and counts refreshes are split apart.

General
- Update: makefile: Added missing phony targets and removed some unneeded commented-out targets.
---
 .github/workflows/refresh_counts.yml | 6 +-
 .github/workflows/refresh_voc.yml | 6 +-
 .github/workflows/refresh_voc_and_counts.yml | 16 +-
 ...nts.py => refresh_dataset_group_tables.py} | 24 +-
 backend/db/refresh_from_datasets.py | 3 +-
 .../db/resolve_fetch_failures_0_members.py | 4 +-
 .../db/resolve_fetch_failures_excess_items.py | 4 +-
 backend/db/utils.py | 24 +-
 enclave_wrangler/datasets.py | 218 ++++--------------
 enclave_wrangler/objects_api.py | 4 +-
 makefile | 15 +-
 requirements-unlocked.txt | 1 +
 termhub-vocab | 2 +-
 13 files changed, 104 insertions(+), 223 deletions(-)
 rename backend/db/{refresh_voc_and_counts.py => refresh_dataset_group_tables.py} (82%)

diff --git a/.github/workflows/refresh_counts.yml b/.github/workflows/refresh_counts.yml index 5c5e0f098..5ef0c6ae1 100644 --- a/.github/workflows/refresh_counts.yml +++ b/.github/workflows/refresh_counts.yml @@ -7,7 +7,8 @@ on: workflow_dispatch: jobs: refresh-counts-tables: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143."
instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +42,4 @@ jobs: pip install -r requirements.txt - name: Refresh counts - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group counts + run: make refresh-counts diff --git a/.github/workflows/refresh_voc.yml b/.github/workflows/refresh_voc.yml index 322350404..dbf2f259f 100644 --- a/.github/workflows/refresh_voc.yml +++ b/.github/workflows/refresh_voc.yml @@ -7,7 +7,8 @@ on: workflow_dispatch: jobs: refresh-vocab-tables: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +42,4 @@ jobs: pip install -r requirements.txt - name: Refresh vocab - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + run: make refresh-vocab diff --git a/.github/workflows/refresh_voc_and_counts.yml b/.github/workflows/refresh_voc_and_counts.yml index 889d6944a..c609d24f7 100644 --- a/.github/workflows/refresh_voc_and_counts.yml +++ b/.github/workflows/refresh_voc_and_counts.yml @@ -7,7 +7,11 @@ on: workflow_dispatch: jobs: refresh-voc-and-counts: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest + # fail-fast: Can show why action unexpectedly stops: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures + strategy: + fail-fast: false steps: - name: Checkout repository and submodules uses: actions/checkout@v2 @@ -22,17 +26,14 @@ jobs: run: | echo "Commit hash: ${{ github.sha }}" echo "Branch: ${{ github.ref }}" - - name: 'Create env file' run: | mkdir env echo "${{ secrets.ENV_FILE }}" > env/.env - - name: Create and start virtual environment run: | python3 -m venv venv source venv/bin/activate - - name: Install dependencies run: | python -m pip install --upgrade pip @@ -40,6 +41,9 @@ jobs: pip install --upgrade setuptools pip install -r requirements.txt - - name: Refresh voc and counts + - name: Refresh counts + run: | + python backend/db/refresh_dataset_group_tables.py --dataset-group counts + - name: Refresh vocab run: | - python backend/db/refresh_voc_and_counts.py + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab \ No newline at end of file diff --git a/backend/db/refresh_voc_and_counts.py b/backend/db/refresh_dataset_group_tables.py similarity index 82% rename from backend/db/refresh_voc_and_counts.py rename to backend/db/refresh_dataset_group_tables.py index e703eda0c..3fc6e0a89 100644 --- a/backend/db/refresh_voc_and_counts.py +++ b/backend/db/refresh_dataset_group_tables.py @@ -24,16 +24,16 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, check_db_status_var, get_db_connection, get_ddl_statements, load_csv, \ - refresh_any_dependent_tables, \ - run_sql + refresh_derived_tables, run_sql from enclave_wrangler.config import DATASET_GROUPS_CONFIG from enclave_wrangler.datasets import download_datasets, get_last_update_of_dataset -def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): +def refresh_voc_and_counts(dataset_group: List[str], 
skip_downloads: bool = False, schema=SCHEMA): """Refresh vocabulary and counts tables.""" print('Refreshing vocabulary and counts tables.') - for group_name, config in DATASET_GROUPS_CONFIG.items(): + selected_configs = {k: v for k, v in DATASET_GROUPS_CONFIG.items() if k in dataset_group} + for group_name, config in selected_configs.items(): print(f'\nRefreshing {group_name} tables...') # Check if tables are already up to date last_updated_us: str = check_db_status_var(config['last_updated_termhub_var']) @@ -54,24 +54,30 @@ def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): load_csv(con, table, replace_rule='do not replace', schema=SCHEMA, optional_suffix='_new') run_sql(con, f'ALTER TABLE IF EXISTS {schema}.{table} RENAME TO {table}_old;') run_sql(con, f'ALTER TABLE {schema}.{table}_new RENAME TO {table};') - run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;') + t1 = datetime.now() print(f' done in {(t1 - t0).seconds} seconds') # todo: set variable for 'last updated' for each table (look at load()) # - consider: check if table already updated sooner than last_updated_them. if so, skip. and add a param to CLI for this print('Creating indexes') - statements: List[str] = get_ddl_statements(schema, ['indexes'], 'flat') + statements: List[str] = get_ddl_statements(schema, ['indexes'], return_type='flat') for statement in statements: run_sql(con, statement) - # print('Recreating derived tables') # printed w/in refresh_derived_tables() - refresh_any_dependent_tables(con, config['tables']) - + print('Recreating derived tables') + refresh_derived_tables(con, config['tables'], schema) + print('Deleting old, temporarily backed up versions of tables') + for table in config['tables']: + run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;') print('Done') def cli(): """Command line interface""" parser = ArgumentParser(prog='Refresh vocabulary and counts tables.') + parser.add_argument( + '-d', '--dataset-group', nargs='+', choices=list(DATASET_GROUPS_CONFIG.keys()), + default=list(DATASET_GROUPS_CONFIG.keys()), + help='Names of dataset/table groups to refresh.') parser.add_argument( '-s', '--skip-downloads', action='store_true', help='Use if you have already downloaded updated files.') diff --git a/backend/db/refresh_from_datasets.py b/backend/db/refresh_from_datasets.py index b923641e8..0edbd046b 100755 --- a/backend/db/refresh_from_datasets.py +++ b/backend/db/refresh_from_datasets.py @@ -105,7 +105,8 @@ def cli(): parser.add_argument( '-l', '--use-local-db', action='store_true', default=False, help='Use local database instead of server.') parser.add_argument( - '-z', '--run-final-ddl-only', action='store_true', default=False, help='Only run indexes_and_derived_tables (ddl.jinja.sql).') + '-z', '--run-final-ddl-only', action='store_true', default=False, + help='Only run indexes_and_derived_tables (ddl.jinja.sql).') reset_and_update_db(**vars(parser.parse_args())) diff --git a/backend/db/resolve_fetch_failures_0_members.py b/backend/db/resolve_fetch_failures_0_members.py index 6baea9258..112c31a24 100644 --- a/backend/db/resolve_fetch_failures_0_members.py +++ b/backend/db/resolve_fetch_failures_0_members.py @@ -17,7 +17,7 @@ from enclave_wrangler.objects_api import concept_set_members__from_csets_and_members_to_db, \ fetch_cset_and_member_objects from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, select_failed_fetches, \ - refresh_termhub_core_cset_derived_tables + refresh_derived_tables DESC = "Resolve any failures resulting from 
fetching data from the Enclave's objects API." @@ -78,7 +78,7 @@ def resolve_fetch_failures_0_members( if success_cases: with get_db_connection(schema=schema, local=use_local_db) as con: concept_set_members__from_csets_and_members_to_db(con, csets_and_members) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Report success if success_cases: diff --git a/backend/db/resolve_fetch_failures_excess_items.py b/backend/db/resolve_fetch_failures_excess_items.py index f059e725d..3f82d684a 100644 --- a/backend/db/resolve_fetch_failures_excess_items.py +++ b/backend/db/resolve_fetch_failures_excess_items.py @@ -11,7 +11,7 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ select_failed_fetches from enclave_wrangler.datasets import CSV_TRANSFORM_DIR, download_datasets from enclave_wrangler.utils import was_file_modified_within_threshold @@ -74,7 +74,7 @@ def resolve_fetch_failures_excess_items(use_local_db=False, cached_dataset_thres if rows: insert_from_dicts(con, dataset, rows) solved_failures.append(failure) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Update fetch_audit status fetch_status_set_success(solved_failures) diff --git a/backend/db/utils.py b/backend/db/utils.py index e43fcb73f..7e3cf56a6 100644 --- a/backend/db/utils.py +++ b/backend/db/utils.py @@ -122,10 +122,10 @@ def refresh_any_dependent_tables(con: Connection, independent_tables: List[str] if not derived_tables: print(f'No derived tables found for: {", ".join(independent_tables)}') return - refresh_derived_tables(con, derived_tables, schema) + refresh_derived_tables_exec(con, derived_tables, schema) -def refresh_derived_tables( +def refresh_derived_tables_exec( con: Connection, derived_tables_queue: List[str] = CORE_CSET_DEPENDENT_TABLES, schema=SCHEMA ): """Refresh TermHub core cset derived tables @@ -165,7 +165,10 @@ def refresh_derived_tables( # todo: move this somewhere else, possibly load.py or db_refresh.py # todo: what to do if this process fails? any way to roll back? should we? # todo: currently has no way of passing 'local' down to db status var funcs -def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, polling_interval_seconds: int = 30): +def refresh_derived_tables( + con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA, + polling_interval_seconds: int = 30 +): """Refresh TermHub core cset derived tables: wrapper function Handles simultaneous requests and try/except for worker function: refresh_termhub_core_cset_derived_tables_exec()""" @@ -176,9 +179,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol if (datetime.now() - t0).total_seconds() >= 2 * 60 * 60: # 2 hours raise RuntimeError('Timed out after waiting 2 hours for other active derived table refresh to complete.') elif check_db_status_var('derived_tables_refresh_status') == 'active': - msg = f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' \ - if i == 0 else '- trying again' - print(msg) + print(f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' 
\ + if i == 1 else '- trying again') time.sleep(polling_interval_seconds) else: try: @@ -186,8 +188,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol # The following two calls yield equivalent results as of 2023/08/08. I've commented out # refresh_derived_tables() in case anything goes wrong with refresh_any_dependent_tables(), since that # is based on a heuristic currently, and if anything goes wrong, we may want to switch back. -joeflack4 - # refresh_derived_tables(con, CORE_CSET_DEPENDENT_TABLES, schema) - refresh_any_dependent_tables(con, CORE_CSET_TABLES, schema) + # refresh_derived_tables_exec(con, CORE_CSET_DEPENDENT_TABLES, schema) + refresh_any_dependent_tables(con, independent_tables, schema) finally: update_db_status_var('derived_tables_refresh_status', 'inactive') break @@ -222,12 +224,6 @@ def set_search_path(dbapi_connection, connection_record): return engine.connect() -def chunk_list(input_list: List, chunk_size) -> List[List]: - """Split a list into chunks""" - for i in range(0, len(input_list), chunk_size): - yield input_list[i:i + chunk_size] - - def current_datetime(time_zone=['UTC/GMT', 'EST/EDT'][1]) -> str: """Get current datetime in ISO format as a string.""" if time_zone == 'UTC/GMT': diff --git a/enclave_wrangler/datasets.py b/enclave_wrangler/datasets.py index 1b7f80a85..9e8a25275 100644 --- a/enclave_wrangler/datasets.py +++ b/enclave_wrangler/datasets.py @@ -4,28 +4,24 @@ # TODO: get rid of unnamed row number at start of csv files """ +import os +import re +import shutil import sys +import tempfile +import time from argparse import ArgumentParser from pathlib import Path - from typing import Dict, List, Union -from typeguard import typechecked -import os -import re + import pandas as pd -import tempfile -# import pyarrow as pa -import pyarrow.parquet as pq -# import asyncio -import shutil -import time +from typeguard import typechecked ENCLAVE_WRANGLER_DIR = os.path.dirname(__file__) PROJECT_ROOT = Path(ENCLAVE_WRANGLER_DIR).parent sys.path.insert(0, str(PROJECT_ROOT)) # TODO: backend implorts: Ideally we don't want to couple with TermHub code -from backend.db.utils import chunk_list -from backend.utils import commify, pdump +from backend.utils import pdump from enclave_wrangler.config import TERMHUB_CSETS_DIR, DATASET_REGISTRY, DATASET_REGISTRY_RID_NAME_MAP from enclave_wrangler.utils import enclave_get, log_debug_info @@ -37,15 +33,15 @@ # #'content-type': 'application/json' # } DEBUG = False -# TODO: Once git LFS set up, dl directly to datasets folder, or put these in raw/ and move csvs_repaired to datasets/ CSV_DOWNLOAD_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'downloads') CSV_TRANSFORM_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'prepped_files') DESC = 'Tool for working w/ the Palantir Foundry enclave API. This part is for downloading enclave datasets.' 
-os.makedirs(CSV_TRANSFORM_DIR, exist_ok=True) +for _dir in [CSV_DOWNLOAD_DIR, CSV_TRANSFORM_DIR]: + os.makedirs(_dir, exist_ok=True) @typechecked -def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: +def get_transaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getTransaction tested with curl: @@ -70,39 +66,40 @@ def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[st @typechecked -def views2(dataset_rid: str, endRef: str) -> [str]: +def views2(dataset_rid: str, end_ref: str) -> [str]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getDatasetViewFiles2 tested with curl: curl https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100 -H "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" | json_pp """ - curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" - + # curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" if DEBUG: log_debug_info() - endpoint = 'https://unite.nih.gov/foundry-catalog/api/catalog/datasets/' - template = '{endpoint}{dataset_rid}/views2/{endRef}/files?pageSize=100' - url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, endRef=endRef) - + url = f'{endpoint}{dataset_rid}/views2/{end_ref}/files?pageSize=100' response = enclave_get(url, verbose=False) response_json = response.json() file_parts = [f['logicalPath'] for f in response_json['values']] file_parts = [fp for fp in file_parts if re.match(r'.*part-\d\d\d\d\d', fp)] return file_parts +# TODO: temp +def get_termhub_disk_usage() -> float: + """Get usage in GB""" + root_directory = Path(PROJECT_ROOT) + s_bytes = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file()) + return s_bytes / (1024 ** 3) + + @typechecked -def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str) -> pd.DataFrame: +def download_csv_from_parquet_parts(fav: dict, file_parts: [str], outpath: str): """tested with cURL: wget https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views/master/spark%2Fpart-00000-c94edb9f-1221-4ae8-ba74-58848a4d79cb-c000.snappy.parquet --header "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" """ dataset_rid: str = fav['rid'] endpoint = 'https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets' template = '{endpoint}/{dataset_rid}/views/master/{fp}' - # if DEBUG: - - # download parquet files with tempfile.TemporaryDirectory() as parquet_dir: # flush: for gh action; otherwise does not always show in the log print(f'\nINFO: Downloading {outpath}; tempdir {parquet_dir}, filepart endpoints:', flush=True) @@ -111,72 +108,15 @@ def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: st print('\t' + 
f'{index + 1} of {len(file_parts)}: {url}') response = enclave_get(url, args={'stream': True}, verbose=False) if response.status_code == 200: - fname = parquet_dir + fp.replace('spark', '') - with open(fname, "wb") as f: - response.raw.decode_content = True + parquet_part_path = parquet_dir + fp.replace('spark', '') + response.raw.decode_content = True + with open(parquet_part_path, "wb") as f: shutil.copyfileobj(response.raw, f) + df = pd.read_parquet(parquet_part_path) + df.to_csv(outpath, index=False, header=True if index == 0 else False, mode='a') else: raise RuntimeError(f'Failed opening {url} with {response.status_code}: {response.content}') - - print('INFO: Combining parquet files: Saving chunks') - combined_parquet_fname = parquet_dir + '/combined.parquet' - files: List[str] = [] - for file_name in os.listdir(parquet_dir): - files.append(os.path.join(parquet_dir, file_name)) - - df = pd.DataFrame() - chunk_paths = [] - # TODO: why don't we just convert parquet to csv one at a time? - # TODO: Changing this to see if helps - # chunks = chunk_list(files, 5) # chunks size 5: arbitrary - chunks = chunk_list(files, len(files)) - # Chunked to avoid memory issues w/ GitHub Action. - for chunk_n, chunk in enumerate(chunks): - if chunk[0].endswith('.parquet'): - combine_parquet_files(chunk, combined_parquet_fname) - df = pd.read_parquet(combined_parquet_fname) - elif chunk[0].endswith('.csv'): - if len(chunk) != 1: - raise RuntimeError(f"with csv, only expected one file; got: [{', '.join(chunk)}]") - df = pd.read_csv(chunk[0], names=fav['column_names']) - else: - raise RuntimeError(f"unexpected file(s) downloaded: [{', '.join(chunk)}]") - if outpath: - os.makedirs(os.path.dirname(outpath), exist_ok=True) - outpath_i = outpath.replace('.csv', f'_{chunk_n}.csv') - chunk_paths.append(outpath_i) - df.to_csv(outpath_i, index=False) - - print('INFO: Combining parquet files: Combining chunks') - if outpath: - df = pd.concat([pd.read_csv(path) for path in chunk_paths]) - df.to_csv(outpath, index=False) - for path in chunk_paths: - os.remove(path) - - print(f'Downloaded {os.path.basename(outpath)}, {commify(len(df))} records\n') - return df - - -def combine_parquet_files(input_files, target_path): - """Combine parquet files""" - files = [] - try: - for file_name in input_files: - files.append(pq.read_table(file_name)) - with pq.ParquetWriter( - target_path, - files[0].schema, - version='2.0', - compression='gzip', - use_dictionary=True, - data_page_size=2097152, # 2MB - write_statistics=True - ) as writer: - for f in files: - writer.write_table(f) - except Exception as e: - print(e, file=sys.stderr) + print(f'Downloaded {os.path.basename(outpath)}\n') # todo: for this and alltransform_* funcs. They have a common pattern, especially first couple lines. Can we refactor? 
@@ -191,24 +131,9 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame: """Transformations to concept_relationship.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN & Filter - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - # JOIN - cr_csm = df.merge(csm_df, left_on='concept_id_1', right_on='concept_id') - # Filter - df = df[df['concept_id_1'].isin(cr_csm['concept_id_1'])] - except FileNotFoundError: - print('Warning: Tried transforming concept_relationship.csv, but concept_set_members.csv must be downloaded ' - 'and transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - # Filter df2 = df[df['relationship_id'] == 'Subsumes'] df2.to_csv(os.path.join(CSV_TRANSFORM_DIR, 'concept_relationship_subsumes_only.csv'), index=False) - return df @@ -216,18 +141,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame: """Transformations to concept.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - concept_csm = df.merge(csm_df, on='concept_id') - df = df[df['concept_id'].isin(concept_csm['concept_id'])] - except FileNotFoundError: - print('Warning: Tried transforming concept.csv, but concept_set_members.csv must be downloaded and transformed ' - 'first. Try running again after this process completes. Eventually need to do these transformations ' - 'dependency ordered fashion.', file=sys.stderr) - return df @@ -235,18 +148,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame: """Transformations to concept_ancestor.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - ca_csm = df.merge(csm_df, left_on='ancestor_concept_id', right_on='concept_id') - df = df[df['ancestor_concept_id'].isin(ca_csm['ancestor_concept_id'])] - except FileNotFoundError: - print('Warning: Tried transforming concept_ancestor.csv, but concept_set_members.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. 
Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - return df @@ -257,19 +158,6 @@ def transform_dataset__concept_set_members(dataset_name: str) -> pd.DataFrame: # dtype={'archived': bool}, # doesn't work because of missing values converters={'archived': lambda x: True if x == 'True' else False}, # this makes it a bool field keep_default_na=False).fillna('') - # JOIN - try: - # Note: Depends on `code_sets.csv` now -- don't load concept_set_members unless codeset exists - # don't have to do that anymore, I think - cs_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'code_sets.csv'), keep_default_na=False).fillna('') - codeset_ids = set(cs_df['codeset_id']) - df = df[df['codeset_id'].isin(codeset_ids)] - except FileNotFoundError: - print('Warning: Tried transforming code_sets.csv, but concept_set_container.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - df = transforms_common(df, dataset_name) return df @@ -278,21 +166,6 @@ def transform_dataset__code_sets(dataset_name: str) -> pd.DataFrame: """Transformations to code_sets.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - # Note: Depends on `concept_set_container.csv` -- don't load code_sets unless container exists - # Note: Depends on `concept_set_container.csv`, but there is no transform for it. So, read from DL dir. - # don't have to do that anymore, I think - csc_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_container.csv'), keep_default_na=False).fillna('') - container_concept_set_name_ids = set(csc_df['concept_set_id']) - df = df[df['concept_set_name'].isin(container_concept_set_name_ids)] - except FileNotFoundError: - print('Warning: Tried transforming code_sets.csv, but concept_set_container.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - return df @@ -302,15 +175,17 @@ def transforms_common(df: pd.DataFrame, dataset_name) -> pd.DataFrame: for col in [x for x in list(df.columns) if x.startswith('Unnamed')]: # e.g. 'Unnamed: 0' df.drop(col, axis=1, inplace=True) df.sort_values(DATASET_REGISTRY[dataset_name]['sort_idx'], inplace=True) - return df # TODO: currently overwrites if download is newer than prepped. should also overwrite if dependency # prepped files are newer than this -def transform(fav: dict) -> pd.DataFrame: - """Data transformations""" - dataset_name: str = fav['name'] +def transform(dataset_config: dict): + """Data transformations + + The input/outputs of this function are files. 
It reads the filename from `dataset_config`, then reads that file, + transforms, and writes back to that file.""" + dataset_name: str = dataset_config['name'] print(f'INFO: Transforming: {dataset_name}') inpath = os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv') outpath = os.path.join(CSV_TRANSFORM_DIR, dataset_name + '.csv') @@ -334,7 +209,7 @@ def transform(fav: dict) -> pd.DataFrame: if func: df = func(dataset_name) else: - converters = fav.get('converters') or {} + converters = dataset_config.get('converters') or {} df = pd.read_csv(inpath, keep_default_na=False, converters=converters).fillna('') df = transforms_common(df, dataset_name) @@ -354,7 +229,7 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: rid = dataset_name_or_rid if dataset_name_or_rid.startswith('ri.foundry.main.dataset.') \ else DATASET_REGISTRY[dataset_name_or_rid]['rid'] ref = 'master' - transaction = getTransaction(rid, ref, return_field=None) + transaction = get_transaction(rid, ref, return_field=None) # pdump(transaction) if transaction['status'] != 'COMMITTED': pdump(transaction) @@ -364,16 +239,15 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: def download_and_transform( dataset_name: str = None, dataset_rid: str = None, ref: str = 'master', output_dir: str = None, outpath: str = None, - transforms_only=False, fav: Dict = None, force_if_exists=True -) -> pd.DataFrame: + transforms_only=False, dataset_config: Dict = None, force_if_exists=True +): """Download dataset & run transformations""" print(f'INFO: Downloading: {dataset_name}') dataset_rid = DATASET_REGISTRY[dataset_name]['rid'] if not dataset_rid else dataset_rid dataset_name = DATASET_REGISTRY_RID_NAME_MAP[dataset_rid] if not dataset_name else dataset_name - fav = fav if fav else DATASET_REGISTRY[dataset_name] + dataset_config = dataset_config if dataset_config else DATASET_REGISTRY[dataset_name] # Download - df = pd.DataFrame() if not transforms_only: # todo: Temp: would be good to accept either 'outdir' or 'outpath'. 
if not outpath: @@ -385,16 +259,14 @@ def download_and_transform( if os.path.exists(outpath): t = time.ctime(os.path.getmtime(outpath)) print(f'Clobbering {os.path.basename(outpath)}: {t}, {os.path.getsize(outpath)} bytes.') - end_ref = getTransaction(dataset_rid, ref) - args = {'dataset_rid': dataset_rid, 'endRef': end_ref} + end_ref = get_transaction(dataset_rid, ref) + args = {'dataset_rid': dataset_rid, 'end_ref': end_ref} file_parts = views2(**args) # asyncio.run(download_and_combine_dataset_parts(dataset_rid, file_parts)) - df: pd.DataFrame = download_and_combine_dataset_parts(fav, file_parts, outpath=outpath) + download_csv_from_parquet_parts(dataset_config, file_parts, outpath=outpath) # Transform - df2: pd.DataFrame = transform(fav) - - return df2 if len(df2) > 0 else df + transform(dataset_config) def download_datasets( @@ -411,7 +283,7 @@ def download_datasets( else configs for conf in datasets_configs: download_and_transform( - fav=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), + dataset_config=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), transforms_only=transforms_only, force_if_exists=force_if_exists) # todo: ideally would allow user to select output dir that contains both CSV_DOWNLOAD_DIR and CSV_TRANSFORM_DIR diff --git a/enclave_wrangler/objects_api.py b/enclave_wrangler/objects_api.py index 5d97cafd5..59732d01e 100644 --- a/enclave_wrangler/objects_api.py +++ b/enclave_wrangler/objects_api.py @@ -37,7 +37,7 @@ make_objects_request from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey from backend.db.utils import SCHEMA, insert_fetch_statuses, insert_from_dict, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ sql_query_single_col, run_sql, get_db_connection from backend.db.queries import get_concepts from backend.utils import call_github_action, pdump @@ -513,7 +513,7 @@ def csets_and_members_to_db(con: Connection, csets_and_members: Dict[str, List[D print(f' - concept_set_members completed in {(datetime.now() - t2).seconds} seconds') # Derived tables - refresh_termhub_core_cset_derived_tables(con, schema) + refresh_derived_tables(con, schema=schema) def fetch_object_by_id( diff --git a/makefile b/makefile index ad1d6c7a4..76a9eb6d8 100644 --- a/makefile +++ b/makefile @@ -1,7 +1,8 @@ SRC=backend/ .PHONY: lint tags ltags test all lintall codestyle docstyle lintsrc linttest doctest doc docs code linters_all codesrc \ -codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets +codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets \ +refresh-counts refresh-vocab serve-frontend serve-backend # Analysis ANALYSIS_SCRIPT = 'backend/db/analysis.py' @@ -22,12 +23,6 @@ deltas-table: counts-docs: @python $(ANALYSIS_SCRIPT) --counts-docs -# todo -#deltas-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_delta_viz -#counts-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_counts_viz - # counts-update: Update 'counts' table with current row counts for the 'n3c' schema. Adds note to the 'counts-runs' table.
counts-update: @python $(ANALYSIS_SCRIPT) --counts-update @@ -107,6 +102,12 @@ test-frontend-deployments: fetch-missing-csets: python enclave_wrangler/objects_api.py --find-and-add-missing-csets-to-db +# Database refreshes +refresh-counts: + python backend/db/refresh_dataset_group_tables.py --dataset-group counts +refresh-vocab: + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + # Serve # nvm allows to switch to a particular versio of npm/node. Useful for working w/ deployment # https://github.com/nvm-sh/nvm diff --git a/requirements-unlocked.txt b/requirements-unlocked.txt index bbe84c98e..ea3f38e25 100755 --- a/requirements-unlocked.txt +++ b/requirements-unlocked.txt @@ -21,6 +21,7 @@ psycopg2-binary networkx # dev dependencies +# psutil # for memory profiling virtualenvwrapper setuptools \ No newline at end of file diff --git a/termhub-vocab b/termhub-vocab index 9116fa434..b0cc420e4 160000 --- a/termhub-vocab +++ b/termhub-vocab @@ -1 +1 @@ -Subproject commit 9116fa434d406320ce90583d683414c9ad05dd42 +Subproject commit b0cc420e42558c42ccf537578196ca79908622bf
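
Sketch referenced from the first commit-message bullet: the memory fix in enclave_wrangler/datasets.py boils down to converting each downloaded parquet part to CSV immediately and appending it to the single output file, instead of combining all parts (or a combined parquet file) in memory first. Below is a minimal sketch of that pattern using plain `requests` and pandas; `parts_to_csv` and `part_urls` are illustrative names only, not the project's actual API (the real code uses `enclave_get` and the dataset registry config).

```python
import os
import shutil
import tempfile
from typing import Dict, List, Optional

import pandas as pd
import requests


def parts_to_csv(part_urls: List[str], outpath: str, headers: Optional[Dict[str, str]] = None):
    """Stream parquet parts into one CSV without holding the whole dataset in memory.

    Each part is saved to a temp file, read as a DataFrame, and appended to `outpath`;
    the CSV header is written only for the first part so the result is a single valid CSV.
    """
    os.makedirs(os.path.dirname(outpath) or '.', exist_ok=True)
    if os.path.exists(outpath):
        os.remove(outpath)  # start clean; everything below runs in append mode
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, url in enumerate(part_urls):
            part_path = os.path.join(tmpdir, f'part-{i:05d}.parquet')
            with requests.get(url, stream=True, headers=headers) as response:
                response.raise_for_status()
                response.raw.decode_content = True
                with open(part_path, 'wb') as f:
                    shutil.copyfileobj(response.raw, f)
            df = pd.read_parquet(part_path)  # requires pyarrow or fastparquet
            df.to_csv(outpath, mode='a', index=False, header=(i == 0))
            os.remove(part_path)  # at most one part sits on disk in the temp dir
```

Peak memory is then roughly one part's DataFrame rather than the concatenation of all parts, which is what was exhausting the standard GitHub runner.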
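
Sketch referenced from the "Dataset group refresh stability" bullet: the guard in backend/db/utils.py polls a database status variable and waits while another derived-table refresh holds it, then marks the refresh active for the duration of its own work. This is a stripped-down version of that pattern; `check_status`/`set_status` stand in for the project's `check_db_status_var`/`update_db_status_var`, and the function name is illustrative only.

```python
import time
from datetime import datetime
from typing import Callable


def run_when_no_other_refresh_active(
    refresh_fn: Callable[[], None],
    check_status: Callable[[], str],      # returns 'active' or 'inactive'
    set_status: Callable[[str], None],
    polling_interval_seconds: int = 30,
    timeout_seconds: int = 2 * 60 * 60,
):
    """Wait for any other derived-table refresh to finish, then run ours exactly once."""
    t0 = datetime.now()
    warned = False
    while True:
        if (datetime.now() - t0).total_seconds() >= timeout_seconds:
            raise RuntimeError('Timed out waiting for another derived table refresh to complete.')
        if check_status() == 'active':
            if not warned:
                print(f'Another refresh is active; retrying every {polling_interval_seconds} seconds.')
                warned = True
            time.sleep(polling_interval_seconds)
            continue
        set_status('active')
        try:
            refresh_fn()            # e.g. refresh_any_dependent_tables(con, tables)
        finally:
            set_status('inactive')  # always release the flag, even if the refresh fails
        return
```

This matters now that the vocab and counts refreshes run as separate jobs and can overlap with each other or with the normal DB refresh.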
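
Sketch referenced from the bullet about deleting old, backed-up tables: the ordering in refresh_dataset_group_tables.py is load into `<table>_new`, rename the live table to `<table>_old`, rename `_new` into place, rebuild derived tables against the new copies, and only then drop `_old` (dropping earlier fails because derived objects still reference the backups). The outline below reuses the helpers the patch imports from backend.db.utils, but `swap_in_new_tables` is an illustrative wrapper, not a function in the codebase, and it assumes the prepped CSVs are already on disk.

```python
from backend.db.utils import SCHEMA, get_db_connection, load_csv, refresh_derived_tables, run_sql


def swap_in_new_tables(tables, schema=SCHEMA):
    """Replace each table with a freshly loaded copy, deferring the DROP of the backups."""
    with get_db_connection(schema=schema) as con:
        for table in tables:
            load_csv(con, table, replace_rule='do not replace', schema=schema, optional_suffix='_new')
            run_sql(con, f'ALTER TABLE IF EXISTS {schema}.{table} RENAME TO {table}_old;')
            run_sql(con, f'ALTER TABLE {schema}.{table}_new RENAME TO {table};')
        # Derived tables are recreated against the new copies first...
        refresh_derived_tables(con, tables, schema)
        # ...and only then is it safe to discard the backups they previously depended on.
        for table in tables:
            run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;')
```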