def main(argv=None):
    """Command-line entry point: render artifacts from an L2 data dictionary.

    Parses the command line, loads the workbook into an ``L2Dictionary``,
    processes it, and then writes each artifact family unless the matching
    ``--skip-*`` flag was given.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            ``argparse`` falls back to ``sys.argv[1:]``, so existing callers
            that invoke ``main()`` with no arguments behave as before.
    """
    # Order matters: artifacts are written in exactly this sequence.
    artifacts = ("models", "questionnaires", "valuesets", "concepts")

    parser = argparse.ArgumentParser(
        description=(
            "Generate FSH/CQL artifacts (models, questionnaires, valuesets "
            "and concepts) from an L2 Data Dictionary Excel file."
        )
    )
    parser.add_argument(
        "-i",
        "--input",
        required=True,
        help="Path to the L2 Data Dictionary",
    )
    parser.add_argument(
        "-o",
        "--output",
        required=True,
        help="Path to the output directory.",
    )
    for artifact in artifacts:
        parser.add_argument(
            f"--skip-{artifact}",
            action="store_true",
            help=f"Skip generating {artifact}",
        )
    args = parser.parse_args(argv)

    data_dictionary = L2Dictionary(args.input, args.output)
    data_dictionary.process()
    for artifact in artifacts:
        # argparse stores "--skip-models" as the attribute "skip_models".
        if not getattr(args, f"skip_{artifact}"):
            getattr(data_dictionary, f"write_{artifact}")()


if __name__ == "__main__":
    main()
