Support to parse numbers in text-based input formats (#17082)
Text-based input formats like csv and tsv currently parse inputs only as strings, following the RFC4180Parser spec. To work around this, the web-console and other tools need to further inspect the sample data returned by the Druid sampler API and parse numeric strings themselves.

This patch introduces a new optional config, tryParseNumbers, for the csv and tsv input formats. If enabled, any numbers present in the input are parsed as follows: integers as the long data type and floating-point numbers as double; if parsing fails for any reason, the input is treated as a string. By default, this configuration is set to false, so numeric strings are treated as strings.
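
As a rough illustration of that fallback order, a minimal Java sketch (the helper name tryParseNumber is hypothetical; only the long-then-double-then-string order comes from this patch):

    // Hypothetical helper; mirrors the documented fallback order.
    static Object tryParseNumber(String input)
    {
      if (input == null || input.isEmpty()) {
        return input; // nothing to parse
      }
      try {
        return Long.parseLong(input); // integer strings become long
      }
      catch (NumberFormatException ignored) {
        // not a long; try double next
      }
      try {
        return Double.parseDouble(input); // floating-point strings become double
      }
      catch (NumberFormatException ignored) {
        // not a double either
      }
      return input; // parsing failed for whatever reason: keep the string
    }

Trying long before double matters: Double.parseDouble also accepts integer strings, so the reverse order would turn every integer into a double.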
abhishekrb19 authored Sep 19, 2024
1 parent 4f137d2 commit 635e418
Showing 57 changed files with 858 additions and 194 deletions.
@@ -132,7 +132,7 @@ public void prepareData() throws Exception
@Setup(Level.Trial)
public void prepareFormat()
{
-format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1);
+format = new DelimitedInputFormat(fromHeader ? null : COLUMNS, null, "\t", null, fromHeader, fromHeader ? 0 : 1, null);
}

@Benchmark
2 changes: 2 additions & 0 deletions docs/ingestion/data-formats.md
@@ -125,6 +125,7 @@ Configure the CSV `inputFormat` to load CSV data as follows:
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order with the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
+| tryParseNumbers | Boolean | If this is set, the task will attempt to parse numeric strings into long or double data types, in that order. This parsing also applies to values separated by `listDelimiter`. If the value cannot be parsed as a number, it is retained as a string. | no (default = false) |

For example:

@@ -150,6 +151,7 @@ Configure the TSV `inputFormat` to load TSV data as follows:
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order with the columns of your data. | yes if `findColumnsFromHeader` is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that `skipHeaderRows` will be applied before finding column names from the header. For example, if you set `skipHeaderRows` to 2 and `findColumnsFromHeader` to true, the task will skip the first two lines and then extract column information from the third line. `columns` will be ignored if this is set to true. | no (default = false if `columns` is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first `skipHeaderRows` rows. | no (default = 0) |
+| tryParseNumbers | Boolean | If this is set, the task will attempt to parse numeric strings into long or double data types, in that order. This parsing also applies to values separated by `listDelimiter`. If the value cannot be parsed as a number, it is retained as a string. | no (default = false) |

Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed.
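
For instance, a hedged sketch of an inputFormat spec that opts in to number parsing (the values are illustrative, not taken from this commit; the field names are the ones documented in the tables above):

    {
      "type": "tsv",
      "delimiter": "\t",
      "findColumnsFromHeader": true,
      "tryParseNumbers": true
    }

With tryParseNumbers enabled, a sampled field like "25" comes back as the long 25 instead of the string "25".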

@@ -537,7 +537,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

@@ -584,7 +584,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

@@ -23,8 +23,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.catalog.storage.sql.CatalogManager;
import org.apache.druid.catalog.storage.sql.SQLCatalogManager;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule;
import org.apache.druid.server.security.Access;
@@ -35,8 +33,6 @@
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;

-import java.util.Arrays;
-
public class CatalogTests
{
public static final String TEST_AUTHORITY = "test";
@@ -74,17 +70,6 @@ public Access authorize(
}
}

-public static InputFormat csvFormat()
-{
-  return new CsvInputFormat(
-      Arrays.asList("x", "y", "z"),
-      null, // listDelimiter
-      false, // hasHeaderRow
-      false, // findColumnsFromHeader
-      0 // skipHeaderRows
-  );
-}
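
Judging by the call sites in this diff, CsvInputFormat gains a trailing tryParseNumbers argument, and existing callers pass null to preserve the old strings-only behavior. A hedged sketch of a caller opting in (the argument comments mirror the removed helper above; the new flag's position and meaning are inferred from this diff and the commit description):

    InputFormat format = new CsvInputFormat(
        Arrays.asList("x", "y", "z"), // columns
        null,                         // listDelimiter
        false,                        // hasHeaderRow
        false,                        // findColumnsFromHeader
        0,                            // skipHeaderRows
        true                          // tryParseNumbers (new in this patch)
    );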

public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();

public static class DbFixture
@@ -406,7 +406,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
null
);

@@ -453,7 +453,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
null
);

@@ -90,7 +90,8 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
null,
false,
null,
-0
+0,
+null
);

public static class ConstructorTest
@@ -712,7 +712,8 @@ public void testValueInCsvFormat() throws IOException
null,
false,
false,
-0
+0,
+null
),
"kafka.newheader.",
"kafka.newkey.key",
@@ -713,7 +713,8 @@ public void testValueInCsvFormat() throws IOException
null,
false,
false,
-0
+0,
+null
),
"kinesis.newts.partitionKey",
"kinesis.newts.timestamp"
@@ -327,7 +327,7 @@ public CSVFlatDataParser(
this.valueColumn,
Arrays.toString(columns.toArray())
);
-CSVParser csvParser = new CSVParser(null, columns, hasHeaderRow, skipHeaderRows);
+CSVParser csvParser = new CSVParser(null, columns, hasHeaderRow, skipHeaderRows, false);
csvParser.startFileFromBeginning();
this.parser = new DelegateParser(
csvParser,
@@ -355,13 +355,13 @@ public List<String> getColumns()
@JsonProperty
public String getKeyColumn()
{
-return this.keyColumn;
+return keyColumn;
}

@JsonProperty
public String getValueColumn()
{
-return this.valueColumn;
+return valueColumn;
}

@Override
@@ -431,7 +431,8 @@ public TSVFlatDataParser(
StringUtils.emptyToNullNonDruidDataString(delimiter),
StringUtils.emptyToNullNonDruidDataString(listDelimiter),
hasHeaderRow,
-skipHeaderRows
+skipHeaderRows,
+false
);
delegate.startFileFromBeginning();
Preconditions.checkArgument(
@@ -115,6 +115,7 @@ public void testCSVWithHeader()
// The third row will parse to data
Assert.assertEquals(ImmutableMap.of("val2", "val3"), parser.getParser().parseToMap("val1,val2,val3"));
}

@Test(expected = IllegalArgumentException.class)
public void testBadCSV()
{
@@ -157,7 +157,7 @@ public void testIngestWithSanitizedNullByte() throws IOException
.dataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead), SystemFields.none()),
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("agent_category", ColumnType.STRING)
@@ -255,7 +255,7 @@ public void testIngestWithSanitizedNullByteUsingContextParameter() throws IOExce
.dataSource(
new ExternalDataSource(
new LocalInputSource(null, null, ImmutableList.of(toRead), SystemFields.none()),
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder()
.add("timestamp", ColumnType.STRING)
.add("agent_category", ColumnType.STRING)
@@ -1779,7 +1779,7 @@ public void testGroupByWithLimitAndOrdering(String contextName, Map<String, Obje
.setDataSource(
new ExternalDataSource(
new InlineInputSource("dim1\nabc\nxyz\ndef\nxyz\nabc\nxyz\nabc\nxyz\ndef\nbbb\naaa"),
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder().add("dim1", ColumnType.STRING).build()
)
)
@@ -2376,7 +2376,7 @@ public void testSelectRowsGetUntruncatedByDefault(String contextName, Map<String
Collections.nCopies(numFiles, toRead),
SystemFields.none()
),
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
RowSignature.builder().add("timestamp", ColumnType.STRING).build()
))
.intervals(querySegmentSpec(Filtration.eternity()))
@@ -1015,7 +1015,7 @@ public void testReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

@@ -1063,7 +1063,7 @@ public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);
try (CloseableIterator<InputRow> readerIterator = reader.read()) {
@@ -1111,7 +1111,7 @@ public void testCompressedReader() throws IOException

InputSourceReader reader = inputSource.reader(
someSchema,
-new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
+new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

@@ -970,7 +970,8 @@ private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appen
"|",
null,
false,
-0
+0,
+null
),
appendToExisting,
null
@@ -163,7 +163,8 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
false,
-0
+0,
+null
);

private static final DataSchema DATA_SCHEMA =
@@ -473,7 +474,7 @@ public void testTransformSpec() throws Exception
indexIngestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
dimensionsSpec,
-new CsvInputFormat(columns, listDelimiter, null, false, 0),
+new CsvInputFormat(columns, listDelimiter, null, false, 0, null),
transformSpec,
null,
tuningConfig,
@@ -901,7 +902,7 @@ public void testCSVFileWithHeader() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -941,7 +942,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1341,7 +1342,7 @@ public void testIgnoreParseException() throws Exception
parseExceptionIgnoreSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1391,7 +1392,7 @@ public void testReportParseException() throws Exception
indexIngestionSpec = createIngestionSpec(
timestampSpec,
DimensionsSpec.EMPTY,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1632,7 +1633,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
ingestionSpec = createIngestionSpec(
timestampSpec,
dimensionsSpec,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1751,7 +1752,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc
ingestionSpec = createIngestionSpec(
timestampSpec,
dimensionsSpec,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1845,7 +1846,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception
ingestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
-new CsvInputFormat(null, null, null, true, 0),
+new CsvInputFormat(null, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -1915,7 +1916,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception
ingestionSpec = createIngestionSpec(
DEFAULT_TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
-new CsvInputFormat(columns, null, null, true, 0),
+new CsvInputFormat(columns, null, null, true, 0, null),
null,
null,
tuningConfig,
@@ -313,7 +313,8 @@ public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
csvParseSpec.getListDelimiter(),
getColumnsFromHeader ? null : true,
getColumnsFromHeader ? true : null,
-csvParseSpec.getSkipHeaderRows()
+csvParseSpec.getSkipHeaderRows(),
+null
);
} else if (parseSpec instanceof DelimitedParseSpec) {
DelimitedParseSpec delimitedParseSpec = (DelimitedParseSpec) parseSpec;
@@ -324,7 +325,8 @@ public static InputFormat createInputFormatFromParseSpec(ParseSpec parseSpec)
delimitedParseSpec.getDelimiter(),
getColumnsFromHeader ? null : true,
getColumnsFromHeader ? true : null,
-delimitedParseSpec.getSkipHeaderRows()
+delimitedParseSpec.getSkipHeaderRows(),
+null
);
} else if (parseSpec instanceof RegexParseSpec) {
RegexParseSpec regexParseSpec = (RegexParseSpec) parseSpec;
@@ -163,7 +163,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
false,
false,
-0
+0,
+null
);
public static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING =
TuningConfigBuilder.forParallelIndexTask()
@@ -61,7 +61,8 @@ public class HashPartitionAdjustingCorePartitionSizeTest extends AbstractMultiPh
null,
false,
false,
-0
+0,
+null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2020-01-01/P1M");

@@ -84,7 +84,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
null,
false,
false,
-0
+0,
+null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private static final String INPUT_FILTER = "test_*";
@@ -79,7 +79,8 @@ public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexin
null,
false,
false,
-0
+0,
+null
);
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
