From 81b2f897537e835c0684b5c3395ccc18b2427cef Mon Sep 17 00:00:00 2001 From: "tongyang.han" Date: Fri, 29 Nov 2024 17:10:03 +0800 Subject: [PATCH] [fix](cache) fix same sql return wrong result when switch database with `use db` and enable sql cache; --- .../doris/common/NereidsSqlCacheManager.java | 49 +++++++----- .../cache/parse_sql_from_sql_cache.groovy | 74 +++++++++++++++++++ 2 files changed, 105 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index 1317fdaefc766e..cd32b52034a5d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -74,9 +74,11 @@ import java.util.Optional; import java.util.Set; -/** NereidsSqlCacheManager */ +/** + * NereidsSqlCacheManager + */ public class NereidsSqlCacheManager { - // key: : + // key: :: // value: SqlCacheContext private volatile Cache sqlCaches; @@ -110,7 +112,7 @@ private static Cache buildSqlCaches(int sqlCacheNum, lo // auto evict cache when jvm memory too low .softValues(); if (sqlCacheNum > 0) { - cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum); + cacheBuilder.maximumSize(sqlCacheNum); } if (expireAfterAccessSeconds > 0) { cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds)); @@ -119,7 +121,9 @@ private static Cache buildSqlCaches(int sqlCacheNum, lo return cacheBuilder.build(); } - /** tryAddFeCache */ + /** + * tryAddFeCache + */ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) { Optional sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext(); if (!sqlCacheContextOpt.isPresent()) { @@ -127,17 +131,18 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) { } SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); - UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL - ? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim()) - : currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()); + ? generateCacheKey(connectContext, normalizeSql(sql)) + : generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5())); if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null && sqlCacheContext.getResultSetInFe().isPresent()) { sqlCaches.put(key, sqlCacheContext); } } - /** tryAddBeCache */ + /** + * tryAddBeCache + */ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) { Optional sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext(); if (!sqlCacheContextOpt.isPresent()) { @@ -147,10 +152,9 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz return; } SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); - UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL - ? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim()) - : currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()); + ? generateCacheKey(connectContext, normalizeSql(sql)) + : generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5())); if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { SqlCache cache = (SqlCache) analyzer.getCache(); sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum()); @@ -167,23 +171,23 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz } } - /** tryParseSql */ + /** + * tryParseSql + */ public Optional tryParseSql(ConnectContext connectContext, String sql) { - UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); - String key = currentUserIdentity + ":" + normalizeSql(sql.trim()); + String key = generateCacheKey(connectContext, normalizeSql(sql.trim())); SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key); if (sqlCacheContext == null) { return Optional.empty(); } // LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize()); - + UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); List currentVariables = resolveUserVariables(sqlCacheContext); if (usedVariablesChanged(currentVariables, sqlCacheContext)) { String md5 = DebugUtil.printId( sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))); - - String md5CacheKey = currentUserIdentity + ":" + md5; + String md5CacheKey = generateCacheKey(connectContext, md5); SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey); // already exist cache in the fe, but the variable is different to this query, @@ -203,6 +207,15 @@ public Optional tryParseSql(ConnectContext connectContext, Stri } } + private String generateCacheKey(ConnectContext connectContext, String sqlOrMd5) { + CatalogIf currentCatalog = connectContext.getCurrentCatalog(); + String currentCatalogName = currentCatalog != null ? currentCatalog.getName() : ""; + String currentDatabase = connectContext.getDatabase(); + String currentDatabaseName = currentDatabase != null ? currentDatabase : ""; + return currentCatalogName + "." + currentDatabaseName + ":" + connectContext.getCurrentUserIdentity().toString() + + ":" + sqlOrMd5; + } + private String normalizeSql(String sql) { return NereidsParser.removeCommentAndTrimBlank(sql); } @@ -402,7 +415,7 @@ private boolean usedVariablesChanged(List currentVariables, SqlCacheCo Variable cachedVariable = cachedUsedVariables.get(i); if (!Objects.equals(currentVariable, cachedVariable) || cachedVariable.getRealExpression().anyMatch( - expr -> !((ExpressionTrait) expr).isDeterministic())) { + expr -> !((ExpressionTrait) expr).isDeterministic())) { return true; } } diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy index 54ab702888888d..3635936e8be083 100644 --- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy +++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy @@ -827,6 +827,80 @@ suite("parse_sql_from_sql_cache") { def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')" assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0]) } + }), + extraThread("test_same_sql_with_different_db", { + def dbName1 = "test_db1" + def dbName2 = "test_db2" + def tableName = "test_cache_table" + + sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + sql "CREATE DATABASE IF NOT EXISTS ${dbName2}" + sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + + sql """ + INSERT INTO ${dbName1}.${tableName} VALUES + ("2024-11-29",0), + ("2024-11-30",0) + """ + // after partition changed 10s, the sql cache can be used + sleep(10000) + sql """ + INSERT INTO ${dbName2}.${tableName} VALUES + ("2024-11-29",0) + """ + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_sql_cache=true" + sql "use ${dbName1}" + List> result1 = sql """ + SELECT COUNT(*) FROM ${tableName} + """ + assertEquals(result1[0][0],2) + + sql "use ${dbName2}" + List> result2 = sql """ + SELECT COUNT(*) FROM ${tableName} + """ + assertEquals(result2[0][0],1) + + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" + sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" + sql "DROP DATABASE IF EXISTS ${dbName1}" + sql "DROP DATABASE IF EXISTS ${dbName2}" }) ).get() }