Merge pull request #290 from smart-on-fhir/mikix/coding-schema
fix: expand partial CodeableConcepts in the final FHIR schema
mikix authored Nov 13, 2023
2 parents b74dada + 37ece65, commit cfeb3cb
Showing 2 changed files with 110 additions and 44 deletions.
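In brief: the schema generator previously included only the fields actually observed in deep (inferred) parts of the data, so a partially-populated Coding produced a partial schema. After this change, whenever any piece of a CodeableConcept or Coding appears, the whole element is expanded to its full FHIR definition. A minimal sketch of the intended behavior; the entry point pyarrow_schema_from_resource_batch comes from the diff below, while the import path and batch contents are illustrative:

from cumulus_etl.fhir import fhir_schemas  # assumed import path for the module changed below

# A Condition where only stage.type.coding.version appears in the source data
batch = [{"resourceType": "Condition", "id": "A", "stage": [{"type": {"coding": [{"version": "1.0"}]}}]}]

schema = fhir_schemas.pyarrow_schema_from_resource_batch("Condition", batch)

# Before this commit, stage.type carried only the observed "version" field.
# After it, stage.type is the full CodeableConcept (id, text, and the Coding fields).
print(schema.field("stage").type)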
66 changes: 44 additions & 22 deletions cumulus_etl/fhir/fhir_schemas.py
@@ -5,11 +5,15 @@
 from typing import Any
 
 import pyarrow
-from fhirclient.models import fhirabstractbase, fhirdate, fhirelementfactory
+from fhirclient.models import codeableconcept, coding, extension, fhirabstractbase, fhirdate, fhirelementfactory
 
 
 FhirProperty = namedtuple("FhirProperty", ["name", "json_name", "pytype", "is_list", "of_many", "required"])
 
+# We include one level of the FHIR spec in our schema, regardless of what's in the source data.
+# This is to help downstream SQL by at least making sure each column is in the schema.
+LEVEL_INCLUSION = 1
+
 
 def pyarrow_schema_from_resource_batch(resource_type: str, batch: list[dict]) -> pyarrow.Schema:
     """
@@ -92,20 +96,49 @@ def fhir_obj_to_pyarrow_fields(
 ) -> list[pyarrow.Field]:
     """Convert a FHIR instance to a Pyspark StructType schema definition"""
     properties = map(FhirProperty._make, base_obj.elementProperties())
-    return list(filter(None, map(partial(fhir_to_pyarrow_property, batch_shape=batch_shape, level=level), properties)))
+    return list(
+        filter(
+            None,
+            map(partial(fhir_to_pyarrow_property, base_obj=base_obj, batch_shape=batch_shape, level=level), properties),
+        )
+    )
 
 
-def fhir_to_pyarrow_property(prop: FhirProperty, *, batch_shape: dict = None, level: int) -> pyarrow.Field | None:
+def fhir_to_pyarrow_property(
+    prop: FhirProperty, *, base_obj: fhirabstractbase.FHIRAbstractBase, batch_shape: dict = None, level: int
+) -> pyarrow.Field | None:
     """Converts a single FhirProperty to a Pyspark StructField, returning None if this field should be skipped"""
     if batch_shape is not None:
         batch_shape = batch_shape.get(prop.json_name)
-    if level > 1 and batch_shape is None:
-        # If we're deep, only include fields we actually see in data
-        return None
 
-    pyarrow_type = fhir_to_pyarrow_type(prop.pytype, batch_shape, level=level)
-    if pyarrow_type is None:
-        return None
+    # If we see a piece of a Concept or Coding, we like to grab the full schema for it.
+    # This helps downstream SQL avoid dealing with incomplete Coding fields - which do appear a lot.
+    full_schema_types = (codeableconcept.CodeableConcept, coding.Coding)
+    is_inside_full_schema_type = isinstance(base_obj, full_schema_types)
+    is_extension_type = issubclass(prop.pytype, extension.Extension)
+    force_inclusion = is_inside_full_schema_type and not is_extension_type
+
+    # OK, how do we handle this field? Include or exclude - and descend or not?
+    present_in_shape = batch_shape is not None
+    include_in_schema = present_in_shape or force_inclusion
+    is_struct = issubclass(prop.pytype, fhirabstractbase.FHIRAbstractBase)
+
+    if is_struct:
+        if level >= LEVEL_INCLUSION and not include_in_schema:
+            # Skip this element entirely and do not descend, to avoid infinite recursion.
+            # Note that in theory this might leave a struct with no child fields
+            # (if a struct's only children were also structs),
+            # which parquet/spark would have an issue with because they won't allow empty structs.
+            # But in practice with FHIR, all BackboneElements have at least an id (string) field,
+            # so we dodge that bullet.
+            return None
+        # Recurse!
+        pyarrow_type = pyarrow.struct(fhir_obj_to_pyarrow_fields(prop.pytype(), batch_shape, level=level + 1))
    else:
+        if level > LEVEL_INCLUSION and not include_in_schema:
+            # If we're deeper than our inclusion level, bail if we don't actually see the field in the data
+            return None
+        pyarrow_type = basic_fhir_to_pyarrow_type(prop.pytype)
 
     # Wrap lists in a ListType
     if prop.is_list:
@@ -116,8 +149,8 @@ def fhir_to_pyarrow_property(prop: FhirProperty, *, batch_shape: dict = None, le
     return pyarrow.field(prop.json_name, pyarrow_type, nullable=True)
 
 
-def fhir_to_pyarrow_type(pytype: type, batch_shape: dict, *, level: int) -> pyarrow.DataType | None:
-    """Converts a basic python type to a Pyspark type, returning None if this element should be skipped"""
+def basic_fhir_to_pyarrow_type(pytype: type) -> pyarrow.DataType:
+    """Converts a basic python type to a Pyspark type"""
     if pytype is int:
         return pyarrow.int32()
     elif pytype is float:
@@ -135,15 +168,4 @@ def fhir_to_pyarrow_type(pytype: type, batch_shape: dict, *, level: int) -> pyar
         return pyarrow.bool_()
     elif pytype is fhirdate.FHIRDate:
         return pyarrow.string()  # just leave it as a string, like it appears in the JSON
-    elif issubclass(pytype, fhirabstractbase.FHIRAbstractBase):
-        # If this field is present in the inferred schema, that means some part of the data has this field.
-        # So we should recurse and extend our normally shallow schema to be deep enough to include this field too.
-        if level == 0 or batch_shape is not None:
-            return pyarrow.struct(fhir_obj_to_pyarrow_fields(pytype(), batch_shape, level=level + 1))
-
-        # Else skip this element entirely and do not descend, to avoid infinite recursion.
-        # Note that in theory this might leave a struct with no child fields (if a struct's only children where also
-        # structs), which parquet/spark would have an issue with -- it won't allow empty structs.
-        # But in practice with FHIR, all BackboneElements have at least an id (string) field, so we dodge that bullet.
-        return None
     raise ValueError(f"Unexpected type: {pytype}")
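To make the new inclusion rule easier to follow, here is a self-contained sketch of the same decision table, with plain booleans standing in for the fhirclient model checks (illustrative names, not the module's API):

# Sketch of the per-property include/exclude rule from fhir_to_pyarrow_property above.
LEVEL_INCLUSION = 1

def include_field(level: int, present_in_shape: bool, inside_concept_or_coding: bool,
                  is_extension: bool, is_struct: bool) -> bool:
    """Mirrors the decision made for each property in fhir_to_pyarrow_property."""
    force_inclusion = inside_concept_or_coding and not is_extension
    include = present_in_shape or force_inclusion
    if is_struct:
        # Structs past the inclusion level must earn their way in, or recursion
        # through the self-referential FHIR spec would never terminate.
        return level < LEVEL_INCLUSION or include
    # Scalars at the inclusion level are always kept; deeper ones need a reason.
    return level <= LEVEL_INCLUSION or include

# A Coding's "system" three levels deep, absent from the data, is still kept:
assert include_field(3, False, True, False, False)
# An unrelated deep scalar absent from the data is dropped:
assert not include_field(3, False, False, False, False)
# Any top-level (level 0) element is kept regardless:
assert include_field(0, False, False, False, True)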
88 changes: 66 additions & 22 deletions tests/etl/test_tasks.py
@@ -170,51 +170,95 @@ async def test_batch_has_wide_schema(self):
     async def test_batch_schema_includes_inferred_fields(self):
         """Verify that deep (inferred) fields are also included in the final schema"""
         # Make sure that we include different deep fields for each - final schema should be a union
-        self.make_json("ServiceRequest.1", "A", category=[{"coding": [{"version": "1.0"}]}])
-        self.make_json("ServiceRequest.2", "B", asNeededCodeableConcept={"coding": [{"userSelected": True}]})
+        self.make_json("Condition.1", "A", stage=[{"type": {"coding": [{"version": "1.0"}]}}])
+        self.make_json("Condition.2", "B", onsetRange={"low": {"value": 1.0}})
 
-        await basic_tasks.ServiceRequestTask(self.job_config, self.scrubber).run()
+        await basic_tasks.ConditionTask(self.job_config, self.scrubber).run()
 
         schema = self.format.write_records.call_args[0][0].schema
 
-        # Start with simple, non-inferred CodeableConcept -- this should be bare-bones
+        # Start with a simple, non-present CodeableConcept at level zero -- this should be fully described
+        self.assertEqual(
+            pyarrow.struct(
+                {
+                    "id": pyarrow.string(),
+                    "coding": pyarrow.list_(
+                        pyarrow.struct(
+                            {
+                                "id": pyarrow.string(),
+                                "code": pyarrow.string(),
+                                "display": pyarrow.string(),
+                                "system": pyarrow.string(),
+                                "userSelected": pyarrow.bool_(),
+                                "version": pyarrow.string(),
+                            }
+                        )
+                    ),
+                    "text": pyarrow.string(),
+                }
+            ),
+            schema.field("code").type,  # CodeableConcept type
+        )
+        # While a deeper non-present CodeableConcept should be ignored
         self.assertEqual(
-            pyarrow.struct({"id": pyarrow.string(), "text": pyarrow.string()}), schema.field("performerType").type
+            pyarrow.list_(
+                pyarrow.struct(
+                    {
+                        "id": pyarrow.string(),
+                        # "code" field is missing (CodeableConcept type)
+                        # "detail" field is missing (Reference type)
+                    }
+                )
+            ),
+            schema.field("evidence").type,  # BackboneElement type
         )
-        # Now the two custom/inferred/deep fields
+        # But if any piece of a deep CodeableConcept is present, it gets fully expanded.
         self.assertEqual(
             pyarrow.list_(
                 pyarrow.struct(
                     {
                         "id": pyarrow.string(),
-                        "coding": pyarrow.list_(
-                            pyarrow.struct(
-                                {
-                                    "version": pyarrow.string(),
-                                }
-                            )
-                        ),
+                        # "assessment" field is missing (Reference type)
+                        # "summary" field is missing (CodeableConcept type)
+                        # But the "type" is here in full, because a piece of it was in the input
+                        "type": pyarrow.struct(
+                            {
+                                "id": pyarrow.string(),
+                                "coding": pyarrow.list_(
+                                    pyarrow.struct(
+                                        {
+                                            "id": pyarrow.string(),
+                                            "code": pyarrow.string(),
+                                            "display": pyarrow.string(),
+                                            "system": pyarrow.string(),
+                                            "userSelected": pyarrow.bool_(),
+                                            "version": pyarrow.string(),
+                                        }
+                                    )
+                                ),
+                                "text": pyarrow.string(),
+                            }
+                        ),
                         "text": pyarrow.string(),
                     }
                 )
             ),
-            schema.field("category").type,
+            schema.field("stage").type,  # BackboneElement type
         )
+        # Other deep-and-partial elements do not get the same expansion treatment.
+        # Here is a deep Quantity element.
+        # The parts present in the input are also in the schema, but only those parts.
         self.assertEqual(
             pyarrow.struct(
                 {
                     "id": pyarrow.string(),
-                    "coding": pyarrow.list_(
-                        pyarrow.struct(
-                            {
-                                "userSelected": pyarrow.bool_(),
-                            }
-                        )
-                    ),
-                    "text": pyarrow.string(),
+                    "low": pyarrow.struct(
+                        {
+                            "value": pyarrow.float64(),
+                        }
+                    ),
                 }
            ),
-            schema.field("asNeededCodeableConcept").type,
+            schema.field("onsetRange").type,
         )
 
     async def test_batch_schema_types_are_coerced(self):
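For reference, the two Condition rows the updated test feeds in, and the behavior each assertion pins down (a sketch; in the real test these pass through make_json and the ETL task, which also scrubs the ids):

condition_1 = {"resourceType": "Condition", "id": "A", "stage": [{"type": {"coding": [{"version": "1.0"}]}}]}
condition_2 = {"resourceType": "Condition", "id": "B", "onsetRange": {"low": {"value": 1.0}}}

# Per the assertions above:
# - "code" (level-zero CodeableConcept, absent from the data): fully described anyway
# - "evidence" (deep BackboneElement, absent): only its shallow "id" child survives
# - "stage.type" (deep CodeableConcept, partially present): expanded to the full spec
# - "onsetRange.low" (deep Quantity, partially present): only the observed "value" is kept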
