From 2b799e7d3c9af45694b20bd534a5f10dc9402e63 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Thu, 9 May 2024 22:23:23 +0800 Subject: [PATCH 01/28] add temp impl --- .../java/org/apache/iotdb/SessionExample.java | 29 ++- .../plan/analyze/AnalyzeVisitor.java | 6 +- .../analyze/TemplatedAggregationAnalyze.java | 122 +++++++++++++ .../plan/analyze/TemplatedAnalyze.java | 100 +++++------ .../plan/analyze/TemplatedInfo.java | 5 + .../plan/planner/LogicalPlanBuilder.java | 4 +- .../plan/planner/TemplatedLogicalPlan.java | 165 +++++++++++++++++- .../planner/TemplatedLogicalPlanBuilder.java | 55 ++++++ 8 files changed, 430 insertions(+), 56 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 0daec0885bf7..e65127fd3883 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -61,12 +61,39 @@ public class SessionExample { private static final String ROOT_SG1_D1 = "root.sg1.d1"; private static final String ROOT_SG1 = "root.sg1"; private static final String LOCAL_HOST = "127.0.0.1"; - public static final String SELECT_D1 = "select * from root.sg1.d1"; + public static final String SELECT_D1 = + "select value from root.cmadaas_nafp_surf.nafp.NAFP_GRAPES_MESO_FOR_3KM.data.RHU.`100`.`100000`.tile limit 10"; private static Random random = new Random(); public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + session = + new Session.Builder() + .host("172.20.31.60") + .port(6667) + .username("root") + .password("root") + .version(Version.V_1_0) + .build(); + session.open(false); + + // set session fetchSize + session.setFetchSize(10000); + + long time1 = System.currentTimeMillis(); + try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1)) { + // System.out.println(dataSet.getColumnNames()); + // dataSet.setFetchSize(1024); // default is 10000 + // while (dataSet.hasNext()) { + // System.out.println(dataSet.next()); + // } + } + System.out.println("Time: " + (System.currentTimeMillis() - time1)); + } + + public static void main1(String[] args) + throws IoTDBConnectionException, StatementExecutionException { session = new Session.Builder() .host(LOCAL_HOST) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index b23a5d44f9cf..f0086b5e79a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -1848,7 +1848,8 @@ private void checkGroupByConditionExpressionType( && rightExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } return; @@ -1856,7 +1857,8 @@ private void checkGroupByConditionExpressionType( if (!(keepExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java new file mode 100644 index 000000000000..ae2afa9c8f56 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java @@ -0,0 +1,122 @@ +package org.apache.iotdb.db.queryengine.plan.analyze; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; +import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.schemaengine.template.Template; + +import org.apache.tsfile.utils.Pair; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewInput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice; + +public class TemplatedAggregationAnalyze { + + // ----------- Methods below are used for aggregation, templated with align by device -------- + + static boolean analyzeAggregation( + Analysis analysis, + QueryStatement queryStatement, + IPartitionFetcher partitionFetcher, + ISchemaTree schemaTree, + MPPQueryContext context, + Template template) { + + List deviceList = analyzeFrom(queryStatement, schemaTree); + + if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) { + // remove the device which won't appear in resultSet after limit/offset + deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement); + } + + analyzeDeviceToWhere(analysis, queryStatement); + if (deviceList.isEmpty()) { + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + analysis.setDeviceList(deviceList); + + List> outputExpressions = new ArrayList<>(); + ColumnPaginationController paginationController = + new ColumnPaginationController( + queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset()); + for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {} + + analyzeSelect(queryStatement, analysis, outputExpressions, template); + if (analysis.getWhereExpression() != null + && analysis.getWhereExpression().equals(ConstantOperand.FALSE)) { + analyzeOutput(analysis, queryStatement, outputExpressions); + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + + analyzeDeviceToAggregation(analysis); + analyzeDeviceToSourceTransform(analysis); + analyzeDeviceToSource(analysis); + + analyzeDeviceViewOutput(analysis, queryStatement); + analyzeDeviceViewInput(analysis); + + // generate result set header according to output expressions + analyzeOutput(analysis, queryStatement, outputExpressions); + + context.generateGlobalTimeFilter(analysis); + // fetch partition information + analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter()); + return true; + } + + private static void analyzeSelect( + QueryStatement queryStatement, + Analysis analysis, + List> outputExpressions, + Template template) { + LinkedHashSet selectExpressions = new LinkedHashSet<>(); + selectExpressions.add(DEVICE_EXPRESSION); + if (queryStatement.isOutputEndTime()) { + selectExpressions.add(END_TIME_EXPRESSION); + } + for (Pair pair : outputExpressions) { + Expression selectExpression = pair.left; + selectExpressions.add(selectExpression); + } + analysis.setOutputExpressions(outputExpressions); + analysis.setSelectExpressions(selectExpressions); + analysis.setDeviceTemplate(template); + // TODO only add measurement and schema occured in selectExpressions + analysis.setMeasurementList(new ArrayList<>(template.getSchemaMap().keySet())); + analysis.setMeasurementSchemaList(new ArrayList<>(template.getSchemaMap().values())); + } + + private static void analyzeDeviceToSourceTransform(Analysis analysis) { + // TODO add having into SourceTransform + analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions()); + } + + private static void analyzeDeviceToSource(Analysis analysis) { + // TODO add having into Source + analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions()); + analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions()); + } + + private static void analyzeDeviceToAggregation(Analysis analysis) { + // TODO need add having clause? + analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index 856476eae241..1814a923d241 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -71,6 +71,7 @@ import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.analyzeAggregation; /** * This class provides accelerated implementation for multiple devices align by device query. This @@ -96,9 +97,8 @@ public static boolean canBuildPlanUseTemplate( IPartitionFetcher partitionFetcher, ISchemaTree schemaTree, MPPQueryContext context) { - if (queryStatement.isAggregationQuery() - || queryStatement.isGroupBy() - || queryStatement.isGroupByTime() + if (queryStatement.isGroupBy() + || (queryStatement.isGroupByTime() && !queryStatement.isAggregationQuery()) || queryStatement.isSelectInto() || queryStatement.hasFill() || schemaTree.hasNormalTimeSeries()) { @@ -106,58 +106,61 @@ public static boolean canBuildPlanUseTemplate( } List