diff --git a/.gitignore b/.gitignore index 87f3612..0b9c044 100644 --- a/.gitignore +++ b/.gitignore @@ -178,4 +178,5 @@ who_ocl.py # Output for testing files test_output/ -tests/data/l2/csv_files/ \ No newline at end of file +tests/data/l2/csv_files/ +data/ \ No newline at end of file diff --git a/tests/test_logical_model_generator.py b/tests/test_logical_model_generator.py index ab3b74f..beba1d3 100644 --- a/tests/test_logical_model_generator.py +++ b/tests/test_logical_model_generator.py @@ -1,50 +1 @@ -import unittest -import os -import pandas as pd -import shutil -import sys -from who_l3_smart_tools.core.logical_models.logical_model_generator import ( - LogicalModelAndTerminologyGenerator, -) - - -class TestLogicalModelAndTerminologyGenerator(unittest.TestCase): - def setUp(self): - self.input_file = os.path.join("tests", "data", "l2", "test_dd.xlsx") - self.output_dir = os.path.join("tests", "output", "fsh") - - def test_generate_fsh_from_excel(self): - self.maxDiff = 50000 - g = LogicalModelAndTerminologyGenerator(self.input_file, self.output_dir) - - g.generate_fsh_from_excel() - - output_file = os.path.join(self.output_dir, "models", "HIVARegistration.fsh") - self.assertTrue(os.path.exists(output_file)) - - with open(output_file, "r") as f: - fsh_artifact = f.read() - - with open( - os.path.join("tests", "data", "example_fsh", "HIVARegistration.fsh"), "r" - ) as f: - expected_fsh_artifact = f.read() - - self.assertEqual(expected_fsh_artifact, fsh_artifact) - - -class TestFullLogicalModelGeneration(unittest.TestCase): - def setUp(self) -> None: - self.input_file = os.path.join("tests", "data", "l2", "test_dd.xlsx") - self.output_dir = os.path.join("tests", "output", "fsh") - - def test_full_data_dictionary(self): - generator = LogicalModelAndTerminologyGenerator( - self.input_file, self.output_dir - ) - - generator.generate_fsh_from_excel() - - -if __name__ == "__main__": - unittest.main() +# TODO (Kenneth): Add tests for the logical model generator. diff --git a/who_l3_smart_tools/cli/logical_model_gen.py b/who_l3_smart_tools/cli/logical_model_gen.py old mode 100644 new mode 100755 index 293fd85..0068b3d --- a/who_l3_smart_tools/cli/logical_model_gen.py +++ b/who_l3_smart_tools/cli/logical_model_gen.py @@ -1,4 +1,6 @@ +#! /usr/bin/env python import argparse + from who_l3_smart_tools.core.logical_models.logical_model_generator import ( LogicalModelAndTerminologyGenerator, ) @@ -11,7 +13,7 @@ def main(): parser.add_argument( "-i", "--input", - default="./l3-data/test-data.xlsx", + required=True, help="Input Data Dictionary file location", ) parser.add_argument( diff --git a/who_l3_smart_tools/core/logical_models/logical_model_generator.py b/who_l3_smart_tools/core/logical_models/logical_model_generator.py index f6b1833..175c25f 100644 --- a/who_l3_smart_tools/core/logical_models/logical_model_generator.py +++ b/who_l3_smart_tools/core/logical_models/logical_model_generator.py @@ -6,9 +6,12 @@ import inflect import pandas as pd import stringcase +from jinja2 import Environment, FileSystemLoader +from openpyxl import load_workbook from who_l3_smart_tools.utils import camel_case from who_l3_smart_tools.utils.counter import Counter +from who_l3_smart_tools.utils.jinja2 import render_to_file # TODO: differentiate between Coding, code, and CodableConcept # Boolean @@ -32,74 +35,14 @@ "Quantity": "integer", } -fsh_invariant_template = """Invariant: {invariant_id} -Description: "{description}" -Expression: "{expression}" -Severity: #error -""" - -fsh_lm_header_template = """Logical: {name} -Title: "{title}" -Description: "{description}" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablestructuredefinition" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablestructuredefinition" -* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGLogicalModel" -* ^extension[http://hl7.org/fhir/tools/StructureDefinition/logical-target].valueBoolean = true -* ^experimental = true -* ^name = "{name}" -* ^status = #active""" - -# Template for element definitions in FSH, including a placeholder for invariants -# * severelyImmunosuppressed 1..1 boolean "Severely immunosuppressed" "The client is known to be severely immunocompromised or immunosuppressed" -# * ^code[+] = IMMZConcepts#D1.DE92 -# * artStartDate 1..1 date "ART start date" "The date on which the client started or restarted antiretroviral therapy (ART)" -# * ^code[+] = IMMZConcepts#D1.DE49 -fsh_lm_element_template = """ -* {element_name} {cardinality} {data_type} "{label}" "{description}" """.rstrip() - -fsh_lm_validation_element_template = """ - * obeys {validation_id}""" - -# `Coding`` will be the first entry, followed by n `Codes` rows. -# We use the valueset named after the row name, and skip the `Codes` values, since -# they are in the terminology -# -# * hivStatus 0..1 Coding "HIV status" "The current human immunodeficiency virus (HIV) status of the client" -# * ^code[+] = IMMZConcepts#D1.DE10 -# * hivStatus from IMMZ.D1.DE10 - -fsh_lm_valueset_element_template = """\n* {label} from {valueset}""" - -fsh_lm_coding_element_template = """ - * ^code[+] = {code}""" - -fsh_cs_header_template = """CodeSystem: {code_system} -Title: "{title}" -Description: "{description}" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablecodesystem" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablecodesystem" -* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGCodeSystem" -* ^experimental = true -* ^caseSensitive = false -""" - -fsh_cs_code_template = """ -* #{code} "{label}" "{description}" """.rstrip() - -fsh_vs_header_temmplate = """ValueSet: {value_set} -Title: "{title}" -Description: "{description}" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablevalueset" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablevalueset" -* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-computablevalueset" -* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGValueSet" -* ^status = #active -* ^experimental = true -* ^name = "{name}" -""" - -fsh_vs_code_template = """ -* {code} "{label}" """.rstrip() +template_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") + +jinja2_env = Environment( + loader=FileSystemLoader(template_dir), + trim_blocks=True, + lstrip_blocks=True, +) + inflect_engine = inflect.engine() @@ -127,6 +70,7 @@ def __init__(self, input_file, output_dir): self.codesystem_dir = os.path.join(output_dir, "codesystems") self.valuesets_dir = os.path.join(output_dir, "valuesets") self.invariants_dict = defaultdict(Counter) + self.active_valueset = None # pylint: disable=too-many-branches,too-many-statements def generate_fsh_from_excel(self): @@ -138,10 +82,10 @@ def generate_fsh_from_excel(self): os.makedirs(_dir) # Load the Excel file - dd_xls = pd.read_excel(self.input_file, sheet_name=None) + workbook = load_workbook(self.input_file) # Process the Cover sheet - cover_info = self.process_cover(dd_xls["COVER"]) + cover_info = self.process_cover(workbook["COVER"]) # Code system name code_system = "HIVConcepts" @@ -153,9 +97,9 @@ def generate_fsh_from_excel(self): # Iterate over each sheet in the Excel file and generate a FSH logical model for each one # pylint: disable=too-many-nested-blocks - for sheet_name in dd_xls.keys(): - if re.match(r"HIV\.\w+", sheet_name): - df = dd_xls[sheet_name] + for sheet_name in workbook.sheetnames: + if sheet_name.startswith("HIV."): + sheet = workbook[sheet_name] # hard-coded, but the page labelled E-F has no F codes if sheet_name == "HIV.E-F PMTCT": @@ -166,12 +110,8 @@ def generate_fsh_from_excel(self): clean_name = stringcase.alphanumcase(cleaned_sheet_name) short_name = (cleaned_sheet_name.split(" ")[0]).split(".") - # Initialize the FSH artifact - fsh_artifact = "" - # Initialize any ValueSets valuesets = [] - active_valueset = None # Track element names existing_elements = defaultdict(Counter) @@ -181,297 +121,56 @@ def generate_fsh_from_excel(self): # Process Invariants First # Get all unique validation conditions, and store their assigned rows - validations = self.parse_validations(df) + validations = self.parse_validations(sheet) # Template for invariants based on validation conditions - for validation, data_ids in validations.items(): - _id = self.invariants_dict[short_name[1]].next - invariant_id = f"{short_name[0]}-{short_name[1]}-{_id}" - if isinstance(validation, str): - description = validation.replace('"', "'") - else: - description = "" - - expression = "" - fsh_artifact += ( - fsh_invariant_template.format( - invariant_id=invariant_id, - description=description, - expression=expression, - ) - + "\n" - ) - - for data_element_id in data_ids: - validation_lookup[data_element_id] = invariant_id - - # Generate the FSH logical model header based on the sheet name - fsh_header = ( - fsh_lm_header_template.format( - name=clean_name, - title=sheet_name, - description=cover_info[sheet_name.upper()], - ) - + "\n" + invariants = self.format_invariants( + validations, short_name, validation_lookup ) - fsh_artifact += fsh_header - - for _, row in df.iterrows(): - data_element_id = row["Data Element ID"] - if not isinstance(data_element_id, str) or not data_element_id: - continue - - # Process general fields - multiple_choice_type = row["Multiple Choice Type (if applicable)"] - data_type = row["Data Type"] - label = row["Data Element Label"] - - if isinstance(label, str) and label: - # Other (specify) elements come after a list as a data element to - # contain a non-coded selection - if label.lower() == "other (specify)": - if previous_element_label: - label_clean = f"Other_{previous_element_label.lower()}" - else: - label_clean = "Other (specify)" - else: - - # equalize spaces - label = ( - label.strip() - .replace("*", "") - .replace("[", "") - .replace("]", "") - .replace('"', "'") - ) - - # remove many special characters - label_clean = ( - label.replace("(", "") - .replace(")", "") - .replace("'s", "") - .replace("-", "_") - .replace("/", "_") - .replace(",", "") - .replace(" ", "_") - .replace(">=", "more than") - .replace("<=", "less than") - .replace(">", "more than") - .replace("<", "less than") - .lower() - ) - else: - label = "" - label_clean = "" - - code_sys_ref = f"{code_system}#{data_element_id}" - description = row["Description and Definition"] - - if isinstance(description, str): - description = description.replace("*", "").replace('"', "'") - else: - description = "" - - required = row["Required"] - - if required == "C": - # pylint: disable=unused-variable - required_condition = row["Explain Conditionality"] - - codes.append( - { - "code": data_element_id, - "label": label, - "description": description, - } - ) - - # handle ValueSets - # First we identify a ValueSet - # Originally, this looked at the Multiple Choice Type, - # but that doesn't seem to be - # guaranteed to be meaningful - if data_type == "Coding": - active_valueset = { - "value_set": data_element_id, - "name": data_element_id.replace(".", ""), - "title": f"{label} ValueSet", - "description": f"Value set of " - f"{description[0].lower() + description[1:] \ - if description[0].isupper() \ - and not description.startswith("HIV") else description}", - "codes": [], - } - valuesets.append(active_valueset) - # Then we identify the codes for the ValueSet - elif ( - data_type == "Codes" and multiple_choice_type == "Input Option" - ): - if active_valueset is None: - print( - f"Attempted to create a member of a ValueSet without a " - f"ValueSet context for code {data_element_id}", - sys.stderr, - ) - else: - active_valueset["codes"].append( - { - "code": f"{code_system}#{data_element_id}", - "label": f"{label}", - } - ) - - # If row is a value in a valueset, skip since the info is in Terminology - if data_type == "Codes": - continue - - # For any non-code we set the previous element name. This is used to determine the - # name for "Other (specify)" data elements - previous_element_label = label_clean - - # The camel-case version of the label becomes the element name in the logical model - label_camel = camel_case(label_clean) - - # valid element identifiers in FHIR must start with a alphabetical character - # therefore if an element starts with a number, we swap the spelt-out version of the - # number, using the inflect library - if len(label_camel) > 0 and not label_camel[0].isalpha(): - try: - prefix, rest = re.split(r"(?=[a-zA-Z])", label_camel, 1) - except Exception: # pylint: disable=broad-exception-caught - prefix, rest = label_camel, "" - - if prefix.isnumeric(): - prefix = camel_case( - inflect_engine.number_to_words(int(prefix)).replace( - "-", "_" - ) - ) - else: - print( - "Did not know how to handle element prefix:", - sheet_name, - data_element_id, - prefix, - file=sys.stderr, - ) - - label_camel = f"{prefix}{rest}" - - # data elements can only be 64 characters - # note that the idea here is that we trim whole words until reaching the desired size - if len(label_camel) > 64: - new_label_camel = "" - for label_part in re.split("(?=[A-Z1-9])", label_camel): - if len(new_label_camel) + len(label_part) > 64: - break - - new_label_camel += label_part - label_camel = new_label_camel - - # data elements names must be unique per logical model - count = existing_elements[label_camel].next - - # we have a duplicate data element - if count > 1: - # the first element needs no suffix - # so the suffix is one less than the count - suffix = str(count - 1) - - # if the data element id will still be less than 64 characters, we're ok - if len(label_camel) + len(suffix) <= 64: - label_camel += suffix - # otherwise, shorten the name to include the suffix - else: - label_camel = label_camel[: 64 - len(suffix)] + suffix - - # Process as a normal entry - fsh_artifact += fsh_lm_element_template.format( - element_name=label_camel, - cardinality=self.map_cardinality( - required, multiple_choice_type - ), - data_type=self.map_data_type(data_type), - label=label, - description=description, - ) - - # Add validation if needed - if data_element_id in validation_lookup: - fsh_artifact += fsh_lm_validation_element_template.format( - validation_id=validation_lookup[data_element_id] - ) - - # Add Terminology reference - fsh_artifact += fsh_lm_coding_element_template.format( - code=code_sys_ref - ) - - # Process Coding/Codes/Input Options with ValueSets - if data_type == "Coding": - fsh_artifact += fsh_lm_valueset_element_template.format( - label=label_camel, valueset=f"{data_element_id}" - ) - - output_file = os.path.join( - self.models_dir, f"{stringcase.alphanumcase(sheet_name)}.fsh" + self.render_elements( + sheet, + sheet_name, + cover_info, + code_system, + invariants, + codes, + valuesets, + existing_elements, + validation_lookup, + previous_element_label, + clean_name, ) - with open(output_file, "w") as f: - f.write(fsh_artifact + "\n") + if codes: + self.render_codes(codes, code_system) - if len(valuesets) > 0: - for valueset in valuesets: - vs_artifact = fsh_vs_header_temmplate.format(**valueset) - for code in valueset["codes"]: - vs_artifact += fsh_vs_code_template.format(**code) - - if len(valueset["codes"]) > 0: - vs_artifact += "\n" - - output_file = os.path.join( - self.valuesets_dir, f"{valueset['value_set']}.fsh" - ) - - with open(output_file, "w") as f: - f.write(vs_artifact) - - if len(codes) > 0: - code_system_artifact = fsh_cs_header_template.format( - code_system=code_system, - title="WHO SMART HIV Concepts CodeSystem", - description="This code system defines the concepts used in the World " - "Health Organization SMART HIV DAK", - ) - - for code in codes: - code_system_artifact += fsh_cs_code_template.format(**code) - - code_system_output_file = os.path.join( - self.codesystem_dir, "HIVConcepts.fsh" - ) + ### Helpers + def process_cover(self, cover): + """ + Process the cover sheet of the logical model. - with open(code_system_output_file, "w") as f: - f.write(code_system_artifact + "\n") + Args: + cover (Worksheet): The cover sheet of the logical model. - ### Helpers - def process_cover(self, cover_df): + Returns: + dict: A dictionary containing the processed cover data. + """ cover_data = {} seen_header = False - for _, row in cover_df.iterrows(): + for row in cover.iter_rows(values_only=True): if not seen_header: if ( - row.iloc[0] - and isinstance(row.iloc[0], str) - and re.match(r"sheet\s*name", row.iloc[0], re.IGNORECASE) + row[0] + and isinstance(row[0], str) + and re.match(r"sheet\s*name", row[0], re.IGNORECASE) ): seen_header = True continue - if isinstance(row.iloc[0], str) and row.iloc[0]: - key = row.iloc[0].upper() + if isinstance(row[0], str) and row[0]: + key = row[0].upper() first_dot_idx = key.find(".") if len(key) > first_dot_idx >= 0: if key[first_dot_idx + 1].isspace(): @@ -481,18 +180,24 @@ def process_cover(self, cover_df): + key[first_dot_idx + 1 :].lstrip() ) - cover_data[key] = row.iloc[1] + cover_data[key] = row[1] else: break return cover_data - ### + def map_cardinality(self, required_indicator, multiple_choice): + """ + Maps the required indicator and multiple choice values to a cardinality string. - def map_data_type(self, data_type_str): - return data_type_map[data_type_str] + Args: + required_indicator (str): The required indicator value. + multiple_choice (str): The multiple choice value. - def map_cardinality(self, required_indicator, multiple_choice): + Returns: + str: The cardinality string. + + """ minimum = "0" maximum = "1" @@ -504,7 +209,341 @@ def map_cardinality(self, required_indicator, multiple_choice): return f"{minimum}..{maximum}" - def parse_validations(self, df): - # unique_validations = set(df["Validation Condition"]) + def parse_validations(self, sheet): + """ + Parses the validations from a given sheet and returns a dictionary of + validation conditions and data element IDs. + + Args: + sheet (pandas.DataFrame): The sheet containing the validations. + + Returns: + dict: A dictionary where the keys are the validation conditions and the + values are lists of data element IDs. + """ + sheet_data = list(sheet.values) + df = pd.DataFrame(sheet_data[1:], columns=sheet_data[0]) valids = df.groupby("Validation Condition")["Data Element ID"].groups return valids + + def _clean_label(self, label): + """ + Cleans the label by removing special characters and converting it to lowercase. + + Args: + label (str): The label to be cleaned. + + Returns: + str: The cleaned label. + """ + label = ( + label.strip() + .replace("*", "") + .replace("[", "") + .replace("]", "") + .replace('"', "'") + ) + return label, ( + label.replace("(", "") + .replace(")", "") + .replace("'s", "") + .replace("-", "_") + .replace("/", "_") + .replace(",", "") + .replace(" ", "_") + .replace(">=", "more than") + .replace("<=", "less than") + .replace(">", "more than") + .replace("<", "less than") + .lower() + ) + + # pylint: disable=too-many-arguments + def _handle_value_sets( + self, + data_type, + multiple_choice_type, + data_element_id, + label, + description, + code_system, + valuesets, + ): + """ + Handles the creation of ValueSets and their codes. + """ + # Originally, this looked at the Multiple Choice Type, + # but that doesn't seem to be + # guaranteed to be meaningful + if data_type == "Coding": + self.active_valueset = { + "value_set": data_element_id, + "name": data_element_id.replace(".", ""), + "title": f"{label} ValueSet", + "description": f"Value set of " + f"{description[0].lower() + description[1:] \ + if description[0].isupper() \ + and not description.startswith("HIV") else description}", + "codes": [], + } + valuesets.append(self.active_valueset) + # Then we identify the codes for the ValueSet + elif data_type == "Codes" and multiple_choice_type == "Input Option": + if self.active_valueset is None: + print( + f"Attempted to create a member of a ValueSet without a " + f"ValueSet context for code {data_element_id}", + sys.stderr, + ) + else: + self.active_valueset["codes"].append( + { + "code": f"{code_system}#{data_element_id}", + "label": f"{label}", + } + ) + else: + self.active_valueset = None + + # pylint: disable=too-many-arguments + def render_elements( + self, + sheet, + sheet_name, + cover_info, + code_system, + invariants, + codes, + valuesets, + existing_elements, + validation_lookup, + previous_element_label, + clean_name, + ): + elements = [] + header = None + for row in sheet.iter_rows(values_only=True): + if header is None: + header = row + continue + row = dict(zip(header, row)) + data_element_id = row["Data Element ID"] + if not isinstance(data_element_id, str) or not data_element_id: + continue + + # Process general fields + multiple_choice_type = row["Multiple Choice Type (if applicable)"] + data_type = row["Data Type"] + label = row["Data Element Label"] + + if isinstance(label, str) and label: + # Other (specify) elements come after a list as a data element to + # contain a non-coded selection + if label.lower() == "other (specify)": + if previous_element_label: + label_clean = f"Other_{previous_element_label.lower()}" + else: + label_clean = "Other (specify)" + else: + label, label_clean = self._clean_label(label) + else: + label = "" + label_clean = "" + + code_sys_ref = f"{code_system}#{data_element_id}" + description = row["Description and Definition"] + + if isinstance(description, str): + description = description.replace("*", "").replace('"', "'") + else: + description = "" + + required = row["Required"] + + codes.append( + { + "code": data_element_id, + "label": label, + "description": description, + } + ) + + self._handle_value_sets( + data_type, + multiple_choice_type, + data_element_id, + label, + description, + code_system, + valuesets, + ) + + # If row is a value in a valueset, skip since the info is in Terminology + if data_type == "Codes": + continue + + # For any non-code we set the previous element name. This is used to determine the + # name for "Other (specify)" data elements + previous_element_label = label_clean + + # The camel-case version of the label becomes the element name in the logical model + label_camel = camel_case(label_clean) + + # valid element identifiers in FHIR must start with a alphabetical character + # therefore if an element starts with a number, we swap the spelt-out version of the + # number, using the inflect library + if len(label_camel) > 0 and not label_camel[0].isalpha(): + try: + prefix, rest = re.split(r"(?=[a-zA-Z])", label_camel, 1) + except Exception: # pylint: disable=broad-exception-caught + prefix, rest = label_camel, "" + + if prefix.isnumeric(): + prefix = camel_case( + inflect_engine.number_to_words(int(prefix)).replace("-", "_") + ) + else: + print( + "Did not know how to handle element prefix:", + sheet_name, + data_element_id, + prefix, + file=sys.stderr, + ) + + label_camel = f"{prefix}{rest}" + + # data elements can only be 64 characters + # note that the idea here is that we trim whole words until reaching the desired size + if len(label_camel) > 64: + new_label_camel = "" + for label_part in re.split("(?=[A-Z1-9])", label_camel): + if len(new_label_camel) + len(label_part) > 64: + break + + new_label_camel += label_part + label_camel = new_label_camel + + # data elements names must be unique per logical model + count = existing_elements[label_camel].next + + # we have a duplicate data element + if count > 1: + # the first element needs no suffix + # so the suffix is one less than the count + suffix = str(count - 1) + + # if the data element id will still be less than 64 characters, we're ok + if len(label_camel) + len(suffix) <= 64: + label_camel += suffix + # otherwise, shorten the name to include the suffix + else: + label_camel = label_camel[: 64 - len(suffix)] + suffix + + elements.append( + { + "element_name": label_camel, + "cardinality": self.map_cardinality(required, multiple_choice_type), + "data_type": data_type_map[data_type], + "label": label, + "description": description, + "code": code_sys_ref, + "validation_id": validation_lookup.get(data_element_id), + "valueset": ( + {"label": label_camel, "valueset": f"{data_element_id}"} + if data_type == "Coding" + else None + ), + } + ) + + output_file = os.path.join( + self.models_dir, f"{stringcase.alphanumcase(sheet_name)}.fsh" + ) + + template = jinja2_env.get_template("fsh_lm.j2") + render_to_file( + template, + { + "elements": elements, + "invariants": invariants, + "clean_name": clean_name, + "sheet_name": sheet_name, + "description": cover_info.get(sheet_name, ""), + }, + output_file, + ) + + self.render_valuesets(valuesets) + + def format_invariants(self, validations, short_name, validation_lookup): + """ + Formats the invariants based on the given validations, short name, + and validation lookup. + + Args: + validations (dict): A dictionary containing the validations. + short_name (tuple): A tuple containing the short name. + validation_lookup (dict): A dictionary containing the validation lookup. + + Returns: + list: A list of formatted invariants. + + """ + invariants = [] + for validation, data_ids in validations.items(): + _id = self.invariants_dict[short_name[1]].next + invariant_id = f"{short_name[0]}-{short_name[1]}-{_id}" + if isinstance(validation, str): + description = validation.replace('"', "'") + else: + description = "" + + invariants.append( + { + "id": invariant_id, + "description": description, + } + ) + + for data_element_id in data_ids: + validation_lookup[data_element_id] = invariant_id + return invariants + + def render_codes(self, codes, code_system): + """ + Renders the codes to a code system file. + + Args: + codes (list): A list of codes. + code_system (str): The code system. + + Returns: + None + """ + title = "WHO SMART HIV Concepts CodeSystem" + description = ( + "This code system defines the concepts used in the" + "World Health Organization SMART HIV DAK" + ) + + code_system_output_file = os.path.join(self.codesystem_dir, "HIVConcepts.fsh") + template = jinja2_env.get_template("fsh_cs_codes.j2") + render_to_file( + template=template, + context={ + "codes": codes, + "title": title, + "description": description, + "code_system": code_system, + }, + output_file=code_system_output_file, + ) + + def render_valuesets(self, valuesets): + for valueset in valuesets: + output_file = os.path.join( + self.valuesets_dir, f"{valueset['value_set']}.fsh" + ) + template = jinja2_env.get_template("fsh_vs_codes.j2") + render_to_file(template, valueset, output_file) diff --git a/who_l3_smart_tools/core/logical_models/templates/fsh_cs_codes.j2 b/who_l3_smart_tools/core/logical_models/templates/fsh_cs_codes.j2 new file mode 100644 index 0000000..22531e4 --- /dev/null +++ b/who_l3_smart_tools/core/logical_models/templates/fsh_cs_codes.j2 @@ -0,0 +1,12 @@ +CodeSystem: {{code_system}} +Title: "{{title}}" +Description: "{{description}}" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablecodesystem" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablecodesystem" +* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGCodeSystem" +* ^experimental = true +* ^caseSensitive = false + +{% for code in codes %} +* #{{code.code}} "{{code.label}}" "{{code.description}}" +{% endfor %} diff --git a/who_l3_smart_tools/core/logical_models/templates/fsh_lm.j2 b/who_l3_smart_tools/core/logical_models/templates/fsh_lm.j2 new file mode 100644 index 0000000..6724b47 --- /dev/null +++ b/who_l3_smart_tools/core/logical_models/templates/fsh_lm.j2 @@ -0,0 +1,28 @@ +{% for invariant in invariants %} +Invariant: {{invariant.id}} +Description: "{{invariant.description}}" +Expression: "" +Severity: #error + +{% endfor %} +Logical: {{clean_name}} +Title: "{{sheet_name}}" +Description: "{{description}}" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablestructuredefinition" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablestructuredefinition" +* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGLogicalModel" +* ^extension[http://hl7.org/fhir/tools/StructureDefinition/logical-target].valueBoolean = true +* ^experimental = true +* ^name = "{{clean_name}}" +* ^status = #active + +{% for element in elements%} +* {{element.element_name}} {{element.cardinality}} {{element.data_type}} "{{element.label}}" "{{element.description}}" +{% if element.validation_id %} + * obeys {{element.validation_id}} +{% endif %} + * ^code[+] = {{element.code}} +{% if element.valueset %} +* {{element.valueset.label}} from {{element.valueset.valueset}} +{% endif %} +{% endfor %} diff --git a/who_l3_smart_tools/core/logical_models/templates/fsh_vs_codes.j2 b/who_l3_smart_tools/core/logical_models/templates/fsh_vs_codes.j2 new file mode 100644 index 0000000..7f06be9 --- /dev/null +++ b/who_l3_smart_tools/core/logical_models/templates/fsh_vs_codes.j2 @@ -0,0 +1,14 @@ +ValueSet: {{value_set}} +Title: "{{title}}" +Description: "{{description}}" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablevalueset" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablevalueset" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-computablevalueset" +* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGValueSet" +* ^status = #active +* ^experimental = true +* ^name = "{{name}}" + +{% for code in codes %} +* {{code.code}} "{{code.label}}" +{% endfor %} diff --git a/who_l3_smart_tools/utils/jinja2.py b/who_l3_smart_tools/utils/jinja2.py new file mode 100644 index 0000000..c9b1e5b --- /dev/null +++ b/who_l3_smart_tools/utils/jinja2.py @@ -0,0 +1,6 @@ +def render_to_file(template, context, output_file): + """ + Render a template to a file. + """ + with open(output_file, "w") as f: + f.write(template.render(context))