Skip to content

Commit

Permalink
remove table extraction from CascadesContext (move it to StatementContext)
Browse files Browse the repository at this point in the history
  • Loading branch information
LiBinfeng-01 committed Nov 27, 2024
1 parent f20f800 commit 66e7c45
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 173 deletions.
175 changes: 5 additions & 170 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@

package org.apache.doris.nereids;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.jobs.JobContext;
Expand Down Expand Up @@ -54,13 +47,8 @@
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.ColumnStatistic;
Expand Down Expand Up @@ -407,159 +395,6 @@ private CascadesContext execute(Job job) {
return this;
}

/**
 * Resolves every table referenced by {@code logicalPlan} and caches the result in
 * {@link #tables}, keyed by the table's fully qualified name parts.
 *
 * <p>Resolution is best-effort: a name that cannot be resolved here is silently
 * skipped — presumably the later binding phase reports a proper error for it
 * (NOTE(review): confirm against the analyzer).
 */
public void extractTables(LogicalPlan logicalPlan) {
    Set<List<String>> candidateNames = getTables(logicalPlan);
    tables = Maps.newHashMap();
    for (List<String> nameParts : candidateNames) {
        try {
            TableIf resolved = getTable(nameParts);
            tables.put(resolved.getFullQualifiers(), resolved);
        } catch (Throwable ignored) {
            // Best-effort resolution: unresolvable names are simply not cached.
        }
    }
}

/**
 * Returns the table cache built by {@link #extractTables}, keyed by fully
 * qualified name parts, or {@code null} if tables have not been extracted yet.
 * Callers (e.g. the planner lock) rely on the {@code null} sentinel, so an
 * empty map is NOT substituted here.
 */
public Map<List<String>, TableIf> getTables() {
    // The original if/else returned the same reference on both branches.
    return tables;
}

/**
 * Collects the name parts of every table referenced anywhere in {@code logicalPlan},
 * including tables referenced only from subquery expressions inside filters,
 * projections, havings, CTEs and one-row relations.
 */
private Set<List<String>> getTables(LogicalPlan logicalPlan) {
    final Set<List<String>> collected = new HashSet<>();
    logicalPlan.foreach(node -> {
        // Plan nodes that may carry subquery expressions need dedicated handling;
        // everything else is scanned for unbound relations / table sinks directly.
        if (node instanceof LogicalFilter) {
            collected.addAll(extractTableNamesFromFilter((LogicalFilter<?>) node));
        } else if (node instanceof LogicalCTE) {
            collected.addAll(extractTableNamesFromCTE((LogicalCTE<?>) node));
        } else if (node instanceof LogicalProject) {
            collected.addAll(extractTableNamesFromProject((LogicalProject<?>) node));
        } else if (node instanceof LogicalHaving) {
            collected.addAll(extractTableNamesFromHaving((LogicalHaving<?>) node));
        } else if (node instanceof UnboundOneRowRelation) {
            collected.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation) node));
        } else {
            Set<LogicalPlan> unboundNodes = node.collect(
                    n -> (n instanceof UnboundRelation || n instanceof UnboundTableSink));
            for (LogicalPlan unbound : unboundNodes) {
                if (unbound instanceof UnboundRelation) {
                    collected.add(((UnboundRelation) unbound).getNameParts());
                } else if (unbound instanceof UnboundTableSink) {
                    collected.add(((UnboundTableSink<?>) unbound).getNameParts());
                } else {
                    // Unreachable given the collect predicate above; kept defensively.
                    throw new AnalysisException("get tables from plan failed. meet unknown type node " + unbound);
                }
            }
        }
    });
    return collected;
}

/**
 * Gathers table names referenced by subquery expressions inside a HAVING predicate.
 */
private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) {
    Set<List<String>> result = new HashSet<>();
    Set<SubqueryExpr> subqueries = having.getPredicate()
            .collect(SubqueryExpr.class::isInstance);
    for (SubqueryExpr subquery : subqueries) {
        // Recurse into each subquery's plan to pick up its table references.
        result.addAll(getTables(subquery.getQueryPlan()));
    }
    return result;
}

/**
 * Gathers table names referenced by subquery expressions inside a one-row
 * relation's project list.
 */
private Set<List<String>> extractTableNamesFromOneRowRelation(UnboundOneRowRelation oneRowRelation) {
    // Single pipeline: projects -> contained subqueries -> their plans -> table names.
    return oneRowRelation.getProjects().stream()
            .<Set<SubqueryExpr>>map(project -> project.collect(SubqueryExpr.class::isInstance))
            .flatMap(Set::stream)
            .map(SubqueryExpr::getQueryPlan)
            .map(this::getTables)
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
}

