Raw source data often needs to undergo some transformations before it is pushed to Pinot.
Transformations include extracting records from nested objects, applying simple transform functions on certain columns, filtering out unwanted columns, as well as more advanced operations like joining between datasets.
A preprocessing job is usually needed to perform these operations. For streaming data sources, you might write a Samza job and create an intermediate topic to store the transformed data.
For simple transformations, this approach can lead to inconsistencies between the batch and stream data sources, and it adds maintenance and operational overhead.
To make things easier, Pinot supports transformations that can be applied via the table config.
{% hint style="warning" %} If a new column is added to your table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s). To ensure accurate values are reloaded, see how to add a new column during ingestion. {% endhint %}
Pinot supports the following functions:
- Groovy functions
- Built-in functions
{% hint style="warning" %} A transformation function cannot mix Groovy and built-in functions; only use one type of function at a time. {% endhint %}
Groovy functions can be defined using the syntax:
Groovy({groovy script}, argument1, argument2...argumentN)
Any valid Groovy expression can be used.
Allowing executable Groovy in ingestion transformation can be a security vulnerability. To enable Groovy for ingestion, set the following controller configuration:
controller.disable.ingestion.groovy=false
If not set, Groovy for ingestion transformation is disabled by default.
All the functions defined in this directory annotated with @ScalarFunction (for example, toEpochSeconds) are supported ingestion transformation functions.
Below are some commonly used built-in Pinot functions for ingestion transformations.
These functions enable time transformations.
toEpochXXX
Converts from epoch milliseconds to a higher granularity.
Function name | Description |
---|---|
toEpochSeconds | Converts epoch millis to epoch seconds. Usage: "toEpochSeconds(millis)" |
toEpochMinutes | Converts epoch millis to epoch minutes. Usage: "toEpochMinutes(millis)" |
toEpochHours | Converts epoch millis to epoch hours. Usage: "toEpochHours(millis)" |
toEpochDays | Converts epoch millis to epoch days. Usage: "toEpochDays(millis)" |
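The arithmetic behind these conversions can be sketched in Python. This is only an illustration of the semantics, not Pinot's implementation: the helper names are hypothetical, and non-negative epoch milliseconds are assumed.

```python
# Illustrative sketch only: mirrors the arithmetic of the toEpochXXX functions.
# Helper names are hypothetical; inputs are assumed to be non-negative epoch millis.
MILLIS_PER_SECOND = 1_000
MILLIS_PER_MINUTE = 60 * MILLIS_PER_SECOND
MILLIS_PER_HOUR = 60 * MILLIS_PER_MINUTE
MILLIS_PER_DAY = 24 * MILLIS_PER_HOUR


def to_epoch_seconds(millis: int) -> int:
    return millis // MILLIS_PER_SECOND


def to_epoch_minutes(millis: int) -> int:
    return millis // MILLIS_PER_MINUTE


def to_epoch_hours(millis: int) -> int:
    return millis // MILLIS_PER_HOUR


def to_epoch_days(millis: int) -> int:
    return millis // MILLIS_PER_DAY


sample_millis = 1588469352000
print(to_epoch_seconds(sample_millis))  # 1588469352
print(to_epoch_minutes(sample_millis))  # 26474489
print(to_epoch_hours(sample_millis))    # 441241
print(to_epoch_days(sample_millis))     # 18385
```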
toEpochXXXRounded
Converts from epoch milliseconds to another granularity, rounding down to the start of the rounding bucket. For example, 1588469352000 (2020-05-03 01:29:12 UTC) is 26474489 minutesSinceEpoch, and toEpochMinutesRounded(1588469352000, 10) = 26474480 (2020-05-03 01:20:00 UTC).
Function Name | Description |
---|---|
toEpochSecondsRounded | Converts epoch millis to epoch seconds, rounding to the given bucket size. Usage: "toEpochSecondsRounded(millis, 30)" |
toEpochMinutesRounded | Converts epoch millis to epoch minutes, rounding to the given bucket size. Usage: "toEpochMinutesRounded(millis, 10)" |
toEpochHoursRounded | Converts epoch millis to epoch hours, rounding to the given bucket size. Usage: "toEpochHoursRounded(millis, 6)" |
toEpochDaysRounded | Converts epoch millis to epoch days, rounding to the given bucket size. Usage: "toEpochDaysRounded(millis, 7)" |
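The worked example for toEpochMinutesRounded (26474489 becoming 26474480 with a bucket of 10) suggests that values are rounded down to the start of their bucket. Assuming that behavior, the rounding can be sketched in Python as follows; the helper name is hypothetical and this is not Pinot's own code.

```python
# Illustrative sketch only. Assumes the "rounded" functions floor the converted
# value to a multiple of the bucket size, as the 26474489 -> 26474480 example implies.
MILLIS_PER_SECOND = 1_000
MILLIS_PER_MINUTE = 60_000
MILLIS_PER_HOUR = 3_600_000
MILLIS_PER_DAY = 86_400_000


def to_epoch_rounded(millis: int, unit_millis: int, bucket: int) -> int:
    """Convert millis to the target unit, then floor to a multiple of bucket."""
    units = millis // unit_millis
    return (units // bucket) * bucket


sample_millis = 1588469352000
print(to_epoch_rounded(sample_millis, MILLIS_PER_SECOND, 30))  # 1588469340
print(to_epoch_rounded(sample_millis, MILLIS_PER_MINUTE, 10))  # 26474480
print(to_epoch_rounded(sample_millis, MILLIS_PER_HOUR, 6))     # 441240
print(to_epoch_rounded(sample_millis, MILLIS_PER_DAY, 7))      # 18382
```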
fromEpochXXX
Converts from an epoch granularity to milliseconds.
Function Name | Description |
---|---|
fromEpochSeconds | Converts from epoch seconds to milliseconds. |
fromEpochMinutes | Converts from epoch minutes to milliseconds. |
fromEpochHours | Converts from epoch hours to milliseconds. |
fromEpochDays | Converts from epoch days to milliseconds. |
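As a rough Python illustration (hypothetical helper name, not Pinot's code), the reverse conversion is a multiplication. Note that a value that was previously truncated to a coarser granularity does not recover its original milliseconds.

```python
# Illustrative sketch only: the fromEpochXXX direction multiplies by the unit size.
MILLIS_PER_MINUTE = 60_000


def from_epoch_minutes(minutes: int) -> int:
    return minutes * MILLIS_PER_MINUTE


original_millis = 1588469352000
minutes = original_millis // MILLIS_PER_MINUTE  # 26474489
restored_millis = from_epoch_minutes(minutes)   # start of that minute

print(restored_millis)                    # 1588469340000
print(original_millis - restored_millis)  # 12000 (the 12 seconds lost to truncation)
```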
Simple date format
Converts simple date format strings to milliseconds and vice versa, per the provided pattern string.
Function name | Description |
---|---|
ToDateTime | Converts from milliseconds to a formatted date time string, as per the provided pattern. |
FromDateTime | Converts a formatted date time string to milliseconds, as per the provided pattern. |
{% hint style="info" %} Note
Letters that are not part of the SimpleDateFormat pattern legend (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) must be escaped with single quotes. For example:
"transformFunction": "fromDateTime(dateTimeStr, 'yyyy-MM-dd''T''HH:mm:ss')"
{% endhint %}
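To sanity-check the values you expect from a date format conversion, a rough Python equivalent can help. This sketch assumes timestamps are interpreted as UTC, and it uses Python's own strptime directives, which differ from the Java pattern letters (for instance, the literal T needs no escaping here). The helper names are hypothetical.

```python
# Illustrative sketch only: approximates the millis <-> string round trip
# for the pattern yyyy-MM-dd'T'HH:mm:ss, assuming UTC.
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
PY_FORMAT = "%Y-%m-%dT%H:%M:%S"  # Python counterpart of yyyy-MM-dd'T'HH:mm:ss


def from_date_time(text: str, fmt: str = PY_FORMAT) -> int:
    """Parse a formatted date time string into epoch milliseconds (UTC assumed)."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return (parsed - EPOCH) // timedelta(milliseconds=1)


def to_date_time(millis: int, fmt: str = PY_FORMAT) -> str:
    """Format epoch milliseconds as a date time string (UTC assumed)."""
    return (EPOCH + timedelta(milliseconds=millis)).strftime(fmt)


print(from_date_time("2020-05-03T01:29:12"))  # 1588469352000
print(to_date_time(1588469352000))            # 2020-05-03T01:29:12
```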
Function name | Description |
---|---|
json_format | Converts a JSON/AVRO complex object to a JSON string. The resulting string can then be queried using the jsonExtractScalar function. |
Records can be filtered as they are ingested. Specify a filter function in the filterConfig of the ingestionConfig in the table config.
"tableConfig": {
"tableName": ...,
"tableType": ...,
"ingestionConfig": {
"filterConfig": {
"filterFunction": "<expression>"
}
}
}
If the expression evaluates to true, the record will be filtered out. The expressions can use any of the transform functions described in the previous section.
Consider a table that has a column timestamp. If you want to filter out records that are older than timestamp 1589007600000, you could apply the following function:
"ingestionConfig": {
"filterConfig": {
"filterFunction": "Groovy({timestamp < 1589007600000}, timestamp)"
}
}
Consider a table that has a string column campaign and a multi-value double column prices. If you want to filter out records where campaign is 'X' or 'Y' and the sum of all elements in prices is less than 100, you could apply the following function:
"ingestionConfig": {
"filterConfig": {
"filterFunction": "Groovy({(campaign == \"X\" || campaign == \"Y\") && prices.sum() < 100}, prices, campaign)"
}
}
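Because a record is dropped when the filter expression is true, it is easy to invert the condition by mistake. The Python sketch below mimics the campaign/prices filter on a few made-up records to show which ones survive; the sample data and function name are hypothetical, and this is not how Pinot evaluates the expression internally.

```python
# Illustrative sketch only: a record is REMOVED when the filter predicate is true.
records = [
    {"campaign": "X", "prices": [10.0, 20.0]},  # X and sum 30 < 100  -> filtered out
    {"campaign": "X", "prices": [80.0, 40.0]},  # X but sum 120 >= 100 -> kept
    {"campaign": "Z", "prices": [1.0]},         # not X or Y           -> kept
]


def is_filtered_out(record: dict) -> bool:
    """Same condition as the Groovy filterFunction above."""
    return record["campaign"] in ("X", "Y") and sum(record["prices"]) < 100


kept = [record for record in records if not is_filtered_out(record)]
print([record["campaign"] for record in kept])  # ['X', 'Z']
```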
The filter config also supports SQL-like expressions that use built-in scalar functions to filter records (available from v0.11.0). Example:
"ingestionConfig": {
"filterConfig": {
"filterFunction": "strcmp(campaign, 'X') = 0 OR strcmp(campaign, 'Y') = 0 OR timestamp < 1589007600000"
}
}
Transform functions can be defined on columns in the ingestion config of the table config.
"tableConfig": {
"tableName": ...,
"tableType": ...,
"ingestionConfig": {
"transformConfigs": [{
"columnName": "fieldName",
"transformFunction": "<expression>"
}]
},
...
}
For example, imagine that our source data contains the prices and timestamp fields. We want to extract the maximum price and store it in the maxPrice field, and convert the timestamp into the number of hours since the epoch and store it in the hoursSinceEpoch field. You can do this by applying the following transformation:
{% code title="pinot-table-offline.json" %}
{
"tableName": "myTable",
...
"ingestionConfig": {
"transformConfigs": [{
"columnName": "maxPrice",
"transformFunction": "Groovy({prices.max()}, prices)" // groovy function
},
{
"columnName": "hoursSinceEpoch",
"transformFunction": "toEpochHours(timestamp)" // built-in function
}]
}
}
{% endcode %}
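To make the effect of these two transforms concrete, here is a minimal Python sketch that applies equivalent logic to one made-up source record. The sample values and the apply_transforms helper are hypothetical; Pinot evaluates the configured expressions itself during ingestion.

```python
# Illustrative sketch only: what the two transformConfigs above compute for one record.
MILLIS_PER_HOUR = 3_600_000


def apply_transforms(record: dict) -> dict:
    """Return a copy of the record with the two derived columns added."""
    transformed = dict(record)
    transformed["maxPrice"] = max(record["prices"])                       # Groovy({prices.max()}, prices)
    transformed["hoursSinceEpoch"] = record["timestamp"] // MILLIS_PER_HOUR  # toEpochHours(timestamp)
    return transformed


source_record = {"prices": [12.5, 99.0, 47.25], "timestamp": 1588469352000}
result = apply_transforms(source_record)
print(result["maxPrice"])         # 99.0
print(result["hoursSinceEpoch"])  # 441241
```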
Below are some examples of commonly used functions.
Concat firstName and lastName to get fullName
"ingestionConfig": {
"transformConfigs": [{
"columnName": "fullName",
"transformFunction": "Groovy({firstName+' '+lastName}, firstName, lastName)"
}]
}
Find max value in array bids
"ingestionConfig": {
"transformConfigs": [{
"columnName": "maxBid",
"transformFunction": "Groovy({bids.max{ it.toBigDecimal() }}, bids)"
}]
}
Convert timestamp from MILLISECONDS to HOURS
"ingestionConfig": {
"transformConfigs": [{
"columnName": "hoursSinceEpoch",
"transformFunction": "Groovy({timestamp/(1000*60*60)}, timestamp)"
}]
}
Change the name of the column from user_id to userId
"ingestionConfig": {
"transformConfigs": [{
"columnName": "userId",
"transformFunction": "Groovy({user_id}, user_id)"
}]
}
Pinot doesn't support column names that contain spaces, so if a source data column has a space in its name, store its value in a column with a supported name. To extract the value from first Name into the column firstName, use the following:
"ingestionConfig": {
"transformConfigs": [{
"columnName": "firstName",
"transformFunction": "\"first Name \""
}]
}
If eventType is IMPRESSION, set impressions to 1. Similarly, if eventType is CLICK, set clicks to 1.
"ingestionConfig": {
"transformConfigs": [{
"columnName": "impressions",
"transformFunction": "Groovy({eventType == 'IMPRESSION' ? 1: 0}, eventType)"
},
{
"columnName": "clicks",
"transformFunction": "Groovy({eventType == 'CLICK' ? 1: 0}, eventType)"
}]
}
Store an AVRO Map in Pinot as two multi-value columns. Sort the keys to keep the keys and values aligned.
1) The keys of the map as map2_keys
2) The values of the map as map2_values
"ingestionConfig": {
"transformConfigs": [{
"columnName": "map2_keys",
"transformFunction": "Groovy({map2.sort()*.key}, map2)"
},
{
"columnName": "map2_values",
"transformFunction": "Groovy({map2.sort()*.value}, map2)"
}]
}
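The reason for sorting is that the two multi-value columns are stored independently, so the value at position i must belong to the key at position i. A small Python sketch of the same idea, using a made-up map and assuming the Groovy sort orders entries by key:

```python
# Illustrative sketch only: split a map into two aligned lists by sorting on the key.
map2 = {"k2": "v2", "k1": "v1", "k3": "v3"}  # hypothetical sample map

sorted_entries = sorted(map2.items())             # sort entries by key
map2_keys = [key for key, _ in sorted_entries]
map2_values = [value for _, value in sorted_entries]

print(map2_keys)    # ['k1', 'k2', 'k3']
print(map2_values)  # ['v1', 'v2', 'v3']
```

Because both lists come from the same sorted sequence, map2_values[i] is always the value for map2_keys[i].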
Transformations can be chained. This means that you can use a field created by a transformation in another transformation function.
For example, we might have the following JSON document in the data field of our source data:
{
"userId": "12345678__foo__othertext"
}
We can apply one transformation to extract the userId and then another one to pull out the numerical part of the identifier:
"ingestionConfig": {
"transformConfigs": [
{
"columnName": "userOid",
"transformFunction": "jsonPathString(data, '$.userId')"
},
{
"columnName": "userId",
"transformFunction": "Groovy({Long.valueOf(userOid.substring(0, 8))}, userOid)"
}
]
}
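The two chained steps can be traced in Python as a quick check of the expected result. This sketch assumes the data field holds the JSON document as a string, and it only imitates the logic; it is not how Pinot runs the chain.

```python
# Illustrative sketch only: the chained transforms, step by step.
import json

# Hypothetical source record whose data field holds the JSON document as a string.
source_record = {"data": json.dumps({"userId": "12345678__foo__othertext"})}

# Step 1: jsonPathString(data, '$.userId') -> userOid
user_oid = json.loads(source_record["data"])["userId"]

# Step 2: Long.valueOf(userOid.substring(0, 8)) -> userId
user_id = int(user_oid[:8])

print(user_oid)  # 12345678__foo__othertext
print(user_id)   # 12345678
```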
There are two kinds of flattening.
Flattening one record into many is not natively supported yet. You can write a custom Decoder/RecordReader if you want to use this. Once the Decoder generates the multiple GenericRows from the provided input record, a List<GenericRow> should be set into the destination GenericRow, with the key $MULTIPLE_RECORDS_KEY$. The segment generation drivers will treat this as a special case and handle the multiple records.
Feature TBD
If a new column is added to the table or schema configuration during ingestion, incorrect data may appear in the consuming segment(s).
To ensure accurate values are reloaded, do the following:
- Pause consumption (and wait for pause status success):
$ curl -X POST {controllerHost}/tables/{tableName}/pauseConsumption
- Apply new table or schema configurations.
- Reload segments using the Pinot Controller API or Pinot Admin Console.
- Resume consumption:
$ curl -X POST {controllerHost}/tables/{tableName}/resumeConsumption