From 07249a8a14275ade41e6926fd3dd90f9c6c074ec Mon Sep 17 00:00:00 2001 From: Joe Flack Date: Sun, 24 Nov 2024 19:09:05 -0500 Subject: [PATCH] Sync Draft Expression Items - Now syncing added or deleted expression items while csets are drafts or are finalized. General - Bug fix: When adding expression items, values for 'annotation' and 'sourceApplication' fields were being dropped. --- .../db/resolve_fetch_failures_0_members.py | 10 +++- backend/db/utils.py | 13 +++- enclave_wrangler/models.py | 4 +- enclave_wrangler/objects_api.py | 59 ++++++++++++++++++- 4 files changed, 77 insertions(+), 9 deletions(-) diff --git a/backend/db/resolve_fetch_failures_0_members.py b/backend/db/resolve_fetch_failures_0_members.py index 881ad0af3..fdae7c5f2 100644 --- a/backend/db/resolve_fetch_failures_0_members.py +++ b/backend/db/resolve_fetch_failures_0_members.py @@ -20,8 +20,8 @@ from backend.db.resolve_fetch_failures_excess_items import resolve_fetch_failures_excess_items from backend.db.utils import SCHEMA, fetch_status_set_success, get_db_connection, reset_temp_refresh_tables,\ run_sql, select_failed_fetches, refresh_derived_tables, sql_in, sql_query -from enclave_wrangler.objects_api import csets_and_members_to_db, fetch_cset_and_member_objects, fetch_cset_version, \ - get_csets_over_threshold, update_cset_metadata_from_objs +from enclave_wrangler.objects_api import csets_and_members_to_db, fetch_cset_and_member_objects, fetch_cset_version, \ + get_csets_over_threshold, sync_expressions_for_csets, update_cset_metadata_from_objs DESC = "Resolve any failures resulting from fetching data from the Enclave's objects API."
@@ -225,14 +225,18 @@ def resolve_fetch_failures_0_members( # Check for new failures: that may have occurred during runtime failed_cset_ids, failure_lookup_i = get_failures_0_members(version_ids, use_local_db, force) failure_lookup.update(failure_lookup_i) + # Fetch data csets_and_members: Dict[str, List[Dict]] = fetch_cset_and_member_objects( codeset_ids=list(failed_cset_ids), flag_issues=False) - # - identify & report discarded drafts + # Sync updates with get_db_connection(schema=schema, local=use_local_db) as con: + # - identify & report discarded drafts discarded_cset_ids = handle_discarded_drafts( con, csets_and_members, failed_cset_ids, failure_lookup, use_local_db=use_local_db) + # - sync expression item additions or deletions (todo: updates too) + sync_expressions_for_csets(csets_and_members['OMOPConceptSet'], con, schema) if not csets_and_members: return # all failures were discarded diff --git a/backend/db/utils.py b/backend/db/utils.py index 243c31364..1d2585133 100644 --- a/backend/db/utils.py +++ b/backend/db/utils.py @@ -216,7 +216,7 @@ def refresh_derived_tables_exec( # refreshes on normal schema and test_schema don't block each other. Only a minor issue. Will sometimes cause tests # to take a very long time to run, especially during the wee hours when vocab/counts refreshes are running. 
def refresh_derived_tables( - con: Connection, independent_tables: List[str] = CORE_CSET_TABLES, schema=SCHEMA, local=False, + con: Connection, independent_tables: Union[str, List[str]] = CORE_CSET_TABLES, schema=SCHEMA, local=False, polling_interval_seconds: int = 30 ): """Refresh TermHub core cset derived tables: wrapper function @@ -228,6 +228,7 @@ def refresh_derived_tables( """ i = 0 t0 = datetime.now() + independent_tables: List[str] = [independent_tables] if isinstance(independent_tables, str) else independent_tables while True: i += 1 if (datetime.now() - t0).total_seconds() >= 2 * 60 * 60: # 2 hours @@ -568,14 +569,22 @@ def sql_query_single_col(*argv) -> List: return [r[0] for r in results] +# todo: consider adding 'schema' param def delete_obj_by_composite_key(con, table: str, key_ids: Dict[str, Union[str, int]]): - """Get object by ID""" + """Delete object by ID""" keys_str = ' AND '.join([f'{key} = (:{key})' for key in key_ids.keys()]) return run_sql( con, f'DELETE FROM {table} WHERE {keys_str}', {f'{key}': _id for key, _id in key_ids.items()}) +# todo: consider adding 'schema' param +def delete_obj_by_pk(con, table: str, pk_field: str, pk: Union[str, int]): + """Delete object by primary key""" + return run_sql( + con, f'DELETE FROM {table} WHERE {pk_field} = :{pk_field}',{pk_field: pk}) + + def get_obj_by_composite_key(con, table: str, keys: List[str], obj: Dict) -> List[RowMapping]: """Get object by ID todo: could be made more consistent w/ get_obj_by_id(): accept obj_id instead?""" diff --git a/enclave_wrangler/models.py b/enclave_wrangler/models.py index 9937c228a..ed3c2f69a 100644 --- a/enclave_wrangler/models.py +++ b/enclave_wrangler/models.py @@ -281,7 +281,9 @@ def add_mappings(csv_str: str): includeMapped, includeMapped isExcluded, isExcluded createdBy, created_by - createdAt, created_at""") + createdAt, created_at + annotation, annotation + sourceApplication, source_application""") # OMOPConceptSet (Version): object <-> dataset add_mappings( 
diff --git a/enclave_wrangler/objects_api.py b/enclave_wrangler/objects_api.py index 991825f7c..59f3989c4 100644 --- a/enclave_wrangler/objects_api.py +++ b/enclave_wrangler/objects_api.py @@ -42,9 +42,8 @@ get_objects_df, get_url_from_api_path, \ make_objects_request from enclave_wrangler.models import OBJECT_TYPE_TABLE_MAP, convert_row, get_field_names, field_name_mapping, pkey -from backend.db.utils import SCHEMA, dedupe_dicts, get_field_data_types, insert_fetch_statuses, insert_from_dict, \ - insert_from_dicts, \ - is_refresh_active, reset_temp_refresh_tables, refresh_derived_tables, \ +from backend.db.utils import SCHEMA, dedupe_dicts, delete_obj_by_pk, get_field_data_types, insert_fetch_statuses, \ + insert_from_dict, insert_from_dicts, is_refresh_active, reset_temp_refresh_tables, refresh_derived_tables, \ sql_in, sql_query, sql_query_single_col, run_sql, get_db_connection, update_from_dicts from backend.db.queries import get_concepts from backend.utils import call_github_action @@ -1123,6 +1122,60 @@ def update_cset_metadata_from_objs( conn.close() +def sync_cset_expression_changes(cset: Dict[str, Any], con: Connection = None): + """Sync: Add or delete expression item changes as needed. + + todo: Sync expression updates (example: includeDescendants true -> false) + - exps_enclave_db_format useful for that + - if doing this, best way to do it is to repurpose update_cset_metadata_from_objs(), cset_obj_field_datatypes(), + and cset_objs_set_missing_fields_to_null(), to be generalized, to work with csets (1st, previous use case) and + expression items (this new use case). For update_cset_metadata_from_objs(), would have to (a) create a new audit + table just for expression items, or (b) set a flag to not do auditing in this case. 
+ todo: audit table for concept_set_version_items deletions + """ + conn = con if con else get_db_connection() + cst_id: int = cset['properties']['codesetId'] + # Collect data + # - enclave + exps_enclave: List[Dict] = [x['properties'] for x in cset['expression_items']] + # exps_enclave_db_format = [convert_row('OmopConceptSetVersionItem', 'concept_set_version_item', x) for x in exps_enclave] + exps_enclave: Dict[int, Dict] = {x['conceptId']: x for x in exps_enclave} + # exps_enclave_db_format: Dict[int, Dict] = {x['concept_id']: x for x in exps_enclave_db_format} + # - db + qry = f"SELECT * FROM concept_set_version_item WHERE codeset_id = {cst_id};" + exps_db: List[Dict] = [dict(x) for x in sql_query(conn, qry)] + exps_db: Dict[int, Dict] = {x['concept_id']: x for x in exps_db} + + # Detect changes + # - additions + additions = {k: v for k, v in exps_enclave.items() if k not in exps_db} + # - deletions + deletions = {k: v for k, v in exps_db.items() if k not in exps_enclave} + + # Make updates + # - additions + add_objects_to_db(conn, 'OmopConceptSetVersionItem', list(additions.values())) + # - deletions + for exp in deletions.values(): + delete_obj_by_pk(conn, 'concept_set_version_item', 'item_id', exp['item_id']) + return any([bool(x) for x in [additions, deletions]]) + + +def sync_expressions_for_csets(csets: List[Dict], con: Connection = None, schema=SCHEMA) -> bool: + """Sync expression item changes between Enclave and DB. + + todo: There will be a redundancy if refreshing derived tables is done here, and then later in the fetch failures + script in the event that a draft was finalized. 
Ideally refresh_derived_tables() would only be done 1x in such + situations.""" + conn = con if con else get_db_connection() + changes = 0 + for cset in csets: + changes += sync_cset_expression_changes(cset, conn) + if changes: + refresh_derived_tables(conn, 'concept_set_version_item', schema=schema) + return bool(changes) + + + def _get_cset_row_diffs_by_field(list1: List[Dict], list2: List[Dict]) -> Dict[int, Dict[str, Tuple[Any, Any]]]: """Get differences between two sets of rows of the same type of codeset, e.g. 1 from DB and 1 from Enclave.