Skip to content

Commit

Permalink
Fix sorting with multiple SortByClauses (#15)
Browse files Browse the repository at this point in the history
* Fix sorting with multiple SortByClauses

* Add tests for all data types in SortOperationTest

* Fix comments in SortOperationTest

* Remove unnecessary throw in tests

* Use assertDoesNotThrow in tests

* Change IP addresses in tests
  • Loading branch information
51-code authored Jun 13, 2024
1 parent 9603a06 commit d0e2257
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ public final class BatchCollect extends SortOperation {
private boolean sortedBySingleColumn = false;

public BatchCollect(String sortColumn, int numberOfRows) {
super();

LOGGER.info("Initialized BatchCollect based on column " + sortColumn + " and a limit of " + numberOfRows
+ " row(s)");
this.sortColumn = sortColumn;
this.numberOfRows = numberOfRows;
this(sortColumn, numberOfRows, new ArrayList<>());
}

public BatchCollect(String sortColumn, int numberOfRows, List<SortByClause> listOfSortByClauses) {
Expand Down
76 changes: 39 additions & 37 deletions src/main/java/com/teragrep/functions/dpf_02/SortOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import scala.collection.JavaConversions;

import java.util.ArrayList;
import java.util.List;

public abstract class SortOperation {
private List<SortByClause> listOfSortByClauses = null;
private final List<SortByClause> listOfSortByClauses;

/**
* Initializes an empty list of SortByClauses
*/
public SortOperation() {
this.listOfSortByClauses = new ArrayList<>();
this(new ArrayList<>());
}

public SortOperation(List<SortByClause> listOfSortByClauses) {
Expand All @@ -78,79 +79,80 @@ public void addSortByClause(SortByClause sortByClause) {

// Performs orderBy operation on a dataset and returns the ordered one
public Dataset<Row> orderDatasetByGivenColumns(Dataset<Row> ds) {
final SparkSession ss = SparkSession.builder().getOrCreate();

if (this.listOfSortByClauses != null && this.listOfSortByClauses.size() > 0) {
if (!this.listOfSortByClauses.isEmpty()) {
ArrayList<Column> sortColumns = new ArrayList<>();
for (SortByClause sbc : listOfSortByClauses) {
if (sbc.getSortAsType() == SortByClause.Type.AUTOMATIC) {
SortByClause.Type autoType = detectSortByType(ds, sbc.getFieldName());
ds = orderDatasetBySortByClause(ss, ds, sbc, autoType);
}
else {
ds = orderDatasetBySortByClause(ss, ds, sbc, null);
sbc = detectSortByType(ds, sbc);
}
sortColumns.add(sortByClauseToColumn(sbc));
}
ds = ds.orderBy(JavaConversions.asScalaBuffer(sortColumns));
}

return ds;
}

// orderBy based on sortByClause type and if it is descending/ascending
private Dataset<Row> orderDatasetBySortByClause(final SparkSession ss, final Dataset<Row> unsorted, final SortByClause sortByClause, final SortByClause.Type overrideSortType) {
Dataset<Row> rv = null;
SortByClause.Type sortByType = sortByClause.getSortAsType();
if (overrideSortType != null) {
sortByType = overrideSortType;
}

switch (sortByType) {
case DEFAULT:
rv = unsorted.orderBy(sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).desc() :
functions.col(sortByClause.getFieldName()).asc());
break;
// Builds the sort Column for a SortByClause according to its sort type and its ascending/descending direction
private Column sortByClauseToColumn(SortByClause sortByClause) {
Column column;
switch (sortByClause.getSortAsType()) {
case STRING:
rv = unsorted.orderBy(sortByClause.isDescending() ?
column = sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).desc() :
functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc());
functions.col(sortByClause.getFieldName()).cast(DataTypes.StringType).asc();
break;
case NUMERIC:
rv = unsorted.orderBy(sortByClause.isDescending() ?
column = sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).desc() :
functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc());
functions.col(sortByClause.getFieldName()).cast(DataTypes.DoubleType).asc();
break;
case IP_ADDRESS:
final SparkSession ss = SparkSession.builder().getOrCreate();
UserDefinedFunction ipStringToIntUDF = functions.udf(new ConvertIPStringToInt(), DataTypes.LongType);
ss.udf().register("ip_string_to_int", ipStringToIntUDF);
Column sortingCol = functions.callUDF("ip_string_to_int", functions.col(sortByClause.getFieldName()));

rv = unsorted.orderBy(sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc());
column = sortByClause.isDescending() ? sortingCol.desc() : sortingCol.asc();
break;
case DEFAULT:
default:
column = sortByClause.isDescending() ?
functions.col(sortByClause.getFieldName()).desc() :
functions.col(sortByClause.getFieldName()).asc();
}
return rv;
return column;
}

// detect sorting type if auto() was used in sort
private SortByClause.Type detectSortByType(final Dataset<Row> ds, final String fieldName) {
private SortByClause detectSortByType(final Dataset<Row> ds, final SortByClause sbc) {
StructField[] fields = ds.schema().fields();
SortByClause newSbc = new SortByClause();
newSbc.setDescending(sbc.isDescending());
newSbc.setLimit(sbc.getLimit());
newSbc.setFieldName(sbc.getFieldName());

for (StructField field : fields) {
if (field.name().equals(fieldName)) {
if (field.name().equals(sbc.getFieldName())) {
switch (field.dataType().typeName()) {
case "string": // ip address?
return numericalStringCheck(ds, fieldName);
newSbc.setSortAsType(numericalStringCheck(ds, sbc.getFieldName()));
break;
case "long":
case "integer":
case "float":
case "double":
return SortByClause.Type.NUMERIC;
newSbc.setSortAsType(SortByClause.Type.NUMERIC);
break;
case "timestamp":
return SortByClause.Type.NUMERIC; // convert to unix epoch?
newSbc.setSortAsType(SortByClause.Type.NUMERIC); // convert to unix epoch?
break;
default:
return SortByClause.Type.DEFAULT;
newSbc.setSortAsType(SortByClause.Type.DEFAULT);
}
}
}
return SortByClause.Type.DEFAULT;
return newSbc;
}

/**
Expand Down
Loading

0 comments on commit d0e2257

Please sign in to comment.