diff --git a/bunsen-core/src/main/java/com/cerner/bunsen/Bundles.java b/bunsen-core/src/main/java/com/cerner/bunsen/Bundles.java index ecf1a518..ad7645ee 100644 --- a/bunsen-core/src/main/java/com/cerner/bunsen/Bundles.java +++ b/bunsen-core/src/main/java/com/cerner/bunsen/Bundles.java @@ -213,6 +213,32 @@ public Dataset extractEntry(SparkSession spark, FhirEncoders.forVersion(fhirVersion).getOrCreate()); } + /** + * Extracts the given resource type from the RDD of bundles and returns + * it as a Dataset of that type, including any declared resources contained + * to the parent resource. + * + * @param spark the spark session + * @param bundles the RDD of FHIR Bundles + * @param resourceName the FHIR name of the resource type to extract + * (e.g., condition, patient, etc.). + * @param contained the FHIR names of the resources contained to the + * parent resource. + * @param the type of the resource being extracted from the bundles. + * @return a dataset of the given resource + */ + public Dataset extractEntry(SparkSession spark, + JavaRDD bundles, + String resourceName, + String... contained) { + + return extractEntry(spark, + bundles, + resourceName, + FhirEncoders.forVersion(fhirVersion).getOrCreate(), + contained); + } + /** * Extracts the given resource type from the RDD of bundles and returns * it as a Dataset of that type. @@ -225,11 +251,35 @@ public Dataset extractEntry(SparkSession spark, * @param the type of the resource being extracted from the bundles. * @return a dataset of the given resource */ - public Dataset extractEntry(SparkSession spark, + public Dataset extractEntry(SparkSession spark, JavaRDD bundles, String resourceName, FhirEncoders encoders) { + return extractEntry(spark, bundles, resourceName, encoders, new String[]{}); + } + + /** + * Extracts the given resource type from the RDD of bundles and returns + * it as a Dataset of that type, including any declared resources contained + * to the parent resource. 
+ * + * @param spark the spark session + * @param bundles the RDD of FHIR Bundles + * @param resourceName the FHIR name of the resource type to extract + * (e.g., condition, patient, etc.). + * @param encoders the Encoders instance defining how the resources are encoded. + * @param contained the FHIR names of the resources contained to the + * parent resource. + * @param the type of the resource being extracted from the bundles. + * @return a dataset of the given resource + */ + public Dataset extractEntry(SparkSession spark, + JavaRDD bundles, + String resourceName, + FhirEncoders encoders, + String... contained) { + FhirContext context = FhirEncoders.contextFor(encoders.getFhirVersion()); RuntimeResourceDefinition def = context.getResourceDefinition(resourceName); @@ -238,7 +288,7 @@ public Dataset extractEntry(SparkSession spark, new ToResource(def.getName(), encoders.getFhirVersion())); - Encoder encoder = encoders.of((Class) def.getImplementingClass()); + Encoder encoder = encoders.of(resourceName, contained); return spark.createDataset(resourceRdd.rdd(), encoder); } diff --git a/bunsen-core/src/main/java/com/cerner/bunsen/FhirEncoders.java b/bunsen-core/src/main/java/com/cerner/bunsen/FhirEncoders.java index e13f879a..343692b0 100644 --- a/bunsen-core/src/main/java/com/cerner/bunsen/FhirEncoders.java +++ b/bunsen-core/src/main/java/com/cerner/bunsen/FhirEncoders.java @@ -8,10 +8,14 @@ import ca.uhn.fhir.context.FhirVersionEnum; import ca.uhn.fhir.context.RuntimeResourceDefinition; import com.cerner.bunsen.datatypes.DataTypeMappings; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.hl7.fhir.instance.model.api.IBaseResource; +import scala.collection.JavaConversions; /** * Spark Encoders for FHIR Resources. This object is thread safe. 
@@ -46,7 +50,7 @@ public class FhirEncoders { /** * Cached encoders to avoid having to re-create them. */ - private final Map encoderCache = new HashMap<>(); + private final Map encoderCache = new HashMap<>(); /** * Consumers should generally use the {@link #forStu3()} or {@link #forR4()} @@ -173,17 +177,35 @@ public static Builder forVersion(FhirVersionEnum fhirVersion) { * Returns an encoder for the given FHIR resource. * * @param type the type of the resource to encode. + * @param contained a list of types for FHIR resources contained to the encoded resource. * @param the type of the resource to be encoded. * @return an encoder for the resource. */ - public ExpressionEncoder of(Class type) { + public final ExpressionEncoder of(Class type, + List> contained) { BaseRuntimeElementCompositeDefinition definition = context.getResourceDefinition(type); + List> containedDefinitions = new ArrayList<>(); + + for (Class resource : contained) { + + containedDefinitions.add(context.getResourceDefinition(resource)); + } + + StringBuilder keyBuilder = new StringBuilder(type.getName()); + + for (Class resource : contained) { + + keyBuilder.append(resource.getName()); + } + + int key = keyBuilder.toString().hashCode(); + synchronized (encoderCache) { - ExpressionEncoder encoder = encoderCache.get(type); + ExpressionEncoder encoder = encoderCache.get(key); if (encoder == null) { @@ -191,29 +213,53 @@ public ExpressionEncoder of(Class type) { EncoderBuilder.of(definition, context, mappings, - new SchemaConverter(context, mappings)); + new SchemaConverter(context, mappings), + JavaConversions.asScalaBuffer(containedDefinitions)); - encoderCache.put(type, encoder); + encoderCache.put(key, encoder); } return encoder; } } + /** + * Returns an encoder for the given FHIR resource. + * + * @param type the type of the resource to encode. + * @param contained a list of types for FHIR resources contained to the encoded resource. + * @param the type of the resource to be encoded. 
+ * @return an encoder for the resource. + */ + public final ExpressionEncoder of(Class type, + Class... contained) { + + return of(type, Arrays.asList(contained)); + } + /** * Returns an encoder for the given FHIR resource by name, as defined * by the FHIR specification. * * @param resourceName the name of the FHIR resource to encode, such as * "Encounter", "Condition", "Observation", etc. + * @param contained the names of FHIR resources contained to the encoded resource. * @param the type of the resource to be encoded. * @return an encoder for the resource. */ - public ExpressionEncoder of(String resourceName) { + public ExpressionEncoder of(String resourceName, + String... contained) { RuntimeResourceDefinition definition = context.getResourceDefinition(resourceName); - return of((Class) definition.getImplementingClass()); + List> containedClasses = new ArrayList<>(); + + for (String containedName : contained) { + + containedClasses.add(context.getResourceDefinition(containedName).getImplementingClass()); + } + + return of((Class) definition.getImplementingClass(), containedClasses); } /** diff --git a/bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala b/bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala index a7b01701..a650a5c2 100644 --- a/bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala +++ b/bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala @@ -8,8 +8,9 @@ import com.cerner.bunsen.datatypes.DataTypeMappings import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.objects._ -import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.{InternalRow, expressions} import 
org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.hl7.fhir.instance.model.api.{IBase, IBaseDatatype} @@ -17,6 +18,7 @@ import org.hl7.fhir.utilities.xhtml.XhtmlNode import scala.collection.JavaConversions._ import scala.collection.immutable.Stream.Empty +import scala.collection.mutable import scala.reflect.ClassTag /** @@ -28,17 +30,19 @@ private[bunsen] object EncoderBuilder { * Returns an encoder for the FHIR resource implemented by the given class * * @param definition The FHIR resource definition + * @param contained The FHIR resources to be contained to the given definition * @return An ExpressionEncoder for the resource */ def of(definition: BaseRuntimeElementCompositeDefinition[_], context: FhirContext, mappings: DataTypeMappings, - converter: SchemaConverter): ExpressionEncoder[_] = { + converter: SchemaConverter, + contained: mutable.Buffer[BaseRuntimeElementCompositeDefinition[_]] = mutable.Buffer.empty): ExpressionEncoder[_] = { - val fhirClass = definition.getImplementingClass() + val fhirClass = definition.getImplementingClass - val schema = converter.compositeToStructType(definition) + val schema = converter.parentToStructType(definition, contained) val inputObject = BoundReference(0, ObjectType(fhirClass), nullable = true) @@ -46,12 +50,12 @@ private[bunsen] object EncoderBuilder { mappings, converter) - val serializers = encoderBuilder.serializer(inputObject, definition) + val serializers = encoderBuilder.serializer(inputObject, definition, contained) - assert(schema.fields.size == serializers.size, + assert(schema.fields.length == serializers.size, "Must have a serializer for each field.") - val deserializer = encoderBuilder.compositeToDeserializer(definition, None) + val deserializer = encoderBuilder.compositeToDeserializer(definition, None, contained) new ExpressionEncoder( schema, @@ -293,15 +297,36 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext, } private def serializer(inputObject: 
Expression, - definition: BaseRuntimeElementCompositeDefinition[_]): Seq[Expression] = { + definition: BaseRuntimeElementCompositeDefinition[_], + contained: Seq[BaseRuntimeElementCompositeDefinition[_]]): + Seq[Expression] = { // Map to (name, value, name, value) expressions for child elements. val childFields: Seq[Expression] = definition.getChildren .flatMap(child => childToExpr(inputObject, child)) + // Map to (name, value, name, value) expressions for all contained resources. + val containedChildFields = contained.flatMap { containedDefinition => + + val containedChild = GetClassFromContained(inputObject, + containedDefinition.getImplementingClass) + + Literal(containedDefinition.getName) :: + CreateNamedStruct(containedDefinition.getChildren + .flatMap(child => childToExpr(containedChild, child))) :: + Nil + } + + // Create a 'contained' struct having the contained elements if declared for the parent. + val containedChildren = if (contained.nonEmpty) { + Literal("contained") :: CreateNamedStruct(containedChildFields) :: Nil + } else { + Nil + } + // The fields are (name, expr) tuples, so just get the expressions for the top level. - childFields.grouped(2) + (childFields ++ containedChildren).grouped(2) .map(group => group.get(1)) .toList } @@ -396,7 +421,6 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext, private def childToDeserializer(childDefinition: BaseRuntimeChildDefinition, path: Option[Expression]): Map[String, Expression] = { - def getPath: Expression = path.getOrElse(GetColumnByOrdinal(0, ObjectType(classOf[String]))) def addToPath(part: String): Expression = path @@ -534,10 +558,12 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext, } /** - * Returns an expression for deserializing a composite structure at the given path. + * Returns an expression for deserializing a composite structure at the given path along with + * any contained resources declared against the structure. 
*/ private def compositeToDeserializer(definition: BaseRuntimeElementCompositeDefinition[_], - path: Option[Expression]): Expression = { + path: Option[Expression], + contained: Seq[BaseRuntimeElementCompositeDefinition[_]] = Nil): Expression = { def addToPath(part: String): Expression = path .map(p => UnresolvedExtractValue(p, expressions.Literal(part))) @@ -567,7 +593,18 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext, (setterFor(childDefinition), expression) } - val result = InitializeJavaBean(compositeInstance, setters) + val bean: Expression = InitializeJavaBean(compositeInstance, setters) + + // Deserialize any Contained resources to the new Object through successive calls + // to 'addContained'. + val result = contained.foldLeft(bean)((value, containedResource) => { + + Invoke(value, + "addContained", + ObjectType(definition.getImplementingClass), + compositeToDeserializer(containedResource, + Some(UnresolvedAttribute("contained." + containedResource.getName))) :: Nil) + }) if (path.nonEmpty) { expressions.If( @@ -580,4 +617,39 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext, } } - +/** + * An Expression extracting an object having the given class definition from a List of FHIR + * Resources. 
+ */ +case class GetClassFromContained(targetObject: Expression, + containedClass: Class[_]) + extends Expression with NonSQLExpression { + + override def nullable: Boolean = targetObject.nullable + override def children: Seq[Expression] = targetObject :: Nil + override def dataType: DataType = ObjectType(containedClass) + + override def eval(input: InternalRow): Any = + throw new UnsupportedOperationException("Only code-generated evaluation is supported.") + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + + val javaType = containedClass.getName + val obj = targetObject.genCode(ctx) + + ev.copy(code = + s""" + |${obj.code} + |$javaType ${ev.value} = null; + |boolean ${ev.isNull} = true; + |java.util.List contained = ${obj.value}.getContained(); + | + |for (int containedIndex = 0; containedIndex < contained.size(); containedIndex++) { + | if (contained.get(containedIndex) instanceof $javaType) { + | ${ev.value} = ($javaType) contained.get(containedIndex); + | ${ev.isNull} = false; + | } + |} + """.stripMargin) + } +} diff --git a/bunsen-core/src/main/scala/com/cerner/bunsen/SchemaConverter.scala b/bunsen-core/src/main/scala/com/cerner/bunsen/SchemaConverter.scala index ac85cb4a..545aa725 100644 --- a/bunsen-core/src/main/scala/com/cerner/bunsen/SchemaConverter.scala +++ b/bunsen-core/src/main/scala/com/cerner/bunsen/SchemaConverter.scala @@ -98,4 +98,32 @@ class SchemaConverter(fhirContext: FhirContext, dataTypeMappings: DataTypeMappin StructType(fields) } + /** + * Returns the Spark struct type used to encode the given parent FHIR composite and any optional + * contained FHIR resources. + * + * @param definition The FHIR definition of the parent having a composite type. + * @param contained The FHIR definitions of resources contained to the parent having composite + * types. 
+ * @return The schema of the parent as a Spark StructType + */ + private[bunsen] def parentToStructType(definition: BaseRuntimeElementCompositeDefinition[_], + contained: Seq[BaseRuntimeElementCompositeDefinition[_]]): StructType = { + + val parent = compositeToStructType(definition) + + if (contained.nonEmpty) { + val containedFields = contained.map(containedElement => + StructField(containedElement.getName, + compositeToStructType(containedElement))) + + val containedStruct = StructType(containedFields) + + parent.add(StructField("contained", containedStruct)) + + } else { + + parent + } + } } diff --git a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/BundlesTest.java b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/BundlesTest.java index dc1cd67b..b499d4d5 100644 --- a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/BundlesTest.java +++ b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/BundlesTest.java @@ -12,8 +12,10 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; import org.hl7.fhir.r4.model.Bundle; import org.hl7.fhir.r4.model.Condition; +import org.hl7.fhir.r4.model.MedicationRequest; import org.hl7.fhir.r4.model.Patient; import org.junit.AfterClass; import org.junit.Assert; @@ -130,6 +132,22 @@ private void checkConditions(Dataset conditions) { Assert.assertTrue(conditionIds.containsAll(expectedIds)); } + private void checkContained(Dataset medicationRequests) { + List medicationIds = medicationRequests + .select("contained.medication.id") + .where(functions.col("id").isNotNull()) + .as(Encoders.STRING()) + .collectAsList(); + + Assert.assertEquals(2, medicationIds.size()); + + List expectedIds = ImmutableList.of( + "#201", + "#202"); + + Assert.assertTrue(medicationIds.containsAll(expectedIds)); + } + @Test public void testGetResourcesByClass() { @@ -160,6 +178,17 @@ public void testGetCaseInsensitive() { checkConditions(conditions); } + @Test + public 
void getContained() { + + Dataset medicationRequests = bundles.extractEntry(spark, + bundlesRdd, + "MedicationRequest", + "Medication"); + + checkContained(medicationRequests); + } + @Test public void testXmlBundleStrings() { diff --git a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/FhirEncodersTest.java b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/FhirEncodersTest.java index d5d3c061..a7f61c92 100644 --- a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/FhirEncodersTest.java +++ b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/FhirEncodersTest.java @@ -19,9 +19,12 @@ import org.hl7.fhir.r4.model.Condition; import org.hl7.fhir.r4.model.DateTimeType; import org.hl7.fhir.r4.model.IntegerType; +import org.hl7.fhir.r4.model.Medication; import org.hl7.fhir.r4.model.MedicationRequest; import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.Provenance; +import org.hl7.fhir.r4.model.Provenance.ProvenanceEntityComponent; import org.hl7.fhir.r4.model.Quantity; import org.junit.AfterClass; import org.junit.Assert; @@ -75,7 +78,7 @@ public static void setUp() { decodedObservation = observationsDataset.head(); medDataset = spark.createDataset(ImmutableList.of(medRequest), - encoders.of(MedicationRequest.class)); + encoders.of(MedicationRequest.class, Medication.class, Provenance.class)); decodedMedRequest = medDataset.head(); } @@ -233,6 +236,42 @@ public void annotation() throws FHIRException { } + @Test + public void contained() throws FHIRException { + + // Contained resources should be put to the Contained list in order of the Encoder arguments + Assert.assertTrue(decodedMedRequest.getContained().get(0) instanceof Medication); + + Medication originalMedication = (Medication) medRequest.getContained().get(0); + Medication decodedMedication = (Medication) decodedMedRequest.getContained().get(0); + + Assert.assertEquals(originalMedication.getId(), decodedMedication.getId()); + 
Assert.assertEquals(originalMedication.getIngredientFirstRep() + .getItemCodeableConcept() + .getCodingFirstRep() + .getCode(), + decodedMedication.getIngredientFirstRep() + .getItemCodeableConcept() + .getCodingFirstRep() + .getCode()); + + Assert.assertTrue(decodedMedRequest.getContained().get(1) instanceof Provenance); + + Provenance decodedProvenance = (Provenance) decodedMedRequest.getContained().get(1); + Provenance originalProvenance = (Provenance) medRequest.getContained().get(1); + + Assert.assertEquals(originalProvenance.getId(), decodedProvenance.getId()); + Assert.assertEquals(originalProvenance.getTargetFirstRep().getReference(), + decodedProvenance.getTargetFirstRep().getReference()); + + ProvenanceEntityComponent originalEntity = originalProvenance.getEntityFirstRep(); + ProvenanceEntityComponent decodedEntity = decodedProvenance.getEntityFirstRep(); + + Assert.assertEquals(originalEntity.getRole(), decodedEntity.getRole()); + Assert.assertEquals(originalEntity.getWhat().getReference(), + decodedEntity.getWhat().getReference()); + } + /** * Sanity test with a deep copy to check we didn't break internal state used by copies. 
*/ diff --git a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/TestData.java b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/TestData.java index f0e5d927..de75a7db 100644 --- a/bunsen-r4/src/test/java/com/cerner/bunsen/r4/TestData.java +++ b/bunsen-r4/src/test/java/com/cerner/bunsen/r4/TestData.java @@ -1,5 +1,6 @@ package com.cerner.bunsen.r4; +import com.google.common.collect.ImmutableList; import org.hl7.fhir.r4.model.Annotation; import org.hl7.fhir.r4.model.CodeableConcept; import org.hl7.fhir.r4.model.Coding; @@ -7,10 +8,14 @@ import org.hl7.fhir.r4.model.DateTimeType; import org.hl7.fhir.r4.model.Identifier; import org.hl7.fhir.r4.model.IntegerType; +import org.hl7.fhir.r4.model.Medication; +import org.hl7.fhir.r4.model.Medication.MedicationIngredientComponent; import org.hl7.fhir.r4.model.MedicationRequest; import org.hl7.fhir.r4.model.Narrative; import org.hl7.fhir.r4.model.Observation; import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.Provenance; +import org.hl7.fhir.r4.model.Provenance.ProvenanceEntityRole; import org.hl7.fhir.r4.model.Quantity; import org.hl7.fhir.r4.model.Reference; import org.hl7.fhir.r4.model.codesystems.ConditionVerStatus; @@ -114,6 +119,47 @@ public static Patient newPatient() { return patient; } + /** + * Returns a FHIR medication to be contained to a medication request for testing purposes. + */ + public static Medication newMedication() { + + Medication medication = new Medication(); + + medication.setId("test-med"); + + MedicationIngredientComponent ingredient = new MedicationIngredientComponent(); + + CodeableConcept item = new CodeableConcept(); + item.addCoding() + .setSystem("test/ingredient/system") + .setCode("test-code"); + + ingredient.setItem(item); + + medication.addIngredient(ingredient); + + return medication; + } + + /** + * Returns a FHIR Provenance to be contained to a medication request for testing purposes. 
+ */ + public static Provenance newProvenance() { + + Provenance provenance = new Provenance(); + + provenance.setId("test-provenance"); + + provenance.setTarget(ImmutableList.of(new Reference("test-target"))); + + provenance.getEntityFirstRep() + .setRole(ProvenanceEntityRole.SOURCE) + .setWhat(new Reference("test-entity")); + + return provenance; + } + /** * Returns a FHIR medication request for testing purposes. */ @@ -144,6 +190,10 @@ public static MedicationRequest newMedRequest() { medReq.addNote(annotation); + // Add contained resources + medReq.addContained(newMedication()); + medReq.addContained(newProvenance()); + return medReq; } } diff --git a/bunsen-r4/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json b/bunsen-r4/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json index f7427bcf..9588eec0 100644 --- a/bunsen-r4/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json +++ b/bunsen-r4/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json @@ -72,6 +72,12 @@ { "resource": { "resourceType": "MedicationRequest", + "contained": [ + { + "resourceType": "Medication", + "id": "201" + } + ], "id": "101", "text": { "status": "generated", diff --git a/bunsen-r4/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json b/bunsen-r4/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json index e362e4cc..b9a1ad0e 100644 --- a/bunsen-r4/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json +++ b/bunsen-r4/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json @@ -54,6 +54,12 @@ { "resource": { "resourceType": "MedicationRequest", + "contained": [ + { + "resourceType": "Medication", + "id": "202" + } + ], "id": "353", "text": { "status": "generated", diff --git a/bunsen-r4/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml b/bunsen-r4/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml index bb9527fa..e012684d 100644 --- 
a/bunsen-r4/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml +++ b/bunsen-r4/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml @@ -67,6 +67,11 @@ + + + + +
diff --git a/bunsen-r4/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml b/bunsen-r4/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml index f2532186..5d87dee2 100644 --- a/bunsen-r4/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml +++ b/bunsen-r4/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml @@ -95,6 +95,11 @@ + + + + +
diff --git a/bunsen-stu3/src/main/scala/com/cerner/bunsen/stu3/Stu3DataTypeMappings.scala b/bunsen-stu3/src/main/scala/com/cerner/bunsen/stu3/Stu3DataTypeMappings.scala index 74410ee8..8435c8a5 100644 --- a/bunsen-stu3/src/main/scala/com/cerner/bunsen/stu3/Stu3DataTypeMappings.scala +++ b/bunsen-stu3/src/main/scala/com/cerner/bunsen/stu3/Stu3DataTypeMappings.scala @@ -106,7 +106,7 @@ class Stu3DataTypeMappings extends DataTypeMappings { override def skipField(definition: BaseRuntimeElementCompositeDefinition[_], child: BaseRuntimeChildDefinition ) : Boolean = { - // References may be recursive, so include only the reference adn display name. + // References may be recursive, so include only the reference and display name. val skipRecursiveReference = definition.getImplementingClass == classOf[Reference] && !(child.getElementName == "reference" || child.getElementName == "display") diff --git a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/BundlesTest.java b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/BundlesTest.java index dd80158b..55f64101 100644 --- a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/BundlesTest.java +++ b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/BundlesTest.java @@ -12,8 +12,10 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; import org.hl7.fhir.dstu3.model.Bundle; import org.hl7.fhir.dstu3.model.Condition; +import org.hl7.fhir.dstu3.model.MedicationRequest; import org.hl7.fhir.dstu3.model.Patient; import org.junit.AfterClass; import org.junit.Assert; @@ -130,6 +132,22 @@ private void checkConditions(Dataset conditions) { Assert.assertTrue(conditionIds.containsAll(expectedIds)); } + private void checkContained(Dataset medicationRequests) { + List medicationIds = medicationRequests + .select("contained.medication.id") + .where(functions.col("id").isNotNull()) + .as(Encoders.STRING()) + .collectAsList(); + + 
Assert.assertEquals(2, medicationIds.size()); + + List expectedIds = ImmutableList.of( + "#201", + "#202"); + + Assert.assertTrue(medicationIds.containsAll(expectedIds)); + } + @Test public void testGetResourcesByClass() { @@ -160,6 +178,17 @@ public void testGetCaseInsensitive() { checkConditions(conditions); } + @Test + public void getContained() { + + Dataset medicationRequests = bundles.extractEntry(spark, + bundlesRdd, + "MedicationRequest", + "Medication"); + + checkContained(medicationRequests); + } + @Test public void testXmlBundleStrings() { diff --git a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/FhirEncodersTest.java b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/FhirEncodersTest.java index 8f3434c8..0387baba 100644 --- a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/FhirEncodersTest.java +++ b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/FhirEncodersTest.java @@ -6,6 +6,7 @@ import java.math.BigDecimal; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Map; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -18,11 +19,18 @@ import org.hl7.fhir.dstu3.model.Condition; import org.hl7.fhir.dstu3.model.DateTimeType; import org.hl7.fhir.dstu3.model.IntegerType; +import org.hl7.fhir.dstu3.model.Medication; import org.hl7.fhir.dstu3.model.MedicationRequest; import org.hl7.fhir.dstu3.model.Observation; import org.hl7.fhir.dstu3.model.Patient; +import org.hl7.fhir.dstu3.model.Procedure; +import org.hl7.fhir.dstu3.model.Provenance; +import org.hl7.fhir.dstu3.model.Provenance.ProvenanceEntityComponent; import org.hl7.fhir.dstu3.model.Quantity; +import org.hl7.fhir.dstu3.model.Resource; import org.hl7.fhir.exceptions.FHIRException; +import org.hl7.fhir.instance.model.api.IBaseReference; +import org.hl7.fhir.instance.model.api.IBaseResource; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -75,8 +83,7 @@ public static void 
setUp() { decodedObservation = observationsDataset.head(); medDataset = spark.createDataset(ImmutableList.of(medRequest), - encoders.of(MedicationRequest.class)); - + encoders.of(MedicationRequest.class, Medication.class, Provenance.class)); decodedMedRequest = medDataset.head(); } @@ -227,6 +234,42 @@ public void annotation() throws FHIRException { } + @Test + public void contained() throws FHIRException { + + // Contained resources should be put to the Contained list in order of the Encoder arguments + Assert.assertTrue(decodedMedRequest.getContained().get(0) instanceof Medication); + + Medication originalMedication = (Medication) medRequest.getContained().get(0); + Medication decodedMedication = (Medication) decodedMedRequest.getContained().get(0); + + Assert.assertEquals(originalMedication.getId(), decodedMedication.getId()); + Assert.assertEquals(originalMedication.getIngredientFirstRep() + .getItemCodeableConcept() + .getCodingFirstRep() + .getCode(), + decodedMedication.getIngredientFirstRep() + .getItemCodeableConcept() + .getCodingFirstRep() + .getCode()); + + Assert.assertTrue(decodedMedRequest.getContained().get(1) instanceof Provenance); + + Provenance decodedProvenance = (Provenance) decodedMedRequest.getContained().get(1); + Provenance originalProvenance = (Provenance) medRequest.getContained().get(1); + + Assert.assertEquals(originalProvenance.getId(), decodedProvenance.getId()); + Assert.assertEquals(originalProvenance.getTargetFirstRep().getReference(), + decodedProvenance.getTargetFirstRep().getReference()); + + ProvenanceEntityComponent originalEntity = originalProvenance.getEntityFirstRep(); + ProvenanceEntityComponent decodedEntity = decodedProvenance.getEntityFirstRep(); + + Assert.assertEquals(originalEntity.getRole(), decodedEntity.getRole()); + Assert.assertEquals(originalEntity.getWhatReference().getReference(), + decodedEntity.getWhatReference().getReference()); + } + /** * Sanity test with a deep copy to check we didn't break internal 
state used by copies. */ diff --git a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/TestData.java b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/TestData.java index 9fa19c62..681467fb 100644 --- a/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/TestData.java +++ b/bunsen-stu3/src/test/java/com/cerner/bunsen/stu3/TestData.java @@ -1,15 +1,22 @@ package com.cerner.bunsen.stu3; +import com.google.common.collect.ImmutableList; import org.hl7.fhir.dstu3.model.Annotation; import org.hl7.fhir.dstu3.model.CodeableConcept; import org.hl7.fhir.dstu3.model.Condition; import org.hl7.fhir.dstu3.model.DateTimeType; import org.hl7.fhir.dstu3.model.Identifier; import org.hl7.fhir.dstu3.model.IntegerType; +import org.hl7.fhir.dstu3.model.Medication; +import org.hl7.fhir.dstu3.model.Medication.MedicationIngredientComponent; +import org.hl7.fhir.dstu3.model.Medication.MedicationStatus; import org.hl7.fhir.dstu3.model.MedicationRequest; import org.hl7.fhir.dstu3.model.Narrative; import org.hl7.fhir.dstu3.model.Observation; import org.hl7.fhir.dstu3.model.Patient; +import org.hl7.fhir.dstu3.model.Procedure; +import org.hl7.fhir.dstu3.model.Provenance; +import org.hl7.fhir.dstu3.model.Provenance.ProvenanceEntityRole; import org.hl7.fhir.dstu3.model.Quantity; import org.hl7.fhir.dstu3.model.Reference; import org.hl7.fhir.utilities.xhtml.NodeType; @@ -108,6 +115,47 @@ public static Patient newPatient() { return patient; } + /** + * Returns a FHIR medication to be contained to a medication request for testing purposes. 
+ */ + public static Medication newMedication() { + + Medication medication = new Medication(); + + medication.setId("test-med"); + + MedicationIngredientComponent ingredient = new MedicationIngredientComponent(); + + CodeableConcept item = new CodeableConcept(); + item.addCoding() + .setSystem("test/ingredient/system") + .setCode("test-code"); + + ingredient.setItem(item); + + medication.addIngredient(ingredient); + + return medication; + } + + /** + * Returns a FHIR Provenance to be contained to a medication request for testing purposes. + */ + public static Provenance newProvenance() { + + Provenance provenance = new Provenance(); + + provenance.setId("test-provenance"); + + provenance.setTarget(ImmutableList.of(new Reference("test-target"))); + + provenance.getEntityFirstRep() + .setRole(ProvenanceEntityRole.SOURCE) + .setWhat(new Reference("test-entity")); + + return provenance; + } + /** * Returns a FHIR medication request for testing purposes. */ @@ -115,7 +163,7 @@ public static MedicationRequest newMedRequest() { MedicationRequest medReq = new MedicationRequest(); - medReq.setId("test-med"); + medReq.setId("test-medreq"); // Medication code CodeableConcept med = new CodeableConcept(); @@ -138,6 +186,10 @@ public static MedicationRequest newMedRequest() { medReq.addNote(annotation); + // Add contained resources + medReq.addContained(newMedication()); + medReq.addContained(newProvenance()); + return medReq; } } diff --git a/bunsen-stu3/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json b/bunsen-stu3/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json index f7427bcf..9588eec0 100644 --- a/bunsen-stu3/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json +++ b/bunsen-stu3/src/test/resources/json/bundles/patient-1032702.fhir-bundle.json @@ -72,6 +72,12 @@ { "resource": { "resourceType": "MedicationRequest", + "contained": [ + { + "resourceType": "Medication", + "id": "201" + } + ], "id": "101", "text": { "status": 
"generated", diff --git a/bunsen-stu3/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json b/bunsen-stu3/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json index e362e4cc..b9a1ad0e 100644 --- a/bunsen-stu3/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json +++ b/bunsen-stu3/src/test/resources/json/bundles/patient-6666001.fhir-bundle.json @@ -54,6 +54,12 @@ { "resource": { "resourceType": "MedicationRequest", + "contained": [ + { + "resourceType": "Medication", + "id": "202" + } + ], "id": "353", "text": { "status": "generated", diff --git a/bunsen-stu3/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml b/bunsen-stu3/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml index bb9527fa..e012684d 100644 --- a/bunsen-stu3/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml +++ b/bunsen-stu3/src/test/resources/xml/bundles/patient-1032702.fhir-bundle.xml @@ -67,6 +67,11 @@ + + + + +
diff --git a/bunsen-stu3/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml b/bunsen-stu3/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml index f2532186..5e780746 100644 --- a/bunsen-stu3/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml +++ b/bunsen-stu3/src/test/resources/xml/bundles/patient-6666001.fhir-bundle.xml @@ -53,6 +53,11 @@ + + + + +
diff --git a/python/bunsen/r4/bundles.py b/python/bunsen/r4/bundles.py index 034ced30..be549a1b 100644 --- a/python/bunsen/r4/bundles.py +++ b/python/bunsen/r4/bundles.py @@ -76,7 +76,7 @@ def extract_entry(sparkSession, javaRDD, resourceName): bundles.extractEntry(sparkSession._jsparkSession, javaRDD, resourceName), sparkSession._wrapped) -def extract_entry(sparkSession, javaRDD, resourceName): +def extract_entry(sparkSession, javaRDD, resourceName, contained=[]): """ Returns a dataset for the given entry type from the bundles. @@ -85,13 +85,21 @@ def extract_entry(sparkSession, javaRDD, resourceName): in this package :param resourceName: the name of the FHIR resource to extract (condition, observation, etc) + :param contained: the list of names of the FHIR resources contained to the parent + resource :return: a DataFrame containing the given resource encoded into Spark columns """ + jArray = sparkSession.sparkContext._gateway \ + .new_array(sparkSession._jvm.java.lang.String, len(contained)) + + for idx, c in enumerate(contained): + jArray[idx] = contained[idx] + bundles = _bundles(sparkSession._jvm) return DataFrame( - bundles.extractEntry(sparkSession._jsparkSession, javaRDD, resourceName), - sparkSession._wrapped) + bundles.extractEntry(sparkSession._jsparkSession, javaRDD, resourceName, jArray), + sparkSession._wrapped) def write_to_database(sparkSession, javaRDD, databaseName, resourceNames): """ diff --git a/python/bunsen/stu3/bundles.py b/python/bunsen/stu3/bundles.py index 92f6222d..2ad54ff5 100644 --- a/python/bunsen/stu3/bundles.py +++ b/python/bunsen/stu3/bundles.py @@ -59,7 +59,7 @@ def from_xml(df, column): bundles = _bundles(df._sc._jvm) return bundles.fromXml(df._jdf, column) -def extract_entry(sparkSession, javaRDD, resourceName): +def extract_entry(sparkSession, javaRDD, resourceName, contained=[]): """ Returns a dataset for the given entry type from the bundles. 
@@ -68,12 +68,20 @@ def extract_entry(sparkSession, javaRDD, resourceName): in this package :param resourceName: the name of the FHIR resource to extract (condition, observation, etc) + :param contained: the list of names of the FHIR resources contained to the parent + resource :return: a DataFrame containing the given resource encoded into Spark columns """ + jArray = sparkSession.sparkContext._gateway \ + .new_array(sparkSession._jvm.java.lang.String, len(contained)) + + for idx, c in enumerate(contained): + jArray[idx] = contained[idx] + bundles = _bundles(sparkSession._jvm) return DataFrame( - bundles.extractEntry(sparkSession._jsparkSession, javaRDD, resourceName), + bundles.extractEntry(sparkSession._jsparkSession, javaRDD, resourceName, jArray), sparkSession._wrapped) def write_to_database(sparkSession, javaRDD, databaseName, resourceNames):