Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-6328] Add optimization for aggregation query in align by device with template situation #12513

Merged
merged 35 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2b799e7
add temp impl
Beyyes May 9, 2024
304f68f
add select max_time(s1), last_value(s1), last_value(s2) impl
Beyyes May 11, 2024
a63cb4e
fix normal query error
Beyyes May 11, 2024
b732c75
add distributed serialize
Beyyes May 11, 2024
8699ede
fix sourcenode
Beyyes May 13, 2024
e7985fa
fix template serizlize error; fix filter can not push down bug
Beyyes May 13, 2024
1592507
process ProjectNode
liuminghui233 May 13, 2024
8ba6959
fix UT
liuminghui233 May 13, 2024
d1924f3
add more UT
liuminghui233 May 13, 2024
41f0b02
fix IT
liuminghui233 May 14, 2024
884dcca
not support agg(*) or agg(s1+1) now; perfect serialize method of RawD…
Beyyes May 14, 2024
613a344
fix extend aligned path
liuminghui233 May 14, 2024
6a69442
merge master
Beyyes May 14, 2024
303df4b
fix ut
Beyyes May 14, 2024
a70fc78
Merge branch 'lmh/fixPushDownProject' into beyyes/agg_template_alignb…
Beyyes May 14, 2024
c8199d2
fix descending aggregator problem; add more comments
Beyyes May 14, 2024
c1b9f3a
fix group by time; fix select agg1() having agg2()
Beyyes May 15, 2024
bd76f67
fix typeProvider.getTemplatedInfo().groupByTimeParameter problem
Beyyes May 15, 2024
135c808
merge with master
Beyyes May 15, 2024
7d8f8f8
add IoTDBAlignByDeviceWithTemplateAggregationIT
Beyyes May 15, 2024
8854da1
support sliding window, fix visitSlidingWindow method of OperatorGene…
Beyyes May 16, 2024
569d6ed
add ascending and descending aggregation descriptors support
Beyyes May 24, 2024
3d42a2a
merge master
Beyyes May 25, 2024
ef77dc5
fix merge error in TemplatedAnalyze
Beyyes May 26, 2024
d16c17e
Merge branch 'master' into beyyes/agg_template_alignbydevice
Beyyes May 26, 2024
2a1b868
fix outputEndTime serialize in templateInfo. remove redundant judgeme…
Beyyes May 27, 2024
126f856
add count_time support, add more its
Beyyes May 27, 2024
d86728d
fix select agg where s1>1 bug which is aggreagtion pushdown situation
Beyyes May 27, 2024
d596aa7
perfect logic of deviceToMeasurementIndexes, remove analyzeDeviceInpu…
Beyyes May 28, 2024
29caf67
fix deviceToMeasurementIdx error in SingleDeviceViewNode
Beyyes May 28, 2024
8f0fbe3
Merge branch 'master' into beyyes/agg_template_alignbydevice
Beyyes May 28, 2024
30b041e
remove useless code
Beyyes May 28, 2024
d5dbbaf
add more having case
Beyyes May 28, 2024
c5cad39
merge master, fix conflict
Beyyes May 28, 2024
b6acca4
fix smell
Beyyes May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ aggregation results last_value(temperature) and last_value(status), whereas buck
private Template deviceTemplate;
// when deviceTemplate is not empty and all expressions in this query are templated measurements,
// i.e. no aggregation and arithmetic expression
private boolean onlyQueryTemplateMeasurements = true;
private boolean noWhereAndAggregation = true;
// if it is wildcard query in templated align by device query
private boolean templateWildCardQuery;
// all queried measurementList and schemaList in deviceTemplate.
Expand Down Expand Up @@ -437,8 +437,8 @@ public TSDataType getType(Expression expression) {
return null;
}

