From 5c8f0a7857331a6a6603cdcf4090115841a8d2d6 Mon Sep 17 00:00:00 2001 From: 924060929 Date: Fri, 30 Aug 2024 21:05:10 +0800 Subject: [PATCH] optimize insert into values --- .../apache/doris/nereids/NereidsPlanner.java | 14 +- .../translator/PhysicalPlanTranslator.java | 26 ++- .../nereids/parser/LogicalPlanBuilder.java | 21 +- .../doris/nereids/parser/NereidsParser.java | 49 +++-- .../rules/FoldConstantRuleOnFE.java | 9 + .../expressions/literal/DateLiteral.java | 38 +++- .../insert/BatchInsertIntoTableCommand.java | 6 +- .../insert/InsertIntoTableCommand.java | 203 +++++++++++++++++- .../insert/InsertOverwriteTableCommand.java | 5 +- .../plans/commands/insert/InsertUtils.java | 126 ++++++++--- .../insert/OlapGroupCommitInsertExecutor.java | 8 +- .../plans/logical/LogicalInlineTable.java | 72 ++++++- .../trees/plans/logical/LogicalUnion.java | 8 + .../plans/physical/PhysicalOlapTableSink.java | 2 +- 14 files changed, 495 insertions(+), 92 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 58af5cd3e92199..0e6d8ebf8def95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -99,17 +99,19 @@ */ public class NereidsPlanner extends Planner { public static final Logger LOG = LogManager.getLogger(NereidsPlanner.class); + + protected Plan parsedPlan; + protected Plan analyzedPlan; + protected Plan rewrittenPlan; + protected Plan optimizedPlan; + protected PhysicalPlan physicalPlan; + private CascadesContext cascadesContext; private final StatementContext statementContext; private final List scanNodeList = Lists.newArrayList(); private final List physicalRelations = Lists.newArrayList(); private DescriptorTable descTable; - private Plan parsedPlan; - private Plan analyzedPlan; - private Plan rewrittenPlan; - private Plan optimizedPlan; - private PhysicalPlan physicalPlan; private FragmentIdMapping distributedPlans; // The cost of optimized plan private double cost = 0; @@ -540,7 +542,7 @@ public Group getRoot() { return cascadesContext.getMemo().getRoot(); } - private PhysicalPlan chooseNthPlan(Group rootGroup, PhysicalProperties physicalProperties, int nthPlan) { + protected PhysicalPlan chooseNthPlan(Group rootGroup, PhysicalProperties physicalProperties, int nthPlan) { if (nthPlan <= 1) { cost = rootGroup.getLowestCostPlan(physicalProperties).orElseThrow( () -> new AnalysisException("lowestCostPlans with physicalProperties(" diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index bea5eec432b2ab..1081da89413d1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -206,6 +206,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -2056,17 +2057,22 @@ public PlanFragment visitPhysicalSetOperation( } setOperationNode.setNereidsId(setOperation.getId()); - setOperation.getRegularChildrenOutputs().stream() - .map(o -> o.stream() - .map(e -> ExpressionTranslator.translate(e, context)) - .collect(ImmutableList.toImmutableList())) - .forEach(setOperationNode::addResultExprLists); + for (List regularChildrenOutput : setOperation.getRegularChildrenOutputs()) { + Builder translateOutputs = ImmutableList.builderWithExpectedSize(regularChildrenOutput.size()); + for (SlotReference childOutput : regularChildrenOutput) { + translateOutputs.add(ExpressionTranslator.translate(childOutput, context)); + } + setOperationNode.addResultExprLists(translateOutputs.build()); + } + if (setOperation instanceof PhysicalUnion) { - ((PhysicalUnion) setOperation).getConstantExprsList().stream() - .map(l -> l.stream() - .map(e -> ExpressionTranslator.translate(e, context)) - .collect(ImmutableList.toImmutableList())) - .forEach(setOperationNode::addConstExprList); + for (List unionConsts : ((PhysicalUnion) setOperation).getConstantExprsList()) { + Builder translateConsts = ImmutableList.builderWithExpectedSize(unionConsts.size()); + for (NamedExpression unionConst : unionConsts) { + translateConsts.add(ExpressionTranslator.translate(unionConst, context)); + } + setOperationNode.addConstExprList(translateConsts.build()); + } } for (PlanFragment childFragment : childrenFragments) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 15aeca952a2556..a71d41aa4f0e4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1838,10 +1838,13 @@ public LogicalPlan visitRegularQuerySpecification(RegularQuerySpecificationConte @Override public LogicalPlan visitInlineTable(InlineTableContext ctx) { - List> values = ctx.rowConstructor().stream() - .map(this::visitRowConstructor) - .collect(ImmutableList.toImmutableList()); - return new LogicalInlineTable(values); + List rowConstructorContexts = ctx.rowConstructor(); + ImmutableList.Builder> rows + = ImmutableList.builderWithExpectedSize(rowConstructorContexts.size()); + for (RowConstructorContext rowConstructorContext : rowConstructorContexts) { + rows.add(visitRowConstructor(rowConstructorContext)); + } + return new LogicalInlineTable(rows.build()); } /** @@ -2972,9 +2975,13 @@ public Expression visitParenthesizedExpression(ParenthesizedExpressionContext ct @Override public List visitRowConstructor(RowConstructorContext ctx) { - return ctx.rowConstructorItem().stream() - .map(this::visitRowConstructorItem) - .collect(ImmutableList.toImmutableList()); + List rowConstructorItemContexts = ctx.rowConstructorItem(); + ImmutableList.Builder columns + = ImmutableList.builderWithExpectedSize(rowConstructorItemContexts.size()); + for (RowConstructorItemContext rowConstructorItemContext : rowConstructorItemContexts) { + columns.add(visitRowConstructorItem(rowConstructorItemContext)); + } + return columns.build(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java index 34646c1d657953..f6b2a504c28bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java @@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger; import java.util.BitSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -273,37 +274,40 @@ private T parse(String sql, Function parseFu private T parse(String sql, @Nullable LogicalPlanBuilder logicalPlanBuilder, Function parseFunction) { - ParserRuleContext tree = toAst(sql, parseFunction); + CommonTokenStream tokenStream = parseAllTokens(sql); + ParserRuleContext tree = toAst(tokenStream, parseFunction); LogicalPlanBuilder realLogicalPlanBuilder = logicalPlanBuilder == null - ? new LogicalPlanBuilder(getHintMap(sql, DorisParser::selectHint)) : logicalPlanBuilder; + ? new LogicalPlanBuilder(getHintMap(sql, tokenStream, DorisParser::selectHint)) + : logicalPlanBuilder; return (T) realLogicalPlanBuilder.visit(tree); } public LogicalPlan parseForCreateView(String sql) { - ParserRuleContext tree = toAst(sql, DorisParser::singleStatement); + CommonTokenStream tokenStream = parseAllTokens(sql); + ParserRuleContext tree = toAst(tokenStream, DorisParser::singleStatement); LogicalPlanBuilder realLogicalPlanBuilder = new LogicalPlanBuilderForCreateView( - getHintMap(sql, DorisParser::selectHint)); + getHintMap(sql, tokenStream, DorisParser::selectHint)); return (LogicalPlan) realLogicalPlanBuilder.visit(tree); } + /** parseForSyncMv */ public Optional parseForSyncMv(String sql) { - ParserRuleContext tree = toAst(sql, DorisParser::singleStatement); + CommonTokenStream tokenStream = parseAllTokens(sql); + ParserRuleContext tree = toAst(tokenStream, DorisParser::singleStatement); LogicalPlanBuilderForSyncMv logicalPlanBuilderForSyncMv = new LogicalPlanBuilderForSyncMv( - getHintMap(sql, DorisParser::selectHint)); + getHintMap(sql, tokenStream, DorisParser::selectHint)); logicalPlanBuilderForSyncMv.visit(tree); return logicalPlanBuilderForSyncMv.getQuerySql(); } /** get hint map */ - public static Map getHintMap(String sql, + public static Map getHintMap(String sql, CommonTokenStream hintTokenStream, Function parseFunction) { // parse hint first round - DorisLexer hintLexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql))); - CommonTokenStream hintTokenStream = new CommonTokenStream(hintLexer); - Map selectHintMap = Maps.newHashMap(); - Token hintToken = hintTokenStream.getTokenSource().nextToken(); + Iterator tokenIterator = hintTokenStream.getTokens().iterator(); + Token hintToken = tokenIterator.hasNext() ? tokenIterator.next() : null; while (hintToken != null && hintToken.getType() != DorisLexer.EOF) { if (hintToken.getChannel() == 2 && sql.charAt(hintToken.getStartIndex() + 2) == '+') { String hintSql = sql.substring(hintToken.getStartIndex() + 3, hintToken.getStopIndex() + 1); @@ -313,15 +317,19 @@ public static Map getHintMap(String sql, ParserRuleContext hintContext = parseFunction.apply(hintParser); selectHintMap.put(hintToken.getStartIndex(), hintContext); } - hintToken = hintTokenStream.getTokenSource().nextToken(); + hintToken = tokenIterator.hasNext() ? tokenIterator.next() : null; } return selectHintMap; } + public static ParserRuleContext toAst( + String sql, Function parseFunction) { + return toAst(parseAllTokens(sql), parseFunction); + } + /** toAst */ - public static ParserRuleContext toAst(String sql, Function parseFunction) { - DorisLexer lexer = new DorisLexer(new CaseInsensitiveStream(CharStreams.fromString(sql))); - CommonTokenStream tokenStream = new CommonTokenStream(lexer); + public static ParserRuleContext toAst( + CommonTokenStream tokenStream, Function parseFunction) { DorisParser parser = new DorisParser(tokenStream); parser.addParseListener(POST_PROCESSOR); @@ -352,9 +360,7 @@ public static ParserRuleContext toAst(String sql, Function parseDateLiteral(String s) /** parseDateTime */ public static Result parseDateTime(String s) { - // fast parse '2022-01-01' - if (s.length() == 10 && s.charAt(4) == '-' && s.charAt(7) == '-') { - TemporalAccessor date = fastParseDate(s); - if (date != null) { - return Result.ok(date); - } - } - String originalString = s; try { + // fast parse '2022-01-01' + if ((s.length() == 10 || s.length() == 19) && s.charAt(4) == '-' && s.charAt(7) == '-') { + if (s.length() == 10) { + TemporalAccessor date = fastParseDate(s); + if (date != null) { + return Result.ok(date); + } + } else if (s.charAt(10) == ' ' && s.charAt(13) == ':' && s.charAt(16) == ':') { + TemporalAccessor date = fastParseDateTime(s); + if (date != null) { + return Result.ok(date); + } + } + } + TemporalAccessor dateTime; // remove suffix/prefix ' ' @@ -566,6 +573,21 @@ private static TemporalAccessor fastParseDate(String date) { } } + private static TemporalAccessor fastParseDateTime(String date) { + Integer year = readNextInt(date, 0, 4); + Integer month = readNextInt(date, 5, 2); + Integer day = readNextInt(date, 8, 2); + Integer hour = readNextInt(date, 11, 2); + Integer minute = readNextInt(date, 14, 2); + Integer second = readNextInt(date, 17, 2); + + if (year != null && month != null && day != null && hour != null && minute != null && second != null) { + return LocalDateTime.of(year, month, day, hour, minute, second); + } else { + return null; + } + } + private static Integer readNextInt(String str, int offset, int readLength) { int value = 0; int realReadLength = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java index b4a7a9eee3a148..a588d61a330a5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java @@ -78,7 +78,7 @@ public BatchInsertIntoTableCommand(LogicalPlan logicalQuery) { @Override public Plan getExplainPlan(ConnectContext ctx) throws Exception { - return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); + return InsertUtils.getPlanForExplain(ctx, Optional.empty(), this.logicalQuery); } @Override @@ -98,7 +98,9 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); targetTableIf.readLock(); try { - this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf, Optional.empty()); + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan( + logicalQuery, targetTableIf, Optional.empty(), Optional.empty() + ); LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 0999c4baa79e3b..f689ef02b7788f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -30,26 +30,45 @@ import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.memo.Group; +import org.apache.doris.nereids.memo.GroupId; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.analysis.BindSink; +import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysicalOlapTableSink; +import org.apache.doris.nereids.rules.rewrite.MergeProjects; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; +import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.planner.DataSink; import org.apache.doris.qe.ConnectContext; @@ -60,6 +79,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -84,6 +104,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class); private LogicalPlan logicalQuery; + private Optional analyzeContext; private Optional labelName; /** * When source it's from job scheduler,it will be set. @@ -160,8 +181,14 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec // should lock target table until we begin transaction. targetTableIf.readLock(); try { + this.analyzeContext = Optional.of( + CascadesContext.initContext(ctx.getStatementContext(), logicalQuery, PhysicalProperties.ANY) + ); + // 1. process inline table (default values, empty values) - this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf, insertCtx); + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan( + logicalQuery, targetTableIf, analyzeContext, insertCtx + ); if (cte.isPresent()) { this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); } @@ -325,6 +352,10 @@ private ExecutorFactory selectInsertExecutorFactory( private BuildInsertExecutorResult planInsertExecutor( ConnectContext ctx, StmtExecutor stmtExecutor, LogicalPlanAdapter logicalPlanAdapter, TableIf targetTableIf) throws Throwable { + LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan(); + + boolean isInsertIntoValues + = logicalPlan instanceof UnboundTableSink && logicalPlan.child(0) instanceof LogicalInlineTable; // the key logical when use new coordinator: // 1. use NereidsPlanner to generate PhysicalPlan // 2. use PhysicalPlan to select InsertExecutorFactory, some InsertExecutors want to control @@ -335,10 +366,9 @@ private BuildInsertExecutorResult planInsertExecutor( // 3. NereidsPlanner use PhysicalPlan and the provided backend to generate DistributePlan // 4. ExecutorFactory use the DistributePlan to generate the NereidsSqlCoordinator and InsertExecutor - StatementContext statementContext = ctx.getStatementContext(); - AtomicReference executorFactoryRef = new AtomicReference<>(); - NereidsPlanner planner = new NereidsPlanner(statementContext) { + InsertByInlineTablePlanner planner = new InsertByInlineTablePlanner( + ctx.getStatementContext(), isInsertIntoValues) { @Override protected void doDistribute(boolean canUseNereidsDistributePlanner) { // when enter this method, the step 1 already executed @@ -374,7 +404,7 @@ public boolean isExternalTableSink() { @Override public Plan getExplainPlan(ConnectContext ctx) { - return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); + return InsertUtils.getPlanForExplain(ctx, this.analyzeContext, this.logicalQuery); } @Override @@ -449,4 +479,167 @@ public BuildInsertExecutorResult(NereidsPlanner planner, AbstractInsertExecutor this.physicalSink = physicalSink; } } + + private static class InsertByInlineTablePlanner extends NereidsPlanner { + private static final Rule toPhysicalOlapTableSink = new LogicalOlapTableSinkToPhysicalOlapTableSink() + .build(); + private final AtomicReference rootGroupRef = new AtomicReference<>(); + + private final boolean isInsertIntoValues; + + public InsertByInlineTablePlanner(StatementContext statementContext, boolean isInsertIntoValues) { + super(statementContext); + this.isInsertIntoValues = isInsertIntoValues; + } + + @Override + protected void analyze(boolean showPlanProcess) { + if (!isInsertIntoValues) { + super.analyze(showPlanProcess); + return; + } + + DefaultPlanRewriter analyzer = new DefaultPlanRewriter() { + @Override + public Plan visitUnboundTableSink( + UnboundTableSink olapTableSink, Void context) { + olapTableSink = + (UnboundTableSink) super.visitUnboundTableSink(olapTableSink, context); + + return (LogicalOlapTableSink) new BindSink() + .buildRules() + .stream() + .filter(rule -> rule.getRuleType() == RuleType.BINDING_INSERT_TARGET_TABLE) + .findFirst() + .get() + .transform(olapTableSink, getCascadesContext()) + .get(0); + } + + @Override + public Plan visitLogicalInlineTable(LogicalInlineTable logicalInlineTable, Void context) { + logicalInlineTable = + (LogicalInlineTable) super.visitLogicalInlineTable(logicalInlineTable, context); + + List outputs = Lists.newArrayList(); + for (Slot slot : logicalInlineTable.getOutput()) { + outputs.add(new SlotReference(slot.getName(), slot.getDataType(), slot.nullable())); + } + + LogicalUnion union = new LogicalUnion( + Qualifier.ALL, logicalInlineTable.getConstantExprsList(), ImmutableList.of() + ).withNewOutputs(outputs); + + return union; + } + }; + + Plan boundPlan = getCascadesContext().getRewritePlan().accept(analyzer, null); + getCascadesContext().setRewritePlan(boundPlan); + } + + @Override + protected void rewrite(boolean showPlanProcess) { + if (!isInsertIntoValues) { + super.rewrite(showPlanProcess); + return; + } + + DefaultPlanRewriter rewriter = new DefaultPlanRewriter() { + @Override + public Plan visitLogicalProject(LogicalProject project, Void context) { + project = (LogicalProject) super.visitLogicalProject(project, context); + if (project.child() instanceof LogicalProject) { + return new MergeProjects() + .build() + .transform(project, getCascadesContext()) + .get(0); + } + return project; + } + }; + getCascadesContext().setRewritePlan( + getCascadesContext().getRewritePlan().accept(rewriter, null) + ); + } + + @Override + protected void optimize() { + if (!isInsertIntoValues) { + super.optimize(); + return; + } + + DefaultPlanRewriter optimizer = new DefaultPlanRewriter() { + @Override + public Plan visitLogicalUnion(LogicalUnion logicalUnion, Void context) { + logicalUnion = (LogicalUnion) super.visitLogicalUnion(logicalUnion, context); + + return new PhysicalUnion(logicalUnion.getQualifier(), + logicalUnion.getOutputs(), + logicalUnion.getRegularChildrenOutputs(), + logicalUnion.getConstantExprsList(), + logicalUnion.getLogicalProperties(), + logicalUnion.children() + ); + } + + @Override + public Plan visitLogicalProject(LogicalProject logicalProject, Void context) { + logicalProject = + (LogicalProject) super.visitLogicalProject(logicalProject, context); + + return new PhysicalProject<>( + logicalProject.getProjects(), + logicalProject.getLogicalProperties(), + logicalProject.child() + ); + } + + @Override + public Plan visitLogicalOlapTableSink(LogicalOlapTableSink olapTableSink, + Void context) { + olapTableSink = + (LogicalOlapTableSink) super.visitLogicalOlapTableSink(olapTableSink, context); + return toPhysicalOlapTableSink + .transform(olapTableSink, getCascadesContext()) + .get(0); + } + }; + + PhysicalPlan physicalPlan = + (PhysicalPlan) getCascadesContext().getRewritePlan().accept(optimizer, null); + + super.physicalPlan = physicalPlan; + + GroupId rootGroupId = GroupId.createGenerator().getNextId(); + Group rootGroup = new Group(rootGroupId, physicalPlan.getLogicalProperties()); + rootGroupRef.set(rootGroup); + } + + @Override + public Group getRoot() { + if (!isInsertIntoValues) { + return super.getRoot(); + } + return rootGroupRef.get(); + } + + @Override + protected PhysicalPlan chooseNthPlan( + Group rootGroup, PhysicalProperties physicalProperties, int nthPlan) { + if (!isInsertIntoValues) { + return super.chooseNthPlan(rootGroup, physicalProperties, nthPlan); + } + return super.physicalPlan; + } + + @Override + protected PhysicalPlan postProcess(PhysicalPlan physicalPlan) { + if (!isInsertIntoValues) { + return super.postProcess(physicalPlan); + } + return physicalPlan; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index c89a4fc7be96ee..ed0718caf3979c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -122,7 +122,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { if (targetTableIf instanceof MTMV && !MTMVUtil.allowModifyMTMVData(ctx)) { throw new AnalysisException("Not allowed to perform current operation on async materialized view"); } - this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf, Optional.empty()); + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan( + logicalQuery, targetTableIf, Optional.empty(), Optional.empty()); if (cte.isPresent()) { this.logicalQuery = (LogicalPlan) logicalQuery.withChildren(cte.get().withChildren( this.logicalQuery.child(0))); @@ -362,7 +363,7 @@ private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, lon @Override public Plan getExplainPlan(ConnectContext ctx) { - return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); + return InsertUtils.getPlanForExplain(ctx, Optional.empty(), this.logicalQuery); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 60e7e5bf805a64..271ce2e012627c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -29,26 +29,28 @@ import org.apache.doris.common.FormatOptions; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.analyzer.Scope; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink; -import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.parser.LogicalPlanBuilder; import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer; +import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.rules.expression.rules.FoldConstantRuleOnFE; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.DefaultValueSlot; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -56,6 +58,7 @@ import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.InsertStreamTxnExecutor; @@ -80,6 +83,7 @@ import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Set; @@ -260,7 +264,9 @@ private static void beginBatchInsertTransaction(ConnectContext ctx, /** * normalize plan to let it could be process correctly by nereids */ - public static Plan normalizePlan(Plan plan, TableIf table, Optional insertCtx) { + public static Plan normalizePlan(LogicalPlan plan, TableIf table, + Optional analyzeContext, + Optional insertCtx) { UnboundLogicalSink unboundLogicalSink = (UnboundLogicalSink) plan; if (table instanceof HMSExternalTable) { HMSExternalTable hiveTable = (HMSExternalTable) table; @@ -337,18 +343,41 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional oneRowRelationBuilder = ImmutableList.builder(); + ImmutableList.Builder> optimizedRowConstructors + = ImmutableList.builderWithExpectedSize(logicalInlineTable.getConstantExprsList().size()); List columns = table.getBaseSchema(false); + ConnectContext context = ConnectContext.get(); + ExpressionRewriteContext rewriteContext = null; + if (context != null && context.getStatementContext() != null) { + rewriteContext = new ExpressionRewriteContext( + CascadesContext.initContext( + context.getStatementContext(), logicalInlineTable, PhysicalProperties.ANY + ) + ); + } + + Optional analyzer = analyzeContext.map( + cascadesContext -> buildAnalyzer(plan, cascadesContext) + ); + + Boolean[] outputSlotNullables = new Boolean[logicalInlineTable.getConstantExprsList().get(0).size()]; + Arrays.fill(outputSlotNullables, false); + for (List values : logicalInlineTable.getConstantExprsList()) { - ImmutableList.Builder constantExprs = ImmutableList.builder(); + ImmutableList.Builder optimizedRowConstructor = ImmutableList.builder(); if (values.isEmpty()) { if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { throw new AnalysisException("value list should not be empty if columns are specified"); } - for (Column column : columns) { - constantExprs.add(generateDefaultExpression(column)); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + NamedExpression defaultExpression = generateDefaultExpression(column); + addColumnValue( + analyzer, optimizedRowConstructor, outputSlotNullables, i, defaultExpression + ); } } else { if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { @@ -374,10 +403,20 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional oneRowRelations = oneRowRelationBuilder.build(); - if (oneRowRelations.size() == 1) { - return plan.withChildren(oneRowRelations.get(0)); - } else { - return plan.withChildren( - LogicalPlanBuilder.reduceToLogicalPlanTree(0, oneRowRelations.size() - 1, - oneRowRelations, Qualifier.ALL)); + + return plan.withChildren(new LogicalInlineTable(optimizedRowConstructors.build(), Optional.of( + Utils.fastToImmutableList(outputSlotNullables) + ))); + } + + private static ExpressionAnalyzer buildAnalyzer(LogicalPlan plan, CascadesContext analyzeContext) { + return new ExpressionAnalyzer(plan, new Scope(ImmutableList.of()), + analyzeContext, false, false) { + @Override + public Expression visitCast(Cast cast, ExpressionRewriteContext context) { + Expression expr = super.visitCast(cast, context); + if (expr instanceof Cast) { + expr = FoldConstantRuleOnFE.evaluate(expr, context); + } + return expr; + } + }; + } + + private static void addColumnValue( + Optional analyzer, + ImmutableList.Builder optimizedRowConstructor, + Boolean[] nullable, int index, NamedExpression value) { + if (analyzer.isPresent() && !(value instanceof Alias && value.child(0) instanceof Literal)) { + ExpressionAnalyzer expressionAnalyzer = analyzer.get(); + value = (NamedExpression) expressionAnalyzer.analyze( + value, new ExpressionRewriteContext(expressionAnalyzer.getCascadesContext()) + ); } + optimizedRowConstructor.add(value); + nullable[index] |= value.nullable(); } private static Expression castValue(Expression value, DataType targetType) { if (value instanceof UnboundAlias) { - return value.withChildren(TypeCoercionUtils.castUnbound(((UnboundAlias) value).child(), targetType)); + UnboundAlias unboundAlias = (UnboundAlias) value; + return new Alias(TypeCoercionUtils.castUnbound(unboundAlias.child(), targetType)); } else { return TypeCoercionUtils.castUnbound(value, targetType); } @@ -477,8 +551,10 @@ private static NamedExpression generateDefaultExpression(Column column) { /** * get plan for explain. */ - public static Plan getPlanForExplain(ConnectContext ctx, LogicalPlan logicalQuery) { - return InsertUtils.normalizePlan(logicalQuery, InsertUtils.getTargetTable(logicalQuery, ctx), Optional.empty()); + public static Plan getPlanForExplain( + ConnectContext ctx, Optional analyzeContext, LogicalPlan logicalQuery) { + return InsertUtils.normalizePlan( + logicalQuery, InsertUtils.getTargetTable(logicalQuery, ctx), analyzeContext, Optional.empty()); } // check for insert into t1(a,b,gen_col) select 1,2,3; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java index e7b1f4d581892c..5a6736873d2a90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.planner.GroupCommitPlanner; @@ -93,8 +94,11 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Logi conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext && ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command")); conditions.add(Pair.of( - () -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion, - () -> "not one row relation or union, class: " + tableSink.child().getClass().getName())); + () -> tableSink.child() instanceof OneRowRelation + || tableSink.child() instanceof LogicalUnion + || tableSink.child() instanceof LogicalInlineTable, + () -> "not one row relation or union or inline table, class: " + + tableSink.child().getClass().getName())); ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean())); if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) { for (Pair> pair : conditions) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalInlineTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalInlineTable.java index b2a2a1d83ca3e7..b9980c03ec1e70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalInlineTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalInlineTable.java @@ -17,15 +17,18 @@ package org.apache.doris.nereids.trees.plans.logical; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; import com.google.common.collect.ImmutableList; @@ -40,15 +43,28 @@ public class LogicalInlineTable extends LogicalLeaf implements BlockFuncDepsProp private final List> constantExprsList; + private final Optional> outputNullables; + public LogicalInlineTable(List> constantExprsList) { - this(constantExprsList, Optional.empty(), Optional.empty()); + this(constantExprsList, Optional.empty(), Optional.empty(), Optional.empty()); + } + + public LogicalInlineTable(List> constantExprsList, Optional> outputNullables) { + this(constantExprsList, outputNullables, Optional.empty(), Optional.empty()); } + /** LogicalInlineTable */ public LogicalInlineTable(List> constantExprsList, + Optional> outputNullables, Optional groupExpression, Optional logicalProperties) { super(PlanType.LOGICAL_INLINE_TABLE, groupExpression, logicalProperties); - this.constantExprsList = ImmutableList.copyOf( + + if (constantExprsList.isEmpty()) { + throw new AnalysisException("constantExprsList should now be empty"); + } + this.outputNullables = Objects.requireNonNull(outputNullables, "outputNullables should not be null"); + this.constantExprsList = Utils.fastToImmutableList( Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); } @@ -63,23 +79,61 @@ public R accept(PlanVisitor visitor, C context) { @Override public List getExpressions() { - return constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); + ImmutableList.Builder expressions = ImmutableList.builderWithExpectedSize( + constantExprsList.size() * constantExprsList.get(0).size()); + + for (List namedExpressions : constantExprsList) { + expressions.addAll(namedExpressions); + } + + return expressions.build(); } @Override public Plan withGroupExpression(Optional groupExpression) { - return null; + return new LogicalInlineTable( + constantExprsList, outputNullables, groupExpression, Optional.of(getLogicalProperties()) + ); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return null; + if (!children.isEmpty()) { + throw new AnalysisException("children should not be empty"); + } + return new LogicalInlineTable(constantExprsList, outputNullables, groupExpression, logicalProperties); } @Override public List computeOutput() { - return ImmutableList.of(); + if (outputNullables.isPresent()) { + List nullables = outputNullables.get(); + int columnNum = constantExprsList.get(0).size(); + List firstRow = constantExprsList.get(0); + ImmutableList.Builder output = ImmutableList.builderWithExpectedSize(constantExprsList.size()); + for (int i = 0; i < columnNum; i++) { + NamedExpression firstRowColumn = firstRow.get(i); + output.add(new SlotReference(firstRowColumn.getName(), firstRowColumn.getDataType(), nullables.get(i))); + } + return output.build(); + } else { + int columnNum = constantExprsList.get(0).size(); + List firstRow = constantExprsList.get(0); + ImmutableList.Builder output = ImmutableList.builderWithExpectedSize(constantExprsList.size()); + for (int i = 0; i < columnNum; i++) { + NamedExpression firstRowColumn = firstRow.get(i); + boolean nullable = false; + for (List row : constantExprsList) { + if (row.get(i).nullable()) { + nullable = true; + break; + } + } + output.add(new SlotReference(firstRowColumn.getName(), firstRowColumn.getDataType(), nullable)); + } + return output.build(); + } } @Override @@ -98,4 +152,10 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(constantExprsList); } + + @Override + public String toString() { + return Utils.toSqlString("LogicalInlineTable[" + id.asInt() + "] (rowNum=" + + constantExprsList.size() + ")"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalUnion.java index 459044100b632d..89cd874c8ab082 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalUnion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalUnion.java @@ -205,6 +205,14 @@ public void computeUniform(DataTrait.Builder builder) { // don't propagate uniform slots } + @Override + public boolean hasUnboundExpression() { + if (!constantExprsList.isEmpty() && children.isEmpty()) { + return false; + } + return super.hasUnboundExpression(); + } + private List mapSlotToIndex(Plan plan, List> equalSlotsList) { Map slotToIndex = new HashMap<>(); for (int i = 0; i < plan.getOutput().size(); i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index efe0a03e3708b4..90e92ca1ae2597 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -178,7 +178,7 @@ public int hashCode() { @Override public String toString() { - return Utils.toSqlString("LogicalOlapTableSink[" + id.asInt() + "]", + return Utils.toSqlString("PhysicalOlapTableSink[" + id.asInt() + "]", "outputExprs", outputExprs, "database", database.getFullName(), "targetTable", targetTable.getName(),