Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Merge pull request #50 from bdrillard/issue-46
Browse files Browse the repository at this point in the history
Issue 46: Support for Contained Resources
  • Loading branch information
bdrillard authored Apr 24, 2019
2 parents 7221a7b + 8650bf5 commit e3c1d5e
Show file tree
Hide file tree
Showing 21 changed files with 531 additions and 33 deletions.
54 changes: 52 additions & 2 deletions bunsen-core/src/main/java/com/cerner/bunsen/Bundles.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,32 @@ public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
FhirEncoders.forVersion(fhirVersion).getOrCreate());
}

/**
 * Extracts entries of the given resource type from the RDD of bundles and
 * returns them as a Dataset of that type, including any declared resources
 * contained to the parent resource. The encoders used are those obtained from
 * {@code FhirEncoders.forVersion(fhirVersion).getOrCreate()}.
 *
 * @param spark the spark session
 * @param bundles the RDD of FHIR Bundles
 * @param resourceName the FHIR name of the resource type to extract
 *     (e.g., condition, patient, etc.).
 * @param contained the FHIR names of the resources contained to the
 *     parent resource.
 * @param <T> the type of the resource being extracted from the bundles.
 * @return a dataset of the given resource
 */
public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
    JavaRDD<BundleContainer> bundles,
    String resourceName,
    String... contained) {

  // Resolve the encoders for this instance's FHIR version, then delegate to
  // the overload that takes the encoders explicitly.
  FhirEncoders defaultEncoders = FhirEncoders.forVersion(fhirVersion).getOrCreate();

  return extractEntry(spark, bundles, resourceName, defaultEncoders, contained);
}

/**
* Extracts the given resource type from the RDD of bundles and returns
* it as a Dataset of that type.
Expand All @@ -225,11 +251,35 @@ public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
* @param <T> the type of the resource being extracted from the bundles.
* @return a dataset of the given resource
*/
public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
    JavaRDD<BundleContainer> bundles,
    String resourceName,
    FhirEncoders encoders) {

  // This overload declares no contained resource types. The empty array is
  // passed explicitly as the fifth argument so the call binds to the overload
  // that accepts the contained resource names.
  String[] noContained = new String[]{};

  return extractEntry(spark, bundles, resourceName, encoders, noContained);
}

/**
* Extracts the given resource type from the RDD of bundles and returns
* it as a Dataset of that type, including any declared resources contained
* to the parent resource.
*
* @param spark the spark session
* @param bundles the RDD of FHIR Bundles
* @param resourceName the FHIR name of the resource type to extract
* (e.g., condition, patient, etc.).
* @param encoders the Encoders instance defining how the resources are encoded.
* @param contained the FHIR names of the resources contained to the
* parent resource.
* @param <T> the type of the resource being extracted from the bundles.
* @return a dataset of the given resource
*/
public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
JavaRDD<BundleContainer> bundles,
String resourceName,
FhirEncoders encoders,
String... contained) {

FhirContext context = FhirEncoders.contextFor(encoders.getFhirVersion());

RuntimeResourceDefinition def = context.getResourceDefinition(resourceName);
Expand All @@ -238,7 +288,7 @@ public <T extends IBaseResource> Dataset<T> extractEntry(SparkSession spark,
new ToResource<T>(def.getName(),
encoders.getFhirVersion()));

Encoder<T> encoder = encoders.of((Class<T>) def.getImplementingClass());
Encoder<T> encoder = encoders.of(resourceName, contained);

return spark.createDataset(resourceRdd.rdd(), encoder);
}
Expand Down
60 changes: 53 additions & 7 deletions bunsen-core/src/main/java/com/cerner/bunsen/FhirEncoders.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.context.RuntimeResourceDefinition;
import com.cerner.bunsen.datatypes.DataTypeMappings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.hl7.fhir.instance.model.api.IBaseResource;
import scala.collection.JavaConversions;

/**
* Spark Encoders for FHIR Resources. This object is thread safe.
Expand Down Expand Up @@ -46,7 +50,7 @@ public class FhirEncoders {
/**
* Cached encoders to avoid having to re-create them.
*/
private final Map<Class, ExpressionEncoder> encoderCache = new HashMap<>();
private final Map<Integer, ExpressionEncoder> encoderCache = new HashMap<>();

