HPCC4J-650 DFSClient should retain integer subtypes
- Modified record translation code to maintain integer subtype
- Fixed record translation test case
- Added index record translation to test case

Signed-off-by: James McMullan [email protected]
jpmcmu committed Sep 27, 2024
1 parent 168d835 commit 2f73709
Showing 6 changed files with 83 additions and 45 deletions.
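In short: HPCC stores index integers in non-standard encodings (keyed big-endian, byte-swapped, and bias-swapped), and record translation previously collapsed them all into plain integers, losing the subtype. A minimal sketch of the decoding such a subtype implies, assuming the usual HPCC index layout (big-endian bytes with the sign bit flipped so byte-wise comparison sorts correctly); the helper below is illustrative only, not part of the commit:

    // Illustrative only, not part of the commit: decode a keyed (biased,
    // big-endian) integer of 1 to 8 bytes into a Java long.
    static long decodeBiasedBigEndian(byte[] bytes)
    {
        long v = 0;
        for (byte b : bytes)
        {
            v = (v << 8) | (b & 0xFFL);        // big-endian accumulate
        }
        v ^= 1L << (bytes.length * 8 - 1);     // undo the sign-bit bias
        int shift = 64 - bytes.length * 8;     // sign-extend widths below 8 bytes
        return (v << shift) >> shift;
    }

The commit keeps this kind of encoding knowledge in the type system instead of discarding it at translation time.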
@@ -35,7 +35,6 @@ public class FieldDef implements Serializable
private long len = 0;
private boolean fixedLength = false;
private boolean isUnsigned = false;
private boolean isBiased = false;
private int additionalFlags = 0;

/**
@@ -54,7 +53,6 @@ public FieldDef(FieldDef rhs)
this.len = rhs.len;
this.fixedLength = rhs.fixedLength;
this.isUnsigned = rhs.isUnsigned;
this.isBiased = rhs.isBiased;
this.additionalFlags = rhs.additionalFlags;
}

@@ -172,6 +170,16 @@ public HpccSrcType getSourceType()
return this.srcType;
}

/**
* Sets the data type on the HPCC cluster.
*
* @param srcType the new source type
*/
public void setSourceType(HpccSrcType srcType)
{
this.srcType = srcType;
}

/**
* Length of data or minimum length if variable.
*
@@ -307,16 +315,25 @@ public boolean isUnsigned()
/**
* Is the underlying value biased?
*
* @return true when biased
*
* @deprecated
*/
public boolean isBiased()
{
return this.isBiased;
return isNonStandardInt();
}

void setIsBiased(boolean biased)
/**
* Is the underlying integer stored in a non-standard format,
* i.e. as a keyed, byte-swapped, or bias-swapped integer?
*
* @return true when the source type is a non-standard integer
*/
public boolean isNonStandardInt()
{
this.isBiased = biased;
return this.srcType == HpccSrcType.KEYED_INTEGER
|| this.srcType == HpccSrcType.SWAPPED_INTEGER
|| this.srcType == HpccSrcType.BIAS_SWAPPED_INTEGER;
}

/**
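Net effect of the FieldDef changes: the stored isBiased flag is gone, and biasedness is now derived from the source type. A short usage sketch of the new accessors, a hypothetical helper mirroring what HPCCFile.updateProjectedRecordDef() does later in this commit:

    import org.hpccsystems.commons.ecl.FieldDef;
    import org.hpccsystems.commons.ecl.HpccSrcType;

    // Hypothetical helper: copy a field and project any non-standard integer
    // encoding to a plain little-endian integer for reading.
    static FieldDef projectToStandardInt(FieldDef field)
    {
        FieldDef copy = new FieldDef(field);               // copy constructor shown above
        if (copy.isNonStandardInt())                       // keyed, swapped, or bias-swapped
        {
            copy.setSourceType(HpccSrcType.LITTLE_ENDIAN);
        }
        return copy;                                       // copy.isBiased() now reports false
    }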
@@ -474,7 +474,7 @@ public static JSONObject toJsonRecord(FieldDef field) throws Exception
*/
private static int getTypeID(FieldDef field) throws Exception
{
int typeID = 0;
int typeID = -1;
switch (field.getFieldType())
{
case SET:
@@ -517,11 +517,6 @@ private static int getTypeID(FieldDef field) throws Exception
case INTEGER:
{
typeID = type_int;
if (field.isUnsigned())
{
typeID |= FLAG_UNSIGNED;
}

HpccSrcType srcType = field.getSourceType();
if (srcType == HpccSrcType.KEYED_INTEGER)
{
@@ -535,6 +530,12 @@ else if (srcType == HpccSrcType.BIAS_SWAPPED_INTEGER)
{
typeID = type_biasedswapint;
}

if (field.isUnsigned())
{
typeID |= FLAG_UNSIGNED;
}

break;
}
case DECIMAL:
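Note the reordering in the INTEGER case above: FLAG_UNSIGNED used to be OR'd into type_int before the source-type branches replaced typeID outright, so the unsigned bit was silently dropped for keyed and swapped integers. Condensed sketch of the fixed flow (the SWAPPED_INTEGER branch is collapsed in the diff; its constant name is assumed here):

    int typeID = type_int;
    if (srcType == HpccSrcType.KEYED_INTEGER)             typeID = type_keyedint;
    else if (srcType == HpccSrcType.SWAPPED_INTEGER)      typeID = type_swapint;       // assumed name
    else if (srcType == HpccSrcType.BIAS_SWAPPED_INTEGER) typeID = type_biasedswapint;
    if (field.isUnsigned()) typeID |= FLAG_UNSIGNED;      // applied last, so it survives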
@@ -642,7 +643,7 @@ else if (srcType == HpccSrcType.UTF16LE || srcType == HpccSrcType.UTF16BE)
*/
private static int getTypeHash(FieldDef field) throws Exception
{
int numHashComponents = 2 + field.getNumDefs();
int numHashComponents = 4 + field.getNumDefs();
if (field.getFieldType() == FieldType.DECIMAL)
{
numHashComponents += 2;
Expand All @@ -651,8 +652,10 @@ private static int getTypeHash(FieldDef field) throws Exception
long[] hashComponents = new long[numHashComponents];
hashComponents[0] = getTypeID(field);
hashComponents[1] = field.getDataLen();
hashComponents[2] = field.getSourceType().ordinal();
hashComponents[3] = field.getAdditionalFlags();

int hashCompIndex = 2;
int hashCompIndex = 4;
for (int i = 0; i < field.getNumDefs(); i++, hashCompIndex++)
{
hashComponents[hashCompIndex] = getTypeHash(field.getDef(i));
@@ -750,17 +753,19 @@ private static int getJsonTypeDefinition(FieldDef field, HashMap<Integer, Intege
JSONObject childJson = new JSONObject();
childJson.put("name", childField.getFieldName());
childJson.put("type", childTypeName);
if (childTypeID > 0)

int flags = childTypeID | childField.getAdditionalFlags();
if (flags > 0)
{
int flags = childTypeID | childField.getAdditionalFlags();
childJson.put("flags", flags);
}

if (childField.getFieldType() == FieldType.DATASET)
{
char delim = 0x0001;
childJson.put("xpath", childField.getFieldName() + delim + "Row");
} else if (childField.getFieldType() == FieldType.SET)
}
else if (childField.getFieldType() == FieldType.SET)
{
char delim = 0x0001;
childJson.put("xpath", childField.getFieldName() + delim + "Item");
@@ -779,6 +784,17 @@
}
}

if (field.isNonStandardInt())
{
FieldDef nonKeyedField = new FieldDef(field);
nonKeyedField.setSourceType(HpccSrcType.LITTLE_ENDIAN);

int childTypeHash = getJsonTypeDefinition(nonKeyedField, typeDefinitionMap, typeDefinitions);
int childTypeIndex = typeDefinitionMap.get(childTypeHash);
String childTypeName = "ty" + (childTypeIndex + 1);
typeDef.put("child", childTypeName);
}

int newTypeIndex = typeDefinitions.size();
typeDefinitions.add(typeDef);
typeDefinitionMap.put(typeHash, newTypeIndex);
@@ -956,10 +972,6 @@ private static FieldDef parseJsonTypeDefinition(JSONObject jsonTypeDefinitions,
{
FieldDef fd = new FieldDef("", fieldType, fieldType.description(), length,
isFixedLength(typeID), isUnsigned(typeID), getSourceType(typeID), new FieldDef[0]);
if ((typeID & TYPE_ID_MASK) == type_keyedint)
{
fd.setIsBiased(true);
}
return fd;
}
}
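Two details to note in the translator changes: getTypeHash() now folds the source type and the additional flags into the hash, so a non-standard integer and its plain counterpart no longer collide in the type table, and getJsonTypeDefinition() gives every non-standard integer a "child" entry naming its little-endian equivalent. Reading the constants off the updated test fixture below (ty11 being the plain 2-byte unsigned integer, ty12 its byte-swapped variant), the emitted shape looks roughly like this:

    "ty11": { "fieldType": 257, "length": 2 }
    "ty12": { "fieldType": 266, "length": 2, "child": "ty11" }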
@@ -55,13 +55,13 @@ public class TestFieldDefinitions
+ "\"ty9\"\r\n },\r\n \"ty11\": {\r\n \"fieldType\": 257,\r\n \"length\": 2\r\n },\r\n \"ty12\": {\r\n \"fieldType\": 266,\r\n \"length\": "
+ "2,\r\n \"child\": \"ty11\"\r\n },\r\n \"ty13\": {\r\n \"fieldType\": 259,\r\n \"length\": 524304\r\n },\r\n \"ty14\": {\r\n \"fieldType\":"
+ " 4,\r\n \"length\": 8\r\n },\r\n \"ty15\": {\r\n \"fieldType\": 0,\r\n \"length\": 1\r\n },\r\n \"ty16\": {\r\n \"fieldType\": 285,\r\n "
+ "\"length\": 8\r\n },\r\n \"fieldType\": 13,\r\n \"length\": 61,\r\n \"fields\": [\r\n {\r\n \"name\": \"int8\",\r\n \"type\": \"ty2\",\r\n"
+ "\"vinit\": 2,\r\n \"length\": 8\r\n },\r\n \"fieldType\": 13,\r\n \"length\": 61,\r\n \"fields\": [\r\n {\r\n \"name\": \"int8\",\r\n \"type\": \"ty2\",\r\n"
+ " \"flags\": 10\r\n },\r\n {\r\n \"name\": \"uint8\",\r\n \"type\": \"ty4\",\r\n \"flags\": 266\r\n },\r\n {\r\n \"name\": \"int4\","
+ "\r\n \"type\": \"ty6\",\r\n \"flags\": 10\r\n },\r\n {\r\n \"name\": \"uint4\",\r\n \"type\": \"ty8\",\r\n \"flags\": 266\r\n },\r\n"
+ " {\r\n \"name\": \"int2\",\r\n \"type\": \"ty10\",\r\n \"flags\": 10\r\n },\r\n {\r\n \"name\": \"uint2\",\r\n \"type\": \"ty12\",\r\n"
+ " \"flags\": 266\r\n },\r\n {\r\n \"name\": \"udec16\",\r\n \"type\": \"ty13\",\r\n \"flags\": 259\r\n },\r\n {\r\n \"name\": \"fixstr8\",\r\n"
+ " \"type\": \"ty14\",\r\n \"flags\": 4\r\n },\r\n {\r\n \"name\": \"recptr\",\r\n \"type\": \"ty4\",\r\n \"flags\": 266\r\n },\r\n {\r\n"
+ " \"name\": \"isactive\",\r\n \"type\": \"ty15\",\r\n \"flags\": 65536\r\n },\r\n {\r\n \"name\": \"__internal_fpos__\",\r\n \"type\": \"ty16\",\r\n"
+ " \"flags\": 65821\r\n }\r\n ]\r\n}";

/**
@@ -401,7 +401,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
}
else
{
intValue = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN, fd.isBiased());
intValue = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN);
fieldValue = Long.valueOf(intValue);
}
break;
@@ -434,7 +434,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
}
else
{
dataLen = (int) getInt(4, isLittleEndian, false);
dataLen = (int) getInt(4, isLittleEndian);
}

byte[] bytes = new byte[dataLen];
@@ -456,7 +456,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
break;
case BOOLEAN:
// fixed length for each boolean value specified by type def
long value = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN, fd.isBiased());
long value = getInt((int) fd.getDataLen(), fd.getSourceType() == HpccSrcType.LITTLE_ENDIAN);
fieldValue = Boolean.valueOf(value != 0);
break;
case CHAR:
@@ -480,7 +480,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars
}
else
{
codePoints = ((int) getInt(4, isLittleEndian, false));
codePoints = ((int) getInt(4, isLittleEndian));
}

fieldValue = getString(fd.getSourceType(), codePoints, shouldTrim);
@@ -600,7 +600,7 @@ private Object parseRecord(FieldDef recordDef, IRecordBuilder recordBuilder, boo
throw new UnparsableContentException("Set should have a single child type. " + fd.getNumDefs() + " child types found.");
}

int dataLen = (int) getInt(4, isLittleEndian, false);
int dataLen = (int) getInt(4, isLittleEndian);
int childCountGuess = 1;
if (fd.getDataLen() > 0)
{
@@ -719,13 +719,11 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException
* the length, 1 to 8 bytes
* @param little_endian
* true if the value is little endian
* @param shouldCorrectBias
* true if the value should be corrected for index bias
* @return the integer extracted as a long
* @throws IOException
* Signals that an I/O exception has occurred.
*/
private long getInt(int len, boolean little_endian, boolean shouldCorrectBias) throws IOException
private long getInt(int len, boolean little_endian) throws IOException
{
long v = getUnsigned(len, little_endian);

@@ -739,12 +737,6 @@ private long getInt(int len, boolean little_endian, boolean shouldCorrectBias) t
}
}

if (isIndex && shouldCorrectBias)
{
// Roxie indexes are biased to allow for easier comparison. This corrects the bias
v += negMask;
}

return v;
}

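With bias handling removed, getInt() reduces to plain sign extension; callers that previously passed shouldCorrectBias now rely on the subtype projection instead. A hedged sketch of the simplified method, assuming the getUnsigned() helper visible in the diff; the sign-extension body (collapsed above) is reconstructed here for illustration:

    private long getInt(int len, boolean little_endian) throws IOException
    {
        long v = getUnsigned(len, little_endian);          // raw magnitude from the stream
        if (len < 8)
        {
            long signBit = 1L << (len * 8 - 1);
            if ((v & signBit) != 0)
            {
                v |= ~((signBit << 1) - 1);                // extend the sign into the upper bytes
            }
        }
        return v;
    }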
@@ -786,7 +786,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre
for (int j = 0; j < numIncomingParts; j++)
{
DataPartition inFilePart = inFileParts[incomingFilePartIndex + j];
filePartReaders[j] = new HpccRemoteFileReader<HPCCRecord>(inFilePart, recordDef, new HPCCRecordBuilder(recordDef));
filePartReaders[j] = new HpccRemoteFileReader<HPCCRecord>(inFilePart, recordDef, new HPCCRecordBuilder(file.getProjectedRecordDefinition()));
}
incomingFilePartIndex += numIncomingParts;

31 changes: 24 additions & 7 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java
@@ -26,6 +26,7 @@
import org.apache.logging.log4j.LogManager;
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.ecl.HpccSrcType;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.hpccsystems.dfs.cluster.ClusterRemapper;
@@ -163,12 +164,12 @@ public static int getFilePartFromFPos(long fpos)
}

/**
* Extracts the offset in the file part from a file position value.
*
* @param fpos file position
* @return the offset within the file part
*/
public static long getOffsetFromFPos(long fpos)
{
// First 48 bits store the offset
return fpos & 0xffffffffffffL;
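A quick worked example of the split (values illustrative): the low 48 bits carry the offset, and the remaining upper 16 bits are what getFilePartFromFPos() above decodes.

    long fpos = (3L << 48) | 1024L;                  // hypothetical packed file position
    long offset = HPCCFile.getOffsetFromFPos(fpos);  // fpos & 0xffffffffffffL == 1024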
@@ -198,11 +199,27 @@ public HPCCFile setProjectList(String projectList) throws Exception
this.columnPruner = new ColumnPruner(projectList);
if (this.recordDefinition != null)
{
this.projectedRecordDefinition = this.columnPruner.pruneRecordDefinition(this.recordDefinition);
updateProjectedRecordDef();
}
return this;
}

private void updateProjectedRecordDef() throws Exception
{
this.projectedRecordDefinition = this.columnPruner.pruneRecordDefinition(this.recordDefinition);

// By default project all sub-integer types to standard integers
for (int i = 0; i < this.projectedRecordDefinition.getNumDefs(); i++)
{
FieldDef field = this.projectedRecordDefinition.getDef(i);
if (field.isNonStandardInt())
{
field.setSourceType(HpccSrcType.LITTLE_ENDIAN);
}

}
}
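updateProjectedRecordDef() applies the projection to the top-level fields of the pruned record definition, so readers see ordinary little-endian integers by default while the original subtype survives in the unprojected definition. A hedged end-to-end sketch; the constructor arguments and connection setup are assumed:

    HPCCFile file = new HPCCFile("example::indexfile", connection);  // setup assumed
    FieldDef projected = file.getProjectedRecordDefinition();
    // Keyed, swapped, and bias-swapped integer fields in 'projected' now
    // report HpccSrcType.LITTLE_ENDIAN and arrive as plain Java longs, so
    // callers no longer perform bias correction themselves.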

/**
* Gets the file access expiry secs.
*
@@ -434,7 +451,7 @@ private void createDataParts() throws HpccFileException
this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null);
}

this.projectedRecordDefinition = this.columnPruner.pruneRecordDefinition(this.recordDefinition);
updateProjectedRecordDef();
}
else
throw new HpccFileException("Could not fetch metadata for file: '" + fileName + "'");
@@ -622,13 +639,13 @@ private static String acquireFileAccess(String fileName, HPCCWsDFUClient hpcc, i
String uniqueID = "HPCC-FILE: " + UUID.randomUUID().toString();
return hpcc.getFileAccessBlob(fileName, clusterName, expirySeconds, uniqueID);
}

/**
* @return the file metadata information for this HPCCFile (if it exists)
*/
public DFUFileDetailWrapper getOriginalFileMetadata()
{
if (originalFileMetadata==null)
{
HPCCWsDFUClient dfuClient = HPCCWsDFUClient.get(espConnInfo);
if (dfuClient.hasInitError())
