Skip to content

Commit

Permalink
branch-3.0: [fix](nereids) fix sql cache bug and some tests #46443 (#…
Browse files Browse the repository at this point in the history
…46506)

1. use retry to fix unstable test `colocate_union_numbers`, `prune_bucket_with_bucket_shuffle_join`
2. fix sql cache bug which use stale cache after drop table and create(table id changed), test in `parse_sql_from_sql_cache`, introduced by #33262
3. regression test add `foreachFrontends`, `foreachBackends`, `retry` function
  • Loading branch information
github-actions[bot] authored Jan 7, 2025
1 parent bf87821 commit 6551775
Show file tree
Hide file tree
Showing 9 changed files with 897 additions and 833 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.util.DebugUtil;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
import org.apache.doris.nereids.SqlCacheContext.TableVersion;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundVariable;
import org.apache.doris.nereids.parser.NereidsParser;
Expand Down Expand Up @@ -196,14 +198,14 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));

if (sqlCacheContextWithVariable != null) {
return tryParseSqlWithoutCheckVariable(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
return tryParseSql(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity, true
);
} else {
return Optional.empty();
}
} else {
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
return tryParseSql(connectContext, key, sqlCacheContext, currentUserIdentity, false);
}
}

Expand All @@ -220,9 +222,9 @@ private String normalizeSql(String sql) {
return NereidsParser.removeCommentAndTrimBlank(sql);
}

private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
ConnectContext connectContext, String key,
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
private Optional<LogicalSqlCache> tryParseSql(
ConnectContext connectContext, String key, SqlCacheContext sqlCacheContext,
UserIdentity currentUserIdentity, boolean checkUserVariable) {
Env env = connectContext.getEnv();

if (!tryLockTables(connectContext, env, sqlCacheContext)) {
Expand Down Expand Up @@ -256,8 +258,12 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
try {
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();

List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext);
List<Variable> currentVariables = ImmutableList.of();
if (checkUserVariable) {
currentVariables = resolveUserVariables(sqlCacheContext);
}
boolean usedVariablesChanged
= checkUserVariable && usedVariablesChanged(currentVariables, sqlCacheContext);
if (resultSetInFe.isPresent() && !usedVariablesChanged) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

Expand All @@ -271,9 +277,15 @@ private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
}

Status status = new Status();
PUniqueId cacheKeyMd5 = usedVariablesChanged
? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))
: sqlCacheContext.getOrComputeCacheKeyMd5();

PUniqueId cacheKeyMd5;
if (usedVariablesChanged) {
invalidateCache(key);
cacheKeyMd5 = sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables));
} else {
cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
}

InternalService.PFetchCacheResult cacheData =
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
Expand Down Expand Up @@ -305,20 +317,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
return true;
}

for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
// the query maybe scan empty partition of the table, we should check these table version too,
// but the table not exists in sqlCacheContext.getScanTables(), so we need check here.
// check table type and version
for (Entry<FullTableName, TableVersion> scanTable : sqlCacheContext.getUsedTables().entrySet()) {
TableVersion tableVersion = scanTable.getValue();
if (tableVersion.type != TableType.OLAP) {
return true;
}
TableIf tableIf = findTableIf(env, scanTable.getKey());
if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
return true;
}

OlapTable olapTable = (OlapTable) tableIf;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = scanTable.latestVersion;
long cacheTableVersion = tableVersion.version;
// some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition?
if (currentTableVersion != cacheTableVersion) {
return true;
}
}

// check partition version
for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
FullTableName fullTableName = scanTable.fullTableName;
TableIf tableIf = findTableIf(env, fullTableName);
if (!(tableIf instanceof OlapTable)) {
return true;
}
OlapTable olapTable = (OlapTable) tableIf;
Collection<Long> partitionIds = scanTable.getScanPartitions();
olapTable.getVersionInBatchForCloudMode(partitionIds);

