Skip to content

Commit

Permalink
Merge pull request #987 from jhu-bids/sync-draft-expressions
Browse files Browse the repository at this point in the history
Sync Draft Expression Items
  • Loading branch information
joeflack4 authored Nov 25, 2024
2 parents e7955d5 + 07249a8 commit fb58c94
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 9 deletions.
10 changes: 7 additions & 3 deletions backend/db/resolve_fetch_failures_0_members.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from backend.db.resolve_fetch_failures_excess_items import resolve_fetch_failures_excess_items
from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, reset_temp_refresh_tables,\
run_sql, select_failed_fetches, refresh_derived_tables, sql_in, sql_query
from enclave_wrangler.objects_api import csets_and_members_to_db, fetch_cset_and_member_objects, fetch_cset_version, \
get_csets_over_threshold, update_cset_metadata_from_objs
from enclave_wrangler.objects_api import csets_and_members_to_db, fetch_cset_and_member_objects, fetch_cset_version, \
get_csets_over_threshold, sync_expressions_for_csets, update_cset_metadata_from_objs

DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API."

Expand Down Expand Up @@ -225,14 +225,18 @@ def resolve_fetch_failures_0_members(
# Check for new failures: that may have occurred during runtime
failed_cset_ids, failure_lookup_i = get_failures_0_members(version_ids, use_local_db, force)
failure_lookup.update(failure_lookup_i)

# Fetch data
csets_and_members: Dict[str, List[Dict]] = fetch_cset_and_member_objects(
codeset_ids=list(failed_cset_ids), flag_issues=False)

# - identify & report discarded drafts
# Sync updates
with get_db_connection(schema=schema, local=use_local_db) as con:
# - identify & report discarded drafts
discarded_cset_ids = handle_discarded_drafts(
con, csets_and_members, failed_cset_ids, failure_lookup, use_local_db=use_local_db)
# - sync expression item additions or deletions (todo: updates too)
sync_expressions_for_csets(csets_and_members['OMOPConceptSet'], con, schema)

if not csets_and_members:
return # all failures were discarded
Expand Down
13 changes: 11 additions & 2 deletions backend/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def refresh_derived_tables_exec(
# refreshes on normal schema and test_schema don't block each other. Only a minor issue. Will sometimes cause tests
# to take a very long time to run, especially during the wee hours when vocab/counts refreshes are running.
def refresh_derived_tables(
con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA, local=False,
con: Connection, independent_tables: Union[str, List[str]] = CORE_CSET_TABLES, schema=SCHEMA, local=False,
polling_interval_seconds: int = 30
):
"""Refresh TermHub core cset derived tables: wrapper function
Expand All @@ -228,6 +228,7 @@ def refresh_derived_tables(
"""
i = 0
t0 = datetime.now()
independent_tables: List[str] = [independent_tables] if isinstance(independent_tables, str) else independent_tables
while True:
i += 1
if (datetime.now() - t0).total_seconds() >= 2 * 60 * 60: # 2 hours
Expand Down Expand Up @@ -568,14 +569,22 @@ def sql_query_single_col(*argv) -> List:
return [r[0] for r in results]


# todo: consider adding 'schema' param
def delete_obj_by_composite_key(con, table: str, key_ids: Dict[str, Union[str, int]]):
    """Delete object(s) from `table` matching every field/value pair of a composite key.

    :param con: DB connection handed to run_sql().
    :param table: Name of the table to delete from. Interpolated directly into the statement.
    :param key_ids: Maps each key field name to the value it must equal. Field names are interpolated into the
     statement; values are passed as bound parameters named after their field.
    :raises ValueError: If `key_ids` is empty, since that would produce a `WHERE` clause with no condition.
    :return: Whatever run_sql() returns."""
    if not key_ids:
        raise ValueError('delete_obj_by_composite_key() requires at least one key field in key_ids.')
    # One "<field> = (:<field>)" condition per key field; all must hold for a row to be deleted
    keys_str = ' AND '.join(f'{key} = (:{key})' for key in key_ids)
    params = {str(key): _id for key, _id in key_ids.items()}
    return run_sql(con, f'DELETE FROM {table} WHERE {keys_str}', params)


# todo: consider adding 'schema' param
def delete_obj_by_pk(con, table: str, pk_field: str, pk: Union[str, int]):
    """Delete from `table` where the `pk_field` column equals `pk`.

    The table and field names are interpolated directly into the statement; the value `pk` is passed to run_sql()
    as a bound parameter named after `pk_field`. Returns whatever run_sql() returns."""
    statement = f'DELETE FROM {table} WHERE {pk_field} = :{pk_field}'
    params = {pk_field: pk}
    return run_sql(con, statement, params)


def get_obj_by_composite_key(con, table: str, keys: List[str], obj: Dict) -> List[RowMapping]:
"""Get object by ID
todo: could be made more consistent w/ get_obj_by_id(): accept obj_id instead?"""
Expand Down
4 changes: 3 additions & 1 deletion enclave_wrangler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ def add_mappings(csv_str: str):
includeMapped, includeMapped
isExcluded, isExcluded
createdBy, created_by
createdAt, created_at""")
createdAt, created_at
annotation, annotation
sourceApplication, source_application""")

# OMOPConceptSet (Version): object <-> dataset
add_mappings(
Expand Down
59 changes: 56 additions & 3 deletions enclave_wrangler/objects_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@
get_objects_df, get_url_from_api_path, \
make_objects_request
from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey
from backend.db.utils import SCHEMA, dedupe_dicts, get_field_data_types, insert_fetch_statuses, insert_from_dict, \
insert_from_dicts, \
is_refresh_active, reset_temp_refresh_tables, refresh_derived_tables, \
from backend.db.utils import SCHEMA, dedupe_dicts, delete_obj_by_pk, get_field_data_types, insert_fetch_statuses, \
insert_from_dict, insert_from_dicts, is_refresh_active, reset_temp_refresh_tables, refresh_derived_tables, \
sql_in, sql_query, sql_query_single_col, run_sql, get_db_connection, update_from_dicts
from backend.db.queries import get_concepts
from backend.utils import call_github_action
Expand Down Expand Up @@ -1123,6 +1122,60 @@ def update_cset_metadata_from_objs(
conn.close()


def sync_cset_expression_changes(cset: Dict[str, Any], con: Connection = None) -> bool:
    """Sync: Add or delete expression items in the DB so they match those of a single Enclave concept set.

    Items are matched by concept ID: `conceptId` on the Enclave side, `concept_id` on the DB side. Items present in
    the Enclave but not in the DB are added; items present in the DB but not in the Enclave are deleted by `item_id`.

    :param cset: Concept set object. Reads `cset['properties']['codesetId']` and, for each entry of
     `cset['expression_items']`, its `properties` dict.
    :param con: DB connection to use. If not provided, a connection is opened here and closed before returning.
    :return: True if any additions or deletions were detected, else False.

    todo: Sync expression updates (example: includeDescendants true -> false)
     - converting the Enclave items to DB format, e.g. via convert_row(), would be useful for that
     - if doing this, best way to do it is to repurpose update_cset_metadata_from_objs(), cset_obj_field_datatypes(),
     and cset_objs_set_missing_fields_to_null(), to be generalized, to work with csets (1st, previous use case) and
     expression items (this new use case). For update_cset_metadata_from_objs(), would have to (a) create a new audit
     table just for expression items, or (b) set a flag to not do auditing in this case.
    todo: audit table for concept_set_version_items deletions
    """
    opened_here = not con
    conn = con if con else get_db_connection()
    try:
        # int() guarantees that only a number is ever interpolated into the query below
        codeset_id = int(cset['properties']['codesetId'])

        # Collect data
        # - enclave
        enclave_items: List[Dict] = [x['properties'] for x in cset['expression_items']]
        enclave_by_concept: Dict[int, Dict] = {x['conceptId']: x for x in enclave_items}
        # - db
        qry = f"SELECT * FROM concept_set_version_item WHERE codeset_id = {codeset_id};"
        db_items: List[Dict] = [dict(x) for x in sql_query(conn, qry)]
        db_by_concept: Dict[int, Dict] = {x['concept_id']: x for x in db_items}

        # Detect changes
        # - additions
        additions = {k: v for k, v in enclave_by_concept.items() if k not in db_by_concept}
        # - deletions
        deletions = {k: v for k, v in db_by_concept.items() if k not in enclave_by_concept}

        # Make updates
        # - additions
        add_objects_to_db(conn, 'OmopConceptSetVersionItem', list(additions.values()))
        # - deletions
        for exp in deletions.values():
            delete_obj_by_pk(conn, 'concept_set_version_item', 'item_id', exp['item_id'])
        return bool(additions or deletions)
    finally:
        # Only close a connection this function opened; a caller-supplied one stays under the caller's control
        if opened_here:
            conn.close()


def sync_expressions_for_csets(csets: List[Dict], con: Connection = None, schema=SCHEMA) -> bool:
    """Sync expression item changes between Enclave and DB.

    Runs sync_cset_expression_changes() on every concept set and, if any of them reported a change, refreshes the
    tables derived from `concept_set_version_item`.

    :param csets: Concept set objects, each in the form accepted by sync_cset_expression_changes().
    :param con: DB connection to use. If not provided, a connection is opened here and closed before returning.
    :param schema: Schema passed to refresh_derived_tables(), and to get_db_connection() when opening a connection.
    :return: True if any concept set had expression item changes, else False.

    todo: There will be a redundancy if refreshing derived tables is done here, and then later in the fetch failures
     script in the event that a draft was finalized. Ideally refresh_derived_tables() would only be done 1x in such
     situations."""
    opened_here = not con
    conn = con if con else get_db_connection(schema=schema)
    try:
        changed = False
        for cset in csets:
            # Call comes first in the 'or' so that every cset is synced, even after a change was already found
            changed = bool(sync_cset_expression_changes(cset, conn)) or changed
        if changed:
            # Use 'conn', not 'con': 'con' is None whenever the connection was opened by this function
            refresh_derived_tables(conn, 'concept_set_version_item', schema=schema)
        return changed
    finally:
        # Only close a connection this function opened; a caller-supplied one stays under the caller's control
        if opened_here:
            conn.close()


def _get_cset_row_diffs_by_field(list1: List[Dict], list2: List[Dict]) -> Dict[int, Dict[str, Tuple[Any, Any]]]:
"""Get differences between two sets of rows of the same type of codeset, e.g. 1 from DB and 1 from Enclave.
Expand Down

0 comments on commit fb58c94

Please sign in to comment.