Skip to content

Commit

Permalink
Merge branch 'master' into working
Browse files Browse the repository at this point in the history
  • Loading branch information
santanusinha committed Jan 31, 2022
2 parents bfc764f + 21c593e commit aaabab6
Show file tree
Hide file tree
Showing 12 changed files with 572 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import com.flipkart.foxtrot.common.Opcodes;
import com.flipkart.foxtrot.common.enums.CountPrecision;
import com.flipkart.foxtrot.common.query.Filter;
import com.flipkart.foxtrot.common.stats.Stat;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.hibernate.validator.constraints.NotEmpty;

import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Set;

/**
* User: Santanu Sinha ([email protected])
Expand All @@ -41,8 +43,13 @@ public class GroupRequest extends ActionRequest {
@NotEmpty
private String table;

// Kept for backward compatibility
private String uniqueCountOn;

private String aggregationField;

private Stat aggregationType;

@NotNull
@NotEmpty
private List<String> nesting;
Expand All @@ -53,46 +60,29 @@ public GroupRequest() {
super(Opcodes.GROUP);
}

public GroupRequest(List<Filter> filters, String table, String uniqueCountOn, List<String> nesting) {
public GroupRequest(List<Filter> filters, String table, String uniqueCountOn,
String aggregationField, Stat aggregationType,
List<String> nesting, CountPrecision precision) {
super(Opcodes.GROUP, filters);
this.table = table;
this.uniqueCountOn = uniqueCountOn;
this.aggregationField = aggregationField;
this.aggregationType = aggregationType;
this.nesting = nesting;
this.precision = precision;
}

public <T> T accept(ActionRequestVisitor<T> visitor) {
return visitor.visit(this);
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public String getUniqueCountOn() {
return uniqueCountOn;
}

public void setUniqueCountOn(String uniqueCountOn) {
this.uniqueCountOn = uniqueCountOn;
}

public List<String> getNesting() {
return nesting;
}

public void setNesting(List<String> nesting) {
this.nesting = nesting;
}

@Override
public String toString() {
return new ToStringBuilder(this).appendSuper(super.toString())
.append("table", table)
.append("aggregationType", aggregationType)
.append("uniqueCountOn", uniqueCountOn)
.append("aggregationField", aggregationField)
.append("nesting", nesting)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.flipkart.foxtrot.common.query.general.NotInFilter;
import com.flipkart.foxtrot.common.query.numeric.*;
import com.flipkart.foxtrot.common.query.string.ContainsFilter;
import com.flipkart.foxtrot.common.stats.Stat;
import com.flipkart.foxtrot.common.util.CollectionUtils;
import com.flipkart.foxtrot.common.visitor.CountPrecisionThresholdVisitorAdapter;
import com.flipkart.foxtrot.core.common.Action;
Expand All @@ -44,18 +45,22 @@
import com.flipkart.foxtrot.core.querystore.impl.ElasticsearchUtils;
import com.flipkart.foxtrot.core.table.TableMetadataManager;
import com.flipkart.foxtrot.core.util.ElasticsearchQueryUtils;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityAggregationBuilder;
import org.joda.time.Interval;

import java.io.IOException;
Expand All @@ -65,6 +70,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.flipkart.foxtrot.core.querystore.actions.Utils.statsString;
import static com.flipkart.foxtrot.core.util.ElasticsearchQueryUtils.QUERY_SIZE;

/**
Expand Down Expand Up @@ -111,6 +117,15 @@ public String getRequestCacheKey() {
.hashCode();
}

if (null != query.getAggregationField()) {
filterHashKey += 31 * query.getAggregationField()
.hashCode();
}

if(null != query.getAggregationType()){
filterHashKey += 31 * query.getAggregationType().hashCode();
}

for (int i = 0; i < query.getNesting()
.size(); i++) {
filterHashKey += 31 * query.getNesting()
Expand Down Expand Up @@ -140,7 +155,12 @@ public void validateImpl(GroupRequest parameter) {

if (parameter.getUniqueCountOn() != null && parameter.getUniqueCountOn()
.isEmpty()) {
validationErrors.add("unique field cannot be empty (can be null)");
validationErrors.add("uniqueCountOn cannot be empty (can be null)");
}

if (parameter.getAggregationField() != null && parameter.getAggregationField()
.isEmpty()) {
validationErrors.add("aggregation field cannot be empty (can be null)");
}

validateCardinality(parameter);
Expand All @@ -158,7 +178,7 @@ public ActionResponse execute(GroupRequest parameter) {
try {
SearchResponse response = getConnection()
.getClient()
.search(query);
.search(query, RequestOptions.DEFAULT);
return getResponse(response, parameter);
}
catch (IOException e) {
Expand Down Expand Up @@ -713,18 +733,42 @@ private Long getValidCount(Long count) {
: count;
}

private AbstractAggregationBuilder buildAggregation(GroupRequest parameter) {
private AbstractAggregationBuilder buildAggregation(GroupRequest groupRequest) {
return Utils.buildTermsAggregation(getParameter().getNesting()
.stream()
.map(x -> new ResultSort(x, ResultSort.Order.asc))
.collect(Collectors.toList()),
!CollectionUtils.isNullOrEmpty(getParameter().getUniqueCountOn())
? Sets.newHashSet(
Utils.buildCardinalityAggregation(getParameter().getUniqueCountOn(),
parameter.accept(new CountPrecisionThresholdVisitorAdapter(
elasticsearchTuningConfig.getPrecisionThreshold()))))
: Sets.newHashSet(), elasticsearchTuningConfig.getAggregationSize());
.collect(Collectors.toList()),buildSubAggregation(getParameter()),
elasticsearchTuningConfig.getAggregationSize());

}

private Set<AggregationBuilder> buildSubAggregation(GroupRequest groupRequest) {
// Keep this for backward compatibility to support uniqueCountOn attribute coming from old requests
if(!Strings.isNullOrEmpty(groupRequest.getUniqueCountOn())){
return Sets.newHashSet(buildCardinalityAggregation(groupRequest.getUniqueCountOn(), groupRequest));
}

if(Strings.isNullOrEmpty(groupRequest.getAggregationField())){
return Sets.newHashSet();
}

boolean isNumericField = Utils.isNumericField(getTableMetadataManager(), groupRequest.getTable(),
groupRequest.getAggregationField());
final AbstractAggregationBuilder groupAggStats;
if (isNumericField) {
groupAggStats = Utils.buildStatsAggregation(groupRequest.getAggregationField(),
Collections.singleton(groupRequest.getAggregationType()));
} else {
groupAggStats = buildCardinalityAggregation(groupRequest.getAggregationField(), groupRequest);
}
return Sets.newHashSet(groupAggStats);
}

private CardinalityAggregationBuilder buildCardinalityAggregation(String aggregationField,
GroupRequest groupRequest) {
return Utils.buildCardinalityAggregation(aggregationField,
groupRequest.accept(new CountPrecisionThresholdVisitorAdapter(
elasticsearchTuningConfig.getPrecisionThreshold())));
}

private Map<String, Object> getMap(List<String> fields, Aggregations aggregations) {
Expand All @@ -736,12 +780,17 @@ private Map<String, Object> getMap(List<String> fields, Aggregations aggregation
Map<String, Object> levelCount = Maps.newHashMap();
for (Terms.Bucket bucket : terms.getBuckets()) {
if (fields.size() == 1) {
if (!CollectionUtils.isNullOrEmpty(getParameter().getUniqueCountOn())) {
if (!Strings.isNullOrEmpty(getParameter().getUniqueCountOn())) {
String key = Utils.sanitizeFieldForAggregation(getParameter().getUniqueCountOn());
Cardinality cardinality = bucket.getAggregations()
.get(key);
levelCount.put(String.valueOf(bucket.getKey()), cardinality.getValue());
}
else if (!Strings.isNullOrEmpty(getParameter().getAggregationField())) {
String metricKey = Utils.getExtendedStatsAggregationKey(getParameter().getAggregationField());
levelCount.put(String.valueOf(bucket.getKey()), Utils.toStats(
bucket.getAggregations().get(metricKey)).get(statsString(getParameter().getAggregationType())));
}
else {
levelCount.put(String.valueOf(bucket.getKey()), bucket.getDocCount());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.flipkart.foxtrot.common.query.Filter;
import com.flipkart.foxtrot.common.query.ResultSort;
import com.flipkart.foxtrot.common.stats.Stat;
import com.flipkart.foxtrot.common.stats.Stat.StatVisitor;
import com.flipkart.foxtrot.common.util.CollectionUtils;
import com.flipkart.foxtrot.core.exception.FoxtrotExceptions;
import com.flipkart.foxtrot.core.querystore.impl.ElasticsearchUtils;
Expand Down Expand Up @@ -44,14 +45,14 @@ public class Utils {
private static final double[] DEFAULT_PERCENTILES = {1d, 5d, 25, 50d, 75d, 95d, 99d};
private static final double DEFAULT_COMPRESSION = 100.0;
private static final int PRECISION_THRESHOLD = 500;
private static final String COUNT = "count";
private static final String AVG = "avg";
private static final String SUM = "sum";
private static final String MIN = "min";
private static final String MAX = "max";
private static final String SUM_OF_SQUARES = "sum_of_squares";
private static final String VARIANCE = "variance";
private static final String STD_DEVIATION = "std_deviation";
public static final String COUNT = "count";
public static final String AVG = "avg";
public static final String SUM = "sum";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String SUM_OF_SQUARES = "sum_of_squares";
public static final String VARIANCE = "variance";
public static final String STD_DEVIATION = "std_deviation";
private static final EnumSet<FieldType> NUMERIC_FIELD_TYPES
= EnumSet.of(FieldType.INTEGER, FieldType.LONG, FieldType.FLOAT, FieldType.DOUBLE);

Expand Down Expand Up @@ -355,9 +356,54 @@ public static boolean isNumericField(TableMetadataManager tableMetadataManager,
}

public static boolean hasTemporalFilters(List<Filter> filters) {
if(null == filters) {
if (null == filters) {
return false;
}
return filters.stream().anyMatch(Filter::isFilterTemporal);
}

public static String statsString(Stat aggregationType) {
return aggregationType
.visit(new StatVisitor<String>() {
@Override
public String visitCount() {
return Utils.COUNT;
}

@Override
public String visitMin() {
return Utils.MIN;
}

@Override
public String visitMax() {
return Utils.MAX;
}

@Override
public String visitAvg() {
return Utils.AVG;
}

@Override
public String visitSum() {
return Utils.SUM;
}

@Override
public String visitSumOfSquares() {
return Utils.SUM_OF_SQUARES;
}

@Override
public String visitVariance() {
return Utils.VARIANCE;
}

@Override
public String visitStdDeviation() {
return Utils.STD_DEVIATION;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testCardinalityEmailBuild() throws JsonProcessingException {
Assert.assertEquals("Blocked query as it might have screwed up the cluster", email.getSubject());
Assert.assertEquals(
"Blocked Query: {\"opcode\":\"group\",\"filters\":[],\"bypassCache\":false,\"table\":\"test-table\"," +
"\"uniqueCountOn\":null,\"nesting\":[\"os\",\"deviceId\"],\"precision\":null}\n" +
"\"uniqueCountOn\":null,\"aggregationField\":null,\"aggregationType\":null,\"nesting\":[\"os\",\"deviceId\"],\"precision\":null}\n" +
"Suspect field: deviceId\n" +
"Probability of screwing up the cluster: 0.75",
email.getContent());
Expand Down
Loading

0 comments on commit aaabab6

Please sign in to comment.