Skip to content

Commit

Permalink
[fix](cache) fix same sql return wrong result when switch database wi…
Browse files Browse the repository at this point in the history
…th `use db` and enable sql cache (#44782)

### What problem does this PR solve?

It will return wrong result when running same sql in different db with
`enable_sql_cache=true`

for example, the `db1` and `db2` has the same table `tbl` but the data
are not equals,
if execute the below sql in `db1` and cache the result, then execute it
in `db2`, it will return the wrong result
```sql
select count(*) from tbl
```
Co-authored-by: tongyang.han <[email protected]>
  • Loading branch information
htyoung authored Nov 30, 2024
1 parent 42a7734 commit ea2a21d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@
import java.util.Optional;
import java.util.Set;

/** NereidsSqlCacheManager */
/**
* NereidsSqlCacheManager
*/
public class NereidsSqlCacheManager {
// key: <user>:<sql>
// key: <ctl.db>:<user>:<sql>
// value: SqlCacheContext
private volatile Cache<String, SqlCacheContext> sqlCaches;

Expand Down Expand Up @@ -110,7 +112,7 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
// auto evict cache when jvm memory too low
.softValues();
if (sqlCacheNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
cacheBuilder.maximumSize(sqlCacheNum);
}
if (expireAfterAccessSeconds > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
Expand All @@ -119,25 +121,28 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
return cacheBuilder.build();
}

/** tryAddFeCache */
/**
* tryAddFeCache
*/
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
}

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim())
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
? generateCacheKey(connectContext, normalizeSql(sql))
: generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()));
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
}
}

/** tryAddBeCache */
/**
* tryAddBeCache
*/
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
Expand All @@ -147,10 +152,9 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
return;
}
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + normalizeSql(sql.trim())
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
? generateCacheKey(connectContext, normalizeSql(sql))
: generateCacheKey(connectContext, DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()));
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
Expand All @@ -167,23 +171,23 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
}
}

/** tryParseSql */
/**
* tryParseSql
*/
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity + ":" + normalizeSql(sql.trim());
String key = generateCacheKey(connectContext, normalizeSql(sql.trim()));
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
}

// LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize());

UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
String md5 = DebugUtil.printId(
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));

String md5CacheKey = currentUserIdentity + ":" + md5;
String md5CacheKey = generateCacheKey(connectContext, md5);
SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey);

// already exist cache in the fe, but the variable is different to this query,
Expand All @@ -203,6 +207,15 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}
}

private String generateCacheKey(ConnectContext connectContext, String sqlOrMd5) {
CatalogIf<?> currentCatalog = connectContext.getCurrentCatalog();
String currentCatalogName = currentCatalog != null ? currentCatalog.getName() : "";
String currentDatabase = connectContext.getDatabase();
String currentDatabaseName = currentDatabase != null ? currentDatabase : "";
return currentCatalogName + "." + currentDatabaseName + ":" + connectContext.getCurrentUserIdentity().toString()
+ ":" + sqlOrMd5;
}

private String normalizeSql(String sql) {
return NereidsParser.removeCommentAndTrimBlank(sql);
}
Expand Down Expand Up @@ -402,7 +415,7 @@ private boolean usedVariablesChanged(List<Variable> currentVariables, SqlCacheCo
Variable cachedVariable = cachedUsedVariables.get(i);
if (!Objects.equals(currentVariable, cachedVariable)
|| cachedVariable.getRealExpression().anyMatch(
expr -> !((ExpressionTrait) expr).isDeterministic())) {
expr -> !((ExpressionTrait) expr).isDeterministic())) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,80 @@ suite("parse_sql_from_sql_cache") {
def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')"
assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0])
}
}),
extraThread("test_same_sql_with_different_db", {
def dbName1 = "test_db1"
def dbName2 = "test_db2"
def tableName = "test_cache_table"

sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT "OLAP"
PARTITION BY RANGE(`k1`)
(PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql "CREATE DATABASE IF NOT EXISTS ${dbName2}"
sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k1`, `k2`)
COMMENT "OLAP"
PARTITION BY RANGE(`k1`)
(PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""

sql """
INSERT INTO ${dbName1}.${tableName} VALUES
("2024-11-29",0),
("2024-11-30",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)
sql """
INSERT INTO ${dbName2}.${tableName} VALUES
("2024-11-29",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)

sql "set enable_sql_cache=true"
sql "use ${dbName1}"
List<List<Object>> result1 = sql """
SELECT COUNT(*) FROM ${tableName}
"""
assertEquals(result1[0][0],2)

sql "use ${dbName2}"
List<List<Object>> result2 = sql """
SELECT COUNT(*) FROM ${tableName}
"""
assertEquals(result2[0][0],1)

sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
sql "DROP DATABASE IF EXISTS ${dbName1}"
sql "DROP DATABASE IF EXISTS ${dbName2}"
})
).get()
}

0 comments on commit ea2a21d

Please sign in to comment.