Skip to content

Commit

Permalink
GH-15741 force col types during parsing (#15775)
Browse files Browse the repository at this point in the history
* GH-15741: added force_col_types parameter to force column types specified in parquet schema or in col_types.
GH-15741: finished forcing column types for non-parquet parsers
GH-15741: Add force_col_types to parquet parser
GH-15741: added force_col_types support to R client

Co-authored-by: Marek Novotný <[email protected]>

* use elegant idea from Tomas Frydas to perform integer to double column type conversion.

* remove commented out code and extra space.

* remove runtime dependencies from build.gradle.

---------

Co-authored-by: Marek Novotný <[email protected]>
  • Loading branch information
wendycwong and mn-mikke authored Oct 10, 2023
1 parent 5406765 commit 407f2bc
Show file tree
Hide file tree
Showing 21 changed files with 454 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static void gbm_example_flow() {
null,
null,
(byte)'\\',
false,
null).execute().body();
System.out.println("parseSetupBody: " + parseSetupBody);

Expand All @@ -129,6 +130,7 @@ public static void gbm_example_flow() {
parseSetupBody.columnNames,
parseSetupBody.columnTypes,
parseSetupBody.skippedColumns,
parseSetupBody.forceColTypes,
null, // domains
parseSetupBody.naStrings,
parseSetupBody.chunkSize,
Expand Down
7 changes: 5 additions & 2 deletions h2o-core/src/main/java/water/api/ParseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
new ParseWriter.ParseErr[0], parse.chunk_size,
parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
parse.escapechar);
parse.escapechar, parse.force_col_types);

if (parse.source_frames == null)
throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
Expand All @@ -51,7 +51,10 @@ public ParseV3 parse(int version, ParseV3 parse) {
((setup.get_parse_columns_indices()==null) || (setup.get_parse_columns_indices().length==0)))
throw new H2OIllegalArgumentException("Parser: all columns in the file are skipped and no H2OFrame" +
" can be returned."); // Need this to send error message to R


if (parse.force_col_types && parse.column_types != null)
setup.setOrigColumnTypes(parse.column_types);

