Skip to content

Commit

Permalink
[feat](nereids)set runtime filter wait time according to table row co…
Browse files Browse the repository at this point in the history
…unt and table type #42640 branch-3.0 (#44031)

pick #42640
  • Loading branch information
englefly authored Nov 29, 2024
1 parent 0ded2d9 commit ec09d89
Show file tree
Hide file tree
Showing 19 changed files with 87 additions and 94 deletions.
2 changes: 1 addition & 1 deletion fe/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
Expand All @@ -73,6 +75,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -279,6 +282,8 @@ private Plan planWithoutLock(
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
}

setRuntimeFilterWaitTimeByTableRowCountAndType();

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
Expand Down Expand Up @@ -315,6 +320,45 @@ private LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

/**
* compute rf wait time according to max table row count, if wait time is not default value
* olap:
* row < 1G: 1 sec
* 1G <= row < 10G: 5 sec
* 10G < row: 20 sec
* external:
* row < 1G: 5 sec
* 1G <= row < 10G: 10 sec
* 10G < row: 50 sec
*/
private void setRuntimeFilterWaitTimeByTableRowCountAndType() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs()
!= VariableMgr.getDefaultSessionVariable().getRuntimeFilterWaitTimeMs()) {
List<LogicalCatalogRelation> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalCatalogRelation.class::isInstance);
double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext);
boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan));
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
if (hasExternalTable) {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "50000");
}
} else {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "1000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
}
}
}
}

private void initCascadesContext(LogicalPlan plan, PhysicalProperties requireProperties) {
cascadesContext = CascadesContext.initContext(statementContext, plan, requireProperties);
if (statementContext.getConnectContext().getTables() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.DdlException;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand Down Expand Up @@ -62,11 +61,9 @@ public Rule build() {
if (hintName.equalsIgnoreCase("SET_VAR")) {
setVar((SelectHintSetVar) hint.getValue(), ctx.statementContext);
} else if (hintName.equalsIgnoreCase("ORDERED")) {
try {
ctx.cascadesContext.getConnectContext().getSessionVariable()
.disableNereidsJoinReorderOnce();
} catch (DdlException e) {
throw new RuntimeException(e);
if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true")) {
throw new RuntimeException("set DISABLE_JOIN_REORDER=true once failed");
}
OrderedHint ordered = new OrderedHint("Ordered");
ordered.setStatus(Hint.HintStatus.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
Expand Down Expand Up @@ -130,6 +131,7 @@
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
Expand Down Expand Up @@ -211,6 +213,25 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
*
* get the max row count of tables used in a query
*/
public static double getMaxTableRowCount(List<LogicalCatalogRelation> scans, CascadesContext context) {
StatsCalculator calculator = new StatsCalculator(context);
double max = -1;
for (LogicalCatalogRelation scan : scans) {
double row;
if (scan instanceof LogicalOlapScan) {
row = calculator.getOlapTableRowCount((LogicalOlapScan) scan);
} else {
row = scan.getTable().getRowCount();
}
max = Math.max(row, max);
}
return max;
}

/**
* disable join reorder if
* 1. any table rowCount is not available, or
Expand All @@ -237,7 +258,8 @@ public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRela
Optional<String> reason = calculator.checkNdvValidation((OlapScan) scan, rowCount);
if (reason.isPresent()) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
ConnectContext.get().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true");
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
List<String> ctasCols = createTableInfo.getCtasColumns();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
if (ctasCols == null) {
// we should analyze the plan firstly to get the columns' name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
Expand Down
32 changes: 12 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4200,27 +4200,19 @@ public void setEnableStrictConsistencyDml(boolean value) {
this.enableStrictConsistencyDml = value;
}

public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new StringLiteral("false")));
}

public void disableConstantFoldingByBEOnce() throws DdlException {
if (!enableFoldConstantByBe) {
return;
/**
*
* @return true iff set success
*/
public boolean setVarOnce(String varName, String value) {
try {
setIsSingleSetVar(true);
VariableMgr.setVar(this, new SetVar(varName, new StringLiteral(value)));
return true;
} catch (DdlException e) {
LOG.warn("set onece {} = {} failed", varName, value);
return false;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, new StringLiteral("false")));
}

public void disableNereidsJoinReorderOnce() throws DdlException {
setIsSingleSetVar(true);
VariableMgr.setVar(this, new SetVar(SessionVariable.DISABLE_JOIN_REORDER, new StringLiteral("true")));
}

// return number of variables by given variable annotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3601,7 +3601,7 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
try {
try {
// disable shuffle for http stream (only 1 sink)
sessionVariable.disableStrictConsistencyDmlOnce();
sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false");
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
Expand Down
31 changes: 0 additions & 31 deletions tools/tpcds-tools/bin/run-tpcds-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,15 @@ fi
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpcds sf 1 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpcds sf 100 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpcds sf 1000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpcds sf 10000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -123,32 +119,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
rm -f "${backup_session_variables_file}"
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}")
}
clean_up() {
echo "restore session variables:"
cat "${backup_session_variables_file}"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};"
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCDS_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -205,5 +176,3 @@ done
echo "Total cold run time: ${cold_run_sum} ms"
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpcds queries.'

clean_up
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf10000.sql

This file was deleted.

28 changes: 0 additions & 28 deletions tools/tpch-tools/bin/run-tpch-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,15 @@ done
if [[ "${HELP}" -eq 1 ]]; then
usage
fi

TPCH_QUERIES_DIR="${CURDIR}/../queries"
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpch sf 1 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpch sf 100 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpch sf 1000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpch sf 10000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -120,26 +115,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}=${v};" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}")
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCH_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -197,7 +173,3 @@ echo "Total cold run time: ${cold_run_sum} ms"
# tpch 流水线依赖这个'Total hot run time'字符串
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpch queries.'

echo "Restore session variables"
run_sql "source ${backup_session_variables_file};"
rm -f "${backup_session_variables_file}"
Empty file.
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf10000.sql

This file was deleted.

0 comments on commit ec09d89

Please sign in to comment.