if (isAllDevicesInOneTemplate()
&& (isOnlyQueryTemplateMeasurements() || expression instanceof TimeSeriesOperand)) {
if (allDevicesInOneTemplate()
&& (noWhereAndAggregation() || expression instanceof TimeSeriesOperand)) {
TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
return deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType();
}
Expand Down Expand Up @@ -921,7 +921,7 @@ public List<PartialPath> getDeviceList() {
// All Queries Devices Set In One Template
/////////////////////////////////////////////////////////////////////////////////////////////////

public boolean isAllDevicesInOneTemplate() {
public boolean allDevicesInOneTemplate() {
return this.deviceTemplate != null;
}

Expand All @@ -933,12 +933,12 @@ public void setDeviceTemplate(Template template) {
this.deviceTemplate = template;
}

public boolean isOnlyQueryTemplateMeasurements() {
return onlyQueryTemplateMeasurements;
public boolean noWhereAndAggregation() {
return noWhereAndAggregation;
}

public void setOnlyQueryTemplateMeasurements(boolean onlyQueryTemplateMeasurements) {
this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements;
public void setNoWhereAndAggregation(boolean value) {
this.noWhereAndAggregation = value;
}

public List<String> getMeasurementList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
static final Expression DEVICE_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(DEVICE, TSDataType.TEXT);

static final Expression END_TIME_EXPRESSION =
public static final Expression END_TIME_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME, TSDataType.INT64);

private final List<String> lastQueryColumnNames =
Expand Down Expand Up @@ -1904,20 +1904,22 @@ private void checkGroupByConditionExpressionType(
&& rightExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
"Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
"Please check the keep condition ([%s]), "
+ "it need to be a constant or a compare expression constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
return;
}
if (!(keepExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
"Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
"Please check the keep condition ([%s]), "
+ "it need to be a constant or a compare expression constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
}

private void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) {
static void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) {
if (!queryStatement.isGroupByTime()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ private ExpressionTypeAnalyzer() {}
public static TSDataType analyzeExpression(Analysis analysis, Expression expression) {
if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
analyzer.analyze(expression, null);

Map<String, IMeasurementSchema> context =
analysis.allDevicesInOneTemplate() ? analysis.getDeviceTemplate().getSchemaMap() : null;
analyzer.analyze(expression, context);

addExpressionTypes(analysis, analyzer);
}
Expand Down Expand Up @@ -96,7 +99,9 @@ public static void analyzeExpressionUsingTemplatedInfo(
Expression expression,
TemplatedInfo templatedInfo) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
analyzer.analyze(expression, templatedInfo.getSchemaMap());

Map<String, IMeasurementSchema> schemaMap = templatedInfo.getSchemaMap();
analyzer.analyze(expression, schemaMap);

types.putAll(analyzer.getExpressionTypes());
}
Expand Down Expand Up @@ -369,6 +374,7 @@ public TSDataType visitTimeSeriesOperand(
return setExpressionType(
timeSeriesOperand, context.get(timeSeriesOperand.getOutputSymbol()).getType());
}

return setExpressionType(timeSeriesOperand, timeSeriesOperand.getPath().getSeriesType());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeGroupByTime;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom;
import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;

/** Analysis helpers for aggregation queries that use ALIGN BY DEVICE on devices sharing one template. */
public class TemplatedAggregationAnalyze {

/**
 * Tries to analyze an aggregation, align-by-device query with the template-based fast path.
 *
 * <p>On success the given {@code analysis} is populated (devices, select/having expressions,
 * output header, group-by-time info and data partitions). When the query shape is not supported,
 * the device template on {@code analysis} is cleared where it may have been set and {@code false}
 * is returned.
 *
 * @param analysis the analysis object to populate
 * @param queryStatement the query being analyzed
 * @param partitionFetcher used to fetch data partition information
 * @param schemaTree schema tree used to resolve the FROM clause and partitions
 * @param context query context; its global time filter is generated here
 * @param template the template shared by the queried devices
 * @return {@code true} if the analysis was completed here (including the "finish after analyze"
 *     short cuts), {@code false} if this path cannot handle the query
 */
static boolean canBuildAggregationPlanUseTemplate(
    Analysis analysis,
    QueryStatement queryStatement,
    IPartitionFetcher partitionFetcher,
    ISchemaTree schemaTree,
    MPPQueryContext context,
    Template template) {

  // ORDER BY expression is not supported on this path
  if (queryStatement.hasOrderByExpression()) {
    return false;
  }
  // neither are templates that are not directly aligned
  if (!template.isDirectAligned()) {
    return false;
  }

  analysis.setNoWhereAndAggregation(false);

  List<PartialPath> devices = analyzeFrom(queryStatement, schemaTree);
  if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
    // drop the devices that cannot show up in the result set once limit/offset is applied
    devices = pushDownLimitOffsetInGroupByTimeForDevice(devices, queryStatement);
  }

  List<Pair<Expression, String>> outputs = new ArrayList<>();
  if (!analyzeSelect(queryStatement, analysis, outputs, template)) {
    analysis.setDeviceTemplate(null);
    return false;
  }

  analyzeDeviceToWhere(analysis, queryStatement);
  if (devices.isEmpty()) {
    // nothing to query
    analysis.setFinishQueryAfterAnalyze(true);
    return true;
  }
  analysis.setDeviceList(devices);

  Expression where = analysis.getWhereExpression();
  if (where != null && ConstantOperand.FALSE.equals(where)) {
    // the predicate is constantly false: only the header is needed
    analyzeOutput(analysis, queryStatement, outputs);
    analysis.setFinishQueryAfterAnalyze(true);
    return true;
  }

  if (!analyzeHaving(analysis, queryStatement)) {
    analysis.setDeviceTemplate(null);
    return false;
  }

  analyzeDeviceToExpressions(analysis);
  analyzeDeviceViewOutput(analysis, queryStatement);

  // build the result set header from the output expressions
  analyzeOutput(analysis, queryStatement, outputs);

  analyzeGroupByTime(analysis, queryStatement);
  context.generateGlobalTimeFilter(analysis);

  // fetch the partition information
  analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter());
  return true;
}

/**
 * Analyzes the SELECT clause of a templated, align-by-device aggregation query.
 *
 * <p>Result columns are visited in order while applying the statement's series offset and series
 * limit. Each accepted column is recorded as an output expression, a select expression and an
 * aggregation expression, and its type is registered in {@code analysis}. A {@code count_time}
 * function is typed as INT64 and its inputs are replaced by a single {@link TimestampOperand}.
 *
 * <p>Afterwards the measurements needed from the template are collected: all template
 * measurements for a count-time aggregation, otherwise the first input of every select
 * expression (recorded once per measurement).
 *
 * @param queryStatement the query being analyzed
 * @param analysis the analysis object to populate; its device template is set to {@code template}
 * @param outputExpressions receives the (expression, alias) pairs of the accepted result columns
 * @param template the template shared by the queried devices
 * @return {@code false} if the query outputs end time or an aggregation input is not a
 *     measurement of the template (e.g. agg(*) or agg(s1+1)); {@code true} otherwise
 */
private static boolean analyzeSelect(
    QueryStatement queryStatement,
    Analysis analysis,
    List<Pair<Expression, String>> outputExpressions,
    Template template) {
  LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>();
  selectExpressions.add(DEVICE_EXPRESSION);
  if (queryStatement.isOutputEndTime()) {
    return false;
  }

  analysis.setDeviceTemplate(template);

  ColumnPaginationController paginationController =
      new ColumnPaginationController(
          queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());

  Set<Expression> aggregationExpressions = new LinkedHashSet<>();
  for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
    if (paginationController.hasCurOffset()) {
      paginationController.consumeOffset();
    } else if (paginationController.hasCurLimit()) {
      Expression selectExpression = resultColumn.getExpression();
      outputExpressions.add(new Pair<>(selectExpression, resultColumn.getAlias()));
      selectExpressions.add(selectExpression);
      aggregationExpressions.add(selectExpression);
      if (selectExpression instanceof FunctionExpression
          && "count_time"
              .equalsIgnoreCase(((FunctionExpression) selectExpression).getFunctionName())) {
        analysis.getExpressionTypes().put(NodeRef.of(selectExpression), TSDataType.INT64);
        ((FunctionExpression) selectExpression)
            .setExpressions(Collections.singletonList(new TimestampOperand()));
      } else {
        analyzeExpressionType(analysis, selectExpression);
      }
      // the accepted column uses up one slot of the series limit; without this the limit is
      // never reached and every column after the offset would be selected
      paginationController.consumeLimit();
    } else {
      break;
    }
  }

  List<String> measurementList = new ArrayList<>();
  List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
  Set<String> measurementSet = new HashSet<>();

  if (queryStatement.isCountTimeAggregation()) {
    measurementList = new ArrayList<>(template.getSchemaMap().keySet());
    measurementSchemaList = new ArrayList<>(template.getSchemaMap().values());
  } else {
    int idx = 0;
    for (Expression selectExpression : selectExpressions) {
      idx++;
      // skip the leading device column (and an end time column right after it, if any)
      if (idx == 1
          || (idx == 2 && ENDTIME.equalsIgnoreCase(selectExpression.getOutputSymbol()))) {
        continue;
      }

      String measurement = selectExpression.getExpressions().get(0).getOutputSymbol();
      // not support agg(*), agg(s1+1) now
      if (!template.getSchemaMap().containsKey(measurement)) {
        return false;
      }

      // for agg1(s1) + agg2(s1), only record s1 for one time
      if (!measurementSet.contains(measurement)) {
        measurementSet.add(measurement);
        measurementList.add(measurement);
        measurementSchemaList.add(template.getSchemaMap().get(measurement));
      }
    }
  }

  analysis.setMeasurementList(measurementList);
  analysis.setMeasurementSchemaList(measurementSchemaList);
  analysis.setAggregationExpressions(aggregationExpressions);
  analysis.setOutputExpressions(outputExpressions);
  analysis.setSelectExpressions(selectExpressions);
  return true;
}

/**
 * Analyzes the HAVING clause of a templated, align-by-device aggregation query.
 *
 * <p>Every aggregation found in the HAVING predicate is added to the aggregation expressions of
 * {@code analysis}; a measurement that only appears in HAVING is appended to the measurement and
 * measurement schema lists. Finally the predicate itself is type-checked and stored.
 *
 * @param analysis the analysis object to update; its device template, measurement list and
 *     aggregation expressions are read here
 * @param queryStatement the query being analyzed
 * @return {@code true} if there is no HAVING clause or it was analyzed successfully; {@code
 *     false} if an aggregation input is not a plain time series (e.g. agg(s1+s2)) or is not a
 *     measurement of the template
 * @throws SemanticException if the HAVING predicate does not evaluate to BOOLEAN
 */
private static boolean analyzeHaving(Analysis analysis, QueryStatement queryStatement) {
  if (!queryStatement.hasHaving()) {
    return true;
  }

  Set<String> measurementSet = new HashSet<>(analysis.getMeasurementList());
  Set<Expression> aggregationExpressions = analysis.getAggregationExpressions();
  Expression havingExpression = queryStatement.getHavingCondition().getPredicate();
  for (Expression aggregationExpression : searchAggregationExpressions(havingExpression)) {
    Expression normalizedAggregationExpression = normalizeExpression(aggregationExpression);

    // not support having agg(s1+s2) temporarily
    if (!((normalizedAggregationExpression).getExpressions().get(0)
        instanceof TimeSeriesOperand)) {
      return false;
    }

    String measurement =
        normalizedAggregationExpression.getExpressions().get(0).getOutputSymbol();
    if (!measurementSet.contains(measurement)) {
      // same rule as for the select clause: a measurement that is not part of the template
      // cannot be served by the templated plan, so give up instead of recording a missing
      // schema
      if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) {
        return false;
      }

      // adapt this case: select agg(s1) from xx having agg(s3)
      measurementSet.add(measurement);
      analysis.getMeasurementList().add(measurement);
      analysis
          .getMeasurementSchemaList()
          .add(analysis.getDeviceTemplate().getSchema(measurement));
    }

    analyzeExpressionType(analysis, aggregationExpression);
    analyzeExpressionType(analysis, normalizedAggregationExpression);

    aggregationExpressions.add(aggregationExpression);
  }

  TSDataType outputType = analyzeExpressionType(analysis, havingExpression);
  if (outputType != TSDataType.BOOLEAN) {
    throw new SemanticException(
        String.format(
            "The output type of the expression in HAVING clause should be BOOLEAN, actual data type: %s.",
            outputType));
  }
  analysis.setHavingExpression(havingExpression);

  return true;
}

/**
 * Fills the per-device expression maps of {@code analysis}.
 *
 * <p>The aggregation, output, source and source-transform maps are all set to the value returned
 * by {@code analysis.getDeviceToSelectExpressions()}.
 *
 * @param analysis the analysis object to update
 */
private static void analyzeDeviceToExpressions(Analysis analysis) {
  // what is aggregated and what is output per device
  analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions());
  analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());

  // what is read from the sources per device, before and after the source transform
  analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
  analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
}
}
Loading
Loading