From 9b032f754f5cc44a8f46a916fbb5dcced28969fa Mon Sep 17 00:00:00 2001
From: joeflack4
Date: Thu, 28 Sep 2023 20:37:51 -0400
Subject: [PATCH] Bugfix: Counts and Vocab refresh action(s)

- Bugfix: Counts and Vocab refresh GH action: running out of memory. This was solved by removing a few unnecessary steps from our download process, where we thought we had to combine the parquet files and save them to CSV. Then, because of a dynamic "chunking" operation (meant to save memory, but effectively unused), we re-read the saved parquet file(s) and concatenated them into a single CSV. The end result was that we were using a lot of needless memory. Now we download the parquet files, convert each part to CSV on the fly, and append it to the output file (a minimal sketch of this approach appears after the patch).
- Update: Use premium GitHub 'large runners' to solve the resource issue.

Update: Generalized refresh of dataset groups
- Update: refresh_voc_and_counts.py -> refresh_dataset_group_tables.py: Generalized this script so that we have more flexibility in which groups we run.
- Update: GH action: Also updated the GitHub action so that it runs these groups sequentially. That way, at least one group may successfully complete.
- Update: makefile: Added targets for each action.

More related updates
- Update: Dataset group refresh stability: Now this task, like the normal DB refresh, checks whether a derived-table refresh is currently active and, if so, waits until it is done before proceeding. This helps in the event of multiple refreshes happening simultaneously, especially now that we are splitting the vocab and counts refreshes apart.

General
- Update: makefile: Added missing phony targets and removed some unnecessary targets.
---
 .github/workflows/refresh_counts.yml | 6 +-
 .github/workflows/refresh_voc.yml | 14 +-
 .github/workflows/refresh_voc_and_counts.yml | 16 +-
 ...nts.py => refresh_dataset_group_tables.py} | 15 +-
 .../db/resolve_fetch_failures_0_members.py | 4 +-
 .../db/resolve_fetch_failures_excess_items.py | 4 +-
 backend/db/utils.py | 17 +-
 enclave_wrangler/datasets.py | 165 ++++++------------
 enclave_wrangler/objects_api.py | 4 +-
 makefile | 17 +-
 requirements-unlocked.txt | 1 +
 requirements.txt | 5 +-
 12 files changed, 114 insertions(+), 154 deletions(-)
 rename backend/db/{refresh_voc_and_counts.py => refresh_dataset_group_tables.py} (86%)

diff --git a/.github/workflows/refresh_counts.yml b/.github/workflows/refresh_counts.yml index 5c5e0f098..5ef0c6ae1 100644 --- a/.github/workflows/refresh_counts.yml +++ b/.github/workflows/refresh_counts.yml @@ -7,7 +7,8 @@ on: workflow_dispatch: jobs: refresh-counts-tables: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143."
instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +42,4 @@ jobs: pip install -r requirements.txt - name: Refresh counts - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group counts + run: make refresh-counts diff --git a/.github/workflows/refresh_voc.yml b/.github/workflows/refresh_voc.yml index 322350404..bc50345d4 100644 --- a/.github/workflows/refresh_voc.yml +++ b/.github/workflows/refresh_voc.yml @@ -7,7 +7,16 @@ on: workflow_dispatch: jobs: refresh-vocab-tables: - runs-on: ubuntu-latest + runs-on: + group: organization/Default + labels: BIDS-Premium-Action-Runners +# Attempt 2 +# runs-on: +# group: Default +# labels: BIDS-Premium-Action-Runners +# Attempt 1 +# runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest # fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures strategy: fail-fast: false @@ -41,5 +50,4 @@ jobs: pip install -r requirements.txt - name: Refresh vocab - run: | - python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + run: make refresh-vocab diff --git a/.github/workflows/refresh_voc_and_counts.yml b/.github/workflows/refresh_voc_and_counts.yml index 889d6944a..c609d24f7 100644 --- a/.github/workflows/refresh_voc_and_counts.yml +++ b/.github/workflows/refresh_voc_and_counts.yml @@ -7,7 +7,11 @@ on: workflow_dispatch: jobs: refresh-voc-and-counts: - runs-on: ubuntu-latest + runs-on: BIDS-Premium-Action-Runners +# runs-on: ubuntu-latest + # fail-fast: Can show why action unexpectedly stops: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures + strategy: + fail-fast: false steps: - name: Checkout repository and submodules uses: actions/checkout@v2 @@ -22,17 +26,14 @@ jobs: run: | echo "Commit hash: ${{ github.sha }}" echo "Branch: ${{ github.ref }}" - - name: 'Create env file' run: | mkdir env echo "${{ secrets.ENV_FILE }}" > env/.env - - name: Create and start virtual environment run: | python3 -m venv venv source venv/bin/activate - - name: Install dependencies run: | python -m pip install --upgrade pip @@ -40,6 +41,9 @@ jobs: pip install --upgrade setuptools pip install -r requirements.txt - - name: Refresh voc and counts + - name: Refresh counts + run: | + python backend/db/refresh_dataset_group_tables.py --dataset-group counts + - name: Refresh vocab run: | - python backend/db/refresh_voc_and_counts.py + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab \ No newline at end of file diff --git a/backend/db/refresh_voc_and_counts.py b/backend/db/refresh_dataset_group_tables.py similarity index 86% rename from backend/db/refresh_voc_and_counts.py rename to backend/db/refresh_dataset_group_tables.py index e703eda0c..1c3163ccb 100644 --- a/backend/db/refresh_voc_and_counts.py +++ b/backend/db/refresh_dataset_group_tables.py @@ -24,16 +24,16 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, check_db_status_var, get_db_connection, get_ddl_statements, load_csv, \ - refresh_any_dependent_tables, \ - run_sql + refresh_derived_tables, run_sql from enclave_wrangler.config import DATASET_GROUPS_CONFIG from enclave_wrangler.datasets import 
download_datasets, get_last_update_of_dataset -def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): +def refresh_voc_and_counts(dataset_group: List[str], skip_downloads: bool = False, schema=SCHEMA): """Refresh vocabulary and counts tables.""" print('Refreshing vocabulary and counts tables.') - for group_name, config in DATASET_GROUPS_CONFIG.items(): + selected_configs = {k: v for k, v in DATASET_GROUPS_CONFIG.items() if k in dataset_group} + for group_name, config in selected_configs.items(): print(f'\nRefreshing {group_name} tables...') # Check if tables are already up to date last_updated_us: str = check_db_status_var(config['last_updated_termhub_var']) @@ -64,14 +64,17 @@ def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA): for statement in statements: run_sql(con, statement) # print('Recreating derived tables') # printed w/in refresh_derived_tables() - refresh_any_dependent_tables(con, config['tables']) - + refresh_derived_tables(con, config['tables'], schema) print('Done') def cli(): """Command line interface""" parser = ArgumentParser(prog='Refresh vocabulary and counts tables.') + parser.add_argument( + '-d', '--dataset-group', nargs='+', choices=list(DATASET_GROUPS_CONFIG.keys()), + default=list(DATASET_GROUPS_CONFIG.keys()), + help='Names of dataset/table groups to refresh.') parser.add_argument( '-s', '--skip-downloads', action='store_true', help='Use if you have already downloaded updated files.') diff --git a/backend/db/resolve_fetch_failures_0_members.py b/backend/db/resolve_fetch_failures_0_members.py index 6baea9258..112c31a24 100644 --- a/backend/db/resolve_fetch_failures_0_members.py +++ b/backend/db/resolve_fetch_failures_0_members.py @@ -17,7 +17,7 @@ from enclave_wrangler.objects_api import concept_set_members__from_csets_and_members_to_db, \ fetch_cset_and_member_objects from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, select_failed_fetches, \ - refresh_termhub_core_cset_derived_tables + refresh_derived_tables DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API." 
@@ -78,7 +78,7 @@ def resolve_fetch_failures_0_members( if success_cases: with get_db_connection(schema=schema, local=use_local_db) as con: concept_set_members__from_csets_and_members_to_db(con, csets_and_members) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Report success if success_cases: diff --git a/backend/db/resolve_fetch_failures_excess_items.py b/backend/db/resolve_fetch_failures_excess_items.py index f059e725d..3f82d684a 100644 --- a/backend/db/resolve_fetch_failures_excess_items.py +++ b/backend/db/resolve_fetch_failures_excess_items.py @@ -11,7 +11,7 @@ PROJECT_ROOT = os.path.join(BACKEND_DIR, '..') sys.path.insert(0, str(PROJECT_ROOT)) from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ select_failed_fetches from enclave_wrangler.datasets import CSV_TRANSFORM_DIR, download_datasets from enclave_wrangler.utils import was_file_modified_within_threshold @@ -74,7 +74,7 @@ def resolve_fetch_failures_excess_items(use_local_db=False, cached_dataset_thres if rows: insert_from_dicts(con, dataset, rows) solved_failures.append(failure) - refresh_termhub_core_cset_derived_tables(con) + refresh_derived_tables(con) # Update fetch_audit status fetch_status_set_success(solved_failures) diff --git a/backend/db/utils.py b/backend/db/utils.py index 2414a3366..637e5b5ba 100644 --- a/backend/db/utils.py +++ b/backend/db/utils.py @@ -117,7 +117,7 @@ def refresh_any_dependent_tables(con: Connection, independent_tables: List[str] refresh_derived_tables(con, derived_tables, schema) -def refresh_derived_tables( +def refresh_derived_tables_exec( con: Connection, derived_tables_queue: List[str] = CORE_CSET_DEPENDENT_TABLES, schema=SCHEMA ): """Refresh TermHub core cset derived tables @@ -157,7 +157,10 @@ def refresh_derived_tables( # todo: move this somewhere else, possibly load.py or db_refresh.py # todo: what to do if this process fails? any way to roll back? should we? # todo: currently has no way of passing 'local' down to db status var funcs -def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, polling_interval_seconds: int = 30): +def refresh_derived_tables( + con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA, + polling_interval_seconds: int = 30 +): """Refresh TermHub core cset derived tables: wrapper function Handles simultaneous requests and try/except for worker function: refresh_termhub_core_cset_derived_tables_exec()""" @@ -178,8 +181,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol # The following two calls yield equivalent results as of 2023/08/08. I've commented out # refresh_derived_tables() in case anything goes wrong with refresh_any_dependent_tables(), since that # is based on a heuristic currently, and if anything goes wrong, we may want to switch back. 
-joeflack4 - # refresh_derived_tables(con, CORE_CSET_DEPENDENT_TABLES, schema) - refresh_any_dependent_tables(con, CORE_CSET_TABLES, schema) + # refresh_derived_tables_exec(con, CORE_CSET_DEPENDENT_TABLES, schema) + refresh_any_dependent_tables(con, independent_tables, schema) finally: update_db_status_var('derived_tables_refresh_status', 'inactive') break @@ -214,12 +217,6 @@ def set_search_path(dbapi_connection, connection_record): return engine.connect() -def chunk_list(input_list: List, chunk_size) -> List[List]: - """Split a list into chunks""" - for i in range(0, len(input_list), chunk_size): - yield input_list[i:i + chunk_size] - - def current_datetime(time_zone=['UTC/GMT', 'EST/EDT'][1]) -> str: """Get current datetime in ISO format as a string.""" if time_zone == 'UTC/GMT': diff --git a/enclave_wrangler/datasets.py b/enclave_wrangler/datasets.py index 005182918..afe17e7e0 100644 --- a/enclave_wrangler/datasets.py +++ b/enclave_wrangler/datasets.py @@ -4,28 +4,25 @@ # TODO: get rid of unnamed row number at start of csv files """ +import os +import re +import shutil import sys +import tempfile +import time from argparse import ArgumentParser from pathlib import Path - from typing import Dict, List, Union -from typeguard import typechecked -import os -import re + import pandas as pd -import tempfile -# import pyarrow as pa -import pyarrow.parquet as pq -# import asyncio -import shutil -import time +import psutil +from typeguard import typechecked ENCLAVE_WRANGLER_DIR = os.path.dirname(__file__) PROJECT_ROOT = Path(ENCLAVE_WRANGLER_DIR).parent sys.path.insert(0, str(PROJECT_ROOT)) # TODO: backend implorts: Ideally we don't want to couple with TermHub code -from backend.db.utils import chunk_list -from backend.utils import commify, pdump +from backend.utils import pdump from enclave_wrangler.config import TERMHUB_CSETS_DIR, DATASET_REGISTRY, DATASET_REGISTRY_RID_NAME_MAP from enclave_wrangler.utils import enclave_get, log_debug_info @@ -37,15 +34,15 @@ # #'content-type': 'application/json' # } DEBUG = False -# TODO: Once git LFS set up, dl directly to datasets folder, or put these in raw/ and move csvs_repaired to datasets/ CSV_DOWNLOAD_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'downloads') CSV_TRANSFORM_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'prepped_files') DESC = 'Tool for working w/ the Palantir Foundry enclave API. This part is for downloading enclave datasets.' 
-os.makedirs(CSV_TRANSFORM_DIR, exist_ok=True) +for _dir in [CSV_DOWNLOAD_DIR, CSV_TRANSFORM_DIR]: + os.makedirs(_dir, exist_ok=True) @typechecked -def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: +def get_transaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getTransaction tested with curl: @@ -70,20 +67,20 @@ def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[st @typechecked -def views2(dataset_rid: str, endRef: str) -> [str]: +def views2(dataset_rid: str, end_ref: str) -> [str]: """API documentation at https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getDatasetViewFiles2 tested with curl: curl https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100 -H "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" | json_pp """ - curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" + # curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100" if DEBUG: log_debug_info() endpoint = 'https://unite.nih.gov/foundry-catalog/api/catalog/datasets/' - template = '{endpoint}{dataset_rid}/views2/{endRef}/files?pageSize=100' - url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, endRef=endRef) + template = '{endpoint}{dataset_rid}/views2/{end_ref}/files?pageSize=100' + url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, end_ref=end_ref) response = enclave_get(url, verbose=False) response_json = response.json() @@ -91,92 +88,49 @@ def views2(dataset_rid: str, endRef: str) -> [str]: file_parts = [fp for fp in file_parts if re.match('.*part-\d\d\d\d\d', fp)] return file_parts +# TODO: temp +def get_termhub_disk_usage() -> float: + """Get usage in GB""" + root_directory = Path(PROJECT_ROOT) + s_bytes = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file()) + return s_bytes / (1024 ** 3) + + @typechecked -def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str) -> pd.DataFrame: +def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str): """tested with cURL: wget https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views/master/spark%2Fpart-00000-c94edb9f-1221-4ae8-ba74-58848a4d79cb-c000.snappy.parquet --header "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" """ dataset_rid: str = fav['rid'] endpoint = 'https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets' template = '{endpoint}/{dataset_rid}/views/master/{fp}' - # if DEBUG: - - # download parquet files with tempfile.TemporaryDirectory() as parquet_dir: # flush: for gh action; otherwise does not always show in the log + print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True) # TODO: temp print(f'\nINFO: Downloading {outpath}; 
tempdir {parquet_dir}, filepart endpoints:', flush=True) + print(f'memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}') # TODO: temp + print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True) # TODO: temp for index, fp in enumerate(file_parts): url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, fp=fp) print('\t' + f'{index + 1} of {len(file_parts)}: {url}') response = enclave_get(url, args={'stream': True}, verbose=False) + print(f'response received. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}') # TODO: temp if response.status_code == 200: - fname = parquet_dir + fp.replace('spark', '') - with open(fname, "wb") as f: - response.raw.decode_content = True + parquet_part_path = parquet_dir + fp.replace('spark', '') + response.raw.decode_content = True + with open(parquet_part_path, "wb") as f: shutil.copyfileobj(response.raw, f) + print(f'wrote response to file. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}') # TODO: temp + df = pd.read_parquet(parquet_part_path) + print(f'read part into pandas. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}') # TODO: temp + print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True) # TODO: temp + df.to_csv(outpath, index=False, header=True if index == 0 else False, mode='a') + print(f'appended part to csv. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}') # TODO: temp + print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True) # TODO: temp else: raise RuntimeError(f'Failed opening {url} with {response.status_code}: {response.content}') - - print('INFO: Combining parquet files: Saving chunks') - combined_parquet_fname = parquet_dir + '/combined.parquet' - files: List[str] = [] - for file_name in os.listdir(parquet_dir): - files.append(os.path.join(parquet_dir, file_name)) - - df = pd.DataFrame() - chunk_paths = [] - # TODO: why don't we just convert parquet to csv one at a time? - # TODO: Changing this to see if helps - # chunks = chunk_list(files, 5) # chunks size 5: arbitrary - chunks = chunk_list(files, len(files)) - # Chunked to avoid memory issues w/ GitHub Action. 
- for chunk_n, chunk in enumerate(chunks): - if chunk[0].endswith('.parquet'): - combine_parquet_files(chunk, combined_parquet_fname) - df = pd.read_parquet(combined_parquet_fname) - elif chunk[0].endswith('.csv'): - if len(chunk) != 1: - raise RuntimeError(f"with csv, only expected one file; got: [{', '.join(chunk)}]") - df = pd.read_csv(chunk[0], names=fav['column_names']) - else: - raise RuntimeError(f"unexpected file(s) downloaded: [{', '.join(chunk)}]") - if outpath: - os.makedirs(os.path.dirname(outpath), exist_ok=True) - outpath_i = outpath.replace('.csv', f'_{chunk_n}.csv') - chunk_paths.append(outpath_i) - df.to_csv(outpath_i, index=False) - - print('INFO: Combining parquet files: Combining chunks') - if outpath: - df = pd.concat([pd.read_csv(path) for path in chunk_paths]) - df.to_csv(outpath, index=False) - for path in chunk_paths: - os.remove(path) - - print(f'Downloaded {os.path.basename(outpath)}, {commify(len(df))} records\n') - return df - - -def combine_parquet_files(input_files, target_path): - """Combine parquet files""" - files = [] - try: - for file_name in input_files: - files.append(pq.read_table(file_name)) - with pq.ParquetWriter( - target_path, - files[0].schema, - version='2.0', - compression='gzip', - use_dictionary=True, - data_page_size=2097152, # 2MB - write_statistics=True - ) as writer: - for f in files: - writer.write_table(f) - except Exception as e: - print(e, file=sys.stderr) + print(f'Downloaded {os.path.basename(outpath)}\n') # todo: for this and alltransform_* funcs. They have a common pattern, especially first couple lines. Can we refactor? @@ -191,7 +145,6 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame: """Transformations to concept_relationship.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - # JOIN & Filter try: csm_df = pd.read_csv( @@ -204,11 +157,9 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame: print('Warning: Tried transforming concept_relationship.csv, but concept_set_members.csv must be downloaded ' 'and transformed first. Try running again after this process completes. Eventually need to do these ' 'transformations dependency ordered fashion.', file=sys.stderr) - # Filter df2 = df[df['relationship_id'] == 'Subsumes'] df2.to_csv(os.path.join(CSV_TRANSFORM_DIR, 'concept_relationship_subsumes_only.csv'), index=False) - return df @@ -216,7 +167,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame: """Transformations to concept.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - # JOIN try: csm_df = pd.read_csv( @@ -227,7 +177,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame: print('Warning: Tried transforming concept.csv, but concept_set_members.csv must be downloaded and transformed ' 'first. Try running again after this process completes. 
Eventually need to do these transformations ' 'dependency ordered fashion.', file=sys.stderr) - return df @@ -235,7 +184,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame: """Transformations to concept_ancestor.csv""" df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('') df = transforms_common(df, dataset_name) - # JOIN try: csm_df = pd.read_csv( @@ -246,7 +194,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame: print('Warning: Tried transforming concept_ancestor.csv, but concept_set_members.csv must be downloaded and ' 'transformed first. Try running again after this process completes. Eventually need to do these ' 'transformations dependency ordered fashion.', file=sys.stderr) - return df @@ -308,9 +255,12 @@ def transforms_common(df: pd.DataFrame, dataset_name) -> pd.DataFrame: # TODO: currently overwrites if download is newer than prepped. should also overwrite if dependency # prepped files are newer than this -def transform(fav: dict) -> pd.DataFrame: - """Data transformations""" - dataset_name: str = fav['name'] +def transform(dataset_config: dict): + """Data transformations + + The input/outputs of this function are files. It reads the filename from `dataset_config`, then reads that file, + transforms, and writes back to that file.""" + dataset_name: str = dataset_config['name'] print(f'INFO: Transforming: {dataset_name}') inpath = os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv') outpath = os.path.join(CSV_TRANSFORM_DIR, dataset_name + '.csv') @@ -334,7 +284,7 @@ def transform(fav: dict) -> pd.DataFrame: if func: df = func(dataset_name) else: - converters = fav.get('converters') or {} + converters = dataset_config.get('converters') or {} df = pd.read_csv(inpath, keep_default_na=False, converters=converters).fillna('') df = transforms_common(df, dataset_name) @@ -354,7 +304,7 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: rid = dataset_name_or_rid if dataset_name_or_rid.startswith('ri.foundry.main.dataset.') \ else DATASET_REGISTRY[dataset_name_or_rid]['rid'] ref = 'master' - transaction = getTransaction(rid, ref, return_field=None) + transaction = get_transaction(rid, ref, return_field=None) # pdump(transaction) if transaction['status'] != 'COMMITTED': pdump(transaction) @@ -364,16 +314,15 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str: def download_and_transform( dataset_name: str = None, dataset_rid: str = None, ref: str = 'master', output_dir: str = None, outpath: str = None, - transforms_only=False, fav: Dict = None, force_if_exists=True -) -> pd.DataFrame: + transforms_only=False, dataset_config: Dict = None, force_if_exists=True +): """Download dataset & run transformations""" print(f'INFO: Downloading: {dataset_name}') dataset_rid = DATASET_REGISTRY[dataset_name]['rid'] if not dataset_rid else dataset_rid dataset_name = DATASET_REGISTRY_RID_NAME_MAP[dataset_rid] if not dataset_name else dataset_name - fav = fav if fav else DATASET_REGISTRY[dataset_name] + dataset_config = dataset_config if dataset_config else DATASET_REGISTRY[dataset_name] # Download - df = pd.DataFrame() if not transforms_only: # todo: Temp: would be good to accept either 'outdir' or 'outpath'. 
if not outpath: @@ -385,16 +334,14 @@ def download_and_transform( if os.path.exists(outpath): t = time.ctime(os.path.getmtime(outpath)) print(f'Clobbering {os.path.basename(outpath)}: {t}, {os.path.getsize(outpath)} bytes.') - end_ref = getTransaction(dataset_rid, ref) - args = {'dataset_rid': dataset_rid, 'endRef': end_ref} + end_ref = get_transaction(dataset_rid, ref) + args = {'dataset_rid': dataset_rid, 'end_ref': end_ref} file_parts = views2(**args) # asyncio.run(download_and_combine_dataset_parts(dataset_rid, file_parts)) - df: pd.DataFrame = download_and_combine_dataset_parts(fav, file_parts, outpath=outpath) + download_and_combine_dataset_parts(dataset_config, file_parts, outpath=outpath) # Transform - df2: pd.DataFrame = transform(fav) - - return df2 if len(df2) > 0 else df + transform(dataset_config) def download_datasets( @@ -411,7 +358,7 @@ def download_datasets( else configs for conf in datasets_configs: download_and_transform( - fav=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), + dataset_config=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'), transforms_only=transforms_only, force_if_exists=force_if_exists) # todo: ideally would allow user to select output dir that contains both CSV_DOWNLOAD_DIR and CSV_TRANSFORM_DIR diff --git a/enclave_wrangler/objects_api.py b/enclave_wrangler/objects_api.py index 155cbc722..c1adea3a5 100644 --- a/enclave_wrangler/objects_api.py +++ b/enclave_wrangler/objects_api.py @@ -37,7 +37,7 @@ make_objects_request from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey from backend.db.utils import SCHEMA, insert_fetch_statuses, insert_from_dict, insert_from_dicts, \ - refresh_termhub_core_cset_derived_tables, \ + refresh_derived_tables, \ sql_query_single_col, run_sql, get_db_connection from backend.db.queries import get_concepts from backend.utils import call_github_action, pdump @@ -504,7 +504,7 @@ def csets_and_members_to_db(con: Connection, csets_and_members: Dict[str, List[D print(f' - concept_set_members completed in {(datetime.now() - t2).seconds} seconds') # Derived tables - refresh_termhub_core_cset_derived_tables(con, schema) + refresh_derived_tables(con, schema) def fetch_object_by_id( diff --git a/makefile b/makefile index 6de43231b..0893d56f8 100644 --- a/makefile +++ b/makefile @@ -1,7 +1,8 @@ SRC=backend/ .PHONY: lint tags ltags test all lintall codestyle docstyle lintsrc linttest doctest doc docs code linters_all codesrc \ -codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets +codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets \ +refresh-counts refresh-vocab serve-frontend serve-backend # Analysis ANALYSIS_SCRIPT = 'backend/db/analysis.py' @@ -22,12 +23,6 @@ deltas-table: counts-docs: @python $(ANALYSIS_SCRIPT) --counts-docs -# todo -#deltas-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_delta_viz -#counts-viz: -# @python $(ANALYSIS_SCRIPT) --counts-over-time save_counts_viz - # counts-update: Update 'counts' table with current row counts for the 'n3c' schema. Adds note to the 'counts-runs' table. 
counts-update: @python $(ANALYSIS_SCRIPT) --counts-update @@ -107,6 +102,12 @@ test-frontend-deployments: fetch-missing-csets: python enclave_wrangler/objects_api.py --find-and-add-missing-csets-to-db +# Database refreshes +refresh-counts: + python backend/db/refresh_dataset_group_tables.py --dataset-group counts +refresh-vocab: + python backend/db/refresh_dataset_group_tables.py --dataset-group vocab + # Serve # nvm allows to switch to a particular versio of npm/node. Useful for working w/ deployment # https://github.com/nvm-sh/nvm @@ -116,5 +117,3 @@ serve-frontend: serve-backend: uvicorn backend.app:APP --reload -# TODO: does this work? -serve: serve-backend serve-frontend diff --git a/requirements-unlocked.txt b/requirements-unlocked.txt index caff14a06..9884f80b6 100755 --- a/requirements-unlocked.txt +++ b/requirements-unlocked.txt @@ -20,4 +20,5 @@ psycopg2-binary networkx # dev dependencies +psutil # for memory profiling virtualenvwrapper diff --git a/requirements.txt b/requirements.txt index 07fb1a32f..9f921c799 100644 --- a/requirements.txt +++ b/requirements.txt @@ -94,7 +94,7 @@ multidict==6.0.4 myst-parser==1.0.0 ndex2==3.5.0 networkx==2.8.8 -numpy>=1.23.0 +numpy==1.23.1 nxontology==0.4.1 oaklib==0.5.9 ols-client==0.1.3 @@ -112,6 +112,7 @@ ply==3.11 prefixcommons==0.1.12 prefixmaps==0.1.4 pronto==2.5.3 +psutil==5.9.5 psycopg2-binary==2.9.5 py==1.11.0 pyarrow==9.0.0 @@ -141,7 +142,7 @@ rdflib==6.2.0 rdflib-jsonld==0.6.1 rdflib-shim==1.0.3 regex==2023.5.5 -requests==2.31.0 +requests==2.28.1 requests-cache==1.0.1 requests-toolbelt==0.10.1 rfc3339-validator==0.1.4
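
The memory fix in this patch replaces the old combine-then-chunk logic with a per-part stream: each parquet part is written straight to a temp file, read back on its own, and appended to the output CSV, so memory use is bounded by a single part rather than the whole dataset. Below is a minimal sketch of that pattern, not TermHub's exact code: `part_urls` and the direct use of `requests` stand in for the file-part URLs and the project's `enclave_get()` helper used in `download_and_combine_dataset_parts()`.

```python
import os
import shutil
import tempfile
from typing import List

import pandas as pd  # pd.read_parquet() needs pyarrow (pinned in requirements.txt)
import requests


def parts_to_csv(part_urls: List[str], outpath: str) -> None:
    """Stream each parquet part to disk, then append it to one CSV.

    Assumes `outpath` does not already exist (the real code clobbers any
    previous download before starting)."""
    os.makedirs(os.path.dirname(outpath) or '.', exist_ok=True)
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, url in enumerate(part_urls):
            response = requests.get(url, stream=True)
            response.raise_for_status()
            response.raw.decode_content = True
            part_path = os.path.join(tmpdir, f'part-{i:05d}.parquet')
            with open(part_path, 'wb') as f:
                shutil.copyfileobj(response.raw, f)  # stream to disk, not into RAM
            df = pd.read_parquet(part_path)          # only this one part in memory
            # Write the CSV header once, then append every later part.
            df.to_csv(outpath, index=False, header=(i == 0), mode='a')
            os.remove(part_path)                     # reclaim disk space early
```

The patch also makes a dataset-group refresh wait its turn when another derived-table refresh is already running, by watching a status variable in the DB (the utils.py hunk sets `derived_tables_refresh_status` back to 'inactive' when done, and the renamed wrapper takes a `polling_interval_seconds` argument). A small sketch of that guard, with the status lookup passed in as a callable rather than TermHub's own `check_db_status_var()`:

```python
import time
from typing import Callable


def wait_for_refresh_lock(get_status: Callable[[], str], polling_interval_seconds: int = 30) -> None:
    """Block until the derived-table refresh status is no longer 'active'."""
    while get_status() == 'active':
        time.sleep(polling_interval_seconds)
```

Splitting the vocab and counts groups into separate workflows makes this guard matter more, since two refreshes can now start close together; polling a shared status variable is a simple way to serialize them without a real lock.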