Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cache) fix same sql return wrong result when switch database with use db and enable sql cache #44782

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@
import java.util.Optional;
import java.util.Set;

/** NereidsSqlCacheManager */
/**
* NereidsSqlCacheManager
*/
public class NereidsSqlCacheManager {
// key: <user>:<sql>
// key: <ctl.db>:<user>:<sql>
// value: SqlCacheContext
private volatile Cache<String, SqlCacheContext> sqlCaches;

Expand Down Expand Up @@ -110,7 +112,7 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
// auto evict cache when jvm memory too low
.softValues();
if (sqlCacheNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
cacheBuilder.maximumSize(sqlCacheNum);
}
if (expireAfterAccessSeconds > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
Expand All @@ -119,25 +121,28 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
return cacheBuilder.build();
}

/** tryAddFeCache */
/**
* tryAddFeCache
*/
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
}

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim())
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
? generateCacheKey(connectContext, normalizeSql(sql))
: generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()));
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
}
}

/** tryAddBeCache */
/**
* tryAddBeCache
*/
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
Expand All @@ -147,10 +152,9 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
return;
}
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim())
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
? generateCacheKey(connectContext, normalizeSql(sql))
: generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()));
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
Expand All @@ -167,23 +171,23 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
}
}

/** tryParseSql */
/**
* tryParseSql
*/
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity + ":" + normalizeSql(sql.trim());
String key = generateCacheKey(connectContext, normalizeSql(sql.trim()));
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
}

// LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize());

UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
String md5 = DebugUtil.printId(
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));

String md5CacheKey = currentUserIdentity + ":" + md5;
String md5CacheKey = generateCacheKey(connectContext, md5);
SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey);

// already exist cache in the fe, but the variable is different to this query,
Expand All @@ -203,6 +207,15 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}
}

private String generateCacheKey(ConnectContext connectContext, String sqlOrMd5) {
CatalogIf<?> currentCatalog = connectContext.getCurrentCatalog();
String currentCatalogName = currentCatalog != null ? currentCatalog.getName() : "";
String currentDatabase = connectContext.getDatabase();
String currentDatabaseName = currentDatabase != null ? currentDatabase : "";
return currentCatalogName + "." + currentDatabaseName + ":" + connectContext.getCurrentUserIdentity().toString()
+ ":" + sqlOrMd5;
}

private String normalizeSql(String sql) {
return NereidsParser.removeCommentAndTrimBlank(sql);
}
Expand Down Expand Up @@ -402,7 +415,7 @@ private boolean usedVariablesChanged(List<Variable> currentVariables, SqlCacheCo
Variable cachedVariable = cachedUsedVariables.get(i);
if (!Objects.equals(currentVariable, cachedVariable)
|| cachedVariable.getRealExpression().anyMatch(
expr -> !((ExpressionTrait) expr).isDeterministic())) {
expr -> !((ExpressionTrait) expr).isDeterministic())) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,80 @@ suite("parse_sql_from_sql_cache") {
def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')"
assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0])
}
}),
extraThread("test_same_sql_with_different_db", {
def dbName1 = "test_db1"
def dbName2 = "test_db2"
def tableName = "test_cache_table"

sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT "OLAP"
PARTITION BY RANGE(`k1`)
(PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql "CREATE DATABASE IF NOT EXISTS ${dbName2}"
sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT "OLAP"
PARTITION BY RANGE(`k1`)
(PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""

sql """
INSERT INTO ${dbName1}.${tableName} VALUES
("2024-11-29",0),
("2024-11-30",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)
sql """
INSERT INTO ${dbName2}.${tableName} VALUES
("2024-11-29",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)

sql "set enable_sql_cache=true"
sql "use ${dbName1}"
List<List<Object>> result1 = sql """
SELECT COUNT(*) FROM ${tableName}
"""
assertEquals(result1[0][0],2)

sql "use ${dbName2}"
List<List<Object>> result2 = sql """
SELECT COUNT(*) FROM ${tableName}
"""
assertEquals(result2[0][0],1)

sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
sql "DROP DATABASE IF EXISTS ${dbName1}"
sql "DROP DATABASE IF EXISTS ${dbName2}"
})
).get()
}
Loading