Skip to content

Commit

Permalink
add serialized sparse column
Browse files Browse the repository at this point in the history
  • Loading branch information
csun5285 committed Dec 10, 2024
1 parent 92125f3 commit 56c6f60
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 48 deletions.
293 changes: 259 additions & 34 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "vec/aggregate_functions/helpers.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
Expand Down Expand Up @@ -982,6 +983,43 @@ void ColumnObject::Subcolumn::get(size_t n, Field& res) const {
n);
}

void ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std::string_view path,
ColumnString* value, size_t row,
bool& is_null) {
// no need insert
if (least_common_type.get_base_type_id() == TypeIndex::Nothing) {
is_null = true;
return;
}

// no need insert
if (row < num_of_defaults_in_prefix) {
is_null = true;
return;
}

// remove default
row -= num_of_defaults_in_prefix;
is_null = false;
for (size_t i = 0; i < data.size(); ++i) {
const auto& part = data[i];
if (row < part->size()) {
// insert key
key->insert_data(path.data(), path.size());
// insert value
const auto& part_type = data_types[i];
const auto& serde = part_type->get_serde();
serde->write_one_cell_to_binary(*part, value, row);
return;
}

row -= part->size();
}

throw doris::Exception(ErrorCode::OUT_OF_BOUND,
"Index ({}) for serialize to sparse column is out of range", row);
}

Field ColumnObject::operator[](size_t n) const {
Field object;
get(n, object);
Expand Down Expand Up @@ -1062,42 +1100,190 @@ void ColumnObject::add_nested_subcolumn(const PathInData& key, const FieldInfo&
}
}

bool ColumnObject::try_add_new_subcolumn(std::string_view path) {
if (subcolumns.size() == MAX_SUBCOLUMNS) return false;

return add_sub_column(path, num_rows);
}

