From 1edc519895fbee4127a4a8065ab32a0e847d801e Mon Sep 17 00:00:00 2001 From: xkmato Date: Mon, 15 Jul 2024 12:06:25 +0300 Subject: [PATCH] Initial clean to get cli commands to work --- pyproject.toml | 7 +- setup.py | 3 +- tests/test_cql_tools.py | 37 ++++-- tests/test_logical_model_generator.py | 8 +- tests/test_questionnaire_generator.py | 4 +- who_l3_smart_tools/cli/indicator_testing.py | 8 +- who_l3_smart_tools/cli/logical_model_gen.py | 8 +- who_l3_smart_tools/cli/terminology.py | 2 - .../core/cql_tools/cql_file_generator.py | 18 ++- .../logical_models/logical_model_generator.py | 125 ++++++++++++------ .../questionnaires/questionnaire_generator.py | 58 +++++--- .../core/terminology/terminology.py | 3 +- who_l3_smart_tools/utils/__init__.py | 9 +- 13 files changed, 196 insertions(+), 94 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 88c8d2e..0c2baa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,12 +16,12 @@ fhirpy = ">=1.4" Faker = ">=25.0" inflect = "^7.3.0" python-slugify = ">=8.0.4" +stringcase = ">=1.2" [tool.poetry.group.dev.dependencies] pytest = ">=6" flake8 = ">=3.8" Sphinx = ">=3.2" -stringcase = ">=1.2" pyright = "^1.1.367" [tool.pyright] @@ -31,3 +31,8 @@ venv = ".venv" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +logical_model_gen = "who_l3_smart_tools.cli.logical_model_gen:main" +indicator_testing = "who_l3_smart_tools.cli.indicator_testing:main" +terminology = "who_l3_smart_tools.cli.terminology:main" diff --git a/setup.py b/setup.py index ea17fd9..9e691f3 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ -from setuptools import setup, find_packages - +from setuptools import find_packages, setup setup( name="who_l3_smart_tools", diff --git a/tests/test_cql_tools.py b/tests/test_cql_tools.py index c22246d..e3b4ad3 100644 --- a/tests/test_cql_tools.py +++ b/tests/test_cql_tools.py @@ -3,17 +3,20 @@ import os import re from who_l3_smart_tools.core.cql_tools.cql_file_generator import CqlFileGenerator -from who_l3_smart_tools.core.cql_tools.cql_resource_generator import CqlResourceGenerator +from who_l3_smart_tools.core.cql_tools.cql_resource_generator import ( + CqlResourceGenerator, +) import pandas as pd import unittest import stringcase + class TestCqlTools(unittest.TestCase): def test_generate_cql_file_headers(self): input_indicators = "tests/data/l2/test_indicators.xlsx" input_dd = "tests/data/l2/test_dd.xlsx" output_dir = "tests/output/cql/templates/" - + # Make sure output directory exists if not os.path.exists(output_dir): os.makedirs(output_dir) @@ -26,7 +29,6 @@ def test_generate_cql_file_headers(self): assert os.path.exists(os.path.join(output_dir, "HIVIND2Logic.cql")) - def test_generate_concepts_cql(self): input_indicators = "tests/data/l2/test_indicators.xlsx" input_dd = "tests/data/l2/test_dd.xlsx" @@ -34,7 +36,7 @@ def test_generate_concepts_cql(self): if not os.path.exists(output_dir): os.makedirs(output_dir) - + generator = CqlFileGenerator(input_indicators, input_dd) generator.generate_cql_concept_file(output_dir=output_dir) @@ -45,7 +47,7 @@ def test_generate_concepts_cql(self): class TestCqlResourceGenerator(unittest.TestCase): def setUp(self): # since we're comparing text, it's useful to have large diffs - self.maxDiff=5000 + self.maxDiff = 5000 # Load example CQL from data directory cql_file_path = "tests/data/example_cql_HIV27.cql" @@ -59,11 +61,13 @@ def setUp(self): indicator_file_path, sheet_name="Indicator definitions" ) - self.indicator_row = indicator_file[indicator_file['DAK ID'] == 'HIV.IND.27'].head(1).squeeze() + self.indicator_row = ( + indicator_file[indicator_file["DAK ID"] == "HIV.IND.27"].head(1).squeeze() + ) - self.generator = CqlResourceGenerator(self.cql_content, { - self.indicator_row['DAK ID']: self.indicator_row - }) + self.generator = CqlResourceGenerator( + self.cql_content, {self.indicator_row["DAK ID"]: self.indicator_row} + ) def test_parse_cql_with_valid_content(self): parsed_cql = self.generator.parsed_cql @@ -102,7 +106,9 @@ def test_generate_measure_fsh(self): assert measure_fsh is not None - output_file = f"tests/output/fsh/{stringcase.alphanumcase(p["library_name"])}_measure.fsh" + output_file = ( + f"tests/output/fsh/{stringcase.alphanumcase(p["library_name"])}_measure.fsh" + ) if os.path.exists(output_file): os.remove(output_file) @@ -115,12 +121,13 @@ def test_generate_measure_fsh(self): # The date is always the date the measure was generated, so we need to update it expected_measure_fsh = expected_measure_fsh.replace( '* date = "2024-06-14"', - f'* date = "{datetime.datetime.now(datetime.timezone.utc).date():%Y-%m-%d}"' + f'* date = "{datetime.datetime.now(datetime.timezone.utc).date():%Y-%m-%d}"', ) self.assertIsNotNone(measure_fsh) self.assertEqual(expected_measure_fsh, measure_fsh) + class TestCqlGeneratorOnAllFiles(unittest.TestCase): def test_resource_gen_for_all(self): @@ -144,7 +151,6 @@ def test_resource_gen_for_all(self): if not os.path.exists(os.path.join(output_directory, subfolder)): os.makedirs(os.path.join(output_directory, subfolder)) - # For each cql file, generate library resources. Only generate measures for # cql files with corresponding indicator definitions. for cql_file in os.listdir(input_directory): @@ -175,9 +181,14 @@ def test_resource_gen_for_all(self): # Create Measure file and save to file measure_fsh = generator.generate_measure_fsh() if measure_fsh: - output_file = os.path.join(output_directory, "measures", f"{stringcase.alphanumcase(generator.get_library_name())}.fsh") + output_file = os.path.join( + output_directory, + "measures", + f"{stringcase.alphanumcase(generator.get_library_name())}.fsh", + ) with open(output_file, "w") as f: f.write(measure_fsh) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_logical_model_generator.py b/tests/test_logical_model_generator.py index 2b14c55..ab3b74f 100644 --- a/tests/test_logical_model_generator.py +++ b/tests/test_logical_model_generator.py @@ -25,7 +25,9 @@ def test_generate_fsh_from_excel(self): with open(output_file, "r") as f: fsh_artifact = f.read() - with open(os.path.join("tests", "data", "example_fsh", "HIVARegistration.fsh"), "r") as f: + with open( + os.path.join("tests", "data", "example_fsh", "HIVARegistration.fsh"), "r" + ) as f: expected_fsh_artifact = f.read() self.assertEqual(expected_fsh_artifact, fsh_artifact) @@ -37,7 +39,9 @@ def setUp(self) -> None: self.output_dir = os.path.join("tests", "output", "fsh") def test_full_data_dictionary(self): - generator = LogicalModelAndTerminologyGenerator(self.input_file, self.output_dir) + generator = LogicalModelAndTerminologyGenerator( + self.input_file, self.output_dir + ) generator.generate_fsh_from_excel() diff --git a/tests/test_questionnaire_generator.py b/tests/test_questionnaire_generator.py index e7abc87..62521c8 100644 --- a/tests/test_questionnaire_generator.py +++ b/tests/test_questionnaire_generator.py @@ -1,6 +1,8 @@ import os import unittest -from who_l3_smart_tools.core.questionnaires.questionnaire_generator import QuestionnaireGenerator +from who_l3_smart_tools.core.questionnaires.questionnaire_generator import ( + QuestionnaireGenerator, +) class TestQuestionnaireGenerator(unittest.TestCase): diff --git a/who_l3_smart_tools/cli/indicator_testing.py b/who_l3_smart_tools/cli/indicator_testing.py index ab55aaa..6c685b8 100644 --- a/who_l3_smart_tools/cli/indicator_testing.py +++ b/who_l3_smart_tools/cli/indicator_testing.py @@ -1,11 +1,13 @@ +import argparse import datetime import os -import argparse from pathlib import Path -import pandas as pd + from who_l3_smart_tools.core.indicator_testing.bundle_generator import BundleGenerator from who_l3_smart_tools.core.indicator_testing.data_generator import DataGenerator -from who_l3_smart_tools.core.indicator_testing.scaffolding_generator import ScaffoldingGenerator +from who_l3_smart_tools.core.indicator_testing.scaffolding_generator import ( + ScaffoldingGenerator, +) def generate_test_scaffold(input_file): diff --git a/who_l3_smart_tools/cli/logical_model_gen.py b/who_l3_smart_tools/cli/logical_model_gen.py index 953fefe..293fd85 100644 --- a/who_l3_smart_tools/cli/logical_model_gen.py +++ b/who_l3_smart_tools/cli/logical_model_gen.py @@ -1,5 +1,7 @@ import argparse -from who_l3_smart_tools.core.logical_models.logical_model_generator import LogicalModelAndTerminologyGenerator +from who_l3_smart_tools.core.logical_models.logical_model_generator import ( + LogicalModelAndTerminologyGenerator, +) def main(): @@ -21,7 +23,9 @@ def main(): args = parser.parse_args() - LogicalModelAndTerminologyGenerator(args.input, args.output).generate_fsh_from_excel() + LogicalModelAndTerminologyGenerator( + args.input, args.output + ).generate_fsh_from_excel() if __name__ == "__main__": diff --git a/who_l3_smart_tools/cli/terminology.py b/who_l3_smart_tools/cli/terminology.py index ed7eb3f..1f393a0 100755 --- a/who_l3_smart_tools/cli/terminology.py +++ b/who_l3_smart_tools/cli/terminology.py @@ -4,8 +4,6 @@ import os import sys -sys.path.insert(0, os.getcwd()) - from who_l3_smart_tools.core.terminology.who.terminology import HIVTerminology diff --git a/who_l3_smart_tools/core/cql_tools/cql_file_generator.py b/who_l3_smart_tools/core/cql_tools/cql_file_generator.py index 6f30a74..829628d 100644 --- a/who_l3_smart_tools/core/cql_tools/cql_file_generator.py +++ b/who_l3_smart_tools/core/cql_tools/cql_file_generator.py @@ -3,7 +3,10 @@ import stringcase import pandas as pd -from who_l3_smart_tools.utils.cql_helpers import determine_scoring_suggestion, get_dak_name +from who_l3_smart_tools.utils.cql_helpers import ( + determine_scoring_suggestion, + get_dak_name, +) # Templates cql_file_header_template = """/* @@ -81,7 +84,10 @@ def print_to_files(self, output_dir: str, update_existing: bool = True): """ This method writes the CQL scaffolds to files in the output directory. """ - last_generated_line = ["include FHIRCommon called FC\n", "using FHIR version '4.0.1'\n"] + last_generated_line = [ + "include FHIRCommon called FC\n", + "using FHIR version '4.0.1'\n", + ] for indicator_name, scaffold in self.cql_scaffolds.items(): file_name = indicator_name.replace(".", "") @@ -92,8 +98,12 @@ def print_to_files(self, output_dir: str, update_existing: bool = True): with open(f"{output_dir}/{file_name}Logic.cql", "r") as file: # Read up to the last generated line lines = file.readlines() - last_generated_line_index = lines.index(last_generated_line[0]) if last_generated_line[0] in lines else lines.index(last_generated_line[1]) - if last_generated_line_index == -1: + last_generated_line_index = ( + lines.index(last_generated_line[0]) + if last_generated_line[0] in lines + else lines.index(last_generated_line[1]) + ) + if last_generated_line_index == -1: raise ValueError( f"Could not find last generated line in {file_name}Logic.cql" ) diff --git a/who_l3_smart_tools/core/logical_models/logical_model_generator.py b/who_l3_smart_tools/core/logical_models/logical_model_generator.py index bf2611e..d05b6c6 100644 --- a/who_l3_smart_tools/core/logical_models/logical_model_generator.py +++ b/who_l3_smart_tools/core/logical_models/logical_model_generator.py @@ -114,7 +114,7 @@ def __init__(self, input_file, output_dir): def generate_fsh_from_excel(self): # create output structure - for dir in[self.models_dir, self.codesystem_dir, self.valuesets_dir]: + for dir in [self.models_dir, self.codesystem_dir, self.valuesets_dir]: if not os.path.exists(dir): os.makedirs(dir) @@ -164,7 +164,7 @@ def generate_fsh_from_excel(self): validations = self.parse_validations(df) # Template for invariants based on validation conditions - for (validation, data_ids) in validations.items(): + for validation, data_ids in validations.items(): id = self.invariants_dict[short_name[1]].next invariant_id = f"{short_name[0]}-{short_name[1]}-{id}" if type(validation) == str: @@ -173,21 +173,27 @@ def generate_fsh_from_excel(self): description = "" expression = "" - fsh_artifact += fsh_invariant_template.format( - invariant_id=invariant_id, - description=description, - expression=expression, - ) + "\n" + fsh_artifact += ( + fsh_invariant_template.format( + invariant_id=invariant_id, + description=description, + expression=expression, + ) + + "\n" + ) for data_element_id in data_ids: validation_lookup[data_element_id] = invariant_id # Generate the FSH logical model header based on the sheet name - fsh_header = fsh_lm_header_template.format( - name=clean_name, - title=sheet_name, - description=cover_info[sheet_name.upper()], - ) + "\n" + fsh_header = ( + fsh_lm_header_template.format( + name=clean_name, + title=sheet_name, + description=cover_info[sheet_name.upper()], + ) + + "\n" + ) fsh_artifact += fsh_header @@ -212,11 +218,17 @@ def generate_fsh_from_excel(self): else: # equalize spaces - label = label.strip().replace('*', '').replace('[', '').replace(']', '').replace('"', "'") + label = ( + label.strip() + .replace("*", "") + .replace("[", "") + .replace("]", "") + .replace('"', "'") + ) # remove many special characters - label_clean = (label - .replace("(", "") + label_clean = ( + label.replace("(", "") .replace(")", "") .replace("'s", "") .replace("-", "_") @@ -227,7 +239,8 @@ def generate_fsh_from_excel(self): .replace("<=", "less than") .replace(">", "more than") .replace("<", "less than") - .lower()) + .lower() + ) else: label = "" label_clean = "" @@ -245,11 +258,13 @@ def generate_fsh_from_excel(self): if required == "C": required_condition = row["Explain Conditionality"] - codes.append({ - "code": data_element_id, - "label": label, - "description": description - }) + codes.append( + { + "code": data_element_id, + "label": label, + "description": description, + } + ) # handle ValueSets # First we identify a ValueSet @@ -261,19 +276,25 @@ def generate_fsh_from_excel(self): "name": data_element_id.replace(".", ""), "title": f"{label} ValueSet", "description": f"Value set of {description[0].lower() + description[1:] if description[0].isupper() and not description.startswith("HIV") else description}", - "codes": [] + "codes": [], } valuesets.append(active_valueset) # Then we identify the codes for the ValueSet - elif data_type == "Codes" and multiple_choice_type == "Input Option": + elif ( + data_type == "Codes" and multiple_choice_type == "Input Option" + ): if active_valueset is None: - print(f"Attempted to create a member of a ValueSet without a ValueSet context for code {data_element_id}", sys.stderr) + print( + f"Attempted to create a member of a ValueSet without a ValueSet context for code {data_element_id}", + sys.stderr, + ) else: - active_valueset['codes'].append({ - "code": f"{code_system}#{data_element_id}", - "label": f"{label}" - }) - + active_valueset["codes"].append( + { + "code": f"{code_system}#{data_element_id}", + "label": f"{label}", + } + ) # If row is a value in a valueset, skip since the info is in Terminology if data_type == "Codes": @@ -291,21 +312,31 @@ def generate_fsh_from_excel(self): # number, using the inflect library if len(label_camel) > 0 and not label_camel[0].isalpha(): try: - prefix, rest = re.split(r'(?=[a-zA-Z])', label_camel, 1) + prefix, rest = re.split(r"(?=[a-zA-Z])", label_camel, 1) except: prefix, rest = label_camel, "" if prefix.isnumeric(): - prefix = camel_case(inflect_engine.number_to_words(int(prefix)).replace("-", "_")) + prefix = camel_case( + inflect_engine.number_to_words(int(prefix)).replace( + "-", "_" + ) + ) else: - print("Did not know how to handle element prefix:", sheet_name, data_element_id, prefix, file=sys.stderr) + print( + "Did not know how to handle element prefix:", + sheet_name, + data_element_id, + prefix, + file=sys.stderr, + ) label_camel = f"{prefix}{rest}" # data elements can only be 64 characters # note that the idea here is that we trim whole words until reaching the desired size if len(label_camel) > 64: - new_label_camel = '' + new_label_camel = "" for label_part in re.split("(?=[A-Z1-9])", label_camel): if len(new_label_camel) + len(label_part) > 64: break @@ -327,13 +358,14 @@ def generate_fsh_from_excel(self): label_camel += suffix # otherwise, shorten the name to include the suffix else: - label_camel = label_camel[:64 - len(suffix)] + suffix - + label_camel = label_camel[: 64 - len(suffix)] + suffix # Process as a normal entry fsh_artifact += fsh_lm_element_template.format( element_name=label_camel, - cardinality=self.map_cardinality(required, multiple_choice_type), + cardinality=self.map_cardinality( + required, multiple_choice_type + ), data_type=self.map_data_type(data_type), label=label, description=description, @@ -381,9 +413,9 @@ def generate_fsh_from_excel(self): if len(codes) > 0: code_system_artifact = fsh_cs_header_template.format( - code_system = code_system, - title = "WHO SMART HIV Concepts CodeSystem", - description = "This code system defines the concepts used in the World Health Organization SMART HIV DAK" + code_system=code_system, + title="WHO SMART HIV Concepts CodeSystem", + description="This code system defines the concepts used in the World Health Organization SMART HIV DAK", ) for code in codes: @@ -403,16 +435,24 @@ def process_cover(self, cover_df): seen_header = False for i, row in cover_df.iterrows(): if not seen_header: - if row.iloc[0] and type(row.iloc[0]) == str and re.match(r"sheet\s*name", row.iloc[0], re.IGNORECASE): + if ( + row.iloc[0] + and type(row.iloc[0]) == str + and re.match(r"sheet\s*name", row.iloc[0], re.IGNORECASE) + ): seen_header = True continue if type(row.iloc[0]) == str and row.iloc[0] != "": key = row.iloc[0].upper() - first_dot_idx = key.find('.') + first_dot_idx = key.find(".") if first_dot_idx >= 0 and first_dot_idx < len(key): if key[first_dot_idx + 1].isspace(): - key = key[0:first_dot_idx] + '.' + key[first_dot_idx + 1:].lstrip() + key = ( + key[0:first_dot_idx] + + "." + + key[first_dot_idx + 1 :].lstrip() + ) cover_data[key] = row.iloc[1] else: @@ -437,7 +477,6 @@ def map_cardinality(self, required_indicator, multiple_choice): return f"{minimum}..{maximum}" - def parse_validations(self, df): # unique_validations = set(df["Validation Condition"]) valids = df.groupby("Validation Condition")["Data Element ID"].groups diff --git a/who_l3_smart_tools/core/questionnaires/questionnaire_generator.py b/who_l3_smart_tools/core/questionnaires/questionnaire_generator.py index 8066e8e..a037458 100644 --- a/who_l3_smart_tools/core/questionnaires/questionnaire_generator.py +++ b/who_l3_smart_tools/core/questionnaires/questionnaire_generator.py @@ -68,7 +68,9 @@ def generate_fsh_from_excel(self): # handle an activity change if type(activity_id) == str and activity_id != current_activity_id: # write out any existing activity - self._write_current_activity(current_activity_id, questionnaire_items) + self._write_current_activity( + current_activity_id, questionnaire_items + ) # start a new activity current_activity_id = activity_id @@ -88,11 +90,15 @@ def generate_fsh_from_excel(self): questionnaire_items.append( questionnaire_item_template.format( - data_element_id = data_element_id, - data_element_label = str(row["Data Element Label"])\ - .replace("*", "").replace('[', '').replace(']', '').replace('"', "'").strip(), - data_type = data_type_map[data_type], - required = "true" if str(row["Required"]) == "R" else "false" + data_element_id=data_element_id, + data_element_label=str(row["Data Element Label"]) + .replace("*", "") + .replace("[", "") + .replace("]", "") + .replace('"', "'") + .strip(), + data_type=data_type_map[data_type], + required="true" if str(row["Required"]) == "R" else "false", ) ) @@ -100,21 +106,28 @@ def generate_fsh_from_excel(self): if data_type == "choice": questionnaire_items.append( questionnaire_item_valueset.format( - data_element_id = data_element_id + data_element_id=data_element_id ) ) self._write_current_activity(current_activity_id, questionnaire_items) - for (activity_code, activity) in self._activities.items(): + for activity_code, activity in self._activities.items(): questionnaire_items = activity.pop("questionnaire_items") with open(os.path.join(self.output_dir, f"{activity_code}.fsh"), "w") as f: - f.write(questionnaire_template.format( - **activity - ) + ("\n" + "".join(questionnaire_items) if len(questionnaire_items) > 0 else "") + "\n") - + f.write( + questionnaire_template.format(**activity) + + ( + "\n" + "".join(questionnaire_items) + if len(questionnaire_items) > 0 + else "" + ) + + "\n" + ) - def _write_current_activity(self, current_activity_id: Union[str, None], questionnaire_items: List[str]): + def _write_current_activity( + self, current_activity_id: Union[str, None], questionnaire_items: List[str] + ): if current_activity_id is not None: if "\n" in current_activity_id: activities = current_activity_id.split("\n") @@ -126,17 +139,26 @@ def _write_current_activity(self, current_activity_id: Union[str, None], questio if " " in activity: activity_code, activity_description = activity.split(" ", 1) activity_desc_camel = camel_case(activity_description) - activity_desc_camel = activity_desc_camel[0].upper() + activity_desc_camel[1:] + activity_desc_camel = ( + activity_desc_camel[0].upper() + activity_desc_camel[1:] + ) else: activity_code = activity - activity_description = activity_desc_camel = activity.split(".", 1)[1] + activity_description = activity_desc_camel = activity.split( + ".", 1 + )[1] if activity_code not in self._activities: self._activities[activity_code] = { "activity_id": f"{activity_code}{activity_desc_camel}", "activity_title": activity_description, - "activity_title_description": activity_description[0].lower() + activity_description[1:], - "questionnaire_items": [] + "activity_title_description": activity_description[ + 0 + ].lower() + + activity_description[1:], + "questionnaire_items": [], } - self._activities[activity_code]["questionnaire_items"] += questionnaire_items + self._activities[activity_code][ + "questionnaire_items" + ] += questionnaire_items diff --git a/who_l3_smart_tools/core/terminology/terminology.py b/who_l3_smart_tools/core/terminology/terminology.py index c720532..11eac93 100644 --- a/who_l3_smart_tools/core/terminology/terminology.py +++ b/who_l3_smart_tools/core/terminology/terminology.py @@ -3,6 +3,7 @@ from typing import Optional, Union from openpyxl import load_workbook + from who_l3_smart_tools.core.terminology.schema import ConceptSchema @@ -266,7 +267,7 @@ def _convert_rows(self) -> None: sheet = workbook[sheet_name] header: Optional[list[str]] = None for row in sheet.iter_rows(values_only=True): - # if header is set the current raw as the header and skip to the next row. + # if header is None. Set the current row as the header and skip to the next row. if header is None: header = row continue diff --git a/who_l3_smart_tools/utils/__init__.py b/who_l3_smart_tools/utils/__init__.py index 149e229..56decd8 100644 --- a/who_l3_smart_tools/utils/__init__.py +++ b/who_l3_smart_tools/utils/__init__.py @@ -1,10 +1,15 @@ import re -split_re = re.compile(r'[\W_]') +split_re = re.compile(r"[\W_]") def camel_case(str: str) -> str: if str == None: return "" - return ''.join([s.lower() if i == 0 else s.capitalize() for i, s in enumerate(split_re.split(str))]) + return "".join( + [ + s.lower() if i == 0 else s.capitalize() + for i, s in enumerate(split_re.split(str)) + ] + )