Merge pull request #802 from jhu-bids/fix-backend-tests
Fix backend tests
joeflack4 authored May 29, 2024
2 parents 25328dc + 18564d5 commit 5268910
Showing 18 changed files with 404 additions and 384 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_backend_e2e_and_unit_and_qc.yml
@@ -1,5 +1,6 @@
# Built from:
# https://docs.github.com/en/actions/guides/building-and-testing-python
# Test - Backend - E2E and unit tests and QC
# todo: Add 'qc' parts eventually to this, or rather, codestyle (e.g. black, flake8, pylint, docstyle, etc.)
# Built w/ inspiration from: https://docs.github.com/en/actions/guides/building-and-testing-python

name: Test - Backend - E2E and unit tests and QC

3 changes: 0 additions & 3 deletions backend/app.py
@@ -3,12 +3,10 @@
Resources
- https://github.com/tiangolo/fastapi
"""

import uvicorn
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import time

from backend.config import CONFIG, override_schema
CONFIG['importer'] = 'app.py'
@@ -18,7 +16,6 @@
# APP = FastAPI()
APP = FastAPI(client_max_size=100_000_000) # trying this, but it shouldn't be necessary
APP.include_router(cset_crud.router)
# APP.include_router(oak.router)
APP.include_router(graph.router)
APP.include_router(db.router)
APP.add_middleware(
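For context, a minimal sketch of the router-plus-middleware wiring pattern that backend/app.py follows. The router, endpoint, and port below are placeholders rather than TermHub code; only CORSMiddleware and GZipMiddleware correspond to classes actually imported in the diff above.

```python
# Minimal FastAPI app with a router and the CORS/GZip middleware classes
# used by backend/app.py. Everything named here is illustrative.
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import uvicorn

router = APIRouter()


@router.get("/ping")
def ping() -> dict:
    """Trivial health-check endpoint."""
    return {"status": "ok"}


app = FastAPI()
app.include_router(router)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],   # tighten for production use
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=1000)  # compress larger responses

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
```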
60 changes: 53 additions & 7 deletions backend/db/analysis.py
@@ -40,6 +40,10 @@
{{ counts_markdown_table }}"""


class InvalidCompareSchemaError(ValueError):
"""For if there is any problem with a schema comparison situation."""


# TODO: rename current_counts_and_deltas where from_cache = True to counts_deltas_history or something. because the datastructure is
# different. either that, or have it re-use the from_cache code at the end if from_cache = False
# - then, counts_over_time() & docs(): add cache param set to false, and change how they call current_counts()
@@ -49,7 +53,39 @@ def _current_counts_and_deltas(
) -> Union[pd.DataFrame, Dict]:
"""Gets current database counts and deltas
:param filter_temp_refresh_tables: Filters out any temporary tables that are created during the refresh, e.g. ones
that end w/ the suffix '_old'."""
that end w/ the suffix '_old'.
:returns pd.DataFrame if cache, else dict. If schema doesn't exist in counts, returns empty dict.
Performance:
As of 2025/05/26 on an M3, `SELECT COUNT(*) from {schema}.{table}` took this many seconds for each table.
concept_ancestor: 40.2
concept_ancestor_plus: 9.3
concept_relationship: 7.4
concept_graph: 5.7
concept_relationship_plus: 3.8
concept_set_members: 3.0
concepts_with_counts_ungrouped: 1.5
concepts_with_counts: 1.3
cset_members_items: 1.1
concept: 1.0
concept_set_version_item: 0.7
codeset_ids_by_concept_id: 0.4
deidentified_term_usage_by_domain_clamped: 0.1
all_csets: 0.0
code_sets: 0.0
codeset_counts: 0.0
concept_ids_by_codeset_id: 0.0
concept_set_container: 0.0
concept_set_counts_clamped: 0.0
concept_set_json: 0.0
members_items_summary: 0.0
omopconceptset: 0.0
omopconceptsetcontainer: 0.0
relationship: 0.0
researcher: 0.0
session_concept: 0.0
sessions: 0.0
"""
if from_cache:
with get_db_connection(schema='', local=local) as con:
counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT * from counts;', return_with_keys=True)]
@@ -58,10 +94,10 @@ def _current_counts_and_deltas(
return df
# Get previous counts
with get_db_connection(schema='', local=local) as con:
timestamps: List[datetime] = [
dp.parse(x[0]) for x in sql_query(con, f"SELECT DISTINCT timestamp from counts WHERE schema = '{schema}';", return_with_keys=False)]
ts_strings: List[List[str]] = sql_query(con, f"SELECT DISTINCT timestamp from counts WHERE schema = '{schema}';", return_with_keys=False)
timestamps: List[datetime] = [dp.parse(x[0]) for x in ts_strings]
most_recent_timestamp: str = str(max(timestamps)) if timestamps else None
prev_counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT * from counts;', return_with_keys=True)]
prev_counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT count, "table", "timestamp" from counts;', return_with_keys=True)]
prev_counts_df = pd.DataFrame(prev_counts)
# Get current counts / deltas
with get_db_connection(schema=schema, local=local) as con:
@@ -89,12 +125,14 @@

def counts_compare_schemas(
compare_schema: str = 'most_recent_backup', schema: str = SCHEMA, local=False, verbose=True, use_cached_counts=False
) -> pd.DataFrame:
) -> Union[pd.DataFrame, None]:
"""Checks counts of database tables for the current schema and its most recent backup.
:param compare_schema: The schema to check against. e.g. n3c_backup_20230322
:param use_cached_counts: If True, will use whatever is in the `counts` table, though it is less likely that counts
will exist for backups. Runs much faster though if using this option.
:returns pd.DataFrame if w/ comparison of current schema and another schema. If compare_schema='most_recent_backup'
and there is no backup schema in the counts, returns None.
"""
# Determine most recent schema if necessary
if compare_schema == 'most_recent_backup':
@@ -106,14 +144,22 @@ def counts_compare_schemas(
"""
with get_db_connection(schema='', local=local) as con:
backup_schemas: List[str] = [x[0] for x in sql_query(con, query, return_with_keys=False) if x[0].startswith(f'{schema}_backup_')]
if not backup_schemas:
print('Warning: Detected no backup schema to compare against. Quitting counts_compare_schemas().',
file=sys.stderr)
return None
dates: List[datetime] = [dp.parse(x.split('_')[2]) for x in backup_schemas]
for schema_name, date in zip(backup_schemas, dates):
if date == max(dates):
compare_schema = schema_name

# Get counts
main: Dict = _current_counts_and_deltas(schema, from_cache=use_cached_counts, local=local)
compare: Dict = _current_counts_and_deltas(compare_schema, from_cache=use_cached_counts, local=local)
main: Dict = _current_counts_and_deltas(
schema, from_cache=use_cached_counts, local=local, filter_temp_refresh_tables=True)
compare: Dict = _current_counts_and_deltas(
compare_schema, from_cache=use_cached_counts, local=local, filter_temp_refresh_tables=True)
if not compare:
raise InvalidCompareSchemaError(f'compare_schema {compare_schema} does not exist.')
tables = set(main.keys()).union(set(compare.keys()))
rows = []
for table in tables:
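For context, a hypothetical caller illustrating the two failure modes this diff adds to counts_compare_schemas(): it now returns None when no `{schema}_backup_*` schema exists, and raises InvalidCompareSchemaError when the comparison schema has no counts. The import path assumes these names are exposed by backend/db/analysis.py as shown above.

```python
# Hypothetical caller; 'n3c' is assumed to be the main schema name.
from typing import Optional

import pandas as pd

from backend.db.analysis import InvalidCompareSchemaError, counts_compare_schemas


def try_compare(schema: str = 'n3c') -> Optional[pd.DataFrame]:
    """Compare current schema counts against its most recent backup, if any."""
    try:
        df = counts_compare_schemas(compare_schema='most_recent_backup', schema=schema)
    except InvalidCompareSchemaError as err:
        print(f'Comparison schema has no counts: {err}')
        return None
    if df is None:
        print('No backup schema found; nothing to compare.')
    return df
```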
2 changes: 2 additions & 0 deletions backend/db/config.py
@@ -93,6 +93,8 @@ def get_pg_connect_url(local=False):
# 'csets_to_ignore',
'cset_members_items_plus',
]
# STANDALONE_TABLES & DERIVED_TABLE_DEPENDENCY_MAP
# - 100% of tables in the main schema, e.g. n3c, should be listed somewhere in
# STANDALONE_TABLES: Not derived from any other table, nor used to derive any other table/view. Used for QC testing.
STANDALONE_TABLES = [
'concept_set_json',
59 changes: 26 additions & 33 deletions backend/db/utils.py
@@ -23,14 +23,14 @@
from jinja2 import Template
# noinspection PyUnresolvedReferences
from psycopg2.errors import UndefinedTable
from sqlalchemy import create_engine, event
from sqlalchemy import CursorResult, create_engine, event
from sqlalchemy.engine import Row, RowMapping

from sqlalchemy.engine.base import Connection
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import TextClause
from typing import Any, Dict, Set, Tuple, Union, List
from typing import Dict, Set, Tuple, Union, List

DB_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_ROOT = Path(DB_DIR).parent.parent
@@ -410,42 +410,45 @@ def database_exists(con: Connection, db_name: str) -> bool:
return len(result) == 1


def run_sql(con: Connection, query: str, params: Dict = {}) -> CursorResult:
"""Run a sql command"""
query = text(query) if not isinstance(query, TextClause) else query
return con.execute(query, params) if params else con.execute(query)


def sql_query(
con: Connection, query: Union[text, str], params: Dict = {}, debug: bool = DEBUG, return_with_keys=True
) -> Union[List[RowMapping], List[List]]:
"""Run a sql query with optional params, fetching records.
https://stackoverflow.com/a/39414254/1368860:
query = "SELECT * FROM my_table t WHERE t.id = ANY(:ids);"
conn.execute(sqlalchemy.text(query), ids=some_ids)
"""Run an idempotent (read) SQL query with optional params, fetching records.
Inspiration:
https://stackoverflow.com/a/39414254/1368860:
query = "SELECT * FROM my_table t WHERE t.id = ANY(:ids);"
conn.execute(sqlalchemy.text(query), ids=some_ids)
"""
query = text(query) if not isinstance(query, TextClause) else query
try:
if params:
# after SQLAlchemy upgrade, send params as dict, not **params
q = con.execute(query, params) if params else con.execute(query)
else:
q = con.execute(query)
q: CursorResult = run_sql(con, query, params)

if debug:
print(f'{query}\n{json.dumps(params, indent=2)}')
# Conversions: after upgrading some packages, fastapi can no longer serialize Row & RowMapping objects
# todo: format q.mappings() for FastAPI like w/ q.fetchall() below? Are we not doing this cuz heavy refactor?
if return_with_keys:
# noinspection PyTypeChecker
results: List[RowMapping] = q.mappings().all() # key value pairs
# after upgrading some packages, fastapi can no longer serialize RowMapping objects
results: List[RowMapping] = q.mappings().all() # Key value pairs
# return [dict(x) for x in results]
return results
else:
# noinspection PyTypeChecker
results: List[Row] = q.fetchall() # Row tuples, with additional properties
# after upgrading some packages, fastapi can no longer serialize Row objects
return [list(x) for x in results]
return results
except (ProgrammingError, OperationalError) as err:
raise RuntimeError(f'Got an error [{err}] executing the following statement:\n{query}, {json.dumps(params, indent=2)}')
raise RuntimeError(
f'Got an error [{err}] executing the following statement:\n{query}, {json.dumps(params, indent=2)}')


def sql_query_single_col(*argv) -> List:
"""Run SQL query on single column"""
results = sql_query(*argv, return_with_keys=False)
results: List = sql_query(*argv, return_with_keys=False)
return [r[0] for r in results]


@@ -466,7 +469,7 @@ def get_obj_by_composite_key(con, table: str, keys: List[str], obj: Dict) -> Lis
{f'{key}_id': obj[key] for key in keys})


def get_obj_by_id(con, table: str, pk: str, obj_id: Union[str, int]) -> List[Row]:
def get_obj_by_id(con, table: str, pk: str, obj_id: Union[str, int]) -> List[List]:
"""Get object by ID"""
return sql_query(con, f'SELECT * FROM {table} WHERE {pk} = (:obj_id)', {'obj_id': obj_id}, return_with_keys=False)

@@ -548,16 +551,16 @@ def insert_from_dict(con: Connection, table: str, d: Union[Dict, List[Dict]], sk
if pk:
already_in_db = []
if isinstance(pk, str): # normal, single primary key
already_in_db: List[Dict] = get_obj_by_id(con, table, pk, d[pk])
already_in_db: List[List] = get_obj_by_id(con, table, pk, d[pk])
elif isinstance(pk, list): # composite key
already_in_db: List[Dict] = get_obj_by_composite_key(con, table, pk, d)
already_in_db: List[RowMapping] = get_obj_by_composite_key(con, table, pk, d)
if already_in_db:
return

insert = f"""
query = f"""
INSERT INTO {table} ({', '.join([f'"{x}"' for x in d.keys()])})
VALUES ({', '.join([':' + str(k) for k in d.keys()])})"""
run_sql(con, insert, d)
run_sql(con, query, d)


def sql_count(con: Connection, table: str) -> int:
@@ -582,16 +585,6 @@ def sql_in_safe(lst: List) -> (str, dict):
return (query, params)


def run_sql(con: Connection, command: str, params: Dict = {}) -> Any:
"""Run a sql command"""
command = text(command) if not isinstance(command, TextClause) else command
if params:
q = con.execute(command, params) if params else con.execute(command)
else:
q = con.execute(command)
return q


def list_schema_objects(
con: Connection = None, schema: str = None, filter_views=False, filter_sequences=False,
filter_temp_refresh_objects=False, filter_tables=False, names_only=False, verbose=True
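For context, a short sketch of how the consolidated helpers above fit together: sql_query() for reads with `:name` bound parameters, sql_query_single_col() to flatten a single column, and run_sql() (now defined once near the top of the module) for statements that need no fetched result. The `counts` table and its columns come from backend/db/analysis.py; get_db_connection is assumed to be importable from backend/db/utils.py, as it is used elsewhere in this diff.

```python
# Illustrative usage only; schema/table values are assumptions.
from backend.db.utils import get_db_connection, run_sql, sql_query, sql_query_single_col

with get_db_connection(schema='') as con:
    # Read with bound parameters; return_with_keys=True yields dict-like RowMapping rows.
    rows = sql_query(
        con, 'SELECT * FROM counts WHERE "table" = :table_name', {'table_name': 'concept'},
        return_with_keys=True)

    # return_with_keys=False returns plain rows; sql_query_single_col() keeps only column 0.
    table_names = sql_query_single_col(con, 'SELECT DISTINCT "table" FROM counts')

    # run_sql() covers statements where no result set needs to be fetched.
    run_sql(con, "DELETE FROM counts WHERE schema = :schema_name", {'schema_name': 'scratch'})
```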
67 changes: 0 additions & 67 deletions backend/routes/cset_crud.py
@@ -173,70 +173,6 @@ class CsetsGitUpdate(BaseModel):
row_index_data_map: Dict[int, Dict[str, Any]] = {}


# TODO: (i) move most of this functionality out of route into separate function (potentially keeping this route which
# simply calls that function as well), (ii) can then connect that function as step in the routes that coordinate
# enclave uploads
# TODO: git/patch changes: https://github.com/jhu-bids/TermHub/issues/165#issuecomment-1276557733
def csets_git_update(dataset_path: str, row_index_data_map: Dict[int, Dict[str, Any]]) -> Dict:
"""Update cset dataset. Works only on tabular files."""
# Vars
result = 'success'
details = ''
cset_dir = os.path.join(PROJECT_DIR, 'termhub-csets')
path_root = os.path.join(cset_dir, 'datasets')

# Update cset
# todo: dtypes need to be registered somewhere. perhaps a <CSV_NAME>_codebook.json()?, accessed based on filename,
# and inserted here
# todo: check git status first to ensure clean? maybe doesn't matter since we can just add by filename
path = os.path.join(path_root, dataset_path)
# noinspection PyBroadException
try:
df = pd.read_csv(path, dtype={'id': np.int32, 'last_name': str, 'first_name': str}).fillna('')
for index, field_values in row_index_data_map.items():
for field, value in field_values.items():
df.at[index, field] = value
df.to_csv(path, index=False)
except BaseException as err:
result = 'failure'
details = str(err)

# Push commit
# todo?: Correct git status after change should show something like this near end: `modified: FILENAME`
relative_path = os.path.join('datasets', dataset_path)
# todo: Want to see result as string? only getting int: 1 / 0
# ...answer: it's being printed to stderr and stdout. I remember there's some way to pipe and capture if needed
# TODO: What if the update resulted in no changes? e.g. changed values were same?
git_add_result = sp_call(f'git add {relative_path}'.split(), cwd=cset_dir)
if git_add_result != 0:
result = 'failure'
details = f'Error: Git add: {dataset_path}'
git_commit_result = sp_call(['git', 'commit', '-m', f'Updated by server: {relative_path}'], cwd=cset_dir)
if git_commit_result != 0:
result = 'failure'
details = f'Error: Git commit: {dataset_path}'
git_push_result = sp_call('git push origin HEAD:main'.split(), cwd=cset_dir)
if git_push_result != 0:
result = 'failure'
details = f'Error: Git push: {dataset_path}'

return {'result': result, 'details': details}


# todo: Maybe change to `id` instead of row index
# TODO: obsolete?
@router.put("/datasets/csets")
def put_csets_update(d: CsetsGitUpdate = None) -> Dict:
"""HTTP PUT wrapper for csets_update()"""
return csets_git_update(d.dataset_path, d.row_index_data_map)


@router.put("/datasets/vocab")
def vocab_update():
"""Update vocab dataset"""
pass


@router.post("/create-new-draft-omop-concept-set-version")
def route_create_new_draft_omop_concept_set_version(d: UploadJsonNewCsetVersionWithConcepts) -> Dict:
"""Upload new version of existing container, with concepets"""
@@ -345,7 +281,4 @@ def route_csv_upload_new_container_with_concepts(data: UploadCsvVersionWithConce
# noinspection PyTypeChecker
df = pd.read_csv(StringIO(data.dict()['csv'])).fillna('')
response: Dict = upload_new_cset_container_with_concepts_from_csv(df=df)
# print('CSV upload result: ')
# print(json.dumps(response, indent=2))
return response

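For context on route_csv_upload_new_container_with_concepts() above: the POSTed payload carries the CSV as a string, which is read through StringIO and NaN-filled before being handed to the upload helper. A standalone sketch of that pattern, with made-up sample data:

```python
# Sample payload only; the real route pulls the string from data.dict()['csv'].
from io import StringIO

import pandas as pd

csv_text = "concept_id,concept_name\n123,Example concept\n456,"
df = pd.read_csv(StringIO(csv_text)).fillna('')  # blank out NaNs, as the route does
assert list(df.columns) == ['concept_id', 'concept_name']
assert df.loc[1, 'concept_name'] == ''  # missing value replaced with empty string
```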