Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Dec 8, 2024
1 parent a61a4d4 commit 0d6cc74
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 10 deletions.
18 changes: 10 additions & 8 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,25 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
_remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_push_down_count = -1;
_remaining_table_level_row_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// already get rows from be
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
auto rows =
std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size);
_remaining_push_down_count -= rows;
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_push_down_count == 0) {
if (_remaining_table_level_row_count == 0) {
*eof = true;
}

Expand Down Expand Up @@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
return Status::OK();
}

Expand All @@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOC
if (position_delete_files.size() > 0) {
RETURN_IF_ERROR(
_position_delete_base(table_desc.original_file_path, position_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ class IcebergTableReader : public TableFormatReader {
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

int64_t _remaining_push_down_count;
// the table level row count for optimizing query like:
// select count(*) from table;
int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;

const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
return Status::OK();
}

// set push down agg type to NONE because we can not do count push down opt
// if there are delete files.
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,8 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
// ATTN: the push down agg type may be set back to NONE,
// see IcebergTableReader::init_row_filters for example.
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,26 @@ bbb

-- !c109 --

-- !c110 --
3

-- !c111 --
3

-- !c112 --
2

-- !c113 --
2

-- !c114 --
3 3_1
4 4_1

-- !c115 --
3 3_1
4 4_1

-- !all --
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
Expand Down Expand Up @@ -1157,6 +1177,26 @@ bbb

-- !c109 --

-- !c110 --
3

-- !c111 --
3

-- !c112 --
2

-- !c113 --
2

-- !c114 --
3 3_1
4 4_1

-- !c115 --
3 3_1
4 4_1

-- !all --
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
Expand Down Expand Up @@ -1736,6 +1776,26 @@ bbb

-- !c109 --

-- !c110 --
3

-- !c111 --
3

-- !c112 --
2

-- !c113 --
2

-- !c114 --
3 3_1
4 4_1

-- !c115 --
3 3_1
4 4_1

-- !all --
1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530
10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821
Expand Down Expand Up @@ -2315,3 +2375,23 @@ bbb

-- !c109 --

-- !c110 --
3

-- !c111 --
3

-- !c112 --
2

-- !c113 --
2

-- !c114 --
3 3_1
4 4_1

-- !c115 --
3 3_1
4 4_1

Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ suite("test_iceberg_optimize_count", "p0,external,doris,external_docker,external

} finally {
sql """ set enable_count_push_down_for_external_table=true; """
sql """drop catalog if exists ${catalog_name}"""
// sql """drop catalog if exists ${catalog_name}"""
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
def c108= """ select id from tb_with_upper_case where id = 1 """
def c109= """ select id from tb_with_upper_case where id < 1 """

def c110 = """select count(*) from deletion_vector_orc;"""
def c111 = """select count(*) from deletion_vector_parquet;"""
def c112 = """select count(*) from deletion_vector_orc where id > 2;"""
def c113 = """select count(*) from deletion_vector_parquet where id > 2;"""
def c114 = """select * from deletion_vector_orc where id > 2;"""
def c115 = """select * from deletion_vector_parquet where id > 2;"""

String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
String catalog_name = "ctl_test_paimon_catalog"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
Expand Down Expand Up @@ -289,6 +296,13 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
qt_c107 c107
qt_c108 c108
qt_c109 c109

qt_c110 c110
qt_c111 c111
qt_c112 c112
qt_c113 c113
qt_c114 c114
qt_c115 c115
}

test_cases("false", "false")
Expand Down

0 comments on commit 0d6cc74

Please sign in to comment.