/**
* Consumers should generally use the {@link #forStu3()} or {@link #forR4()}
Expand Down Expand Up @@ -173,47 +177,89 @@ public static Builder forVersion(FhirVersionEnum fhirVersion) {
* Returns an encoder for the given FHIR resource.
*
* @param type the type of the resource to encode.
* @param contained a list of types for FHIR resources contained to the encoded resource.
* @param <T> the type of the resource to be encoded.
* @return an encoder for the resource.
*/
public <T extends IBaseResource> ExpressionEncoder<T> of(Class<T> type) {
public final <T extends IBaseResource> ExpressionEncoder<T> of(Class<T> type,
List<Class<? extends IBaseResource>> contained) {

BaseRuntimeElementCompositeDefinition definition =
context.getResourceDefinition(type);

List<BaseRuntimeElementCompositeDefinition<?>> containedDefinitions = new ArrayList<>();

for (Class resource : contained) {

containedDefinitions.add(context.getResourceDefinition(resource));
}

StringBuilder keyBuilder = new StringBuilder(type.getName());

for (Class resource : contained) {

keyBuilder.append(resource.getName());
}

int key = keyBuilder.toString().hashCode();

synchronized (encoderCache) {

ExpressionEncoder<T> encoder = encoderCache.get(type);
ExpressionEncoder<T> encoder = encoderCache.get(key);

if (encoder == null) {

encoder = (ExpressionEncoder<T>)
EncoderBuilder.of(definition,
context,
mappings,
new SchemaConverter(context, mappings));
new SchemaConverter(context, mappings),
JavaConversions.asScalaBuffer(containedDefinitions));

encoderCache.put(type, encoder);
encoderCache.put(key, encoder);
}

return encoder;
}
}

/**
 * Returns an encoder for the given FHIR resource, taking the contained
 * resource types as a variable-length argument list. This is a convenience
 * overload that wraps the arguments in a {@code List} and delegates to the
 * list-based {@code of} method.
 *
 * @param type the type of the resource to encode.
 * @param contained zero or more types for FHIR resources contained to the encoded resource.
 * @param <T> the type of the resource to be encoded.
 * @return an encoder for the resource.
 */
@SafeVarargs
public final <T extends IBaseResource> ExpressionEncoder<T> of(Class<T> type,
    Class<? extends IBaseResource>... contained) {

  // The varargs array is only read through a List view and nothing is stored
  // into it here, so the generic varargs parameter is safe to annotate with
  // @SafeVarargs; that suppresses unchecked warnings at every call site.
  return of(type, Arrays.asList(contained));
}