Expand Down Expand Up @@ -389,7 +417,7 @@ private boolean dataMaskPoliciesChanged(
*/
private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
StatementContext currentStatementContext = connectContext.getStatementContext();
for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) {
TableIf tableIf = findTableIf(env, fullTableName);
if (tableIf == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.FieldInfo;
Expand All @@ -41,6 +43,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -61,7 +64,8 @@ public class SqlCacheContext {
private volatile long latestPartitionTime = -1;
private volatile long latestPartitionVersion = -1;
private volatile long sumOfPartitionNum = -1;
private final Set<FullTableName> usedTables = Sets.newLinkedHashSet();
// value: version of table
private final Map<FullTableName, TableVersion> usedTables = Maps.newLinkedHashMap();
// value: ddl sql
private final Map<FullTableName, String> usedViews = Maps.newLinkedHashMap();
// value: usedColumns
Expand Down Expand Up @@ -135,8 +139,13 @@ public synchronized void addUsedTable(TableIf tableIf) {
return;
}

usedTables.add(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName())
usedTables.put(
new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()),
new TableVersion(
tableIf.getId(),
tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L,
tableIf.getType()
)
);
}

Expand Down Expand Up @@ -282,8 +291,8 @@ public void setCacheProxy(CacheProxy cacheProxy) {
this.cacheProxy = cacheProxy;
}

public Set<FullTableName> getUsedTables() {
return ImmutableSet.copyOf(usedTables);
public Map<FullTableName, TableVersion> getUsedTables() {
return Collections.unmodifiableMap(usedTables);
}

public Map<FullTableName, String> getUsedViews() {
Expand Down Expand Up @@ -458,6 +467,15 @@ public void addScanPartition(Long partitionId) {
}
}

/** TableVersion */
@lombok.Data
@lombok.AllArgsConstructor
public static class TableVersion {
public final long id;
public final long version;
public final TableType type;
}

/** CacheKeyType */
public enum CacheKeyType {
// use `userIdentity`:`sql`.trim() as Cache key in FE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta
this.sqlCacheContext = new SqlCacheContext(
connectContext.getCurrentUserIdentity(), connectContext.queryId());
if (originStatement != null) {
this.sqlCacheContext.setOriginSql(originStatement.originStmt.trim());
this.sqlCacheContext.setOriginSql(originStatement.originStmt);
}
} else {
this.sqlCacheContext = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ public List<Plan> getAncestors() {
}

public void updateActualRowCount(long actualRowCount) {
statistics.setActualRowCount(actualRowCount);
if (statistics != null) {
statistics.setActualRowCount(actualRowCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.google.common.collect.Maps
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.Uninterruptibles
import com.google.gson.Gson
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
Expand All @@ -37,7 +38,6 @@ import org.apache.doris.regression.RegressionTest
import org.apache.doris.regression.action.BenchmarkAction
import org.apache.doris.regression.action.ProfileAction
import org.apache.doris.regression.action.WaitForAction
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.OutputUtils
import org.apache.doris.regression.action.CreateMVAction
import org.apache.doris.regression.action.ExplainAction
Expand All @@ -59,13 +59,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.sql.Connection
import java.io.File
import java.math.BigDecimal;
import java.sql.PreparedStatement
import java.sql.ResultSetMetaData
import java.util.Map;
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ThreadFactory
Expand Down Expand Up @@ -700,6 +694,23 @@ class Suite implements GroovyInterceptable {
return sql
}

<T> T retry(int executeTimes = 3, int intervalMillis = 1000, Closure<Integer> closure) {
Throwable throwable = null
for (int i = 1; i <= executeTimes; ++i) {
try {
return closure(i) as T
} catch (Throwable t) {
logger.warn("Retry failed: $t", t)
throwable = t
Uninterruptibles.sleepUninterruptibly(intervalMillis, TimeUnit.MILLISECONDS)
}
}
if (throwable != null) {
throw throwable
}
return null
}

void explain(Closure actionSupplier) {
if (context.useArrowFlightSql()) {
runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
Expand Down Expand Up @@ -1039,6 +1050,20 @@ class Suite implements GroovyInterceptable {
}
}

void foreachFrontends(Closure action) {
def rows = sql_return_maparray("show frontends")
for (def row in rows) {
action(row)
}
}

void foreachBackends(Closure action) {
def rows = sql_return_maparray("show backends")
for (def row in rows) {
action(row)
}
}

List<String> getFrontendIpHttpPort() {
return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
}
Expand Down
Loading

0 comments on commit 6551775

Please sign in to comment.