From e2678876e73b411c55c741bfb4d7db94b164c219 Mon Sep 17 00:00:00 2001 From: garenshi Date: Sat, 30 Nov 2024 17:16:25 +0800 Subject: [PATCH] 1 --- be/src/vec/exec/vtable_function_node.cpp | 304 ----------------------- 1 file changed, 304 deletions(-) delete mode 100644 be/src/vec/exec/vtable_function_node.cpp diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp deleted file mode 100644 index 334bf4e08c43818..000000000000000 --- a/be/src/vec/exec/vtable_function_node.cpp +++ /dev/null @@ -1,304 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/exec/vtable_function_node.h" - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "vec/columns/column.h" -#include "vec/core/block.h" -#include "vec/exprs/table_function/table_function.h" -#include "vec/exprs/table_function/table_function_factory.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" -#include "vec/utils/util.hpp" - -namespace doris { -class ObjectPool; -} // namespace doris - -namespace doris::vectorized { - -VTableFunctionNode::VTableFunctionNode(doris::ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs) { - _child_block = Block::create_shared(); -} - -Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); - - for (const TExpr& texpr : tnode.table_function_node.fnCallExprList) { - VExprContextSPtr ctx; - RETURN_IF_ERROR(VExpr::create_expr_tree(texpr, ctx)); - _vfn_ctxs.push_back(ctx); - - auto root = ctx->root(); - const std::string& tf_name = root->fn().name.function_name; - TableFunction* fn = nullptr; - RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, _pool, &fn)); - fn->set_expr_context(ctx); - _fns.push_back(fn); - } - _fn_num = _fns.size(); - - // Prepare output slot ids - RETURN_IF_ERROR(_prepare_output_slot_ids(tnode)); - return Status::OK(); -} - -Status VTableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) { - // Prepare output slot ids - if (tnode.table_function_node.outputSlotIds.empty()) { - return Status::InternalError("Output slots of table function node is empty"); - } - SlotId max_id = -1; - for (auto slot_id : tnode.table_function_node.outputSlotIds) { - if (slot_id > max_id) { - max_id = slot_id; - } - } - _output_slot_ids = std::vector(max_id + 1, false); - for (auto slot_id : tnode.table_function_node.outputSlotIds) { - _output_slot_ids[slot_id] = true; - } - - return Status::OK(); -} - -bool VTableFunctionNode::_is_inner_and_empty() { - for (int i = 0; i < _fn_num; i++) { - // if any table function is not outer and has empty result, go to next child row - if (!_fns[i]->is_outer() && _fns[i]->current_empty()) { - return true; - } - } - return false; -} - -Status VTableFunctionNode::prepare(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_TIMER(_exec_timer); - - _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT); - for (auto* fn : _fns) { - RETURN_IF_ERROR(fn->prepare()); - } - RETURN_IF_ERROR(VExpr::prepare(_vfn_ctxs, state, _row_descriptor)); - - // get current all output slots - for (const auto& tuple_desc : this->_row_descriptor.tuple_descriptors()) { - for (const auto& slot_desc : tuple_desc->slots()) { - _output_slots.push_back(slot_desc); - } - } - - // get all input slots - for (const auto& child_tuple_desc : child(0)->row_desc().tuple_descriptors()) { - for (const auto& child_slot_desc : child_tuple_desc->slots()) { - _child_slots.push_back(child_slot_desc); - } - } - - for (size_t i = 0; i < _child_slots.size(); i++) { - if (_slot_need_copy(i)) { - _output_slot_indexs.push_back(i); - } else { - _useless_slot_indexs.push_back(i); - } - } - - _cur_child_offset = -1; - - return Status::OK(); -} - -Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - - RETURN_IF_CANCELLED(state); - - // if child_block is empty, get data from child. - while (need_more_input_data()) { - RETURN_IF_ERROR(child(0)->get_next_after_projects( - state, _child_block.get(), &_child_eos, - std::bind((Status(ExecNode::*)(RuntimeState*, Block*, bool*)) & ExecNode::get_next, - _children[0], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3))); - - RETURN_IF_ERROR(push(state, _child_block.get(), _child_eos)); - } - - return pull(state, block, eos); -} - -Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block, - bool* eos) { - MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots); - MutableColumns& columns = m_block.mutable_columns(); - for (int i = 0; i < _fn_num; i++) { - if (columns[i + _child_slots.size()]->is_nullable()) { - _fns[i]->set_nullable(); - } - } - - while (columns[_child_slots.size()]->size() < state->batch_size()) { - RETURN_IF_CANCELLED(state); - - if (_child_block->rows() == 0) { - break; - } - - bool skip_child_row = false; - while (columns[_child_slots.size()]->size() < state->batch_size()) { - int idx = _find_last_fn_eos_idx(); - if (idx == 0 || skip_child_row) { - _copy_output_slots(columns); - // all table functions' results are exhausted, process next child row. - _process_next_child_row(); - if (_cur_child_offset == -1) { - break; - } - } else if (idx < _fn_num && idx != -1) { - // some of table functions' results are exhausted. - if (!_roll_table_functions(idx)) { - // continue to process next child row. - continue; - } - } - - // if any table function is not outer and has empty result, go to next child row - if (skip_child_row = _is_inner_and_empty(); skip_child_row) { - continue; - } - - DCHECK_LE(1, _fn_num); - auto repeat_times = _fns[_fn_num - 1]->get_value( - columns[_child_slots.size() + _fn_num - 1], - state->batch_size() - columns[_child_slots.size()]->size()); - _current_row_insert_times += repeat_times; - for (int i = 0; i < _fn_num - 1; i++) { - _fns[i]->get_same_many_values(columns[i + _child_slots.size()], repeat_times); - } - } - } - - _copy_output_slots(columns); - - size_t row_size = columns[_child_slots.size()]->size(); - for (auto index : _useless_slot_indexs) { - columns[index]->insert_many_defaults(row_size - columns[index]->size()); - } - output_block->set_columns(std::move(columns)); - // 3. eval conjuncts - RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns())); - - *eos = _child_eos && _cur_child_offset == -1; - return Status::OK(); -} - -void VTableFunctionNode::_process_next_child_row() { - _cur_child_offset++; - LOG(INFO) << "[shi] VTableFunctionNode _cur_child_offset : " << _cur_child_offset; - - if (_cur_child_offset >= _child_block->rows()) { - // release block use count. - for (TableFunction* fn : _fns) { - fn->process_close(); - } - - release_block_memory(*_child_block); - _cur_child_offset = -1; - return; - } - - for (TableFunction* fn : _fns) { - fn->process_row(_cur_child_offset); - } -} - -// Returns the index of fn of the last eos counted from back to front -// eg: there are 3 functions in `_fns` -// eos: false, true, true -// return: 1 -// -// eos: false, false, true -// return: 2 -// -// eos: false, false, false -// return: -1 -// -// eos: true, true, true -// return: 0 -// -// return: -// 0: all fns are eos -// -1: all fns are not eos -// >0: some of fns are eos -int VTableFunctionNode::_find_last_fn_eos_idx() { - for (int i = _fn_num - 1; i >= 0; --i) { - if (!_fns[i]->eos()) { - if (i == _fn_num - 1) { - return -1; - } else { - return i + 1; - } - } - } - // all eos - return 0; -} - -// Roll to reset the table function. -// Eg: -// There are 3 functions f1, f2 and f3 in `_fns`. -// If `last_eos_idx` is 1, which means f2 and f3 are eos. -// So we need to forward f1, and reset f2 and f3. -bool VTableFunctionNode::_roll_table_functions(int last_eos_idx) { - int i = last_eos_idx - 1; - for (; i >= 0; --i) { - _fns[i]->forward(); - if (!_fns[i]->eos()) { - break; - } - } - if (i == -1) { - // after forward, all functions are eos. - // we should process next child row to get more table function results. - return false; - } - - for (int j = i + 1; j < _fn_num; ++j) { - _fns[j]->reset(); - } - - return true; -} - -} // namespace doris::vectorized