Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](nereids)set runtime filter wait time according to table row count and table type #42640 branch-3.0 #44031

Merged
merged 5 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fe/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
Expand All @@ -73,6 +75,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -279,6 +282,8 @@ private Plan planWithoutLock(
disableJoinReorderReason.ifPresent(statementContext::setDisableJoinReorderReason);
}

setRuntimeFilterWaitTimeByTableRowCountAndType();

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
Expand Down Expand Up @@ -315,6 +320,45 @@ private LogicalPlan preprocess(LogicalPlan logicalPlan) {
return new PlanPreprocessors(statementContext).process(logicalPlan);
}

/**
 * Auto-tunes runtime_filter_wait_time_ms for the current statement based on the
 * largest table referenced by the query and on whether any external (non-OLAP)
 * table participates. Thresholds actually applied by the code below:
 * olap:
 * row < 1G: 1 sec
 * 1G <= row < 10G: 5 sec
 * 10G <= row: 20 sec
 * external:
 * row < 1G: 5 sec
 * 1G <= row < 10G: 20 sec
 * 10G <= row: 50 sec
 * NOTE(review): the guard applies the tuning only when the current wait time
 * DIFFERS from the session default, i.e. it can override a user-set value for
 * one statement via setVarOnce — confirm this is the intended direction of the
 * comparison (vs. tuning only when the value is still the default).
 */
private void setRuntimeFilterWaitTimeByTableRowCountAndType() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().getRuntimeFilterWaitTimeMs()
!= VariableMgr.getDefaultSessionVariable().getRuntimeFilterWaitTimeMs()) {
// Collect every base-table scan in the rewritten plan; the max row count
// over all of them drives the tier selection below.
List<LogicalCatalogRelation> scans = cascadesContext.getRewritePlan()
.collectToList(LogicalCatalogRelation.class::isInstance);
double maxRow = StatsCalculator.getMaxTableRowCount(scans, cascadesContext);
// A single external table is enough to use the slower (external) tiers.
boolean hasExternalTable = scans.stream().anyMatch(scan -> !(scan instanceof LogicalOlapScan));
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
if (hasExternalTable) {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "50000");
}
} else {
if (maxRow < 1_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "1000");
} else if (maxRow < 10_000_000_000L) {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "5000");
} else {
sessionVariable.setVarOnce(SessionVariable.RUNTIME_FILTER_WAIT_TIME_MS, "20000");
}
}
}
}

