diff --git a/cumulus_etl/fhir/fhir_schemas.py b/cumulus_etl/fhir/fhir_schemas.py
index 1ab9325d..e358012e 100644
--- a/cumulus_etl/fhir/fhir_schemas.py
+++ b/cumulus_etl/fhir/fhir_schemas.py
@@ -5,11 +5,15 @@
 from typing import Any
 
 import pyarrow
-from fhirclient.models import fhirabstractbase, fhirdate, fhirelementfactory
+from fhirclient.models import codeableconcept, coding, extension, fhirabstractbase, fhirdate, fhirelementfactory
 
 FhirProperty = namedtuple("FhirProperty", ["name", "json_name", "pytype", "is_list", "of_many", "required"])
 
+# We include one level of the FHIR spec in our schema, regardless of what's in the source data.
+# This is to help downstream SQL by at least making sure each column is in the schema.
+LEVEL_INCLUSION = 1
+
 
 def pyarrow_schema_from_resource_batch(resource_type: str, batch: list[dict]) -> pyarrow.Schema:
     """
@@ -92,20 +96,49 @@ def fhir_obj_to_pyarrow_fields(
 ) -> list[pyarrow.Field]:
     """Convert a FHIR instance to a Pyspark StructType schema definition"""
     properties = map(FhirProperty._make, base_obj.elementProperties())
-    return list(filter(None, map(partial(fhir_to_pyarrow_property, batch_shape=batch_shape, level=level), properties)))
+    return list(
+        filter(
+            None,
+            map(partial(fhir_to_pyarrow_property, base_obj=base_obj, batch_shape=batch_shape, level=level), properties),
+        )
+    )
 
 
-def fhir_to_pyarrow_property(prop: FhirProperty, *, batch_shape: dict = None, level: int) -> pyarrow.Field | None:
+def fhir_to_pyarrow_property(
+    prop: FhirProperty, *, base_obj: fhirabstractbase.FHIRAbstractBase, batch_shape: dict = None, level: int
+) -> pyarrow.Field | None:
     """Converts a single FhirProperty to a Pyspark StructField, returning None if this field should be skipped"""
     if batch_shape is not None:
         batch_shape = batch_shape.get(prop.json_name)
 
-    if level > 1 and batch_shape is None:
-        # If we're deep, only include fields we actually see in data
-        return None
-    pyarrow_type = fhir_to_pyarrow_type(prop.pytype, batch_shape, level=level)
-    if pyarrow_type is None:
-        return None
+    # If we see a piece of a Concept or Coding, we'd like to grab the full schema for it.
+    # This helps downstream SQL avoid dealing with incomplete Coding fields, which do appear a lot.
+    full_schema_types = (codeableconcept.CodeableConcept, coding.Coding)
+    is_inside_full_schema_type = isinstance(base_obj, full_schema_types)
+    is_extension_type = issubclass(prop.pytype, extension.Extension)
+    force_inclusion = is_inside_full_schema_type and not is_extension_type
+
+    # OK, how do we handle this field? Include or exclude, descend or not?
+    present_in_shape = batch_shape is not None
+    include_in_schema = present_in_shape or force_inclusion
+    is_struct = issubclass(prop.pytype, fhirabstractbase.FHIRAbstractBase)
+
+    if is_struct:
+        if level >= LEVEL_INCLUSION and not include_in_schema:
+            # Skip this element entirely and do not descend, to avoid infinite recursion.
+            # Note that in theory this might leave a struct with no child fields
+            # (if a struct's only children were also structs),
+            # which parquet/spark would have an issue with because they won't allow empty structs.
+            # But in practice with FHIR, all BackboneElements have at least an id (string) field,
+            # so we dodge that bullet.
+            return None
+        # Recurse!
+        pyarrow_type = pyarrow.struct(fhir_obj_to_pyarrow_fields(prop.pytype(), batch_shape, level=level + 1))
+    else:
+        if level > LEVEL_INCLUSION and not include_in_schema:
+            # If we're deeper than our inclusion level, bail if we don't actually see the field in the data
+            return None
+        pyarrow_type = basic_fhir_to_pyarrow_type(prop.pytype)
 
     # Wrap lists in an ListType
     if prop.is_list:
@@ -116,8 +149,8 @@ def fhir_to_pyarrow_property(prop: FhirProperty, *, batch_shape: dict = None, le
     return pyarrow.field(prop.json_name, pyarrow_type, nullable=True)
 
 
-def fhir_to_pyarrow_type(pytype: type, batch_shape: dict, *, level: int) -> pyarrow.DataType | None:
-    """Converts a basic python type to a Pyspark type, returning None if this element should be skipped"""
+def basic_fhir_to_pyarrow_type(pytype: type) -> pyarrow.DataType:
+    """Converts a basic python type to a Pyspark type"""
     if pytype is int:
         return pyarrow.int32()
     elif pytype is float:
@@ -135,15 +168,4 @@ def fhir_to_pyarrow_type(pytype: type, batch_shape: dict, *, level: int) -> pyar
         return pyarrow.bool_()
     elif pytype is fhirdate.FHIRDate:
         return pyarrow.string()  # just leave it as a string, like it appears in the JSON
-    elif issubclass(pytype, fhirabstractbase.FHIRAbstractBase):
-        # If this field is present in the inferred schema, that means some part of the data has this field.
-        # So we should recurse and extend our normally shallow schema to be deep enough to include this field too.
-        if level == 0 or batch_shape is not None:
-            return pyarrow.struct(fhir_obj_to_pyarrow_fields(pytype(), batch_shape, level=level + 1))
-
-        # Else skip this element entirely and do not descend, to avoid infinite recursion.
-        # Note that in theory this might leave a struct with no child fields (if a struct's only children where also
-        # structs), which parquet/spark would have an issue with -- it won't allow empty structs.
-        # But in practice with FHIR, all BackboneElements have at least an id (string) field, so we dodge that bullet.
-        return None
     raise ValueError(f"Unexpected type: {pytype}")
diff --git a/tests/etl/test_tasks.py b/tests/etl/test_tasks.py
index 7716a8bf..929db633 100644
--- a/tests/etl/test_tasks.py
+++ b/tests/etl/test_tasks.py
@@ -170,51 +170,95 @@ async def test_batch_has_wide_schema(self):
     async def test_batch_schema_includes_inferred_fields(self):
         """Verify that deep (inferred) fields are also included in the final schema"""
         # Make sure that we include different deep fields for each - final schema should be a union
-        self.make_json("ServiceRequest.1", "A", category=[{"coding": [{"version": "1.0"}]}])
-        self.make_json("ServiceRequest.2", "B", asNeededCodeableConcept={"coding": [{"userSelected": True}]})
+        self.make_json("Condition.1", "A", stage=[{"type": {"coding": [{"version": "1.0"}]}}])
+        self.make_json("Condition.2", "B", onsetRange={"low": {"value": 1.0}})
 
-        await basic_tasks.ServiceRequestTask(self.job_config, self.scrubber).run()
+        await basic_tasks.ConditionTask(self.job_config, self.scrubber).run()
 
         schema = self.format.write_records.call_args[0][0].schema
 
-        # Start with simple, non-inferred CodeableConcept -- this should be bare-bones
+        # Start with simple, non-present CodeableConcept at level zero -- this should be fully described
+        self.assertEqual(
+            pyarrow.struct(
+                {
+                    "id": pyarrow.string(),
+                    "coding": pyarrow.list_(
+                        pyarrow.struct(
+                            {
+                                "id": pyarrow.string(),
+                                "code": pyarrow.string(),
+                                "display": pyarrow.string(),
+                                "system": pyarrow.string(),
+                                "userSelected": pyarrow.bool_(),
+                                "version": pyarrow.string(),
+                            }
+                        )
+                    ),
+                    "text": pyarrow.string(),
+                }
+            ),
+            schema.field("code").type,  # CodeableConcept type
+        )
+        # While a deeper non-present CodeableConcept should be ignored
         self.assertEqual(
-            pyarrow.struct({"id": pyarrow.string(), "text": pyarrow.string()}), schema.field("performerType").type
+            pyarrow.list_(
+                pyarrow.struct(
+                    {
+                        "id": pyarrow.string(),
+                        # "code" field is missing (CodeableConcept type)
+                        # "detail" field is missing (Reference type)
+                    }
+                )
+            ),
+            schema.field("evidence").type,  # BackboneElement type
         )
-        # Now the two custom/inferred/deep fields
+        # But if any piece of a deep CodeableConcept is present, it gets fully expanded.
        self.assertEqual(
             pyarrow.list_(
                 pyarrow.struct(
                     {
                         "id": pyarrow.string(),
-                        "coding": pyarrow.list_(
-                            pyarrow.struct(
-                                {
-                                    "version": pyarrow.string(),
-                                }
-                            )
+                        # "assessment" field is missing (Reference type)
+                        # "summary" field is missing (CodeableConcept type)
+                        # But the "type" is here in full because a piece of it was in the input
+                        "type": pyarrow.struct(
+                            {
+                                "id": pyarrow.string(),
+                                "coding": pyarrow.list_(
+                                    pyarrow.struct(
+                                        {
+                                            "id": pyarrow.string(),
+                                            "code": pyarrow.string(),
+                                            "display": pyarrow.string(),
+                                            "system": pyarrow.string(),
+                                            "userSelected": pyarrow.bool_(),
+                                            "version": pyarrow.string(),
+                                        }
+                                    )
+                                ),
+                                "text": pyarrow.string(),
+                            }
                         ),
-                        "text": pyarrow.string(),
                     }
                 )
             ),
-            schema.field("category").type,
+            schema.field("stage").type,  # BackboneElement type
         )
+        # Other deep-and-partial elements do not get the same expansion treatment.
+        # Here is a deep Quantity element.
+        # The parts present in the input are also in the schema, but only those parts.
         self.assertEqual(
             pyarrow.struct(
                 {
                     "id": pyarrow.string(),
-                    "coding": pyarrow.list_(
-                        pyarrow.struct(
-                            {
-                                "userSelected": pyarrow.bool_(),
-                            }
-                        )
+                    "low": pyarrow.struct(
+                        {
+                            "value": pyarrow.float64(),
+                        }
                     ),
-                    "text": pyarrow.string(),
                 }
             ),
-            schema.field("asNeededCodeableConcept").type,
+            schema.field("onsetRange").type,
         )
 
     async def test_batch_schema_types_are_coerced(self):
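
A minimal usage sketch of the new behavior, separate from the patch itself: it assumes the cumulus_etl.fhir.fhir_schemas module shown above is importable and that a batch is simply a list of plain FHIR resource dicts (as in the test fixtures); the sample resource content is illustrative only.

from cumulus_etl.fhir import fhir_schemas

# A Condition where only one fragment of a Coding appears in the source data.
batch = [
    {"id": "A", "stage": [{"type": {"coding": [{"version": "1.0"}]}}]},
]

schema = fhir_schemas.pyarrow_schema_from_resource_batch("Condition", batch)

# stage.type is a CodeableConcept and a piece of it was seen, so (per this patch) the
# schema carries the full one-level CodeableConcept/Coding shape there: code, system,
# display, userSelected, version, etc. all exist for downstream SQL, even though the
# data only supplied "version".
print(schema.field("stage").type)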