-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
branch-3.0: [test](mtmv) add create partition and partition scrolling (#42852)
Cherry-picked from #42402. Co-authored-by: zfr95 <[email protected]>
- Loading branch information
1 parent
0f8813d
commit 0ded2d9
Showing
7 changed files
with
1,237 additions
and
27 deletions.
There are no files selected for viewing
200 changes: 200 additions & 0 deletions
200
...on-test/suites/nereids_rules_p0/mv/create_part_and_up/create_commit_mtmv_many_task.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
import java.time.LocalDate | ||
|
||
suite("create_commit_mtmv_many_tasks", "p2") { | ||
|
||
// Source database: the name comes from getDbNameByFile(context.file) (presumably derived from this test file's path). | ||
// It is dropped and recreated so every run starts from an empty database. | ||
def src_database_name = context.config.getDbNameByFile(context.file) | ||
sql """drop database if exists ${src_database_name};""" | ||
sql """create database ${src_database_name};""" | ||
sql """use ${src_database_name};""" | ||
|
||
// Staging tables that receive the raw stream-loaded data. | ||
// NOTE: the CREATE TABLE statements below spell the table names literally instead of using | ||
// table_name1 / table_name2; keep the two in sync when renaming. | ||
def table_name1 = "lineitem" | ||
def table_name2 = "orders" | ||
sql """drop table if exists ${table_name1}""" | ||
sql """drop table if exists ${table_name2}""" | ||
// Both tables are UNIQUE KEY tables hashed on the order key into 96 buckets and share the | ||
// colocate group "lineitem_orders". lineitem enables merge-on-write; orders disables it. | ||
sql """CREATE TABLE lineitem ( | ||
l_orderkey bigint NOT NULL, | ||
l_linenumber int not null, | ||
l_partkey int NOT NULL, | ||
l_suppkey int not null, | ||
l_quantity decimal(15, 2) NOT NULL, | ||
l_extendedprice decimal(15, 2) NOT NULL, | ||
l_discount decimal(15, 2) NOT NULL, | ||
l_tax decimal(15, 2) NOT NULL, | ||
l_returnflag VARCHAR(1) NOT NULL, | ||
l_linestatus VARCHAR(1) NOT NULL, | ||
l_shipdate DATE NOT NULL, | ||
l_commitdate DATE NOT NULL, | ||
l_receiptdate DATE NOT NULL, | ||
l_shipinstruct VARCHAR(25) NOT NULL, | ||
l_shipmode VARCHAR(10) NOT NULL, | ||
l_comment VARCHAR(44) NOT NULL, | ||
l_null VARCHAR(1) NULL | ||
)ENGINE=OLAP | ||
UNIQUE KEY(`l_orderkey`) | ||
COMMENT "OLAP" | ||
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 | ||
PROPERTIES ( | ||
"replication_num" = "1", | ||
"colocate_with" = "lineitem_orders", | ||
"enable_unique_key_merge_on_write" = "true" | ||
);""" | ||
sql """CREATE TABLE orders ( | ||
o_orderkey bigint NOT NULL, | ||
o_custkey int NOT NULL, | ||
o_orderstatus VARCHAR(1) NOT NULL, | ||
o_totalprice decimal(15, 2) NOT NULL, | ||
o_orderdate DATE NOT NULL, | ||
o_orderpriority VARCHAR(15) NOT NULL, | ||
o_clerk VARCHAR(15) NOT NULL, | ||
o_shippriority int NOT NULL, | ||
o_comment VARCHAR(79) NOT NULL, | ||
o_null VARCHAR(1) NULL | ||
)ENGINE=OLAP | ||
UNIQUE KEY(`o_orderkey`) | ||
COMMENT "OLAP" | ||
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 | ||
PROPERTIES ( | ||
"replication_num" = "1", | ||
"colocate_with" = "lineitem_orders", | ||
"enable_unique_key_merge_on_write" = "false" | ||
);""" | ||
|
||
// Stream-loads one '|'-separated data file into table_name and validates the load report. | ||
// table_name: name of the target table. | ||
// src_file_name: file name appended to <getS3Url()>/regression/tpch/sf1/ to form the source location. | ||
def stream_load_job = { table_name, src_file_name -> | ||
streamLoad { | ||
table table_name | ||
set 'column_separator', '|' | ||
file """${getS3Url() + '/regression/tpch/sf1/'}${src_file_name}""" | ||
|
||
check { result, exception, startTime, endTime -> | ||
// Re-throw any exception handed to the check callback instead of inspecting the result. | ||
if (exception != null) { | ||
throw exception | ||
} | ||
log.info("Stream load result: ${result}".toString()) | ||
def json = parseJson(result) | ||
// The load must report success (case-insensitive), load every row it read, and be non-empty. | ||
assertEquals("success", json.Status.toLowerCase()) | ||
assertEquals(json.NumberTotalRows, json.NumberLoadedRows) | ||
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) | ||
} | ||
|
||
} | ||
// Last statement of the closure, so its result is the closure's implicit return value; | ||
// the callers in this file ignore it and no assertion is made on the count. | ||
sql "select count(*) from ${table_name}" | ||
} | ||
|
||
// Load the ten split files lineitem.tbl.1..10 and orders.tbl.1..10 into the source tables. | ||
for (int i = 1; i <= 10; i++) { | ||
stream_load_job(table_name1, "lineitem.tbl.${i}") | ||
stream_load_job(table_name2, "orders.tbl.${i}") | ||
} | ||
|
||
// Destination database with a fixed name; it is dropped and recreated here and becomes the | ||
// current database for the rest of the suite. | ||
def dst_database_name = "wz_tpch_mtmv_hit_property" | ||
sql """drop database if exists ${dst_database_name};""" | ||
sql """create database ${dst_database_name};""" | ||
sql """use ${dst_database_name};""" | ||
|
||
sql """drop table if exists ${table_name1}""" | ||
sql """drop table if exists ${table_name2}""" | ||
// Destination tables: DUPLICATE KEY model, auto range-partitioned per day on the date column. | ||
// Compared with the source tables, the date column (l_shipdate / o_orderdate) is declared last | ||
// and the nullable l_null / o_null columns are omitted. | ||
sql """CREATE TABLE `lineitem` ( | ||
`l_orderkey` BIGINT NOT NULL, | ||
`l_linenumber` INT NOT NULL, | ||
`l_partkey` INT NOT NULL, | ||
`l_suppkey` INT NOT NULL, | ||
`l_quantity` DECIMAL(15, 2) NOT NULL, | ||
`l_extendedprice` DECIMAL(15, 2) NOT NULL, | ||
`l_discount` DECIMAL(15, 2) NOT NULL, | ||
`l_tax` DECIMAL(15, 2) NOT NULL, | ||
`l_returnflag` VARCHAR(1) NOT NULL, | ||
`l_linestatus` VARCHAR(1) NOT NULL, | ||
`l_commitdate` DATE NOT NULL, | ||
`l_receiptdate` DATE NOT NULL, | ||
`l_shipinstruct` VARCHAR(25) NOT NULL, | ||
`l_shipmode` VARCHAR(10) NOT NULL, | ||
`l_comment` VARCHAR(44) NOT NULL, | ||
`l_shipdate` DATE NOT NULL | ||
) ENGINE=OLAP | ||
DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) | ||
COMMENT 'OLAP' | ||
AUTO PARTITION BY range(date_trunc(`l_shipdate`, 'day')) () | ||
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 | ||
PROPERTIES ( | ||
"replication_allocation" = "tag.location.default: 1" | ||
);""" | ||
sql """CREATE TABLE `orders` ( | ||
`o_orderkey` BIGINT NOT NULL, | ||
`o_custkey` INT NOT NULL, | ||
`o_orderstatus` VARCHAR(1) NOT NULL, | ||
`o_totalprice` DECIMAL(15, 2) NOT NULL, | ||
`o_orderpriority` VARCHAR(15) NOT NULL, | ||
`o_clerk` VARCHAR(15) NOT NULL, | ||
`o_shippriority` INT NOT NULL, | ||
`o_comment` VARCHAR(79) NOT NULL, | ||
`o_orderdate` DATE NOT NULL | ||
) ENGINE=OLAP | ||
DUPLICATE KEY(`o_orderkey`, `o_custkey`) | ||
COMMENT 'OLAP' | ||
AUTO PARTITION BY range(date_trunc(`o_orderdate`, 'day')) () | ||
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 | ||
PROPERTIES ( | ||
"replication_allocation" = "tag.location.default: 1" | ||
);""" | ||
// mv1 is built immediately, refreshes ON COMMIT, and is partitioned by l_shipdate. | ||
// Its defining query is repeated verbatim in the final row-count check of this suite. | ||
sql """drop MATERIALIZED VIEW if exists mv1;""" | ||
sql """ | ||
CREATE MATERIALIZED VIEW mv1 | ||
BUILD IMMEDIATE REFRESH ON COMMIT | ||
partition by(l_shipdate) | ||
DISTRIBUTED BY RANDOM BUCKETS 2 | ||
PROPERTIES ('replication_num' = '1') AS | ||
select l_shipdate, l_orderkey from lineitem as t1 left join orders as t2 on t1.l_orderkey = t2.o_orderkey group by l_shipdate, l_orderkey; | ||
""" | ||
|
||
// Copies every row of both source tables into the destination tables, replacing the original | ||
// date column (l_shipdate / o_orderdate) with the literal date_it. The literal is selected last | ||
// because the destination tables declare the date column last; l_null / o_null are not copied. | ||
// date_it: the date to stamp on all copied rows. It is interpolated into the SQL text, so both a | ||
// String and a LocalDate (which renders as yyyy-MM-dd) work; the caller in this file passes both. | ||
def insert_into_select = { date_it -> | ||
sql """INSERT INTO ${dst_database_name}.${table_name1} | ||
SELECT l_orderkey, l_linenumber, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, '${date_it}' AS new_date_column | ||
FROM ${src_database_name}.${table_name1};""" | ||
|
||
sql """INSERT INTO ${dst_database_name}.${table_name2} | ||
SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderpriority, o_clerk, o_shippriority, o_comment, '${date_it}' AS new_date_column | ||
FROM ${src_database_name}.${table_name2}""" | ||
} | ||
|
||
// Returns the calendar day following date_it. | ||
// date_it: an ISO-8601 date string (yyyy-MM-dd) accepted by LocalDate.parse. | ||
// Returns a java.time.LocalDate, not a String; callers that need text must call toString(). | ||
def get_next_day = { def date_it -> | ||
return LocalDate.parse(date_it).plusDays(1) | ||
} | ||
|
||
// Replay the whole source data set once per calendar day, from 2023-12-01 up to but not | ||
// including 2024-03-11. The cursor starts as a String and becomes a LocalDate after the first | ||
// step, so it is always compared and advanced through toString(). | ||
def load_day = "2023-12-01" | ||
while (load_day.toString() != "2024-03-11") { | ||
logger.info("task load start") | ||
insert_into_select(load_day) | ||
load_day = get_next_day(load_day.toString()) | ||
} | ||
|
||
// Once the refresh job of mv1 has finished, fewer than 100 tasks may be recorded for it. | ||
def mv_job_name = getJobName(dst_database_name, "mv1") | ||
waitingMTMVTaskFinished(mv_job_name) | ||
def recorded_tasks = sql """select count(*) from tasks("type"="mv") where JobName="${mv_job_name}";""" | ||
assertTrue(recorded_tasks[0][0] < 100) | ||
|
||
// mv1 must contain exactly as many rows as its defining query yields on the base tables. | ||
def rows_in_mv = sql """select count(1) from mv1;""" | ||
def rows_from_query = sql """select count(1) from (select l_shipdate, l_orderkey from lineitem as t1 left join orders as t2 on t1.l_orderkey = t2.o_orderkey group by l_shipdate, l_orderkey) t;""" | ||
assertTrue(rows_in_mv[0][0] == rows_from_query[0][0]) | ||
|
||
} |
142 changes: 142 additions & 0 deletions
142
...on-test/suites/nereids_rules_p0/mv/create_part_and_up/range_date_datetrunc_part_up.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
suite("mtmv_range_date_datetrunc_date_part_up") { | ||
|
||
// Work in the database returned by getDbNameByFile(context.file) and pin the session to the | ||
// Nereids planner (no fallback, no planner timeout) with materialized-view rewrite enabled. | ||
String db = context.config.getDbNameByFile(context.file) | ||
sql "use ${db}" | ||
sql "SET enable_nereids_planner=true" | ||
sql "SET enable_fallback_to_original_planner=false" | ||
sql "SET enable_materialized_view_rewrite=true" | ||
sql "SET enable_nereids_timeout = false" | ||
// All object names of this suite share one prefix: <prefix>_tb is the base table, <prefix>_mv the view. | ||
String mv_prefix = "range_datetrunc_date_up" | ||
String tb_name = mv_prefix + "_tb" | ||
String mv_name = mv_prefix + "_mv" | ||
|
||
sql """ | ||
drop table if exists ${tb_name} | ||
""" | ||
|
||
// Base table: range-partitioned on the DATETIME column l_shipdate into five one-hour partitions | ||
// covering 2023-10-29 00:00:00 to 05:00:00. Every column except l_shipdate is nullable. | ||
sql """CREATE TABLE `${tb_name}` ( | ||
`l_orderkey` BIGINT NULL, | ||
`l_linenumber` INT NULL, | ||
`l_partkey` INT NULL, | ||
`l_suppkey` INT NULL, | ||
`l_quantity` DECIMAL(15, 2) NULL, | ||
`l_extendedprice` DECIMAL(15, 2) NULL, | ||
`l_discount` DECIMAL(15, 2) NULL, | ||
`l_tax` DECIMAL(15, 2) NULL, | ||
`l_returnflag` VARCHAR(1) NULL, | ||
`l_linestatus` VARCHAR(1) NULL, | ||
`l_commitdate` DATE NULL, | ||
`l_receiptdate` DATE NULL, | ||
`l_shipinstruct` VARCHAR(25) NULL, | ||
`l_shipmode` VARCHAR(10) NULL, | ||
`l_comment` VARCHAR(44) NULL, | ||
`l_shipdate` DATEtime not NULL | ||
) ENGINE=OLAP | ||
DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) | ||
COMMENT 'OLAP' | ||
partition by range (`l_shipdate`) ( | ||
partition p1 values [("2023-10-29 00:00:00"), ("2023-10-29 01:00:00")), | ||
partition p2 values [("2023-10-29 01:00:00"), ("2023-10-29 02:00:00")), | ||
partition p3 values [("2023-10-29 02:00:00"), ("2023-10-29 03:00:00")), | ||
partition p4 values [("2023-10-29 03:00:00"), ("2023-10-29 04:00:00")), | ||
partition p5 values [("2023-10-29 04:00:00"), ("2023-10-29 05:00:00")) | ||
) | ||
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 | ||
PROPERTIES ( | ||
"replication_allocation" = "tag.location.default: 1" | ||
);""" | ||
|
||
// Seed one row per partition (l_shipdate = 00:00 .. 04:00). Each row carries a NULL in a | ||
// different column: l_orderkey, l_linenumber, l_partkey, l_suppkey, l_commitdate. | ||
sql """ | ||
insert into ${tb_name} values | ||
(null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29 00:00:00'), | ||
(1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-29 01:00:00'), | ||
(3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-29 02:00:00'), | ||
(1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29 03:00:00'), | ||
(2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-29 04:00:00'); | ||
""" | ||
|
||
// Runs stmt once with materialized-view rewrite disabled and once with it enabled, logs both | ||
// result sets, and asserts that they agree in row count, column count and every cell value. | ||
// Rows are compared by position, so stmt should impose a deterministic order. | ||
// The session is left with enable_materialized_view_rewrite=true. | ||
def compare_res = { def stmt -> | ||
sql "SET enable_materialized_view_rewrite=false" | ||
def baseline_rows = sql stmt | ||
logger.info("origin_res: " + baseline_rows) | ||
sql "SET enable_materialized_view_rewrite=true" | ||
def rewritten_rows = sql stmt | ||
logger.info("mv_origin_res: " + rewritten_rows) | ||
def both_empty = rewritten_rows == [] && baseline_rows == [] | ||
assertTrue(both_empty || (rewritten_rows.size() == baseline_rows.size())) | ||
for (int r = 0; r < rewritten_rows.size(); r++) { | ||
def rewritten_row = rewritten_rows[r] | ||
def baseline_row = baseline_rows[r] | ||
assertTrue(rewritten_row.size() == baseline_row.size()) | ||
for (int c = 0; c < rewritten_row.size(); c++) { | ||
assertTrue(rewritten_row[c] == baseline_row[c]) | ||
} | ||
} | ||
} | ||
|
||
// (Re)creates the materialized view cur_mv_name defined by the query mv_sql. | ||
// Any existing materialized view or table with the same name is dropped first. | ||
// cur_mv_name: name of the materialized view to create. | ||
// mv_sql: defining SELECT statement, appended verbatim after AS. | ||
// col_name: column or alias from mv_sql used as the partition column. | ||
// date_trunc_range: date_trunc unit for the partition expression (the callers in this file pass "day"). | ||
def create_mv = { cur_mv_name, mv_sql, col_name, date_trunc_range -> | ||
sql """DROP MATERIALIZED VIEW IF EXISTS ${cur_mv_name};""" | ||
sql """DROP TABLE IF EXISTS ${cur_mv_name}""" | ||
// Built immediately; the refresh is triggered manually (REFRESH AUTO ON MANUAL). | ||
sql""" | ||
CREATE MATERIALIZED VIEW ${cur_mv_name} | ||
BUILD IMMEDIATE REFRESH AUTO ON MANUAL | ||
partition by(date_trunc(`${col_name}`, '${date_trunc_range}')) | ||
DISTRIBUTED BY RANDOM BUCKETS 2 | ||
PROPERTIES ('replication_num' = '1') | ||
AS | ||
${mv_sql} | ||
""" | ||
} | ||
|
||
// Projection fragments built from l_shipdate, date_trunc(l_shipdate, 'day') (aliased col1) and | ||
// DATE_FORMAT(l_shipdate): entries 0-2 have one expression, 3-5 two, 6-8 all three in varying order. | ||
def select_lists = [ | ||
"l_shipdate", | ||
"date_trunc(`l_shipdate`, 'day') as col1", | ||
"DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')", | ||
"date_trunc(`l_shipdate`, 'day') as col1, l_shipdate", | ||
"date_trunc(`l_shipdate`, 'day') as col1, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')", | ||
"l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')", | ||
"date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')", | ||
"date_trunc(`l_shipdate`, 'day') as col1, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d'), l_shipdate", | ||
"l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d'), date_trunc(`l_shipdate`, 'day') as col1" | ||
] | ||
|
||
// Builds the view on query partitioned by date_trunc(partition_col, 'day'), waits for its refresh, | ||
// then checks that query is rewritten to the view and returns the same rows with and without rewrite. | ||
def verify_partitioned_mv = { query, partition_col -> | ||
create_mv(mv_name, query, partition_col, "day") | ||
waitingMTMVTaskFinishedByMvName(mv_name) | ||
mv_rewrite_success(query, mv_name) | ||
compare_res(query + " order by 1,2,3") | ||
} | ||
|
||
for (int i = 0; i < select_lists.size(); i++) { | ||
for (int j = 0; j < select_lists.size(); j++) { | ||
// Only pair a projection with itself or with one of the three-expression lists (j > 5) as GROUP BY. | ||
if (i != j && j <= 5) { | ||
continue | ||
} | ||
def projection = select_lists[i] | ||
def grouping = select_lists[j].replaceAll("as col1", "") | ||
def query = "select " + projection + " from ${tb_name} group by " + grouping | ||
sql query | ||
|
||
// A bare l_shipdate (not the back-quoted one inside a function call) is projected: partition on it. | ||
if (projection.replaceAll("`l_shipdate`", "").contains("l_shipdate")) { | ||
verify_partitioned_mv(query, "l_shipdate") | ||
} | ||
|
||
// The date_trunc alias col1 is projected: partition on the alias as well. | ||
if (projection.contains("col1")) { | ||
verify_partitioned_mv(query, "col1") | ||
} | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.