private void initCascadesContext(LogicalPlan plan, PhysicalProperties requireProperties) {
cascadesContext = CascadesContext.initContext(statementContext, plan, requireProperties);
if (statementContext.getConnectContext().getTables() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.DdlException;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand Down Expand Up @@ -62,11 +61,9 @@ public Rule build() {
if (hintName.equalsIgnoreCase("SET_VAR")) {
setVar((SelectHintSetVar) hint.getValue(), ctx.statementContext);
} else if (hintName.equalsIgnoreCase("ORDERED")) {
try {
ctx.cascadesContext.getConnectContext().getSessionVariable()
.disableNereidsJoinReorderOnce();
} catch (DdlException e) {
throw new RuntimeException(e);
if (!ctx.cascadesContext.getConnectContext().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true")) {
throw new RuntimeException("set DISABLE_JOIN_REORDER=true once failed");
}
OrderedHint ordered = new OrderedHint("Ordered");
ordered.setStatus(Hint.HintStatus.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
Expand Down Expand Up @@ -130,6 +131,7 @@
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.PlanUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
Expand Down Expand Up @@ -211,6 +213,25 @@ public Map<String, ColumnStatistic> getTotalColumnStatisticMap() {
return totalColumnStatisticMap;
}

/**
 * Returns the largest row count among the given table scans, or -1 when the
 * list is empty.
 *
 * <p>OLAP scans are measured through a {@link StatsCalculator} built on the
 * supplied context; any other relation falls back to the row count reported
 * by its underlying table.
 */
public static double getMaxTableRowCount(List<LogicalCatalogRelation> scans, CascadesContext context) {
    StatsCalculator statsCalculator = new StatsCalculator(context);
    double largest = -1;
    for (LogicalCatalogRelation relation : scans) {
        double rowCount = relation instanceof LogicalOlapScan
                ? statsCalculator.getOlapTableRowCount((LogicalOlapScan) relation)
                : relation.getTable().getRowCount();
        largest = Math.max(largest, rowCount);
    }
    return largest;
}

/**
* disable join reorder if
* 1. any table rowCount is not available, or
Expand All @@ -237,7 +258,8 @@ public static Optional<String> disableJoinReorderIfStatsInvalid(List<CatalogRela
Optional<String> reason = calculator.checkNdvValidation((OlapScan) scan, rowCount);
if (reason.isPresent()) {
try {
ConnectContext.get().getSessionVariable().disableNereidsJoinReorderOnce();
ConnectContext.get().getSessionVariable()
.setVarOnce(SessionVariable.DISABLE_JOIN_REORDER, "true");
LOG.info("disable join reorder since col stats invalid: "
+ reason.get());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
List<String> ctasCols = createTableInfo.getCtasColumns();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
if (ctasCols == null) {
// we should analyze the plan firstly to get the columns' name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
Expand Down
32 changes: 12 additions & 20 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4195,27 +4195,19 @@ public void setEnableStrictConsistencyDml(boolean value) {
this.enableStrictConsistencyDml = value;
}

public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
}
setIsSingleSetVar(true);
VariableMgr.setVar(this,
new SetVar(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, new StringLiteral("false")));
}

public void disableConstantFoldingByBEOnce() throws DdlException {
if (!enableFoldConstantByBe) {
return;
/**
 * Sets a session variable with single-statement ("set once") semantics: the
 * change applies to the current statement only.
 *
 * <p>Any {@link DdlException} raised while setting the variable is caught and
 * logged rather than propagated.
 *
 * @param varName name of the session variable to set
 * @param value   new value, given as its string representation
 * @return true iff set success
 */
public boolean setVarOnce(String varName, String value) {
    try {
        // Mark the change as scoped to this single statement before applying it.
        setIsSingleSetVar(true);
        VariableMgr.setVar(this, new SetVar(varName, new StringLiteral(value)));
        return true;
    } catch (DdlException e) {
        // Fixed typo ("onece" -> "once") and attach the exception so the
        // failure cause is not lost from the log.
        LOG.warn("set once {} = {} failed", varName, value, e);
        return false;
    }
}

public void disableNereidsJoinReorderOnce() throws DdlException {
setIsSingleSetVar(true);
VariableMgr.setVar(this, new SetVar(SessionVariable.DISABLE_JOIN_REORDER, new StringLiteral("true")));
}

// return number of variables by given variable annotation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3599,7 +3599,7 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
try {
try {
// disable shuffle for http stream (only 1 sink)
sessionVariable.disableStrictConsistencyDmlOnce();
sessionVariable.setVarOnce(SessionVariable.ENABLE_STRICT_CONSISTENCY_DML, "false");
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
Expand Down
31 changes: 0 additions & 31 deletions tools/tpcds-tools/bin/run-tpcds-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,15 @@ fi
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpcds sf 1 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpcds sf 100 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf100"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpcds sf 1000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf1000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpcds sf 10000 queries"
TPCDS_QUERIES_DIR="${CURDIR}/../queries/sf10000"
TPCDS_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -123,32 +119,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
rm -f "${backup_session_variables_file}"
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}='${v}';" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCDS_OPT_CONF}")
}
clean_up() {
echo "restore session variables:"
cat "${backup_session_variables_file}"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"source ${backup_session_variables_file};"
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCDS_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -205,5 +176,3 @@ done
echo "Total cold run time: ${cold_run_sum} ms"
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpcds queries.'

clean_up
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpcds-tools/conf/opt/opt_sf10000.sql

This file was deleted.

28 changes: 0 additions & 28 deletions tools/tpch-tools/bin/run-tpch-queries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,15 @@ done
# Print usage and stop when help was requested on the command line.
if [[ "${HELP}" -eq 1 ]]; then
usage
fi

TPCH_QUERIES_DIR="${CURDIR}/../queries"
if [[ ${SCALE_FACTOR} -eq 1 ]]; then
echo "Running tpch sf 1 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1.sql"
elif [[ ${SCALE_FACTOR} -eq 100 ]]; then
echo "Running tpch sf 100 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf100.sql"
elif [[ ${SCALE_FACTOR} -eq 1000 ]]; then
echo "Running tpch sf 1000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf1000.sql"
elif [[ ${SCALE_FACTOR} -eq 10000 ]]; then
echo "Running tpch sf 10000 queries"
TPCH_OPT_CONF="${CURDIR}/../conf/opt/opt_sf10000.sql"
else
echo "${SCALE_FACTOR} scale is NOT support currently."
exit 1
Expand Down Expand Up @@ -120,26 +115,7 @@ run_sql() {
echo "$*"
mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e "$*"
}
get_session_variable() {
k="$1"
v=$(mysql -h"${FE_HOST}" -u"${USER}" -P"${FE_QUERY_PORT}" -D"${DB}" -e"show variables like '${k}'\G" | grep " Value: ")
echo "${v/*Value: /}"
}
backup_session_variables_file="${CURDIR}/../conf/opt/backup_session_variables.sql"
backup_session_variables() {
touch "${backup_session_variables_file}"
while IFS= read -r line; do
k="${line/set global /}"
k="${k%=*}"
v=$(get_session_variable "${k}")
echo "set global ${k}=${v};" >>"${backup_session_variables_file}"
done < <(grep -v '^ *#' <"${TPCH_OPT_CONF}")
}
backup_session_variables

echo '============================================'
echo "Optimize session variables"
run_sql "source ${TPCH_OPT_CONF};"
echo '============================================'
run_sql "show variables;"
echo '============================================'
Expand Down Expand Up @@ -197,7 +173,3 @@ echo "Total cold run time: ${cold_run_sum} ms"
# tpch 流水线依赖这个'Total hot run time'字符串
echo "Total hot run time: ${best_hot_run_sum} ms"
echo 'Finish tpch queries.'

echo "Restore session variables"
run_sql "source ${backup_session_variables_file};"
rm -f "${backup_session_variables_file}"
Empty file.
Empty file.
Empty file.
1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf1000.sql

This file was deleted.

1 change: 0 additions & 1 deletion tools/tpch-tools/conf/opt/opt_sf10000.sql

This file was deleted.