From a6d5497f5515b01bf4e92a411c2ed79df9ca0c2f Mon Sep 17 00:00:00 2001 From: shrverma Date: Tue, 25 Jul 2023 16:10:38 +0000 Subject: [PATCH 1/7] Implement partial SQL support --- .../directives/column/ChangeColCaseNames.java | 32 +++++++++++++++++ .../directives/column/CleanseColumnNames.java | 35 +++++++++++++++++++ .../io/cdap/directives/column/SetHeader.java | 33 +++++++++++++++++ .../io/cdap/directives/column/SetType.java | 18 ++++++++++ .../io/cdap/directives/date/DiffDate.java | 19 ++++++++++ .../io/cdap/directives/date/FormatDate.java | 19 ++++++++++ .../directives/transformation/Decode.java | 29 +++++++++++++++ .../directives/transformation/Encode.java | 29 +++++++++++++++ .../io/cdap/directives/writer/WriteAsCSV.java | 22 ++++++++++++ .../directives/writer/WriteAsJsonMap.java | 20 +++++++++++ .../utils/SqlExpressionGenerator.java | 19 ++++++++++ 11 files changed, 275 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java index 4f6cedb89..089746d58 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,8 +38,11 @@ import io.cdap.wrangler.api.parser.Identifier; import io.cdap.wrangler.api.parser.TokenType; import 
io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * This class ChangeColCaseNames converts the case of the columns @@ -94,5 +102,29 @@ public Mutation lineage() { .all(Many.of()) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + Map colmap = generateColumnCaseMap(columnNames, expressionFactory.get()); + return relation.select(colmap); + } + + private Map generateColumnCaseMap(List columns, ExpressionFactory factory) { + Map columnExpMap = new LinkedHashMap<>(); + if (toLower) { + columns.forEach((colName) -> columnExpMap.put(colName.toLowerCase(), factory.compile(colName))); + } else { + columns.forEach((colName) -> columnExpMap.put(colName.toUpperCase(), factory.compile(colName))); + } + return columnExpMap; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java index 6e9f685b6..38db2108a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import 
io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -30,8 +35,12 @@ import io.cdap.wrangler.api.lineage.Many; import io.cdap.wrangler.api.lineage.Mutation; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * A directive for cleanses columns names. @@ -93,4 +102,30 @@ public Mutation lineage() { .all(Many.of()) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + Map colmap = generateCleanseColumnMap(columnNames, expressionFactory.get()); + return relation.select(colmap); + } + + public static Map generateCleanseColumnMap(Collection columns, + ExpressionFactory factory) { + Map columnExpMap = new LinkedHashMap<>(); + columns.forEach((colName)-> columnExpMap.put(String + .format(colName + .toString() + .toLowerCase() + .replaceAll("[^a-zA-Z0-9_]", "_")), factory + .compile(colName.toString()))); + return columnExpMap; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java index 64134dfeb..1eabb30ed 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java @@ -19,6 +19,11 @@ import 
io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,9 +37,15 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * A directive for setting the columns obtained from wrangling. 
@@ -90,5 +101,27 @@ public Mutation lineage() { .generate(Many.of(columns)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + Map columnExpMap = new LinkedHashMap<>(); + + IntStream.range(0, Math.min(columnNames.size(), columns.size())) + .forEach(i -> columnExpMap.put(columns.get(i), expressionFactory.get().compile(columnNames.get(i)))); + + if (columnNames.size() > columns.size()) { + IntStream.range(columns.size(), columnNames.size()) + .forEach(i -> columnExpMap.put(columnNames.get(i), expressionFactory.get().compile(columnNames.get(i)))); + } + return relation.select(columnExpMap); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index f143493c0..0cf7e4692 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -36,6 +40,7 @@ import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; import 
io.cdap.wrangler.utils.ColumnConverter; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.math.RoundingMode; import java.util.List; @@ -108,4 +113,17 @@ public Mutation lineage() { .relation(col, col) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + String expression = SqlExpressionGenerator.getColumnTypeExp(type, col, scale); + return relation.setColumn(col, expressionFactory.get().compile(expression)); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java index 1361e8541..81b7a85b5 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,11 +36,13 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.List; +import java.util.Optional; /** * A directive for taking difference in Dates. 
@@ -118,4 +124,17 @@ public Mutation lineage() { .relation(column2, column2) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(destCol, expressionFactory.get() + .compile(String.format("datediff(millisecond, timestamp(%s), timestamp(%s))", column2, column1))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java index f2f3bb0e9..bdcd00e2c 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +36,7 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.time.LocalDate; import java.time.ZoneId; @@ -40,6 +45,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A directive for managing date formats. 
@@ -114,4 +120,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("date_format(timestamp(%s), '%s')", column, format))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java index c05e34e34..0fbc97f90 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +36,7 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Base32; import org.apache.commons.codec.binary.Base64; @@ -40,6 +45,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Locale; +import java.util.Optional; /** * A directive that decodes a column that was encoded as 
base-32, base-64, or hex. @@ -155,4 +161,27 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + if (method.toString().equalsIgnoreCase("base64")) { + return relation.setColumn( + String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory + .get().compile("unbase64(" + column + ")")); + } else if (method.toString().equalsIgnoreCase("base32")) { + return relation.setColumn( + String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory + .get().compile("unhex(" + column + ")")); + } else { + return new InvalidRelation(String.format("Decoding of type %s is not supported by " + + "SQL execution currently", method.toString())); + } + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java index abd1378fe..ebb2c9f4a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +36,7 @@ import 
io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.apache.commons.codec.binary.Base32; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; @@ -39,6 +44,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Locale; +import java.util.Optional; /** * A directive that encodes a column as base-32, base-64, or hex. @@ -150,4 +156,27 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + if (method.toString().equalsIgnoreCase("base64")) { + return relation.setColumn( + String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory + .get().compile("base64(" + column + ")")); + } else if (method.toString().equalsIgnoreCase("base32")) { + return relation.setColumn( + String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory + .get().compile("hex(" + column + ")")); + } else { + return new InvalidRelation(String.format("Encoding of type %s is not supported by " + + "SQL execution currently", method.toString())); + } + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java index 8e51bf527..4c9d43ec8 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; 
import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +37,7 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -41,6 +47,8 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.util.List; +import java.util.Map; +import java.util.Optional; /** * A step to write the record fields as CSV. 
@@ -101,4 +109,18 @@ public Mutation lineage() { .generate(Many.of(column)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + return relation.setColumn(column, expressionFactory.get().compile(String + .format("to_csv(struct(%s))", String.join(",", columnNames)))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java index 640843477..4c004c715 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -34,10 +38,12 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; /** * A step to write the record fields as JSON. 
@@ -88,4 +94,18 @@ public Mutation lineage() { .generate(Many.of(column)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + return relation.setColumn(column, expressionFactory.get().compile(String + .format("to_json(struct(%s))", String.join(",", columnNames)))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java index b249663bc..2c9d70420 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java @@ -16,15 +16,19 @@ package io.cdap.wrangler.utils; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.relational.Expression; import io.cdap.cdap.etl.api.relational.ExpressionFactory; import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import javax.annotation.Nullable; @@ -105,4 +109,19 @@ public static String getColumnTypeExp(String toType, String column, @Nullable In return column; } } + + public static List generateListCols(RelationalTranformContext relationalTranformContext) { + List colnames = new ArrayList(); + Set inputRelationNames = relationalTranformContext.getInputRelationNames(); + for (String inputRelationName : inputRelationNames) { + Schema schema 
= relationalTranformContext.getInputSchema(inputRelationName); + List fields = schema.getFields(); + for (Schema.Field field: fields) { + colnames.add(field.getName()); + } + } + return colnames; + } + + } From ea2547c5b13f47792d93a9c9772b974f8a6d7ed8 Mon Sep 17 00:00:00 2001 From: shrverma Date: Tue, 25 Jul 2023 17:38:46 +0000 Subject: [PATCH 2/7] Implement transform directives --- .../cdap/directives/language/SetCharset.java | 18 ++++++++++++++++ .../java/io/cdap/directives/row/Fail.java | 21 +++++++++++++++++++ .../directives/row/RecordConditionFilter.java | 17 +++++++++++++++ .../transformation/ColumnExpression.java | 17 +++++++++++++++ 4 files changed, 73 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java b/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java index ae25fbe4c..edfef7aed 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java +++ b/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,6 +37,7 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.nio.ByteBuffer; import java.nio.CharBuffer; @@ -119,4 +124,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext 
relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile(String + .format("decode(%s, '%s'))", column, charset))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java index 880ffc63c..cd9257a18 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -37,6 +41,7 @@ import io.cdap.wrangler.expression.ELContext; import io.cdap.wrangler.expression.ELException; import io.cdap.wrangler.expression.ELResult; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; @@ -116,4 +121,20 @@ public List getCountMetrics() { EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); return (jexlCategoryMetric == null) ? 
null : ImmutableList.of(jexlCategoryMetric); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn("tempColumn", expressionFactory.get() + .compile(String.format("CASE %s WHEN %s THEN raise_error(\"Condition %s evaluating to true. " + + "Terminating process.\") END", el.getScriptParsedText(), + el.getScriptParsedText(), el.getScriptParsedText()))) + .dropColumn("tempColumn"); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java index b3eb4adb2..92bd5b7d2 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -38,6 +42,7 @@ import io.cdap.wrangler.expression.EL; import io.cdap.wrangler.expression.ELContext; import io.cdap.wrangler.expression.ELException; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; @@ -129,4 +134,16 @@ public List getCountMetrics() { EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); 
return (jexlCategoryMetric == null) ? null : ImmutableList.of(jexlCategoryMetric); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.filter(expressionFactory.get().compile(el.getScriptParsedText())); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java index 25c9c895b..0f772554b 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -39,6 +43,7 @@ import io.cdap.wrangler.expression.ELContext; import io.cdap.wrangler.expression.ELException; import io.cdap.wrangler.expression.ELResult; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; @@ -133,4 +138,16 @@ public List getCountMetrics() { EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); return (jexlCategoryMetric == null) ? 
null : ImmutableList.of(jexlCategoryMetric); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile(el.getScriptParsedText())); + } + } From 830346cfcb5d74c411c061741c72f963b805faf8 Mon Sep 17 00:00:00 2001 From: shrverma Date: Thu, 27 Jul 2023 05:04:20 +0000 Subject: [PATCH 3/7] Implement fail-on-condition directive --- .../src/main/java/io/cdap/directives/row/Fail.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java index cd9257a18..033f2ef22 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java @@ -130,11 +130,11 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - return relation.setColumn("tempColumn", expressionFactory.get() - .compile(String.format("CASE %s WHEN %s THEN raise_error(\"Condition %s evaluating to true. " + - "Terminating process.\") END", el.getScriptParsedText(), - el.getScriptParsedText(), el.getScriptParsedText()))) - .dropColumn("tempColumn"); + + return relation.setColumn("tempColumn", expressionFactory.get().compile( + String.format("if(%s, raise_error(\"Condition '%s' evaluated to true. 
" + + "Terminating processing.\"), %s)", el.getScriptParsedText(), + el.getScriptParsedText(), el.getScriptParsedText()))); } } From 4c3b9f2e44274ddfe0dbd128e0985e7156f46ad3 Mon Sep 17 00:00:00 2001 From: shrverma Date: Thu, 27 Jul 2023 06:07:13 +0000 Subject: [PATCH 4/7] Implement encode-decode directives --- .../main/java/io/cdap/directives/transformation/Decode.java | 6 +++--- .../main/java/io/cdap/directives/transformation/Encode.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java index 0fbc97f90..f33be1f0a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java @@ -173,11 +173,11 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (method.toString().equalsIgnoreCase("base64")) { return relation.setColumn( String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("unbase64(" + column + ")")); - } else if (method.toString().equalsIgnoreCase("base32")) { + .get().compile("string(unbase64(" + column + "))")); + } else if (method.toString().equalsIgnoreCase("hex")) { return relation.setColumn( String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("unhex(" + column + ")")); + .get().compile("string(unhex(" + column + "))")); } else { return new InvalidRelation(String.format("Decoding of type %s is not supported by " + "SQL execution currently", method.toString())); diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java index ebb2c9f4a..35f7e1840 100644 --- 
a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java @@ -169,7 +169,7 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn( String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory .get().compile("base64(" + column + ")")); - } else if (method.toString().equalsIgnoreCase("base32")) { + } else if (method.toString().equalsIgnoreCase("hex")) { return relation.setColumn( String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory .get().compile("hex(" + column + ")")); From c08cdd82350a588ab2cb339d631eb9840628cb98 Mon Sep 17 00:00:00 2001 From: shrverma Date: Tue, 1 Aug 2023 06:30:57 +0000 Subject: [PATCH 5/7] Add sql validation to all directives --- .../java/io/cdap/directives/column/ChangeColCaseNames.java | 5 +++++ .../java/io/cdap/directives/column/CleanseColumnNames.java | 5 +++++ .../main/java/io/cdap/directives/column/CreateRecord.java | 5 +++++ .../src/main/java/io/cdap/directives/column/SetHeader.java | 5 +++++ .../src/main/java/io/cdap/directives/column/SetType.java | 5 +++++ .../src/main/java/io/cdap/directives/column/Swap.java | 5 +++++ .../src/main/java/io/cdap/directives/date/DiffDate.java | 5 +++++ .../src/main/java/io/cdap/directives/date/FormatDate.java | 5 +++++ .../main/java/io/cdap/directives/language/SetCharset.java | 5 +++++ .../java/io/cdap/directives/parser/FixedLengthParser.java | 5 +++++ .../src/main/java/io/cdap/directives/row/Fail.java | 5 +++++ .../java/io/cdap/directives/row/RecordConditionFilter.java | 5 +++++ .../main/java/io/cdap/directives/row/RecordRegexFilter.java | 5 +++++ .../java/io/cdap/directives/row/SetRecordDelimiter.java | 5 +++++ .../src/main/java/io/cdap/directives/row/SplitToRows.java | 5 +++++ .../io/cdap/directives/transformation/ColumnExpression.java | 5 +++++ 
.../main/java/io/cdap/directives/transformation/Decode.java | 5 +++++ .../main/java/io/cdap/directives/transformation/Encode.java | 5 +++++ .../io/cdap/directives/transformation/FillNullOrEmpty.java | 5 +++++ .../io/cdap/directives/transformation/GenerateUUID.java | 5 +++++ .../java/io/cdap/directives/transformation/SplitEmail.java | 5 +++++ .../java/io/cdap/directives/transformation/UrlDecode.java | 5 +++++ .../java/io/cdap/directives/transformation/UrlEncode.java | 6 ++++++ .../src/main/java/io/cdap/directives/writer/WriteAsCSV.java | 5 +++++ .../main/java/io/cdap/directives/writer/WriteAsJsonMap.java | 5 +++++ .../java/io/cdap/directives/writer/WriteAsJsonObject.java | 5 +++++ 26 files changed, 131 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java index 089746d58..2da92182a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java @@ -126,5 +126,10 @@ private Map generateColumnCaseMap(List columns, Expr return columnExpMap; } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java index 38db2108a..c98d0dfca 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java @@ -128,4 +128,9 @@ public static Map generateCleanseColumnMap(Collection column return columnExpMap; } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java index d49d88625..fe5b21174 
100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java @@ -120,4 +120,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .format("struct(%s)", getColumnString))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java index 1eabb30ed..d32c7cd55 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java @@ -123,5 +123,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.select(columnExpMap); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index 0cf7e4692..146e49d71 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -126,4 +126,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn(col, expressionFactory.get().compile(expression)); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java index 62a90119c..024638366 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java @@ -114,4 +114,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, tempRel = tempRel.setColumn(right, expressionFactory.get().compile(left)); 
return tempRel.setColumn(left, expressionFactory.get().compile("tempColumn")); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java index 81b7a85b5..71532c8a6 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java @@ -137,4 +137,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .compile(String.format("datediff(millisecond, timestamp(%s), timestamp(%s))", column2, column1))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java index bdcd00e2c..14cdd7056 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java @@ -133,4 +133,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .compile(String.format("date_format(timestamp(%s), '%s')", column, format))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java b/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java index edfef7aed..345ad065d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java +++ b/wrangler-core/src/main/java/io/cdap/directives/language/SetCharset.java @@ -136,5 +136,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn(column, expressionFactory.get().compile(String .format("decode(%s, '%s'))", column, charset))); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git 
a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java index 65a8255c8..54a383923 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java @@ -182,4 +182,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation; } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java index 033f2ef22..e7ba6fff9 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java @@ -137,4 +137,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, el.getScriptParsedText(), el.getScriptParsedText()))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java index 92bd5b7d2..513a75662 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java @@ -146,4 +146,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.filter(expressionFactory.get().compile(el.getScriptParsedText())); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java index 916575a1a..6d0645acf 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java +++ 
b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java @@ -164,5 +164,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, } return relation.filter(expressionFactory.get().compile("rlike(" + column + ", '" + pattern + "')")); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java index cbdf290b2..cb2f269fc 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java @@ -135,4 +135,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, String.format("explode(split(%s, \"%s\", %d))", column, delimiter, limit))); return relation.select(columnExpMap); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java index 75645dc4a..0d0b65c89 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java @@ -129,5 +129,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn(column, expressionFactory.get() .compile(String.format("explode(split(%s, '%s'))", column, regex))); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java index 0f772554b..925911c61 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java +++ 
b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java @@ -150,4 +150,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn(column, expressionFactory.get().compile(el.getScriptParsedText())); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java index f33be1f0a..52673aba6 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java @@ -184,4 +184,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, } } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java index 35f7e1840..853a23ecd 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java @@ -179,4 +179,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, } } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java index 5532620ba..98ea37f01 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java @@ -123,4 +123,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .format("nvl2(%s, if(length(%s) == 0, \"%s\", %s), \"%s\")", column, column, value, column, 
value))); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java index 808b6f300..fc9e1e83d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java @@ -107,4 +107,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return relation.setColumn(column, expressionFactory.get() .compile(String.format("uuid()"))); } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java index a0438dabf..b8296a97a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java @@ -155,4 +155,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, expressionFactory.get().compile(domainExpression)); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java index 98ca135cd..585616a1a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java @@ -120,4 +120,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, String.format("reflect('java.net.URLDecoder', 'decode', %s, 'utf-8')", column))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git 
a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java index b693672f4..5058eb023 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java @@ -119,4 +119,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext, column, expressionFactory.get().compile( String.format("reflect('java.net.URLEncoder', 'encode', %s, 'utf-8')", column))); } + + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java index 4c9d43ec8..ad7fa1a42 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java @@ -123,4 +123,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .format("to_csv(struct(%s))", String.join(",", columnNames)))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java index 4c004c715..04a1cae3a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java @@ -108,4 +108,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .format("to_json(struct(%s))", String.join(",", columnNames)))); } + @Override + public boolean isSQLSupported() { + return true; + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java 
index 5a7ef7139..378f10913 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java @@ -134,4 +134,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, .compile(String.format("struct(%s)", getColumnString))); } + @Override + public boolean isSQLSupported() { + return true; + } + } From 900bebde0660717bf013ef41bd3a723ec6d3dcd5 Mon Sep 17 00:00:00 2001 From: shrverma Date: Wed, 2 Aug 2023 06:05:38 +0000 Subject: [PATCH 6/7] Add Comments and util functions --- .../directives/column/ChangeColCaseNames.java | 4 ++- .../directives/column/CleanseColumnNames.java | 22 ++++----------- .../io/cdap/directives/column/SetHeader.java | 15 ++++------ .../io/cdap/directives/column/SetType.java | 3 ++ .../io/cdap/directives/date/DiffDate.java | 2 ++ .../io/cdap/directives/date/FormatDate.java | 2 ++ .../java/io/cdap/directives/row/Fail.java | 9 +++--- .../directives/row/RecordConditionFilter.java | 2 ++ .../transformation/ColumnExpression.java | 2 ++ .../directives/transformation/Decode.java | 28 ++++++++++--------- .../directives/transformation/Encode.java | 26 +++++++++-------- .../io/cdap/directives/writer/WriteAsCSV.java | 4 ++- .../directives/writer/WriteAsJsonMap.java | 4 ++- .../utils/SqlExpressionGenerator.java | 25 ++++++++++++++++- 14 files changed, 90 insertions(+), 58 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java index 2da92182a..1f6ed224f 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/ChangeColCaseNames.java @@ -111,7 +111,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an 
Expression Factory"); } - List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + + // TODO: handle schema changes in relationalTranformContext for multiple directive execution + List columnNames = SqlExpressionGenerator.generateColumnNameList(relationalTranformContext); Map colmap = generateColumnCaseMap(columnNames, expressionFactory.get()); return relation.select(colmap); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java index c98d0dfca..9bbc54f12 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CleanseColumnNames.java @@ -37,11 +37,10 @@ import io.cdap.wrangler.api.parser.UsageDefinition; import io.cdap.wrangler.utils.SqlExpressionGenerator; -import java.util.Collection; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; + /** * A directive for cleanses columns names. 
* @@ -111,21 +110,12 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); - Map colmap = generateCleanseColumnMap(columnNames, expressionFactory.get()); - return relation.select(colmap); - } - public static Map generateCleanseColumnMap(Collection columns, - ExpressionFactory factory) { - Map columnExpMap = new LinkedHashMap<>(); - columns.forEach((colName)-> columnExpMap.put(String - .format(colName - .toString() - .toLowerCase() - .replaceAll("[^a-zA-Z0-9_]", "_")), factory - .compile(colName.toString()))); - return columnExpMap; + // TODO: handle schema changes in relationalTranformContext for multiple directive execution + List columnNames = SqlExpressionGenerator.generateColumnNameList(relationalTranformContext); + Map colmap = SqlExpressionGenerator + .generateCleanseColumnMap(columnNames, expressionFactory.get()); + return relation.select(colmap); } @Override diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java index d32c7cd55..175729b2c 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetHeader.java @@ -110,19 +110,16 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); - Map columnExpMap = new LinkedHashMap<>(); - IntStream.range(0, Math.min(columnNames.size(), columns.size())) - .forEach(i -> columnExpMap.put(columns.get(i), expressionFactory.get().compile(columnNames.get(i)))); - - if (columnNames.size() > columns.size()) { - 
IntStream.range(columns.size(), columnNames.size()) - .forEach(i -> columnExpMap.put(columnNames.get(i), expressionFactory.get().compile(columnNames.get(i)))); - } + // TODO: handle schema changes in relationalTranformContext for multiple directive execution + List columnNames = SqlExpressionGenerator.generateColumnNameList(relationalTranformContext); + Map columnExpMap = SqlExpressionGenerator + .generateHeaders(columnNames, columns, expressionFactory.get()); return relation.select(columnExpMap); } + + @Override public boolean isSQLSupported() { return true; diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index 146e49d71..01aa0ba32 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -122,6 +122,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + + // TODO: handle decimal casting and byte casting + //TODO: implement short data type in CDAP String expression = SqlExpressionGenerator.getColumnTypeExp(type, col, scale); return relation.setColumn(col, expressionFactory.get().compile(expression)); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java index 71532c8a6..8c48d0e2b 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/DiffDate.java @@ -133,6 +133,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + + // TODO: handle columns of data type timestamp_micros, this implementation is for string data type return 
relation.setColumn(destCol, expressionFactory.get() .compile(String.format("datediff(millisecond, timestamp(%s), timestamp(%s))", column2, column1))); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java index 14cdd7056..3239a7a8a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java +++ b/wrangler-core/src/main/java/io/cdap/directives/date/FormatDate.java @@ -129,6 +129,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + + // TODO: handle columns of data type timestamp_micros, this implementation is for string data type return relation.setColumn(column, expressionFactory.get() .compile(String.format("date_format(timestamp(%s), '%s')", column, format))); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java index e7ba6fff9..35e555d5e 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/Fail.java @@ -130,11 +130,12 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - - return relation.setColumn("tempColumn", expressionFactory.get().compile( - String.format("if(%s, raise_error(\"Condition '%s' evaluated to true. " + + String errorExp = String.format("if(%s, raise_error(\"Condition '%s' evaluated to true. 
" + "Terminating processing.\"), %s)", el.getScriptParsedText(), - el.getScriptParsedText(), el.getScriptParsedText()))); + el.getScriptParsedText(), el.getScriptParsedText()); + + // TODO: handle cases where condition is not of ANSI SQL compatible syntax + return relation.setColumn("tempColumn", expressionFactory.get().compile(errorExp)); } @Override diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java index 513a75662..6797297bb 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java @@ -143,6 +143,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + + // TODO: handle cases where condition is not of ANSI SQL compatible syntax return relation.filter(expressionFactory.get().compile(el.getScriptParsedText())); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java index 925911c61..93fea2fab 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/ColumnExpression.java @@ -147,6 +147,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + + // TODO: handle cases where condition is not of ANSI SQL compatible syntax return relation.setColumn(column, expressionFactory.get().compile(el.getScriptParsedText())); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java 
b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java index 52673aba6..b91c79886 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Decode.java @@ -163,24 +163,26 @@ public Mutation lineage() { } @Override - public Relation transform(RelationalTranformContext relationalTranformContext, - Relation relation) { + public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { Optional> expressionFactory = SqlExpressionGenerator .getExpressionFactory(relationalTranformContext); if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - if (method.toString().equalsIgnoreCase("base64")) { - return relation.setColumn( - String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("string(unbase64(" + column + "))")); - } else if (method.toString().equalsIgnoreCase("hex")) { - return relation.setColumn( - String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("string(unhex(" + column + "))")); - } else { - return new InvalidRelation(String.format("Decoding of type %s is not supported by " + - "SQL execution currently", method.toString())); + + switch (method.toString().toLowerCase()) { + case "base64": { + return relation.setColumn(String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), + expressionFactory.get().compile("unbase64(" + column + ")")); + } + case "hex": { + return relation.setColumn(String.format("%s_decode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), + expressionFactory.get().compile("unhex(" + column + ")")); + } + default: { + return new InvalidRelation(String.format("Decoding of type %s is not supported by " + + "SQL execution currently", method.toString())); + } } } diff --git 
a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java index 853a23ecd..d2dba718a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Encode.java @@ -158,24 +158,26 @@ public Mutation lineage() { } @Override - public Relation transform(RelationalTranformContext relationalTranformContext, - Relation relation) { + public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator .getExpressionFactory(relationalTranformContext); if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - if (method.toString().equalsIgnoreCase("base64")) { - return relation.setColumn( - String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("base64(" + column + ")")); - } else if (method.toString().equalsIgnoreCase("hex")) { - return relation.setColumn( - String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), expressionFactory - .get().compile("hex(" + column + ")")); - } else { - return new InvalidRelation(String.format("Encoding of type %s is not supported by " + + + switch (method.toString().toLowerCase(Locale.ENGLISH)) { + case "base64": { + return relation.setColumn(String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), + expressionFactory.get().compile("base64(" + column + ")")); + } + case "hex": { + return relation.setColumn(String.format("%s_encode_%s", column, method.toString().toLowerCase(Locale.ENGLISH)), + expressionFactory.get().compile("hex(" + column + ")")); + } + default: { + return new InvalidRelation(String.format("Encoding of type %s is not supported by " + "SQL execution currently", method.toString())); + } } } diff --git
a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java index ad7fa1a42..74181c351 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsCSV.java @@ -118,7 +118,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + + // TODO: handle schema changes in relationalTranformContext for multiple directive execution + List columnNames = SqlExpressionGenerator.generateColumnNameList(relationalTranformContext); return relation.setColumn(column, expressionFactory.get().compile(String .format("to_csv(struct(%s))", String.join(",", columnNames)))); } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java index 04a1cae3a..a41184abb 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonMap.java @@ -103,7 +103,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - List columnNames = SqlExpressionGenerator.generateListCols(relationalTranformContext); + + // TODO: handle schema changes in relationalTranformContext for multiple directive execution + List columnNames = SqlExpressionGenerator.generateColumnNameList(relationalTranformContext); return relation.setColumn(column, expressionFactory.get().compile(String .format("to_json(struct(%s))", String.join(",", columnNames)))); } diff --git 
a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java index 2c9d70420..beed5d7fd 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SqlExpressionGenerator.java @@ -110,7 +110,7 @@ public static String getColumnTypeExp(String toType, String column, @Nullable In } } - public static List generateListCols(RelationalTranformContext relationalTranformContext) { + public static List generateColumnNameList(RelationalTranformContext relationalTranformContext) { List colnames = new ArrayList(); Set inputRelationNames = relationalTranformContext.getInputRelationNames(); for (String inputRelationName : inputRelationNames) { @@ -123,5 +123,28 @@ public static List generateListCols(RelationalTranformContext relational return colnames; } + public static Map<String, Expression> generateHeaders(List<String> columns, List<String> headers, + ExpressionFactory<String> factory) { + Map<String, Expression> columnExpMap = new LinkedHashMap<>(); + for (int i = 0; i < Math.min(columns.size(), headers.size()); i++) { + columnExpMap.put(headers.get(i), factory.compile(columns.get(i))); + } + + if (columns.size() > headers.size()) { + for (int i = headers.size(); i < columns.size(); i++) { + columnExpMap.put(columns.get(i), factory.compile(columns.get(i))); + } + } + return columnExpMap; + } + + public static Map<String, Expression> generateCleanseColumnMap(Collection<?> columns, + ExpressionFactory<String> factory) { + Map<String, Expression> columnExpMap = new LinkedHashMap<>(); + columns.forEach((colName) -> columnExpMap.put( // fixed locale: default-locale lowercasing can mangle names + colName.toString().toLowerCase(java.util.Locale.ENGLISH).replaceAll("[^a-zA-Z0-9_]", "_"), + factory.compile(colName.toString()))); + return columnExpMap; + } } From 0dc5075cafdde68284fb2acac72690805c2145e2 Mon Sep 17 00:00:00 2001 From: shrverma Date: Thu, 10 Aug 2023 11:55:58 +0000 Subject: [PATCH 7/7] Implement quantization --- .../transformation/Quantization.java | 27
+++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/Quantization.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/Quantization.java index 20e90f272..4f0c8f2bb 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/Quantization.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/Quantization.java @@ -22,6 +22,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -37,9 +41,11 @@ import io.cdap.wrangler.api.parser.Ranges; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A Wrangler step for quantizing a column. 
@@ -135,4 +141,25 @@ public Mutation lineage() { .conditional(col1, col2) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { + Optional<ExpressionFactory<String>> expressionFactory = + SqlExpressionGenerator.getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + List<String> caseStatements = new ArrayList<>(); + this.rangeMap.asMapOfRanges().entrySet().forEach(entry -> caseStatements.add(String.format( // ROOT: '.' decimals + java.util.Locale.ROOT, "WHEN double(%s) BETWEEN %f AND %f THEN '%s'", col1, + entry.getKey().lowerEndpoint(), entry.getKey().upperEndpoint(), entry.getValue()))); + + return relation.setColumn(col2, expressionFactory.get().compile(String.format( + "CASE %s ELSE NULL END", String.join(" ", caseStatements)))); + } + + @Override + public boolean isSQLSupported() { + return true; + } }