diff --git a/tests/data/example_fsh/HIVARegistration.fsh b/tests/data/example_fsh/HIVARegistration.fsh new file mode 100644 index 0000000..f5c7e97 --- /dev/null +++ b/tests/data/example_fsh/HIVARegistration.fsh @@ -0,0 +1,132 @@ +Invariant: HIV-A-1 +Description: "'Date of birth' OR 'Date of birth unknown' OR 'Estimated age' should be entered" +Expression: "" +Severity: #error + +Invariant: HIV-A-2 +Description: "Can be a drop-down list of input options" +Expression: "" +Severity: #error + +Invariant: HIV-A-3 +Description: "Can be based on the structure and format of addresses in the country" +Expression: "" +Severity: #error + +Invariant: HIV-A-4 +Description: "DateTime ≤ Current DateTime" +Expression: "" +Severity: #error + +Invariant: HIV-A-5 +Description: "If 'Date of birth unknown' = True, 'Estimated age' is required" +Expression: "" +Severity: #error + +Invariant: HIV-A-6 +Description: "List of countries" +Expression: "" +Severity: #error + +Invariant: HIV-A-7 +Description: "Minimum and maximum number of characters based on country" +Expression: "" +Severity: #error + +Invariant: HIV-A-8 +Description: "Minimum and maximum number of characters based on local policy" +Expression: "" +Severity: #error + +Invariant: HIV-A-9 +Description: "Minimum and maximum number of characters, based on local policy" +Expression: "" +Severity: #error + +Invariant: HIV-A-10 +Description: "Must be appropriate email format with '@' sign" +Expression: "" +Severity: #error + +Invariant: HIV-A-11 +Description: "Only letters and special characters (period, dash) allowed" +Expression: "" +Severity: #error + +Logical: HIVARegistration +Title: "HIV.A Registration" +Description: "This tab describes the data that are collected during the registration workflow (HIV.A)" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablestructuredefinition" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablestructuredefinition" +* 
^extension[http://hl7.org/fhir/tools/StructureDefinition/logical-target].valueBoolean = true +* ^experimental = true +* ^name = "HIVARegistration" +* ^status = #active + +* firstName 1..1 string "First name" "Client's first or given name" + * ^code[+] = HIVConcepts#HIV.A.DE1 +* familyName 1..1 string "Family name" "Client's family name or last name" + * ^code[+] = HIVConcepts#HIV.A.DE2 +* visitDate 1..1 dateTime "Visit date" "The date and time of the client's visit" + * ^code[+] = HIVConcepts#HIV.A.DE3 +* referral 1..1 boolean "Referral" "If client was referred for care" + * ^code[+] = HIVConcepts#HIV.A.DE4 +* referredBy 0..1 Coding "Referred by" "How the client was referred" + * ^code[+] = HIVConcepts#HIV.A.DE5 + * referredBy from HIV.A.DE5 +* uniqueIdentifier 1..1 Identifier "Unique identifier" "Unique identifier generated for new clients or a universal ID, if used in the country" + * ^code[+] = HIVConcepts#HIV.A.DE8 +* nationalId 0..1 Identifier "National ID" "National unique identifier assigned to the client, if used in the country" + * ^code[+] = HIVConcepts#HIV.A.DE9 +* nationalHealthId 0..1 Identifier "National health ID" "National health unique identifier assigned to the client, if used in the country" + * ^code[+] = HIVConcepts#HIV.A.DE10 +* nationalProgrammeId 0..1 Identifier "National programme ID" "National programme unique identifier assigned to the client, if used in the country" + * ^code[+] = HIVConcepts#HIV.A.DE11 +* nationalHealthInsuranceId 0..1 Identifier "National health insurance ID" "National health insurance unique identifier assigned to the client, if used in the country" + * ^code[+] = HIVConcepts#HIV.A.DE12 +* countryOfBirth 1..1 Coding "Country of birth" "Country where the client was born" + * ^code[+] = HIVConcepts#HIV.A.DE13 + * countryOfBirth from HIV.A.DE13 +* dateOfBirth 0..1 date "Date of birth" "The client's date of birth (DOB) if known" + * ^code[+] = HIVConcepts#HIV.A.DE14 +* dateOfBirthUnknown 0..1 boolean "Date of birth 
unknown" "Is the client's DOB is unknown?" + * ^code[+] = HIVConcepts#HIV.A.DE15 +* estimatedAge 0..1 integer "Estimated age" "If DOB is unknown, enter the client's estimated age. Display client's age in number of years." + * ^code[+] = HIVConcepts#HIV.A.DE16 +* age 0..1 integer "Age" "Calculated age (number of years) of the client based on date of birth" + * ^code[+] = HIVConcepts#HIV.A.DE17 +* gender 1..1 Coding "Gender" "Gender of the client" + * ^code[+] = HIVConcepts#HIV.A.DE18 + * gender from HIV.A.DE18 +* other 0..1 string "Other (specify)" "Additional category (please specify)" + * ^code[+] = HIVConcepts#HIV.A.DE24 +* sex 1..1 Coding "Sex" "Sex of the client assigned at birth" + * ^code[+] = HIVConcepts#HIV.A.DE25 + * sex from HIV.A.DE25 +* address 1..1 string "Address" "Client's home address or address which the client is consenting to disclose" + * ^code[+] = HIVConcepts#HIV.A.DE29 +* maritalStatus 0..1 Coding "Marital Status" "Client's current marital status " + * ^code[+] = HIVConcepts#HIV.A.DE30 + * maritalStatus from HIV.A.DE30 +* telephoneNumber 1..1 integer "Telephone number" "Client's telephone number (a landline or a mobile phone number)" + * ^code[+] = HIVConcepts#HIV.A.DE42 +* administrativeArea 1..1 Coding "Administrative Area" "This should be a context-specific list of administrative areas, such as villages, districts, etc. The purpose of this data element is to allow for grouping and flagging of client data to a particular facility's catchment area. This can be input into the system by the end user OR it can be automated in the database based on the end user's attributes." 
+ * ^code[+] = HIVConcepts#HIV.A.DE43 + * administrativeArea from HIV.A.DE43 +* communicationConsent 0..1 boolean "Communication consent" "Indication that client gave consent to be contacted" + * ^code[+] = HIVConcepts#HIV.A.DE44 +* reminderMessages 0..1 boolean "Reminder messages" "Whether client wants to receive text or other messages as follow-up for HIV services" + * ^code[+] = HIVConcepts#HIV.A.DE45 +* communicationPreference 0..* Coding "Communication preference(s)" "How the client would like to receive family planning communications" + * ^code[+] = HIVConcepts#HIV.A.DE46 + * communicationPreference from HIV.A.DE46 +* clientEmail 0..1 string "Client's email" "Client's primary email account where the client can be contacted" + * ^code[+] = HIVConcepts#HIV.A.DE49 +* alternateContactName 0..1 string "Alternate contact's name" "Name of an alternate contact, which could be next of kin (e.g. partner, husband, mother, sibling, etc.). The alternate contact would be used in the case of an emergency situation." + * ^code[+] = HIVConcepts#HIV.A.DE50 +* alternateContactPhoneNumber 0..1 integer "Alternate contact's phone number" "Phone number of the alternate contact" + * ^code[+] = HIVConcepts#HIV.A.DE51 +* alternateContactAddress 0..1 string "Alternate contact's address" "Alternate contact's home address or address which the client is consenting to disclose" + * ^code[+] = HIVConcepts#HIV.A.DE52 +* alternateContactRelationship 0..1 string "Alternate contact relationship" "The alternate contact's relationship to the client (e.g. 
class TestLogicalModelAndTerminologyGenerator(unittest.TestCase):
    """Golden-file test for the logical model produced from the test data dictionary.

    The generator is run against ``tests/data/l2/test_dd.xlsx`` and the
    resulting ``HIVARegistration.fsh`` model must match the checked-in
    example under ``tests/data/example_fsh`` exactly.
    """

    def setUp(self):
        # Fixture spreadsheet and the root directory the generator writes into.
        self.input_file = os.path.join("tests", "data", "l2", "test_dd.xlsx")
        self.output_dir = os.path.join("tests", "output", "fsh")

    def test_generate_fsh_from_excel(self):
        # Raise the diff limit so a mismatch prints the differing FSH text
        # instead of a truncated message.
        self.maxDiff = 50000

        generator = LogicalModelAndTerminologyGenerator(
            self.input_file, self.output_dir
        )
        generator.generate_fsh_from_excel()

        generated_path = os.path.join(
            self.output_dir, "models", "HIVARegistration.fsh"
        )
        expected_path = os.path.join(
            "tests", "data", "example_fsh", "HIVARegistration.fsh"
        )
        self.assertTrue(os.path.exists(generated_path))

        with open(generated_path, "r") as generated:
            fsh_artifact = generated.read()
        with open(expected_path, "r") as expected:
            expected_fsh_artifact = expected.read()

        # Exact comparison: whitespace and trailing newlines are significant.
        self.assertEqual(expected_fsh_artifact, fsh_artifact)
class TestFullLogicalModelGeneration(unittest.TestCase):
    """Smoke test: generation over the whole test data dictionary must not raise."""

    def setUp(self) -> None:
        # Same fixture spreadsheet and output root as the golden-file test.
        self.input_file = os.path.join("tests", "data", "l2", "test_dd.xlsx")
        self.output_dir = os.path.join("tests", "output", "fsh")

    def test_full_data_dictionary(self):
        # No assertions on the output; the test passes when generation
        # completes without an exception.
        LogicalModelAndTerminologyGenerator(
            self.input_file, self.output_dir
        ).generate_fsh_from_excel()
a/who_l3_smart_tools/core/logical_models/logical_model_generator.py +++ b/who_l3_smart_tools/core/logical_models/logical_model_generator.py @@ -1,8 +1,8 @@ import pandas as pd -import argparse import stringcase import os import re +import sys import datetime # TODO: differentiate between Coding, code, and CodableConcept @@ -27,15 +27,13 @@ "Quantity": "integer", } -fsh_invariant_template = """ -Invariant: {invariant_id} +fsh_invariant_template = """Invariant: {invariant_id} Description: "{description}" Expression: "{expression}" Severity: #error """ -fsh_header_template = """ -Logical: {name} +fsh_lm_header_template = """Logical: {name} Title: "{title}" Description: "{description}" * ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablestructuredefinition" @@ -50,10 +48,10 @@ # * ^code[+] = IMMZConcepts#D1.DE92 # * artStartDate 1..1 date "ART start date" "The date on which the client started or restarted antiretroviral therapy (ART)" # * ^code[+] = IMMZConcepts#D1.DE49 -fsh_element_template = """ -* {labelCamel} {cardinality} {data_type} "{label}" "{description}" """ +fsh_lm_element_template = """ +* {label_camel} {cardinality} {data_type} "{label}" "{description}" """.rstrip() -fsh_validation_element_template = """ +fsh_lm_validation_element_template = """ * obeys {validation_id}""" # `Coding`` will be the first entry, followed by n `Codes` rows. 
def generate_fsh_from_excel(self):
    """Generate FSH logical models, value sets and the code system.

    Reads every sheet of ``self.input_file``. The ``COVER`` sheet supplies the
    per-sheet descriptions (via ``process_cover``); every sheet whose name
    matches ``HIV.<word>`` becomes one logical model file in
    ``self.models_dir``. While walking the rows of such a sheet:

    * every row with a string "Data Element ID" is recorded as a code of the
      ``HIVConcepts`` code system;
    * a ``Coding`` row marked "Select one" / "Select all that apply" opens a
      value set, and the following ``Codes`` rows marked "Input Option" become
      its members (one file per value set in ``self.valuesets_dir``);
    * ``Codes`` rows are not emitted as model elements.

    After all sheets are processed the collected codes are written to
    ``HIVConcepts.fsh`` in ``self.codesystem_dir``.

    Columns read from each sheet: "Data Element ID",
    "Multiple Choice Type (if applicable)", "Data Type", "Data Element Label",
    "Description and Definition" and "Required".

    Raises:
        KeyError: if the workbook has no ``COVER`` sheet, a processed sheet is
            missing from the cover mapping, or a required column is absent.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    for out_dir in (self.models_dir, self.codesystem_dir, self.valuesets_dir):
        os.makedirs(out_dir, exist_ok=True)

    # sheet_name=None loads all sheets into a dict keyed by sheet name.
    dd_xls = pd.read_excel(self.input_file, sheet_name=None)

    cover_info = self.process_cover(dd_xls["COVER"])

    code_system = "HIVConcepts"
    # Codes accumulate across all sheets; they form a single code system.
    codes = []
    # Data Element ID -> invariant id, filled from each sheet's validations.
    validation_lookup = {}

    for sheet_name, df in dd_xls.items():
        if not re.match(r"HIV\.\w+", sheet_name):
            continue

        clean_name = stringcase.alphanumcase(sheet_name)
        # "HIV.A Registration" -> ["HIV", "A"]
        short_name = (sheet_name.split(" ")[0]).split(".")

        # Collect fragments and join once instead of repeated string +=.
        model_parts = []
        valuesets = []
        active_valueset = None

        # Invariants come first in the file: one per unique validation
        # condition, numbered in the order parse_validations yields them.
        validations = self.parse_validations(df)
        for number, (validation, data_ids) in enumerate(validations.items(), start=1):
            invariant_id = f"{short_name[0]}-{short_name[1]}-{number}"
            # Double quotes would terminate the FSH string literal.
            if isinstance(validation, str):
                invariant_description = validation.replace('"', "'")
            else:
                invariant_description = ""

            model_parts.append(
                fsh_invariant_template.format(
                    invariant_id=invariant_id,
                    description=invariant_description,
                    expression="",
                )
                + "\n"
            )

            for data_element_id in data_ids:
                validation_lookup[data_element_id] = invariant_id

        model_parts.append(
            fsh_lm_header_template.format(
                name=clean_name,
                title=sheet_name,
                description=cover_info[sheet_name.upper()],
            )
            + "\n"
        )

        for _, row in df.iterrows():
            de_id = row["Data Element ID"]
            # Blank cells arrive as NaN (a float); skip anything that is not
            # a non-empty string.
            if not isinstance(de_id, str) or not de_id:
                continue

            multiple_choice_type = row["Multiple Choice Type (if applicable)"]
            data_type = row["Data Type"]
            label = row["Data Element Label"]

            if isinstance(label, str) and label:
                # Drop characters that are not valid inside the FSH label.
                label = (
                    label.strip()
                    .replace("*", "")
                    .replace("[", "")
                    .replace("]", "")
                    .replace('"', "'")
                )
                # Normalise separators so stringcase can camel-case the label.
                label_clean = (
                    label_strip_re.sub("", label)
                    .replace("-", "_")
                    .replace("/", "_")
                    .replace(" ", "_")
                    .lower()
                )
                label_camel = stringcase.camelcase(label_clean)
            else:
                label = ""
                label_camel = ""

            code_sys_ref = f"{code_system}#{de_id}"

            description = row["Description and Definition"]
            if isinstance(description, str):
                description = description.replace("*", "").replace('"', "'")
            else:
                description = ""

            required = row["Required"]

            codes.append(
                {"code": de_id, "label": label, "description": description}
            )

            if data_type == "Coding" and multiple_choice_type in (
                "Select one",
                "Select all that apply",
            ):
                # Lower-case the first letter so the description reads as a
                # continuation of "Value set of ...", except for text that
                # starts with the "HIV" acronym. The emptiness guard avoids
                # an IndexError when the description cell was blank.
                if (
                    description
                    and description[0].isupper()
                    and not description.startswith("HIV")
                ):
                    vs_subject = description[0].lower() + description[1:]
                else:
                    vs_subject = description

                active_valueset = {
                    "value_set": de_id,
                    "title": f"{label} ValueSet",
                    "description": f"Value set of {vs_subject}",
                    "codes": [],
                }
                valuesets.append(active_valueset)
            elif data_type == "Codes" and multiple_choice_type == "Input Option":
                if active_valueset is None:
                    # file= is required; a positional sys.stderr would be
                    # printed as text on stdout.
                    print(
                        f"Attempted to create a member of a ValueSet without a ValueSet context for code {de_id}",
                        file=sys.stderr,
                    )
                else:
                    active_valueset["codes"].append(
                        {"code": f"{code_system}#{de_id}", "label": f"{label}"}
                    )

            # Value set members live in the terminology files only.
            if data_type == "Codes":
                continue

            model_parts.append(
                fsh_lm_element_template.format(
                    label_camel=label_camel,
                    cardinality=self.map_cardinality(required, multiple_choice_type),
                    data_type=self.map_data_type(data_type),
                    label=label,
                    description=description,
                )
            )

            if de_id in validation_lookup:
                model_parts.append(
                    fsh_lm_validation_element_template.format(
                        validation_id=validation_lookup[de_id]
                    )
                )

            model_parts.append(
                fsh_lm_coding_element_template.format(code=code_sys_ref)
            )

            if data_type == "Coding":
                model_parts.append(
                    fsh_lm_valueset_element_template.format(
                        label=label_camel, valueset=de_id
                    )
                )

        # Explicit UTF-8: the generated text can contain non-ASCII characters
        # (e.g. the "≤" sign in validation descriptions), which must not
        # depend on the platform's locale encoding.
        model_path = os.path.join(self.models_dir, f"{clean_name}.fsh")
        with open(model_path, "w", encoding="utf-8") as f:
            f.write("".join(model_parts) + "\n")

        for valueset in valuesets:
            # NOTE: the module-level constant is spelled "temmplate"; it is
            # defined outside this method, so the name is used as-is here.
            vs_parts = [fsh_vs_header_temmplate.format(**valueset)]
            vs_parts.extend(
                fsh_vs_code_template.format(**code) for code in valueset["codes"]
            )

            vs_path = os.path.join(
                self.valuesets_dir, f"{valueset['value_set']}.fsh"
            )
            with open(vs_path, "w", encoding="utf-8") as f:
                f.write("".join(vs_parts) + "\n")

    if codes:
        cs_parts = [
            fsh_cs_header_template.format(
                code_system=code_system,
                title="WHO HIV DAK Concepts CodeSystem",
                description="This code system defines the concepts used in the World Health Organization SMART HIV DAK",
            )
        ]
        cs_parts.extend(fsh_cs_code_template.format(**code) for code in codes)

        code_system_output_file = os.path.join(
            self.codesystem_dir, "HIVConcepts.fsh"
        )
        with open(code_system_output_file, "w", encoding="utf-8") as f:
            f.write("".join(cs_parts) + "\n")
def process_cover(self, cover_df):
    """Extract the sheet-name -> description mapping from the COVER sheet.

    Rows are skipped until one whose first cell starts with "sheet name"
    (case-insensitive, optional whitespace between the words). Each row after
    that header contributes one entry until the first row whose first cell is
    not a non-empty string, which ends the table.

    Keys are the upper-cased first cell. Whitespace directly after the first
    "." is removed, so "HIV. A Registration" is stored as
    "HIV.A REGISTRATION".

    Args:
        cover_df: DataFrame of the COVER sheet; column 0 holds sheet names and
            column 1 the matching descriptions.

    Returns:
        dict mapping normalised, upper-cased sheet names to the value found in
        the second column of the same row.
    """
    cover_data = {}
    seen_header = False

    for _, row in cover_df.iterrows():
        name_cell = row.iloc[0]

        if not seen_header:
            # Everything up to and including the header row is skipped.
            if isinstance(name_cell, str) and re.match(
                r"sheet\s*name", name_cell, re.IGNORECASE
            ):
                seen_header = True
            continue

        # A blank / non-string first cell marks the end of the table.
        if not isinstance(name_cell, str) or name_cell == "":
            break

        key = name_cell.upper()
        after_dot = key.find(".") + 1
        # after_dot is 0 when there is no dot. The bounds check protects
        # names that end with "." from an IndexError.
        if 0 < after_dot < len(key) and key[after_dot].isspace():
            key = key[:after_dot] + key[after_dot:].lstrip()

        cover_data[key] = row.iloc[1]

    return cover_data
def map_cardinality(self, required_indicator, multiple_choice):
    """Build the FSH cardinality string for a data element.

    Args:
        required_indicator: value of the "Required" column; only "R" makes
            the element mandatory.
        multiple_choice: value of the multiple-choice-type column; only
            "Select all that apply" makes the element repeatable.

    Returns:
        One of "0..1", "1..1", "0..*" or "1..*".
    """
    lower_bound = "1" if required_indicator == "R" else "0"
    upper_bound = "*" if multiple_choice == "Select all that apply" else "1"
    return lower_bound + ".." + upper_bound