Fix backend tests

- Bug fixes: Erroring tests
- Bug fixes: Failing tests
- Update: misc comments / todos
- Update: Increased the speed of test_backend.py by moving a slow import.
- Add: New test_cset_version_enclave_to_db__raises_err(), split out of a different test. Currently deactivated, though.
- Update: Split the IntegrityError tests so that each major object type / table has its own test method, and abstracted the common logic into _raises_err_on_duplicate_insert_test() (see the sketch after this list).
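
For illustration, here is a minimal sketch of what that abstraction could look like — hypothetical code, not the actual contents of test_backend.py; the table name, columns, and in-memory SQLite engine are assumptions made to keep the example self-contained and runnable:

```python
import unittest

from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError


class TestDuplicateInserts(unittest.TestCase):
    """Hypothetical sketch: one test method per major object type / table; shared logic in a helper."""

    def setUp(self):
        # In-memory SQLite stands in for the real test database (assumption for this sketch)
        self.engine = create_engine('sqlite:///:memory:')
        with self.engine.begin() as con:
            con.execute(text(
                'CREATE TABLE code_sets (codeset_id INTEGER PRIMARY KEY, concept_set_name TEXT)'))

    def _raises_err_on_duplicate_insert_test(self, table: str, row: dict):
        """Insert the same row twice; the second insert must violate the PK / unique constraint."""
        cols = ', '.join(row.keys())
        placeholders = ', '.join(f':{k}' for k in row.keys())
        insert = text(f'INSERT INTO {table} ({cols}) VALUES ({placeholders})')
        with self.engine.begin() as con:
            con.execute(insert, row)  # first insert succeeds
        with self.assertRaises(IntegrityError):
            with self.engine.begin() as con:
                con.execute(insert, row)  # duplicate insert must error

    def test_code_sets__raises_err_on_duplicate_insert(self):
        self._raises_err_on_duplicate_insert_test(
            'code_sets', {'codeset_id': 1, 'concept_set_name': 'example'})


if __name__ == '__main__':
    unittest.main()
```

Each per-table test then reduces to a one-line call into the shared helper.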

General
- Update: Fixed some errors in unused code to help with development flow, i.e. to get an accurate picture of how many errors actually exist in a given file (e.g. graph.py).
- Add: Some missing type definitions
- Delete: Some old, outdated comments
- Delete: Unused code / features: csets_read(), csets_git_update(), vocab_update(), csets_update()
- Update: Optimized a query in _current_counts_and_deltas()
- Update: counts_compare_schemas(): Now returns None if it is asked to compare against the most recent backup but no backup exists. If asked to compare against a schema that doesn't exist, it now throws an error instead of populating 0's in the rows for that schema name. Also fixed a bug where temporary _old and _new tables detected mid-refresh would have their counts checked after they had already been deleted, causing an error. (See the caller-side sketch after this list.)
- Refactor: run_sql() and sql_query(): Clarified and moved these closely related functions next to each other. sql_query() now calls run_sql(); previously, run_sql() duplicated code that also lived inside sql_query(), running the exact same logic under different variable names. Added clarifying comments. (See the usage sketch after this list.)
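
A caller-side sketch of the new counts_compare_schemas() behavior (hypothetical calling code; the import path and the nonexistent schema name are assumptions, not taken from this commit):

```python
# Hypothetical caller; assumes backend/db/analysis.py is importable as backend.db.analysis.
from backend.db.analysis import InvalidCompareSchemaError, counts_compare_schemas

# Comparing against the most recent backup when no backup schema exists now
# returns None instead of a table of zeroes.
df = counts_compare_schemas(compare_schema='most_recent_backup')
if df is None:
    print('No backup schema found; nothing to compare.')

# Comparing against a schema that does not exist now raises instead of silently
# populating 0's for that schema's rows.
try:
    counts_compare_schemas(compare_schema='n3c_backup_19000101')  # illustrative, nonexistent schema
except InvalidCompareSchemaError as err:
    print(f'Bad compare schema: {err}')
```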
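
And a quick usage sketch of the run_sql() / sql_query() relationship (also hypothetical; the import path, schema, and table/column names are illustrative — the actual implementations are in the backend/db/utils.py hunk below):

```python
# Hypothetical usage; assumes backend/db/utils.py is importable as backend.db.utils.
from backend.db.utils import get_db_connection, run_sql, sql_query

with get_db_connection(schema='') as con:
    # Read path: sql_query() delegates execution to run_sql(), then fetches rows either
    # as key/value mappings (return_with_keys=True) or as plain lists (False).
    timestamps = sql_query(
        con, "SELECT DISTINCT timestamp FROM counts WHERE schema = :schema",
        {'schema': 'n3c'}, return_with_keys=False)

    # Statement path: run_sql() just executes and returns the raw CursorResult.
    run_sql(con, "DELETE FROM counts WHERE schema = :schema",
            {'schema': 'n3c_backup_old'})  # illustrative schema name
```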
joeflack4 committed May 28, 2024
1 parent 209d375 commit 74806a6
Showing 15 changed files with 300 additions and 309 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/test_backend_e2e_and_unit_and_qc.yml
@@ -1,5 +1,6 @@
# Built from:
# https://docs.github.com/en/actions/guides/building-and-testing-python
# Test - Backend - E2E and unit tests and QC
# todo: Add 'qc' parts eventually to this, or rather, codestyle (e.g. black, flake8, pylint, docstyle, etc.)
# Built w/ inspiration from: https://docs.github.com/en/actions/guides/building-and-testing-python

name: Test - Backend - E2E and unit tests and QC

60 changes: 53 additions & 7 deletions backend/db/analysis.py
@@ -40,6 +40,10 @@
{{ counts_markdown_table }}"""


class InvalidCompareSchemaError(ValueError):
"""For if there is any problem with a schema comparison situation."""


# TODO: rename current_counts_and_deltas where from_cache = True to counts_deltas_history or something. because the datastructure is
# different. either that, or have it re-use the from_cache code at the end if from_cache = False
# - then, counts_over_time() & docs(): add cache param set to false, and change how they call current_counts()
@@ -49,7 +53,39 @@ def _current_counts_and_deltas(
) -> Union[pd.DataFrame, Dict]:
"""Gets current database counts and deltas
:param filter_temp_refresh_tables: Filters out any temporary tables that are created during the refresh, e.g. ones
that end w/ the suffix '_old'."""
that end w/ the suffix '_old'.
:returns pd.DataFrame if cache, else dict. If schema doesn't exist in counts, returns empty dict.
Performance:
    As of 2024/05/26 on an M3, `SELECT COUNT(*) from {schema}.{table}` took this many seconds for each table.
concept_ancestor: 40.2
concept_ancestor_plus: 9.3
concept_relationship: 7.4
concept_graph: 5.7
concept_relationship_plus: 3.8
concept_set_members: 3.0
concepts_with_counts_ungrouped: 1.5
concepts_with_counts: 1.3
cset_members_items: 1.1
concept: 1.0
concept_set_version_item: 0.7
codeset_ids_by_concept_id: 0.4
deidentified_term_usage_by_domain_clamped: 0.1
all_csets: 0.0
code_sets: 0.0
codeset_counts: 0.0
concept_ids_by_codeset_id: 0.0
concept_set_container: 0.0
concept_set_counts_clamped: 0.0
concept_set_json: 0.0
members_items_summary: 0.0
omopconceptset: 0.0
omopconceptsetcontainer: 0.0
relationship: 0.0
researcher: 0.0
session_concept: 0.0
sessions: 0.0
"""
if from_cache:
with get_db_connection(schema='', local=local) as con:
counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT * from counts;', return_with_keys=True)]
Expand All @@ -58,10 +94,10 @@ def _current_counts_and_deltas(
return df
# Get previous counts
with get_db_connection(schema='', local=local) as con:
timestamps: List[datetime] = [
dp.parse(x[0]) for x in sql_query(con, f"SELECT DISTINCT timestamp from counts WHERE schema = '{schema}';", return_with_keys=False)]
ts_strings: List[List[str]] = sql_query(con, f"SELECT DISTINCT timestamp from counts WHERE schema = '{schema}';", return_with_keys=False)
timestamps: List[datetime] = [dp.parse(x[0]) for x in ts_strings]
most_recent_timestamp: str = str(max(timestamps)) if timestamps else None
prev_counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT * from counts;', return_with_keys=True)]
prev_counts: List[Dict] = [dict(x) for x in sql_query(con, f'SELECT count, "table", "timestamp" from counts;', return_with_keys=True)]
prev_counts_df = pd.DataFrame(prev_counts)
# Get current counts / deltas
with get_db_connection(schema=schema, local=local) as con:
@@ -89,12 +125,14 @@

def counts_compare_schemas(
compare_schema: str = 'most_recent_backup', schema: str = SCHEMA, local=False, verbose=True, use_cached_counts=False
) -> pd.DataFrame:
) -> Union[pd.DataFrame, None]:
"""Checks counts of database tables for the current schema and its most recent backup.
    :param compare_schema: The schema to check against. e.g. n3c_backup_20230322
:param use_cached_counts: If True, will use whatever is in the `counts` table, though it is less likely that counts
will exist for backups. Runs much faster though if using this option.
    :returns pd.DataFrame w/ comparison of current schema and another schema. If compare_schema='most_recent_backup'
    and there is no backup schema in the counts, returns None.
"""
# Determine most recent schema if necessary
if compare_schema == 'most_recent_backup':
@@ -106,14 +144,22 @@ def counts_compare_schemas(
"""
with get_db_connection(schema='', local=local) as con:
backup_schemas: List[str] = [x[0] for x in sql_query(con, query, return_with_keys=False) if x[0].startswith(f'{schema}_backup_')]
if not backup_schemas:
print('Warning: Detected no backup schema to compare against. Quitting counts_compare_schemas().',
file=sys.stderr)
return None
dates: List[datetime] = [dp.parse(x.split('_')[2]) for x in backup_schemas]
for schema_name, date in zip(backup_schemas, dates):
if date == max(dates):
compare_schema = schema_name

# Get counts
main: Dict = _current_counts_and_deltas(schema, from_cache=use_cached_counts, local=local)
compare: Dict = _current_counts_and_deltas(compare_schema, from_cache=use_cached_counts, local=local)
main: Dict = _current_counts_and_deltas(
schema, from_cache=use_cached_counts, local=local, filter_temp_refresh_tables=True)
compare: Dict = _current_counts_and_deltas(
compare_schema, from_cache=use_cached_counts, local=local, filter_temp_refresh_tables=True)
if not compare:
raise InvalidCompareSchemaError(f'compare_schema {compare_schema} does not exist.')
tables = set(main.keys()).union(set(compare.keys()))
rows = []
for table in tables:
2 changes: 2 additions & 0 deletions backend/db/config.py
@@ -93,6 +93,8 @@ def get_pg_connect_url(local=False):
# 'csets_to_ignore',
'cset_members_items_plus',
]
# STANDALONE_TABLES & DERIVED_TABLE_DEPENDENCY_MAP
# - 100% of tables in the main schema, e.g. n3c, should be listed somewhere in
# STANDALONE_TABLES: Not derived from any other table, nor used to derive any other table/view. Used for QC testing.
STANDALONE_TABLES = [
'concept_set_json',
57 changes: 25 additions & 32 deletions backend/db/utils.py
@@ -23,14 +23,14 @@
from jinja2 import Template
# noinspection PyUnresolvedReferences
from psycopg2.errors import UndefinedTable
from sqlalchemy import create_engine, event
from sqlalchemy import CursorResult, create_engine, event
from sqlalchemy.engine import Row, RowMapping

from sqlalchemy.engine.base import Connection
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import TextClause
from typing import Any, Dict, Set, Tuple, Union, List
from typing import Dict, Set, Tuple, Union, List

DB_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_ROOT = Path(DB_DIR).parent.parent
@@ -410,37 +410,40 @@ def database_exists(con: Connection, db_name: str) -> bool:
return len(result) == 1


def run_sql(con: Connection, query: str, params: Dict = {}) -> CursorResult:
"""Run a sql command"""
query = text(query) if not isinstance(query, TextClause) else query
return con.execute(query, params) if params else con.execute(query)


def sql_query(
con: Connection, query: Union[text, str], params: Dict = {}, debug: bool = DEBUG, return_with_keys=True
) -> Union[List[RowMapping], List[List]]:
"""Run a sql query with optional params, fetching records.
https://stackoverflow.com/a/39414254/1368860:
query = "SELECT * FROM my_table t WHERE t.id = ANY(:ids);"
conn.execute(sqlalchemy.text(query), ids=some_ids)
"""Run an idempotent (read) SQL query with optional params, fetching records.
Inspiration:
https://stackoverflow.com/a/39414254/1368860:
query = "SELECT * FROM my_table t WHERE t.id = ANY(:ids);"
conn.execute(sqlalchemy.text(query), ids=some_ids)
"""
query = text(query) if not isinstance(query, TextClause) else query
try:
if params:
# after SQLAlchemy upgrade, send params as dict, not **params
q = con.execute(query, params) if params else con.execute(query)
else:
q = con.execute(query)
q: CursorResult = run_sql(con, query, params)

if debug:
print(f'{query}\n{json.dumps(params, indent=2)}')
# Conversions: after upgrading some packages, fastapi can no longer serialize Row & RowMapping objects
# todo: format q.mappings() for FastAPI like w/ q.fetchall() below? Are we not doing this cuz heavy refactor?
if return_with_keys:
# noinspection PyTypeChecker
results: List[RowMapping] = q.mappings().all() # key value pairs
# after upgrading some packages, fastapi can no longer serialize RowMapping objects
results: List[RowMapping] = q.mappings().all() # Key value pairs
# return [dict(x) for x in results]
return results
else:
# noinspection PyTypeChecker
results: List[Row] = q.fetchall() # Row tuples, with additional properties
# after upgrading some packages, fastapi can no longer serialize Row objects
return [list(x) for x in results]
return results
except (ProgrammingError, OperationalError) as err:
raise RuntimeError(f'Got an error [{err}] executing the following statement:\n{query}, {json.dumps(params, indent=2)}')
raise RuntimeError(
f'Got an error [{err}] executing the following statement:\n{query}, {json.dumps(params, indent=2)}')


def sql_query_single_col(*argv) -> List:
@@ -466,7 +469,7 @@ def get_obj_by_composite_key(con, table: str, keys: List[str], obj: Dict) -> Lis
{f'{key}_id': obj[key] for key in keys})


def get_obj_by_id(con, table: str, pk: str, obj_id: Union[str, int]) -> List[Row]:
def get_obj_by_id(con, table: str, pk: str, obj_id: Union[str, int]) -> List[List]:
"""Get object by ID"""
return sql_query(con, f'SELECT * FROM {table} WHERE {pk} = (:obj_id)', {'obj_id': obj_id}, return_with_keys=False)

@@ -548,16 +551,16 @@ def insert_from_dict(con: Connection, table: str, d: Union[Dict, List[Dict]], sk
if pk:
already_in_db = []
if isinstance(pk, str): # normal, single primary key
already_in_db: List[Dict] = get_obj_by_id(con, table, pk, d[pk])
already_in_db: List[List] = get_obj_by_id(con, table, pk, d[pk])
elif isinstance(pk, list): # composite key
already_in_db: List[Dict] = get_obj_by_composite_key(con, table, pk, d)
already_in_db: List[RowMapping] = get_obj_by_composite_key(con, table, pk, d)
if already_in_db:
return

insert = f"""
query = f"""
INSERT INTO {table} ({', '.join([f'"{x}"' for x in d.keys()])})
VALUES ({', '.join([':' + str(k) for k in d.keys()])})"""
run_sql(con, insert, d)
run_sql(con, query, d)


def sql_count(con: Connection, table: str) -> int:
@@ -582,16 +585,6 @@ def sql_in_safe(lst: List) -> (str, dict):
return (query, params)


def run_sql(con: Connection, command: str, params: Dict = {}) -> Any:
"""Run a sql command"""
command = text(command) if not isinstance(command, TextClause) else command
if params:
q = con.execute(command, params) if params else con.execute(command)
else:
q = con.execute(command)
return q


def list_schema_objects(
con: Connection = None, schema: str = None, filter_views=False, filter_sequences=False,
filter_temp_refresh_objects=False, filter_tables=False, names_only=False, verbose=True
67 changes: 0 additions & 67 deletions backend/routes/cset_crud.py
@@ -173,70 +173,6 @@ class CsetsGitUpdate(BaseModel):
row_index_data_map: Dict[int, Dict[str, Any]] = {}


# TODO: (i) move most of this functionality out of route into separate function (potentially keeping this route which
# simply calls that function as well), (ii) can then connect that function as step in the routes that coordinate
# enclave uploads
# TODO: git/patch changes: https://github.com/jhu-bids/TermHub/issues/165#issuecomment-1276557733
def csets_git_update(dataset_path: str, row_index_data_map: Dict[int, Dict[str, Any]]) -> Dict:
"""Update cset dataset. Works only on tabular files."""
# Vars
result = 'success'
details = ''
cset_dir = os.path.join(PROJECT_DIR, 'termhub-csets')
path_root = os.path.join(cset_dir, 'datasets')

# Update cset
# todo: dtypes need to be registered somewhere. perhaps a <CSV_NAME>_codebook.json()?, accessed based on filename,
# and inserted here
# todo: check git status first to ensure clean? maybe doesn't matter since we can just add by filename
path = os.path.join(path_root, dataset_path)
# noinspection PyBroadException
try:
df = pd.read_csv(path, dtype={'id': np.int32, 'last_name': str, 'first_name': str}).fillna('')
for index, field_values in row_index_data_map.items():
for field, value in field_values.items():
df.at[index, field] = value
df.to_csv(path, index=False)
except BaseException as err:
result = 'failure'
details = str(err)

# Push commit
# todo?: Correct git status after change should show something like this near end: `modified: FILENAME`
relative_path = os.path.join('datasets', dataset_path)
# todo: Want to see result as string? only getting int: 1 / 0
# ...answer: it's being printed to stderr and stdout. I remember there's some way to pipe and capture if needed
# TODO: What if the update resulted in no changes? e.g. changed values were same?
git_add_result = sp_call(f'git add {relative_path}'.split(), cwd=cset_dir)
if git_add_result != 0:
result = 'failure'
details = f'Error: Git add: {dataset_path}'
git_commit_result = sp_call(['git', 'commit', '-m', f'Updated by server: {relative_path}'], cwd=cset_dir)
if git_commit_result != 0:
result = 'failure'
details = f'Error: Git commit: {dataset_path}'
git_push_result = sp_call('git push origin HEAD:main'.split(), cwd=cset_dir)
if git_push_result != 0:
result = 'failure'
details = f'Error: Git push: {dataset_path}'

return {'result': result, 'details': details}


# todo: Maybe change to `id` instead of row index
# TODO: obsolete?
@router.put("/datasets/csets")
def put_csets_update(d: CsetsGitUpdate = None) -> Dict:
"""HTTP PUT wrapper for csets_update()"""
return csets_git_update(d.dataset_path, d.row_index_data_map)


@router.put("/datasets/vocab")
def vocab_update():
"""Update vocab dataset"""
pass


@router.post("/create-new-draft-omop-concept-set-version")
def route_create_new_draft_omop_concept_set_version(d: UploadJsonNewCsetVersionWithConcepts) -> Dict:
"""Upload new version of existing container, with concepets"""
@@ -345,7 +281,4 @@ def route_csv_upload_new_container_with_concepts(data: UploadCsvVersionWithConce
# noinspection PyTypeChecker
df = pd.read_csv(StringIO(data.dict()['csv'])).fillna('')
response: Dict = upload_new_cset_container_with_concepts_from_csv(df=df)
# print('CSV upload result: ')
# print(json.dumps(response, indent=2))
return response