parse.job = new JobV3(ParseDataset.parse(
parse.destination_frame.key(), srcs, parse.delete_on_done, setup, parse.blocking
)._job);
Expand Down
5 changes: 5 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public class ParseSetupV3 extends RequestSchemaV3<ParseSetup, ParseSetupV3> {

@API(help="One ASCII character used to escape other characters.", direction=API.Direction.INOUT)
public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;

@API(help="If true, will force the column types to be either the ones in Parquet schema for Parquet files or the" +
" ones specified in column_types. This parameter is used for numerical columns only. Other column settings" +
" will happen without setting this parameter. Defaults to false.", direction=API.Direction.INPUT)
public boolean force_col_types;

@Override
public ParseSetup fillImpl(ParseSetup impl) {
Expand Down
6 changes: 6 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public class ParseV3 extends RequestSchemaV3<Iced, ParseV3> {
@API(help="Skipped columns indices", direction=API.Direction.INOUT)
public int[] skipped_columns;

// NOTE(review): fixed missing space in the concatenated help string — previously rendered
// as "Other columnsettings" in the generated REST API docs.
@API(help="If true, will force the column types to be either the ones in Parquet schema for Parquet files or the " +
        "ones specified in column_types. This parameter is used for numerical columns only. Other column " +
        "settings will happen without setting this parameter. Defaults to false.",
        direction=API.Direction.INPUT)
public boolean force_col_types;

@API(help="Domains for categorical columns")
public String[][] domains;

Expand Down
10 changes: 8 additions & 2 deletions h2o-core/src/main/java/water/fvec/Vec.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,7 @@ public static Vec makeCon(double x, long len, int log_rows_per_chunk, boolean re
// System.out.println("made vecs " + Arrays.toString(res));
return res;
}



private static void fillDoubleChunks(Vec v, final Vec[] ds, final double [] values){
new MRTask(){
public void map(Chunk c){
Expand Down Expand Up @@ -1335,6 +1334,13 @@ public TwoDimTable toTwoDimTable() {
* @return A categorical vector based on the contents of the original vector.
*/
public Vec toCategoricalVec() {return VecUtils.toCategoricalVec(this);}

public Vec toIntegerVec() {return VecUtils.toIntegerVec(this);}

  /**
   * Marks this numeric vector as real-valued (double) in place by clearing the integer flag
   * on its rollup statistics. No chunk data is rewritten — only the type metadata changes.
   * Only valid on {@code T_NUM} vectors (enforced by assertion).
   * NOTE(review): mutates the cached rollups directly; presumably callers update the frame
   * in the DKV afterwards so the change is published — confirm against call sites.
   */
  public void asDouble() {
    assert _type==T_NUM;
    rollupStats()._isInt=false;
  }
/**
* Convenience method for converting to a string vector.
* @return A string vector based on the contents of the original vector.
Expand Down
57 changes: 56 additions & 1 deletion h2o-core/src/main/java/water/parser/ParseDataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import water.fvec.Vec.VectorGroup;
import water.nbhm.NonBlockingHashMap;
import water.nbhm.NonBlockingSetInt;
import water.util.*;
import water.util.ArrayUtils;
import water.util.FrameUtils;
import water.util.Log;
import water.util.PrettyPrint;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -27,6 +30,7 @@ public final class ParseDataset {
public Job<Frame> _job;
private MultiFileParseTask _mfpt; // Access to partially built vectors for cleanup after parser crash


// Keys are limited to ByteVec Keys and Frames-of-1-ByteVec Keys
public static Frame parse(Key okey, Key... keys) {
return parse(null, okey, keys);
Expand Down Expand Up @@ -404,6 +408,18 @@ public int compare(ParseWriter.ParseErr o1, ParseWriter.ParseErr o2) {
}
job.update(0,"Calculating data summary.");
logParseResults(fr);
if (setup.getForceColTypes()) {
String parseType = setup.getParseType().name();
String[] originalColumnTypes = "PARQUET".equals(parseType) ? setup.getParquetColumnTypes()
: setup.getOrigColumnTypes();
if (originalColumnTypes != null) {
if ("PARQUET".equals(parseType)) // force change the column types specified by user
forceChangeColumnTypesParquet(fr, originalColumnTypes);
else
forceChangeColumnTypes(fr, originalColumnTypes);
}
}

// Release the frame for overwriting
fr.update(job);
Frame fr2 = DKV.getGet(fr._key);
Expand All @@ -417,6 +433,45 @@ public int compare(ParseWriter.ParseErr o1, ParseWriter.ParseErr o2) {
}
return pds;
}

public static void forceChangeColumnTypesParquet(Frame fr, String[] columnTypes) {
int numCols = columnTypes.length;
for (int index=0; index<numCols; index++) {
switch (columnTypes[index]) {
case "INT32":
case "INT64":
if (!fr.vec(index).isInt() && !fr.vec(index).isBad())
fr.replace((index), fr.vec(index).toIntegerVec());
break;
case "FLOAT":
case "DOUBLE":
if (fr.vec(index).isInt() && !fr.vec(index).isBad())
fr.vec(index).asDouble();
break;
default: break; // no change for other types
}
}
}

private static void forceChangeColumnTypes(Frame fr, String[] columnTypes) {
int numCols = columnTypes.length;
for (int index=0; index<numCols; index++) {
switch (columnTypes[index]) {
case "int":
case "long":
if (!fr.vec(index).isInt() && !fr.vec(index).isBad())
fr.replace((index), fr.vec(index).toIntegerVec()).remove();
break;
case "float":
case "double":
case "real":
if (fr.vec(index).isInt() && !fr.vec(index).isBad())
fr.vec(index).asDouble();
break;
default: break; // no conversion for other data types.
}
}
}
private static class CreateParse2GlobalCategoricalMaps extends DTask<CreateParse2GlobalCategoricalMaps> {
private final Key _parseCatMapsKey;
private final Key _frKey;
Expand Down
44 changes: 42 additions & 2 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class ParseSetup extends Iced {
String[][] _data; // First few rows of parsed/tokenized data
int[] _parse_columns_indices; // store column indices to be parsed into the final file
byte[] _nonDataLineMarkers;
boolean _force_col_types = false; // at end of parsing, change column type to users specified ones
String[] _orig_column_types; // copy over the original column type setup before translating to byte[]

String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame
String[][] _synthetic_column_values; // For each imported file contains array of values for each synthetic column
Expand All @@ -64,6 +66,7 @@ public void addErrs(ParseWriter.ParseErr... errs){

public int _chunk_size = FileVec.DFLT_CHUNK_SIZE; // Optimal chunk size to be used store values
PreviewParseWriter _column_previews = null;
public String[] parquetColumnTypes; // internal parameters only

public ParseSetup(ParseSetup ps) {
this(ps._parse_type,
Expand Down Expand Up @@ -91,6 +94,14 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
  /**
   * Backward-compatible constructor kept for callers that predate the {@code force_col_types}
   * option; delegates to the full constructor with {@code force_col_types = false}.
   */
  public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
                    byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
                    int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar) {
    this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
        chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false);
  }

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers,
byte escapeChar, boolean force_col_types) {
_parse_type = parse_type;
_separator = sep;
_nonDataLineMarkers = nonDataLineMarkers;
Expand All @@ -107,6 +118,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
_decrypt_tool = decrypt_tool;
_skipped_columns = skipped_columns;
_escapechar = escapeChar;
_force_col_types = force_col_types;
setParseColumnIndices(ncols, _skipped_columns);
}

Expand Down Expand Up @@ -135,6 +147,10 @@ public void setSyntheticColumns(String[] names, String[][] valueMapping, byte sy
_synthetic_column_type = synthetic_column_type;
}

public void setParquetColumnTypes(String[] columnTypes) {
parquetColumnTypes = columnTypes.clone();
}

/**
* Create a ParseSetup with parameters from the client.
*
Expand All @@ -156,7 +172,9 @@ ps.column_names, strToColumnTypes(ps.column_types),
ps.chunk_size,
ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns,
ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null,
ps.escapechar);
ps.escapechar, ps.force_col_types);
this._force_col_types = ps.force_col_types;
this._orig_column_types = this._force_col_types ? (ps.column_types == null ? null : ps.column_types.clone()) : null;
}

/**
Expand Down Expand Up @@ -233,6 +251,14 @@ public String[] getColumnTypeStrings() {
types[i] = Vec.TYPE_STR[_column_types[i]];
return types;
}
  /**
   * @return the user-specified column type names captured before translation to byte codes,
   *         or null when {@code force_col_types} was not requested
   */
  public String[] getOrigColumnTypes() {
    return _orig_column_types;
  }

  /**
   * @return true when the parsed frame's numeric column types should be forced to the
   *         user/Parquet-schema types after parsing
   */
  public boolean getForceColTypes() {
    return _force_col_types;
  }

public byte[] getColumnTypes() { return _column_types; }

public static byte[] strToColumnTypes(String[] strs) {
Expand Down Expand Up @@ -265,7 +291,7 @@ public static byte[] strToColumnTypes(String[] strs) {
* Should be override in subclasses. */
protected Parser parser(Key jobKey) {
ParserProvider pp = ParserService.INSTANCE.getByInfo(_parse_type);
if (pp != null) {
if (pp != null) { // fill up parquet setup here
return pp.createParser(this, jobKey);
}

Expand Down Expand Up @@ -302,6 +328,10 @@ public int getNumberColumns() {
public final DecryptionTool getDecryptionTool() {
return DecryptionTool.get(_decrypt_tool);
}

  /**
   * @return the raw column type names taken from the Parquet file schema (internal parameter),
   *         or null when not parsing Parquet
   */
  public final String[] getParquetColumnTypes() {
    return parquetColumnTypes;
  }

public final ParserInfo.ParseMethod parseMethod(int nfiles, Vec v) {
boolean isEncrypted = ! getDecryptionTool().isTransparent();
Expand Down Expand Up @@ -837,6 +867,16 @@ public ParseSetup setColumnTypes(byte[] column_types) {
this._column_types = column_types;
return this;
}

  /**
   * Stores the user-specified column type names (pre-translation strings such as "int",
   * "real") for later use by the force-column-types post-processing step.
   *
   * @param col_types column type names, one per column
   * @return this, for call chaining
   */
  public ParseSetup setOrigColumnTypes(String[] col_types) {
    this._orig_column_types = col_types;
    return this;
  }

  /**
   * Enables or disables forcing the parsed frame's numeric column types to the
   * user/Parquet-schema types after parsing completes.
   *
   * @param force_col_types true to force the column types
   * @return this, for call chaining
   */
  public ParseSetup setForceColTypes(boolean force_col_types) {
    this._force_col_types = force_col_types;
    return this;
  }

public ParseSetup setDomains(String[][] domains) {
this._domains = domains;
Expand Down
69 changes: 63 additions & 6 deletions h2o-core/src/main/java/water/util/VecUtils.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package water.util;

import hex.genmodel.InMemoryMojoReaderBackend;
import water.*;
import water.exceptions.H2OIllegalArgumentException;
import water.exceptions.H2OIllegalValueException;
import water.fvec.C0DChunk;
import water.fvec.Chunk;
import water.fvec.NewChunk;
import water.fvec.Vec;
import water.fvec.Frame;
import water.fvec.*;
import water.nbhm.NonBlockingHashMapLong;
import water.parser.BufferedString;
import water.parser.Categorical;
Expand Down Expand Up @@ -184,6 +179,35 @@ public static Vec toNumericVec(Vec src) {
}
}

public static Vec toIntegerVec(Vec src) {
switch (src.get_type()) {
case Vec.T_CAT:
return categoricalToInt(src);
case Vec.T_STR:
return stringToInteger(src);
case Vec.T_NUM:
return numToInteger(src);
default:
throw new H2OIllegalArgumentException("Unrecognized column type " + src.get_type_str()
+ " given to toNumericVec()");
}
}

public static Vec numToInteger(final Vec src) {
Vec res = new MRTask() {
@Override public void map(Chunk srcV, NewChunk destV) {
int cLen = srcV._len;
for (int index=0; index<cLen; index++) {
if (!srcV.isNA(index))
destV.addNum(Math.round(srcV.atd(index)));
else
destV.addNA();
}
}
}.doAll(Vec.T_NUM, src).outputFrame().anyVec();
return res;
}

/**
* Create a new {@link Vec} of numeric values from a string {@link Vec}. Any rows that cannot be
* converted to a number are set to NA.
Expand Down Expand Up @@ -226,6 +250,38 @@ public static Vec stringToNumeric(Vec src) {
return res;
}

public static Vec stringToInteger(Vec src) {
if(!src.isString()) throw new H2OIllegalArgumentException("stringToNumeric conversion only works on string columns");
Vec res = new MRTask() {
@Override public void map(Chunk chk, NewChunk newChk){
if (chk instanceof C0DChunk) { // all NAs
for (int i=0; i < chk._len; i++)
newChk.addNA();
} else {
BufferedString tmpStr = new BufferedString();
for (int i=0; i < chk._len; i++) {
if (!chk.isNA(i)) {
tmpStr = chk.atStr(tmpStr, i);
switch (tmpStr.getNumericType()) {
case BufferedString.NA:
newChk.addNA(); break;
case BufferedString.INT:
newChk.addNum(Long.parseLong(tmpStr.toString()),0); break;
case BufferedString.REAL:
long temp = Math.round(Double.parseDouble(tmpStr.toString()));
newChk.addNum(temp); break;
default:
throw new H2OIllegalValueException("Received unexpected type when parsing a string to a number.", this);
}
} else newChk.addNA();
}
}
}
}.doAll(Vec.T_NUM, src).outputFrame().anyVec();
assert res != null;
return res;
}

/**
* Create a new {@link Vec} of numeric values from a categorical {@link Vec}.
*
Expand Down Expand Up @@ -266,6 +322,7 @@ public static Vec categoricalToInt(final Vec src) {
}
return newVec;
}


/**
* Create a new {@link Vec} of string values from an existing {@link Vec}.
Expand Down
Loading

0 comments on commit 407f2bc

Please sign in to comment.