diff --git a/.github/workflows/refresh_counts.yml b/.github/workflows/refresh_counts.yml index 5c5e0f098..5ef0c6ae1 100644 --- a/.github/workflows/refresh_counts.yml +++ b/.github/workflows/refresh_counts.yml @@ -7,7 +7,8 @@ on: workflow_dispatch: jobs: refresh-counts-tables: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +42,4 @@ jobs: pip install -r requirements.txt - name: Refresh counts - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group counts + run: make refresh-counts diff --git a/.github/workflows/refresh_voc.yml b/.github/workflows/refresh_voc.yml index 322350404..dbf2f259f 100644 --- a/.github/workflows/refresh_voc.yml +++ b/.github/workflows/refresh_voc.yml @@ -7,7 +7,8 @@ on: workflow_dispatch: jobs: refresh-vocab-tables: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143." 
instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +42,4 @@ jobs: pip install -r requirements.txt - name: Refresh vocab - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + run: make refresh-vocab diff --git a/.github/workflows/refresh_voc_and_counts.yml b/.github/workflows/refresh_voc_and_counts.yml index 889d6944a..c609d24f7 100644 --- a/.github/workflows/refresh_voc_and_counts.yml +++ b/.github/workflows/refresh_voc_and_counts.yml @@ -7,7 +7,11 @@ on: workflow_dispatch: jobs: refresh-voc-and-counts: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest + # fail-fast: Can show why action unexpectedly stops: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures + strategy: + fail-fast: false steps: - name: Checkout repository and submodules uses: actions/checkout@v2 @@ -22,17 +26,14 @@ jobs: run: | echo "Commit hash: ${{ github.sha }}" echo "Branch: ${{ github.ref }}" - - name: 'Create env file' run: | mkdir env echo "${{ secrets.ENV_FILE }}" > env/.env - - name: Create and start virtual environment run: | python3 -m venv venv source venv/bin/activate - - name: Install dependencies run: | python -m pip install --upgrade pip @@ -40,6 +41,9 @@ jobs: pip install --upgrade setuptools pip install -r requirements.txt - - name: Refresh voc and counts + - name: Refresh counts + run: | + python backend/db/refresh_dataset_group_tables.py --dataset-group counts + - name: Refresh vocab run: | - python backend/db/refresh_voc_and_counts.py + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab \ No newline at end of file diff --git a/backend/db/refresh_voc_and_counts.py b/backend/db/refresh_dataset_group_tables.py similarity index 82% rename from backend/db/refresh_voc_and_counts.py rename to 
backend/db/refresh_dataset_group_tables.py index e703eda0c..3fc6e0a89 100644 --- a/backend/db/refresh_voc_and_counts.py +++ b/backend/db/refresh_dataset_group_tables.py @@ -24,16 +24,16 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, check_db_status_var, get_db_connection, get_ddl_statements, load_csv, \ - refresh_any_dependent_tables, \ - run_sql + refresh_derived_tables, run_sql from enclave_wrangler.config import DATASET_GROUPS_CONFIG from enclave_wrangler.datasets import download_datasets, get_last_update_of_dataset -def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): +def refresh_voc_and_counts(dataset_group: List[str], skip_downloads: bool = False, schema=SCHEMA): """Refresh vocabulary and counts tables.""" print('Refreshing vocabulary and counts tables.') - for group_name, config in DATASET_GROUPS_CONFIG.items(): + selected_configs = {k: v for k, v in DATASET_GROUPS_CONFIG.items() if k in dataset_group} + for group_name, config in selected_configs.items(): print(f'\nRefreshing {group_name} tables...') # Check if tables are already up to date last_updated_us: str = check_db_status_var(config['last_updated_termhub_var']) @@ -54,24 +54,30 @@ def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): load_csv(con, table, replace_rule='do not replace', schema=SCHEMA, optional_suffix='_new') run_sql(con, f'ALTER TABLE IF EXISTS {schema}.{table} RENAME TO {table}_old;') run_sql(con, f'ALTER TABLE {schema}.{table}_new RENAME TO {table};') - run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;') + t1 = datetime.now() print(f' done in {(t1 - t0).seconds} seconds') # todo: set variable for 'last updated' for each table (look at load()) # - consider: check if table already updated sooner than last_updated_them. if so, skip. 
and add a param to CLI for this print('Creating indexes') - statements: List[str] = get_ddl_statements(schema, ['indexes'], 'flat') + statements: List[str] = get_ddl_statements(schema, ['indexes'], return_type='flat') for statement in statements: run_sql(con, statement) - # print('Recreating derived tables') # printed w/in refresh_derived_tables() - refresh_any_dependent_tables(con, config['tables']) - + print('Recreating derived tables') + refresh_derived_tables(con, config['tables'], schema) + print('Deleting old, temporarily backed up versions of tables') + for table in config['tables']: + run_sql(con, f'DROP TABLE IF EXISTS {schema}.{table}_old;') print('Done') def cli(): """Command line interface""" parser = ArgumentParser(prog='Refresh vocabulary and counts tables.') + parser.add_argument( + '-d', '--dataset-group', nargs='+', choices=list(DATASET_GROUPS_CONFIG.keys()), + default=list(DATASET_GROUPS_CONFIG.keys()), + help='Names of dataset/table groups to refresh.') parser.add_argument( '-s', '--skip-downloads', action='store_true', help='Use if you have already downloaded updated files.') diff --git a/backend/db/refresh_from_datasets.py b/backend/db/refresh_from_datasets.py index b923641e8..0edbd046b 100755 --- a/backend/db/refresh_from_datasets.py +++ b/backend/db/refresh_from_datasets.py @@ -105,7 +105,8 @@ def cli(): parser.add_argument( '-l', '--use-local-db', action='store_true', default=False, help='Use local database instead of server.') parser.add_argument( - '-z', '--run-final-ddl-only', action='store_true', default=False, help='Only run indexes_and_derived_tables (ddl.jinja.sql).') + '-z', '--run-final-ddl-only', action='store_true', default=False, + help='Only run indexes_and_derived_tables (ddl.jinja.sql).') reset_and_update_db(**vars(parser.parse_args())) diff --git a/backend/db/resolve_fetch_failures_0_members.py b/backend/db/resolve_fetch_failures_0_members.py index 6baea9258..112c31a24 100644 --- 
a/backend/db/resolve_fetch_failures_0_members.py +++ b/backend/db/resolve_fetch_failures_0_members.py @@ -17,7 +17,7 @@ from enclave_wrangler.objects_api import concept_set_members__from_csets_and_members_to_db, \ fetch_cset_and_member_objects from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, select_failed_fetches, \ - refresh_termhub_core_cset_derived_tables + refresh_derived_tables DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API." @@ -78,7 +78,7 @@ def resolve_fetch_failures_0_members( if success_cases: with get_db_connection(schema=schema, local=use_local_db) as con: concept_set_members__from_csets_and_members_to_db(con, csets_and_members) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Report success if success_cases: diff --git a/backend/db/resolve_fetch_failures_excess_items.py b/backend/db/resolve_fetch_failures_excess_items.py index f059e725d..3f82d684a 100644 --- a/backend/db/resolve_fetch_failures_excess_items.py +++ b/backend/db/resolve_fetch_failures_excess_items.py @@ -11,7 +11,7 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ select_failed_fetches from enclave_wrangler.datasets import CSV_TRANSFORM_DIR, download_datasets from enclave_wrangler.utils import was_file_modified_within_threshold @@ -74,7 +74,7 @@ def resolve_fetch_failures_excess_items(use_local_db=False, cached_dataset_thres if rows: insert_from_dicts(con, dataset, rows) solved_failures.append(failure) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Update fetch_audit status fetch_status_set_success(solved_failures) diff --git a/backend/db/utils.py b/backend/db/utils.py index e43fcb73f..7e3cf56a6 100644 --- a/backend/db/utils.py +++ 
b/backend/db/utils.py @@ -122,10 +122,10 @@ def refresh_any_dependent_tables(con: Connection, independent_tables: List[str] if not derived_tables: print(f'No derived tables found for: {", ".join(independent_tables)}') return - refresh_derived_tables(con, derived_tables, schema) + refresh_derived_tables_exec(con, derived_tables, schema) -def refresh_derived_tables( +def refresh_derived_tables_exec( con: Connection, derived_tables_queue: List[str] = CORE_CSET_DEPENDENT_TABLES, schema=SCHEMA ): """Refresh TermHub core cset derived tables @@ -165,7 +165,10 @@ def refresh_derived_tables( # todo: move this somewhere else, possibly load.py or db_refresh.py # todo: what to do if this process fails? any way to roll back? should we? # todo: currently has no way of passing 'local' down to db status var funcs -def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, polling_interval_seconds: int = 30): +def refresh_derived_tables( + con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA, + polling_interval_seconds: int = 30 +): """Refresh TermHub core cset derived tables: wrapper function Handles simultaneous requests and try/except for worker function: refresh_termhub_core_cset_derived_tables_exec()""" @@ -176,9 +179,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol if (datetime.now() - t0).total_seconds() >= 2 * 60 * 60: # 2 hours raise RuntimeError('Timed out after waiting 2 hours for other active derived table refresh to complete.') elif check_db_status_var('derived_tables_refresh_status') == 'active': - msg = f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' \ - if i == 0 else '- trying again' - print(msg) + print(f'Another derived table refresh is active. Waiting {polling_interval_seconds} seconds to try again.' 
\ + if i == 1 else '- trying again') time.sleep(polling_interval_seconds) else: try: @@ -186,8 +188,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol # The following two calls yield equivalent results as of 2023/08/08. I've commented out # refresh_derived_tables() in case anything goes wrong with refresh_any_dependent_tables(), since that # is based on a heuristic currently, and if anything goes wrong, we may want to switch back. -joeflack4 - # refresh_derived_tables(con, CORE_CSET_DEPENDENT_TABLES, schema) - refresh_any_dependent_tables(con, CORE_CSET_TABLES, schema) + # refresh_derived_tables_exec(con, CORE_CSET_DEPENDENT_TABLES, schema) + refresh_any_dependent_tables(con, independent_tables, schema) finally: update_db_status_var('derived_tables_refresh_status', 'inactive') break @@ -222,12 +224,6 @@ def set_search_path(dbapi_connection, connection_record): return engine.connect() -def chunk_list(input_list: List, chunk_size) -> List[List]: - """Split a list into chunks""" - for i in range(0, len(input_list), chunk_size): - yield input_list[i:i + chunk_size] - - def current_datetime(time_zone=['UTC/GMT', 'EST/EDT'][1]) -> str: """Get current datetime in ISO format as a string.""" if time_zone == 'UTC/GMT': diff --git a/enclave_wrangler/datasets.py b/enclave_wrangler/datasets.py index 1b7f80a85..9e8a25275 100644 --- a/enclave_wrangler/datasets.py +++ b/enclave_wrangler/datasets.py @@ -4,28 +4,24 @@ # TODO: get rid of unnamed row number at start of csv files """ +import os +import re +import shutil import sys +import tempfile +import time from argparse import ArgumentParser from pathlib import Path - from typing import Dict, List, Union -from typeguard import typechecked -import os -import re + import pandas as pd -import tempfile -# import pyarrow as pa -import pyarrow.parquet as pq -# import asyncio -import shutil -import time +from typeguard import typechecked ENCLAVE_WRANGLER_DIR = os.path.dirname(__file__) PROJECT_ROOT = 
Path(ENCLAVE_WRANGLER_DIR).parent sys.path.insert(0, str(PROJECT_ROOT)) # TODO: backend implorts: Ideally we don't want to couple with TermHub code -from backend.db.utils import chunk_list -from backend.utils import commify, pdump +from backend.utils import pdump from enclave_wrangler.config import TERMHUB_CSETS_DIR, DATASET_REGISTRY, DATASET_REGISTRY_RID_NAME_MAP from enclave_wrangler.utils import enclave_get, log_debug_info @@ -37,15 +33,15 @@ # #'content-type': 'application/json' # } DEBUG = False -# TODO: Once git LFS set up, dl directly to datasets folder, or put these in raw/ and move csvs_repaired to datasets/ CSV_DOWNLOAD_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'downloads') CSV_TRANSFORM_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'prepped_files') DESC = 'Tool for working w/ the Palantir Foundry enclave API. This part is for downloading enclave datasets.' -os.makedirs(CSV_TRANSFORM_DIR, exist_ok=True) +for _dir in [CSV_DOWNLOAD_DIR, CSV_TRANSFORM_DIR]: + os.makedirs(_dir, exist_ok=True) @typechecked -def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: +def get_transaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getTransaction tested with curl: @@ -70,39 +66,40 @@ def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[st @typechecked -def views2(dataset_rid: str, endRef: str) -> [str]: +def views2(dataset_rid: str, end_ref: str) -> [str]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getDatasetViewFiles2 tested with curl: curl 
https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100 -H "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" | json_pp """ - curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" - + # curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" if DEBUG: log_debug_info() - endpoint = 'https://unite.nih.gov/foundry-catalog/api/catalog/datasets/' - template = '{endpoint}{dataset_rid}/views2/{endRef}/files?pageSize=100' - url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, endRef=endRef) - + url = f'{endpoint}{dataset_rid}/views2/{end_ref}/files?pageSize=100' response = enclave_get(url, verbose=False) response_json = response.json() file_parts = [f['logicalPath'] for f in response_json['values']] file_parts = [fp for fp in file_parts if re.match(r'.*part-\d\d\d\d\d', fp)] return file_parts +# TODO: temp +def get_termhub_disk_usage() -> float: + """Get usage in GB""" + root_directory = Path(PROJECT_ROOT) + s_bytes = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file()) + return s_bytes / (1024 ** 3) + + @typechecked -def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str) -> pd.DataFrame: +def download_csv_from_parquet_parts(fav: dict, file_parts: [str], outpath: str): """tested with cURL: wget https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views/master/spark%2Fpart-00000-c94edb9f-1221-4ae8-ba74-58848a4d79cb-c000.snappy.parquet 
--header "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" """ dataset_rid: str = fav['rid'] endpoint = 'https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets' template = '{endpoint}/{dataset_rid}/views/master/{fp}' - # if DEBUG: - - # download parquet files with tempfile.TemporaryDirectory() as parquet_dir: # flush: for gh action; otherwise does not always show in the log print(f'\nINFO: Downloading {outpath}; tempdir {parquet_dir}, filepart endpoints:', flush=True) @@ -111,72 +108,15 @@ def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: st print('\t' + f'{index + 1} of {len(file_parts)}: {url}') response = enclave_get(url, args={'stream': True}, verbose=False) if response.status_code == 200: - fname = parquet_dir + fp.replace('spark', '') - with open(fname, "wb") as f: - response.raw.decode_content = True + parquet_part_path = parquet_dir + fp.replace('spark', '') + response.raw.decode_content = True + with open(parquet_part_path, "wb") as f: shutil.copyfileobj(response.raw, f) + df = pd.read_parquet(parquet_part_path) + df.to_csv(outpath, index=False, header=True if index == 0 else False, mode='w' if index == 0 else 'a') else: raise RuntimeError(f'Failed opening {url} with {response.status_code}: {response.content}') - - print('INFO: Combining parquet files: Saving chunks') - combined_parquet_fname = parquet_dir + '/combined.parquet' - files: List[str] = [] - for file_name in os.listdir(parquet_dir): - files.append(os.path.join(parquet_dir, file_name)) - - df = pd.DataFrame() - chunk_paths = [] - # TODO: why don't we just convert parquet to csv one at a time? - # TODO: Changing this to see if helps - # chunks = chunk_list(files, 5) # chunks size 5: arbitrary - chunks = chunk_list(files, len(files)) - # Chunked to avoid memory issues w/ GitHub Action. 
- for chunk_n, chunk in enumerate(chunks): - if chunk[0].endswith('.parquet'): - combine_parquet_files(chunk, combined_parquet_fname) - df = pd.read_parquet(combined_parquet_fname) - elif chunk[0].endswith('.csv'): - if len(chunk) != 1: - raise RuntimeError(f"with csv, only expected one file; got: [{', '.join(chunk)}]") - df = pd.read_csv(chunk[0], names=fav['column_names']) - else: - raise RuntimeError(f"unexpected file(s) downloaded: [{', '.join(chunk)}]") - if outpath: - os.makedirs(os.path.dirname(outpath), exist_ok=True) - outpath_i = outpath.replace('.csv', f'_{chunk_n}.csv') - chunk_paths.append(outpath_i) - df.to_csv(outpath_i, index=False) - - print('INFO: Combining parquet files: Combining chunks') - if outpath: - df = pd.concat([pd.read_csv(path) for path in chunk_paths]) - df.to_csv(outpath, index=False) - for path in chunk_paths: - os.remove(path) - - print(f'Downloaded {os.path.basename(outpath)}, {commify(len(df))} records\n') - return df - - -def combine_parquet_files(input_files, target_path): - """Combine parquet files""" - files = [] - try: - for file_name in input_files: - files.append(pq.read_table(file_name)) - with pq.ParquetWriter( - target_path, - files[0].schema, - version='2.0', - compression='gzip', - use_dictionary=True, - data_page_size=2097152, # 2MB - write_statistics=True - ) as writer: - for f in files: - writer.write_table(f) - except Exception as e: - print(e, file=sys.stderr) + print(f'Downloaded {os.path.basename(outpath)}\n') # todo: for this and alltransform_* funcs. They have a common pattern, especially first couple lines. Can we refactor? 
@@ -191,24 +131,9 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame: """Transformations to concept_relationship.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN & Filter - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - # JOIN - cr_csm = df.merge(csm_df, left_on='concept_id_1', right_on='concept_id') - # Filter - df = df[df['concept_id_1'].isin(cr_csm['concept_id_1'])] - except FileNotFoundError: - print('Warning: Tried transforming concept_relationship.csv, but concept_set_members.csv must be downloaded ' - 'and transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - # Filter df2 = df[df['relationship_id'] == 'Subsumes'] df2.to_csv(os.path.join(CSV_TRANSFORM_DIR, 'concept_relationship_subsumes_only.csv'), index=False) - return df @@ -216,18 +141,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame: """Transformations to concept.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - concept_csm = df.merge(csm_df, on='concept_id') - df = df[df['concept_id'].isin(concept_csm['concept_id'])] - except FileNotFoundError: - print('Warning: Tried transforming concept.csv, but concept_set_members.csv must be downloaded and transformed ' - 'first. Try running again after this process completes. 
Eventually need to do these transformations ' - 'dependency ordered fashion.', file=sys.stderr) - return df @@ -235,18 +148,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame: """Transformations to concept_ancestor.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - csm_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_members.csv'), keep_default_na=False).fillna('') - ca_csm = df.merge(csm_df, left_on='ancestor_concept_id', right_on='concept_id') - df = df[df['ancestor_concept_id'].isin(ca_csm['ancestor_concept_id'])] - except FileNotFoundError: - print('Warning: Tried transforming concept_ancestor.csv, but concept_set_members.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - return df @@ -257,19 +158,6 @@ def transform_dataset__concept_set_members(dataset_name: str) -> pd.DataFrame: # dtype={'archived': bool}, # doesn't work because of missing values converters={'archived': lambda x: True if x == 'True' else False}, # this makes it a bool field keep_default_na=False).fillna('') - # JOIN - try: - # Note: Depends on `code_sets.csv` now -- don't load concept_set_members unless codeset exists - # don't have to do that anymore, I think - cs_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'code_sets.csv'), keep_default_na=False).fillna('') - codeset_ids = set(cs_df['codeset_id']) - df = df[df['codeset_id'].isin(codeset_ids)] - except FileNotFoundError: - print('Warning: Tried transforming code_sets.csv, but concept_set_container.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. 
Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - df = transforms_common(df, dataset_name) return df @@ -278,21 +166,6 @@ def transform_dataset__code_sets(dataset_name: str) -> pd.DataFrame: """Transformations to code_sets.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - - # JOIN - try: - # Note: Depends on `concept_set_container.csv` -- don't load code_sets unless container exists - # Note: Depends on `concept_set_container.csv`, but there is no transform for it. So, read from DL dir. - # don't have to do that anymore, I think - csc_df = pd.read_csv( - os.path.join(CSV_TRANSFORM_DIR, 'concept_set_container.csv'), keep_default_na=False).fillna('') - container_concept_set_name_ids = set(csc_df['concept_set_id']) - df = df[df['concept_set_name'].isin(container_concept_set_name_ids)] - except FileNotFoundError: - print('Warning: Tried transforming code_sets.csv, but concept_set_container.csv must be downloaded and ' - 'transformed first. Try running again after this process completes. Eventually need to do these ' - 'transformations dependency ordered fashion.', file=sys.stderr) - return df @@ -302,15 +175,17 @@ def transforms_common(df: pd.DataFrame, dataset_name) -> pd.DataFrame: for col in [x for x in list(df.columns) if x.startswith('Unnamed')]: # e.g. 'Unnamed: 0' df.drop(col, axis=1, inplace=True) df.sort_values(DATASET_REGISTRY[dataset_name]['sort_idx'], inplace=True) - return df # TODO: currently overwrites if download is newer than prepped. should also overwrite if dependency # prepped files are newer than this -def transform(fav: dict) -> pd.DataFrame: - """Data transformations""" - dataset_name: str = fav['name'] +def transform(dataset_config: dict): + """Data transformations + + The input/outputs of this function are files. 
It reads the filename from `dataset_config`, then reads that file, + transforms, and writes back to that file.""" + dataset_name: str = dataset_config['name'] print(f'INFO: Transforming: {dataset_name}') inpath = os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv') outpath = os.path.join(CSV_TRANSFORM_DIR, dataset_name + '.csv') @@ -334,7 +209,7 @@ def transform(fav: dict) -> pd.DataFrame: if func: df = func(dataset_name) else: - converters = fav.get('converters') or {} + converters = dataset_config.get('converters') or {} df = pd.read_csv(inpath, keep_default_na=False, converters=converters).fillna('') df = transforms_common(df, dataset_name) @@ -354,7 +229,7 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: rid = dataset_name_or_rid if dataset_name_or_rid.startswith('ri.foundry.main.dataset.') \ else DATASET_REGISTRY[dataset_name_or_rid]['rid'] ref = 'master' - transaction = getTransaction(rid, ref, return_field=None) + transaction = get_transaction(rid, ref, return_field=None) # pdump(transaction) if transaction['status'] != 'COMMITTED': pdump(transaction) @@ -364,16 +239,15 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: def download_and_transform( dataset_name: str = None, dataset_rid: str = None, ref: str = 'master', output_dir: str = None, outpath: str = None, - transforms_only=False, fav: Dict = None, force_if_exists=True -) -> pd.DataFrame: + transforms_only=False, dataset_config: Dict = None, force_if_exists=True +): """Download dataset & run transformations""" print(f'INFO: Downloading: {dataset_name}') dataset_rid = DATASET_REGISTRY[dataset_name]['rid'] if not dataset_rid else dataset_rid dataset_name = DATASET_REGISTRY_RID_NAME_MAP[dataset_rid] if not dataset_name else dataset_name - fav = fav if fav else DATASET_REGISTRY[dataset_name] + dataset_config = dataset_config if dataset_config else DATASET_REGISTRY[dataset_name] # Download - df = pd.DataFrame() if not transforms_only: # todo: Temp: would be good to 
accept either 'outdir' or 'outpath'. if not outpath: @@ -385,16 +259,14 @@ def download_and_transform( if os.path.exists(outpath): t = time.ctime(os.path.getmtime(outpath)) print(f'Clobbering {os.path.basename(outpath)}: {t}, {os.path.getsize(outpath)} bytes.') - end_ref = getTransaction(dataset_rid, ref) - args = {'dataset_rid': dataset_rid, 'endRef': end_ref} + end_ref = get_transaction(dataset_rid, ref) + args = {'dataset_rid': dataset_rid, 'end_ref': end_ref} file_parts = views2(**args) # asyncio.run(download_and_combine_dataset_parts(dataset_rid, file_parts)) - df: pd.DataFrame = download_and_combine_dataset_parts(fav, file_parts, outpath=outpath) + download_csv_from_parquet_parts(dataset_config, file_parts, outpath=outpath) # Transform - df2: pd.DataFrame = transform(fav) - - return df2 if len(df2) > 0 else df + transform(dataset_config) def download_datasets( @@ -411,7 +283,7 @@ def download_datasets( else configs for conf in datasets_configs: download_and_transform( - fav=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), + dataset_config=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), transforms_only=transforms_only, force_if_exists=force_if_exists) # todo: ideally would allow user to select output dir that contains both CSV_DOWNLOAD_DIR and CSV_TRANSFORM_DIR diff --git a/enclave_wrangler/objects_api.py b/enclave_wrangler/objects_api.py index 5d97cafd5..59732d01e 100644 --- a/enclave_wrangler/objects_api.py +++ b/enclave_wrangler/objects_api.py @@ -37,7 +37,7 @@ make_objects_request from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey from backend.db.utils import SCHEMA, insert_fetch_statuses, insert_from_dict, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ sql_query_single_col, run_sql, get_db_connection from backend.db.queries import get_concepts from backend.utils import 
call_github_action, pdump @@ -513,7 +513,7 @@ def csets_and_members_to_db(con: Connection, csets_and_members: Dict[str, List[D print(f' - concept_set_members completed in {(datetime.now() - t2).seconds} seconds') # Derived tables - refresh_termhub_core_cset_derived_tables(con, schema) + refresh_derived_tables(con, schema=schema) def fetch_object_by_id( diff --git a/makefile b/makefile index ad1d6c7a4..76a9eb6d8 100644 --- a/makefile +++ b/makefile @@ -1,7 +1,8 @@ SRC=backend/ .PHONY: lint tags ltags test all lintall codestyle docstyle lintsrc linttest doctest doc docs code linters_all codesrc \ -codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets +codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets \ +refresh-counts refresh-vocab serve-frontend serve-backend # Analysis ANALYSIS_SCRIPT = 'backend/db/analysis.py' @@ -22,12 +23,6 @@ deltas-table: counts-docs: @python $(ANALYSIS_SCRIPT) --counts-docs -# todo -#deltas-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_delta_viz -#counts-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_counts_viz - # counts-update: Update 'counts' table with current row counts for the 'n3c' schema. Adds note to the 'counts-runs' table. counts-update: @python $(ANALYSIS_SCRIPT) --counts-update @@ -107,6 +102,12 @@ test-frontend-deployments: fetch-missing-csets: python enclave_wrangler/objects_api.py --find-and-add-missing-csets-to-db +# Database refreshes +refresh-counts: + python backend/db/refresh_dataset_group_tables.py --dataset-group counts +refresh-vocab: + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + # Serve # nvm allows to switch to a particular versio of npm/node. 
Useful for working w/ deployment # https://github.com/nvm-sh/nvm diff --git a/requirements-unlocked.txt b/requirements-unlocked.txt index bbe84c98e..ea3f38e25 100755 --- a/requirements-unlocked.txt +++ b/requirements-unlocked.txt @@ -21,6 +21,7 @@ psycopg2-binary networkx # dev dependencies +# psutil # for memory profiling virtualenvwrapper setuptools \ No newline at end of file diff --git a/termhub-vocab b/termhub-vocab index 9116fa434..b0cc420e4 160000 --- a/termhub-vocab +++ b/termhub-vocab @@ -1 +1 @@ -Subproject commit 9116fa434d406320ce90583d683414c9ad05dd42 +Subproject commit b0cc420e42558c42ccf537578196ca79908622bf