From cd71570038fd476edf1620252d8b26e5e96f97da Mon Sep 17 00:00:00 2001 From: eldenmoon Date: Thu, 19 Dec 2024 15:11:13 +0800 Subject: [PATCH] fix 6 --- .../olap/rowset/segment_v2/column_reader.cpp | 15 +++++--- be/src/olap/rowset/segment_v2/column_reader.h | 10 ++--- .../segment_v2/hierarchical_data_reader.cpp | 25 +++++++++--- be/src/olap/rowset/segment_v2/segment.cpp | 23 ++++------- be/src/olap/rowset/segment_v2/segment.h | 4 +- .../segment_v2/variant_column_writer_impl.cpp | 38 +++++++++++-------- .../segment_v2/variant_column_writer_impl.h | 4 +- be/src/vec/columns/column_object.cpp | 17 +++++++-- be/src/vec/columns/column_object.h | 8 ++-- 9 files changed, 85 insertions(+), 59 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 2f303999aea308d..909e006bb8269ee 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -245,7 +245,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, // Node contains column with children columns or has correspoding sparse columns // Create reader with hirachical data. std::unique_ptr sparse_iter; - if (!_sparse_column_set_in_stats.empty()) { + if (_statistics && !_statistics->sparse_column_non_null_size.empty()) { // Sparse column exists or reached sparse size limit, read sparse column ColumnIterator* iter; RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter)); @@ -259,9 +259,10 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, read_type, std::move(sparse_iter))); } } else { - if (_sparse_column_set_in_stats.contains(StringRef {relative_path.get_path()}) || - _sparse_column_set_in_stats.size() > - VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { + if (_statistics && + (_statistics->sparse_column_non_null_size.contains(relative_path.get_path()) || + _statistics->sparse_column_non_null_size.size() > + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE)) { // Sparse column exists or reached sparse size limit, read sparse column ColumnIterator* inner_iter; RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); @@ -323,9 +324,13 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF // init sparse column set in stats if (self_column_pb.has_variant_statistics()) { + _statistics = std::make_unique(); const auto& variant_stats = self_column_pb.variant_statistics(); for (const auto& [path, _] : variant_stats.sparse_column_non_null_size()) { - _sparse_column_set_in_stats.emplace(path.data(), path.size()); + _statistics->sparse_column_non_null_size.emplace(path.data(), path.size()); + } + for (const auto& [path, _] : variant_stats.subcolumn_non_null_size()) { + _statistics->subcolumns_non_null_size.emplace(path.data(), path.size()); } } return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 189435c20951304..646e657b1627b45 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -79,8 +79,7 @@ class InvertedIndexFileReader; class PageDecoder; class RowRanges; class ZoneMapIndexReader; -// struct SubcolumnReader; -// using SubcolumnColumnReaders = vectorized::SubcolumnsTree; +struct VariantStatistics; struct ColumnReaderOptions { // whether verify checksum when read page @@ -311,13 +310,12 @@ class VariantColumnReader : public ColumnReader { FieldType get_meta_type() override { return FieldType::OLAP_FIELD_TYPE_VARIANT; } + const VariantStatistics* get_stats() const { return _statistics.get(); } + private: std::unique_ptr _subcolumn_readers; std::unique_ptr _sparse_column_reader; - // Some sparse column record in stats, use StringRef to reduce memory usage, - // notice: make sure the ref is not released before the ColumnReader is destructed, - // used to decide whether to read from sparse column - std::unordered_set _sparse_column_set_in_stats; + std::unique_ptr _statistics; }; // Base iterator to read one column data diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index ca25b230bceec3e..a0e8b3fd0eecd84 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -24,6 +24,7 @@ #include "olap/rowset/segment_v2/column_reader.h" #include "vec/columns/column.h" #include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" #include "vec/common/assert_cast.h" #include "vec/common/schema_util.h" @@ -82,6 +83,10 @@ Status HierarchicalDataReader::init(const ColumnIteratorOptions& opts) { RETURN_IF_ERROR(_root_reader->iterator->init(opts)); _root_reader->inited = true; } + if (_sparse_column_reader && !_sparse_column_reader->inited) { + RETURN_IF_ERROR(_sparse_column_reader->iterator->init(opts)); + _sparse_column_reader->inited = true; + } return Status::OK(); } @@ -402,15 +407,23 @@ Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) { } void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst) { + vectorized::ColumnNullable* nullable_column = nullptr; + if (dst->is_nullable()) { + nullable_column = assert_cast(dst.get()); + } vectorized::ColumnObject& var = - dst->is_nullable() - ? assert_cast( - assert_cast(*dst).get_nested_column()) + nullable_column != nullptr + ? assert_cast(nullable_column->get_nested_column()) : assert_cast(*dst); - DCHECK(!var.is_null_root()); - vectorized::ColumnObject::fill_path_olumn_from_sparse_data( - *var.get_subcolumn({}) /*root*/, StringRef {_path.data(), _path.size()}, + if (var.is_null_root()) { + var.add_sub_column({}, dst->size()); + } + vectorized::NullMap* null_map = + nullable_column ? &nullable_column->get_null_map_data() : nullptr; + vectorized::ColumnObject::fill_path_column_from_sparse_data( + *var.get_subcolumn({}) /*root*/, null_map, StringRef {_path.data(), _path.size()}, _sparse_column->get_ptr(), 0, _sparse_column->size()); + var.incr_num_rows(_sparse_column->size()); _sparse_column->clear(); } diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index ededa493018aca4..238898a74ecc09a 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -201,25 +201,9 @@ Status Segment::_open() { // 0.01 comes from PrimaryKeyIndexBuilder::init _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8; - // collec variant statistics - for (const auto& column_pb : _footer_pb->columns()) { - if (column_pb.has_variant_statistics()) { - _variant_column_stats.try_emplace(column_pb.unique_id(), - column_pb.variant_statistics()); - } - } - return Status::OK(); } -const VariantStatisticsPB* Segment::get_stats(int32_t unique_id) const { - auto it = _variant_column_stats.find(unique_id); - if (it == _variant_column_stats.end()) { - return nullptr; - } - return &it->second; -} - Status Segment::_open_inverted_index() { _inverted_index_file_reader = std::make_shared( _fs, @@ -828,6 +812,13 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column, return Status::OK(); } +ColumnReader* Segment::get_column_reader(int32_t col_unique_id) { + if (_column_readers.contains(col_unique_id)) { + return _column_readers[col_unique_id].get(); + } + return nullptr; +} + ColumnReader* Segment::_get_column_reader(const TabletColumn& col) { // init column iterator by path info if (col.has_path_info() || col.is_variant_type()) { diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index 1c7b94271630d9b..5b88e60e37a29c8 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -66,6 +66,7 @@ class BitmapIndexIterator; class Segment; class InvertedIndexIterator; class InvertedIndexFileReader; +struct VariantStatistics; using SegmentSharedPtr = std::shared_ptr; // A Segment is used to represent a segment in memory format. When segment is @@ -208,7 +209,7 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; } - const VariantStatisticsPB* get_stats(int32_t unique_id) const; + ColumnReader* get_column_reader(int32_t col_unique_id); private: DISALLOW_COPY_AND_ASSIGN(Segment); @@ -288,7 +289,6 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd int _be_exec_version = BeExecVersionManager::get_newest_version(); OlapReaderStatistics* _pk_index_load_stats = nullptr; - std::unordered_map _variant_column_stats; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index a3671f3afd31099..6588e7dbe4f5887 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -19,6 +19,7 @@ #include #include "common/status.h" +#include "olap/olap_common.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_writer_context.h" @@ -68,19 +69,24 @@ Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::setload_segments( std::static_pointer_cast(reader->rowset()), &segment_cache)); for (const auto& segment : segment_cache.get_segments()) { - const VariantStatisticsPB* source_statistics = - segment->get_stats(_tablet_column->unique_id()); + ColumnReader* column_reader = segment->get_column_reader(_tablet_column->unique_id()); + if (!column_reader) { + continue; + } + CHECK(column_reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT); + const VariantStatistics* source_statistics = + static_cast(column_reader)->get_stats(); if (!source_statistics) { continue; } - for (const auto& [path, size] : source_statistics->subcolumn_non_null_size()) { + for (const auto& [path, size] : source_statistics->subcolumns_non_null_size) { auto it = path_to_total_number_of_non_null_values.find(path); if (it == path_to_total_number_of_non_null_values.end()) { it = path_to_total_number_of_non_null_values.emplace(path, 0).first; } it->second += size; } - for (const auto& [path, size] : source_statistics->sparse_column_non_null_size()) { + for (const auto& [path, size] : source_statistics->sparse_column_non_null_size) { auto it = path_to_total_number_of_non_null_values.find(path); if (it == path_to_total_number_of_non_null_values.end()) { it = path_to_total_number_of_non_null_values.emplace(path, 0).first; @@ -201,8 +207,8 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows); // get stastics - _statistics._subcolumns_non_null_size.emplace(entry->path.get_path(), - entry->data.get_non_null_value_size()); + _statistics.subcolumns_non_null_size.emplace(entry->path.get_path(), + entry->data.get_non_null_value_size()); } return Status::OK(); } @@ -239,12 +245,12 @@ Status VariantColumnWriterImpl::_process_sparse_column( const auto [sparse_data_paths, _] = ptr->get_sparse_data_paths_and_values(); for (size_t i = 0; i != sparse_data_paths->size(); ++i) { auto path = sparse_data_paths->get_data_at(i); - if (auto it = _statistics._sparse_column_non_null_size.find(path); - it != _statistics._sparse_column_non_null_size.end()) { + if (auto it = _statistics.sparse_column_non_null_size.find(path.to_string()); + it != _statistics.sparse_column_non_null_size.end()) { ++it->second; - } else if (_statistics._sparse_column_non_null_size.size() < + } else if (_statistics.sparse_column_non_null_size.size() < VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { - _statistics._sparse_column_non_null_size.emplace(path, 1); + _statistics.sparse_column_non_null_size.emplace(path, 1); } } @@ -253,21 +259,21 @@ Status VariantColumnWriterImpl::_process_sparse_column( } void VariantStatistics::to_pb(VariantStatisticsPB* stats) const { - for (const auto& [path, value] : _subcolumns_non_null_size) { - stats->mutable_subcolumn_non_null_size()->emplace(path.to_string(), value); + for (const auto& [path, value] : subcolumns_non_null_size) { + stats->mutable_subcolumn_non_null_size()->emplace(path, value); } - for (const auto& [path, value] : _sparse_column_non_null_size) { - stats->mutable_sparse_column_non_null_size()->emplace(path.to_string(), value); + for (const auto& [path, value] : sparse_column_non_null_size) { + stats->mutable_sparse_column_non_null_size()->emplace(path, value); } } void VariantStatistics::from_pb(const VariantStatisticsPB& stats) { // make sure the ref of path, todo not use ref for (const auto& [path, value] : stats.subcolumn_non_null_size()) { - _subcolumns_non_null_size[StringRef(path.data(), path.size())] = value; + subcolumns_non_null_size[path] = value; } for (const auto& [path, value] : stats.sparse_column_non_null_size()) { - _sparse_column_non_null_size[StringRef(path.data(), path.size())] = value; + sparse_column_non_null_size[path] = value; } } diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index 66c5269e7ce2c5b..b003a21098f5660 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -38,8 +38,8 @@ class ScalarColumnWriter; struct VariantStatistics { // If reached the size of this, we should stop writing statistics for sparse data constexpr static size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000; - std::map _subcolumns_non_null_size; - std::map _sparse_column_non_null_size; + std::unordered_map subcolumns_non_null_size; + std::unordered_map sparse_column_non_null_size; void to_pb(VariantStatisticsPB* stats) const; void from_pb(const VariantStatisticsPB& stats); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 2b64f7f392f250b..f74ad2283963083 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -2041,6 +2041,7 @@ Status ColumnObject::finalize(FinalizeMode mode) { new_subcolumns.get_mutable_root()->data.finalize(mode); } else if (mode == FinalizeMode::WRITE_MODE) { new_subcolumns.create_root(Subcolumn(num_rows, is_nullable, true)); + new_subcolumns.get_mutable_root()->data.finalize(mode); } const bool need_pick_subcolumn_to_sparse_column = @@ -2493,15 +2494,19 @@ size_t ColumnObject::find_path_lower_bound_in_sparse_data(StringRef path, return it.index; } -void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, StringRef path, - const ColumnPtr& sparse_data_column, - size_t start, size_t end) { +void ColumnObject::fill_path_column_from_sparse_data(Subcolumn& subcolumn, NullMap* null_map, + StringRef path, + const ColumnPtr& sparse_data_column, + size_t start, size_t end) { const auto& sparse_data_map = assert_cast(*sparse_data_column); const auto& sparse_data_offsets = sparse_data_map.get_offsets(); size_t first_offset = sparse_data_offsets[static_cast(start) - 1]; size_t last_offset = sparse_data_offsets[static_cast(end) - 1]; // Check if we have at least one row with data. if (first_offset == last_offset) { + if (null_map) { + null_map->resize_fill(end - start, 1); + } subcolumn.insert_many_defaults(end - start); return; } @@ -2513,6 +2518,7 @@ void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, String size_t paths_end = sparse_data_offsets[static_cast(i)]; auto lower_bound_path_index = ColumnObject::find_path_lower_bound_in_sparse_data( path, sparse_data_paths, paths_start, paths_end); + bool is_null = false; if (lower_bound_path_index != paths_end && sparse_data_paths.get_data_at(lower_bound_path_index) == path) { // auto value_data = sparse_data_values.get_data_at(lower_bound_path_index); @@ -2521,8 +2527,13 @@ void ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, String const auto& data = ColumnObject::deserialize_from_sparse_column(&sparse_data_values, lower_bound_path_index); subcolumn.insert(data.first, data.second); + is_null = false; } else { subcolumn.insert_default(); + is_null = true; + } + if (null_map) { + null_map->push_back(is_null); } } } diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index f8ba93ef8247476..9d9d40f9075d653 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -39,6 +39,7 @@ #include "util/jsonb_document.h" #include "vec/columns/column.h" #include "vec/columns/column_map.h" +#include "vec/columns/column_nullable.h" #include "vec/columns/subcolumn_tree.h" #include "vec/common/cow.h" #include "vec/common/string_ref.h" @@ -586,9 +587,10 @@ class ColumnObject final : public COWHelper { return {&key, &value}; } // Insert all the data from sparse data with specified path to sub column. - static void fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, StringRef path, - const ColumnPtr& sparse_data_column, size_t start, - size_t end); + static void fill_path_column_from_sparse_data(Subcolumn& subcolumn, NullMap* null_map, + StringRef path, + const ColumnPtr& sparse_data_column, size_t start, + size_t end); static size_t find_path_lower_bound_in_sparse_data(StringRef path, const ColumnString& sparse_data_paths,