Skip to content

Commit

Permalink
[test](mtmv) Fix sync mv not partition in rewrite test and some other…
Browse files Browse the repository at this point in the history
… test problems (apache#46546)

Fix sync mv not partition in rewrite because thought mv is already built
wrongly

In the `createMV()` method of `Suite.groovy`, it checks whether the most
recent materialized view of the current database's last table has been
built successfully. If a database has two tables, the synchronization
materialized view of the second table may not be completed, which could
incorrectly lead to the assumption that the build is complete.

To address this issue, modify the test case to ensure that there is at
most one table per database.
  • Loading branch information
seawinde committed Jan 16, 2025
1 parent 3941c19 commit 9074575
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,12 @@ private Statistics computeOlapScan(OlapScan olapScan) {
// mv is selected, return its estimated stats
Optional<Statistics> optStats = cascadesContext.getStatementContext()
.getStatistics(((Relation) olapScan).getRelationId());
LOG.info("computeOlapScan optStats isPresent {}, tableRowCount is {}, table name is {}",
optStats.isPresent(), tableRowCount, olapTable.getQualifiedName());
if (optStats.isPresent()) {
double selectedPartitionsRowCount = getSelectedPartitionRowCount(olapScan);
LOG.info("computeOlapScan optStats is {}, selectedPartitionsRowCount is {}", optStats.get(),
selectedPartitionsRowCount);
if (selectedPartitionsRowCount == -1) {
selectedPartitionsRowCount = tableRowCount;
}
Expand Down
14 changes: 11 additions & 3 deletions regression-test/data/nereids_syntax_p0/mv/newMv/multi_slot4.out
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
-4 -4 -4 d
-4 -4 -4 d
-4 -4 -4 d
1 1 1 a
1 1 1 a
1 1 1 a
2 2 2 b
2 2 2 b
2 2 2 b
3 -3 \N c
3 -3 \N c
3 -3 \N c
3 -3 \N c

-- !select_mv --
-3 1
2 7
3 9
-3 3
2 21
3 27
4 \N

Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,16 @@ class Suite implements GroovyInterceptable {
runAction(new ProfileAction(context, tag), actionSupplier)
}

// Should use create_sync_mv, this method only check the sync mv in current db
// If has multi sync mv in db, may make mistake
@Deprecated
void createMV(String sql) {
(new CreateMVAction(context, sql)).run()
}

// Should use create_sync_mv, this method only check the sync mv in current db
// If has multi sync mv in db, may make mistake
@Deprecated
void createMV(String sql, String expection) {
(new CreateMVAction(context, sql, expection)).run()
}
Expand Down Expand Up @@ -1088,7 +1094,7 @@ class Suite implements GroovyInterceptable {
Connection getTargetConnection() {
return context.getTargetConnection(this)
}

boolean deleteFile(String filePath) {
def file = new File(filePath)
file.delete()
Expand Down Expand Up @@ -1377,80 +1383,101 @@ class Suite implements GroovyInterceptable {
return debugPoint
}

void waitingMTMVTaskFinishedByMvName(String mvName) {
def waitingMTMVTaskFinishedByMvName = { mvName, dbName = context.dbName ->
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvDatabaseName = '${dbName}' and MvName = '${mvName}' order by CreateTime DESC LIMIT 1"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != "SUCCESS")) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("current db is " + dbName + ", showTasks is " + showTasks)
if (result.isEmpty()) {
logger.info("waitingMTMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
toCheckTaskRow = result.get(0);
status = toCheckTaskRow.get(4)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL'))
}
if (status != "SUCCESS") {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
def show_tables = sql """
show tables from ${result.last().get(6)};
show tables from ${toCheckTaskRow.get(6)};
"""
def db_id = getDbId(result.last().get(6))
def table_id = getTableId(result.last().get(6), mvName)
def db_id = getDbId(toCheckTaskRow.get(6))
def table_id = getTableId(toCheckTaskRow.get(6), mvName)
logger.info("waitingMTMVTaskFinished analyze mv name is " + mvName
+ ", db name is " + result.last().get(6)
+ ", db name is " + toCheckTaskRow.get(6)
+ ", show_tables are " + show_tables
+ ", db_id is " + db_id
+ ", table_id " + table_id)
sql "analyze table ${result.last().get(6)}.${mvName} with sync;"
sql "analyze table ${toCheckTaskRow.get(6)}.${mvName} with sync;"
}

void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
def waitingMTMVTaskFinishedByMvNameAllowCancel = {mvName, dbName = context.dbName ->
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvDatabaseName = '${dbName}' and MvName = '${mvName}' order by CreateTime DESC LIMIT 1"

String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != "SUCCESS")) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("current db is " + dbName + ", showTasks result: " + result.toString())
if (result.isEmpty()) {
logger.info("waitingMTMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
toCheckTaskRow = result.get(0)
status = toCheckTaskRow.get(4)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED'))
}
if (status != "SUCCESS") {
logger.info("status is not success")
assertTrue(result.toString().contains("same table"))
}
// Need to analyze materialized view for cbo to choose the materialized view accurately
logger.info("waitingMTMVTaskFinished analyze mv name is " + result.last().get(5))
sql "analyze table ${result.last().get(6)}.${mvName} with sync;"
logger.info("waitingMTMVTaskFinished analyze mv name is " + toCheckTaskRow.get(5))
sql "analyze table ${toCheckTaskRow.get(6)}.${mvName} with sync;"
}

void waitingMVTaskFinishedByMvName(String dbName, String tableName) {
void waitingMVTaskFinishedByMvName(String dbName, String tableName, String indexName) {
Thread.sleep(2000)
String showTasks = "SHOW ALTER TABLE MATERIALIZED VIEW from ${dbName} where TableName='${tableName}' ORDER BY CreateTime ASC"
String showTasks = "SHOW ALTER TABLE MATERIALIZED VIEW from ${dbName} where TableName='${tableName}' ORDER BY CreateTime DESC"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != 'FINISHED')) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(8)
logger.info("crrent db is " + dbName + ", showTasks result: " + result.toString())
// just consider current db
for (List<String> taskRow : result) {
if (taskRow.get(5).equals(indexName)) {
toCheckTaskRow = taskRow;
}
}
if (toCheckTaskRow.isEmpty()) {
logger.info("waitingMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
status = toCheckTaskRow.get(8)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status != 'FINISHED'))
}
if (status != "FINISHED") {
logger.info("status is not success")
}
Expand Down Expand Up @@ -1515,6 +1542,12 @@ class Suite implements GroovyInterceptable {
+ ", db_id is " + db_id
+ ", table_id " + table_id)
sql "analyze table ${result.last().get(6)}.${result.last().get(5)} with sync;"
String db = result.last().get(6)
String table = result.last().get(5)
result = sql("show table stats ${db}.${table}")
logger.info("table stats: " + result.toString())
result = sql("show index stats ${db}.${table} ${table}")
logger.info("index stats: " + result.toString())
}

void waitingMTMVTaskFinishedNotNeedSuccess(String jobName) {
Expand Down Expand Up @@ -1817,6 +1850,15 @@ class Suite implements GroovyInterceptable {
return isReady
}

def create_sync_mv = { db, table_name, mv_name, mv_sql ->
sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name} ON ${table_name};"""
sql"""
CREATE MATERIALIZED VIEW ${mv_name}
AS ${mv_sql}
"""
waitingMVTaskFinishedByMvName(db, table_name, mv_name)
}

def create_async_mv = { db, mv_name, mv_sql ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${db}.${mv_name}"""
Expand Down Expand Up @@ -1945,6 +1987,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
success = success && result.contains("(${mv_name})")
}
if (!success) {
logger.info("mv_rewrite_all_success fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -1955,7 +2000,11 @@ class Suite implements GroovyInterceptable {
check { result ->
boolean success = true;
for (String mv_name : mv_names) {
Assert.assertEquals(true, result.contains("${mv_name} chose"))
def contains = result.contains("${mv_name} chose")
if (!contains) {
logger.info("mv_rewrite_all_success fail =" + result)
}
Assert.assertEquals(true, contains)
}
}
}
Expand All @@ -1982,6 +2031,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
success = success || result.contains("(${mv_name})")
}
if (!success) {
logger.info("mv_rewrite_any_success fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -1994,6 +2046,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
success = success || result.contains("${mv_name} chose")
}
if (!success) {
logger.info("mv_rewrite_any_success fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -2014,6 +2069,9 @@ class Suite implements GroovyInterceptable {
def each_result = splitResult.length == 2 ? splitResult[0].contains(mv_name) : false
success = success && (result.contains("(${mv_name})") || each_result)
}
if (!success) {
logger.info("mv_rewrite_all_success_without_check_chosen fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -2027,6 +2085,9 @@ class Suite implements GroovyInterceptable {
boolean stepSuccess = result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
success = success && stepSuccess
}
if (!success) {
logger.info("mv_rewrite_all_success_without_check_chosen fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -2047,6 +2108,9 @@ class Suite implements GroovyInterceptable {
def each_result = splitResult.length == 2 ? splitResult[0].contains(mv_name) : false
success = success || (result.contains("(${mv_name})") || each_result)
}
if (!success) {
logger.info("mv_rewrite_any_success_without_check_chosen fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand All @@ -2059,6 +2123,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
success = success || result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
}
if (!success) {
logger.info("mv_rewrite_any_success_without_check_chosen fail =" + result)
}
Assert.assertEquals(true, success)
}
}
Expand Down Expand Up @@ -2117,6 +2184,9 @@ class Suite implements GroovyInterceptable {
boolean stepFail = !result.contains("(${mv_name})")
fail = fail && stepFail
}
if (!fail) {
logger.info("mv_rewrite_all_fail =" + result)
}
Assert.assertEquals(true, fail)
}
}
Expand All @@ -2130,6 +2200,9 @@ class Suite implements GroovyInterceptable {
boolean stepFail = result.contains("${mv_name} fail")
fail = fail && stepFail
}
if (!fail) {
logger.info("mv_rewrite_all_fail =" + result)
}
Assert.assertEquals(true, fail)
}
}
Expand All @@ -2147,6 +2220,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
fail = fail || !result.contains("(${mv_name})")
}
if (!fail) {
logger.info("mv_rewrite_any_fail =" + result)
}
Assert.assertEquals(true, fail)
}
}
Expand All @@ -2159,6 +2235,9 @@ class Suite implements GroovyInterceptable {
for (String mv_name : mv_names) {
fail = fail || result.contains("${mv_name} fail")
}
if (!fail) {
logger.info("mv_rewrite_any_fail =" + result)
}
Assert.assertEquals(true, fail)
}
}
Expand Down Expand Up @@ -2212,6 +2291,20 @@ class Suite implements GroovyInterceptable {
mv_rewrite_fail(query_sql, mv_name, true)
}