# pylint: disable=too-many-instance-attributes
class L2Row:
    """
    Represents a row in the L2 data dictionary.

    Args:
        raw_row (dict): The raw row data, keyed by column header.
        coding_data_element (Optional[str]): The coding data element this row
            belongs to, if any.

    Attributes:
        raw_row (dict): The raw row data.
        activity_id (str): The activity ID.
        data_element_id (Optional[str]): The data element ID.
        data_element_label (Optional[str]): The data element label.
        description (Optional[str]): The description and definition.
        choice_type (Optional[str]): The multiple choice type (if applicable).
        data_type (Optional[str]): The data type.
        input_options (Optional[str]): The input options.
        validation_condition (Optional[str]): The validation condition.
        required (Optional[str]): The required flag.
        coding_data_element (Optional[str]): The coding data element.

    Properties:
        cardinality: FSH cardinality string such as "0..1" or "1..*".
        questionare_title: The activity ID without its leading code.
        question_instance: An alphanumeric instance name built from the
            activity ID.

    Methods:
        validate_coding_data_element: Validates the coding data element.
        to_invariant: Converts the row to an invariant.
        to_concept_item: Converts the row to a concept item.
        to_model_item: Converts the row to a model item.
        to_questionnaire_item: Converts the row to a questionnaire item.
        to_valueset_item: Converts the row to a valueset item.
    """

    DEFAULT_INVARIANT_EXPRESSION = ""
    DEFAULT_INVARIANT_SEVERITY = "error"

    def __init__(
        self, raw_row: dict, coding_data_element: Optional[str] = None
    ) -> None:
        # Every column below is mandatory; a missing header raises KeyError.
        self.raw_row = raw_row
        self.activity_id = raw_row["Activity ID"]
        self.data_element_id = raw_row["Data Element ID"]
        self.data_element_label = raw_row["Data Element Label"]
        self.description = raw_row["Description and Definition"]
        self.choice_type = raw_row["Multiple Choice Type (if applicable)"]
        self.data_type = raw_row["Data Type"]
        self.input_options = raw_row["Input Options"]
        self.validation_condition = raw_row["Validation Condition"]
        self.required = raw_row["Required"]
        self.coding_data_element = coding_data_element

    @property
    def cardinality(self) -> str:
        """Return the cardinality as "<min>..<max>".

        The minimum is "1" only when the row is flagged "R"; the maximum is
        "*" only for "Select all that apply" rows.
        """
        minimum = "1" if self.required == "R" else "0"
        maximum = "*" if self.choice_type == "Select all that apply" else "1"
        return f"{minimum}..{maximum}"

    def validate_coding_data_element(self) -> bool:
        """Check that a "Codes" row knows which coding element it belongs to.

        Returns:
            True when the row is valid.

        Raises:
            ValueError: If the row has data type "Codes" but no coding data
                element was supplied.
        """
        if self.data_type == "Codes" and not self.coding_data_element:
            raise ValueError(
                "Coding Data Element is required for data element "
                f"{self.data_element_id} of type Code"
            )
        return True

    @property
    def questionare_title(self) -> str:
        """Return the activity ID text after its first space.

        Falls back to the whole activity ID when it contains no space.
        """
        parts = self.activity_id.split(" ", 1)
        return parts[1] if len(parts) > 1 else parts[0]

    @property
    def question_instance(self) -> str:
        """Return an alphanumeric-only instance name for the questionnaire.

        The text after the first space is capitalized (first letter upper,
        rest lower) before being joined to the leading code.
        """
        parts = self.activity_id.split(" ", 1)
        if len(parts) > 1:
            return remove_special_characters(f"{parts[0]}{parts[1].capitalize()}")
        return remove_special_characters(parts[0])

    def to_invariant(self) -> Optional[dict[str, str]]:
        """Return an invariant dict, or None when there is no validation rule.

        A validation condition that is empty or equal to "none"
        (case-insensitive) yields None.
        """
        if self.validation_condition and self.validation_condition.lower() != "none":
            return {
                # Only the first five characters of the activity ID are kept
                # as the invariant ID prefix.
                "id": self.activity_id[:5],
                "description": self.validation_condition,
                "expression": self.DEFAULT_INVARIANT_EXPRESSION,
                "severity": self.DEFAULT_INVARIANT_SEVERITY,
            }
        return None

    def to_concept_item(self) -> dict[str, str]:
        """Return the row as a concept entry."""
        return {
            "id": self.data_element_id,
            "label": self.data_element_label,
            "description": self.description,
            "data_type": self.data_type,
        }

    def to_model_item(self) -> dict[str, str]:
        """Return the row as a logical-model element.

        Raises:
            KeyError: If the row's data type has no entry in DATA_TYPE_MAP.
        """
        return {
            "id": self.data_element_id,
            "slug": to_camel_case(self.data_element_label),
            "condition": self.cardinality,
            "type": DATA_TYPE_MAP[self.data_type],
            "label": self.data_element_label,
            "description": self.description,
        }

    def to_questionnaire_item(self) -> dict[str, str]:
        """Return the row as a questionnaire item.

        Rows flagged "R" or "C" are marked required; "repeats" and
        "readOnly" are always "false".
        """
        return {
            "id": self.data_element_id,
            "linkID": self.data_element_id,
            "type": self.data_type,
            "text": self.data_element_label,
            "required": "true" if self.required in ["R", "C"] else "false",
            "repeats": "false",
            "readOnly": "false",
        }

    def to_valueset_item(self) -> Optional[dict[str, str]]:
        """Return a valueset entry for "Codes" rows, otherwise None."""
        if self.data_type == "Codes":
            return {"id": self.data_element_id, "label": self.data_element_label}
        return None
