diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java index 4a7bb62d5179..084f35ee2559 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateAggregationIT.java @@ -32,6 +32,7 @@ import java.sql.Connection; import java.sql.Statement; +import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail; import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; @RunWith(IoTDBTestRunner.class) @@ -100,32 +101,95 @@ public void aggregationTest() { expectedHeader, retArray); - // value filter + having + group by time + // ascending with descending + + // group by session, condition, + + // wildcard: agg(*) + + // count_time(*) + + // agg operation expression : agg(s1+1) + + // duplicate select expressions + + // non aligned template: sg1 + + // filter cannot push down + + // having count(s1+s2) + } + + @Test + public void groupByTest() { + // + String[] expectedHeader = + new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; + String[] retArray = + new String[] { + "1,root.sg2.d1,2,2.2,false,", + "5,root.sg2.d1,5,5.5,true,", + "1,root.sg2.d2,2,22.2,false,", + "5,root.sg2.d2,5,50.0,false,", + "1,root.sg2.d3,1,111.1,true,", + }; + resultSetEqualTest( + "select max_time(s1), last_value(s1), last_value(s2) from root.sg2.** group by ([1,10), 2ms) having last_value(s2) is not null limit 5 align by device;", + expectedHeader, + retArray); + + // sliding window expectedHeader = new String[] {"Time,Device,max_time(s1),last_value(s1),last_value(s2)"}; retArray = new String[] { "1,root.sg2.d1,2,2.2,false,", "1,root.sg2.d2,2,22.2,false,", + "3,root.sg2.d2,5,50.0,false,", "5,root.sg2.d2,5,50.0,false,", "7,root.sg2.d3,8,8.8,false,", + "3,root.sg2.d4,5,5555.5,false,", "5,root.sg2.d4,5,5555.5,false,", }; resultSetEqualTest( - "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10),2ms) having avg(s1)>0 align by device;", + "SELECT max_time(s1), last_value(s1), last_value(s2) FROM root.sg2.** where s3+1=1316 or s2=false group by ([1,10),3ms,2ms) having avg(s1)>0 align by device;", expectedHeader, retArray); + } - // agg operation expression - - // group by session, - - // order by - - // wildcald + @Test + public void orderByTest() { + String[] expectedHeader = new String[] {"Time,Device,sum(s3)"}; + String[] retArray = + new String[] { + "4,root.sg2.d1,5.0,", + "4,root.sg2.d2,5.0,", + "4,root.sg2.d3,44.0,", + "4,root.sg2.d4,5555.0,", + "2,root.sg2.d1,2.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by time desc offset 8 limit 5 align by device;", + expectedHeader, + retArray); - // ascending with descending + expectedHeader = new String[] {"Time,Device,sum(s3)"}; + retArray = + new String[] { + "0,root.sg2.d1,1.0,", + "2,root.sg2.d1,2.0,", + "4,root.sg2.d1,5.0,", + "0,root.sg2.d2,11.0,", + "2,root.sg2.d2,22.0,", + }; + resultSetEqualTest( + "select sum(s3) from root.sg2.** where s1>1 GROUP BY([0, 10), 2ms) order by count(s2) desc limit 5 align by device;", + expectedHeader, + retArray); - // count_time + // order by non-existent measurement + assertTestFail( + "select sum(s3) from root.sg2.** where s1>1 order by count(s_null) desc limit 5 align by device;", + "count(s_null) in order by clause doesn't exist."); } protected static void insertData() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java index cf19a791482f..dc926303880a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java @@ -64,6 +64,10 @@ static boolean canBuildAggregationPlanUseTemplate( MPPQueryContext context, Template template) { + if (queryStatement.hasOrderByExpression()) { + return false; + } + analysis.setNoWhereAndAggregation(false); List deviceList = analyzeFrom(queryStatement, schemaTree); @@ -93,7 +97,7 @@ static boolean canBuildAggregationPlanUseTemplate( return true; } - analyzeHaving(analysis, queryStatement, schemaTree, deviceList); + analyzeHaving(analysis, queryStatement); analyzeDeviceToAggregation(analysis); analyzeDeviceToSourceTransform(analysis); @@ -177,11 +181,7 @@ private static boolean analyzeSelect( return true; } - private static void analyzeHaving( - Analysis analysis, - QueryStatement queryStatement, - ISchemaTree schemaTree, - List deviceSet) { + private static void analyzeHaving(Analysis analysis, QueryStatement queryStatement) { if (!queryStatement.hasHaving()) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index bc14b2c55cdf..1c56f3c4ba26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -270,7 +270,7 @@ static void analyzeDeviceToWhere(Analysis analysis, QueryStatement queryStatemen } } - private static void analyzeDeviceToOrderBy( + static void analyzeDeviceToOrderBy( Analysis analysis, QueryStatement queryStatement, ISchemaTree schemaTree, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java index 2ae63c12a90f..d9981a97a91c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java @@ -77,15 +77,12 @@ public class TemplatedInfo { private int maxTsBlockLineNum = -1; // variables related to predicate push down - // TODO when to init pushDownPredicate in agg situation? private Expression pushDownPredicate; // variables related to aggregation - public List aggregationDescriptorList; - public GroupByTimeParameter groupByTimeParameter; - public boolean outputEndTime; - - private Expression havingExpression; + private List aggregationDescriptorList; + private final GroupByTimeParameter groupByTimeParameter; + private final boolean outputEndTime; public TemplatedInfo( List measurementList, @@ -117,11 +114,11 @@ public TemplatedInfo( this.predicate = predicate; if (predicate != null) { this.keepNull = keepNull; - this.schemaMap = schemaMap; this.filterLayoutMap = filterLayoutMap; } this.pushDownPredicate = pushDownPredicate; + this.schemaMap = schemaMap; this.aggregationDescriptorList = aggregationDescriptorList; this.groupByTimeParameter = groupByTimeParameter; this.outputEndTime = outputEndTime; @@ -202,6 +199,22 @@ public Expression[] getProjectExpressions() { return projectExpressions; } + public void setAggregationDescriptorList(List aggregationDescriptorList) { + this.aggregationDescriptorList = aggregationDescriptorList; + } + + public List getAggregationDescriptorList() { + return this.aggregationDescriptorList; + } + + public GroupByTimeParameter getGroupByTimeParameter() { + return this.groupByTimeParameter; + } + + public boolean isOutputEndTime() { + return outputEndTime; + } + public static Map> makeLayout(List measurementList) { Map> outputMappings = new LinkedHashMap<>(); int tsBlockIndex = 0; @@ -273,6 +286,8 @@ public void serialize(ByteBuffer byteBuffer) { } else { ReadWriteIOUtils.write((byte) 0, byteBuffer); } + + ReadWriteIOUtils.write(outputEndTime, byteBuffer); } public void serialize(DataOutputStream stream) throws IOException { @@ -332,6 +347,8 @@ public void serialize(DataOutputStream stream) throws IOException { } else { ReadWriteIOUtils.write((byte) 0, stream); } + + ReadWriteIOUtils.write(outputEndTime, stream); } public static TemplatedInfo deserialize(ByteBuffer byteBuffer) { @@ -377,20 +394,21 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) { long limitValue = ReadWriteIOUtils.readLong(byteBuffer); Expression predicate = null; - byte hasFilter = ReadWriteIOUtils.readByte(byteBuffer); - Map currentSchemaMap = null; - Map> layoutMap = null; boolean keepNull = false; + Map> filterLayoutMap = null; + byte hasFilter = ReadWriteIOUtils.readByte(byteBuffer); if (hasFilter == 1) { predicate = Expression.deserialize(byteBuffer); keepNull = ReadWriteIOUtils.readBool(byteBuffer); - currentSchemaMap = new HashMap<>(); - for (IMeasurementSchema measurementSchema : measurementSchemaList) { - currentSchemaMap.put(measurementSchema.getMeasurementId(), measurementSchema); - } - layoutMap = makeLayout(measurementList); + filterLayoutMap = makeLayout(measurementList); } + Map measurementSchemaMap = + new HashMap<>(measurementSchemaList.size()); + measurementSchemaList.forEach( + measurementSchema -> + measurementSchemaMap.put(measurementSchema.getMeasurementId(), measurementSchema)); + Expression pushDownPredicate = null; byte hasPushDownFilter = ReadWriteIOUtils.readByte(byteBuffer); if (hasPushDownFilter == 1) { @@ -412,7 +430,7 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) { groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer); } - // TODO add outputEndTime serialization and deserialization + boolean outputEndTime = ReadWriteIOUtils.readBool(byteBuffer); return new TemplatedInfo( measurementList, @@ -426,11 +444,11 @@ public static TemplatedInfo deserialize(ByteBuffer byteBuffer) { limitValue, predicate, keepNull, - currentSchemaMap, - layoutMap, + measurementSchemaMap, + filterLayoutMap, pushDownPredicate, aggregationDescriptorList, groupByTimeParameter, - false); + outputEndTime); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java index 2e65e7501937..1df1279903d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java @@ -194,6 +194,7 @@ public PlanNode visitDeviceView(DeviceViewNode node, RewriterContext context) { @Override public PlanNode visitSingleDeviceView(SingleDeviceViewNode node, RewriterContext context) { context.setCurDevice(node.getDevice()); + context.setCurDevicePath(new PartialPath(node.getDevice().split(","))); PlanNode rewrittenChild = node.getChild().accept(this, context); node.setChild(rewrittenChild); return node; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index b696501a4174..f1beddaaf456 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -736,9 +736,6 @@ public LogicalPlanBuilder planSlidingWindowAggregation( GroupByTimeParameter groupByTimeParameter, AggregationStep curStep, Ordering scanOrder) { - if (aggregationExpressions == null) { - return this; - } this.root = createSlidingWindowAggregationNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 6b2ecccfd65a..0e48759983c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -494,7 +494,7 @@ public Operator visitProject(ProjectNode node, LocalExecutionPlanContext context List inputColumnNames; List outputColumnNames = node.getOutputColumnNames(); if (outputColumnNames == null) { - if (context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList != null) { + if (context.getTypeProvider().getTemplatedInfo().getAggregationDescriptorList() != null) { // TODO fix it // outputColumnNames is aggregation expression outputColumnNames = context.getTypeProvider().getTemplatedInfo().getDeviceViewOutputNames(); @@ -600,14 +600,15 @@ public Operator visitSeriesAggregationScan( public Operator visitAlignedSeriesAggregationScan( AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext context) { if (context.isBuildPlanUseTemplate()) { + // TODO template situation, variables such as aggregator, scanOptions may be serialized once return constructAlignedSeriesAggregationScanOperator( node.getPlanNodeId(), node.getAlignedPath(), - context.getTemplatedInfo().aggregationDescriptorList, + context.getTemplatedInfo().getAggregationDescriptorList(), context.getTemplatedInfo().getPushDownPredicate(), context.getTemplatedInfo().getScanOrder(), - context.getTemplatedInfo().groupByTimeParameter, - context.getTemplatedInfo().outputEndTime, + context.getTemplatedInfo().getGroupByTimeParameter(), + context.getTemplatedInfo().isOutputEndTime(), context); } @@ -640,7 +641,6 @@ private Operator constructAlignedSeriesAggregationScanOperator( Expression expression = descriptor.getInputExpressions().get(0); if (expression instanceof TimeSeriesOperand) { - // TODO for template_agg, no need use getPath.getMeasurement String inputSeries = ((TimeSeriesOperand) (descriptor.getInputExpressions().get(0))) .getPath() @@ -654,7 +654,6 @@ private Operator constructAlignedSeriesAggregationScanOperator( descriptor.getAggregationFuncName(), descriptor.getAggregationType(), Collections.singletonList(seriesDataType), - // TODO inputExpression must be devicePath+measurement descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending, @@ -669,7 +668,6 @@ private Operator constructAlignedSeriesAggregationScanOperator( descriptor.getAggregationFuncName(), descriptor.getAggregationType(), Collections.singletonList(TSDataType.INT64), - // TODO inputExpression must be devicePath+measurement descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending, @@ -698,7 +696,6 @@ private Operator constructAlignedSeriesAggregationScanOperator( convertPredicateToFilter( pushDownPredicate, alignedPath.getMeasurementList(), - // TODO what's the meaning of isBuildPlanUseTemplate context.getTypeProvider().getTemplatedInfo() != null, context.getTypeProvider())); } @@ -1364,7 +1361,12 @@ public Operator visitTransform(TransformNode node, LocalExecutionPlanContext con final Map, TSDataType> expressionTypes = new HashMap<>(); for (Expression projectExpression : projectExpressions) { - ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression); + if (context.isBuildPlanUseTemplate()) { + ExpressionTypeAnalyzer.analyzeExpressionUsingTemplatedInfo( + expressionTypes, projectExpression, context.getTemplatedInfo()); + } else { + ExpressionTypeAnalyzer.analyzeExpression(expressionTypes, projectExpression); + } } boolean hasNonMappableUDF = false; @@ -1764,13 +1766,24 @@ public Operator visitSlidingWindowAggregation( List aggregationDescriptors = node.getAggregationDescriptorList(); for (AggregationDescriptor descriptor : aggregationDescriptors) { List inputLocationList = calcInputLocationList(descriptor, layout); + List dataTypes = new ArrayList<>(); + for (Expression expression : descriptor.getInputExpressions()) { + if (context.isBuildPlanUseTemplate() && expression instanceof TimeSeriesOperand) { + dataTypes.add( + context + .getTemplatedInfo() + .getSchemaMap() + .get(expression.getExpressionString()) + .getType()); + } else { + dataTypes.add(context.getTypeProvider().getType(expression.getExpressionString())); + } + } aggregators.add( SlidingWindowAggregatorFactory.createSlidingWindowAggregator( descriptor.getAggregationFuncName(), descriptor.getAggregationType(), - descriptor.getInputExpressions().stream() - .map(x -> context.getTypeProvider().getType(x.getExpressionString())) - .collect(Collectors.toList()), + dataTypes, descriptor.getInputExpressions(), descriptor.getInputAttributes(), ascending, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index a00aee711d70..73b29b76f7f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -298,19 +298,10 @@ public PlanNode visitQueryBody(PartialPath devicePath) { // ============== Methods below are used for templated aggregation ====================== private PlanNode visitAggregation() { + // group by level and group by tag is not allowed in align by device boolean outputPartial = - queryStatement.isGroupByLevel() - || queryStatement.isGroupByTag() - || (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()); + queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap(); AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; - - if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { - curStep = - (queryStatement.isGroupByLevel() || queryStatement.isGroupByTag()) - ? AggregationStep.INTERMEDIATE - : AggregationStep.FINAL; - } - aggregationDescriptorList = constructAggregationDescriptorList(analysis.getAggregationExpressions(), curStep); updateTypeProvider(analysis.getAggregationExpressions()); @@ -321,13 +312,14 @@ private PlanNode visitAggregation() { aggregationDescriptor, context.getTypeProvider())); } - context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList = - aggregationDescriptorList; - - LogicalPlanBuilder planBuilder = + LogicalPlanBuilder templatedPlanBuilder = new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList); Map deviceToSubPlanMap = new LinkedHashMap<>(); - deduplicatedDescriptors = getDeduplicatedDescriptors(aggregationDescriptorList); + aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList); + context + .getTypeProvider() + .getTemplatedInfo() + .setAggregationDescriptorList(aggregationDescriptorList); for (PartialPath devicePath : analysis.getDeviceList()) { String deviceName = devicePath.getFullPath(); PlanNode rootNode = visitDeviceAggregationBody(devicePath, curStep); @@ -339,46 +331,44 @@ private PlanNode visitAggregation() { deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot()); } - // convert to ALIGN BY DEVICE view - planBuilder = - planBuilder.planDeviceView( - deviceToSubPlanMap, - analysis.getDeviceViewOutputExpressions(), - analysis.getDeviceViewInputIndexesMap(), - analysis.getSelectExpressions(), - queryStatement, - analysis); - - planBuilder = - planBuilder.planHavingAndTransform( - analysis.getHavingExpression(), - analysis.getSelectExpressions(), - analysis.getOrderByExpressions(), - queryStatement.isGroupByTime(), - queryStatement.getResultTimeOrder()); + templatedPlanBuilder = + templatedPlanBuilder + .planDeviceView( + deviceToSubPlanMap, + analysis.getDeviceViewOutputExpressions(), + analysis.getDeviceViewInputIndexesMap(), + analysis.getSelectExpressions(), + queryStatement, + analysis) + .planHavingAndTransform( + analysis.getHavingExpression(), + analysis.getSelectExpressions(), + analysis.getOrderByExpressions(), + queryStatement.isGroupByTime(), + queryStatement.getResultTimeOrder()); if (!queryStatement.needPushDownSort()) { - planBuilder = planBuilder.planOrderBy(queryStatement, analysis); + templatedPlanBuilder = templatedPlanBuilder.planOrderBy(queryStatement, analysis); } - planBuilder = - planBuilder + templatedPlanBuilder = + templatedPlanBuilder .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder()) .planOffset(queryStatement.getRowOffset()); if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) { - planBuilder = planBuilder.planLimit(queryStatement.getRowLimit()); + templatedPlanBuilder = templatedPlanBuilder.planLimit(queryStatement.getRowLimit()); } - return planBuilder.getRoot(); + return templatedPlanBuilder.getRoot(); } private PlanNode visitDeviceAggregationBody(PartialPath devicePath, AggregationStep curStep) { - TemplatedLogicalPlanBuilder planBuilder = + TemplatedLogicalPlanBuilder templatedPlanBuilder = new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, newSchemaList); - planBuilder = - planBuilder + templatedPlanBuilder = + templatedPlanBuilder .planRawDataSource( devicePath, queryStatement.getResultTimeOrder(), @@ -388,30 +378,23 @@ private PlanNode visitDeviceAggregationBody(PartialPath devicePath, AggregationS .planFilter( whereExpression, queryStatement.isGroupByTime(), + queryStatement.getResultTimeOrder()) + .planRawDataAggregation( + analysis.getAggregationExpressions(), + null, + analysis.getGroupByTimeParameter(), + analysis.getGroupByParameter(), + queryStatement.isOutputEndTime(), + curStep, + queryStatement.getResultTimeOrder(), + aggregationDescriptorList) + .planSlidingWindowAggregation( + queryStatement, + analysis.getAggregationExpressions(), + analysis.getGroupByTimeParameter(), queryStatement.getResultTimeOrder()); - planBuilder = - planBuilder.planRawDataAggregation( - analysis.getAggregationExpressions(), - null, - analysis.getGroupByTimeParameter(), - analysis.getGroupByParameter(), - queryStatement.isOutputEndTime(), - curStep, - queryStatement.getResultTimeOrder(), - deduplicatedDescriptors); - - if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { - planBuilder = - planBuilder.planSlidingWindowAggregation( - analysis.getSelectExpressions(), - analysis.getGroupByTimeParameter(), - curStep, - queryStatement.getResultTimeOrder()); - } - - // no group by level and group by tag - return planBuilder.getRoot(); + return templatedPlanBuilder.getRoot(); } private List constructAggregationDescriptorList( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java index 3ee79317753c..1d8350ba0075 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java @@ -35,13 +35,17 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE; + /** * This class provides accelerated implementation for multiple devices align by device query. This * optimization is only used for devices set in only one template, using template can avoid many @@ -167,17 +171,29 @@ public TemplatedLogicalPlanBuilder planRawDataAggregation( } public TemplatedLogicalPlanBuilder planSlidingWindowAggregation( + QueryStatement queryStatement, Set aggregationExpressions, GroupByTimeParameter groupByTimeParameter, - AggregationStep curStep, Ordering scanOrder) { - if (aggregationExpressions == null) { + if (!queryStatement.isGroupByTime() || !analysis.getGroupByTimeParameter().hasOverlap()) { return this; } + LinkedHashSet slidingWindowsExpressions = new LinkedHashSet<>(); + aggregationExpressions.forEach( + expression -> { + if (!DEVICE.equalsIgnoreCase(expression.getOutputSymbol())) { + slidingWindowsExpressions.add(expression); + } + }); this.root = createSlidingWindowAggregationNode( - this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder); + this.getRoot(), + slidingWindowsExpressions, + groupByTimeParameter, + AggregationStep.FINAL, + scanOrder); + return this; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java index 7bdbea3a577a..adaad506184c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java @@ -287,11 +287,11 @@ public static AlignedSeriesAggregationScanNode deserializeUseTemplate( return new AlignedSeriesAggregationScanNode( planNodeId, alignedPath, - typeProvider.getTemplatedInfo().aggregationDescriptorList, + typeProvider.getTemplatedInfo().getAggregationDescriptorList(), typeProvider.getTemplatedInfo().getScanOrder(), - typeProvider.getTemplatedInfo().outputEndTime, + typeProvider.getTemplatedInfo().isOutputEndTime(), typeProvider.getTemplatedInfo().getPushDownPredicate(), - typeProvider.getTemplatedInfo().groupByTimeParameter, + typeProvider.getTemplatedInfo().getGroupByTimeParameter(), null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java index d4b979f91a85..9de54f85c0a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java @@ -162,8 +162,4 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hash(startTime, endTime, interval, slidingStep, leftCRightO); } - - public GroupByTimeParameter clone() { - return new GroupByTimeParameter(startTime, endTime, interval, slidingStep, leftCRightO); - } }