Skip to content

Commit

Permalink
[test](mtmv)add create partition and partition scrolling (#42402)
Browse files Browse the repository at this point in the history
  • Loading branch information
zfr9527 authored Oct 29, 2024
1 parent d2e0f23 commit cac5e15
Show file tree
Hide file tree
Showing 7 changed files with 1,237 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import java.time.LocalDate

suite("create_commit_mtmv_many_tasks", "p2") {

def src_database_name = context.config.getDbNameByFile(context.file)
sql """drop database if exists ${src_database_name};"""
sql """create database ${src_database_name};"""
sql """use ${src_database_name};"""

def table_name1 = "lineitem"
def table_name2 = "orders"
sql """drop table if exists ${table_name1}"""
sql """drop table if exists ${table_name2}"""
sql """CREATE TABLE lineitem (
l_orderkey bigint NOT NULL,
l_linenumber int not null,
l_partkey int NOT NULL,
l_suppkey int not null,
l_quantity decimal(15, 2) NOT NULL,
l_extendedprice decimal(15, 2) NOT NULL,
l_discount decimal(15, 2) NOT NULL,
l_tax decimal(15, 2) NOT NULL,
l_returnflag VARCHAR(1) NOT NULL,
l_linestatus VARCHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct VARCHAR(25) NOT NULL,
l_shipmode VARCHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
l_null VARCHAR(1) NULL
)ENGINE=OLAP
UNIQUE KEY(`l_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "lineitem_orders",
"enable_unique_key_merge_on_write" = "true"
);"""
sql """CREATE TABLE orders (
o_orderkey bigint NOT NULL,
o_custkey int NOT NULL,
o_orderstatus VARCHAR(1) NOT NULL,
o_totalprice decimal(15, 2) NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority VARCHAR(15) NOT NULL,
o_clerk VARCHAR(15) NOT NULL,
o_shippriority int NOT NULL,
o_comment VARCHAR(79) NOT NULL,
o_null VARCHAR(1) NULL
)ENGINE=OLAP
UNIQUE KEY(`o_orderkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "1",
"colocate_with" = "lineitem_orders",
"enable_unique_key_merge_on_write" = "false"
);"""

def stream_load_job = { table_name, src_file_name ->
streamLoad {
table table_name
set 'column_separator', '|'
file """${getS3Url() + '/regression/tpch/sf1/'}${src_file_name}"""

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}

}
sql "select count(*) from ${table_name}"
}

for (int i = 1; i <= 10; i++) {
stream_load_job(table_name1, "lineitem.tbl.${i}")
stream_load_job(table_name2, "orders.tbl.${i}")
}

def dst_database_name = "wz_tpch_mtmv_hit_property"
sql """drop database if exists ${dst_database_name};"""
sql """create database ${dst_database_name};"""
sql """use ${dst_database_name};"""

sql """drop table if exists ${table_name1}"""
sql """drop table if exists ${table_name2}"""
sql """CREATE TABLE `lineitem` (
`l_orderkey` BIGINT NOT NULL,
`l_linenumber` INT NOT NULL,
`l_partkey` INT NOT NULL,
`l_suppkey` INT NOT NULL,
`l_quantity` DECIMAL(15, 2) NOT NULL,
`l_extendedprice` DECIMAL(15, 2) NOT NULL,
`l_discount` DECIMAL(15, 2) NOT NULL,
`l_tax` DECIMAL(15, 2) NOT NULL,
`l_returnflag` VARCHAR(1) NOT NULL,
`l_linestatus` VARCHAR(1) NOT NULL,
`l_commitdate` DATE NOT NULL,
`l_receiptdate` DATE NOT NULL,
`l_shipinstruct` VARCHAR(25) NOT NULL,
`l_shipmode` VARCHAR(10) NOT NULL,
`l_comment` VARCHAR(44) NOT NULL,
`l_shipdate` DATE NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
COMMENT 'OLAP'
AUTO PARTITION BY range(date_trunc(`l_shipdate`, 'day')) ()
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """CREATE TABLE `orders` (
`o_orderkey` BIGINT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` VARCHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderpriority` VARCHAR(15) NOT NULL,
`o_clerk` VARCHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` VARCHAR(79) NOT NULL,
`o_orderdate` DATE NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`o_orderkey`, `o_custkey`)
COMMENT 'OLAP'
AUTO PARTITION BY range(date_trunc(`o_orderdate`, 'day')) ()
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """drop MATERIALIZED VIEW if exists mv1;"""
sql """
CREATE MATERIALIZED VIEW mv1
BUILD IMMEDIATE REFRESH ON COMMIT
partition by(l_shipdate)
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1') AS
select l_shipdate, l_orderkey from lineitem as t1 left join orders as t2 on t1.l_orderkey = t2.o_orderkey group by l_shipdate, l_orderkey;
"""

def insert_into_select = { date_it ->
sql """INSERT INTO ${dst_database_name}.${table_name1}
SELECT l_orderkey, l_linenumber, l_partkey, l_suppkey, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment, '${date_it}' AS new_date_column
FROM ${src_database_name}.${table_name1};"""

sql """INSERT INTO ${dst_database_name}.${table_name2}
SELECT o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderpriority, o_clerk, o_shippriority, o_comment, '${date_it}' AS new_date_column
FROM ${src_database_name}.${table_name2}"""
}

def get_next_day = { def date_it ->
def date = LocalDate.parse(date_it)
def next_day = date.plusDays(1)
return next_day
}

def start_date = "2023-12-01"
while (true) {
if (start_date.toString() == "2024-03-11") {
break
}
logger.info("task load start")
insert_into_select(start_date)
start_date = get_next_day(start_date.toString())
}

def job_name = getJobName(dst_database_name, "mv1")
waitingMTMVTaskFinished(job_name)
def task_num = sql """select count(*) from tasks("type"="mv") where JobName="${job_name}";"""
assertTrue(task_num[0][0] < 100)

def mv_row_count = sql """select count(1) from mv1;"""
def real_row_count = sql """select count(1) from (select l_shipdate, l_orderkey from lineitem as t1 left join orders as t2 on t1.l_orderkey = t2.o_orderkey group by l_shipdate, l_orderkey) t;"""
assertTrue(mv_row_count[0][0] == real_row_count[0][0])

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("mtmv_range_date_datetrunc_date_part_up") {

String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"
sql "SET enable_nereids_timeout = false"
String mv_prefix = "range_datetrunc_date_up"
String tb_name = mv_prefix + "_tb"
String mv_name = mv_prefix + "_mv"

sql """
drop table if exists ${tb_name}
"""

sql """CREATE TABLE `${tb_name}` (
`l_orderkey` BIGINT NULL,
`l_linenumber` INT NULL,
`l_partkey` INT NULL,
`l_suppkey` INT NULL,
`l_quantity` DECIMAL(15, 2) NULL,
`l_extendedprice` DECIMAL(15, 2) NULL,
`l_discount` DECIMAL(15, 2) NULL,
`l_tax` DECIMAL(15, 2) NULL,
`l_returnflag` VARCHAR(1) NULL,
`l_linestatus` VARCHAR(1) NULL,
`l_commitdate` DATE NULL,
`l_receiptdate` DATE NULL,
`l_shipinstruct` VARCHAR(25) NULL,
`l_shipmode` VARCHAR(10) NULL,
`l_comment` VARCHAR(44) NULL,
`l_shipdate` DATEtime not NULL
) ENGINE=OLAP
DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey )
COMMENT 'OLAP'
partition by range (`l_shipdate`) (
partition p1 values [("2023-10-29 00:00:00"), ("2023-10-29 01:00:00")),
partition p2 values [("2023-10-29 01:00:00"), ("2023-10-29 02:00:00")),
partition p3 values [("2023-10-29 02:00:00"), ("2023-10-29 03:00:00")),
partition p4 values [("2023-10-29 03:00:00"), ("2023-10-29 04:00:00")),
partition p5 values [("2023-10-29 04:00:00"), ("2023-10-29 05:00:00"))
)
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""

sql """
insert into ${tb_name} values
(null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29 00:00:00'),
(1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-29 01:00:00'),
(3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-29 02:00:00'),
(1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-29 03:00:00'),
(2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-29 04:00:00');
"""

def compare_res = { def stmt ->
sql "SET enable_materialized_view_rewrite=false"
def origin_res = sql stmt
logger.info("origin_res: " + origin_res)
sql "SET enable_materialized_view_rewrite=true"
def mv_origin_res = sql stmt
logger.info("mv_origin_res: " + mv_origin_res)
assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size()))
for (int row = 0; row < mv_origin_res.size(); row++) {
assertTrue(mv_origin_res[row].size() == origin_res[row].size())
for (int col = 0; col < mv_origin_res[row].size(); col++) {
assertTrue(mv_origin_res[row][col] == origin_res[row][col])
}
}
}

def create_mv = { cur_mv_name, mv_sql, col_name, date_trunc_range ->
sql """DROP MATERIALIZED VIEW IF EXISTS ${cur_mv_name};"""
sql """DROP TABLE IF EXISTS ${cur_mv_name}"""
sql"""
CREATE MATERIALIZED VIEW ${cur_mv_name}
BUILD IMMEDIATE REFRESH AUTO ON MANUAL
partition by(date_trunc(`${col_name}`, '${date_trunc_range}'))
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
${mv_sql}
"""
}

def select_list1_1 = "l_shipdate"
def select_list1_2 = "date_trunc(`l_shipdate`, 'day') as col1"
def select_list1_3 = "DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')"
def select_list2_1 = "date_trunc(`l_shipdate`, 'day') as col1, l_shipdate"
def select_list2_2 = "date_trunc(`l_shipdate`, 'day') as col1, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')"
def select_list2_3 = "l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')"
def select_list3_1 = "date_trunc(`l_shipdate`, 'day') as col1, l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d')"
def select_list3_2 = "date_trunc(`l_shipdate`, 'day') as col1, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d'), l_shipdate"
def select_list3_3 = "l_shipdate, DATE_FORMAT(`l_shipdate`, '%Y-%m-%d'), date_trunc(`l_shipdate`, 'day') as col1"

def select_lists = [select_list1_1, select_list1_2, select_list1_3, select_list2_1, select_list2_2,
select_list2_3, select_list3_1, select_list3_2, select_list3_3]
for (int i = 0; i < select_lists.size(); i++) {
for (int j = 0; j < select_lists.size(); j++) {
if (i == j || j > 5) {
def group_by_str = select_lists[j].replaceAll("as col1", "")
def str = "select " + select_lists[i] + " from ${tb_name} group by " + group_by_str
sql str

if (select_lists[i].replaceAll("`l_shipdate`", "").indexOf("l_shipdate") != -1) {
create_mv(mv_name, str, "l_shipdate", "day")
waitingMTMVTaskFinishedByMvName(mv_name)
mv_rewrite_success(str, mv_name)
compare_res(str + " order by 1,2,3")
}

if (select_lists[i].indexOf("col1") != -1) {
create_mv(mv_name, str, "col1", "day")
waitingMTMVTaskFinishedByMvName(mv_name)
mv_rewrite_success(str, mv_name)
compare_res(str + " order by 1,2,3")
}

}
}
}

}
Loading

0 comments on commit cac5e15

Please sign in to comment.