void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t length) {
#ifndef NDEBUG
check_consistency();
#endif
const auto& src_object = assert_cast<const ColumnObject&>(src);

// First, insert src subcolumns
// We can reach the limit of subcolumns, and in this case
// the rest of subcolumns from src will be inserted into sparse column.
std::map<std::string_view, Subcolumn> src_path_and_subcoumn_for_sparse_column;
for (const auto& entry : src_object.subcolumns) {
if (!has_subcolumn(entry->path)) {
if (entry->path.has_nested_part()) {
FieldInfo field_info {
.scalar_type_id = entry->data.least_common_type.get_base_type_id(),
.have_nulls = false,
.need_convert = false,
.num_dimensions = entry->data.get_dimensions()};
add_nested_subcolumn(entry->path, field_info, num_rows);
} else {
add_sub_column(entry->path, num_rows);
}
// Check if we already have such dense column path.
if (auto* subcolumn = get_subcolumn(entry->path); subcolumn != nullptr) {
subcolumn->insert_range_from(entry->data, start, length);
} else if (try_add_new_subcolumn(entry->path.get_path())) {
subcolumn = get_subcolumn(entry->path);
DCHECK(subcolumn != nullptr);
subcolumn->insert_range_from(entry->data, start, length);
} else {
src_path_and_subcoumn_for_sparse_column.emplace(entry->path.get_path(), entry->data);
}
auto* subcolumn = get_subcolumn(entry->path);
subcolumn->insert_range_from(entry->data, start, length);
}
for (auto& entry : subcolumns) {
if (!src_object.has_subcolumn(entry->path)) {
bool inserted = try_insert_many_defaults_from_nested(entry);
if (!inserted) {
entry->data.insert_many_defaults(length);
}
}

// Paths in sparse column are sorted, so paths from src_dense_column_path_for_sparse_column should be inserted properly
// to keep paths sorted. Let's sort them in advance.
std::vector<std::pair<std::string_view, Subcolumn>> sorted_src_subcolumn_for_sparse_column;
auto it = src_path_and_subcoumn_for_sparse_column.begin();
auto end = src_path_and_subcoumn_for_sparse_column.end();
while (it != end) {
sorted_src_subcolumn_for_sparse_column.emplace_back(it->first, it->second);
++it;
}

insert_from_sparse_column_and_fill_remaing_dense_column(
src_object, std::move(sorted_src_subcolumn_for_sparse_column), start, length);

num_rows += length;
finalize();
#ifndef NDEBUG
check_consistency();
#endif
}

// std::map<std::string_view, Subcolumn>
void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
const ColumnObject& src,
std::vector<std::pair<std::string_view, Subcolumn>>&&
sorted_src_subcolumn_for_sparse_column,
size_t start, size_t length) {
/// Check if src object doesn't have any paths in serialized sparse column.
const auto& src_serialized_sparse_column_offsets = src.serialized_sparse_column_offsets();
if (src_serialized_sparse_column_offsets[start - 1] ==
src_serialized_sparse_column_offsets[start + length - 1]) {
size_t current_size = size();

/// If no src subcolumns should be inserted into sparse column, insert defaults.
if (sorted_src_subcolumn_for_sparse_column.empty()) {
serialized_sparse_column->insert_many_defaults(length);
} else {
// Otherwise insert required src dense columns into sparse column.
auto [sparse_column_keys, sparse_column_values] = get_sparse_data_paths_and_values();
auto& sparse_column_offsets = serialized_sparse_column_offsets();
for (size_t i = start; i != start + length; ++i) {
int null_count = 0;
// Paths in sorted_src_subcolumn_for_sparse_column are already sorted.
for (auto& [path, subcolumn] : sorted_src_subcolumn_for_sparse_column) {
bool is_null = false;
subcolumn.serialize_to_sparse_column(sparse_column_keys, path,
sparse_column_values, i, is_null);
if (is_null) {
++null_count;
}
}

// All the sparse columns in this row are null.
if (null_count == sorted_src_subcolumn_for_sparse_column.size()) {
serialized_sparse_column->insert_default();
} else {
DCHECK_EQ(sparse_column_keys->size(),
sparse_column_offsets[i - 1] +
sorted_src_subcolumn_for_sparse_column.size() - null_count);
DCHECK_EQ(sparse_column_values->size(), sparse_column_keys->size());
sparse_column_offsets.push_back(sparse_column_keys->size());
}
}
}

// Insert default values in all remaining dense columns.
for (const auto& entry : subcolumns) {
if (entry->data.size() == current_size) {
entry->data.insert_many_defaults(length);
}
}
return;
}

// Src object column contains some paths in serialized sparse column in specified range.
// Iterate over this range and insert all required paths into serialized sparse column or subcolumns.
const auto& [src_sparse_column_path, src_sparse_column_values] =
src.get_sparse_data_paths_and_values();
auto [sparse_column_path, sparse_column_values] = get_sparse_data_paths_and_values();

auto& sparse_column_offsets = serialized_sparse_column_offsets();
for (size_t row = start; row != start + length; ++row) {
size_t current_size = sparse_column_offsets.size();

// Use separate index to iterate over sorted sorted_src_subcolumn_for_sparse_column.
size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
size_t sorted_src_subcolumn_for_sparse_column_size =
sorted_src_subcolumn_for_sparse_column.size();
int null_count = 0;

size_t offset = src_serialized_sparse_column_offsets[row - 1];
size_t end = src_serialized_sparse_column_offsets[row];
// Iterator over [path, binary value]
for (size_t i = offset; i != end; ++i) {
const StringRef src_sparse_path_string = src_sparse_column_path->get_data_at(i);
const std::string_view src_sparse_path(src_sparse_path_string);
// Check if we have this path in subcolumns.
const PathInData column_path(src_sparse_path);
if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) {
// Deserialize binary value into subcolumn from src serialized sparse column data.
subcolumn->deserialize_from_sparse_column(src_sparse_column_values, i);
} else {
// Before inserting this path into sparse column check if we need to
// insert suibcolumns from sorted_src_subcolumn_for_sparse_column before.
while (sorted_src_subcolumn_for_sparse_column_idx <
sorted_src_subcolumn_for_sparse_column_size &&
sorted_src_subcolumn_for_sparse_column
[sorted_src_subcolumn_for_sparse_column_idx]
.first < src_sparse_path) {
auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column
[sorted_src_subcolumn_for_sparse_column_idx++];
bool is_null = false;
src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
sparse_column_values, row, is_null);
if (is_null) {
++null_count;
}
}

/// Insert path and value from src sparse column to our sparse column.
sparse_column_path->insert_from(*src_sparse_column_path, i);
sparse_column_values->insert_from(*src_sparse_column_values, i);
}
}

// Insert remaining dynamic paths from src_dynamic_paths_for_shared_data.
while (sorted_src_subcolumn_for_sparse_column_idx <
sorted_src_subcolumn_for_sparse_column_size) {
auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column
[sorted_src_subcolumn_for_sparse_column_idx++];
bool is_null = false;
src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
sparse_column_values, row, is_null);
if (is_null) {
++null_count;
}
}

// All the sparse columns in this row are null.
if (null_count == sorted_src_subcolumn_for_sparse_column.size()) {
serialized_sparse_column->insert_default();
} else {
sparse_column_offsets.push_back(sparse_column_path->size());
}

// Insert default values in all remaining dense columns.
for (const auto& entry : subcolumns) {
if (entry->data.size() == current_size) {
entry->data.insert_default();
}
}
}

return;
}

