diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index dbfa16158df118..d704a721c28c34 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -354,17 +354,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) { int64_t range_start, range_end; if (!_parent->cast()._window.__isset.window_start && _parent->cast()._window.window_end.type == - TAnalyticWindowBoundaryType:: - CURRENT_ROW) { //[preceding, current_row],[current_row, following] + TAnalyticWindowBoundaryType::CURRENT_ROW) { + // [preceding, current_row], [current_row, following] rewrite it's same + // as could reuse the previous calculate result, so don't call _reset_agg_status function + // going on calculate, add up data, no need to reset state range_start = _shared_state->current_row_position; - range_end = _shared_state->current_row_position + - 1; //going on calculate,add up data, no need to reset state + range_end = _shared_state->current_row_position + 1; } else { _reset_agg_status(); range_end = _shared_state->current_row_position + _rows_end_offset + 1; - if (!_parent->cast() - ._window.__isset - .window_start) { //[preceding, offset] --unbound: [preceding, following] + //[preceding, offset] --unbound: [preceding, following] + if (!_parent->cast()._window.__isset.window_start) { range_start = _partition_by_start.pos; } else { range_start = _shared_state->current_row_position + _rows_start_offset; diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h index d11350a5b3a323..859a2b50e0955a 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader_first_last.h @@ -139,6 +139,8 @@ struct ReaderFirstAndLastData { bool has_set_value() { return _has_value; } + bool is_null() { return _data_value.is_null(); } + protected: StoreType _data_value; bool _has_value = false; diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp b/be/src/vec/aggregate_functions/aggregate_function_window.cpp index 9da838a6b9067c..2d10083488b40b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp @@ -40,6 +40,8 @@ AggregateFunctionPtr create_function_lead_lag_first_last(const String& name, WhichDataType which(*type); bool arg_ignore_null_value = false; + // FE have rewrite case first_value(k1,false)--->first_value(k1) + // so size is 2, must will be arg_ignore_null_value if (argument_types.size() == 2) { DCHECK(name == "first_value" || name == "last_value") << "invalid function name: " << name; arg_ignore_null_value = true; diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index d293383feb6f45..7e22d6c0e1f146 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -457,31 +457,28 @@ struct WindowFunctionLagImpl : Data { static const char* name() { return "lag"; } }; -// TODO: first_value && last_value in some corner case will be core, -// if need to simply change it, should set them to always nullable insert into null value, and register in cpp maybe be change -// But it's may be another better way to handle it template struct WindowFunctionFirstImpl : Data { void add_range_single_place(int64_t partition_start, int64_t partition_end, int64_t frame_start, int64_t frame_end, const IColumn** columns) { - if (this->has_set_value()) { + // case 1: (has_set_value() = true && arg_ignore_null = false) + // case 2: (has_set_value() = true && arg_ignore_null = true && is_null() = false) + if ((this->has_set_value()) && + (!arg_ignore_null || (arg_ignore_null && !this->is_null()))) { return; } - if (frame_start <= frame_end && - frame_end <= partition_start) { //rewrite last_value when under partition - this->set_is_null(); //so no need more judge + DCHECK_LE(frame_start, frame_end); + if (frame_start >= partition_end || frame_end <= partition_start) { + this->set_is_null(); return; } frame_start = std::max(frame_start, partition_start); if constexpr (arg_ignore_null) { frame_end = std::min(frame_end, partition_end); - - auto& second_arg = assert_cast&>(*columns[1]); - auto ignore_null_value = second_arg.get_data()[0]; - - if (ignore_null_value && columns[0]->is_nullable()) { - auto& arg_nullable = assert_cast(*columns[0]); + if (columns[0]->is_nullable()) { + const auto& arg_nullable = assert_cast(*columns[0]); + // the valid range is: [frame_start, frame_end) while (frame_start < frame_end - 1 && arg_nullable.is_null_at(frame_start)) { frame_start++; } @@ -507,15 +504,25 @@ struct WindowFunctionLastImpl : Data { if constexpr (arg_ignore_null) { frame_start = std::max(frame_start, partition_start); - - auto& second_arg = assert_cast&>(*columns[1]); - auto ignore_null_value = second_arg.get_data()[0]; - - if (ignore_null_value && columns[0]->is_nullable()) { - auto& arg_nullable = assert_cast(*columns[0]); - while (frame_start < (frame_end - 1) && arg_nullable.is_null_at(frame_end - 1)) { - frame_end--; + if (columns[0]->is_nullable()) { + const auto& arg_nullable = assert_cast(*columns[0]); + // wants find a not null value in [frame_start, frame_end) + // iff has find: set_value and return directly + // iff not find: the while loop is finished + // case 1: iff has_set_value, means the previous window have value, could reuse it, so return directly + // case 2: iff not has_set_value, means there is none value, set it's to NULL + while (frame_start < frame_end) { + if (arg_nullable.is_null_at(frame_end - 1)) { + frame_end--; + } else { + this->set_value(columns, frame_end - 1); + return; + } } + if (!this->has_set_value()) { + this->set_is_null(); + } + return; } } diff --git a/regression-test/data/correctness_p0/test_first_value_window.out b/regression-test/data/correctness_p0/test_first_value_window.out index 9951ad95c60bf4..73dbcf3ed34eda 100644 --- a/regression-test/data/correctness_p0/test_first_value_window.out +++ b/regression-test/data/correctness_p0/test_first_value_window.out @@ -41,3 +41,103 @@ 11 23 04-23-13 \N 10 10 10 12 24 02-24-10-21 \N \N \N \N +-- !select_default4 -- +a 1 1 1 0 +a \N 1 \N 1 +a \N 1 \N 2 +a \N 1 \N 3 +b \N \N \N 4 +b 3 3 3 5 +b \N 3 \N 6 +b 2 2 2 7 + +-- !select_default5 -- +a \N \N \N 0 +a 1 1 \N 1 +a \N 1 \N 2 +a \N 1 \N 3 +b \N \N \N 4 +b 3 3 \N 5 +b \N 3 \N 6 +b 2 3 \N 7 + +-- !select_default_desc -- +a 2 3 +a \N 2 +a \N 1 +a 1 0 +b 2 7 +b \N 6 +b 3 5 +b \N 4 + +-- !select_default_asc -- +a 1 0 +a \N 1 +a \N 2 +a 2 3 +b \N 4 +b 3 5 +b \N 6 +b 2 7 + +-- !select_default_last_rewrite_first -- +a 1 1 0 +a \N 1 1 +a \N 1 2 +a 2 1 3 +b \N \N 4 +b 3 3 5 +b \N 3 6 +b 2 3 7 + +-- !select_default6 -- +a \N 2 \N 0 +a 1 2 1 1 +a 2 2 2 2 +a \N 2 2 3 +b \N 2 \N 4 +b 3 2 3 5 +b \N 2 3 6 +b 2 2 2 7 + +-- !select_default_last_rewrite_first2 -- +a 1 1 0 +a \N 1 1 +a \N 1 2 +a 2 2 3 +b \N \N 4 +b 3 3 5 +b \N 3 6 +b 2 2 7 + +-- !select_default7 -- +a 1 1 1 1 1 0 +a \N 1 1 1 1 1 +a \N 1 1 1 1 2 +a 2 2 2 2 1 3 +b \N \N \N \N \N 4 +b 3 3 3 3 3 5 +b \N 3 3 3 3 6 +b 2 2 2 2 3 7 + +-- !select_default8 -- +a 1 2 0 +a \N \N 1 +a \N \N 2 +a 2 \N 3 +b \N 2 4 +b 3 \N 5 +b \N \N 6 +b 2 \N 7 + +-- !select_default9 -- +a 1 2 0 +a \N \N 1 +a \N \N 2 +a 2 \N 3 +b \N 2 4 +b 3 \N 5 +b \N \N 6 +b 2 \N 7 + diff --git a/regression-test/suites/correctness_p0/test_first_value_window.groovy b/regression-test/suites/correctness_p0/test_first_value_window.groovy index 8d0a3097056d18..7c1582e0e61b60 100644 --- a/regression-test/suites/correctness_p0/test_first_value_window.groovy +++ b/regression-test/suites/correctness_p0/test_first_value_window.groovy @@ -159,4 +159,193 @@ suite("test_first_value_window") { ,first_value(`state`, 1) over(partition by `myday` order by `time_col` rows between 1 preceding and 1 following) v3 from ${tableName3} order by `id`, `myday`, `time_col`; """ + + qt_select_default4 """ + SELECT uid + ,amt + ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1 + ,LAST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s + ; + """ + + qt_select_default5 """ + SELECT uid + ,amt + ,FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt1 + ,FIRST_VALUE(amt, false) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2 + ,time_s + FROM ( + SELECT 'a' AS uid, NULL AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s + ; + """ + qt_select_default_desc """ + SELECT uid + ,amt + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + order by uid,time_s desc; + """ + + qt_select_default_asc """ + SELECT uid + ,amt + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + order by uid,time_s ASC; + """ + + // FIRST_VALUE: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + qt_select_default_last_rewrite_first """ + SELECT uid + ,amt + ,(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s; + """ + + qt_select_default6 """ + SELECT uid + ,amt + ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED following) amt1 + ,LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) amt2 + ,time_s + FROM ( + SELECT 'a' AS uid, null AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, 1 AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s + ; + """ + + //last value: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + qt_select_default_last_rewrite_first2 """ + SELECT uid + ,amt + ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s; + """ + + qt_select_default7 """ + SELECT uid + ,amt + ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)) amt1 + ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ASC ROWS BETWEEN 100 PRECEDING AND CURRENT ROW)) amt_not + ,COALESCE(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt2 + ,COALESCE(LAST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) amt3 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s + ; + """ + + qt_select_default8 """ + SELECT uid + ,amt + ,(FIRST_VALUE(amt, true) OVER(PARTITION BY uid ORDER BY time_s ROWS between 3 following AND 6 FOLLOWING)) amt3 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s; + """ + + qt_select_default9 """ + SELECT uid + ,amt + ,(FIRST_VALUE(amt) OVER(PARTITION BY uid ORDER BY time_s ROWS between 3 following AND 6 FOLLOWING)) amt3 + ,time_s + FROM ( + SELECT 'a' AS uid, 1 AS amt, 0 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 1 AS time_s UNION ALL + SELECT 'a' AS uid, null AS amt, 2 AS time_s UNION ALL + SELECT 'a' AS uid, 2 AS amt, 3 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 4 AS time_s UNION ALL + SELECT 'b' AS uid, 3 AS amt, 5 AS time_s UNION ALL + SELECT 'b' AS uid, null AS amt, 6 AS time_s UNION ALL + SELECT 'b' AS uid, 2 AS amt, 7 AS time_s + ) t + ORDER BY uid, time_s; + """ }