# pylint: disable=too-many-instance-attributes
class L2Dictionary:
    """
    Represents a data dictionary for Level 2 (L2) data.

    Args:
        file_path (str): The file path of the data dictionary workbook.
        output_path (str): The directory generated files are written under.
        sheet_name_prefix (str, optional): The prefix of the sheet names to
            process. Defaults to "HIV".

    Attributes:
        workbook: The loaded workbook object.
        output_path (str): The directory generated files are written under.
        sheet_name_prefix (str): The prefix of the sheet names to process.
        active_coding_data_element: ID of the most recent "Coding" row, or
            None once a row that is not of type "Codes" has been seen.
        concepts (list): One concept dict per processed row.
        models (dict): Models keyed by the alphanumeric-only sheet name.
        questionnaires (dict): Questionnaires keyed by title.
        valuesets (dict): Valuesets keyed by coding data element ID.

    Methods:
        set_active_coding(row): Updates the active coding data element.
        add_to_model(sheet_name, row): Adds a row to the sheet's model.
        add_to_questionnaire(row): Adds a row to its questionnaire.
        add_to_valueset(row): Adds a "Codes" row to the active valueset.
        format_concepts_for_cql(): Returns concepts with unique labels.
        process(): Reads every matching sheet of the workbook.
        write_concepts() / write_models() / write_questionnaires() /
            write_valuesets(): Render the collected data to files.
    """

    def __init__(
        self, file_path: str, output_path: str, sheet_name_prefix: str = "HIV"
    ) -> None:
        self.workbook = load_workbook(file_path)
        self.output_path = output_path
        self.sheet_name_prefix = sheet_name_prefix
        self.active_coding_data_element = None
        self.concepts = []
        self.models = {}
        self.questionnaires = {}
        self.valuesets = {}

    def set_active_coding(self, row: "L2Row") -> None:
        """Track which "Coding" element subsequent "Codes" rows belong to.

        Any row that is not of type "Codes" ends the current coding group;
        a "Coding" row then starts a new one.
        """
        if self.active_coding_data_element and row.data_type != "Codes":
            self.active_coding_data_element = None
        if row.data_type == "Coding":
            self.active_coding_data_element = row.data_element_id

    def add_to_model(self, sheet_name: str, row: "L2Row") -> None:
        """Append the row to the model of its sheet; "Codes" rows are skipped.

        When the row carries a validation condition not yet recorded for the
        model, an invariant with the next sequence number is added too.
        """
        if row.data_type == "Codes":
            return
        _id = remove_special_characters(sheet_name)
        if _id not in self.models:
            self.models[_id] = {
                "m_items": [],
                "invariants": [],
                "title": sheet_name,
                "id": _id,
            }
        model = self.models[_id]
        model["m_items"].append(row.to_model_item())

        row_invariant = row.to_invariant()
        if not row_invariant:
            return
        invariants = model["invariants"]
        if any(i["description"] == row_invariant["description"] for i in invariants):
            # Identical validation text is only emitted once per model.
            return
        # The sequence number is whatever follows the last "-" of the ID;
        # parsing it that way does not depend on the prefix length.
        max_id = max((int(i["id"].rsplit("-", 1)[1]) for i in invariants), default=0)
        row_invariant["id"] = f'{row_invariant["id"]}-{max_id + 1}'.replace(".", "-")
        invariants.append(row_invariant)

    def add_to_questionnaire(self, row: "L2Row") -> None:
        """Append the row to the questionnaire named after its activity.

        "Codes" rows are skipped.
        """
        if row.data_type == "Codes":
            return
        title = row.questionare_title
        if title not in self.questionnaires:
            self.questionnaires[title] = {
                "q_items": [],
                "title": title,
                "instanceName": row.question_instance,
            }
        self.questionnaires[title]["q_items"].append(row.to_questionnaire_item())

    def add_to_valueset(self, row: "L2Row") -> None:
        """Append a "Codes" row to the valueset of the active coding element.

        Rows of any other data type are ignored.

        Raises:
            ValueError: If a "Codes" row appears while no coding element is
                active (i.e. it does not follow a "Coding" row).
        """
        if row.data_type != "Codes":
            return
        coding_id = self.active_coding_data_element
        if coding_id is None:
            raise ValueError(
                f"Data element {row.data_element_id} of type Codes does not "
                "follow a Coding data element"
            )
        if coding_id not in self.valuesets:
            # NOTE(review): valueset.fsh.j2 also reads `title` and
            # `description`, which are not set here - confirm whether they
            # should be populated.
            self.valuesets[coding_id] = {
                "v_items": [],
                "name": remove_special_characters(coding_id),
                "id": coding_id,
            }
        self.valuesets[coding_id]["v_items"].append(row.to_valueset_item())

    def format_concepts_for_cql(self) -> list[dict[str, str]]:
        """Return the concepts with labels made unique for CQL.

        Concepts sharing a label get " - <id>" appended to it. The result is
        grouped by label in order of first appearance. ``self.concepts`` is
        left untouched: relabelled concepts are copies, so the suffix does not
        leak into other outputs built from the same list.
        """
        concept_by_label = defaultdict(list)
        for concept in self.concepts:
            concept_by_label[concept["label"]].append(concept)

        reformatted_concepts = []
        for label, concepts in concept_by_label.items():
            if len(concepts) > 1:
                reformatted_concepts.extend(
                    {**concept, "label": f"{label} - {concept['id']}"}
                    for concept in concepts
                )
            else:
                reformatted_concepts.extend(concepts)
        return reformatted_concepts

    def process(self):
        """Read every sheet whose name starts with the configured prefix.

        The first row of each sheet is taken as the header. Rows whose cells
        are all empty are skipped; every other row is turned into an
        ``L2Row`` and fed to the concept, model, questionnaire and valueset
        collections.
        """
        for sheet_name in self.workbook.sheetnames:
            if not sheet_name.startswith(self.sheet_name_prefix):
                continue
            sheet = self.workbook[sheet_name]
            header = None
            for row in sheet.iter_rows(values_only=True):
                if not header:
                    header = row
                    continue
                if all(cell is None for cell in row):
                    # Blank spreadsheet rows carry no data element.
                    continue
                raw_row = dict(zip(header, row))
                l2_row = L2Row(raw_row, self.active_coding_data_element)
                self.set_active_coding(l2_row)
                self.concepts.append(l2_row.to_concept_item())
                self.add_to_model(sheet_name, l2_row)
                self.add_to_questionnaire(l2_row)
                self.add_to_valueset(l2_row)

    def write_concepts(self):
        """Render the concepts as HIVConcepts.cql and HIVConcepts.fsh."""
        concepts_dir = os.path.join(self.output_path, "codesystems")
        os.makedirs(concepts_dir, exist_ok=True)
        for _type in ["cql", "fsh"]:
            # Only the CQL output needs de-duplicated labels.
            if _type == "cql":
                concepts = self.format_concepts_for_cql()
            else:
                concepts = self.concepts
            output_path = os.path.join(concepts_dir, f"HIVConcepts.{_type}")
            template = jinja_env.get_template(f"concepts.{_type}.j2")
            render_to_file(template, {"concepts": concepts}, output_path)

    def write_models(self):
        """Render one <model id>.fsh file per model under "models"."""
        models_dir = os.path.join(self.output_path, "models")
        os.makedirs(models_dir, exist_ok=True)
        template = jinja_env.get_template("model.fsh.j2")
        for model in self.models.values():
            output_path = os.path.join(models_dir, f"{model['id']}.fsh")
            render_to_file(template, {"model": model}, output_path)

    def write_questionnaires(self):
        """Render one <instanceName>.fsh file per questionnaire."""
        questionnaires_dir = os.path.join(self.output_path, "questionnaires")
        os.makedirs(questionnaires_dir, exist_ok=True)
        template = jinja_env.get_template("questionnaire.fsh.j2")
        for questionnaire in self.questionnaires.values():
            output_path = os.path.join(
                questionnaires_dir, f"{questionnaire['instanceName']}.fsh"
            )
            render_to_file(template, {"q": questionnaire}, output_path)

    def write_valuesets(self):
        """Render one <name>.fsh file per valueset under "valuesets"."""
        valuesets_dir = os.path.join(self.output_path, "valuesets")
        os.makedirs(valuesets_dir, exist_ok=True)
        template = jinja_env.get_template("valueset.fsh.j2")
        for valueset in self.valuesets.values():
            output_path = os.path.join(valuesets_dir, f"{valueset['name']}.fsh")
            render_to_file(template, {"valueset": valueset}, output_path)