/**
 * Gathers table names referenced by subquery expressions inside a projection list.
 */
private Set<List<String>> extractTableNamesFromProject(LogicalProject<?> project) {
    // Single pipeline: projections -> contained subqueries -> their plans -> table names.
    return project.getProjects().stream()
            .<Set<SubqueryExpr>>map(item -> item.collect(SubqueryExpr.class::isInstance))
            .flatMap(Set::stream)
            .map(SubqueryExpr::getQueryPlan)
            .map(this::getTables)
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
}

/**
 * Gathers table names referenced by subquery expressions inside a filter predicate.
 */
private Set<List<String>> extractTableNamesFromFilter(LogicalFilter<?> filter) {
    Set<List<String>> result = new HashSet<>();
    Set<SubqueryExpr> subqueries = filter.getPredicate()
            .collect(SubqueryExpr.class::isInstance);
    for (SubqueryExpr subquery : subqueries) {
        // Recurse into each subquery's plan to pick up its table references.
        result.addAll(getTables(subquery.getQueryPlan()));
    }
    return result;
}

/**
 * Gathers table names referenced by the alias queries of a CTE node.
 */
private Set<List<String>> extractTableNamesFromCTE(LogicalCTE<?> cte) {
    Set<List<String>> result = new HashSet<>();
    for (LogicalSubQueryAlias<Plan> aliasQuery : cte.getAliasQueries()) {
        result.addAll(getTables(aliasQuery));
    }
    return result;
}

private TableIf getTable(List<String> nameParts) {
switch (nameParts.size()) {
case 1: { // table
String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
String dbName = getConnectContext().getDatabase();
return getTable(ctlName, dbName, nameParts.get(0), getConnectContext().getEnv());
}
case 2: { // db.table
String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
String dbName = nameParts.get(0);
return getTable(ctlName, dbName, nameParts.get(1), getConnectContext().getEnv());
}
case 3: { // catalog.db.table
return getTable(nameParts.get(0), nameParts.get(1), nameParts.get(2), getConnectContext().getEnv());
}
default:
throw new IllegalStateException("Table name [" + String.join(".", nameParts) + "] is invalid.");
}
}

/**
 * Looks a table up through catalog -> database -> table, failing fast with a
 * descriptive message at whichever level is missing.
 *
 * @param ctlName   catalog name
 * @param dbName    database name within the catalog
 * @param tableName table name within the database
 * @param env       environment providing the catalog manager
 * @return the resolved table, never null
 * @throws RuntimeException if the catalog, database or table does not exist
 */
public TableIf getTable(String ctlName, String dbName, String tableName, Env env) {
    CatalogIf ctl = env.getCatalogMgr().getCatalog(ctlName);
    if (ctl == null) {
        throw new RuntimeException("Catalog [" + ctlName + "] does not exist.");
    }
    DatabaseIf database = ctl.getDbNullable(dbName);
    if (database == null) {
        throw new RuntimeException("Database [" + dbName + "] does not exist in catalog [" + ctlName + "].");
    }
    TableIf result = database.getTableNullable(tableName);
    if (result == null) {
        throw new RuntimeException("Table [" + tableName + "] does not exist in database [" + dbName + "].");
    }
    return result;
}

/**
* Used to lock table
*/
Expand All @@ -573,12 +408,12 @@ public static class Lock implements AutoCloseable {
*/
public Lock(LogicalPlan plan, CascadesContext cascadesContext) {
this.cascadesContext = cascadesContext;
// tables can also be load from dump file
if (cascadesContext.getTables() == null || cascadesContext.getTables().isEmpty()) {
cascadesContext.extractTables(plan);
cascadesContext.getStatementContext().setTables(cascadesContext.getTables());
// if tables loaded from dump file, it is not empty here
if (cascadesContext.getStatementContext().getTables() == null
|| cascadesContext.getStatementContext().getTables().isEmpty()) {
cascadesContext.getStatementContext().extractTables(plan);
}
for (TableIf table : cascadesContext.tables.values()) {
for (TableIf table : cascadesContext.getStatementContext().getTables().values()) {
if (!table.needReadLockWhenPlan()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ protected Plan planWithoutLock(
analyze(showAnalyzeProcess(explainLevel, showPlanProcess));
// minidump of input must be serialized first, this process ensure minidump string not null
try {
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getStatementContext().getTables());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 66e7c45

Please sign in to comment.