Skip to content

Commit

Permalink
add read flatrecord traversal node parser (#693)
Browse files Browse the repository at this point in the history
* add flatrecord equality check and tests
  • Loading branch information
yujiaaa2019 authored Aug 19, 2024
1 parent 9a9a05d commit 5ee4831
Show file tree
Hide file tree
Showing 24 changed files with 2,880 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.netflix.hollow.core.write.HollowWriteRecord;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordReader;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalListNode;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalNode;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
Expand Down Expand Up @@ -139,6 +142,18 @@ protected Object parseFlatRecord(HollowSchema recordSchema, FlatRecordReader rea
return collection;
}

@Override
protected Object parseFlatRecordTraversalNode(FlatRecordTraversalNode node) {
    // A LIST-schema node is iterable over its ordered element nodes; delegate
    // each element to the element type's mapper and collect the results.
    FlatRecordTraversalListNode listNode = (FlatRecordTraversalListNode) node;

    List<Object> parsedElements = new ArrayList<>();
    for (FlatRecordTraversalNode childNode : listNode) {
        parsedElements.add(elementMapper.parseFlatRecordTraversalNode(childNode));
    }
    return parsedElements;
}

@Override
protected HollowWriteRecord newWriteRecord() {
return new HollowListWriteRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordReader;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalMapNode;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalNode;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
Expand Down Expand Up @@ -156,6 +159,19 @@ protected Object parseFlatRecord(HollowSchema recordSchema, FlatRecordReader rea
return collection;
}

@Override
protected Object parseFlatRecordTraversalNode(FlatRecordTraversalNode node) {
    // A MAP-schema node exposes its entries as traversal-node pairs; map each
    // key and value through their respective type mappers.
    FlatRecordTraversalMapNode mapNode = (FlatRecordTraversalMapNode) node;

    Map<Object, Object> parsedEntries = new HashMap<>();
    for (Map.Entry<FlatRecordTraversalNode, FlatRecordTraversalNode> mapEntry : mapNode.entrySet()) {
        parsedEntries.put(
                keyMapper.parseFlatRecordTraversalNode(mapEntry.getKey()),
                valueMapper.parseFlatRecordTraversalNode(mapEntry.getValue()));
    }
    return parsedEntries;
}

@Override
protected HollowWriteRecord newWriteRecord() {
return new HollowMapWriteRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecord;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordReader;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalNode;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalObjectNode;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
Expand Down Expand Up @@ -91,6 +94,16 @@ public void writeFlat(Object o, FlatRecordWriter flatRecordWriter) {
typeMapper.writeFlat(o, flatRecordWriter);
}

/**
 * Materializes a POJO from an already-built flat-record traversal node.
 *
 * <p>The mapper is looked up by the node's schema name, so a type for that
 * schema must have been registered with this object mapper beforehand.
 *
 * @param node the traversal node positioned at the record to read
 * @param <T>  the expected POJO type; the cast is unchecked, so the caller
 *             must request the type actually registered for the schema
 * @return the parsed POJO
 * @throws IllegalArgumentException if no type mapper is registered for the
 *                                  node's schema
 */
@SuppressWarnings("unchecked") // caller-declared T; mapper is keyed by schema name
public <T> T readFlat(FlatRecordTraversalNode node) {
    String schemaName = node.getSchema().getName();
    HollowTypeMapper typeMapper = typeMappers.get(schemaName);
    if (typeMapper == null) {
        throw new IllegalArgumentException("No type mapper found for schema " + schemaName);
    }
    return (T) typeMapper.parseFlatRecordTraversalNode(node);
}

public <T> T readFlat(FlatRecord record) {
FlatRecordReader recordReader = new FlatRecordReader(record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalNode;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalObjectNode;
import sun.misc.Unsafe;

@SuppressWarnings("restriction")
Expand Down Expand Up @@ -290,6 +293,49 @@ protected Object parseFlatRecord(HollowSchema recordSchema, FlatRecordReader rea
throw new RuntimeException(ex);
}
}
@Override
// Materializes an instance of `clazz` from an OBJECT-schema traversal node.
// Three cases: boxed wrappers, enums, and general POJOs (via Unsafe allocation).
protected Object parseFlatRecordTraversalNode(FlatRecordTraversalNode node) {
    try {
        FlatRecordTraversalObjectNode objectNode = (FlatRecordTraversalObjectNode) node;
        HollowObjectSchema recordObjectSchema = objectNode.getSchema();

        // Stays null if no matching field is found below — NOTE(review): callers
        // presumably tolerate a null return for schema-mismatched records; confirm.
        Object obj = null;
        if (BOXED_WRAPPERS.contains(clazz)) {
            // if `clazz` is a BoxedWrapper then by definition its OBJECT schema will have a single primitive
            // field so find it in the FlatRecord and ignore all other fields.
            for (int i = 0; i < recordObjectSchema.numFields(); i++) {
                // Translate the record's field position to this POJO schema's
                // position; -1 means the POJO doesn't have that field.
                int posInPojoSchema = schema.getPosition(recordObjectSchema.getFieldName(i));
                if (posInPojoSchema != -1) {
                    obj = mappedFields.get(posInPojoSchema).parseBoxedWrapper(objectNode);
                }
            }
        } else if (clazz.isEnum()) {
            // if `clazz` is an enum, then we should expect to find a field called `_name` in the FlatRecord.
            // There may be other fields if the producer enum contained custom properties, we ignore them
            // here assuming the enum constructor will set them if needed.
            for (int i = 0; i < recordObjectSchema.numFields(); i++) {
                String fieldName = recordObjectSchema.getFieldName(i);
                int posInPojoSchema = schema.getPosition(fieldName);
                if (fieldName.equals(MappedFieldType.ENUM_NAME.getSpecialFieldName()) && posInPojoSchema != -1) {
                    obj = mappedFields.get(posInPojoSchema).parseBoxedWrapper(objectNode);
                }
            }
        } else {
            // General POJO: allocate without running any constructor, then
            // populate each field present in both the record and POJO schemas.
            obj = unsafe.allocateInstance(clazz);
            for (int i = 0; i < recordObjectSchema.numFields(); i++) {
                int posInPojoSchema = schema.getPosition(recordObjectSchema.getFieldName(i));
                if (posInPojoSchema != -1) {
                    mappedFields.get(posInPojoSchema).parse(obj, objectNode);
                }
            }
        }

        return obj;
    }
    catch(Exception ex) {
        // Wrap reflective/Unsafe failures; original cause is preserved.
        throw new RuntimeException(ex);
    }
}

Object[] extractPrimaryKey(Object obj) {
int[][] primaryKeyFieldPathIdx = this.primaryKeyFieldPathIdx;
Expand Down Expand Up @@ -849,6 +895,220 @@ private Object parseBoxedWrapper(FlatRecordReader reader) {
return null;
}

// Reads this mapped field from the traversal node as its boxed wrapper type.
// For SHORT/BYTE/CHAR the backing storage is INT and Integer.MIN_VALUE is the
// null sentinel; for DATE_TIME the storage is LONG with Long.MIN_VALUE as null.
// NOTE(review): this mirrors the FlatRecordReader-based parseBoxedWrapper above;
// keep the two switch statements in sync.
private Object parseBoxedWrapper(FlatRecordTraversalObjectNode record) {
    switch (fieldType) {
        case BOOLEAN:
            return record.getFieldValueBooleanBoxed(fieldName);
        case INT:
            return record.getFieldValueIntBoxed(fieldName);
        case SHORT:
            int shortValue = record.getFieldValueInt(fieldName);
            if (shortValue == Integer.MIN_VALUE) {
                return null;
            }
            return Short.valueOf((short) shortValue);
        case BYTE:
            int byteValue = record.getFieldValueInt(fieldName);
            if (byteValue == Integer.MIN_VALUE) {
                return null;
            }
            return Byte.valueOf((byte) byteValue);
        case CHAR:
            int charValue = record.getFieldValueInt(fieldName);
            if (charValue == Integer.MIN_VALUE) {
                return null;
            }
            return Character.valueOf((char) charValue);
        case LONG:
            return record.getFieldValueLongBoxed(fieldName);
        case FLOAT:
            return record.getFieldValueFloatBoxed(fieldName);
        case DOUBLE:
            return record.getFieldValueDoubleBoxed(fieldName);
        case STRING:
            return record.getFieldValueString(fieldName);
        case BYTES:
            return record.getFieldValueBytes(fieldName);
        case ENUM_NAME:
            // `clazz` is the enum class for this mapped field; raw Class<Enum>
            // cast is unavoidable here since the enum type is only known at runtime.
            String enumName = record.getFieldValueString(fieldName);
            if (enumName == null) {
                return null;
            }
            return Enum.valueOf((Class<Enum>) clazz, enumName);
        case DATE_TIME: {
            long dateValue = record.getFieldValueLong(fieldName);
            if (dateValue == Long.MIN_VALUE) {
                return null;
            }
            return new Date(dateValue);
        }
        default:
            throw new IllegalArgumentException("Unexpected field type " + fieldType + " for field " + fieldName);
    }
}

// Writes this mapped field of `obj` directly (via Unsafe at `fieldOffset`) from
// the traversal node. Sentinel values mean "absent": Integer.MIN_VALUE for
// int-backed primitives, Long.MIN_VALUE for longs/dates, NaN for float/double,
// null for object-typed values — in those cases the field is left untouched.
// NOTE(review): NaN-as-null means a genuinely-NaN float/double field cannot be
// populated through this path; this matches the reader-based parse below.
private void parse(Object obj, FlatRecordTraversalObjectNode node) {
    switch(fieldType) {
        case BOOLEAN: {
            Boolean value = node.getFieldValueBooleanBoxed(fieldName);
            if (value != null) {
                unsafe.putBoolean(obj, fieldOffset, value == Boolean.TRUE);
            }
            break;
        }
        case INT: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putInt(obj, fieldOffset, value);
            }
            break;
        }
        case SHORT: {
            // SHORT/BYTE/CHAR are stored as INT in the record; narrow on write.
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putShort(obj, fieldOffset, (short) value);
            }
            break;
        }
        case BYTE: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putByte(obj, fieldOffset, (byte) value);
            }
            break;
        }
        case CHAR: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putChar(obj, fieldOffset, (char) value);
            }
            break;
        }
        case LONG: {
            long value = node.getFieldValueLong(fieldName);
            if (value != Long.MIN_VALUE) {
                unsafe.putLong(obj, fieldOffset, value);
            }
            break;
        }
        case FLOAT: {
            float value = node.getFieldValueFloat(fieldName);
            if (!Float.isNaN(value)) {
                unsafe.putFloat(obj, fieldOffset, value);
            }
            break;
        }
        case DOUBLE: {
            double value = node.getFieldValueDouble(fieldName);
            if (!Double.isNaN(value)) {
                unsafe.putDouble(obj, fieldOffset, value);
            }
            break;
        }
        case STRING: {
            String value = node.getFieldValueString(fieldName);
            if (value != null) {
                unsafe.putObject(obj, fieldOffset, value);
            }
            break;
        }
        case BYTES: {
            byte[] value = node.getFieldValueBytes(fieldName);
            if (value != null) {
                unsafe.putObject(obj, fieldOffset, value);
            }
            break;
        }
        // INLINED_* variants target boxed/object POJO fields, so the value is
        // boxed before the putObject write.
        case INLINED_BOOLEAN: {
            Boolean value = node.getFieldValueBooleanBoxed(fieldName);
            if (value != null) {
                unsafe.putObject(obj, fieldOffset, value);
            }
            break;
        }
        case INLINED_INT: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, Integer.valueOf(value));
            }
            break;
        }
        case INLINED_SHORT: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, Short.valueOf((short) value));
            }
            break;
        }
        case INLINED_BYTE: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, Byte.valueOf((byte) value));
            }
            break;
        }
        case INLINED_CHAR: {
            int value = node.getFieldValueInt(fieldName);
            if (value != Integer.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, Character.valueOf((char) value));
            }
            break;
        }
        case INLINED_LONG: {
            long value = node.getFieldValueLong(fieldName);
            if (value != Long.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, Long.valueOf(value));
            }
            break;
        }
        case INLINED_FLOAT: {
            float value = node.getFieldValueFloat(fieldName);
            if (!Float.isNaN(value)) {
                unsafe.putObject(obj, fieldOffset, Float.valueOf(value));
            }
            break;
        }
        case INLINED_DOUBLE: {
            double value = node.getFieldValueDouble(fieldName);
            if (!Double.isNaN(value)) {
                unsafe.putObject(obj, fieldOffset, Double.valueOf(value));
            }
            break;
        }
        case INLINED_STRING: {
            String value = node.getFieldValueString(fieldName);
            if (value != null) {
                unsafe.putObject(obj, fieldOffset, value);
            }
            break;
        }
        case DATE_TIME: {
            long value = node.getFieldValueLong(fieldName);
            if (value != Long.MIN_VALUE) {
                unsafe.putObject(obj, fieldOffset, new Date(value));
            }
            break;
        }
        case ENUM_NAME: {
            // Resolve the enum constant by its persisted `_name`; raw Class cast
            // is required because the enum type is only known at runtime.
            String value = node.getFieldValueString(fieldName);
            if (value != null) {
                unsafe.putObject(obj, fieldOffset, Enum.valueOf((Class) type, value));
            }
            break;
        }
        case REFERENCE: {
            // Nested record: recurse through the referenced type's mapper.
            FlatRecordTraversalNode childNode = node.getFieldNode(fieldName);
            if (childNode != null) {
                unsafe.putObject(obj, fieldOffset, subTypeMapper.parseFlatRecordTraversalNode(childNode));
            }
            break;
        }
        default:
            throw new IllegalArgumentException("Unknown field type: " + fieldType);
    }
}


private void parse(Object obj, FlatRecordReader reader, Map<Integer, Object> parsedRecords) {
switch(fieldType) {
case BOOLEAN: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordReader;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalNode;
import com.netflix.hollow.core.write.objectmapper.flatrecords.traversal.FlatRecordTraversalSetNode;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -132,6 +136,15 @@ protected Object parseFlatRecord(HollowSchema recordSchema, FlatRecordReader rea
return collection;
}

@Override
protected Object parseFlatRecordTraversalNode(FlatRecordTraversalNode node) {
    // A SET-schema node is iterable over its element nodes; parse each one via
    // the element type's mapper and accumulate into a HashSet.
    FlatRecordTraversalSetNode setNode = (FlatRecordTraversalSetNode) node;

    Set<Object> parsedElements = new HashSet<>();
    for (FlatRecordTraversalNode childNode : setNode) {
        parsedElements.add(elementMapper.parseFlatRecordTraversalNode(childNode));
    }
    return parsedElements;
}

@Override
protected HollowWriteRecord newWriteRecord() {
return new HollowSetWriteRecord();
Expand Down
Loading

0 comments on commit 5ee4831

Please sign in to comment.