def async_create_mv = { db, mv_sql, mv_name ->
sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name}"""
sql"""
CREATE MATERIALIZED VIEW ${mv_name}
BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS ${mv_sql}
"""

def job_name = getJobName(db, mv_name);
waitingMTMVTaskFinished(job_name)
}

def token = context.config.metaServiceToken
def instance_id = context.config.multiClusterInstance
def get_be_metric = { ip, port, field ->
Expand Down
4 changes: 2 additions & 2 deletions regression-test/suites/auth_call/test_ddl_mv_auth.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ suite("test_ddl_mv_auth","p0,auth_call") {
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
sql """use ${dbName}"""
sql """create materialized view ${mvName} as select username from ${dbName}.${tableName};"""
waitingMVTaskFinishedByMvName(dbName, tableName)
waitingMVTaskFinishedByMvName(dbName, tableName, mvName)
sql """alter table ${dbName}.${tableName} add rollup ${rollupName}(username)"""
waitingMVTaskFinishedByMvName(dbName, tableName)
waitingMVTaskFinishedByMvName(dbName, tableName, rollupName)

def mv_res = sql """desc ${dbName}.${tableName} all;"""
logger.info("mv_res: " + mv_res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ suite("test_select_column_auth","p0,auth") {
(3, "333");
"""
sql """refresh MATERIALIZED VIEW ${dbName}.${mtmv_name} auto"""
waitingMTMVTaskFinishedByMvName(mtmv_name)
waitingMTMVTaskFinishedByMvName(mtmv_name, dbName)

sql """grant select_priv on regression_test to ${user}"""

Expand Down
Loading

0 comments on commit 9074575

Please sign in to comment.