Skip to content

Commit

Permalink
[Improve](mtmv) skip the generation of invalid task for refresh mtmv (
Browse files Browse the repository at this point in the history
…#46280)

### What problem does this PR solve?

Suppose the `excluded_trigger_tables = 'a'` attribute is specified when
creating a materialized view. If table `a` is updated frequently, many
invalid tasks are generated, even though these tasks do not really refresh
the mv. This is unreasonable: too many invalid tasks will crowd out useful
task information.


Co-authored-by: garenshi <[email protected]>
  • Loading branch information
qzsee and garenshi authored Jan 3, 2025
1 parent ba28c94 commit 2254956
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,21 @@ public void processEvent(Event event) throws EventException {
try {
// check if mtmv should trigger by event
MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo);
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
if (canRefresh(mtmv, table)) {
jobManager.onCommit(mtmv);
}
} catch (Exception e) {
throw new EventException(e);
}
}
}

/**
 * Decides whether a commit on {@code table} should generate a refresh task for {@code mtmv}.
 *
 * <p>A refresh is allowed only when the materialized view's refresh trigger is
 * {@code RefreshTrigger.COMMIT} and {@code table} is not contained in the view's
 * excluded trigger tables.
 *
 * @param mtmv the materialized view that depends on {@code table}
 * @param table the base table whose event is being processed
 * @return {@code true} if a refresh task should be generated, {@code false} otherwise
 */
private boolean canRefresh(MTMV mtmv, TableIf table) {
    // Check the trigger type first so that the "skip refresh" message below is only
    // logged for views that would otherwise have been refreshed by this commit.
    // Constant-first equals also keeps a missing trigger from raising an NPE.
    RefreshTrigger trigger = mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger();
    if (!RefreshTrigger.COMMIT.equals(trigger)) {
        return false;
    }
    if (mtmv.getExcludedTriggerTables().contains(table.getName())) {
        LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}",
                mtmv.getName(), table.getName());
        return false;
    }
    return true;
}
}
20 changes: 20 additions & 0 deletions regression-test/data/mtmv_p0/test_commit_mtmv.out
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,23 @@
-- !mv1_replace --
3 2017-03-15 3

-- !mv_sag --
1 1 60

-- !task_sag --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}

-- !mv_sag1 --
1 1 60

-- !task_sag1 --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}

-- !mv_sag2 --
1 1 60
1 2 70
2 1 70

-- !task_sag2 --
{"triggerMode":"COMMIT","partitions":[],"isComplete":false}

72 changes: 72 additions & 0 deletions regression-test/suites/mtmv_p0/test_commit_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,76 @@ suite("test_commit_mtmv") {
sql """drop materialized view if exists ${mvName2};"""
sql """drop table if exists `${tableName}`"""

//===========test excluded_trigger_tables===========
// Scenario: a COMMIT-triggered materialized view joins two base tables, one of
// which (tblGrade) is listed in "excluded_trigger_tables". The checks below
// record the view contents and the latest task context after each round of inserts.
def tblStu = "test_commit_mtmv_tbl_stu"
def tblGrade = "test_commit_mtmv_tbl_grade"
def mvSag = "test_commit_mv_sag"
// Start from a clean state in case a previous run left objects behind.
sql """drop materialized view if exists ${mvSag};"""
sql """drop table if exists `${tblStu}`"""
sql """drop table if exists `${tblGrade}`"""
// Base table that is NOT excluded: commits to it are expected to trigger a refresh.
sql """
CREATE TABLE `${tblStu}` (
`sid` int(32) NULL,
`sname` varchar(32) NULL,
) ENGINE=OLAP
DUPLICATE KEY(`sid`)
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// Base table that is listed in excluded_trigger_tables of the view below.
sql """
CREATE TABLE `${tblGrade}` (
`sid` int(32) NULL,
`cid` int(32) NULL,
`score` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`sid`)
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// BUILD DEFERRED: the view is not populated at creation time, so data only
// appears after a commit-triggered refresh.
sql """
CREATE MATERIALIZED VIEW ${mvSag}
BUILD DEFERRED
REFRESH COMPLETE ON commit
DISTRIBUTED BY HASH(`sid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"excluded_trigger_tables" = "${tblGrade}"
)
AS select a.sid,b.cid,b.score from ${tblStu} a join ${tblGrade} b on a.sid = b.sid;
"""

// Phase 1: write to the excluded table first, then to tblStu. The tblStu commit
// is the one expected to trigger the refresh; the expected view content is (1, 1, 60).
sql """
insert into ${tblGrade} values(1, 1, 60);
insert into ${tblStu} values(1, 'sam');
"""
def sagJobName = getJobName(dbName, mvSag);
// NOTE(review): waitingMTMVTaskFinished is defined outside this section; presumably
// it blocks until the job's latest task has finished -- confirm against the helper.
waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"

// Phase 2: write ONLY to the excluded table. The view is expected to stay unchanged
// (the new grade row must not show up yet).
sql """
insert into ${tblGrade} values(1, 2, 70);
"""
waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag1 "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"

// Phase 3: write to both tables again. The tblStu commit should trigger a refresh,
// after which the view is expected to contain the rows from all three phases.
sql """
insert into ${tblGrade} values(2, 1, 70);
insert into ${tblStu} values(2, 'jack');
"""

waitingMTMVTaskFinished(sagJobName)
order_qt_mv_sag2 "SELECT * FROM ${mvSag} order by sid,cid"
order_qt_task_sag2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvSag}' order by CreateTime desc limit 1"

// Clean up the objects created by this section.
sql """drop materialized view if exists ${mvSag};"""
sql """drop table if exists `${tblStu}`"""
sql """drop table if exists `${tblGrade}`"""
}

0 comments on commit 2254956

Please sign in to comment.