+ +// Valuesets reference the IG ValueSet definitions and are labeled with `Choices` +// Codes are provided for each Data Dictionary concept +// Specific Data Element IDs are appended to the label if the label is not unique within the DAK + +library HIVConcepts +codesystem "HIVConcepts": 'http://smart.who.int/hiv/CodeSystem/HIVConcepts' + +{% for concept in concepts %} +{% if concept.data_type == 'Coding' %} +valueset "{{ concept.label }} Choices": 'http://smart.who.int/hiv/ValueSet/{{ concept.id}}' +{% endif %} +{% endfor %} + +{% for concept in concepts %} +code "{{ concept.label }}": '{{ concept.id }}' from "HIVConcepts" display '{{ concept.label}}' +{% endfor %} diff --git a/who_l3_smart_tools/core/l2/templates/concepts.fsh.j2 b/who_l3_smart_tools/core/l2/templates/concepts.fsh.j2 new file mode 100644 index 0000000..c41c022 --- /dev/null +++ b/who_l3_smart_tools/core/l2/templates/concepts.fsh.j2 @@ -0,0 +1,12 @@ +CodeSystem: HIVConcepts +Title: "WHO SMART HIV Concepts CodeSystem" +Description: "This code system defines the concepts used in the World Health Organization SMART HIV DAK" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablecodesystem" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablecodesystem" +* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGCodeSystem" +* ^experimental = true +* ^caseSensitive = false + +{% for concept in concepts %} +* #{{ concept.id }} "{{ concept.label }}" "{{ concept.description }}" +{% endfor %} diff --git a/who_l3_smart_tools/core/l2/templates/model.fsh.j2 b/who_l3_smart_tools/core/l2/templates/model.fsh.j2 new file mode 100644 index 0000000..8893cee --- /dev/null +++ b/who_l3_smart_tools/core/l2/templates/model.fsh.j2 @@ -0,0 +1,26 @@ +{% for invariant in model.invariants %} +Invariant: {{ invariant.id }} +Description: "{{ invariant.description }}" +Expression: "{{ invariant.expression }}" +Severity: #{{ invariant.severity }} + +{% 
endfor %} + +Logical: {{ model.id }} +Title: "{{ model.title }}" +Description: "" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablestructuredefinition" +* ^meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablestructuredefinition" +* ^meta.profile[+] = "http://smart.who.int/base/StructureDefinition/SGLogicalModel" +* ^extension[http://hl7.org/fhir/tools/StructureDefinition/logical-target].valueBoolean = true +* ^experimental = true +* ^name = "{{ model.id }}" +* ^status = #active + +{% for item in model.m_items %} +* {{ item.slug }} {{ item.condition }} {{ item.type }} "{{ item.label }}" "{{ item.description }}" + * ^code[+] = HIVConcepts#{{ item.id }} +{% if item.type == "Coding" %} +* referredBy from {{ item.id }} +{% endif %} +{% endfor %} \ No newline at end of file diff --git a/who_l3_smart_tools/core/l2/templates/questionnaire.fsh.j2 b/who_l3_smart_tools/core/l2/templates/questionnaire.fsh.j2 new file mode 100644 index 0000000..583d9dc --- /dev/null +++ b/who_l3_smart_tools/core/l2/templates/questionnaire.fsh.j2 @@ -0,0 +1,25 @@ +Instance: {{ q.instanceName }} +InstanceOf: sdc-questionnaire-extr-smap +Title: "{{ q.title}}" +Description: "Questionnaire for {{ q.title|capitalize }}" +Usage: #definition +* meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-shareablequestionnaire" +* meta.profile[+] = "http://hl7.org/fhir/uv/crmi/StructureDefinition/crmi-publishablequestionnaire" +* subjectType = #Patient +* language = #en +* status = #draft +* experimental = true + +{% for item in q.q_items %} +* item[+] + * id = "{{ item.id }}" + * linkId = "{{ item.linkID }}" + * type = #{{ item.type }} + * text = "{{ item.text }}" + * required = {{ item.required}} + * repeats = {{ item.repeats }} + * readOnly = {{ item.readOnly }} + {% if item.type == 'Coding' %} + * answerValueSet = "#{{ item.id }}" + {% endif %} +{% endfor %} \ No newline at end of file diff --git 
def to_camel_case(snake_str):
    """Convert a label to lower camel case.

    The input is split on underscores, hyphens and whitespace. Empty
    fragments (from leading, trailing or repeated separators) are dropped so
    the first real word is always the lower-cased head; every following word
    is capitalized (first letter upper, rest lower).

    Args:
        snake_str: The text to convert.

    Returns:
        The camel-cased string, or "" when the input contains no words.
    """
    words = [word for word in re.split(r"[_\s-]", snake_str) if word]
    if not words:
        return ""
    first_word, *remaining = words
    return first_word.lower() + "".join(word.capitalize() for word in remaining)


def remove_special_characters(value):
    """Return ``value`` with every character other than A-Z, a-z, 0-9 removed."""
    return re.sub(r"[^A-Za-z0-9]", "", value)