diff --git a/.github/workflows/refresh_counts.yml b/.github/workflows/refresh_counts.yml
index 5c5e0f098..5ef0c6ae1 100644
--- a/.github/workflows/refresh_counts.yml
+++ b/.github/workflows/refresh_counts.yml
@@ -7,7 +7,8 @@ on:
   workflow_dispatch:
 jobs:
   refresh-counts-tables:
-    runs-on: ubuntu-latest
+    runs-on: BIDS-Premium-Action-Runners
+#    runs-on: ubuntu-latest
     # fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
     strategy:
       fail-fast: false
@@ -41,5 +42,4 @@ jobs:
           pip install -r requirements.txt

       - name: Refresh counts
-        run: |
-          python backend/db/refresh_dataset_group_tables.py --dataset-group counts
+        run: make refresh-counts
diff --git a/.github/workflows/refresh_voc.yml b/.github/workflows/refresh_voc.yml
index 322350404..dbf2f259f 100644
--- a/.github/workflows/refresh_voc.yml
+++ b/.github/workflows/refresh_voc.yml
@@ -7,7 +7,8 @@ on:
   workflow_dispatch:
 jobs:
   refresh-vocab-tables:
-    runs-on: ubuntu-latest
+    runs-on: BIDS-Premium-Action-Runners
+#    runs-on: ubuntu-latest
     # fail-fast: At least shows "Error: Process completed with exit code 143." instead of "exit code 1", for slightly more information about unexpected exits: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
     strategy:
       fail-fast: false
@@ -41,5 +42,4 @@ jobs:
           pip install -r requirements.txt

       - name: Refresh vocab
-        run: |
-          python backend/db/refresh_dataset_group_tables.py --dataset-group vocab
+        run: make refresh-vocab
diff --git a/.github/workflows/refresh_voc_and_counts.yml b/.github/workflows/refresh_voc_and_counts.yml
index 889d6944a..c609d24f7 100644
--- a/.github/workflows/refresh_voc_and_counts.yml
+++ b/.github/workflows/refresh_voc_and_counts.yml
@@ -7,7 +7,11 @@ on:
   workflow_dispatch:
 jobs:
   refresh-voc-and-counts:
-    runs-on: ubuntu-latest
+    runs-on: BIDS-Premium-Action-Runners
+#    runs-on: ubuntu-latest
+    # fail-fast: Can show why action unexpectedly stops: https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs#handling-failures
+    strategy:
+      fail-fast: false
     steps:
       - name: Checkout repository and submodules
         uses: actions/checkout@v2
@@ -22,17 +26,14 @@ jobs:
         run: |
           echo "Commit hash: ${{ github.sha }}"
           echo "Branch: ${{ github.ref }}"
-
       - name: 'Create env file'
         run: |
           mkdir env
           echo "${{ secrets.ENV_FILE }}" > env/.env
-
       - name: Create and start virtual environment
         run: |
           python3 -m venv venv
           source venv/bin/activate
-
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
@@ -40,6 +41,9 @@ jobs:
           pip install --upgrade setuptools
           pip install -r requirements.txt

-      - name: Refresh voc and counts
+      - name: Refresh counts
+        run: |
+          python backend/db/refresh_dataset_group_tables.py --dataset-group counts
+      - name: Refresh vocab
         run: |
-          python backend/db/refresh_voc_and_counts.py
+          python backend/db/refresh_dataset_group_tables.py --dataset-group vocab
\ No newline at end of file
diff --git a/backend/db/refresh_voc_and_counts.py b/backend/db/refresh_dataset_group_tables.py
similarity index 86%
rename from backend/db/refresh_voc_and_counts.py
rename to backend/db/refresh_dataset_group_tables.py
index e703eda0c..1c3163ccb 100644
--- a/backend/db/refresh_voc_and_counts.py
+++ b/backend/db/refresh_dataset_group_tables.py
@@ -24,16 +24,16 @@
 PROJECT_ROOT = os.path.join(BACKEND_DIR, '..')
 sys.path.insert(0, str(PROJECT_ROOT))
 from backend.db.utils import SCHEMA, check_db_status_var, get_db_connection, get_ddl_statements, load_csv, \
-    refresh_any_dependent_tables, \
-    run_sql
+    refresh_derived_tables, run_sql
 from enclave_wrangler.config import DATASET_GROUPS_CONFIG
 from enclave_wrangler.datasets import download_datasets, get_last_update_of_dataset


-def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA):
+def refresh_voc_and_counts(dataset_group: List[str], skip_downloads: bool = False, schema=SCHEMA):
     """Refresh vocabulary and counts tables."""
     print('Refreshing vocabulary and counts tables.')
-    for group_name, config in DATASET_GROUPS_CONFIG.items():
+    selected_configs = {k: v for k, v in DATASET_GROUPS_CONFIG.items() if k in dataset_group}
+    for group_name, config in selected_configs.items():
         print(f'\nRefreshing {group_name} tables...')
         # Check if tables are already up to date
         last_updated_us: str = check_db_status_var(config['last_updated_termhub_var'])
@@ -64,14 +64,17 @@ def refresh_voc_and_counts(skip_downloads: bool = False, schema=SCHEMA):
             for statement in statements:
                 run_sql(con, statement)
             # print('Recreating derived tables')  # printed w/in refresh_derived_tables()
-            refresh_any_dependent_tables(con, config['tables'])
-
+            refresh_derived_tables(con, config['tables'], schema)
     print('Done')


 def cli():
     """Command line interface"""
     parser = ArgumentParser(prog='Refresh vocabulary and counts tables.')
+    parser.add_argument(
+        '-d', '--dataset-group', nargs='+', choices=list(DATASET_GROUPS_CONFIG.keys()),
+        default=list(DATASET_GROUPS_CONFIG.keys()),
+        help='Names of dataset/table groups to refresh.')
     parser.add_argument(
         '-s', '--skip-downloads', action='store_true',
         help='Use if you have already downloaded updated files.')
diff --git a/backend/db/resolve_fetch_failures_0_members.py b/backend/db/resolve_fetch_failures_0_members.py
index 6baea9258..112c31a24 100644
--- a/backend/db/resolve_fetch_failures_0_members.py
+++ b/backend/db/resolve_fetch_failures_0_members.py
@@ -17,7 +17,7 @@
 from enclave_wrangler.objects_api import concept_set_members__from_csets_and_members_to_db, \
     fetch_cset_and_member_objects
 from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, select_failed_fetches, \
-    refresh_termhub_core_cset_derived_tables
+    refresh_derived_tables

 DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API."
@@ -78,7 +78,7 @@ def resolve_fetch_failures_0_members(
     if success_cases:
         with get_db_connection(schema=schema, local=use_local_db) as con:
             concept_set_members__from_csets_and_members_to_db(con, csets_and_members)
-            refresh_termhub_core_cset_derived_tables(con)
+            refresh_derived_tables(con)

     # Report success
     if success_cases:
diff --git a/backend/db/resolve_fetch_failures_excess_items.py b/backend/db/resolve_fetch_failures_excess_items.py
index f059e725d..3f82d684a 100644
--- a/backend/db/resolve_fetch_failures_excess_items.py
+++ b/backend/db/resolve_fetch_failures_excess_items.py
@@ -11,7 +11,7 @@
 PROJECT_ROOT = os.path.join(BACKEND_DIR, '..')
 sys.path.insert(0, str(PROJECT_ROOT))
 from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, insert_from_dicts, \
-    refresh_termhub_core_cset_derived_tables, \
+    refresh_derived_tables, \
     select_failed_fetches
 from enclave_wrangler.datasets import CSV_TRANSFORM_DIR, download_datasets
 from enclave_wrangler.utils import was_file_modified_within_threshold
@@ -74,7 +74,7 @@ def resolve_fetch_failures_excess_items(use_local_db=False, cached_dataset_thres
                 if rows:
                     insert_from_dicts(con, dataset, rows)
                     solved_failures.append(failure)
-        refresh_termhub_core_cset_derived_tables(con)
+        refresh_derived_tables(con)

     # Update fetch_audit status
     fetch_status_set_success(solved_failures)
diff --git a/backend/db/utils.py b/backend/db/utils.py
index 2414a3366..637e5b5ba 100644
--- a/backend/db/utils.py
+++ b/backend/db/utils.py
@@ -117,7 +117,7 @@ def refresh_any_dependent_tables(con: Connection, independent_tables: List[str]
     refresh_derived_tables(con, derived_tables, schema)


-def refresh_derived_tables(
+def refresh_derived_tables_exec(
     con: Connection, derived_tables_queue: List[str] = CORE_CSET_DEPENDENT_TABLES, schema=SCHEMA
 ):
     """Refresh TermHub core cset derived tables
@@ -157,7 +157,10 @@ def refresh_derived_tables(
 # todo: move this somewhere else, possibly load.py or db_refresh.py
 # todo: what to do if this process fails? any way to roll back? should we?
 # todo: currently has no way of passing 'local' down to db status var funcs
-def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, polling_interval_seconds: int = 30):
+def refresh_derived_tables(
+    con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA,
+    polling_interval_seconds: int = 30
+):
     """Refresh TermHub core cset derived tables: wrapper function

     Handles simultaneous requests and try/except for worker function: refresh_termhub_core_cset_derived_tables_exec()"""
@@ -178,8 +181,8 @@ def refresh_termhub_core_cset_derived_tables(con: Connection, schema=SCHEMA, pol
                 # The following two calls yield equivalent results as of 2023/08/08. I've commented out
                 # refresh_derived_tables() in case anything goes wrong with refresh_any_dependent_tables(), since that
                 # is based on a heuristic currently, and if anything goes wrong, we may want to switch back. -joeflack4
-                # refresh_derived_tables(con, CORE_CSET_DEPENDENT_TABLES, schema)
-                refresh_any_dependent_tables(con, CORE_CSET_TABLES, schema)
+                # refresh_derived_tables_exec(con, CORE_CSET_DEPENDENT_TABLES, schema)
+                refresh_any_dependent_tables(con, independent_tables, schema)
             finally:
                 update_db_status_var('derived_tables_refresh_status', 'inactive')
                 break
@@ -214,12 +217,6 @@ def set_search_path(dbapi_connection, connection_record):
     return engine.connect()


-def chunk_list(input_list: List, chunk_size) -> List[List]:
-    """Split a list into chunks"""
-    for i in range(0, len(input_list), chunk_size):
-        yield input_list[i:i + chunk_size]
-
-
 def current_datetime(time_zone=['UTC/GMT', 'EST/EDT'][1]) -> str:
     """Get current datetime in ISO format as a string."""
     if time_zone == 'UTC/GMT':
diff --git a/enclave_wrangler/datasets.py b/enclave_wrangler/datasets.py
index 005182918..afe17e7e0 100644
--- a/enclave_wrangler/datasets.py
+++ b/enclave_wrangler/datasets.py
@@ -4,28 +4,25 @@
 # TODO: get rid of unnamed row number at start of csv files
 """
+import os
+import re
+import shutil
 import sys
+import tempfile
+import time
 from argparse import ArgumentParser
 from pathlib import Path
-
 from typing import Dict, List, Union
-from typeguard import typechecked
-import os
-import re
+
 import pandas as pd
-import tempfile
-# import pyarrow as pa
-import pyarrow.parquet as pq
-# import asyncio
-import shutil
-import time
+import psutil
+from typeguard import typechecked

 ENCLAVE_WRANGLER_DIR = os.path.dirname(__file__)
 PROJECT_ROOT = Path(ENCLAVE_WRANGLER_DIR).parent
 sys.path.insert(0, str(PROJECT_ROOT))
 # TODO: backend implorts: Ideally we don't want to couple with TermHub code
-from backend.db.utils import chunk_list
-from backend.utils import commify, pdump
+from backend.utils import pdump
 from enclave_wrangler.config import TERMHUB_CSETS_DIR, DATASET_REGISTRY, DATASET_REGISTRY_RID_NAME_MAP
 from enclave_wrangler.utils import enclave_get, log_debug_info
@@ -37,15 +34,15 @@
 # #'content-type': 'application/json'
 # }
 DEBUG = False
-# TODO: Once git LFS set up, dl directly to datasets folder, or put these in raw/ and move csvs_repaired to datasets/
 CSV_DOWNLOAD_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'downloads')
 CSV_TRANSFORM_DIR = os.path.join(TERMHUB_CSETS_DIR, 'datasets', 'prepped_files')
 DESC = 'Tool for working w/ the Palantir Foundry enclave API. This part is for downloading enclave datasets.'
-os.makedirs(CSV_TRANSFORM_DIR, exist_ok=True)
+for _dir in [CSV_DOWNLOAD_DIR, CSV_TRANSFORM_DIR]:
+    os.makedirs(_dir, exist_ok=True)


 @typechecked
-def getTransaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]:
+def get_transaction(dataset_rid: str, ref: str = 'master', return_field: Union[str, None] = 'rid') -> Union[str, Dict]:
     """API documentation at
     https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getTransaction
     tested with curl:
@@ -70,20 +67,20 @@


 @typechecked
-def views2(dataset_rid: str, endRef: str) -> [str]:
+def views2(dataset_rid: str, end_ref: str) -> [str]:
     """API documentation at
     https://unite.nih.gov/workspace/documentation/developer/api/catalog/services/CatalogService/endpoints/getDatasetViewFiles2
     tested with curl:
     curl https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100 -H "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN" | json_pp
     """
-    curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100"
+    # curl_url = "https://unite.nih.gov/foundry-catalog/api/catalog/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views2/ri.foundry.main.transaction.00000022-85ed-47eb-9eeb-959737c88847/files?pageSize=100"
     if DEBUG:
         log_debug_info()
     endpoint = 'https://unite.nih.gov/foundry-catalog/api/catalog/datasets/'
-    template = '{endpoint}{dataset_rid}/views2/{endRef}/files?pageSize=100'
-    url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, endRef=endRef)
+    template = '{endpoint}{dataset_rid}/views2/{end_ref}/files?pageSize=100'
+    url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, end_ref=end_ref)

     response = enclave_get(url, verbose=False)
     response_json = response.json()
@@ -91,92 +88,49 @@ def views2(dataset_rid: str, endRef: str) -> [str]:
     file_parts = [fp for fp in file_parts if re.match('.*part-\d\d\d\d\d', fp)]
     return file_parts


+# TODO: temp
+def get_termhub_disk_usage() -> float:
+    """Get usage in GB"""
+    root_directory = Path(PROJECT_ROOT)
+    s_bytes = sum(f.stat().st_size for f in root_directory.glob('**/*') if f.is_file())
+    return s_bytes / (1024 ** 3)
+
+
 @typechecked
-def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str) -> pd.DataFrame:
+def download_and_combine_dataset_parts(fav: dict, file_parts: [str], outpath: str):
     """tested with cURL:
     wget https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets/ri.foundry.main.dataset.5cb3c4a3-327a-47bf-a8bf-daf0cafe6772/views/master/spark%2Fpart-00000-c94edb9f-1221-4ae8-ba74-58848a4d79cb-c000.snappy.parquet
     --header "authorization: Bearer $PALANTIR_ENCLAVE_AUTHENTICATION_BEARER_TOKEN"
     """
     dataset_rid: str = fav['rid']
     endpoint = 'https://unite.nih.gov/foundry-data-proxy/api/dataproxy/datasets'
     template = '{endpoint}/{dataset_rid}/views/master/{fp}'
-    # if DEBUG:
-
-    # download parquet files
     with tempfile.TemporaryDirectory() as parquet_dir:
         # flush: for gh action; otherwise does not always show in the log
+        print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True)  # TODO: temp
         print(f'\nINFO: Downloading {outpath}; tempdir {parquet_dir}, filepart endpoints:', flush=True)
+        print(f'memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}')  # TODO: temp
+        print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True)  # TODO: temp
         for index, fp in enumerate(file_parts):
             url = template.format(endpoint=endpoint, dataset_rid=dataset_rid, fp=fp)
             print('\t' + f'{index + 1} of {len(file_parts)}: {url}')
             response = enclave_get(url, args={'stream': True}, verbose=False)
+            print(f'response received. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}')  # TODO: temp
             if response.status_code == 200:
-                fname = parquet_dir + fp.replace('spark', '')
-                with open(fname, "wb") as f:
-                    response.raw.decode_content = True
+                parquet_part_path = parquet_dir + fp.replace('spark', '')
+                response.raw.decode_content = True
+                with open(parquet_part_path, "wb") as f:
                     shutil.copyfileobj(response.raw, f)
+                print(f'wrote response to file. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}')  # TODO: temp
+                df = pd.read_parquet(parquet_part_path)
+                print(f'read part into pandas. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}')  # TODO: temp
+                print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True)  # TODO: temp
+                df.to_csv(outpath, index=False, header=True if index == 0 else False, mode='a')
+                print(f'appended part to csv. memory used (GB): {psutil.Process().memory_info().rss / (1024 ** 3)}')  # TODO: temp
+                print('TEMP: Disk usage: ', get_termhub_disk_usage(), flush=True)  # TODO: temp
             else:
                 raise RuntimeError(f'Failed opening {url} with {response.status_code}: {response.content}')
-
-        print('INFO: Combining parquet files: Saving chunks')
-        combined_parquet_fname = parquet_dir + '/combined.parquet'
-        files: List[str] = []
-        for file_name in os.listdir(parquet_dir):
-            files.append(os.path.join(parquet_dir, file_name))
-
-        df = pd.DataFrame()
-        chunk_paths = []
-        # TODO: why don't we just convert parquet to csv one at a time?
-        # TODO: Changing this to see if helps
-        # chunks = chunk_list(files, 5)  # chunks size 5: arbitrary
-        chunks = chunk_list(files, len(files))
-        # Chunked to avoid memory issues w/ GitHub Action.
-        for chunk_n, chunk in enumerate(chunks):
-            if chunk[0].endswith('.parquet'):
-                combine_parquet_files(chunk, combined_parquet_fname)
-                df = pd.read_parquet(combined_parquet_fname)
-            elif chunk[0].endswith('.csv'):
-                if len(chunk) != 1:
-                    raise RuntimeError(f"with csv, only expected one file; got: [{', '.join(chunk)}]")
-                df = pd.read_csv(chunk[0], names=fav['column_names'])
-            else:
-                raise RuntimeError(f"unexpected file(s) downloaded: [{', '.join(chunk)}]")
-            if outpath:
-                os.makedirs(os.path.dirname(outpath), exist_ok=True)
-                outpath_i = outpath.replace('.csv', f'_{chunk_n}.csv')
-                chunk_paths.append(outpath_i)
-                df.to_csv(outpath_i, index=False)
-
-        print('INFO: Combining parquet files: Combining chunks')
-        if outpath:
-            df = pd.concat([pd.read_csv(path) for path in chunk_paths])
-            df.to_csv(outpath, index=False)
-            for path in chunk_paths:
-                os.remove(path)
-
-        print(f'Downloaded {os.path.basename(outpath)}, {commify(len(df))} records\n')
-        return df
-
-
-def combine_parquet_files(input_files, target_path):
-    """Combine parquet files"""
-    files = []
-    try:
-        for file_name in input_files:
-            files.append(pq.read_table(file_name))
-        with pq.ParquetWriter(
-            target_path,
-            files[0].schema,
-            version='2.0',
-            compression='gzip',
-            use_dictionary=True,
-            data_page_size=2097152,  # 2MB
-            write_statistics=True
-        ) as writer:
-            for f in files:
-                writer.write_table(f)
-    except Exception as e:
-        print(e, file=sys.stderr)
+    print(f'Downloaded {os.path.basename(outpath)}\n')


 # todo: for this and all transform_* funcs. They have a common pattern, especially first couple lines. Can we refactor?
@@ -191,7 +145,6 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame:
     """Transformations to concept_relationship.csv"""
     df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('')
     df = transforms_common(df, dataset_name)
-
     # JOIN & Filter
     try:
         csm_df = pd.read_csv(
@@ -204,11 +157,9 @@ def transform_dataset__concept_relationship(dataset_name: str) -> pd.DataFrame:
         print('Warning: Tried transforming concept_relationship.csv, but concept_set_members.csv must be downloaded '
               'and transformed first. Try running again after this process completes. Eventually need to do these '
              'transformations dependency ordered fashion.', file=sys.stderr)
-
     # Filter
     df2 = df[df['relationship_id'] == 'Subsumes']
     df2.to_csv(os.path.join(CSV_TRANSFORM_DIR, 'concept_relationship_subsumes_only.csv'), index=False)
-
     return df
@@ -216,7 +167,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame:
     """Transformations to concept.csv"""
     df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('')
     df = transforms_common(df, dataset_name)
-
     # JOIN
     try:
         csm_df = pd.read_csv(
@@ -227,7 +177,6 @@ def transform_dataset__concept(dataset_name: str) -> pd.DataFrame:
         print('Warning: Tried transforming concept.csv, but concept_set_members.csv must be downloaded and transformed '
               'first. Try running again after this process completes. Eventually need to do these transformations '
              'dependency ordered fashion.', file=sys.stderr)
-
     return df
@@ -235,7 +184,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame:
     """Transformations to concept_ancestor.csv"""
     df = pd.read_csv(os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv'), keep_default_na=False).fillna('')
     df = transforms_common(df, dataset_name)
-
     # JOIN
     try:
         csm_df = pd.read_csv(
@@ -246,7 +194,6 @@ def transform_dataset__concept_ancestor(dataset_name: str) -> pd.DataFrame:
         print('Warning: Tried transforming concept_ancestor.csv, but concept_set_members.csv must be downloaded and '
               'transformed first. Try running again after this process completes. Eventually need to do these '
              'transformations dependency ordered fashion.', file=sys.stderr)
-
     return df
@@ -308,9 +255,12 @@ def transforms_common(df: pd.DataFrame, dataset_name) -> pd.DataFrame:

 # TODO: currently overwrites if download is newer than prepped. should also overwrite if dependency
 #  prepped files are newer than this
-def transform(fav: dict) -> pd.DataFrame:
-    """Data transformations"""
-    dataset_name: str = fav['name']
+def transform(dataset_config: dict):
+    """Data transformations
+
+    The input/outputs of this function are files. It reads the filename from `dataset_config`, then reads that file,
+    transforms, and writes back to that file."""
+    dataset_name: str = dataset_config['name']
     print(f'INFO: Transforming: {dataset_name}')
     inpath = os.path.join(CSV_DOWNLOAD_DIR, dataset_name + '.csv')
     outpath = os.path.join(CSV_TRANSFORM_DIR, dataset_name + '.csv')
@@ -334,7 +284,7 @@ def transform(fav: dict) -> pd.DataFrame:
     if func:
         df = func(dataset_name)
     else:
-        converters = fav.get('converters') or {}
+        converters = dataset_config.get('converters') or {}
         df = pd.read_csv(inpath, keep_default_na=False, converters=converters).fillna('')
         df = transforms_common(df, dataset_name)
@@ -354,7 +304,7 @@ def get_last_update_of_dataset(dataset_name_or_rid: str) -> str:
     rid = dataset_name_or_rid if dataset_name_or_rid.startswith('ri.foundry.main.dataset.') \
         else DATASET_REGISTRY[dataset_name_or_rid]['rid']
     ref = 'master'
-    transaction = getTransaction(rid, ref, return_field=None)
+    transaction = get_transaction(rid, ref, return_field=None)
     # pdump(transaction)
     if transaction['status'] != 'COMMITTED':
         pdump(transaction)
@@ -364,16 +314,15 @@ def download_and_transform(
     dataset_name: str = None, dataset_rid: str = None, ref: str = 'master', output_dir: str = None, outpath: str = None,
-    transforms_only=False, fav: Dict = None, force_if_exists=True
-) -> pd.DataFrame:
+    transforms_only=False, dataset_config: Dict = None, force_if_exists=True
+):
     """Download dataset & run transformations"""
     print(f'INFO: Downloading: {dataset_name}')
     dataset_rid = DATASET_REGISTRY[dataset_name]['rid'] if not dataset_rid else dataset_rid
     dataset_name = DATASET_REGISTRY_RID_NAME_MAP[dataset_rid] if not dataset_name else dataset_name
-    fav = fav if fav else DATASET_REGISTRY[dataset_name]
+    dataset_config = dataset_config if dataset_config else DATASET_REGISTRY[dataset_name]

     # Download
-    df = pd.DataFrame()
     if not transforms_only:
         # todo: Temp: would be good to accept either 'outdir' or 'outpath'.
         if not outpath:
@@ -385,16 +334,14 @@ def download_and_transform(
         if os.path.exists(outpath):
             t = time.ctime(os.path.getmtime(outpath))
             print(f'Clobbering {os.path.basename(outpath)}: {t}, {os.path.getsize(outpath)} bytes.')
-        end_ref = getTransaction(dataset_rid, ref)
-        args = {'dataset_rid': dataset_rid, 'endRef': end_ref}
+        end_ref = get_transaction(dataset_rid, ref)
+        args = {'dataset_rid': dataset_rid, 'end_ref': end_ref}
         file_parts = views2(**args)
         # asyncio.run(download_and_combine_dataset_parts(dataset_rid, file_parts))
-        df: pd.DataFrame = download_and_combine_dataset_parts(fav, file_parts, outpath=outpath)
+        download_and_combine_dataset_parts(dataset_config, file_parts, outpath=outpath)

     # Transform
-    df2: pd.DataFrame = transform(fav)
-
-    return df2 if len(df2) > 0 else df
+    transform(dataset_config)


 def download_datasets(
@@ -411,7 +358,7 @@ def download_datasets(
         else configs
     for conf in datasets_configs:
         download_and_transform(
-            fav=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'),
+            dataset_config=conf, dataset_name=conf['name'], outpath=os.path.join(outdir, conf['name'] + '.csv'),
             transforms_only=transforms_only, force_if_exists=force_if_exists)

 # todo: ideally would allow user to select output dir that contains both CSV_DOWNLOAD_DIR and CSV_TRANSFORM_DIR
diff --git a/enclave_wrangler/objects_api.py b/enclave_wrangler/objects_api.py
index 155cbc722..c1adea3a5 100644
--- a/enclave_wrangler/objects_api.py
+++ b/enclave_wrangler/objects_api.py
@@ -37,7 +37,7 @@
     make_objects_request
 from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey
 from backend.db.utils import SCHEMA, insert_fetch_statuses, insert_from_dict, insert_from_dicts, \
-    refresh_termhub_core_cset_derived_tables, \
+    refresh_derived_tables, \
     sql_query_single_col, run_sql, get_db_connection
 from backend.db.queries import get_concepts
 from backend.utils import call_github_action, pdump
@@ -504,7 +504,7 @@ def csets_and_members_to_db(con: Connection, csets_and_members: Dict[str, List[D
         print(f' - concept_set_members completed in {(datetime.now() - t2).seconds} seconds')

     # Derived tables
-    refresh_termhub_core_cset_derived_tables(con, schema)
+    refresh_derived_tables(con, schema)


 def fetch_object_by_id(
diff --git a/makefile b/makefile
index 6de43231b..0893d56f8 100644
--- a/makefile
+++ b/makefile
@@ -1,7 +1,8 @@
 SRC=backend/

 .PHONY: lint tags ltags test all lintall codestyle docstyle lintsrc linttest doctest doc docs code linters_all codesrc \
-codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets
+codetest docsrc doctest counts-compare-schemas counts-table deltas-table test-missing-csets fetch-missing-csets \
+refresh-counts refresh-vocab serve-frontend serve-backend

 # Analysis
 ANALYSIS_SCRIPT = 'backend/db/analysis.py'
@@ -22,12 +23,6 @@ deltas-table:
 counts-docs:
 	@python $(ANALYSIS_SCRIPT) --counts-docs

-# todo
-#deltas-viz:
-#	@python $(ANALYSIS_SCRIPT) --counts-over-time save_delta_viz
-#counts-viz:
-#	@python $(ANALYSIS_SCRIPT) --counts-over-time save_counts_viz
-
 # counts-update: Update 'counts' table with current row counts for the 'n3c' schema. Adds note to the 'counts-runs' table.
 counts-update:
 	@python $(ANALYSIS_SCRIPT) --counts-update
@@ -107,6 +102,12 @@ test-frontend-deployments:
 fetch-missing-csets:
 	python enclave_wrangler/objects_api.py --find-and-add-missing-csets-to-db

+# Database refreshes
+refresh-counts:
+	python backend/db/refresh_dataset_group_tables.py --dataset-group counts
+refresh-vocab:
+	python backend/db/refresh_dataset_group_tables.py --dataset-group vocab
+
 # Serve
 # nvm allows to switch to a particular versio of npm/node. Useful for working w/ deployment
 # https://github.com/nvm-sh/nvm
@@ -116,5 +117,3 @@ serve-frontend:
 serve-backend:
 	uvicorn backend.app:APP --reload

-# TODO: does this work?
-serve: serve-backend serve-frontend
diff --git a/requirements-unlocked.txt b/requirements-unlocked.txt
index caff14a06..9884f80b6 100755
--- a/requirements-unlocked.txt
+++ b/requirements-unlocked.txt
@@ -20,4 +20,5 @@ psycopg2-binary
 networkx

 # dev dependencies
+psutil # for memory profiling
 virtualenvwrapper
diff --git a/requirements.txt b/requirements.txt
index 07fb1a32f..9f921c799 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -94,7 +94,7 @@ multidict==6.0.4
 myst-parser==1.0.0
 ndex2==3.5.0
 networkx==2.8.8
-numpy>=1.23.0
+numpy==1.23.1
 nxontology==0.4.1
 oaklib==0.5.9
 ols-client==0.1.3
@@ -112,6 +112,7 @@ ply==3.11
 prefixcommons==0.1.12
 prefixmaps==0.1.4
 pronto==2.5.3
+psutil==5.9.5
 psycopg2-binary==2.9.5
 py==1.11.0
 pyarrow==9.0.0
@@ -141,7 +142,7 @@ rdflib==6.2.0
 rdflib-jsonld==0.6.1
 rdflib-shim==1.0.3
 regex==2023.5.5
-requests==2.31.0
+requests==2.28.1
 requests-cache==1.0.1
 requests-toolbelt==0.10.1
 rfc3339-validator==0.1.4
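
Usage sketch (based only on the make targets and the --dataset-group flag added above; it assumes DATASET_GROUPS_CONFIG defines the 'counts' and 'vocab' groups, as the workflow and makefile changes suggest). The per-group refreshes can be run via the new make targets, or by calling the renamed script directly; --dataset-group accepts one or more group names and defaults to all groups when omitted:

    # Refresh one dataset group via the new make targets
    make refresh-counts
    make refresh-vocab

    # Or call the renamed script directly, passing one or more groups (default: all)
    python backend/db/refresh_dataset_group_tables.py --dataset-group counts vocab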