Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](Variant) fix some nested explode_variant_array bug and add more… #44533

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions be/src/vec/exprs/table_function/vexplode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@

#include "common/status.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nothing.h"
#include "vec/columns/column_object.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nothing.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
Expand All @@ -37,6 +42,34 @@ VExplodeTableFunction::VExplodeTableFunction() {
_fn_name = "vexplode";
}

Status VExplodeTableFunction::_process_init_variant(Block* block, int value_column_idx) {
// explode variant array
const auto& variant_column = check_and_get_column<ColumnObject>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here's bug which will lead to coredump

remove_nullable(block->get_by_position(value_column_idx)
.column->convert_to_full_column_if_const())
.get());
_detail.output_as_variant = true;
if (!variant_column->is_null_root()) {
_array_column = variant_column->get_root();
// We need to wrap the output nested column within a variant column.
// Otherwise the type is missmatched
const auto* array_type = check_and_get_data_type<DataTypeArray>(
remove_nullable(variant_column->get_root_type()).get());
if (array_type == nullptr) {
return Status::NotSupported("explode not support none array type {}",
variant_column->get_root_type()->get_name());
}
_detail.nested_type = array_type->get_nested_type();
} else {
// null root, use nothing type
_array_column = ColumnNullable::create(ColumnArray::create(ColumnNothing::create(0)),
ColumnUInt8::create(0));
_array_column->assume_mutable()->insert_many_defaults(variant_column->size());
_detail.nested_type = std::make_shared<DataTypeNothing>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe here should wrap in nullable , because many calculate operator behavior just make nested column in array as nullable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's redundant in column object case

}
return Status::OK();
}

Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
CHECK(_expr_context->root()->children().size() == 1)
<< "VExplodeTableFunction only support 1 child but has "
Expand All @@ -47,12 +80,7 @@ Status VExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
&value_column_idx));
if (WhichDataType(remove_nullable(block->get_by_position(value_column_idx).type))
.is_variant_type()) {
// explode variant array
const auto& variant_column = check_and_get_column<ColumnObject>(
remove_nullable(block->get_by_position(value_column_idx)
.column->convert_to_full_column_if_const())
.get());
_array_column = variant_column->get_root();
RETURN_IF_ERROR(_process_init_variant(block, value_column_idx));
} else {
_array_column =
block->get_by_position(value_column_idx).column->convert_to_full_column_if_const();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/table_function/vexplode.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class VExplodeTableFunction : public TableFunction {
int get_value(MutableColumnPtr& column, int max_step) override;

private:
Status _process_init_variant(Block* block, int value_column_idx);
ColumnPtr _array_column;
ColumnArrayExecutionData _detail;
size_t _array_offset; // start offset of array[row_idx]
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/functions/array/function_array_contains_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ class FunctionArrayContainsAll : public IFunction {
is_equal_value = false;
} else {
// all is not null, check the data is equal
const auto* left_column = assert_cast<const T*>(left_data.nested_col);
const auto* right_column = assert_cast<const T*>(right_data.nested_col);
const auto* left_column = assert_cast<const T*>(left_data.nested_col.get());
const auto* right_column =
assert_cast<const T*>(right_data.nested_col.get());
auto res = left_column->compare_at(left_nested_loop_pos, right_pos,
*right_column, -1);
is_equal_value = (res == 0);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/array/function_array_distance.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class FunctionArrayDistance : public IFunction {

const auto& offsets1 = *arr1.offsets_ptr;
const auto& offsets2 = *arr2.offsets_ptr;
const auto& nested_col1 = assert_cast<const ColumnFloat64*>(arr1.nested_col);
const auto& nested_col2 = assert_cast<const ColumnFloat64*>(arr2.nested_col);
const auto& nested_col1 = assert_cast<const ColumnFloat64*>(arr1.nested_col.get());
const auto& nested_col2 = assert_cast<const ColumnFloat64*>(arr2.nested_col.get());
for (ssize_t row = 0; row < offsets1.size(); ++row) {
if (arr1.array_nullmap_data && arr1.array_nullmap_data[row]) {
dst_null_data[row] = true;
Expand Down
13 changes: 11 additions & 2 deletions be/src/vec/functions/array/function_array_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type.h"

namespace doris::vectorized {

Expand All @@ -45,12 +47,19 @@ bool extract_column_array_info(const IColumn& src, ColumnArrayExecutionData& dat

// extract array offsets and nested column
data.offsets_ptr = &data.array_col->get_offsets();
data.nested_col = &data.array_col->get_data();
data.nested_col = data.array_col->get_data_ptr();
// extract nested column is nullable
if (data.nested_col->is_nullable()) {
const auto& nested_null_col = reinterpret_cast<const ColumnNullable&>(*data.nested_col);
data.nested_nullmap_data = nested_null_col.get_null_map_data().data();
data.nested_col = nested_null_col.get_nested_column_ptr().get();
data.nested_col = nested_null_col.get_nested_column_ptr();
}
if (data.output_as_variant &&
!WhichDataType(remove_nullable(data.nested_type)).is_variant_type()) {
// set variant root column/type to from column/type
auto variant = ColumnObject::create(true /*always nullable*/);
variant->create_root(data.nested_type, make_nullable(data.nested_col)->assume_mutable());
data.nested_col = variant->get_ptr();
}
return true;
}
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/functions/array/function_array_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.
#pragma once

#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
Expand Down Expand Up @@ -54,7 +55,10 @@ struct ColumnArrayExecutionData {
const ColumnArray* array_col = nullptr;
const ColumnArray::Offsets64* offsets_ptr = nullptr;
const UInt8* nested_nullmap_data = nullptr;
const IColumn* nested_col = nullptr;
ColumnPtr nested_col = nullptr;
DataTypePtr nested_type = nullptr;
// wrap the nested column as variant column
bool output_as_variant = false;

ColumnArrayMutableData to_mutable_data() const {
ColumnArrayMutableData dst;
Expand Down
20 changes: 20 additions & 0 deletions regression-test/data/variant_p0/nested.out
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,23 @@ v.xx tinyint Yes false \N NONE
1 {"callLimit":3,"number":"02124713252","type":"HOME"}
1 {"callLimit":5,"number":"5550219210","type":"GSM"}

-- !sql --
2 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
4 {"nested":[{"baaa":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
5 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
6 {"nested":[{"mmm":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
7 {"nested":[{"ba":"11111"},{"a":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
8 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
9 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
11 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
12 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}
13 {"nested":[{"yyy":"11111"},{"ax1111":"1111"},{"axxxb":100,"xxxy111":111},{"aaa":"11","ddsss":1024},{"xx":10}]}

-- !explode_sql --

-- !explode_sql --
19 10

-- !explode_sql --
2 10

53 changes: 52 additions & 1 deletion regression-test/suites/variant_p0/nested.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,64 @@ parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
sql """insert into var_nested2 select * from var_nested order by k limit 1024"""
qt_sql """select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_result_sink=false,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=false,partitioned_hash_join_rows_threshold=1048576,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadcast_join=false,enable_two_phase_read_opt=true,enable_common_expr_pushdown_for_inverted_index=true,enable_delete_sub_predicate_v2=true,min_revocable_mem=33554432,fetch_remote_schema_timeout_seconds=120,max_fetch_remote_schema_tablet_count=512,enable_join_spill=false,enable_sort_spill=false,enable_agg_spill=false,enable_force_spill=false,data_queue_max_blocks=1,spill_streaming_agg_mem_limit=268435456,external_agg_partition_bits=5) */ * from var_nested2 order by k limit 10;"""
qt_sql """select v['nested'] from var_nested2 where k < 10 order by k limit 10;"""
// explode variant array
// 0. nomal explode variant array
order_qt_explode_sql """select count(),cast(vv['xx'] as int) from var_nested lateral view explode_variant_array(v['nested']) tmp as vv where vv['xx'] = 10 group by cast(vv['xx'] as int)"""
sql """truncate table var_nested2"""
sql """insert into var_nested2 values(1119111, '{"eventId":1,"firstName":"Name1","lastName":"Surname1","body":{"phoneNumbers":[{"number":"5550219210","type":"GSM","callLimit":5},{"number":"02124713252","type":"HOME","callLimit":3},{"number":"05550219211","callLimit":2,"type":"WORK"}]}}
')"""
order_qt_explode_sql """select v['eventId'], phone_numbers from var_nested2 lateral view explode_variant_array(v['body']['phoneNumbers']) tmp1 as phone_numbers
where phone_numbers['type'] = 'GSM' OR phone_numbers['type'] = 'HOME' and phone_numbers['callLimit'] > 2;"""

// test array_function
sql "DROP TABLE IF EXISTS var_nested_array_agg"
sql """
CREATE TABLE IF NOT EXISTS var_nested_array_agg(
k bigint,
v variant
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true");
"""
sql "insert into var_nested_array_agg select * from var_nested"
// 1. array_contains
qt_sql "select * from var_nested_array_agg where array_contains(cast(v['nested']['xx'] as array<int>), 10) order by k limit 10"
// 2. array_agg scalar
sql "select k, array_agg(cast(v['nested'] as text)) from var_nested_array_agg group by k limit 10"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qt_sql ? just sql will not show the output in out file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the output is not stable since the serailized v['nested'] may contains blank and is not stable, which is acceptable at present


// test explode_variant_array with abonomal case
sql "DROP TABLE IF EXISTS var_nested_explode_variant_with_abnomal"
sql """
CREATE TABLE IF NOT EXISTS var_nested_explode_variant_with_abnomal(
k bigint,
v variant
)
UNIQUE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true");
"""
sql "insert into var_nested_explode_variant_with_abnomal select * from var_nested"
// 1. v['nested']['x'] is null root
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['nested']['x']) tmp as vv where vv = 10 group by cast(vv as int)"""
// 2. v['nested']['xx'] is normal array
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['nested']['xx']) tmp as vv where vv = 10 group by cast(vv as int)"""
// 3. v['xx'] is none array scalar type
test {
sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['xx']) tmp as vv where vv = 10 group by cast(vv as int)"""
exception("explode not support none array type")
}
// 4. v['k1'] is json scalar type
test {
sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['k1']) tmp as vv where vv = 10 group by cast(vv as int)"""
exception("explode not support none array type")
}
// 5. toplevel nested array
sql "truncate table var_nested_explode_variant_with_abnomal"
sql """insert into var_nested_explode_variant_with_abnomal values(1, '[{"a" : 10}, {"b" : "20", "c" :1024, "a" : 11}]')"""
sql """insert into var_nested_explode_variant_with_abnomal values(2, '[{"a" : 10}, {"b" : "20", "a" : 150}]')"""
order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v['a']) tmp as vv where vv = 10 group by cast(vv as int)"""
// FIXME after refator
// order_qt_explode_sql """select count(),cast(vv as int) from var_nested_explode_variant_with_abnomal lateral view explode_variant_array(v) tmp as vv where vv['a'] = 10 group by cast(vv as int)"""
} finally {
// reset flags
}
Expand Down
Loading