From aff912b8672c859d980632904f25d88ba654cba8 Mon Sep 17 00:00:00 2001 From: Sun Chenyang Date: Tue, 10 Dec 2024 19:13:14 +0800 Subject: [PATCH] [opt](variant) add serialized sparse column (#45252) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/vec/columns/column_object.cpp | 293 ++++++++++++++++-- be/src/vec/columns/column_object.h | 67 +++- be/src/vec/data_types/serde/data_type_serde.h | 5 + 3 files changed, 318 insertions(+), 47 deletions(-) diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 2983d7991666e9..c1a50f6064b34a 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -51,6 +51,7 @@ #include "vec/aggregate_functions/helpers.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" +#include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" @@ -982,6 +983,43 @@ void ColumnObject::Subcolumn::get(size_t n, Field& res) const { n); } +void ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std::string_view path, + ColumnString* value, size_t row, + bool& is_null) { + // no need insert + if (least_common_type.get_base_type_id() == TypeIndex::Nothing) { + is_null = true; + return; + } + + // no need insert + if (row < num_of_defaults_in_prefix) { + is_null = true; + return; + } + + // remove default + row -= num_of_defaults_in_prefix; + is_null = false; + for (size_t i = 0; i < data.size(); ++i) { + const auto& part = data[i]; + if (row < part->size()) { + // insert key + key->insert_data(path.data(), path.size()); + // insert value + const auto& part_type = data_types[i]; + const auto& serde = part_type->get_serde(); + serde->write_one_cell_to_binary(*part, value, row); + return; + } + + row -= part->size(); + } + + throw doris::Exception(ErrorCode::OUT_OF_BOUND, + "Index ({}) for serialize to sparse column is out of range", row); +} + Field ColumnObject::operator[](size_t n) const { Field object; get(n, object); @@ -1062,35 +1100,48 @@ void ColumnObject::add_nested_subcolumn(const PathInData& key, const FieldInfo& } } +bool ColumnObject::try_add_new_subcolumn(const PathInData& path) { + if (subcolumns.size() == MAX_SUBCOLUMNS) return false; + + return add_sub_column(path, num_rows); +} + void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) { #ifndef NDEBUG check_consistency(); #endif const auto& src_object = assert_cast(src); + + // First, insert src subcolumns + // We can reach the limit of subcolumns, and in this case + // the rest of subcolumns from src will be inserted into sparse column. + std::map src_path_and_subcoumn_for_sparse_column; for (const auto& entry : src_object.subcolumns) { - if (!has_subcolumn(entry->path)) { - if (entry->path.has_nested_part()) { - FieldInfo field_info { - .scalar_type_id = entry->data.least_common_type.get_base_type_id(), - .have_nulls = false, - .need_convert = false, - .num_dimensions = entry->data.get_dimensions()}; - add_nested_subcolumn(entry->path, field_info, num_rows); - } else { - add_sub_column(entry->path, num_rows); - } + // Check if we already have such dense column path. + if (auto* subcolumn = get_subcolumn(entry->path); subcolumn != nullptr) { + subcolumn->insert_range_from(entry->data, start, length); + } else if (try_add_new_subcolumn(entry->path)) { + subcolumn = get_subcolumn(entry->path); + DCHECK(subcolumn != nullptr); + subcolumn->insert_range_from(entry->data, start, length); + } else { + src_path_and_subcoumn_for_sparse_column.emplace(entry->path.get_path(), entry->data); } - auto* subcolumn = get_subcolumn(entry->path); - subcolumn->insert_range_from(entry->data, start, length); } - for (auto& entry : subcolumns) { - if (!src_object.has_subcolumn(entry->path)) { - bool inserted = try_insert_many_defaults_from_nested(entry); - if (!inserted) { - entry->data.insert_many_defaults(length); - } - } + + // Paths in sparse column are sorted, so paths from src_dense_column_path_for_sparse_column should be inserted properly + // to keep paths sorted. Let's sort them in advance. + std::vector> sorted_src_subcolumn_for_sparse_column; + auto it = src_path_and_subcoumn_for_sparse_column.begin(); + auto end = src_path_and_subcoumn_for_sparse_column.end(); + while (it != end) { + sorted_src_subcolumn_for_sparse_column.emplace_back(it->first, it->second); + ++it; } + + insert_from_sparse_column_and_fill_remaing_dense_column( + src_object, std::move(sorted_src_subcolumn_for_sparse_column), start, length); + num_rows += length; finalize(); #ifndef NDEBUG @@ -1098,6 +1149,141 @@ void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t le #endif } +// std::map +void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( + const ColumnObject& src, + std::vector>&& + sorted_src_subcolumn_for_sparse_column, + size_t start, size_t length) { + /// Check if src object doesn't have any paths in serialized sparse column. + const auto& src_serialized_sparse_column_offsets = src.serialized_sparse_column_offsets(); + if (src_serialized_sparse_column_offsets[start - 1] == + src_serialized_sparse_column_offsets[start + length - 1]) { + size_t current_size = size(); + + /// If no src subcolumns should be inserted into sparse column, insert defaults. + if (sorted_src_subcolumn_for_sparse_column.empty()) { + serialized_sparse_column->insert_many_defaults(length); + } else { + // Otherwise insert required src dense columns into sparse column. + auto [sparse_column_keys, sparse_column_values] = get_sparse_data_paths_and_values(); + auto& sparse_column_offsets = serialized_sparse_column_offsets(); + for (size_t i = start; i != start + length; ++i) { + int null_count = 0; + // Paths in sorted_src_subcolumn_for_sparse_column are already sorted. + for (auto& [path, subcolumn] : sorted_src_subcolumn_for_sparse_column) { + bool is_null = false; + subcolumn.serialize_to_sparse_column(sparse_column_keys, path, + sparse_column_values, i, is_null); + if (is_null) { + ++null_count; + } + } + + // All the sparse columns in this row are null. + if (null_count == sorted_src_subcolumn_for_sparse_column.size()) { + serialized_sparse_column->insert_default(); + } else { + DCHECK_EQ(sparse_column_keys->size(), + sparse_column_offsets[i - 1] + + sorted_src_subcolumn_for_sparse_column.size() - null_count); + DCHECK_EQ(sparse_column_values->size(), sparse_column_keys->size()); + sparse_column_offsets.push_back(sparse_column_keys->size()); + } + } + } + + // Insert default values in all remaining dense columns. + for (const auto& entry : subcolumns) { + if (entry->data.size() == current_size) { + entry->data.insert_many_defaults(length); + } + } + return; + } + + // Src object column contains some paths in serialized sparse column in specified range. + // Iterate over this range and insert all required paths into serialized sparse column or subcolumns. + const auto& [src_sparse_column_path, src_sparse_column_values] = + src.get_sparse_data_paths_and_values(); + auto [sparse_column_path, sparse_column_values] = get_sparse_data_paths_and_values(); + + auto& sparse_column_offsets = serialized_sparse_column_offsets(); + for (size_t row = start; row != start + length; ++row) { + size_t current_size = sparse_column_offsets.size(); + + // Use separate index to iterate over sorted sorted_src_subcolumn_for_sparse_column. + size_t sorted_src_subcolumn_for_sparse_column_idx = 0; + size_t sorted_src_subcolumn_for_sparse_column_size = + sorted_src_subcolumn_for_sparse_column.size(); + int null_count = 0; + + size_t offset = src_serialized_sparse_column_offsets[row - 1]; + size_t end = src_serialized_sparse_column_offsets[row]; + // Iterator over [path, binary value] + for (size_t i = offset; i != end; ++i) { + const StringRef src_sparse_path_string = src_sparse_column_path->get_data_at(i); + const std::string_view src_sparse_path(src_sparse_path_string); + // Check if we have this path in subcolumns. + const PathInData column_path(src_sparse_path); + if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) { + // Deserialize binary value into subcolumn from src serialized sparse column data. + subcolumn->deserialize_from_sparse_column(src_sparse_column_values, i); + } else { + // Before inserting this path into sparse column check if we need to + // insert suibcolumns from sorted_src_subcolumn_for_sparse_column before. + while (sorted_src_subcolumn_for_sparse_column_idx < + sorted_src_subcolumn_for_sparse_column_size && + sorted_src_subcolumn_for_sparse_column + [sorted_src_subcolumn_for_sparse_column_idx] + .first < src_sparse_path) { + auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column + [sorted_src_subcolumn_for_sparse_column_idx++]; + bool is_null = false; + src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path, + sparse_column_values, row, is_null); + if (is_null) { + ++null_count; + } + } + + /// Insert path and value from src sparse column to our sparse column. + sparse_column_path->insert_from(*src_sparse_column_path, i); + sparse_column_values->insert_from(*src_sparse_column_values, i); + } + } + + // Insert remaining dynamic paths from src_dynamic_paths_for_shared_data. + while (sorted_src_subcolumn_for_sparse_column_idx < + sorted_src_subcolumn_for_sparse_column_size) { + auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column + [sorted_src_subcolumn_for_sparse_column_idx++]; + bool is_null = false; + src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path, + sparse_column_values, row, is_null); + if (is_null) { + ++null_count; + } + } + + // All the sparse columns in this row are null. + if (null_count == sorted_src_subcolumn_for_sparse_column.size()) { + serialized_sparse_column->insert_default(); + } else { + sparse_column_offsets.push_back(sparse_column_path->size()); + } + + // Insert default values in all remaining dense columns. + for (const auto& entry : subcolumns) { + if (entry->data.size() == current_size) { + entry->data.insert_default(); + } + } + } + + return; +} + ColumnPtr ColumnObject::replicate(const Offsets& offsets) const { if (subcolumns.empty()) { // Add an emtpy column with offsets.back rows @@ -1514,6 +1700,47 @@ size_t ColumnObject::Subcolumn::get_non_null_value_size() const { return res; } +Status ColumnObject::serialize_sparse_columns( + std::map&& remaing_subcolumns) { + CHECK(is_finalized()); + + serialized_sparse_column = ColumnMap::create(ColumnString::create(), ColumnString::create(), + ColumnArray::ColumnOffsets::create()); + if (remaing_subcolumns.empty()) { + serialized_sparse_column->insert_many_defaults(num_rows); + return Status::OK(); + } + serialized_sparse_column->reserve(num_rows); + auto [sparse_column_keys, sparse_column_values] = get_sparse_data_paths_and_values(); + auto& sparse_column_offsets = serialized_sparse_column_offsets(); + + // Fill the column map for each row + for (size_t i = 0; i < num_rows; ++i) { + int null_count = 0; + + for (auto& [path, subcolumn] : remaing_subcolumns) { + bool is_null = false; + subcolumn.serialize_to_sparse_column(sparse_column_keys, path, sparse_column_values, i, + is_null); + if (is_null) { + ++null_count; + } + } + + // All the sparse columns in this row are null. + if (null_count == remaing_subcolumns.size()) { + serialized_sparse_column->insert_default(); + } else { + DCHECK_EQ(sparse_column_keys->size(), + sparse_column_offsets[i - 1] + remaing_subcolumns.size() - null_count); + DCHECK_EQ(sparse_column_values->size(), sparse_column_keys->size()); + sparse_column_offsets.push_back(sparse_column_keys->size()); + } + } + CHECK_EQ(serialized_sparse_column->size(), num_rows); + return Status::OK(); +} + void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const { entry->data.finalize(); auto nested_column = entry->data.get_finalized_column_ptr()->assume_mutable(); @@ -1558,11 +1785,11 @@ Status ColumnObject::finalize(FinalizeMode mode) { } // pick sparse columns - std::set selected_subcolumns; - std::set remaining_subcolumns; + std::set selected_path; + std::vector remaining_path; if (subcolumns.size() > MAX_SUBCOLUMNS) { // pick subcolumns sort by size of none null values - std::unordered_map none_null_value_sizes; + std::unordered_map none_null_value_sizes; // 1. get the none null value sizes for (auto&& entry : subcolumns) { if (entry->data.is_root) { @@ -1572,22 +1799,20 @@ Status ColumnObject::finalize(FinalizeMode mode) { none_null_value_sizes[entry->path.get_path()] = size; } // 2. sort by the size - std::vector> sorted_by_size(none_null_value_sizes.begin(), - none_null_value_sizes.end()); + std::vector> sorted_by_size( + none_null_value_sizes.begin(), none_null_value_sizes.end()); std::sort(sorted_by_size.begin(), sorted_by_size.end(), [](const auto& a, const auto& b) { return a.second > b.second; }); // 3. pick MAX_SUBCOLUMNS selected subcolumns - std::set selected_subcolumns; for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, sorted_by_size.size()); ++i) { - selected_subcolumns.insert(sorted_by_size[i].first); + selected_path.insert(sorted_by_size[i].first); } // 4. put remaining subcolumns to remaining_subcolumns - std::vector remaining_subcolumns; for (const auto& entry : sorted_by_size) { - if (selected_subcolumns.find(entry.first) == selected_subcolumns.end()) { - remaining_subcolumns.push_back(entry.first); + if (selected_path.find(entry.first) == selected_path.end()) { + remaining_path.emplace_back(entry.first); } } } @@ -1617,21 +1842,21 @@ Status ColumnObject::finalize(FinalizeMode mode) { // add selected subcolumns to new_subcolumns for (auto&& entry : subcolumns) { - if (selected_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) { + if (selected_path.find(entry->path.get_path()) != selected_path.end()) { new_subcolumns.add(entry->path, entry->data); } } - std::map remaing_subcolumns; + std::map remaing_subcolumns; // merge remaining subcolumns to sparse_column for (auto&& entry : subcolumns) { - if (remaining_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) { + if (selected_path.find(entry->path.get_path()) != selected_path.end()) { remaing_subcolumns.emplace(entry->path.get_path(), entry->data); } } // merge and encode sparse column - RETURN_IF_ERROR(merge_sparse_columns(remaing_subcolumns)); + RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns))); std::swap(subcolumns, new_subcolumns); doc_structure = nullptr; diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 1475a168c23a9d..38ed5478f02c86 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -98,7 +98,7 @@ class ColumnObject final : public COWHelper { constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB; // Nullable(Array(Nullable(Object))) const static DataTypePtr NESTED_TYPE; - const static size_t MAX_SUBCOLUMNS = 200; + const size_t MAX_SUBCOLUMNS = 200; // Finlize mode for subcolumns, write mode will estimate which subcolumns are sparse columns(too many null values inside column), // merge and encode them into a shared column in root column. Only affects in flush block to segments. // Otherwise read mode should be as default mode. @@ -181,6 +181,13 @@ class ColumnObject final : public COWHelper { void add_new_column_part(DataTypePtr type); + // Serialize the i-th row of the column into the sparse column. + void serialize_to_sparse_column(ColumnString* key, std::string_view path, + ColumnString* value, size_t row, bool& is_null); + + // Deserialize the i-th row of the column from the sparse column. + void deserialize_from_sparse_column(const ColumnString* value, size_t row) {} + friend class ColumnObject; private: @@ -244,13 +251,20 @@ class ColumnObject final : public COWHelper { const bool is_nullable; Subcolumns subcolumns; size_t num_rows; - // sparse columns will be merge and encoded as ColumnMap - WrappedPtr sparse_column; + + // The rapidjson document format of Subcolumns tree structure + // the leaves is null.In order to display whole document, copy + // this structure and fill with Subcolumns sub items + mutable std::shared_ptr doc_structure; using SubColumnWithName = std::pair; // Cached search results for previous row (keyed as index in JSON object) - used as a hint. mutable std::vector _prev_positions; + // It's filled when the number of subcolumns reaches the limit. + // It has type Map(String, String) and stores a map (path, binary serialized subcolumn value) for each row. + WrappedPtr serialized_sparse_column; + public: static constexpr auto COLUMN_NAME_DUMMY = "_dummy"; @@ -280,19 +294,12 @@ class ColumnObject final : public COWHelper { Status serialize_one_row_to_json_format(int64_t row, rapidjson::StringBuffer* output, bool* is_null) const; - // merge multiple sub sparse columns - Status merge_sparse_columns(const std::map& remaing_subcolumns); + // Fill the `serialized_sparse_column` + Status serialize_sparse_columns(std::map&& remaing_subcolumns); // ensure root node is a certain type void ensure_root_node_type(const DataTypePtr& type); - std::pair get_sparse_data_paths_and_values() { - auto& column_map = assert_cast(*sparse_column); - auto& key = assert_cast(column_map.get_keys()); - auto& value = assert_cast(column_map.get_values()); - return {&key, &value}; - } - // create jsonb root if missing // notice: should only using in VariantRootColumnIterator // since some datastructures(sparse columns are schema on read @@ -354,7 +361,9 @@ class ColumnObject final : public COWHelper { Subcolumns& get_subcolumns() { return subcolumns; } - ColumnPtr get_sparse_column() { return sparse_column->convert_to_full_column_if_const(); } + ColumnPtr get_sparse_column() { + return serialized_sparse_column->convert_to_full_column_if_const(); + } PathsInData getKeys() const; @@ -556,6 +565,20 @@ class ColumnObject final : public COWHelper { "replace_column_data" + get_name()); } + std::pair get_sparse_data_paths_and_values() { + auto& column_map = assert_cast(*serialized_sparse_column); + auto& key = assert_cast(column_map.get_keys()); + auto& value = assert_cast(column_map.get_values()); + return {&key, &value}; + } + + std::pair get_sparse_data_paths_and_values() const { + const auto& column_map = assert_cast(*serialized_sparse_column); + const auto& key = assert_cast(column_map.get_keys()); + const auto& value = assert_cast(column_map.get_values()); + return {&key, &value}; + } + private: // May throw execption void try_insert(const Field& field); @@ -570,6 +593,24 @@ class ColumnObject final : public COWHelper { // unnest nested type columns, and flat them into finlized array subcolumns void unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const; + + ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() { + auto& column_map = assert_cast(*serialized_sparse_column); + return column_map.get_offsets(); + } + + const ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() const { + const auto& column_map = assert_cast(*serialized_sparse_column); + return column_map.get_offsets(); + } + + void insert_from_sparse_column_and_fill_remaing_dense_column( + const ColumnObject& src, + std::vector>&& + sorted_src_subcolumn_for_sparse_column, + size_t start, size_t length); + + bool try_add_new_subcolumn(const PathInData& path); }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h index f0e9eb27961439..5b0e8fab65e262 100644 --- a/be/src/vec/data_types/serde/data_type_serde.h +++ b/be/src/vec/data_types/serde/data_type_serde.h @@ -337,6 +337,11 @@ class DataTypeSerDe { Arena& mem_pool, int64_t row_num) const; virtual Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& result) const; + virtual void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column, + int64_t row_num) { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "write_one_cell_to_binary"); + } + protected: bool _return_object_as_string = false; // This parameter indicates what level the serde belongs to and is mainly used for complex types