Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug](function) fix first/last value return error with ignore null #44996

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
int64_t range_start, range_end;
if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
_parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
TAnalyticWindowBoundaryType::
CURRENT_ROW) { //[preceding, current_row],[current_row, following]
TAnalyticWindowBoundaryType::CURRENT_ROW) {
// [preceding, current_row] and [current_row, following] are the same after the rewrite:
// the previous calculation result can be reused, so the _reset_agg_status function is not called.
// Keep calculating and accumulating data on top of the existing state; there is no need to reset it.
range_start = _shared_state->current_row_position;
range_end = _shared_state->current_row_position +
1; //going on calculate,add up data, no need to reset state
range_end = _shared_state->current_row_position + 1;
} else {
_reset_agg_status();
range_end = _shared_state->current_row_position + _rows_end_offset + 1;
if (!_parent->cast<AnalyticSourceOperatorX>()
._window.__isset
.window_start) { //[preceding, offset] --unbound: [preceding, following]
//[preceding, offset] --unbound: [preceding, following]
if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
range_start = _partition_by_start.pos;
} else {
range_start = _shared_state->current_row_position + _rows_start_offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ struct ReaderFirstAndLastData {

// Returns the current _has_value flag (default-initialised to false).
bool has_set_value() { return _has_value; }

// Reports whether the stored _data_value is null by forwarding to its is_null().
bool is_null() { return _data_value.is_null(); }

protected:
StoreType _data_value;
bool _has_value = false;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ AggregateFunctionPtr create_function_lead_lag_first_last(const String& name,
WhichDataType which(*type);

bool arg_ignore_null_value = false;
// The FE has already rewritten first_value(k1, false) to first_value(k1),
// so when there are 2 arguments, arg_ignore_null_value must be true.
if (argument_types.size() == 2) {
DCHECK(name == "first_value" || name == "last_value") << "invalid function name: " << name;
arg_ignore_null_value = true;
Expand Down
49 changes: 28 additions & 21 deletions be/src/vec/aggregate_functions/aggregate_function_window.h
Original file line number Diff line number Diff line change
Expand Up @@ -455,31 +455,28 @@ struct WindowFunctionLagImpl : Data {
static const char* name() { return "lag"; }
};

// TODO: first_value && last_value will core dump in some corner cases.
// A simple fix would be to make them always nullable and insert a null value; the registration in the cpp file may also need to change.
// But there may be a better way to handle it.
template <typename Data, bool arg_ignore_null = false>
struct WindowFunctionFirstImpl : Data {
void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end, const IColumn** columns) {
if (this->has_set_value()) {
// case 1: (has_set_value() = true && arg_ignore_null = false)
// case 2: (has_set_value() = true && arg_ignore_null = true && is_null() = false)
if ((this->has_set_value()) &&
(!arg_ignore_null || (arg_ignore_null && !this->is_null()))) {
return;
}
if (frame_start <= frame_end &&
frame_end <= partition_start) { //rewrite last_value when under partition
this->set_is_null(); //so no need more judge
DCHECK_LE(frame_start, frame_end);
if (frame_start >= partition_end || frame_end <= partition_start) {
this->set_is_null();
return;
}
frame_start = std::max<int64_t>(frame_start, partition_start);

if constexpr (arg_ignore_null) {
frame_end = std::min<int64_t>(frame_end, partition_end);

auto& second_arg = assert_cast<const ColumnVector<UInt8>&>(*columns[1]);
auto ignore_null_value = second_arg.get_data()[0];

if (ignore_null_value && columns[0]->is_nullable()) {
auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
if (columns[0]->is_nullable()) {
const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
// the valid range is: [frame_start, frame_end)
while (frame_start < frame_end - 1 && arg_nullable.is_null_at(frame_start)) {
frame_start++;
}
Expand All @@ -505,15 +502,25 @@ struct WindowFunctionLastImpl : Data {

if constexpr (arg_ignore_null) {
frame_start = std::max<int64_t>(frame_start, partition_start);

auto& second_arg = assert_cast<const ColumnVector<UInt8>&>(*columns[1]);
auto ignore_null_value = second_arg.get_data()[0];

if (ignore_null_value && columns[0]->is_nullable()) {
auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
while (frame_start < (frame_end - 1) && arg_nullable.is_null_at(frame_end - 1)) {
frame_end--;
if (columns[0]->is_nullable()) {
const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
// Look for a non-null value in [frame_start, frame_end), scanning backwards from frame_end - 1.
// If one is found: call set_value and return directly.
// If none is found: the while loop runs to completion, and then
// case 1: if has_set_value, the previous window had a value that can be reused, so return directly
// case 2: if not has_set_value, there is no value at all, so set it to NULL
while (frame_start < frame_end) {
if (arg_nullable.is_null_at(frame_end - 1)) {
frame_end--;
} else {
this->set_value(columns, frame_end - 1);
return;
}
}
if (!this->has_set_value()) {
this->set_is_null();
}
return;
}
}

Expand Down
100 changes: 100 additions & 0 deletions regression-test/data/correctness_p0/test_first_value_window.out
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,103 @@
11 23 04-23-13 \N 10 10 10
12 24 02-24-10-21 \N \N \N \N

-- !select_default4 --
a 1 1 1 0
a \N 1 \N 1
a \N 1 \N 2
a \N 1 \N 3
b \N \N \N 4
b 3 3 3 5
b \N 3 \N 6
b 2 2 2 7

-- !select_default5 --
a \N \N \N 0
a 1 1 \N 1
a \N 1 \N 2
a \N 1 \N 3
b \N \N \N 4
b 3 3 \N 5
b \N 3 \N 6
b 2 3 \N 7

-- !select_default_desc --
a 2 3
a \N 2
a \N 1
a 1 0
b 2 7
b \N 6
b 3 5
b \N 4

-- !select_default_asc --
a 1 0
a \N 1
a \N 2
a 2 3
b \N 4
b 3 5
b \N 6
b 2 7

-- !select_default_last_rewrite_first --
a 1 1 0
a \N 1 1
a \N 1 2
a 2 1 3
b \N \N 4
b 3 3 5
b \N 3 6
b 2 3 7

-- !select_default6 --
a \N 2 \N 0
a 1 2 1 1
a 2 2 2 2
a \N 2 2 3
b \N 2 \N 4
b 3 2 3 5
b \N 2 3 6
b 2 2 2 7

-- !select_default_last_rewrite_first2 --
a 1 1 0
a \N 1 1
a \N 1 2
a 2 2 3
b \N \N 4
b 3 3 5
b \N 3 6
b 2 2 7

-- !select_default7 --
a 1 1 1 1 1 0
a \N 1 1 1 1 1
a \N 1 1 1 1 2
a 2 2 2 2 1 3
b \N \N \N \N \N 4
b 3 3 3 3 3 5
b \N 3 3 3 3 6
b 2 2 2 2 3 7

-- !select_default8 --
a 1 2 0
a \N \N 1
a \N \N 2
a 2 \N 3
b \N 2 4
b 3 \N 5
b \N \N 6
b 2 \N 7

-- !select_default9 --
a 1 2 0
a \N \N 1
a \N \N 2
a 2 \N 3
b \N 2 4
b 3 \N 5
b \N \N 6
b 2 \N 7

Loading
Loading