/**
* Returns an encoder for the given FHIR resource by name, as defined
* by the FHIR specification.
*
* @param resourceName the name of the FHIR resource to encode, such as
* "Encounter", "Condition", "Observation", etc.
* @param contained the names of FHIR resources contained to the encoded resource.
* @param <T> the type of the resource to be encoded.
* @return an encoder for the resource.
*/
public <T extends IBaseResource> ExpressionEncoder<T> of(String resourceName) {
public <T extends IBaseResource> ExpressionEncoder<T> of(String resourceName,
String... contained) {

RuntimeResourceDefinition definition = context.getResourceDefinition(resourceName);

return of((Class<T>) definition.getImplementingClass());
List<Class<? extends IBaseResource>> containedClasses = new ArrayList<>();

for (String containedName : contained) {

containedClasses.add(context.getResourceDefinition(containedName).getImplementingClass());
}

return of((Class<T>) definition.getImplementingClass(), containedClasses);
}

/**
Expand Down
100 changes: 86 additions & 14 deletions bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import com.cerner.bunsen.datatypes.DataTypeMappings
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.hl7.fhir.instance.model.api.{IBase, IBaseDatatype}
import org.hl7.fhir.utilities.xhtml.XhtmlNode

import scala.collection.JavaConversions._
import scala.collection.immutable.Stream.Empty
import scala.collection.mutable
import scala.reflect.ClassTag

/**
Expand All @@ -28,30 +30,32 @@ private[bunsen] object EncoderBuilder {
* Returns an encoder for the FHIR resource implemented by the given class
*
* @param definition The FHIR resource definition
* @param contained The FHIR resources to be contained to the given definition
* @return An ExpressionEncoder for the resource
*/

def of(definition: BaseRuntimeElementCompositeDefinition[_],
context: FhirContext,
mappings: DataTypeMappings,
converter: SchemaConverter): ExpressionEncoder[_] = {
converter: SchemaConverter,
contained: mutable.Buffer[BaseRuntimeElementCompositeDefinition[_]] = mutable.Buffer.empty): ExpressionEncoder[_] = {

val fhirClass = definition.getImplementingClass()
val fhirClass = definition.getImplementingClass

val schema = converter.compositeToStructType(definition)
val schema = converter.parentToStructType(definition, contained)

val inputObject = BoundReference(0, ObjectType(fhirClass), nullable = true)

val encoderBuilder = new EncoderBuilder(context,
mappings,
converter)

val serializers = encoderBuilder.serializer(inputObject, definition)
val serializers = encoderBuilder.serializer(inputObject, definition, contained)

assert(schema.fields.size == serializers.size,
assert(schema.fields.length == serializers.size,
"Must have a serializer for each field.")

val deserializer = encoderBuilder.compositeToDeserializer(definition, None)
val deserializer = encoderBuilder.compositeToDeserializer(definition, None, contained)

new ExpressionEncoder(
schema,
Expand Down Expand Up @@ -293,15 +297,36 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext,
}

private def serializer(inputObject: Expression,
definition: BaseRuntimeElementCompositeDefinition[_]): Seq[Expression] = {
definition: BaseRuntimeElementCompositeDefinition[_],
contained: Seq[BaseRuntimeElementCompositeDefinition[_]]):
Seq[Expression] = {

// Map to (name, value, name, value) expressions for child elements.
val childFields: Seq[Expression] =
definition.getChildren
.flatMap(child => childToExpr(inputObject, child))

// Map to (name, value, name, value) expressions for all contained resources.
val containedChildFields = contained.flatMap { containedDefinition =>

val containedChild = GetClassFromContained(inputObject,
containedDefinition.getImplementingClass)

Literal(containedDefinition.getName) ::
CreateNamedStruct(containedDefinition.getChildren
.flatMap(child => childToExpr(containedChild, child))) ::
Nil
}

// Create a 'contained' struct having the contained elements if declared for the parent.
val containedChildren = if (contained.nonEmpty) {
Literal("contained") :: CreateNamedStruct(containedChildFields) :: Nil
} else {
Nil
}

// The fields are (name, expr) tuples, so just get the expressions for the top level.
childFields.grouped(2)
(childFields ++ containedChildren).grouped(2)
.map(group => group.get(1))
.toList
}
Expand Down Expand Up @@ -396,7 +421,6 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext,
private def childToDeserializer(childDefinition: BaseRuntimeChildDefinition,
path: Option[Expression]): Map[String, Expression] = {


def getPath: Expression = path.getOrElse(GetColumnByOrdinal(0, ObjectType(classOf[String])))

def addToPath(part: String): Expression = path
Expand Down Expand Up @@ -534,10 +558,12 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext,
}

/**
* Returns an expression for deserializing a composite structure at the given path.
* Returns an expression for deserializing a composite structure at the given path along with
* any contained resources declared against the structure.
*/
private def compositeToDeserializer(definition: BaseRuntimeElementCompositeDefinition[_],
path: Option[Expression]): Expression = {
path: Option[Expression],
contained: Seq[BaseRuntimeElementCompositeDefinition[_]] = Nil): Expression = {

def addToPath(part: String): Expression = path
.map(p => UnresolvedExtractValue(p, expressions.Literal(part)))
Expand Down Expand Up @@ -567,7 +593,18 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext,
(setterFor(childDefinition), expression)
}

val result = InitializeJavaBean(compositeInstance, setters)
val bean: Expression = InitializeJavaBean(compositeInstance, setters)

// Deserialize any Contained resources to the new Object through successive calls
// to 'addContained'.
val result = contained.foldLeft(bean)((value, containedResource) => {

Invoke(value,
"addContained",
ObjectType(definition.getImplementingClass),
compositeToDeserializer(containedResource,
Some(UnresolvedAttribute("contained." + containedResource.getName))) :: Nil)
})

if (path.nonEmpty) {
expressions.If(
Expand All @@ -580,4 +617,39 @@ private[bunsen] class EncoderBuilder(fhirContext: FhirContext,
}
}


/**
 * An Expression that searches the list returned by the target object's `getContained()`
 * method for an instance of the given class and yields it. When several elements of the
 * list are instances of the class, the last one wins; when none is (or the target object
 * itself is null), the expression evaluates to null.
 *
 * Only code-generated evaluation is supported; interpreted `eval` throws.
 *
 * @param targetObject expression producing the object whose `getContained()` list is searched
 * @param containedClass the class of the contained object to extract
 */
case class GetClassFromContained(targetObject: Expression,
                                 containedClass: Class[_])
  extends Expression with NonSQLExpression {

  // The result is null whenever no element of the contained list matches the class,
  // regardless of whether the target object itself can be null.
  override def nullable: Boolean = true
  override def children: Seq[Expression] = targetObject :: Nil
  override def dataType: DataType = ObjectType(containedClass)

  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("Only code-generated evaluation is supported.")

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

    val javaType = containedClass.getName
    val obj = targetObject.genCode(ctx)

    // Use fresh names for the generated locals so that several instances of this
    // expression emitted into the same generated scope cannot redeclare each other's
    // variables.
    val containedList = ctx.freshName("contained")
    val containedIndex = ctx.freshName("containedIndex")

    ev.copy(code =
      s"""
         |${obj.code}
         |$javaType ${ev.value} = null;
         |boolean ${ev.isNull} = true;
         |
         |if (!${obj.isNull}) {
         |  java.util.List<Object> ${containedList} = ${obj.value}.getContained();
         |
         |  for (int ${containedIndex} = 0; ${containedIndex} < ${containedList}.size(); ${containedIndex}++) {
         |    if (${containedList}.get(${containedIndex}) instanceof $javaType) {
         |      ${ev.value} = ($javaType) ${containedList}.get(${containedIndex});
         |      ${ev.isNull} = false;
         |    }
         |  }
         |}
       """.stripMargin)
  }
}
Loading

0 comments on commit e3c1d5e

Please sign in to comment.