diff --git a/be/src/common/status.h b/be/src/common/status.h index ebe8c47c703227..e49bffa70f169f 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -113,6 +113,7 @@ E(ALREADY_CANCELLED, -237); E(TOO_MANY_SEGMENTS, -238); E(ALREADY_CLOSED, -239); E(SERVICE_UNAVAILABLE, -240); +E(NEED_SEND_AGAIN, -241); E(CE_CMD_PARAMS_ERROR, -300); E(CE_BUFFER_TOO_SMALL, -301); E(CE_CMD_NOT_VALID, -302); @@ -285,6 +286,7 @@ constexpr bool capture_stacktrace(int code) { && code != ErrorCode::TOO_MANY_VERSION && code != ErrorCode::ALREADY_CANCELLED && code != ErrorCode::ALREADY_CLOSED + && code != ErrorCode::NEED_SEND_AGAIN && code != ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST && code != ErrorCode::BE_NO_SUITABLE_VERSION && code != ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION @@ -428,6 +430,7 @@ class Status { ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR) ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED) ERROR_CTOR(HttpError, HTTP_ERROR) + ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN) #undef ERROR_CTOR template diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 764dddf4e9b9fd..71ca504d3a010a 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -27,7 +27,10 @@ #include #include +#include +#include "common/exception.h" +#include "common/status.h" #include "olap/tablet_schema.h" #include "runtime/descriptors.h" #include "runtime/large_int_value.h" @@ -37,8 +40,11 @@ #include "util/hash_util.hpp" #include "util/string_parser.hpp" #include "util/string_util.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" -#include "vec/exprs/vexpr.h" +#include "vec/exprs/vliteral.h" #include "vec/runtime/vdatetime_value.h" namespace doris { @@ -57,6 +63,61 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const { } } +bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator lhs, + const BlockRowWithIndicator rhs) const { + vectorized::Block* l_block = std::get<0>(lhs); + vectorized::Block* r_block = std::get<0>(rhs); + int32_t l_row = std::get<1>(lhs); + int32_t r_row = std::get<1>(rhs); + bool l_use_new = std::get<2>(lhs); + bool r_use_new = std::get<2>(rhs); + + if (l_row == -1) { + return false; + } else if (r_row == -1) { + return true; + } + + if (_param_locs.empty()) { // no transform, use origin column + for (auto slot_loc : _slot_locs) { + auto res = l_block->get_by_position(slot_loc).column->compare_at( + l_row, r_row, *r_block->get_by_position(slot_loc).column, -1); + if (res != 0) { + return res < 0; + } + } + } else { // use transformed column to compare + DCHECK(_slot_locs.size() == _param_locs.size()) + << _slot_locs.size() << ' ' << _param_locs.size(); + + //TODO: use template to accelerate this for older compiler. + const std::vector* l_index = l_use_new ? &_param_locs : &_slot_locs; + const std::vector* r_index = r_use_new ? &_param_locs : &_slot_locs; + + for (int i = 0; i < _slot_locs.size(); i++) { + vectorized::ColumnPtr l_col = l_block->get_by_position((*l_index)[i]).column; + vectorized::ColumnPtr r_col = r_block->get_by_position((*r_index)[i]).column; + //TODO: when we support any function for transform, maybe the best way is refactor all doris' functions to its essential nullable mode. + if (auto* nullable = + vectorized::check_and_get_column(l_col)) { + l_col = nullable->get_nested_column_ptr(); + } + if (auto* nullable = + vectorized::check_and_get_column(r_col)) { + r_col = nullable->get_nested_column_ptr(); + } + + auto res = l_col->compare_at(l_row, r_row, *r_col, -1); + if (res != 0) { + return res < 0; + } + } + } + + // equal, return false + return false; +} + Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _db_id = pschema.db_id(); _table_id = pschema.table_id(); @@ -208,11 +269,23 @@ VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptrtuple_desc()->slots()), - _mem_tracker(std::make_unique("OlapTablePartitionParam")) { + _mem_tracker(std::make_unique("OlapTablePartitionParam")), + _part_type(t_param.partition_type) { for (auto slot : _slots) { _partition_block.insert( {slot->get_empty_mutable_column(), slot->get_data_type_ptr(), slot->col_name()}); } + + if (t_param.__isset.enable_automatic_partition && t_param.enable_automatic_partition) { + _is_auto_partiton = true; + Status st = vectorized::VExpr::create_expr_tree(t_param.partition_function_exprs[0], + _part_func_ctx); + if (!st.ok()) { + throw Exception(Status::InternalError("Partition function expr is not valid"), + "Partition function expr is not valid"); + } + _partition_function = _part_func_ctx->root(); + } } VOlapTablePartitionParam::~VOlapTablePartitionParam() { @@ -243,8 +316,8 @@ Status VOlapTablePartitionParam::init() { } _partitions_map.reset( - new std::map( - VOlapTablePartKeyComparator(_partition_slot_locs))); + new std::map( + VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs))); if (_t_param.__isset.distributed_columns) { for (auto& col : _t_param.distributed_columns) { RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed")); @@ -272,67 +345,22 @@ Status VOlapTablePartitionParam::init() { }; } - DCHECK(!_t_param.partitions.empty()) << "must have at least 1 partition"; - _is_in_partition = _t_param.partitions[0].__isset.in_keys; + // for both auto/non-auto partition table. + _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED; // initial partitions for (int i = 0; i < _t_param.partitions.size(); ++i) { const TOlapTablePartition& t_part = _t_param.partitions[i]; - auto part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); - part->id = t_part.id; - part->is_mutable = t_part.is_mutable; - - if (!_is_in_partition) { - if (t_part.__isset.start_keys) { - RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key)); - } - - if (t_part.__isset.end_keys) { - RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key)); - } - } else { - for (const auto& keys : t_part.in_keys) { - RETURN_IF_ERROR(_create_partition_keys( - keys, &part->in_keys.emplace_back(&_partition_block, -1))); - } - if (t_part.__isset.is_default_partition && t_part.is_default_partition) { - _default_partition = part; - } - } - - part->num_buckets = t_part.num_buckets; - auto num_indexes = _schema->indexes().size(); - if (t_part.indexes.size() != num_indexes) { - return Status::InternalError( - "number of partition's index is not equal with schema's" - ", num_part_indexes={}, num_schema_indexes={}", - t_part.indexes.size(), num_indexes); - } - part->indexes = t_part.indexes; - std::sort(part->indexes.begin(), part->indexes.end(), - [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) { - return lhs.index_id < rhs.index_id; - }); - // check index - for (int j = 0; j < num_indexes; ++j) { - if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { - std::stringstream ss; - ss << "partition's index is not equal with schema's" - << ", part_index=" << part->indexes[j].index_id - << ", schema_index=" << _schema->indexes()[j]->index_id; - return Status::InternalError( - "partition's index is not equal with schema's" - ", part_index={}, schema_index={}", - part->indexes[j].index_id, _schema->indexes()[j]->index_id); - } - } + VOlapTablePartition* part = nullptr; + RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); if (_is_in_partition) { for (auto& in_key : part->in_keys) { - _partitions_map->emplace(&in_key, part); + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); } } else { - _partitions_map->emplace(&part->end_key, part); + _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, + part); } } @@ -343,19 +371,32 @@ Status VOlapTablePartitionParam::init() { bool VOlapTablePartitionParam::find_partition(BlockRow* block_row, const VOlapTablePartition** partition) const { - auto it = _is_in_partition ? _partitions_map->find(block_row) - : _partitions_map->upper_bound(block_row); + // block_row is gave by inserting process. So try to use transformed index. + auto it = + _is_in_partition + ? _partitions_map->find(std::tuple {block_row->first, block_row->second, true}) + : _partitions_map->upper_bound( + std::tuple {block_row->first, block_row->second, true}); // for list partition it might result in default partition if (_is_in_partition) { *partition = (it != _partitions_map->end()) ? it->second : _default_partition; it = _partitions_map->end(); } - if (it != _partitions_map->end() && _part_contains(it->second, block_row)) { + if (it != _partitions_map->end() && + _part_contains(it->second, std::tuple {block_row->first, block_row->second, true})) { *partition = it->second; } return (*partition != nullptr); } +bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part, + BlockRowWithIndicator key) const { + // start_key.second == -1 means only single partition + VOlapTablePartKeyComparator comparator(_partition_slot_locs, _transformed_slot_locs); + return part->start_key.second == -1 || + !comparator(key, std::tuple {part->start_key.first, part->start_key.second, false}); +} + uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const { return _compute_tablet_index(block_row, partition.num_buckets); @@ -369,6 +410,61 @@ Status VOlapTablePartitionParam::_create_partition_keys(const std::vectorid = t_part.id; + part_result->is_mutable = t_part.is_mutable; + + if (!_is_in_partition) { + if (t_part.__isset.start_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part_result->start_key)); + } + + if (t_part.__isset.end_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part_result->end_key)); + } + } else { + for (const auto& keys : t_part.in_keys) { + RETURN_IF_ERROR(_create_partition_keys( + keys, &part_result->in_keys.emplace_back(&_partition_block, -1))); + } + if (t_part.__isset.is_default_partition && t_part.is_default_partition && + _default_partition == nullptr) { + _default_partition = part_result; + } + } + + part_result->num_buckets = t_part.num_buckets; + auto num_indexes = _schema->indexes().size(); + if (t_part.indexes.size() != num_indexes) { + return Status::InternalError( + "number of partition's index is not equal with schema's" + ", num_part_indexes={}, num_schema_indexes={}", + t_part.indexes.size(), num_indexes); + } + part_result->indexes = t_part.indexes; + std::sort(part_result->indexes.begin(), part_result->indexes.end(), + [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) { + return lhs.index_id < rhs.index_id; + }); + // check index + for (int j = 0; j < num_indexes; ++j) { + if (part_result->indexes[j].index_id != _schema->indexes()[j]->index_id) { + std::stringstream ss; + ss << "partition's index is not equal with schema's" + << ", part_index=" << part_result->indexes[j].index_id + << ", schema_index=" << _schema->indexes()[j]->index_id; + return Status::InternalError( + "partition's index is not equal with schema's" + ", part_index={}, schema_index={}", + part_result->indexes[j].index_id, _schema->indexes()[j]->index_id); + } + } + return Status::OK(); +} + Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) { auto column = std::move(*part_key->first->get_by_position(pos).column).mutate(); @@ -457,4 +553,72 @@ Status VOlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr, return Status::OK(); } +Status VOlapTablePartitionParam::add_partitions( + const std::vector& partitions) { + for (const auto& t_part : partitions) { + auto part = _obj_pool.add(new VOlapTablePartition(&_partition_block)); + part->id = t_part.id; + part->is_mutable = t_part.is_mutable; + + DCHECK(t_part.__isset.start_keys == t_part.__isset.end_keys && + t_part.__isset.start_keys != t_part.__isset.in_keys); + // range partition + if (t_part.__isset.start_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key)); + } + if (t_part.__isset.end_keys) { + RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key)); + } + // list partition - we only set 1 value in 1 partition for new created ones + if (t_part.__isset.in_keys) { + for (const auto& keys : t_part.in_keys) { + RETURN_IF_ERROR(_create_partition_keys( + keys, &part->in_keys.emplace_back(&_partition_block, -1))); + } + if (t_part.__isset.is_default_partition && t_part.is_default_partition) { + _default_partition = part; + } + } + + part->num_buckets = t_part.num_buckets; + auto num_indexes = _schema->indexes().size(); + if (t_part.indexes.size() != num_indexes) { + return Status::InternalError( + "number of partition's index is not equal with schema's" + ", num_part_indexes={}, num_schema_indexes={}", + t_part.indexes.size(), num_indexes); + } + part->indexes = t_part.indexes; + std::sort(part->indexes.begin(), part->indexes.end(), + [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) { + return lhs.index_id < rhs.index_id; + }); + // check index + for (int j = 0; j < num_indexes; ++j) { + if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) { + std::stringstream ss; + ss << "partition's index is not equal with schema's" + << ", part_index=" << part->indexes[j].index_id + << ", schema_index=" << _schema->indexes()[j]->index_id; + return Status::InternalError( + "partition's index is not equal with schema's" + ", part_index={}, schema_index={}", + part->indexes[j].index_id, _schema->indexes()[j]->index_id); + } + } + _partitions.emplace_back(part); + // after _creating_partiton_keys + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + } + } else { + _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, + part); + } + } + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index c508b322ee172f..3e6ab7b94be922 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,8 @@ #include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" #include "vec/exprs/vexpr_fwd.h" namespace doris { @@ -110,7 +113,8 @@ using OlapTableIndexTablets = TOlapTableIndexTablets; // } using BlockRow = std::pair; -using VecBlock = vectorized::Block; +using BlockRowWithIndicator = + std::tuple; // [block, column, is_transformed] struct VOlapTablePartition { int64_t id = 0; @@ -125,32 +129,20 @@ struct VOlapTablePartition { : start_key {partition_block, -1}, end_key {partition_block, -1} {} }; +// this is only used by tablet_sink. so we can assume it's inited by its' descriptor. class VOlapTablePartKeyComparator { public: - VOlapTablePartKeyComparator(const std::vector& slot_locs) : _slot_locs(slot_locs) {} + VOlapTablePartKeyComparator(const std::vector& slot_locs, + const std::vector& params_locs) + : _slot_locs(slot_locs), _param_locs(params_locs) {} // return true if lhs < rhs - // 'row' is -1 mean - bool operator()(const BlockRow* lhs, const BlockRow* rhs) const { - if (lhs->second == -1) { - return false; - } else if (rhs->second == -1) { - return true; - } - - for (auto slot_loc : _slot_locs) { - auto res = lhs->first->get_by_position(slot_loc).column->compare_at( - lhs->second, rhs->second, *rhs->first->get_by_position(slot_loc).column, -1); - if (res != 0) { - return res < 0; - } - } - // equal, return false - return false; - } + // 'row' is -1 mean maximal boundary + bool operator()(const BlockRowWithIndicator lhs, const BlockRowWithIndicator rhs) const; private: const std::vector& _slot_locs; + const std::vector& _param_locs; }; // store an olap table's tablet information @@ -174,6 +166,26 @@ class VOlapTablePartitionParam { const std::vector& get_partitions() const { return _partitions; } + // it's same with auto now because we only support transformed partition in auto partition. may expand in future + bool is_projection_partition() const { return _is_auto_partiton; } + bool is_auto_partition() const { return _is_auto_partiton; } + + std::vector get_partition_keys() const { return _partition_slot_locs; } + + Status add_partitions(const std::vector& partitions); + + //TODO: use vector when we support multi partition column for auto-partition + vectorized::VExprContextSPtr get_part_func_ctx() { return _part_func_ctx; } + vectorized::VExprSPtr get_partition_function() { return _partition_function; } + + // which will affect _partition_block + Status generate_partition_from(const TOlapTablePartition& t_part, + VOlapTablePartition*& part_result); + + void set_transformed_slots(const std::vector& new_slots) { + _transformed_slot_locs = new_slots; + } + private: Status _create_partition_keys(const std::vector& t_exprs, BlockRow* part_key); @@ -182,11 +194,7 @@ class VOlapTablePartitionParam { std::function _compute_tablet_index; // check if this partition contain this key - bool _part_contains(VOlapTablePartition* part, BlockRow* key) const { - // start_key.second == -1 means only single partition - VOlapTablePartKeyComparator comparator(_partition_slot_locs); - return part->start_key.second == -1 || !comparator(key, &part->start_key); - } + bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const; // this partition only valid in this schema std::shared_ptr _schema; @@ -194,21 +202,32 @@ class VOlapTablePartitionParam { const std::vector& _slots; std::vector _partition_slot_locs; + std::vector _transformed_slot_locs; std::vector _distributed_slot_locs; ObjectPool _obj_pool; vectorized::Block _partition_block; std::unique_ptr _mem_tracker; std::vector _partitions; - std::unique_ptr> + // For all partition value rows saved in this map, indicator is false. whenever we use a value to find in it, the param is true. + // so that we can distinguish which column index to use (origin slots or transformed slots). + std::unique_ptr< + std::map> _partitions_map; bool _is_in_partition = false; uint32_t _mem_usage = 0; // only works when using list partition, the resource is owned by _partitions VOlapTablePartition* _default_partition = nullptr; + + // for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition. + bool _is_auto_partiton = false; + vectorized::VExprContextSPtr _part_func_ctx = nullptr; + vectorized::VExprSPtr _partition_function = nullptr; + TPartitionType::type _part_type; // support list or range }; +// indicate where's the tablet and all its replications (node-wise) using TabletLocation = TTabletLocation; // struct TTabletLocation { // 1: required i64 tablet_id @@ -235,9 +254,17 @@ class OlapTableLocationParam { return nullptr; } + void add_locations(std::vector& locations) { + for (auto& location : locations) { + if (_tablets.find(location.tablet_id) == _tablets.end()) { + _tablets[location.tablet_id] = &location; + } + } + } + private: TOlapTableLocationParam _t_param; - + // [tablet_id, tablet]. tablet has id, also. std::unordered_map _tablets; }; @@ -278,6 +305,15 @@ class DorisNodesInfo { return nullptr; } + void add_nodes(const std::vector& t_nodes) { + for (const auto& node : t_nodes) { + auto node_info = find_node(node.id); + if (node_info == nullptr) { + _nodes.emplace(node.id, node); + } + } + } + const std::unordered_map& nodes_info() { return _nodes; } private: diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 5fc73f6f59f8aa..9275b87ffbc986 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -193,89 +193,7 @@ PFilterType get_type(RuntimeFilterType type) { } Status create_literal(const TypeDescriptor& type, const void* data, vectorized::VExprSPtr& expr) { - TExprNode node; - - switch (type.type) { - case TYPE_BOOLEAN: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_TINYINT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_SMALLINT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_INT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_BIGINT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_LARGEINT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_FLOAT: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DOUBLE: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DATEV2: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DATETIMEV2: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DATE: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DATETIME: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_DECIMALV2: { - create_texpr_literal_node(data, &node, type.precision, type.scale); - break; - } - case TYPE_DECIMAL32: { - create_texpr_literal_node(data, &node, type.precision, type.scale); - break; - } - case TYPE_DECIMAL64: { - create_texpr_literal_node(data, &node, type.precision, type.scale); - break; - } - case TYPE_DECIMAL128I: { - create_texpr_literal_node(data, &node, type.precision, type.scale); - break; - } - case TYPE_CHAR: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_VARCHAR: { - create_texpr_literal_node(data, &node); - break; - } - case TYPE_STRING: { - create_texpr_literal_node(data, &node); - break; - } - default: - DCHECK(false); - return Status::InvalidArgument("Invalid type!"); - } + TExprNode node = create_texpr_node_from(data, type.type, type.precision, type.scale); try { expr = vectorized::VLiteral::create_shared(node); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 2f6b35a377a866..f03b27cfbf9311 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -42,6 +42,7 @@ #include "vec/common/string_ref.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr.h" #include "vec/runtime/vdatetime_value.h" namespace butil { @@ -452,143 +453,4 @@ class RuntimeFilterWrapperHolder { private: WrapperPtr _wrapper; }; - -// copied from expr.h since it is only used in runtime filter - -template -Status create_texpr_literal_node(const void* data, TExprNode* node, int precision = 0, - int scale = 0) { - if constexpr (T == TYPE_BOOLEAN) { - auto origin_value = reinterpret_cast(data); - TBoolLiteral boolLiteral; - (*node).__set_node_type(TExprNodeType::BOOL_LITERAL); - boolLiteral.__set_value(*origin_value); - (*node).__set_bool_literal(boolLiteral); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); - } else if constexpr (T == TYPE_TINYINT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::INT_LITERAL); - TIntLiteral intLiteral; - intLiteral.__set_value(*origin_value); - (*node).__set_int_literal(intLiteral); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT)); - } else if constexpr (T == TYPE_SMALLINT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::INT_LITERAL); - TIntLiteral intLiteral; - intLiteral.__set_value(*origin_value); - (*node).__set_int_literal(intLiteral); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT)); - } else if constexpr (T == TYPE_INT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::INT_LITERAL); - TIntLiteral intLiteral; - intLiteral.__set_value(*origin_value); - (*node).__set_int_literal(intLiteral); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT)); - } else if constexpr (T == TYPE_BIGINT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::INT_LITERAL); - TIntLiteral intLiteral; - intLiteral.__set_value(*origin_value); - (*node).__set_int_literal(intLiteral); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT)); - } else if constexpr (T == TYPE_LARGEINT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL); - TLargeIntLiteral large_int_literal; - large_int_literal.__set_value(LargeIntValue::to_string(*origin_value)); - (*node).__set_large_int_literal(large_int_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT)); - } else if constexpr ((T == TYPE_DATE) || (T == TYPE_DATETIME) || (T == TYPE_TIME)) { - auto origin_value = reinterpret_cast(data); - TDateLiteral date_literal; - char convert_buffer[30]; - origin_value->to_string(convert_buffer); - date_literal.__set_value(convert_buffer); - (*node).__set_date_literal(date_literal); - (*node).__set_node_type(TExprNodeType::DATE_LITERAL); - if (origin_value->type() == TimeType::TIME_DATE) { - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATE)); - } else if (origin_value->type() == TimeType::TIME_DATETIME) { - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIME)); - } else if (origin_value->type() == TimeType::TIME_TIME) { - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME)); - } - } else if constexpr (T == TYPE_DATEV2) { - auto origin_value = - reinterpret_cast*>(data); - TDateLiteral date_literal; - char convert_buffer[30]; - origin_value->to_string(convert_buffer); - date_literal.__set_value(convert_buffer); - (*node).__set_date_literal(date_literal); - (*node).__set_node_type(TExprNodeType::DATE_LITERAL); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATEV2)); - } else if constexpr (T == TYPE_DATETIMEV2) { - auto origin_value = - reinterpret_cast*>( - data); - TDateLiteral date_literal; - char convert_buffer[30]; - origin_value->to_string(convert_buffer); - date_literal.__set_value(convert_buffer); - (*node).__set_date_literal(date_literal); - (*node).__set_node_type(TExprNodeType::DATE_LITERAL); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIMEV2)); - } else if constexpr (T == TYPE_DECIMALV2) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); - TDecimalLiteral decimal_literal; - decimal_literal.__set_value(origin_value->to_string()); - (*node).__set_decimal_literal(decimal_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2, precision, scale)); - } else if constexpr (T == TYPE_DECIMAL32) { - auto origin_value = reinterpret_cast*>(data); - (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); - TDecimalLiteral decimal_literal; - decimal_literal.__set_value(origin_value->to_string(scale)); - (*node).__set_decimal_literal(decimal_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32, precision, scale)); - } else if constexpr (T == TYPE_DECIMAL64) { - auto origin_value = reinterpret_cast*>(data); - (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); - TDecimalLiteral decimal_literal; - decimal_literal.__set_value(origin_value->to_string(scale)); - (*node).__set_decimal_literal(decimal_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64, precision, scale)); - } else if constexpr (T == TYPE_DECIMAL128I) { - auto origin_value = reinterpret_cast*>(data); - (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); - TDecimalLiteral decimal_literal; - decimal_literal.__set_value(origin_value->to_string(scale)); - (*node).__set_decimal_literal(decimal_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL128I, precision, scale)); - } else if constexpr (T == TYPE_FLOAT) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); - TFloatLiteral float_literal; - float_literal.__set_value(*origin_value); - (*node).__set_float_literal(float_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT)); - } else if constexpr (T == TYPE_DOUBLE) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); - TFloatLiteral float_literal; - float_literal.__set_value(*origin_value); - (*node).__set_float_literal(float_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE)); - } else if constexpr ((T == TYPE_STRING) || (T == TYPE_CHAR) || (T == TYPE_VARCHAR)) { - auto origin_value = reinterpret_cast(data); - (*node).__set_node_type(TExprNodeType::STRING_LITERAL); - TStringLiteral string_literal; - string_literal.__set_value(origin_value->to_string()); - (*node).__set_string_literal(string_literal); - (*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING)); - } else { - return Status::InvalidArgument("Invalid argument type!"); - } - return Status::OK(); -} - } // namespace doris diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 5507daceefb13e..add92cabf37065 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -24,6 +24,7 @@ #include +#include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/pipeline.h" #include "pipeline_fragment_context.h" @@ -277,6 +278,9 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); auto status = _sink->sink(_state, block, _data_state); + if (status.is()) { + status = _sink->sink(_state, block, _data_state); + } if (UNLIKELY(!status.ok() || block->rows() == 0)) { if (_fragment_context->is_group_commit()) { auto* future_block = dynamic_cast(block); diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index d39c72352b7bda..bee2f906bc9e77 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -86,7 +86,12 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } } - RETURN_IF_ERROR(channel->open(params)); + if (params.is_incremental()) { + // incremental open would ensure not to open tablet repeatedly + RETURN_IF_ERROR(channel->incremental_open(params)); + } else { + RETURN_IF_ERROR(channel->open(params)); + } _opened = true; _last_updated_time.store(time(nullptr)); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index aafea1c422f60e..dd62e64fe23451 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -39,6 +39,7 @@ #include "common/config.h" #include "common/logging.h" +#include "common/status.h" #include "common/version_internal.h" #include "exec/data_sink.h" #include "exec/exec_node.h" @@ -333,6 +334,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() { if (!eos || block->rows() > 0) { auto st = _sink->send(runtime_state(), block.get()); + //TODO: Asynchronisation need refactor this + if (st.is()) { // created partition, do it again. + st = _sink->send(runtime_state(), block.get()); + if (st.is()) { + LOG(WARNING) << "have to create partition again..."; + } + } if (UNLIKELY(!st.ok() || block->rows() == 0)) { // Used for group commit insert if (_group_commit) { diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 2e21ec92c349c3..098f1eb25255e6 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -24,6 +24,7 @@ // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -118,6 +119,73 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { return Status::OK(); } +Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { + if (_state == kInitialized) { // haven't opened + return open(params); + } + std::lock_guard l(_lock); + std::vector* index_slots = nullptr; + int32_t schema_hash = 0; + for (auto& index : _schema->indexes()) { + if (index->index_id == _index_id) { + index_slots = &index->slots; + schema_hash = index->schema_hash; + break; + } + } + if (index_slots == nullptr) { + Status::InternalError("unknown index id, key={}", _key.to_string()); + } + // update tablets + std::vector tablet_ids; + tablet_ids.reserve(params.tablets_size()); + size_t incremental_tablet_num = 0; + std::stringstream ss; + ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id()) + << " incremental open delta writer: "; + + for (auto& tablet : params.tablets()) { + if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { + continue; + } + incremental_tablet_num++; + + WriteRequest wrequest; + wrequest.index_id = params.index_id(); + wrequest.tablet_id = tablet.tablet_id(); + wrequest.schema_hash = schema_hash; + wrequest.txn_id = _txn_id; + wrequest.partition_id = tablet.partition_id(); + wrequest.load_id = params.id(); + wrequest.tuple_desc = _tuple_desc; + wrequest.slots = index_slots; + wrequest.is_high_priority = _is_high_priority; + wrequest.table_schema_param = _schema; + + DeltaWriter* writer = nullptr; + auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id); + if (!st.ok()) { + auto err_msg = fmt::format( + "open delta writer failed, tablet_id={}" + ", txn_id={}, partition_id={}, err={}", + tablet.tablet_id(), _txn_id, tablet.partition_id(), st.to_string()); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + ss << "[" << tablet.tablet_id() << "]"; + { + std::lock_guard l(_tablet_writers_lock); + _tablet_writers.emplace(tablet.tablet_id(), writer); + } + } + + _s_tablet_writer_count += incremental_tablet_num; + LOG(INFO) << ss.str(); + + _state = kOpened; + return Status::OK(); +} + Status TabletsChannel::close( LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished, const google::protobuf::RepeatedField& partition_ids, @@ -280,7 +348,7 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer, void TabletsChannel::_add_error_tablet( google::protobuf::RepeatedPtrField* tablet_errors, int64_t tablet_id, - Status error) { + Status error) const { PTabletError* tablet_error = tablet_errors->Add(); tablet_error->set_tablet_id(tablet_id); tablet_error->set_msg(error.to_string()); @@ -301,10 +369,15 @@ void TabletsChannel::refresh_profile() { write_mem_usage += write_mem; int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH); flush_mem_usage += flush_mem; - if (write_mem > max_tablet_write_mem_usage) max_tablet_write_mem_usage = write_mem; - if (flush_mem > max_tablet_flush_mem_usage) max_tablet_flush_mem_usage = flush_mem; - if (write_mem + flush_mem > max_tablet_mem_usage) + if (write_mem > max_tablet_write_mem_usage) { + max_tablet_write_mem_usage = write_mem; + } + if (flush_mem > max_tablet_flush_mem_usage) { + max_tablet_flush_mem_usage = flush_mem; + } + if (write_mem + flush_mem > max_tablet_mem_usage) { max_tablet_mem_usage = write_mem + flush_mem; + } } } COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage); @@ -341,7 +414,12 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } #endif + int tablet_cnt = 0; for (auto& tablet : request.tablets()) { + if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) { + continue; + } + tablet_cnt++; WriteRequest wrequest; wrequest.index_id = request.index_id(); wrequest.tablet_id = tablet.tablet_id(); @@ -370,7 +448,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } } _s_tablet_writer_count += _tablet_writers.size(); - DCHECK_EQ(_tablet_writers.size(), request.tablets_size()); + DCHECK_EQ(_tablet_writers.size(), tablet_cnt); return Status::OK(); } @@ -448,7 +526,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, response->mutable_tablet_errors(); auto tablet_writer_it = _tablet_writers.find(tablet_id); if (tablet_writer_it == _tablet_writers.end()) { - return Status::InternalError("unknown tablet to append data, tablet={}"); + return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id); } Status st = write_func(tablet_writer_it->second); if (!st.ok()) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index e3d8d87ec38668..fe9c226829d794 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -90,6 +90,8 @@ class TabletsChannel { ~TabletsChannel(); Status open(const PTabletWriterOpenRequest& request); + // open + open writers + Status incremental_open(const PTabletWriterOpenRequest& params); // no-op when this channel has been closed or cancelled Status add_batch(const PTabletWriterAddBlockRequest& request, @@ -128,7 +130,7 @@ class TabletsChannel { void _add_broken_tablet(int64_t tablet_id); void _add_error_tablet(google::protobuf::RepeatedPtrField* tablet_errors, - int64_t tablet_id, Status error); + int64_t tablet_id, Status error) const; bool _is_broken_tablet(int64_t tablet_id); void _init_profile(RuntimeProfile* profile); diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 0fea95a90ec28d..18d73039e502c1 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -757,10 +757,14 @@ Block Block::copy_block(const std::vector& column_offset) const { return columns_with_type_and_name; } -void Block::append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const { +void Block::append_to_block_by_selector(MutableBlock* dst, + const IColumn::Selector& selector) const { DCHECK_EQ(data.size(), dst->mutable_columns().size()); for (size_t i = 0; i < data.size(); i++) { - data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); + // FIXME: this is a quickfix. we assume that only partition functions make there some + if (!is_column_const(*data[i].column)) { + data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); + } } } diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 9bc455f054c96b..5edf9d1e70bd7d 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -289,7 +289,7 @@ class Block { // copy a new block by the offset column Block copy_block(const std::vector& column_offset) const; - void append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; + void append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; // need exception safety static void filter_block_internal(Block* block, const std::vector& columns_to_filter, diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 341125c89e73fb..a19dafe4393a70 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -55,6 +55,93 @@ namespace doris { class RowDescriptor; class RuntimeState; +TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, int precision, + int scale) { + TExprNode node; + + switch (type) { + case TYPE_BOOLEAN: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_TINYINT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_SMALLINT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_INT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_BIGINT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_LARGEINT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_FLOAT: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DOUBLE: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DATEV2: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DATETIMEV2: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DATE: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DATETIME: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_DECIMALV2: { + create_texpr_literal_node(data, &node, precision, scale); + break; + } + case TYPE_DECIMAL32: { + create_texpr_literal_node(data, &node, precision, scale); + break; + } + case TYPE_DECIMAL64: { + create_texpr_literal_node(data, &node, precision, scale); + break; + } + case TYPE_DECIMAL128I: { + create_texpr_literal_node(data, &node, precision, scale); + break; + } + case TYPE_CHAR: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_VARCHAR: { + create_texpr_literal_node(data, &node); + break; + } + case TYPE_STRING: { + create_texpr_literal_node(data, &node); + break; + } + default: + DCHECK(false); + throw std::invalid_argument("Invalid type!"); + } + return node; +} } // namespace doris namespace doris::vectorized { diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index b2f0fb90593c4e..4bd891790687f1 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -32,6 +32,7 @@ #include "common/factory_creator.h" #include "common/status.h" #include "runtime/define_primitive_type.h" +#include "runtime/large_int_value.h" #include "runtime/types.h" #include "udf/udf.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -89,6 +90,7 @@ class VExpr { /// /// Subclasses overriding this function should call VExpr::Prepare() to recursively call /// Prepare() on the expr tree + /// row_desc used in vslot_ref and some subclass to specify column virtual Status prepare(RuntimeState* state, const RowDescriptor& row_desc, VExprContext* context); @@ -253,4 +255,144 @@ class VExpr { }; } // namespace vectorized + +template +Status create_texpr_literal_node(const void* data, TExprNode* node, int precision = 0, + int scale = 0) { + if constexpr (T == TYPE_BOOLEAN) { + auto origin_value = reinterpret_cast(data); + TBoolLiteral boolLiteral; + (*node).__set_node_type(TExprNodeType::BOOL_LITERAL); + boolLiteral.__set_value(*origin_value); + (*node).__set_bool_literal(boolLiteral); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + } else if constexpr (T == TYPE_TINYINT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::INT_LITERAL); + TIntLiteral intLiteral; + intLiteral.__set_value(*origin_value); + (*node).__set_int_literal(intLiteral); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT)); + } else if constexpr (T == TYPE_SMALLINT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::INT_LITERAL); + TIntLiteral intLiteral; + intLiteral.__set_value(*origin_value); + (*node).__set_int_literal(intLiteral); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT)); + } else if constexpr (T == TYPE_INT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::INT_LITERAL); + TIntLiteral intLiteral; + intLiteral.__set_value(*origin_value); + (*node).__set_int_literal(intLiteral); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT)); + } else if constexpr (T == TYPE_BIGINT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::INT_LITERAL); + TIntLiteral intLiteral; + intLiteral.__set_value(*origin_value); + (*node).__set_int_literal(intLiteral); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT)); + } else if constexpr (T == TYPE_LARGEINT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL); + TLargeIntLiteral large_int_literal; + large_int_literal.__set_value(LargeIntValue::to_string(*origin_value)); + (*node).__set_large_int_literal(large_int_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT)); + } else if constexpr ((T == TYPE_DATE) || (T == TYPE_DATETIME) || (T == TYPE_TIME)) { + auto origin_value = reinterpret_cast(data); + TDateLiteral date_literal; + char convert_buffer[30]; + origin_value->to_string(convert_buffer); + date_literal.__set_value(convert_buffer); + (*node).__set_date_literal(date_literal); + (*node).__set_node_type(TExprNodeType::DATE_LITERAL); + if (origin_value->type() == TimeType::TIME_DATE) { + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATE)); + } else if (origin_value->type() == TimeType::TIME_DATETIME) { + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIME)); + } else if (origin_value->type() == TimeType::TIME_TIME) { + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME)); + } + } else if constexpr (T == TYPE_DATEV2) { + auto origin_value = + reinterpret_cast*>(data); + TDateLiteral date_literal; + char convert_buffer[30]; + origin_value->to_string(convert_buffer); + date_literal.__set_value(convert_buffer); + (*node).__set_date_literal(date_literal); + (*node).__set_node_type(TExprNodeType::DATE_LITERAL); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATEV2)); + } else if constexpr (T == TYPE_DATETIMEV2) { + auto origin_value = + reinterpret_cast*>( + data); + TDateLiteral date_literal; + char convert_buffer[30]; + origin_value->to_string(convert_buffer); + date_literal.__set_value(convert_buffer); + (*node).__set_date_literal(date_literal); + (*node).__set_node_type(TExprNodeType::DATE_LITERAL); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIMEV2)); + } else if constexpr (T == TYPE_DECIMALV2) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); + TDecimalLiteral decimal_literal; + decimal_literal.__set_value(origin_value->to_string()); + (*node).__set_decimal_literal(decimal_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2, precision, scale)); + } else if constexpr (T == TYPE_DECIMAL32) { + auto origin_value = reinterpret_cast*>(data); + (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); + TDecimalLiteral decimal_literal; + decimal_literal.__set_value(origin_value->to_string(scale)); + (*node).__set_decimal_literal(decimal_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32, precision, scale)); + } else if constexpr (T == TYPE_DECIMAL64) { + auto origin_value = reinterpret_cast*>(data); + (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); + TDecimalLiteral decimal_literal; + decimal_literal.__set_value(origin_value->to_string(scale)); + (*node).__set_decimal_literal(decimal_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64, precision, scale)); + } else if constexpr (T == TYPE_DECIMAL128I) { + auto origin_value = reinterpret_cast*>(data); + (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); + TDecimalLiteral decimal_literal; + decimal_literal.__set_value(origin_value->to_string(scale)); + (*node).__set_decimal_literal(decimal_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL128I, precision, scale)); + } else if constexpr (T == TYPE_FLOAT) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); + TFloatLiteral float_literal; + float_literal.__set_value(*origin_value); + (*node).__set_float_literal(float_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT)); + } else if constexpr (T == TYPE_DOUBLE) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); + TFloatLiteral float_literal; + float_literal.__set_value(*origin_value); + (*node).__set_float_literal(float_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE)); + } else if constexpr ((T == TYPE_STRING) || (T == TYPE_CHAR) || (T == TYPE_VARCHAR)) { + auto origin_value = reinterpret_cast(data); + (*node).__set_node_type(TExprNodeType::STRING_LITERAL); + TStringLiteral string_literal; + string_literal.__set_value(origin_value->to_string()); + (*node).__set_string_literal(string_literal); + (*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING)); + } else { + return Status::InvalidArgument("Invalid argument type!"); + } + return Status::OK(); +} + +TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, int precision = 0, + int scale = 0); + } // namespace doris diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index 3d39a844dc7c9e..03d1659eee63a6 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -66,6 +66,7 @@ Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* r } std::string VLiteral::value() const { + //TODO: dcheck the equality of size with 1. then use string with size to replace the ss. std::stringstream out; for (size_t i = 0; i < _column_ptr->size(); i++) { if (i != 0) { diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp index 2ee9f598b57122..f1a99e260553ac 100644 --- a/be/src/vec/sink/vtablet_finder.cpp +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -18,6 +18,9 @@ #include "vec/sink/vtablet_finder.h" #include +#include +#include +#include #include #include @@ -28,37 +31,48 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" #include "exec/tablet_info.h" +#include "exprs/runtime_filter.h" +#include "gutil/integral_types.h" #include "runtime/descriptors.h" #include "runtime/runtime_state.h" #include "vec/core/block.h" +#include "vec/core/columns_with_type_and_name.h" +#include "vec/data_types/data_type.h" +#include "vec/functions/simple_function_factory.h" namespace doris { namespace stream_load { Status OlapTabletFinder::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& stop_processing, bool& is_continue) { + bool& stop_processing, bool& is_continue, + bool* missing_partition) { Status status = Status::OK(); *partition = nullptr; tablet_index = 0; BlockRow block_row; block_row = {block, row_index}; if (!_vpartition->find_partition(&block_row, partition)) { - RETURN_IF_ERROR(state->append_error_msg_to_file( - []() -> std::string { return ""; }, - [&]() -> std::string { - fmt::memory_buffer buf; - fmt::format_to(buf, "no partition for this tuple. tuple={}", - block->dump_data(row_index, 1)); - return fmt::to_string(buf); - }, - &stop_processing)); - _num_filtered_rows++; - if (stop_processing) { - return Status::EndOfFile("Encountered unqualified data, stop processing"); + if (missing_partition != nullptr) { // auto partition table + *missing_partition = true; + return status; + } else { + RETURN_IF_ERROR(state->append_error_msg_to_file( + []() -> std::string { return ""; }, + [&]() -> std::string { + fmt::memory_buffer buf; + fmt::format_to(buf, "no partition for this tuple. tuple={}", + block->dump_data(row_index, 1)); + return fmt::to_string(buf); + }, + &stop_processing)); + _num_filtered_rows++; + if (stop_processing) { + return Status::EndOfFile("Encountered unqualified data, stop processing"); + } + is_continue = true; + return status; } - is_continue = true; - return status; } if (!(*partition)->is_mutable) { _num_immutable_partition_filtered_rows++; diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h index 97282e403aa32f..9fe64944300f35 100644 --- a/be/src/vec/sink/vtablet_finder.h +++ b/be/src/vec/sink/vtablet_finder.h @@ -41,7 +41,7 @@ class OlapTabletFinder { Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& filtered, bool& is_continue); + bool& filtered, bool& is_continue, bool* missing_partition = nullptr); bool is_find_tablet_every_sink() { return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 8f633c0824da57..5a627a3af6f158 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -25,23 +25,44 @@ #include #include #include +#include +#include +#include +#include #include #include #include #include +#include #include #include #include #include #include +#include +#include +#include +#include #include #include #include +#include #include +#include #include #include +#include "runtime/datetime_value.h" +#include "util/runtime_profile.h" +#include "vec/core/columns_with_type_and_name.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_string.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/functions/simple_function_factory.h" +#include "vec/runtime/vdatetime_value.h" + #ifdef DEBUG #include #endif @@ -52,6 +73,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "exec/tablet_info.h" +#include "runtime/client_cache.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" @@ -70,6 +92,7 @@ #include "util/telemetry/telemetry.h" #include "util/thread.h" #include "util/threadpool.h" +#include "util/thrift_rpc_helper.h" #include "util/thrift_util.h" #include "util/time.h" #include "util/uid_util.h" @@ -102,6 +125,7 @@ class TExpr; namespace stream_load { +// an IndexChannel is related to specific table and its rollup and mv class IndexChannel { public: IndexChannel(VOlapTableSink* parent, int64_t index_id, @@ -112,6 +136,7 @@ class IndexChannel { } ~IndexChannel() = default; + // allow to init multi times, for incremental open more tablets for one index(table) Status init(RuntimeState* state, const std::vector& tablets); void for_each_node_channel( @@ -183,20 +208,23 @@ class IndexChannel { Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); for (auto& tablet : tablets) { - auto location = _parent->_location->find_tablet(tablet.tablet_id); - if (location == nullptr) { + // First find the location BEs of this tablet + auto tablet_locations = _parent->_location->find_tablet(tablet.tablet_id); + if (tablet_locations == nullptr) { return Status::InternalError("unknown tablet, tablet_id={}", tablet.tablet_id); } std::vector> channels; - for (auto& node_id : location->node_ids) { + // For tablet, deal with its' all replica (in some node). + for (auto& replica_node_id : tablet_locations->node_ids) { std::shared_ptr channel; - auto it = _node_channels.find(node_id); + auto it = _node_channels.find(replica_node_id); + // when we prepare for TableSink or incremental open tablet, we need init if (it == _node_channels.end()) { // NodeChannel is not added to the _parent->_pool. // Because the deconstruction of NodeChannel may take a long time to wait rpc finish. // but the ObjectPool will hold a spin lock to delete objects. - channel = std::make_shared(_parent, this, node_id); - _node_channels.emplace(node_id, channel); + channel = std::make_shared(_parent, this, replica_node_id); + _node_channels.emplace(replica_node_id, channel); } else { channel = it->second; } @@ -208,7 +236,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector(fmt::format( "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), thread_context()->get_thread_id())); } VNodeChannel::~VNodeChannel() { - if (_open_closure != nullptr) { - if (_open_closure->unref()) { - delete _open_closure; + for (auto& closure : _open_closures) { + if (closure != nullptr) { + if (closure->unref()) { + delete closure; + } + closure = nullptr; } - _open_closure = nullptr; } if (_add_block_closure != nullptr) { delete _add_block_closure; _add_block_closure = nullptr; } - if (_open_closure != nullptr) { - delete _open_closure; - } static_cast(_cur_add_block_request.release_id()); } @@ -342,12 +373,12 @@ Status VNodeChannel::init(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); _tuple_desc = _parent->_output_tuple_desc; _state = state; + // get corresponding BE node. auto node = _parent->_nodes_info->find_node(_node_id); if (node == nullptr) { _cancelled = true; return Status::InternalError("unknown node id, id={}", _node_id); } - _node_info = *node; _load_info = "load_id=" + print_id(_parent->_load_id) + @@ -368,7 +399,9 @@ Status VNodeChannel::init(RuntimeState* state) { _timeout_watch.start(); // Initialize _cur_add_block_request - _cur_add_block_request.set_allocated_id(&_parent->_load_id); + if (!_cur_add_block_request.has_id()) { + _cur_add_block_request.set_allocated_id(&_parent->_load_id); + } _cur_add_block_request.set_index_id(_index_channel->_index_id); _cur_add_block_request.set_sender_id(_parent->_sender_id); _cur_add_block_request.set_backend_id(_node_id); @@ -385,17 +418,22 @@ Status VNodeChannel::init(RuntimeState* state) { return Status::OK(); } -void VNodeChannel::open() { +void VNodeChannel::_open_internal(bool is_incremental) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); PTabletWriterOpenRequest request; request.set_allocated_id(&_parent->_load_id); request.set_index_id(_index_channel->_index_id); request.set_txn_id(_parent->_txn_id); request.set_allocated_schema(_parent->_schema->to_protobuf()); + std::set deduper; for (auto& tablet : _all_tablets) { + if (deduper.contains(tablet.tablet_id)) { + continue; + } auto ptablet = request.add_tablets(); ptablet->set_partition_id(tablet.partition_id); ptablet->set_tablet_id(tablet.tablet_id); + deduper.insert(tablet.tablet_id); } request.set_num_senders(_parent->_num_senders); request.set_need_gen_rollup(false); // Useless but it is a required field in pb @@ -406,160 +444,81 @@ void VNodeChannel::open() { request.set_is_vectorized(true); request.set_backend_id(_node_id); request.set_enable_profile(_state->enable_profile()); + request.set_is_incremental(is_incremental); - _open_closure = new RefCountClosure(); - _open_closure->ref(); + auto* open_closure = new RefCountClosure {}; + open_closure->ref(); - // This ref is for RPC's reference - _open_closure->ref(); - _open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); + open_closure->ref(); // This ref is for RPC's reference + open_closure->cntl.set_timeout_ms(config::tablet_writer_open_rpc_timeout_sec * 1000); if (config::tablet_writer_ignore_eovercrowded) { - _open_closure->cntl.ignore_eovercrowded(); + open_closure->cntl.ignore_eovercrowded(); } - _stub->tablet_writer_open(&_open_closure->cntl, &request, &_open_closure->result, - _open_closure); + // the real transmission here. the corresponding BE's load mgr will open load channel for it. + _stub->tablet_writer_open(&open_closure->cntl, &request, &open_closure->result, open_closure); + _open_closures.push_back(open_closure); + static_cast(request.release_id()); static_cast(request.release_schema()); } +void VNodeChannel::open() { + _open_internal(false); +} + +void VNodeChannel::incremental_open() { + _open_internal(true); +} + Status VNodeChannel::open_wait() { - _open_closure->join(); - SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); - if (_open_closure->cntl.Failed()) { - if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( - _stub, _node_info.host, _node_info.brpc_port)) { - ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - _open_closure->cntl.remote_side()); + Status status; + for (auto& open_closure : _open_closures) { + // because of incremental open, we will wait multi times. so skip the closures which have been checked and set to nullptr in previous rounds + if (open_closure == nullptr) { + continue; } - _cancelled = true; - auto error_code = _open_closure->cntl.ErrorCode(); - auto error_text = _open_closure->cntl.ErrorText(); - if (_open_closure->unref()) { - delete _open_closure; + open_closure->join(); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + if (open_closure->cntl.Failed()) { + if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( + _stub, _node_info.host, _node_info.brpc_port)) { + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + open_closure->cntl.remote_side()); + } + + _cancelled = true; + auto error_code = open_closure->cntl.ErrorCode(); + auto error_text = open_closure->cntl.ErrorText(); + if (open_closure->unref()) { + delete open_closure; + } + open_closure = nullptr; + return Status::InternalError( + "failed to open tablet writer, error={}, error_text={}, info={}", + berror(error_code), error_text, channel_info()); } - _open_closure = nullptr; - return Status::InternalError( - "failed to open tablet writer, error={}, error_text={}, info={}", - berror(error_code), error_text, channel_info()); - } - Status status(Status::create(_open_closure->result.status())); - if (_open_closure->unref()) { - delete _open_closure; - } - _open_closure = nullptr; + status = Status::create(open_closure->result.status()); + if (open_closure->unref()) { + delete open_closure; + } + open_closure = nullptr; - if (!status.ok()) { - _cancelled = true; - return status; + if (!status.ok()) { + _cancelled = true; + return status; + } } // add block closure _add_block_closure = ReusableClosure::create(); - _add_block_closure->addFailedHandler([this](bool is_last_rpc) { - std::lock_guard l(this->_closed_lock); - if (this->_is_closed) { - // if the node channel is closed, no need to call `mark_as_failed`, - // and notice that _index_channel may already be destroyed. - return; - } - SCOPED_ATTACH_TASK(_state); - // If rpc failed, mark all tablets on this node channel as failed - _index_channel->mark_as_failed(this, - fmt::format("rpc failed, error coed:{}, error text:{}", - _add_block_closure->cntl.ErrorCode(), - _add_block_closure->cntl.ErrorText()), - -1); - Status st = _index_channel->check_intolerable_failure(); - if (!st.ok()) { - _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); - } else if (is_last_rpc) { - // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait - // will be blocked. - _add_batches_finished = true; - } - }); - - _add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result, - bool is_last_rpc) { - std::lock_guard l(this->_closed_lock); - if (this->_is_closed) { - // if the node channel is closed, no need to call the following logic, - // and notice that _index_channel may already be destroyed. - return; - } - SCOPED_ATTACH_TASK(_state); - Status status(Status::create(result.status())); - if (status.ok()) { - // if has error tablet, handle them first - for (auto& error : result.tablet_errors()) { - _index_channel->mark_as_failed(this, "tablet error: " + error.msg(), - error.tablet_id()); - } + _add_block_closure->addFailedHandler( + [this](bool is_last_rpc) { _add_block_failed_callback(is_last_rpc); }); - Status st = _index_channel->check_intolerable_failure(); - if (!st.ok()) { - _cancel_with_msg(st.to_string()); - } else if (is_last_rpc) { - for (auto& tablet : result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - _tablet_commit_infos.emplace_back(std::move(commit_info)); - if (tablet.has_received_rows()) { - _tablets_received_rows.emplace_back(tablet.tablet_id(), - tablet.received_rows()); - } - if (tablet.has_num_rows_filtered()) { - _state->update_num_rows_filtered_in_strict_mode_partial_update( - tablet.num_rows_filtered()); - } - VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() - << ", backendId=" << _node_id - << ", master node id: " << this->node_id() - << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; - } - if (_parent->_write_single_replica) { - for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { - for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_slave_node_ids.first; - commit_info.backendId = slave_node_id; - _tablet_commit_infos.emplace_back(std::move(commit_info)); - VLOG_CRITICAL << "slave replica commit info: tabletId=" - << tablet_slave_node_ids.first - << ", backendId=" << slave_node_id - << ", master node id: " << this->node_id() - << ", host: " << this->host() - << ", txn_id=" << _parent->_txn_id; - } - } - } - _add_batches_finished = true; - } - } else { - _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", - channel_info(), status.to_string())); - } - - if (result.has_execution_time_us()) { - _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); - _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); - _add_batch_counter.add_batch_num++; - } - if (result.has_load_channel_profile()) { - TRuntimeProfileTree tprofile; - const uint8_t* buf = (const uint8_t*)result.load_channel_profile().data(); - uint32_t len = result.load_channel_profile().size(); - auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); - if (st.ok()) { - _state->load_channel_profile()->update(tprofile); - } else { - LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" - << st; - } - } - }); + _add_block_closure->addSuccessHandler( + [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { + _add_block_success_callback(result, is_last_rpc); + }); return status; } @@ -643,7 +602,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); if (is_append) { // Do not split the data of the block by tablets but append it to a single delta writer. - // This is a faster way to send block than append_block_by_selector + // This is a faster way to send block than append_to_block_by_selector // TODO: we could write to local delta writer if single_replica_load is true VLOG_DEBUG << "send whole block by append block"; std::vector tablets(block->rows(), payload->second[0]); @@ -652,12 +611,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, columns.reserve(block->columns()); // Hold the reference of block columns to avoid copying for (auto column : block->get_columns()) { - columns.push_back(column->assume_mutable()); + columns.push_back(std::move(*column).mutate()); } *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()}; _cur_add_block_request.set_is_single_tablet_block(true); } else { - block->append_block_by_selector(_cur_mutable_block.get(), *(payload->first)); + block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); for (auto tablet_id : payload->second) { _cur_add_block_request.add_tablet_ids(tablet_id); } @@ -670,6 +629,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, std::lock_guard l(_pending_batches_lock); // To simplify the add_row logic, postpone adding block into req until the time of sending req _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); + _cur_add_block_request.set_eos( + false); // for multi-add, only when marking close we set it eos. _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); _pending_batches_num++; VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this @@ -699,7 +660,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, // We are sure that try_send_batch is not running if (_pending_batches_num > 0) { auto s = thread_pool_token->submit_func( - std::bind(&VNodeChannel::try_send_block, this, state)); + std::bind(&VNodeChannel::try_send_pending_block, this, state)); if (!s.ok()) { _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed"); // clear in flight @@ -740,13 +701,12 @@ Status VNodeChannel::none_of(std::initializer_list vars) { return st; } -void VNodeChannel::try_send_block(RuntimeState* state) { +void VNodeChannel::try_send_pending_block(RuntimeState* state) { SCOPED_ATTACH_TASK(state); SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker); SCOPED_ATOMIC_TIMER(&_actual_consume_ns); AddBlockReq send_block; { - debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard l(_pending_batches_lock); DCHECK(!_pending_blocks.empty()); send_block = std::move(_pending_blocks.front()); @@ -871,6 +831,107 @@ void VNodeChannel::try_send_block(RuntimeState* state) { _next_packet_seq++; } +void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result, + bool is_last_rpc) { + std::lock_guard l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call the following logic, + // and notice that _index_channel may already be destroyed. + return; + } + SCOPED_ATTACH_TASK(_state); + Status status(Status::create(result.status())); + if (status.ok()) { + // if has error tablet, handle them first + for (auto& error : result.tablet_errors()) { + _index_channel->mark_as_failed(this, "tablet error: " + error.msg(), error.tablet_id()); + } + + Status st = _index_channel->check_intolerable_failure(); + if (!st.ok()) { + _cancel_with_msg(st.to_string()); + } else if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + if (tablet.has_received_rows()) { + _tablets_received_rows.emplace_back(tablet.tablet_id(), tablet.received_rows()); + } + if (tablet.has_num_rows_filtered()) { + _state->update_num_rows_filtered_in_strict_mode_partial_update( + tablet.num_rows_filtered()); + } + VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() + << ", backendId=" << _node_id + << ", master node id: " << this->node_id() + << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; + } + if (_parent->_write_single_replica) { + for (auto& tablet_slave_node_ids : result.success_slave_tablet_node_ids()) { + for (auto slave_node_id : tablet_slave_node_ids.second.slave_node_ids()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_slave_node_ids.first; + commit_info.backendId = slave_node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + VLOG_CRITICAL + << "slave replica commit info: tabletId=" + << tablet_slave_node_ids.first << ", backendId=" << slave_node_id + << ", master node id: " << this->node_id() + << ", host: " << this->host() << ", txn_id=" << _parent->_txn_id; + } + } + } + _add_batches_finished = true; + } + } else { + _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", + channel_info(), status.to_string())); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); + _add_batch_counter.add_batch_num++; + } + if (result.has_load_channel_profile()) { + TRuntimeProfileTree tprofile; + const uint8_t* buf = (const uint8_t*)result.load_channel_profile().data(); + uint32_t len = result.load_channel_profile().size(); + auto st = deserialize_thrift_msg(buf, &len, false, &tprofile); + if (st.ok()) { + _state->load_channel_profile()->update(tprofile); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree deserialize failed, errmsg=" << st; + } + } +} + +void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { + std::lock_guard l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call `mark_as_failed`, + // and notice that _index_channel may already be destroyed. + return; + } + SCOPED_ATTACH_TASK(_state); + // If rpc failed, mark all tablets on this node channel as failed + _index_channel->mark_as_failed( + this, + fmt::format("rpc failed, error coed:{}, error text:{}", + _add_block_closure->cntl.ErrorCode(), _add_block_closure->cntl.ErrorText()), + -1); + Status st = _index_channel->check_intolerable_failure(); + if (!st.ok()) { + _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); + } else if (is_last_rpc) { + // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait + // will be blocked. + _add_batches_finished = true; + } +} + void VNodeChannel::cancel(const std::string& cancel_msg) { if (_is_closed) { // skip the channels that have been canceled or close_wait. @@ -932,7 +993,6 @@ Status VNodeChannel::close_wait(RuntimeState* state) { // waiting for finished, it may take a long time, so we couldn't set a timeout // In pipeline, is_close_done() is false at this time, will not bock. while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { - // std::this_thread::sleep_for(std::chrono::milliseconds(1)); bthread_usleep(1000); } _close_time_ms = UnixMillis() - _close_time_ms; @@ -965,12 +1025,12 @@ void VNodeChannel::mark_close() { _cur_add_block_request.set_eos(true); { - debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard l(_pending_batches_lock); if (!_cur_mutable_block) { // add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } + // when prepare to close, add block to queue so that try_send_pending_block thread will send it. _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); _pending_batches_num++; DCHECK(_pending_blocks.back().second.eos()); @@ -1084,6 +1144,8 @@ Status VOlapTableSink::prepare(RuntimeState* state) { _filter_timer = ADD_CHILD_TIMER(_profile, "FilterTime", "SendDataTime"); _where_clause_timer = ADD_CHILD_TIMER(_profile, "WhereClauseTime", "SendDataTime"); _append_node_channel_timer = ADD_CHILD_TIMER(_profile, "AppendNodeChannelTime", "SendDataTime"); + _add_partition_request_timer = + ADD_CHILD_TIMER(_profile, "AddPartitionRequestTime", "SendDataTime"); _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseWaitTime"); @@ -1130,15 +1192,22 @@ Status VOlapTableSink::prepare(RuntimeState* state) { tablets.emplace_back(std::move(tablet_with_partition)); } } - if (UNLIKELY(tablets.empty())) { + if (tablets.empty() && !_vpartition->is_auto_partition()) { LOG(WARNING) << "load job:" << state->load_job_id() << " index: " << index->index_id << " would open 0 tablet"; } _channels.emplace_back(new IndexChannel(this, index->index_id, index->where_clause)); + _index_id_to_channel[index->index_id] = _channels.back(); RETURN_IF_ERROR(_channels.back()->init(state, tablets)); } // Prepare the exprs to run. RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); + // prepare for auto partition functions + if (_vpartition->is_auto_partition()) { + auto [part_ctx, part_func] = _get_partition_function(); + RETURN_IF_ERROR(part_func->prepare(_state, *_output_row_desc, part_ctx.get())); + } + _prepare = true; return Status::OK(); } @@ -1185,6 +1254,7 @@ Status VOlapTableSink::open(RuntimeState* state) { MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job); _send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism); + // start to send batch continually if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) { return Status::Error("bthread_start_backgroud failed"); } @@ -1196,7 +1266,12 @@ void VOlapTableSink::_send_batch_process() { SCOPED_TIMER(_non_blocking_send_timer); SCOPED_ATTACH_TASK(_state); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + bool had_effect = false; while (true) { + // incremental open will temporarily make channels into abnormal state. stop checking when this. + std::unique_lock l(_stop_check_channel); + int running_channels_num = 0; for (const auto& index_channel : _channels) { index_channel->for_each_node_channel([&running_channels_num, @@ -1206,11 +1281,14 @@ void VOlapTableSink::_send_batch_process() { }); } - if (running_channels_num == 0) { + // if there is no channel, maybe auto partition table. so check does there have had running channels ever. + if (running_channels_num == 0 && had_effect) { LOG(INFO) << "all node channels are stopped(maybe finished/offending/cancelled), " "sender thread exit. " << print_id(_load_id); return; + } else if (running_channels_num != 0) { + had_effect = true; } bthread_usleep(config::olap_table_sink_send_interval_ms * 1000); } @@ -1224,6 +1302,96 @@ size_t VOlapTableSink::get_pending_bytes() const { return mem_consumption; } +Status VOlapTableSink::_automatic_create_partition() { + SCOPED_TIMER(_add_partition_request_timer); + TCreatePartitionRequest request; + TCreatePartitionResult result; + request.__set_txn_id(_txn_id); + request.__set_db_id(_vpartition->db_id()); + request.__set_table_id(_vpartition->table_id()); + request.__set_partitionValues(_partitions_need_create); + + VLOG(1) << "automatic partition rpc begin request " << request; + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + int time_out = _state->execution_timeout() * 1000; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->createPartition(result, request); + }, + time_out)); + + Status status(Status::create(result.status)); + VLOG(1) << "automatic partition rpc end response " << result; + if (result.status.status_code == TStatusCode::OK) { + // add new created partitions + RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); + + // add new tablet locations. it will use by address. so add to pool + auto* new_locations = _pool->add(new std::vector(result.tablets)); + _location->add_locations(*new_locations); + + // update new node info + _nodes_info->add_nodes(result.nodes); + + // incremental open node channel + RETURN_IF_ERROR(_incremental_open_node_channel(result.partitions)); + } + + return status; +} + +Status VOlapTableSink::_incremental_open_node_channel( + const std::vector& partitions) { + // do what we did in prepare() for partitions. indexes which don't change when we create new partition is orthogonal to partitions. + std::unique_lock _l(_stop_check_channel); + for (int i = 0; i < _schema->indexes().size(); ++i) { + const OlapTableIndexSchema* index = _schema->indexes()[i]; + std::vector tablets; + for (auto& t_part : partitions) { + VOlapTablePartition* part = nullptr; + RETURN_IF_ERROR(_vpartition->generate_partition_from(t_part, part)); + for (const auto& tablet : part->indexes[i].tablets) { + TTabletWithPartition tablet_with_partition; + tablet_with_partition.partition_id = part->id; + tablet_with_partition.tablet_id = tablet; + tablets.emplace_back(std::move(tablet_with_partition)); + } + DCHECK(!tablets.empty()) << "incremental open got nothing!"; + } + // update and reinit for existing channels. + std::shared_ptr channel = _index_id_to_channel[index->index_id]; + DCHECK(channel != nullptr); + RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it + } + + fmt::memory_buffer buf; + for (auto& channel : _channels) { + // incremental open new partition's tablet on storage side + channel->for_each_node_channel( + [](const std::shared_ptr& ch) { ch->incremental_open(); }); + fmt::format_to(buf, "index id:{}", channel->_index_id); + VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf); + + channel->for_each_node_channel([&channel](const std::shared_ptr& ch) { + auto st = ch->open_wait(); + if (!st.ok()) { + // The open() phase is mainly to generate DeltaWriter instances on the nodes corresponding to each node channel. + // This phase will not fail due to a single tablet. + // Therefore, if the open() phase fails, all tablets corresponding to the node need to be marked as failed. + channel->mark_as_failed( + ch.get(), + fmt::format("{}, open failed, err: {}", ch->channel_info(), st.to_string()), + -1); + } + }); + + RETURN_IF_ERROR(channel->check_intolerable_failure()); + } + + return Status::OK(); +} + void VOlapTableSink::_generate_row_distribution_payload( ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition, uint32_t tablet_index, int row_idx, size_t row_cnt) { @@ -1297,6 +1465,27 @@ Status VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize return Status::OK(); } +std::pair +VOlapTableSink::_get_partition_function() { + return {_vpartition->get_part_func_ctx(), _vpartition->get_partition_function()}; +} + +void VOlapTableSink::_save_missing_values(vectorized::ColumnPtr col, + vectorized::DataTypePtr value_type, + std::vector filter) { + _partitions_need_create.clear(); + std::set deduper; + // de-duplication + for (auto row : filter) { + deduper.emplace(value_type->to_string(*col, row)); + } + for (auto& value : deduper) { + TStringLiteral node; + node.value = value; + _partitions_need_create.emplace_back(std::vector {node}); // only 1 partition column now + } +} + Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1326,9 +1515,6 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, eos, has_filtered_rows)); - // clear and release the references of columns - input_block->clear(); - SCOPED_RAW_TIMER(&_send_data_ns); // This is just for passing compilation. bool stop_processing = false; @@ -1338,30 +1524,106 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, _row_distribution_watch.start(); auto num_rows = block->rows(); size_t partition_num = _vpartition->get_partitions().size(); - if (partition_num == 1 && _tablet_finder->is_find_tablet_every_sink()) { + if (!_vpartition->is_auto_partition() && partition_num == 1 && + _tablet_finder->is_find_tablet_every_sink()) { RETURN_IF_ERROR(_single_partition_generate(state, block.get(), channel_to_payload, num_rows, has_filtered_rows)); } else { - for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { - continue; + // if there's projection of partition calc, we need to calc it first. + auto [part_ctx, part_func] = _get_partition_function(); + int result_idx; + if (_vpartition->is_projection_partition()) { + // calc the start value of missing partition ranges. + part_func->execute(part_ctx.get(), block.get(), &result_idx); + VLOG_DEBUG << "Partition-calculated block:" << block->dump_data(); + // change the column to compare to transformed. + _vpartition->set_transformed_slots({(uint16_t)result_idx}); + } + + if (_vpartition->is_auto_partition()) { + std::vector partition_keys = _vpartition->get_partition_keys(); + //TODO: use loop to create missing_vals for multi column. + CHECK(partition_keys.size() == 1) + << "now support only 1 partition column for auto partitions."; + auto partition_col = block->get_by_position(partition_keys[0]); + + std::vector missing_map; // indice of missing values in partition_col + missing_map.reserve(partition_col.column->size()); + + // try to find tablet and save missing value + for (int i = 0; i < num_rows; ++i) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + continue; + } + const VOlapTablePartition* partition = nullptr; + bool is_continue = false; + uint32_t tablet_index = 0; + bool missing_this = false; + RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, + tablet_index, stop_processing, + is_continue, &missing_this)); + if (missing_this) { + missing_map.push_back(i); + } else { + _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, + i, 1); + } } - const VOlapTablePartition* partition = nullptr; - bool is_continue = false; - uint32_t tablet_index = 0; - RETURN_IF_ERROR(_tablet_finder->find_tablet( - state, block.get(), i, &partition, tablet_index, stop_processing, is_continue)); - if (is_continue) { - continue; + missing_map.shrink_to_fit(); + + // for missing partition keys, calc the missing partition and save in _partitions_need_create + auto type = partition_col.type; + if (missing_map.size() > 0) { + auto return_type = part_func->data_type(); + + // expose the data column + vectorized::ColumnPtr range_left_col = block->get_by_position(result_idx).column; + if (auto* nullable = + check_and_get_column(*range_left_col)) { + range_left_col = nullable->get_nested_column_ptr(); + return_type = + assert_cast(return_type.get()) + ->get_nested_type(); + } + // calc the end value and save them. + _save_missing_values(range_left_col, return_type, missing_map); + // then call FE to create it. then FragmentExecutor will redo the load. + RETURN_IF_ERROR(_automatic_create_partition()); + // now we need to rollback the metrics + _number_input_rows -= rows; + state->update_num_rows_load_total(-rows); + state->update_num_bytes_load_total(-bytes); + DorisMetrics::instance()->load_rows->increment(-rows); + DorisMetrics::instance()->load_bytes->increment(-bytes); + // In the next round, we will _generate_row_distribution_payload again to get right payload of new tablet + LOG(INFO) << "Auto created partition. Send block again."; + return Status::NeedSendAgain(""); + } // creating done + } else { // not auto partition + for (int i = 0; i < num_rows; ++i) { + if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_bitmap().Get(i)) { + continue; + } + const VOlapTablePartition* partition = nullptr; + bool is_continue = false; + uint32_t tablet_index = 0; + RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block.get(), i, &partition, + tablet_index, stop_processing, + is_continue)); + if (is_continue) { + continue; + } + // each row + _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, + 1); } - // each row - _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1); } } _row_distribution_watch.stop(); // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. - bool load_block_to_single_tablet = _tablet_finder->is_single_tablet(); + bool load_block_to_single_tablet = + !_vpartition->is_auto_partition() && _tablet_finder->is_single_tablet(); if (load_block_to_single_tablet) { SCOPED_RAW_TIMER(&_filter_ns); // Filter block @@ -1381,12 +1643,17 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, handle_block(input_block, rows, _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - filtered_rows); + // TODO: Before load, we need to projection unuseful column + // auto slots = _schema->tuple_desc()->slots(); + // for (auto desc : slots) { + // desc->col_pos(); + // } // Add block to node channel for (size_t i = 0; i < _channels.size(); i++) { for (const auto& entry : channel_to_payload[i]) { // if this node channel is already failed, this add_row will be skipped auto st = entry.first->add_block( - block.get(), &entry.second, + block.get(), &entry.second, // entry.second is a [row -> tablet] mapping // if it is load single tablet, then append this whole block load_block_to_single_tablet); if (!st.ok()) { @@ -1500,6 +1767,7 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_profile->total_time_counter()); try_close(state, exec_status); + // If _close_status is not ok, all nodes have been canceled in try_close. if (_close_status.ok()) { auto status = Status::OK(); @@ -1510,44 +1778,43 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, total_add_batch_num = 0, num_node_channels = 0; VNodeChannelStat channel_stat; - { - for (const auto& index_channel : _channels) { - if (!status.ok()) { - break; - } - int64_t add_batch_exec_time = 0; - int64_t wait_exec_time = 0; - index_channel->for_each_node_channel( - [this, &index_channel, &status, &state, &node_add_batch_counter_map, - &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, - &actual_consume_ns, &total_add_batch_exec_time_ns, &add_batch_exec_time, - &total_wait_exec_time_ns, &wait_exec_time, - &total_add_batch_num](const std::shared_ptr& ch) { - if (!status.ok() || ch->is_closed()) { - return; - } - // in pipeline, all node channels are done or canceled, will not block. - // no pipeline, close may block waiting. - auto s = ch->close_wait(state); - if (!s.ok()) { - status = this->_cancel_channel_and_check_intolerable_failure( - status, s.to_string(), index_channel, ch); - } - ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, - &channel_stat, &queue_push_lock_ns, &actual_consume_ns, - &total_add_batch_exec_time_ns, &add_batch_exec_time, - &total_wait_exec_time_ns, &wait_exec_time, - &total_add_batch_num); - }); - num_node_channels += index_channel->num_node_channels(); - if (add_batch_exec_time > max_add_batch_exec_time_ns) { - max_add_batch_exec_time_ns = add_batch_exec_time; - } - if (wait_exec_time > max_wait_exec_time_ns) { - max_wait_exec_time_ns = wait_exec_time; - } - } // end for index channels - } + + for (const auto& index_channel : _channels) { + if (!status.ok()) { + break; + } + int64_t add_batch_exec_time = 0; + int64_t wait_exec_time = 0; + index_channel->for_each_node_channel( + [this, &index_channel, &status, &state, &node_add_batch_counter_map, + &serialize_batch_ns, &channel_stat, &queue_push_lock_ns, &actual_consume_ns, + &total_add_batch_exec_time_ns, &add_batch_exec_time, &total_wait_exec_time_ns, + &wait_exec_time, + &total_add_batch_num](const std::shared_ptr& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + // in pipeline, all node channels are done or canceled, will not block. + // no pipeline, close may block waiting. + auto s = ch->close_wait(state); + if (!s.ok()) { + status = this->_cancel_channel_and_check_intolerable_failure( + status, s.to_string(), index_channel, ch); + } + ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns, + &channel_stat, &queue_push_lock_ns, &actual_consume_ns, + &total_add_batch_exec_time_ns, &add_batch_exec_time, + &total_wait_exec_time_ns, &wait_exec_time, + &total_add_batch_num); + }); + num_node_channels += index_channel->num_node_channels(); + if (add_batch_exec_time > max_add_batch_exec_time_ns) { + max_add_batch_exec_time_ns = add_batch_exec_time; + } + if (wait_exec_time > max_wait_exec_time_ns) { + max_wait_exec_time_ns = wait_exec_time; + } + } // end for index channels if (status.ok()) { // TODO need to be improved diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 3f0ad7ed8900bf..d429666eabb85c 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -32,6 +35,7 @@ #include // IWYU pragma: no_include #include // IWYU pragma: keep +#include #include #include #include @@ -40,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -154,6 +159,8 @@ class ReusableClosure final : public google::protobuf::Closure { cid = cntl.call_id(); } + // if _packet_in_flight == false, set it to true. Return true. + // if _packet_in_flight == true, Return false. bool try_set_in_flight() { bool value = false; return _packet_in_flight.compare_exchange_strong(value, true); @@ -211,31 +218,47 @@ class VNodeChannelStat { int64_t append_node_channel_ns = 0; }; +// every NodeChannel keeps a data transmission channel with one BE. for multiple times open, it has a dozen of requests and corresponding closures. class VNodeChannel { public: - VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id); + VNodeChannel(VOlapTableSink* parent, IndexChannel* index_channel, int64_t node_id, + bool is_incremental = false); ~VNodeChannel(); - // called before open, used to add tablet located in this backend + // called before open, used to add tablet located in this backend. called by IndexChannel::init void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } + std::string debug_tablets() const { + std::stringstream ss; + for (auto& tab : _all_tablets) { + tab.printTo(ss); + ss << '\n'; + } + return ss.str(); + } void add_slave_tablet_nodes(int64_t tablet_id, const std::vector& slave_nodes) { _slave_tablet_nodes[tablet_id] = slave_nodes; } + // build a request and build corresponding connect to BE. void open(); + // for auto partition, we use this to open more tablet. + void incremental_open(); Status init(RuntimeState* state); + // this will block until all request transmission which were opened or incremental opened finished. Status open_wait(); Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false); + // @return: unfinished running channels. + // @caller: VOlapTabletSink::_send_batch_process. it's a continual asynchronous process. int try_send_and_fetch_status(RuntimeState* state, std::unique_ptr& thread_pool_token); - - void try_send_block(RuntimeState* state); + // when there's pending block found by try_send_and_fetch_status(), we will awake a thread to send it. + void try_send_pending_block(RuntimeState* state); void clear_all_blocks(); @@ -299,10 +322,18 @@ class VNodeChannel { size_t get_pending_bytes() { return _pending_batches_bytes; } + bool is_incremental() const { return _is_incremental; } + protected: + // make a real open request for relative BE's load channel. + void _open_internal(bool is_incremental); + void _close_check(); void _cancel_with_msg(const std::string& msg); + void _add_block_success_callback(const PTabletWriterAddBlockResult& result, bool is_last_rpc); + void _add_block_failed_callback(bool is_last_rpc); + VOlapTableSink* _parent = nullptr; IndexChannel* _index_channel = nullptr; int64_t _node_id = -1; @@ -345,7 +376,8 @@ class VNodeChannel { std::atomic _pending_batches_num {0}; // reuse for vectorized std::shared_ptr _stub = nullptr; - RefCountClosure* _open_closure = nullptr; + // because we have incremantal open, we should keep one relative closure for one request. it's similarly for adding block. + std::vector*> _open_closures; std::vector _all_tablets; // map from tablet_id to node_id where slave replicas locate in @@ -373,6 +405,7 @@ class VNodeChannel { // rows number received per tablet, tablet_id -> rows_num std::vector> _tablets_received_rows; + // build a _cur_mutable_block and push into _pending_blocks. when not building, this block is empty. std::unique_ptr _cur_mutable_block; PTabletWriterAddBlockRequest _cur_add_block_request; @@ -380,6 +413,8 @@ class VNodeChannel { std::pair, PTabletWriterAddBlockRequest>; std::queue _pending_blocks; ReusableClosure* _add_block_closure = nullptr; + + bool _is_incremental; }; // Write block data to Olap Table. @@ -436,6 +471,16 @@ class VOlapTableSink : public DataSink { void _cancel_all_channel(Status status); + std::pair _get_partition_function(); + + void _save_missing_values(vectorized::ColumnPtr col, vectorized::DataTypePtr value_type, + std::vector filter); + + // create partitions when need for auto-partition table using #_partitions_need_create. + Status _automatic_create_partition(); + + Status _incremental_open_node_channel(const std::vector& partitions); + std::shared_ptr _mem_tracker; ObjectPool* _pool; @@ -467,11 +512,16 @@ class VOlapTableSink : public DataSink { std::unique_ptr _tablet_finder; // index_channel + std::mutex _stop_check_channel; std::vector> _channels; + std::unordered_map> _index_id_to_channel; bthread_t _sender_thread = 0; std::unique_ptr _send_batch_thread_pool_token; + // support only one partition column now + std::vector> _partitions_need_create; + std::unique_ptr _block_convertor; // Stats for this int64_t _send_data_ns = 0; @@ -489,6 +539,7 @@ class VOlapTableSink : public DataSink { RuntimeProfile::Counter* _append_node_channel_timer = nullptr; RuntimeProfile::Counter* _filter_timer = nullptr; RuntimeProfile::Counter* _where_clause_timer = nullptr; + RuntimeProfile::Counter* _add_partition_request_timer = nullptr; RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr; RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 8ab2013c633865..5cff7011af0792 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -346,10 +346,11 @@ private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Ana } ArrayList exprs = partitionInfo.getPartitionExprs(); if (exprs != null && analyzer != null) { + Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext()); tupleDescriptor.setTable(table); - analyzer.registerTupleDescriptor(tupleDescriptor); + funcAnalyzer.registerTupleDescriptor(tupleDescriptor); for (Expr e : exprs) { - e.analyze(analyzer); + e.analyze(funcAnalyzer); } partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); } diff --git a/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load1.csv b/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load1.csv new file mode 100644 index 00000000000000..ada7d3c6af6b0a --- /dev/null +++ b/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load1.csv @@ -0,0 +1,10 @@ +1,2001-12-12 12:12:12.123,2001-12-12 12:12:12.123456 +2,2002-12-12 12:12:12.123,2001-12-12 12:12:12.123456 +3,2002-12-12 12:12:12.123,2001-12-12 12:12:12.123456 +4,2002-12-12 12:12:12.123,2001-12-12 12:12:12.123456 +5,2003-12-12 12:12:12.123,2001-12-13 12:12:12.123456 +6,2004-12-12 12:12:12.123,2001-12-14 12:12:12.123456 +7,2005-12-12 12:12:12.123,2001-11-12 12:12:12.123456 +8,2006-12-12 12:12:12.123,2001-11-12 12:12:12.123456 +9,2006-12-12 12:12:12.123,2001-11-13 12:12:12.123456 +10,2007-12-12 12:12:12.123,2001-11-14 12:12:12.123456 \ No newline at end of file diff --git a/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load2.csv b/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load2.csv new file mode 100644 index 00000000000000..16e3fae4915d65 --- /dev/null +++ b/regression-test/data/partition_p0/auto_partition/auto_partition_stream_load2.csv @@ -0,0 +1,10 @@ +1,Beijing,2001-12-12 12:12:12.123456 +2,BJ,2001-12-12 12:12:12.123456 +3,bj,2001-12-12 12:12:12.123456 +4,bJ,2001-12-12 12:12:12.123456 +5,Chengdu,2001-12-13 12:12:12.123456 +6,XIAN,2001-12-14 12:12:12.123456 +7,Chengdu,2001-11-12 12:12:12.123456 +8,chengDU,2001-11-12 12:12:12.123456 +9,xian,2001-11-13 12:12:12.123456 +10,beiJing,2001-11-14 12:12:12.123456 \ No newline at end of file diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_list_partition.out b/regression-test/data/partition_p0/auto_partition/test_auto_list_partition.out new file mode 100644 index 00000000000000..36a6418f3fbc9f --- /dev/null +++ b/regression-test/data/partition_p0/auto_partition/test_auto_list_partition.out @@ -0,0 +1,41 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +Abc +Beijing +Beijing +XXX +xxx + +-- !sql2 -- +Abc +Abc +Beijing +Beijing +Beijing +Beijing +XXX +XXX +new +xxx +xxx + +-- !sql3 -- +Abc +Beijing +Beijing +XXX +xxx + +-- !sql4 -- +Abc +Abc +Beijing +Beijing +Beijing +Beijing +XXX +XXX +new +xxx +xxx + diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out new file mode 100644 index 00000000000000..7e1dd673f69927 --- /dev/null +++ b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +1 2001-12-12T12:12:12 2001-12-12T12:12:12.123456 +2 2002-12-12T12:12:12 2001-12-12T12:12:12.123456 +3 2002-12-12T12:12:12 2001-12-12T12:12:12.123456 +4 2002-12-12T12:12:12 2001-12-12T12:12:12.123456 +5 2003-12-12T12:12:12 2001-12-13T12:12:12.123456 +6 2004-12-12T12:12:12 2001-12-14T12:12:12.123456 +7 2005-12-12T12:12:12 2001-11-12T12:12:12.123456 +8 2006-12-12T12:12:12 2001-11-12T12:12:12.123456 +9 2006-12-12T12:12:12 2001-11-13T12:12:12.123456 +10 2007-12-12T12:12:12 2001-11-14T12:12:12.123456 + +-- !select2 -- +1 Beijing 2001-12-12T12:12:12.123456 +2 BJ 2001-12-12T12:12:12.123456 +3 bj 2001-12-12T12:12:12.123456 +4 bJ 2001-12-12T12:12:12.123456 +5 Chengdu 2001-12-13T12:12:12.123456 +6 XIAN 2001-12-14T12:12:12.123456 +7 Chengdu 2001-11-12T12:12:12.123456 +8 chengDU 2001-11-12T12:12:12.123456 +9 xian 2001-11-13T12:12:12.123456 +10 beiJing 2001-11-14T12:12:12.123456 + diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_range_partition.out b/regression-test/data/partition_p0/auto_partition/test_auto_range_partition.out new file mode 100644 index 00000000000000..f359c996ecc682 --- /dev/null +++ b/regression-test/data/partition_p0/auto_partition/test_auto_range_partition.out @@ -0,0 +1,74 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select00 -- +2022-12-14T00:00 +2022-12-15T00:00 +2022-12-16T00:00 +2022-12-17T00:00 +2022-12-18T00:00 +2022-12-19T00:00 +2022-12-20T00:00 +2122-12-14T00:00 +2122-12-15T00:00 +2122-12-16T00:00 +2122-12-17T00:00 +2122-12-18T00:00 +2122-12-19T00:00 +2122-12-20T00:00 + +-- !select01 -- +2022-12-15T00:00 + +-- !select02 -- +2022-12-16T00:00 +2022-12-17T00:00 +2022-12-18T00:00 +2022-12-19T00:00 +2022-12-20T00:00 +2122-12-14T00:00 +2122-12-15T00:00 +2122-12-16T00:00 +2122-12-17T00:00 +2122-12-18T00:00 +2122-12-19T00:00 +2122-12-20T00:00 + +-- !select10 -- +2022-11-14T22:22:22.222 +2022-11-15T22:22:22.222 +2022-11-16T22:22:22.222 +2022-11-17T22:22:22.222 +2022-11-18T22:22:22.222 +2022-11-19T22:22:22.222 +2022-11-20T22:22:22.222 +2022-12-14T22:22:22.222 +2022-12-15T22:22:22.222 +2022-12-16T22:22:22.222 +2022-12-17T22:22:22.222 +2022-12-18T22:22:22.222 +2022-12-19T22:22:22.222 +2022-12-20T22:22:22.222 +2122-12-14T22:22:22.222 +2122-12-15T22:22:22.222 +2122-12-16T22:22:22.222 +2122-12-17T22:22:22.222 +2122-12-18T22:22:22.222 +2122-12-19T22:22:22.222 +2122-12-20T22:22:22.222 + +-- !select11 -- +2022-12-15T22:22:22.222 + +-- !select12 -- +2022-12-16T22:22:22.222 +2022-12-17T22:22:22.222 +2022-12-18T22:22:22.222 +2022-12-19T22:22:22.222 +2022-12-20T22:22:22.222 +2122-12-14T22:22:22.222 +2122-12-15T22:22:22.222 +2122-12-16T22:22:22.222 +2122-12-17T22:22:22.222 +2122-12-18T22:22:22.222 +2122-12-19T22:22:22.222 +2122-12-20T22:22:22.222 + diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_list_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_list_partition.groovy new file mode 100644 index 00000000000000..405b2a1fc5cf77 --- /dev/null +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_list_partition.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_auto_list_partition") { + def tblName1 = "list_table1" + sql "drop table if exists ${tblName1}" + sql """ + CREATE TABLE `${tblName1}` ( + `str` varchar not null + ) ENGINE=OLAP + DUPLICATE KEY(`str`) + COMMENT 'OLAP' + AUTO PARTITION BY LIST (`str`) + ( + ) + DISTRIBUTED BY HASH(`str`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName1} values ("Beijing"), ("XXX"), ("xxx"), ("Beijing"), ("Abc") """ + qt_sql1 """ select * from ${tblName1} order by `str` """ + result11 = sql "show partitions from ${tblName1}" + assertEquals(result11.size(), 4) + sql """ insert into ${tblName1} values ("Beijing"), ("XXX"), ("xxx"), ("Beijing"), ("Abc"), ("new") """ + qt_sql2 """ select * from ${tblName1} order by `str` """ + result12 = sql "show partitions from ${tblName1}" + assertEquals(result12.size(), 5) + + def tblName2 = "list_table2" + sql "drop table if exists ${tblName2}" + sql """ + CREATE TABLE `${tblName2}` ( + `str` varchar not null + ) ENGINE=OLAP + DUPLICATE KEY(`str`) + COMMENT 'OLAP' + AUTO PARTITION BY LIST (`str`) + ( + ) + DISTRIBUTED BY HASH(`str`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName2} values ("Beijing"), ("XXX"), ("xxx"), ("Beijing"), ("Abc") """ + qt_sql3 """ select * from ${tblName2} order by `str` """ + result21 = sql "show partitions from ${tblName2}" + assertEquals(result21.size(), 4) + sql """ insert into ${tblName2} values ("Beijing"), ("XXX"), ("xxx"), ("Beijing"), ("Abc"), ("new") """ + qt_sql4 """ select * from ${tblName2} order by `str` """ + result22 = sql "show partitions from ${tblName2}" + assertEquals(result22.size(), 5) + + def tblName3 = "list_table3" + sql "drop table if exists ${tblName3}" + sql """ + CREATE TABLE `${tblName3}` ( + `k1` INT, + `k2` VARCHAR(50) not null, + `k3` DATETIMEV2(6) + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + AUTO PARTITION BY LIST (`k2`) + ( + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName3} values (1, 'ABC', '2000-01-01 12:12:12.123456'), (2, 'AAA', '2000-01-01'), (3, 'aaa', '2000-01-01'), (3, 'AaA', '2000-01-01') """ + result3 = sql "show partitions from ${tblName3}" + logger.info("${result3}") + assertEquals(result3.size(), 4) +} diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy new file mode 100644 index 00000000000000..0cf2eaf9c12dc2 --- /dev/null +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_auto_partition_load") { + def tblName1 = "load_table1" + sql "drop table if exists ${tblName1}" + sql """ + CREATE TABLE `${tblName1}` ( + `k1` INT, + `k2` DATETIME, + `k3` DATETIMEV2(6) + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + AUTO PARTITION BY RANGE date_trunc(`k2`, 'year') + ( + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + streamLoad { + table "${tblName1}" + set 'column_separator', ',' + file "auto_partition_stream_load1.csv" + time 20000 + } + + qt_select1 "select * from ${tblName1} order by k1" + result1 = sql "show partitions from ${tblName1}" + logger.info("${result1}") + assertEquals(result1.size(), 7) + + + def tblName2 = "load_table2" + sql "drop table if exists ${tblName2}" + sql """ + CREATE TABLE `${tblName2}` ( + `k1` INT, + `k2` VARCHAR(50) not null, + `k3` DATETIMEV2(6) + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + AUTO PARTITION BY LIST (`k2`) + ( + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + streamLoad { + table "${tblName2}" + set 'column_separator', ',' + file "auto_partition_stream_load2.csv" + time 20000 + } + + qt_select2 "select * from ${tblName2} order by k1" + result2 = sql "show partitions from ${tblName2}" + logger.info("${result2}") + assertEquals(result2.size(), 9) +} diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy new file mode 100644 index 00000000000000..874704ea8f5e21 --- /dev/null +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_auto_range_partition") { + def tblName1 = "range_table1" + sql "drop table if exists ${tblName1}" + // not support datev2 now. need impl date_trunc(datev2) + sql """ + CREATE TABLE `${tblName1}` ( + `TIME_STAMP` datetimev2 NOT NULL COMMENT '采集日期' + ) ENGINE=OLAP + DUPLICATE KEY(`TIME_STAMP`) + COMMENT 'OLAP' + AUTO PARTITION BY RANGE date_trunc(`TIME_STAMP`, 'day') + ( + ) + DISTRIBUTED BY HASH(`TIME_STAMP`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName1} values ('2022-12-14'), ('2022-12-15'), ('2022-12-16'), ('2022-12-17'), ('2022-12-18'), ('2022-12-19'), ('2022-12-20') """ + sql """ insert into ${tblName1} values ('2122-12-14'), ('2122-12-15'), ('2122-12-16'), ('2122-12-17'), ('2122-12-18'), ('2122-12-19'), ('2122-12-20') """ + + qt_select00 """ select * from ${tblName1} order by TIME_STAMP """ + qt_select01 """ select * from ${tblName1} WHERE TIME_STAMP = '2022-12-15' order by TIME_STAMP """ + qt_select02 """ select * from ${tblName1} WHERE TIME_STAMP > '2022-12-15' order by TIME_STAMP """ + + def tblName2 = "range_table2" + sql "drop table if exists ${tblName2}" + sql """ + CREATE TABLE `${tblName2}` ( + `TIME_STAMP` datetimev2(3) NOT NULL COMMENT '采集日期' + ) ENGINE=OLAP + DUPLICATE KEY(`TIME_STAMP`) + COMMENT 'OLAP' + AUTO PARTITION BY RANGE date_trunc(`TIME_STAMP`, 'day') + ( + ) + DISTRIBUTED BY HASH(`TIME_STAMP`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName2} values ('2022-12-14 22:22:22.222'), ('2022-12-15 22:22:22.222'), ('2022-12-16 22:22:22.222'), ('2022-12-17 22:22:22.222'), ('2022-12-18 22:22:22.222'), ('2022-12-19 22:22:22.222'), ('2022-12-20 22:22:22.222') """ + sql """ insert into ${tblName2} values ('2122-12-14 22:22:22.222'), ('2122-12-15 22:22:22.222'), ('2122-12-16 22:22:22.222'), ('2122-12-17 22:22:22.222'), ('2122-12-18 22:22:22.222'), ('2122-12-19 22:22:22.222'), ('2122-12-20 22:22:22.222') """ + sql """ insert into ${tblName2} values ('2022-11-14 22:22:22.222'), ('2022-11-15 22:22:22.222'), ('2022-11-16 22:22:22.222'), ('2022-11-17 22:22:22.222'), ('2022-11-18 22:22:22.222'), ('2022-11-19 22:22:22.222'), ('2022-11-20 22:22:22.222') """ + + + qt_select10 """ select * from ${tblName2} order by TIME_STAMP """ + qt_select11 """ select * from ${tblName2} WHERE TIME_STAMP = '2022-12-15 22:22:22.222' order by TIME_STAMP """ + qt_select12 """ select * from ${tblName2} WHERE TIME_STAMP > '2022-12-15 22:22:22.222' order by TIME_STAMP """ + + def tblName3 = "range_table3" + sql "drop table if exists ${tblName3}" + sql """ + CREATE TABLE `${tblName3}` ( + `k1` INT, + `k2` DATETIMEV2(3), + `k3` DATETIMEV2(6) + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + AUTO PARTITION BY RANGE date_trunc(`k2`, 'day') + ( + ) + DISTRIBUTED BY HASH(`k1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ insert into ${tblName3} values (1, '1990-01-01', '2000-01-01 12:12:12.123456'), (2, '1991-02-01', '2000-01-01'), (3, '1991-01-01', '2000-01-01'), (3, '1991-01-01', '2000-01-01') """ + result1 = sql "show partitions from ${tblName3}" + logger.info("${result1}") + assertEquals(result1.size(), 3) +} diff --git a/regression-test/suites/partition_p0/test_datev2_partition.groovy b/regression-test/suites/partition_p0/test_datev2_partition.groovy index 600b820684c240..63852bb4e2e672 100644 --- a/regression-test/suites/partition_p0/test_datev2_partition.groovy +++ b/regression-test/suites/partition_p0/test_datev2_partition.groovy @@ -43,7 +43,6 @@ suite("test_datev2_partition") { qt_select """ select * from ${tblName1} order by TIME_STAMP """ qt_select """ select * from ${tblName1} WHERE TIME_STAMP = '2022-12-15' order by TIME_STAMP """ qt_select """ select * from ${tblName1} WHERE TIME_STAMP > '2022-12-15' order by TIME_STAMP """ - sql "drop table ${tblName1}" def tblName2 = "test_datev2_partition2" sql "drop table if exists ${tblName2}" @@ -72,5 +71,4 @@ suite("test_datev2_partition") { qt_select """ select * from ${tblName2} order by TIME_STAMP """ qt_select """ select * from ${tblName2} WHERE TIME_STAMP = '2022-12-15 22:22:22.222' order by TIME_STAMP """ qt_select """ select * from ${tblName2} WHERE TIME_STAMP > '2022-12-15 22:22:22.222' order by TIME_STAMP """ - sql "drop table ${tblName2}" }