ColumnPtr ColumnObject::replicate(const Offsets& offsets) const {
if (subcolumns.empty()) {
// Add an emtpy column with offsets.back rows
Expand Down Expand Up @@ -1514,6 +1700,47 @@ size_t ColumnObject::Subcolumn::get_non_null_value_size() const {
return res;
}

Status ColumnObject::serialize_sparse_columns(
std::map<std::string_view, Subcolumn>&& remaing_subcolumns) {
CHECK(is_finalized());

serialized_sparse_column = ColumnMap::create(ColumnString::create(), ColumnString::create(),
ColumnArray::ColumnOffsets::create());
if (remaing_subcolumns.empty()) {
serialized_sparse_column->insert_many_defaults(num_rows);
return Status::OK();
}
serialized_sparse_column->reserve(num_rows);
auto [sparse_column_keys, sparse_column_values] = get_sparse_data_paths_and_values();
auto& sparse_column_offsets = serialized_sparse_column_offsets();

// Fill the column map for each row
for (size_t i = 0; i < num_rows; ++i) {
int null_count = 0;

for (auto& [path, subcolumn] : remaing_subcolumns) {
bool is_null = false;
subcolumn.serialize_to_sparse_column(sparse_column_keys, path, sparse_column_values, i,
is_null);
if (is_null) {
++null_count;
}
}

// All the sparse columns in this row are null.
if (null_count == remaing_subcolumns.size()) {
serialized_sparse_column->insert_default();
} else {
DCHECK_EQ(sparse_column_keys->size(),
sparse_column_offsets[i - 1] + remaing_subcolumns.size() - null_count);
DCHECK_EQ(sparse_column_values->size(), sparse_column_keys->size());
sparse_column_offsets.push_back(sparse_column_keys->size());
}
}
CHECK_EQ(serialized_sparse_column->size(), num_rows);
return Status::OK();
}

void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const {
entry->data.finalize();
auto nested_column = entry->data.get_finalized_column_ptr()->assume_mutable();
Expand Down Expand Up @@ -1558,11 +1785,11 @@ Status ColumnObject::finalize(FinalizeMode mode) {
}

// pick sparse columns
std::set<String> selected_subcolumns;
std::set<String> remaining_subcolumns;
std::set<std::string_view> selected_path;
std::vector<std::string_view> remaining_path;
if (subcolumns.size() > MAX_SUBCOLUMNS) {
// pick subcolumns sort by size of none null values
std::unordered_map<String, size_t> none_null_value_sizes;
std::unordered_map<std::string_view, size_t> none_null_value_sizes;
// 1. get the none null value sizes
for (auto&& entry : subcolumns) {
if (entry->data.is_root) {
Expand All @@ -1572,22 +1799,20 @@ Status ColumnObject::finalize(FinalizeMode mode) {
none_null_value_sizes[entry->path.get_path()] = size;
}
// 2. sort by the size
std::vector<std::pair<String, size_t>> sorted_by_size(none_null_value_sizes.begin(),
none_null_value_sizes.end());
std::vector<std::pair<std::string_view, size_t>> sorted_by_size(
none_null_value_sizes.begin(), none_null_value_sizes.end());
std::sort(sorted_by_size.begin(), sorted_by_size.end(),
[](const auto& a, const auto& b) { return a.second > b.second; });

// 3. pick MAX_SUBCOLUMNS selected subcolumns
std::set<String> selected_subcolumns;
for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, sorted_by_size.size()); ++i) {
selected_subcolumns.insert(sorted_by_size[i].first);
selected_path.insert(sorted_by_size[i].first);
}

// 4. put remaining subcolumns to remaining_subcolumns
std::vector<String> remaining_subcolumns;
for (const auto& entry : sorted_by_size) {
if (selected_subcolumns.find(entry.first) == selected_subcolumns.end()) {
remaining_subcolumns.push_back(entry.first);
if (selected_path.find(entry.first) == selected_path.end()) {
remaining_path.emplace_back(entry.first);
}
}
}
Expand Down Expand Up @@ -1617,21 +1842,21 @@ Status ColumnObject::finalize(FinalizeMode mode) {

// add selected subcolumns to new_subcolumns
for (auto&& entry : subcolumns) {
if (selected_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) {
if (selected_path.find(entry->path.get_path()) != selected_path.end()) {
new_subcolumns.add(entry->path, entry->data);
}
}

std::map<String, Subcolumn> remaing_subcolumns;
std::map<std::string_view, Subcolumn> remaing_subcolumns;
// merge remaining subcolumns to sparse_column
for (auto&& entry : subcolumns) {
if (remaining_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) {
if (selected_path.find(entry->path.get_path()) != selected_path.end()) {
remaing_subcolumns.emplace(entry->path.get_path(), entry->data);
}
}

// merge and encode sparse column
RETURN_IF_ERROR(merge_sparse_columns(remaing_subcolumns));
RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));

std::swap(subcolumns, new_subcolumns);
doc_structure = nullptr;
Expand Down
Loading

0 comments on commit 56c6f60

Please sign in to comment.