From 65517754887e011e8502dcb9df53770b659d730d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:29:06 +0800 Subject: [PATCH] branch-3.0: [fix](nereids) fix sql cache bug and some tests #46443 (#46506) 1. use retry to fix unstable test `colocate_union_numbers`, `prune_bucket_with_bucket_shuffle_join` 2. fix sql cache bug which use stale cache after drop table and create(table id changed), test in `parse_sql_from_sql_cache`, introduced by #33262 3. regression test add `foreachFrontends`, `foreachBackends`, `retry` function --- .../doris/common/NereidsSqlCacheManager.java | 62 +- .../apache/doris/nereids/SqlCacheContext.java | 28 +- .../doris/nereids/StatementContext.java | 2 +- .../nereids/trees/plans/AbstractPlan.java | 4 +- .../doris/regression/suite/Suite.groovy | 39 +- .../cache/parse_sql_from_sql_cache.groovy | 1536 ++++++++--------- .../distribute/colocate_union_numbers.groovy | 28 +- ...une_bucket_with_bucket_shuffle_join.groovy | 28 +- .../suites/nereids_syntax_p0/explain.groovy | 3 +- 9 files changed, 897 insertions(+), 833 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index 86a2b875a93d68..5434a185cd0b15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.View; import org.apache.doris.common.ConfigBase.DefaultConfHandler; import org.apache.doris.common.util.DebugUtil; @@ -36,6 +37,7 @@ import org.apache.doris.nereids.SqlCacheContext.FullColumnName; import org.apache.doris.nereids.SqlCacheContext.FullTableName; import org.apache.doris.nereids.SqlCacheContext.ScanTable; +import org.apache.doris.nereids.SqlCacheContext.TableVersion; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.analyzer.UnboundVariable; import org.apache.doris.nereids.parser.NereidsParser; @@ -196,14 +198,14 @@ public Optional tryParseSql(ConnectContext connectContext, Stri .getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5)); if (sqlCacheContextWithVariable != null) { - return tryParseSqlWithoutCheckVariable( - connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity + return tryParseSql( + connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity, true ); } else { return Optional.empty(); } } else { - return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity); + return tryParseSql(connectContext, key, sqlCacheContext, currentUserIdentity, false); } } @@ -220,9 +222,9 @@ private String normalizeSql(String sql) { return NereidsParser.removeCommentAndTrimBlank(sql); } - private Optional tryParseSqlWithoutCheckVariable( - ConnectContext connectContext, String key, - SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) { + private Optional tryParseSql( + ConnectContext connectContext, String key, SqlCacheContext sqlCacheContext, + UserIdentity currentUserIdentity, boolean checkUserVariable) { Env env = connectContext.getEnv(); if (!tryLockTables(connectContext, env, sqlCacheContext)) { @@ -256,8 +258,12 @@ private Optional tryParseSqlWithoutCheckVariable( try { Optional resultSetInFe = sqlCacheContext.getResultSetInFe(); - List currentVariables = resolveUserVariables(sqlCacheContext); - boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext); + List currentVariables = ImmutableList.of(); + if (checkUserVariable) { + currentVariables = resolveUserVariables(sqlCacheContext); + } + boolean usedVariablesChanged + = checkUserVariable && usedVariablesChanged(currentVariables, sqlCacheContext); if (resultSetInFe.isPresent() && !usedVariablesChanged) { MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L); @@ -271,9 +277,15 @@ private Optional tryParseSqlWithoutCheckVariable( } Status status = new Status(); - PUniqueId cacheKeyMd5 = usedVariablesChanged - ? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)) - : sqlCacheContext.getOrComputeCacheKeyMd5(); + + PUniqueId cacheKeyMd5; + if (usedVariablesChanged) { + invalidateCache(key); + cacheKeyMd5 = sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)); + } else { + cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5(); + } + InternalService.PFetchCacheResult cacheData = SqlCache.getCacheData(sqlCacheContext.getCacheProxy(), cacheKeyMd5, sqlCacheContext.getLatestPartitionId(), @@ -305,20 +317,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) { return true; } - for (ScanTable scanTable : sqlCacheContext.getScanTables()) { - FullTableName fullTableName = scanTable.fullTableName; - TableIf tableIf = findTableIf(env, fullTableName); - if (!(tableIf instanceof OlapTable)) { + // the query maybe scan empty partition of the table, we should check these table version too, + // but the table not exists in sqlCacheContext.getScanTables(), so we need check here. + // check table type and version + for (Entry scanTable : sqlCacheContext.getUsedTables().entrySet()) { + TableVersion tableVersion = scanTable.getValue(); + if (tableVersion.type != TableType.OLAP) { + return true; + } + TableIf tableIf = findTableIf(env, scanTable.getKey()); + if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) { return true; } + OlapTable olapTable = (OlapTable) tableIf; long currentTableVersion = olapTable.getVisibleVersion(); - long cacheTableVersion = scanTable.latestVersion; + long cacheTableVersion = tableVersion.version; // some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition? if (currentTableVersion != cacheTableVersion) { return true; } + } + // check partition version + for (ScanTable scanTable : sqlCacheContext.getScanTables()) { + FullTableName fullTableName = scanTable.fullTableName; + TableIf tableIf = findTableIf(env, fullTableName); + if (!(tableIf instanceof OlapTable)) { + return true; + } + OlapTable olapTable = (OlapTable) tableIf; Collection partitionIds = scanTable.getScanPartitions(); olapTable.getVersionInBatchForCloudMode(partitionIds); @@ -389,7 +417,7 @@ private boolean dataMaskPoliciesChanged( */ private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) { StatementContext currentStatementContext = connectContext.getStatementContext(); - for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) { + for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) { TableIf tableIf = findTableIf(env, fullTableName); if (tableIf == null) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index 29be4af41a7675..658317191f4ec3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -20,7 +20,9 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.Pair; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.mysql.FieldInfo; @@ -41,6 +43,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -61,7 +64,8 @@ public class SqlCacheContext { private volatile long latestPartitionTime = -1; private volatile long latestPartitionVersion = -1; private volatile long sumOfPartitionNum = -1; - private final Set usedTables = Sets.newLinkedHashSet(); + // value: version of table + private final Map usedTables = Maps.newLinkedHashMap(); // value: ddl sql private final Map usedViews = Maps.newLinkedHashMap(); // value: usedColumns @@ -135,8 +139,13 @@ public synchronized void addUsedTable(TableIf tableIf) { return; } - usedTables.add( - new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()) + usedTables.put( + new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()), + new TableVersion( + tableIf.getId(), + tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L, + tableIf.getType() + ) ); } @@ -282,8 +291,8 @@ public void setCacheProxy(CacheProxy cacheProxy) { this.cacheProxy = cacheProxy; } - public Set getUsedTables() { - return ImmutableSet.copyOf(usedTables); + public Map getUsedTables() { + return Collections.unmodifiableMap(usedTables); } public Map getUsedViews() { @@ -458,6 +467,15 @@ public void addScanPartition(Long partitionId) { } } + /** TableVersion */ + @lombok.Data + @lombok.AllArgsConstructor + public static class TableVersion { + public final long id; + public final long version; + public final TableType type; + } + /** CacheKeyType */ public enum CacheKeyType { // use `userIdentity`:`sql`.trim() as Cache key in FE diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 0add923eec54d7..e33ce2bbfe98bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -231,7 +231,7 @@ public StatementContext(ConnectContext connectContext, OriginStatement originSta this.sqlCacheContext = new SqlCacheContext( connectContext.getCurrentUserIdentity(), connectContext.queryId()); if (originStatement != null) { - this.sqlCacheContext.setOriginSql(originStatement.originStmt.trim()); + this.sqlCacheContext.setOriginSql(originStatement.originStmt); } } else { this.sqlCacheContext = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java index eb65048050fda1..958b4fe9c424b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java @@ -228,6 +228,8 @@ public List getAncestors() { } public void updateActualRowCount(long actualRowCount) { - statistics.setActualRowCount(actualRowCount); + if (statistics != null) { + statistics.setActualRowCount(actualRowCount); + } } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 65401100ec38bc..ae9348191cdd83 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -25,6 +25,7 @@ import com.google.common.collect.Maps import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.Uninterruptibles import com.google.gson.Gson import groovy.json.JsonOutput import groovy.json.JsonSlurper @@ -37,7 +38,6 @@ import org.apache.doris.regression.RegressionTest import org.apache.doris.regression.action.BenchmarkAction import org.apache.doris.regression.action.ProfileAction import org.apache.doris.regression.action.WaitForAction -import org.apache.doris.regression.util.DataUtils import org.apache.doris.regression.util.OutputUtils import org.apache.doris.regression.action.CreateMVAction import org.apache.doris.regression.action.ExplainAction @@ -59,13 +59,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.sql.Connection -import java.io.File -import java.math.BigDecimal; -import java.sql.PreparedStatement -import java.sql.ResultSetMetaData -import java.util.Map; import java.util.concurrent.Callable -import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.Future import java.util.concurrent.ThreadFactory @@ -700,6 +694,23 @@ class Suite implements GroovyInterceptable { return sql } + T retry(int executeTimes = 3, int intervalMillis = 1000, Closure closure) { + Throwable throwable = null + for (int i = 1; i <= executeTimes; ++i) { + try { + return closure(i) as T + } catch (Throwable t) { + logger.warn("Retry failed: $t", t) + throwable = t + Uninterruptibles.sleepUninterruptibly(intervalMillis, TimeUnit.MILLISECONDS) + } + } + if (throwable != null) { + throw throwable + } + return null + } + void explain(Closure actionSupplier) { if (context.useArrowFlightSql()) { runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier) @@ -1039,6 +1050,20 @@ class Suite implements GroovyInterceptable { } } + void foreachFrontends(Closure action) { + def rows = sql_return_maparray("show frontends") + for (def row in rows) { + action(row) + } + } + + void foreachBackends(Closure action) { + def rows = sql_return_maparray("show backends") + for (def row in rows) { + action(row) + } + } + List getFrontendIpHttpPort() { return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort }; } diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy index a9f833d7021e36..e7fb5f3da6c435 100644 --- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy +++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy @@ -19,9 +19,6 @@ import java.util.stream.Collectors suite("parse_sql_from_sql_cache") { def assertHasCache = { String sqlStr -> - if (isCloudMode()) { - return - } explain { sql ("physical plan ${sqlStr}") contains("PhysicalSqlCache") @@ -29,507 +26,502 @@ suite("parse_sql_from_sql_cache") { } def assertNoCache = { String sqlStr -> - if (isCloudMode()) { - return - } explain { sql ("physical plan ${sqlStr}") notContains("PhysicalSqlCache") } } - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - - combineFutures( - extraThread("testUsePlanCache", { - createTestTable "test_use_plan_cache" + def dbName = (sql "select database()")[0][0].toString() + foreachFrontends { fe -> + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/${dbName}" + connect(context.config.jdbcUser, context.config.jdbcPassword, url) { + sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + } + } - // after partition changed 10s, the sql cache can be used - sleep(10000) + // make sure if the table has been dropped, the cache should invalidate, + // so we should retry multiple times to check + for (def __ in 0..3) { + combineFutures( + extraThread("testUsePlanCache", { + createTestTable "test_use_plan_cache" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // after partition changed 10s, the sql cache can be used + sleep(10000) - assertNoCache "select * from test_use_plan_cache" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // create sql cache - sql "select * from test_use_plan_cache" + assertNoCache "select * from test_use_plan_cache" - // use sql cache - assertHasCache "select * from test_use_plan_cache" - }), - extraThread("testAddPartitionAndInsert", { - createTestTable "test_use_plan_cache2" + // create sql cache + sql "select * from test_use_plan_cache" - // after partition changed 10s, the sql cache can be used - sleep(10000) + // use sql cache + assertHasCache "select * from test_use_plan_cache" + }), + extraThread("testAddPartitionAndInsert", { + createTestTable "test_use_plan_cache2" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // after partition changed 10s, the sql cache can be used + sleep(10000) - assertNoCache "select * from test_use_plan_cache2" - sql "select * from test_use_plan_cache2" - assertHasCache "select * from test_use_plan_cache2" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // NOTE: in cloud mode, add empty partition can not use cache, because the table version already update, - // but in native mode, add empty partition can use cache - sql "alter table test_use_plan_cache2 add partition p6 values[('6'),('7'))" - if (isCloudMode()) { assertNoCache "select * from test_use_plan_cache2" - } else { + sql "select * from test_use_plan_cache2" assertHasCache "select * from test_use_plan_cache2" - } - - // insert data can not use cache - sql "insert into test_use_plan_cache2 values(6, 1)" - assertNoCache "select * from test_use_plan_cache2" - }), - extraThread("testDropPartition", { - createTestTable "test_use_plan_cache3" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache3" - sql "select * from test_use_plan_cache3" - assertHasCache "select * from test_use_plan_cache3" - - // drop partition can not use cache - sql "alter table test_use_plan_cache3 drop partition p5" - assertNoCache "select * from test_use_plan_cache3" - }), - extraThread("testReplacePartition", { - createTestTable "test_use_plan_cache4" - - sql "alter table test_use_plan_cache4 add temporary partition tp1 values [('1'), ('2'))" - - streamLoad { - table "test_use_plan_cache4" - set "temporaryPartitions", "tp1" - inputIterator([[1, 3], [1, 4]].iterator()) - } - sql "sync" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache4" - sql "select * from test_use_plan_cache4" - assertHasCache "select * from test_use_plan_cache4" - - // replace partition can not use cache - sql "alter table test_use_plan_cache4 replace partition (p1) with temporary partition(tp1)" - assertNoCache "select * from test_use_plan_cache4" - }), - extraThread("testStreamLoad", { - createTestTable "test_use_plan_cache5" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache5" - sql "select * from test_use_plan_cache5" - assertHasCache "select * from test_use_plan_cache5" - - streamLoad { - table "test_use_plan_cache5" - set "partitions", "p1" - inputIterator([[1, 3], [1, 4]].iterator()) - } - sql "sync" - - // stream load can not use cache - sql "select * from test_use_plan_cache5" - assertNoCache "select * from test_use_plan_cache5" - }), - extraThread("testUpdate",{ - createTestTable("test_use_plan_cache6", true) - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache6" - sql "select * from test_use_plan_cache6" - assertHasCache "select * from test_use_plan_cache6" - - sql "update test_use_plan_cache6 set value=3 where id=1" - - // update can not use cache - sql "select * from test_use_plan_cache6" - assertNoCache "select * from test_use_plan_cache6" - }), - extraThread("testDelete", { - createTestTable "test_use_plan_cache7" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache7" - sql "select * from test_use_plan_cache7" - assertHasCache "select * from test_use_plan_cache7" - - sql "delete from test_use_plan_cache7 where id = 1" - - // delete can not use cache - sql "select * from test_use_plan_cache7" - assertNoCache "select * from test_use_plan_cache7" - }), - extraThread("testDropTable", { - createTestTable "test_use_plan_cache8" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache8" - sql "select * from test_use_plan_cache8" - assertHasCache "select * from test_use_plan_cache8" - - sql "drop table test_use_plan_cache8" - - // should visible the table has bean deleted - test { - sql "select * from test_use_plan_cache8" - exception "does not exist in database" - } - }), - extraThread("testCreateAndAlterView", { - createTestTable "test_use_plan_cache9" - - sql "drop view if exists test_use_plan_cache9_view" - sql "create view test_use_plan_cache9_view as select * from test_use_plan_cache9" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache9_view" - sql "select * from test_use_plan_cache9_view" - assertHasCache "select * from test_use_plan_cache9_view" - - // alter view should not use cache - sql "alter view test_use_plan_cache9_view as select id from test_use_plan_cache9" - assertNoCache "select * from test_use_plan_cache9_view" - }), - extraThread("testDropView", { - createTestTable "test_use_plan_cache10" - - sql "drop view if exists test_use_plan_cache10_view" - sql "create view test_use_plan_cache10_view as select * from test_use_plan_cache10" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache10_view" - sql "select * from test_use_plan_cache10_view" - assertHasCache "select * from test_use_plan_cache10_view" - - sql "drop view test_use_plan_cache10_view" - // should visible the view has bean deleted - test { - sql "select * from test_use_plan_cache10_view" - exception "does not exist in database" - } - }), - extraThread("testBaseTableChanged", { - createTestTable "test_use_plan_cache11" - - sql "drop view if exists test_use_plan_cache11_view" - sql "create view test_use_plan_cache11_view as select * from test_use_plan_cache11" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache11_view" - sql "select * from test_use_plan_cache11_view" - assertHasCache "select * from test_use_plan_cache11_view" - - sql "insert into test_use_plan_cache11 values(1, 3)" - - // base table already changed, can not use cache - assertNoCache "select * from test_use_plan_cache11_view" - }), - extraThread("testNotShareCacheBetweenUsers", { - sql "drop user if exists test_cache_user1" - sql "create user test_cache_user1 identified by 'DORIS@2024'" - def dbName = context.config.getDbNameByFile(context.file) - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user1""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user1""" - } - - createTestTable "test_use_plan_cache12" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache12" - sql "select * from test_use_plan_cache12" - assertHasCache "select * from test_use_plan_cache12" - - sql "sync" - - - extraThread("test_cache_user1_thread", { - connect("test_cache_user1", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache12" + // NOTE: in cloud mode, add empty partition can not use cache, because the table version already update, + // but in native mode, add empty partition can use cache + sql "alter table test_use_plan_cache2 add partition p6 values[('6'),('7'))" + if (isCloudMode()) { + assertNoCache "select * from test_use_plan_cache2" + } else { + assertHasCache "select * from test_use_plan_cache2" } - }).get() - }), - extraThread("testAddRowPolicy", { - def dbName = context.config.getDbNameByFile(context.file) - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_2 - ON ${dbName}.test_use_plan_cache13 - FOR test_cache_user2""" - - sql "drop user if exists test_cache_user2" - sql "create user test_cache_user2 identified by 'DORIS@2024'" - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user2""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user2""" - } - - createTestTable "test_use_plan_cache13" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "sync" - - extraThread("test_cache_user2_thread", { - connect("test_cache_user2", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache13" - sql "select * from test_use_plan_cache13" - assertHasCache "select * from test_use_plan_cache13" + // insert data can not use cache + sql "insert into test_use_plan_cache2 values(6, 1)" + assertNoCache "select * from test_use_plan_cache2" + }), + extraThread("testDropPartition", { + createTestTable "test_use_plan_cache3" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache3" + sql "select * from test_use_plan_cache3" + assertHasCache "select * from test_use_plan_cache3" + + // drop partition can not use cache + sql "alter table test_use_plan_cache3 drop partition p5" + assertNoCache "select * from test_use_plan_cache3" + }), + extraThread("testReplacePartition", { + createTestTable "test_use_plan_cache4" + + sql "alter table test_use_plan_cache4 add temporary partition tp1 values [('1'), ('2'))" + + streamLoad { + table "test_use_plan_cache4" + set "temporaryPartitions", "tp1" + inputIterator([[1, 3], [1, 4]].iterator()) } - }).get() + sql "sync" - sql """ - CREATE ROW POLICY test_cache_row_policy_2 - ON ${dbName}.test_use_plan_cache13 - AS RESTRICTIVE TO test_cache_user2 - USING (id = 4)""" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "sync" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // after row policy changed, the cache is invalidate - extraThread("test_cache_user2_thread2", { - connect("test_cache_user2", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + assertNoCache "select * from test_use_plan_cache4" + sql "select * from test_use_plan_cache4" + assertHasCache "select * from test_use_plan_cache4" + + // replace partition can not use cache + sql "alter table test_use_plan_cache4 replace partition (p1) with temporary partition(tp1)" + assertNoCache "select * from test_use_plan_cache4" + }), + extraThread("testStreamLoad", { + createTestTable "test_use_plan_cache5" + + // after partition changed 10s, the sql cache can be used + sleep(10000) - assertNoCache "select * from test_use_plan_cache13" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache5" + sql "select * from test_use_plan_cache5" + assertHasCache "select * from test_use_plan_cache5" + + streamLoad { + table "test_use_plan_cache5" + set "partitions", "p1" + inputIterator([[1, 3], [1, 4]].iterator()) } - }).get() - }), - extraThread("testDropRowPolicy", { - def dbName = context.config.getDbNameByFile(context.file) - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - FOR test_cache_user3""" - - sql "drop user if exists test_cache_user3" - sql "create user test_cache_user3 identified by 'DORIS@2024'" - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user3""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user3""" - } - - createTestTable "test_use_plan_cache14" - - sql """ - CREATE ROW POLICY test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - AS RESTRICTIVE TO test_cache_user3 - USING (id = 4)""" - - sql "sync" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - extraThread("test_cache_user3_thread", { - connect("test_cache_user3", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "sync" + + // stream load can not use cache + sql "select * from test_use_plan_cache5" + assertNoCache "select * from test_use_plan_cache5" + }), + extraThread("testUpdate",{ + createTestTable("test_use_plan_cache6", true) - assertNoCache "select * from test_use_plan_cache14" - sql "select * from test_use_plan_cache14" - assertHasCache "select * from test_use_plan_cache14" + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache6" + sql "select * from test_use_plan_cache6" + assertHasCache "select * from test_use_plan_cache6" + + sql "update test_use_plan_cache6 set value=3 where id=1" + + // update can not use cache + sql "select * from test_use_plan_cache6" + assertNoCache "select * from test_use_plan_cache6" + }), + extraThread("testDelete", { + createTestTable "test_use_plan_cache7" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache7" + sql "select * from test_use_plan_cache7" + assertHasCache "select * from test_use_plan_cache7" + + sql "delete from test_use_plan_cache7 where id = 1" + + // delete can not use cache + sql "select * from test_use_plan_cache7" + assertNoCache "select * from test_use_plan_cache7" + }), + extraThread("testDropTable", { + createTestTable "test_use_plan_cache8" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache8" + sql "select * from test_use_plan_cache8" + assertHasCache "select * from test_use_plan_cache8" + + sql "drop table test_use_plan_cache8" + + // should visible the table has bean deleted + test { + sql "select * from test_use_plan_cache8" + exception "does not exist in database" } - }).get() + }), + extraThread("testCreateAndAlterView", { + createTestTable "test_use_plan_cache9" - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - FOR test_cache_user3""" + sql "drop view if exists test_use_plan_cache9_view" + sql "create view test_use_plan_cache9_view as select * from test_use_plan_cache9" - sql "sync" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // after row policy changed, the cache is invalidate - extraThread("test_cache_user3_thread2", { - connect("test_cache_user3", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache9_view" + sql "select * from test_use_plan_cache9_view" + assertHasCache "select * from test_use_plan_cache9_view" + + // alter view should not use cache + sql "alter view test_use_plan_cache9_view as select id from test_use_plan_cache9" + assertNoCache "select * from test_use_plan_cache9_view" + }), + extraThread("testDropView", { + createTestTable "test_use_plan_cache10" - assertNoCache "select * from test_use_plan_cache14" + sql "drop view if exists test_use_plan_cache10_view" + sql "create view test_use_plan_cache10_view as select * from test_use_plan_cache10" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache10_view" + sql "select * from test_use_plan_cache10_view" + assertHasCache "select * from test_use_plan_cache10_view" + + sql "drop view test_use_plan_cache10_view" + // should visible the view has bean deleted + test { + sql "select * from test_use_plan_cache10_view" + exception "does not exist in database" } - }).get() - }), - extraThread("testRemovePrivilege", { - def dbName = context.config.getDbNameByFile(context.file) - - createTestTable "test_use_plan_cache15" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "drop user if exists test_cache_user4" - sql "create user test_cache_user4 identified by 'DORIS@2024'" - sql "GRANT SELECT_PRIV ON regression_test.* TO test_cache_user4" - sql "GRANT SELECT_PRIV ON ${dbName}.test_use_plan_cache15 TO test_cache_user4" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user4""" - } - - sql "sync" - - extraThread("test_cache_user4_thread", { - connect("test_cache_user4", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + }), + extraThread("testBaseTableChanged", { + createTestTable "test_use_plan_cache11" - assertNoCache "select * from test_use_plan_cache15" - sql "select * from test_use_plan_cache15" - assertHasCache "select * from test_use_plan_cache15" + sql "drop view if exists test_use_plan_cache11_view" + sql "create view test_use_plan_cache11_view as select * from test_use_plan_cache11" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache11_view" + sql "select * from test_use_plan_cache11_view" + assertHasCache "select * from test_use_plan_cache11_view" + + sql "insert into test_use_plan_cache11 values(1, 3)" + + // base table already changed, can not use cache + assertNoCache "select * from test_use_plan_cache11_view" + }), + extraThread("testNotShareCacheBetweenUsers", { + sql "drop user if exists test_cache_user1" + sql "create user test_cache_user1 identified by 'DORIS@2024'" + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user1""" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user1""" } - }).get() - sql "REVOKE SELECT_PRIV ON ${dbName}.test_use_plan_cache15 FROM test_cache_user4" + createTestTable "test_use_plan_cache12" - sql "sync" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // after privileges changed, the cache is invalidate - extraThread("test_cache_user4_thread2", { - connect("test_cache_user4", "DORIS@2024") { - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - test { - sql ("select * from ${dbName}.test_use_plan_cache15") - exception "Permission denied" + assertNoCache "select * from test_use_plan_cache12" + sql "select * from test_use_plan_cache12" + assertHasCache "select * from test_use_plan_cache12" + + sql "sync" + + extraThread("test_cache_user1_thread", { + connect("test_cache_user1", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache12" + } + }).get() + }), + extraThread("testAddRowPolicy", { + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_2 + ON ${dbName}.test_use_plan_cache13 + FOR test_cache_user2""" + + sql "drop user if exists test_cache_user2" + sql "create user test_cache_user2 identified by 'DORIS@2024'" + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user2""" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user2""" + } + + createTestTable "test_use_plan_cache13" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "sync" + + extraThread("test_cache_user2_thread", { + connect("test_cache_user2", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache13" + sql "select * from test_use_plan_cache13" + assertHasCache "select * from test_use_plan_cache13" + } + }).get() + + sql """ + CREATE ROW POLICY test_cache_row_policy_2 + ON ${dbName}.test_use_plan_cache13 + AS RESTRICTIVE TO test_cache_user2 + USING (id = 4)""" + + sql "sync" + + // after row policy changed, the cache is invalidate + extraThread("test_cache_user2_thread2", { + connect("test_cache_user2", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache13" } + }).get() + }), + extraThread("testDropRowPolicy", { + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + FOR test_cache_user3""" + + sql "drop user if exists test_cache_user3" + sql "create user test_cache_user3 identified by 'DORIS@2024'" + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user3""" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user3""" } - }).get() - }), - extraThread("testNondeterministic", { - createTestTable "test_use_plan_cache16" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select random() from test_use_plan_cache16" - // create sql cache - sql "select random() from test_use_plan_cache16" - // can not use sql cache - assertNoCache "select random() from test_use_plan_cache16" - - - assertNoCache "select year(now()) from test_use_plan_cache16" - sql "select year(now()) from test_use_plan_cache16" - assertHasCache "select year(now()) from test_use_plan_cache16" - - - assertNoCache "select second(now()) from test_use_plan_cache16" - sql "select second(now()) from test_use_plan_cache16" - sleep(1000) - assertNoCache "select second(now()) from test_use_plan_cache16" - }), - extraThread("testUserVariable", { - // make sure if the table has been dropped, the cache should invalidate, - // so we should retry twice to check - for (def i in 0..2) { + + createTestTable "test_use_plan_cache14" + + sql """ + CREATE ROW POLICY test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + AS RESTRICTIVE TO test_cache_user3 + USING (id = 4)""" + + sql "sync" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + extraThread("test_cache_user3_thread", { + connect("test_cache_user3", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache14" + sql "select * from test_use_plan_cache14" + assertHasCache "select * from test_use_plan_cache14" + } + }).get() + + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + FOR test_cache_user3""" + + sql "sync" + + // after row policy changed, the cache is invalidate + extraThread("test_cache_user3_thread2", { + connect("test_cache_user3", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache14" + } + }).get() + }), + extraThread("testRemovePrivilege", { + createTestTable "test_use_plan_cache15" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "drop user if exists test_cache_user4" + sql "create user test_cache_user4 identified by 'DORIS@2024'" + sql "GRANT SELECT_PRIV ON regression_test.* TO test_cache_user4" + sql "GRANT SELECT_PRIV ON ${dbName}.test_use_plan_cache15 TO test_cache_user4" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user4""" + } + + sql "sync" + + extraThread("test_cache_user4_thread", { + connect("test_cache_user4", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache15" + sql "select * from test_use_plan_cache15" + assertHasCache "select * from test_use_plan_cache15" + } + }).get() + + sql "REVOKE SELECT_PRIV ON ${dbName}.test_use_plan_cache15 FROM test_cache_user4" + + sql "sync" + + // after privileges changed, the cache is invalidate + extraThread("test_cache_user4_thread2", { + connect("test_cache_user4", "DORIS@2024") { + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + test { + sql ("select * from ${dbName}.test_use_plan_cache15") + exception "Permission denied" + } + } + }).get() + }), + extraThread("testNondeterministic", { + createTestTable "test_use_plan_cache16" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select random() from test_use_plan_cache16" + // create sql cache + sql "select random() from test_use_plan_cache16" + // can not use sql cache + assertNoCache "select random() from test_use_plan_cache16" + + assertNoCache "select year(now()) from test_use_plan_cache16" + sql "select year(now()) from test_use_plan_cache16" + assertHasCache "select year(now()) from test_use_plan_cache16" + + assertNoCache "select second(now()) from test_use_plan_cache16" + sql "select second(now()) from test_use_plan_cache16" + sleep(1000) + assertNoCache "select second(now()) from test_use_plan_cache16" + }), + extraThread("testUserVariable", { createTestTable "test_use_plan_cache17" // after partition changed 10s, the sql cache can be used @@ -559,7 +551,6 @@ suite("parse_sql_from_sql_cache") { def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10) - sql "set @custom_variable2=1" assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" @@ -578,81 +569,106 @@ suite("parse_sql_from_sql_cache") { assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" assertTrue(res[0][0] == 1) - } - }), - extraThread("test_udf", { - def jarPath = """${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" - scp_udf_file_to_all_be(jarPath) - try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") - try_sql("DROP TABLE IF EXISTS test_javaudf_string") - - sql """ DROP TABLE IF EXISTS test_javaudf_string """ - sql """ - CREATE TABLE IF NOT EXISTS test_javaudf_string ( - `user_id` INT NOT NULL COMMENT "用户id", - `char_col` CHAR NOT NULL COMMENT "", - `varchar_col` VARCHAR(10) NOT NULL COMMENT "", - `string_col` STRING NOT NULL COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - StringBuilder values = new StringBuilder() - int i = 1 - for (; i < 9; i ++) { - values.append(" (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),\n") - } - values.append("(${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')") - - sql "INSERT INTO test_javaudf_string VALUES ${values}" - sql "sync" - - File path = new File(jarPath) - if (!path.exists()) { - throw new IllegalStateException("""${jarPath} doesn't exist! """) - } - - sql """ CREATE FUNCTION java_udf_string_test(string, int, int) RETURNS string PROPERTIES ( - "file"="file://${jarPath}", - "symbol"="org.apache.doris.udf.StringTest", - "type"="JAVA_UDF" - ); """ - - assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - sql "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - }), - extraThread("testMultiFrontends", { - def aliveFrontends = sql_return_maparray("show frontends") - .stream() - .filter { it["Alive"].toString().equalsIgnoreCase("true") } - .collect(Collectors.toList()) - - if (aliveFrontends.size() <= 1) { - return - } - - def fe1 = aliveFrontends[0]["Host"] + ":" + aliveFrontends[0]["QueryPort"] - def fe2 = fe1 - if (aliveFrontends.size() > 1) { - fe2 = aliveFrontends[1]["Host"] + ":" + aliveFrontends[1]["QueryPort"] - } - - log.info("fe1: ${fe1}") - log.info("fe2: ${fe2}") - - def dbName = context.config.getDbNameByFile(context.file) - - log.info("connect to fe: ${fe1}") - connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe1}") { - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - - sql "use ${dbName}" - - createTestTable "test_use_plan_cache18" + }), + extraThread("test_udf", { + def jarPath = """${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" + scp_udf_file_to_all_be(jarPath) + try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") + try_sql("DROP TABLE IF EXISTS test_javaudf_string") + + sql """ DROP TABLE IF EXISTS test_javaudf_string """ + sql """ + CREATE TABLE IF NOT EXISTS test_javaudf_string ( + `user_id` INT NOT NULL COMMENT "用户id", + `char_col` CHAR NOT NULL COMMENT "", + `varchar_col` VARCHAR(10) NOT NULL COMMENT "", + `string_col` STRING NOT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + + StringBuilder values = new StringBuilder() + int i = 1 + for (; i < 9; i ++) { + values.append(" (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),\n") + } + values.append("(${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')") + sql "INSERT INTO test_javaudf_string VALUES ${values}" sql "sync" + sleep(10000) + + File path = new File(jarPath) + if (!path.exists()) { + throw new IllegalStateException("""${jarPath} doesn't exist! """) + } + + sql """ CREATE FUNCTION java_udf_string_test(string, int, int) RETURNS string PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StringTest", + "type"="JAVA_UDF" + ); """ + + assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + sql "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + }), + extraThread("testMultiFrontends", { + def aliveFrontends = sql_return_maparray("show frontends") + .stream() + .filter { it["Alive"].toString().equalsIgnoreCase("true") } + .collect(Collectors.toList()) + + if (aliveFrontends.size() <= 1) { + return + } + + def fe1 = aliveFrontends[0]["Host"] + ":" + aliveFrontends[0]["QueryPort"] + def fe2 = fe1 + if (aliveFrontends.size() > 1) { + fe2 = aliveFrontends[1]["Host"] + ":" + aliveFrontends[1]["QueryPort"] + } + + log.info("fe1: ${fe1}") + log.info("fe2: ${fe2}") + + log.info("connect to fe: ${fe1}") + connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe1}") { + sql "use ${dbName}" + + createTestTable "test_use_plan_cache18" + + sql "sync" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache18" + sql "select * from test_use_plan_cache18" + assertHasCache "select * from test_use_plan_cache18" + } + + log.info("connect to fe: ${fe2}") + connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe2}") { + + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache18" + sql "select * from test_use_plan_cache18" + assertHasCache "select * from test_use_plan_cache18" + } + }), + extraThread("test_dry_run_query", { + createTestTable "test_use_plan_cache19" + // after partition changed 10s, the sql cache can be used sleep(10000) @@ -660,253 +676,225 @@ suite("parse_sql_from_sql_cache") { sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache18" - sql "select * from test_use_plan_cache18" - assertHasCache "select * from test_use_plan_cache18" - } + sql "set dry_run_query=true" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result1 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result1.size() == 1) + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + + sql "set dry_run_query=false" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result2 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result2.size() > 1) + assertHasCache "select * from test_use_plan_cache19 order by 1, 2" + + sql "set dry_run_query=true" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result3 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result3.size() == 1) + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + }), + extraThread("test_sql_cache_in_fe", { + createTestTable "test_use_plan_cache20" + + sql "alter table test_use_plan_cache20 add partition p6 values[('999'), ('1000'))" - log.info("connect to fe: ${fe2}") - connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe2}") { - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "use ${dbName}" sql "set enable_nereids_planner=true" sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache18" - sql "select * from test_use_plan_cache18" - assertHasCache "select * from test_use_plan_cache18" - } - }), - extraThread("test_dry_run_query", { - createTestTable "test_use_plan_cache19" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - sql "set dry_run_query=true" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result1 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result1.size() == 1) - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - - sql "set dry_run_query=false" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result2 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result2.size() > 1) - assertHasCache "select * from test_use_plan_cache19 order by 1, 2" - - sql "set dry_run_query=true" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result3 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result3.size() == 1) - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - }), - extraThread("test_sql_cache_in_fe", { - createTestTable "test_use_plan_cache20" - - sql "alter table test_use_plan_cache20 add partition p6 values[('999'), ('1000'))" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - int randomInt = Math.random() * 2000000000 - - assertNoCache "select * from (select $randomInt as id)a" - def result1 = sql "select * from (select $randomInt as id)a" - assertTrue(result1.size() == 1) - - assertHasCache "select * from (select $randomInt as id)a" - def result2 = sql "select * from (select $randomInt as id)a" - assertTrue(result2.size() == 1) - - sql "select * from test_use_plan_cache20 limit 0" - assertHasCache "select * from test_use_plan_cache20 limit 0" - def result4 = sql "select * from test_use_plan_cache20 limit 0" - assertTrue(result4.isEmpty()) - - assertNoCache "select * from test_use_plan_cache20 where id=999" - def result5 = sql "select * from test_use_plan_cache20 where id=999" - assertTrue(result5.isEmpty()) - assertHasCache "select * from test_use_plan_cache20 where id=999" - def result6 = sql "select * from test_use_plan_cache20 where id=999" - assertTrue(result6.isEmpty()) - }), - extraThread("test_truncate_partition", { - sql "drop table if exists test_use_plan_cache21" - sql """create table test_use_plan_cache21 ( - id int, - dt int - ) - partition by range(dt) - ( - partition dt1 values [('1'), ('2')), - partition dt2 values [('2'), ('3')) - ) - distributed by hash(id) - properties('replication_num'='1')""" - - sql "insert into test_use_plan_cache21 values('2', '2')" - sleep(100) - sql "insert into test_use_plan_cache21 values('1', '1')" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache21" - def result1 = sql "select * from test_use_plan_cache21" - assertTrue(result1.size() == 2) - assertHasCache "select * from test_use_plan_cache21" - - sql "truncate table test_use_plan_cache21 partition dt2" - assertNoCache "select * from test_use_plan_cache21" - def result2 = sql "select * from test_use_plan_cache21" - assertTrue(result2.size() == 1) - }), - extraThread("remove_comment", { - createTestTable "test_use_plan_cache22" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" - sql "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment1*/ * from test_use_plan_cache22 order by 1, 2" - - assertHasCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" - }), - extraThread("is_cache_profile", { - createTestTable "test_use_plan_cache23" - - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - int randomInt = Math.random() * 2000000000 - sql "select ${randomInt} from test_use_plan_cache23" - profile("sql_cache_23_${randomInt}") { - run { - sql "/* sql_cache_23_${randomInt} */ select ${randomInt} from test_use_plan_cache23" - } + int randomInt = (int) (Math.random() * 2000000000) + + assertNoCache "select * from (select $randomInt as id)a" + def result1 = sql "select * from (select $randomInt as id)a" + assertTrue(result1.size() == 1) + + assertHasCache "select * from (select $randomInt as id)a" + def result2 = sql "select * from (select $randomInt as id)a" + assertTrue(result2.size() == 1) + + sql "select * from test_use_plan_cache20 limit 0" + assertHasCache "select * from test_use_plan_cache20 limit 0" + def result4 = sql "select * from test_use_plan_cache20 limit 0" + assertTrue(result4.isEmpty()) + + assertNoCache "select * from test_use_plan_cache20 where id=999" + def result5 = sql "select * from test_use_plan_cache20 where id=999" + assertTrue(result5.isEmpty()) + assertHasCache "select * from test_use_plan_cache20 where id=999" + def result6 = sql "select * from test_use_plan_cache20 where id=999" + assertTrue(result6.isEmpty()) + }), + extraThread("test_truncate_partition", { + sql "drop table if exists test_use_plan_cache21" + sql """create table test_use_plan_cache21 ( + id int, + dt int + ) + partition by range(dt) + ( + partition dt1 values [('1'), ('2')), + partition dt2 values [('2'), ('3')) + ) + distributed by hash(id) + properties('replication_num'='1')""" + + sql "insert into test_use_plan_cache21 values('2', '2')" + sleep(100) + sql "insert into test_use_plan_cache21 values('1', '1')" - check { profileString, exception -> - log.info(profileString) - assertTrue(profileString.contains("Is Cached: Yes")) - } - } + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - randomInt = Math.random() * 2000000000 - sql "select * from (select $randomInt as id)a" - profile("sql_cache_23_${randomInt}_2") { - run { - sql "/* sql_cache_23_${randomInt}_2 */ select * from (select $randomInt as id)a" + assertNoCache "select * from test_use_plan_cache21" + def result1 = sql "select * from test_use_plan_cache21" + assertTrue(result1.size() == 2) + assertHasCache "select * from test_use_plan_cache21" + + sql "truncate table test_use_plan_cache21 partition dt2" + assertNoCache "select * from test_use_plan_cache21" + def result2 = sql "select * from test_use_plan_cache21" + assertTrue(result2.size() == 1) + }), + extraThread("remove_comment", { + createTestTable "test_use_plan_cache22" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" + sql "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment1*/ * from test_use_plan_cache22 order by 1, 2" + + assertHasCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" + }), + extraThread("is_cache_profile", { + createTestTable "test_use_plan_cache23" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + int randomInt = Math.random() * 2000000000 + sql "select ${randomInt} from test_use_plan_cache23" + profile("sql_cache_23_${randomInt}") { + run { + sql "/* sql_cache_23_${randomInt} */ select ${randomInt} from test_use_plan_cache23" + } + + check { profileString, exception -> + log.info(profileString) + assertTrue(profileString.contains("Is Cached: Yes")) + } } - check { profileString, exception -> - log.info(profileString) - assertTrue(profileString.contains("Is Cached: Yes")) + randomInt = Math.random() * 2000000000 + sql "select * from (select $randomInt as id)a" + profile("sql_cache_23_${randomInt}_2") { + run { + sql "/* sql_cache_23_${randomInt}_2 */ select * from (select $randomInt as id)a" + } + + check { profileString, exception -> + log.info(profileString) + assertTrue(profileString.contains("Is Cached: Yes")) + } + } + }), + extraThread("sql_cache_with_date_format", { + sql "set enable_sql_cache=true" + for (def i in 0..3) { + def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')" + assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0]) } - } - }), - extraThread("sql_cache_with_date_format", { - sql "set enable_sql_cache=true" - for (def i in 0..3) { - def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')" - assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0]) - } - }), - extraThread("test_same_sql_with_different_db", { - def dbName1 = "test_db1" - def dbName2 = "test_db2" - def tableName = "test_cache_table" - - sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" - sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} ( - `k1` date NOT NULL COMMENT "", - `k2` int(11) NOT NULL COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`k1`, `k2`) - COMMENT "OLAP" - PARTITION BY RANGE(`k1`) - (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) - DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ - sql "CREATE DATABASE IF NOT EXISTS ${dbName2}" - sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} ( - `k1` date NOT NULL COMMENT "", - `k2` int(11) NOT NULL COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`k1`, `k2`) - COMMENT "OLAP" - PARTITION BY RANGE(`k1`) - (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) - DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ - - sql """ - INSERT INTO ${dbName1}.${tableName} VALUES - ("2024-11-29",0), - ("2024-11-30",0) - """ - // after partition changed 10s, the sql cache can be used - sleep(10000) - sql """ - INSERT INTO ${dbName2}.${tableName} VALUES - ("2024-11-29",0) - """ - // after partition changed 10s, the sql cache can be used - sleep(10000) - - sql "set enable_sql_cache=true" - sql "use ${dbName1}" - List> result1 = sql """ - SELECT COUNT(*) FROM ${tableName} - """ - assertEquals(result1[0][0],2) - - sql "use ${dbName2}" - List> result2 = sql """ - SELECT COUNT(*) FROM ${tableName} - """ - assertEquals(result2[0][0],1) - - sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" - sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" - sql "DROP DATABASE IF EXISTS ${dbName1}" - sql "DROP DATABASE IF EXISTS ${dbName2}" - }) - ).get() + }), + extraThread("test_same_sql_with_different_db", { + def dbName1 = "test_db1" + def dbName2 = "test_db2" + def tableName = "test_cache_table" + + sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + sql "CREATE DATABASE IF NOT EXISTS ${dbName2}" + sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + + sql """ + INSERT INTO ${dbName1}.${tableName} VALUES + ("2024-11-29",0), + ("2024-11-30",0) + """ + // after partition changed 10s, the sql cache can be used + sleep(10000) + sql """ + INSERT INTO ${dbName2}.${tableName} VALUES + ("2024-11-29",0) + """ + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_sql_cache=true" + sql "use ${dbName1}" + List> result1 = sql """ + SELECT COUNT(*) FROM ${tableName} + """ + assertEquals(result1[0][0],2) + + sql "use ${dbName2}" + List> result2 = sql """ + SELECT COUNT(*) FROM ${tableName} + """ + assertEquals(result2[0][0],1) + + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" + sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" + sql "DROP DATABASE IF EXISTS ${dbName1}" + sql "DROP DATABASE IF EXISTS ${dbName2}" + }) + ).get() + } } diff --git a/regression-test/suites/nereids_syntax_p0/distribute/colocate_union_numbers.groovy b/regression-test/suites/nereids_syntax_p0/distribute/colocate_union_numbers.groovy index b247c5bde25a6c..3119b2ddb2c5dd 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/colocate_union_numbers.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/colocate_union_numbers.groovy @@ -25,23 +25,25 @@ suite("colocate_union_numbers") { """ def extractFragment = { String sqlStr, String containsString, Closure checkExchangeNum -> - explain { - sql sqlStr - check { result -> - log.info("Explain result:\n${result}") + retry(120, 1000) { + explain { + sql sqlStr + check { result -> + log.info("Explain result:\n${result}") - assertTrue(result.contains(containsString)) + assertTrue(result.contains(containsString)) - def fragmentContainsJoin = result.split("PLAN FRAGMENT") - .toList() - .stream() - .filter { it.contains(containsString) } - .findFirst() - .get() + def fragmentContainsJoin = result.split("PLAN FRAGMENT") + .toList() + .stream() + .filter { it.contains(containsString) } + .findFirst() + .get() - log.info("Fragment:\n${fragmentContainsJoin}") + log.info("Fragment:\n${fragmentContainsJoin}") - checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE")) + checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE")) + } } } } diff --git a/regression-test/suites/nereids_syntax_p0/distribute/prune_bucket_with_bucket_shuffle_join.groovy b/regression-test/suites/nereids_syntax_p0/distribute/prune_bucket_with_bucket_shuffle_join.groovy index 3ce412a0075b2a..7006b8c226e170 100644 --- a/regression-test/suites/nereids_syntax_p0/distribute/prune_bucket_with_bucket_shuffle_join.groovy +++ b/regression-test/suites/nereids_syntax_p0/distribute/prune_bucket_with_bucket_shuffle_join.groovy @@ -41,23 +41,25 @@ suite("prune_bucket_with_bucket_shuffle_join") { """ def extractFragment = { String sqlStr, String containsString, Closure checkExchangeNum -> - explain { - sql sqlStr - check { result -> - log.info("Explain result:\n${result}") + retry(120, 1000) { + explain { + sql sqlStr + check { result -> + log.info("Explain result:\n${result}") - assertTrue(result.contains(containsString)) + assertTrue(result.contains(containsString)) - def fragmentContainsJoin = result.split("PLAN FRAGMENT") - .toList() - .stream() - .filter { it.contains(containsString) } - .findFirst() - .get() + def fragmentContainsJoin = result.split("PLAN FRAGMENT") + .toList() + .stream() + .filter { it.contains(containsString) } + .findFirst() + .get() - log.info("Fragment:\n${fragmentContainsJoin}") + log.info("Fragment:\n${fragmentContainsJoin}") - checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE")) + checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE")) + } } } } diff --git a/regression-test/suites/nereids_syntax_p0/explain.groovy b/regression-test/suites/nereids_syntax_p0/explain.groovy index fb6af28dd443f4..899bbc2e4e7bd3 100644 --- a/regression-test/suites/nereids_syntax_p0/explain.groovy +++ b/regression-test/suites/nereids_syntax_p0/explain.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("nereids_explain") { +suite("explain") { sql """ SET enable_nereids_planner=true """ @@ -29,7 +29,6 @@ suite("nereids_explain") { contains "sum(2) + sum(lo_suppkey)" } - explain { sql("physical plan select 100") contains "PhysicalOneRowRelation"