Skip to content

Commit

Permalink
add stats
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Dec 10, 2024
1 parent 25ca020 commit 92125f3
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 216 deletions.
12 changes: 0 additions & 12 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,18 +422,6 @@ Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da
_flush_schema->append_column(tablet_column);
_olap_data_convertor->clear_source_content();
}
// sparse_columns
for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(
object_column.get_sparse_subcolumns())) {
TabletColumn sparse_tablet_column = generate_column_info(entry);
_flush_schema->mutable_column_by_uid(parent_column->unique_id())
.append_sparse_column(sparse_tablet_column);

// add sparse column to footer
auto* column_pb = _footer.mutable_columns(i);
init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column,
_flush_schema);
}
}

// Update rowset schema, tablet's tablet schema will be updated when build Rowset
Expand Down
197 changes: 132 additions & 65 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,78 +38,61 @@ VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts
_tablet_column = column;
}

Status VariantColumnWriterImpl::finalize() {
auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE);

// convert each subcolumns to storage format and add data to sub columns writers buffer
auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();

DCHECK(ptr->is_finalized());

if (ptr->is_null_root()) {
auto root_type = vectorized::make_nullable(
std::make_shared<vectorized::ColumnObject::MostCommonType>());
auto root_col = root_type->create_column();
root_col->insert_many_defaults(ptr->rows());
ptr->create_root(root_type, std::move(root_col));
}

// common extracted columns
const auto& parent_column = *_tablet_column;

// generate column info by entry info
auto generate_column_info = [&](const auto& entry) {
const std::string& column_name =
parent_column.name_lower_case() + "." + entry->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
entry->data.get_least_common_type();
vectorized::PathInDataBuilder full_path_builder;
auto full_path = full_path_builder.append(parent_column.name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
// set unique_id and parent_unique_id, will use unique_id to get iterator correct
return vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id = parent_column.unique_id(),
.parent_unique_id = parent_column.unique_id(),
.path_info = full_path});
};
Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter,
size_t num_rows, int& column_id) {
// root column
ColumnWriterOptions root_opts = _opts;
_root_writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
_opts, std::unique_ptr<Field>(FieldFactory::create(parent_column)), _opts.file_writer));
_opts, std::unique_ptr<Field>(FieldFactory::create(*_tablet_column)),
_opts.file_writer));
RETURN_IF_ERROR(_root_writer->init());

// subcolumn
size_t num_rows = _column->size();
for (auto& subcolumn : _subcolumn_writers) {
RETURN_IF_ERROR(subcolumn->init());
}

// make sure the root type
auto expected_root_type =
vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>());
ptr->ensure_root_node_type(expected_root_type);

int column_id = 0;
// convert root column data from engine format to storage layer format
olap_data_convertor->add_column_data_convertor(parent_column);
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
converter->add_column_data_convertor(*_tablet_column);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows, column_id));
auto [status, column] = olap_data_convertor->convert_column_data(column_id);
auto [status, column] = converter->convert_column_data(column_id);
if (!status.ok()) {
return status;
}
// use real null data instead of root
const uint8_t* nullmap =
vectorized::check_and_get_column<vectorized::ColumnUInt8>(_null_column.get())
->get_data()
.data();
RETURN_IF_ERROR(_root_writer->append(nullmap, column->get_data(), num_rows));
++column_id;
olap_data_convertor->clear_source_content();
converter->clear_source_content();

_opts.meta->set_num_rows(num_rows);
return Status::OK();
}

Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter,
size_t num_rows, int& column_id) {
// generate column info by entry info
auto generate_column_info = [&](const auto& entry) {
const std::string& column_name =
_tablet_column->name_lower_case() + "." + entry->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
entry->data.get_least_common_type();
vectorized::PathInDataBuilder full_path_builder;
auto full_path = full_path_builder.append(_tablet_column->name_lower_case(), false)
.append(entry->path.get_parts(), false)
.build();
// set unique_id and parent_unique_id, will use unique_id to get iterator correct
return vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name,
vectorized::schema_util::ExtraInfo {.unique_id = _tablet_column->unique_id(),
.parent_unique_id = _tablet_column->unique_id(),
.path_info = full_path});
};
_statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size());
// convert sub column data from engine format to storage layer format
for (const auto& entry :
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
Expand All @@ -120,22 +103,111 @@ Status VariantColumnWriterImpl::finalize() {
CHECK(entry->data.is_finalized());
int current_column_id = column_id++;
TabletColumn tablet_column = generate_column_info(entry);
RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, parent_column,
RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, *_tablet_column,
_opts.rowset_ctx->tablet_schema));
olap_data_convertor->add_column_data_convertor(tablet_column);
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
converter->add_column_data_convertor(tablet_column);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{entry->data.get_finalized_column_ptr()->get_ptr(),
entry->data.get_least_common_type(), tablet_column.name()},
0, num_rows, current_column_id));
auto [status, column] = olap_data_convertor->convert_column_data(current_column_id);
auto [status, column] = converter->convert_column_data(current_column_id);
if (!status.ok()) {
return status;
}
const uint8_t* nullmap = column->get_nullmap();
RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
nullmap, column->get_data(), num_rows));
olap_data_convertor->clear_source_content();
converter->clear_source_content();
_subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);

// get stastics
_statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size());
}
return Status::OK();
}

Status VariantColumnWriterImpl::_process_sparse_column(
vectorized::ColumnObject* ptr, vectorized::OlapBlockDataConvertor* converter,
size_t num_rows, int& column_id) {
// create sparse column writer
TabletColumn sparse_column =
vectorized::schema_util::create_sparse_column(_tablet_column->unique_id());
ColumnWriterOptions sparse_writer_opts;
sparse_writer_opts.meta = _opts.footer->add_columns();

_init_column_meta(sparse_writer_opts.meta, column_id, sparse_column);
RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts, &sparse_column,
_opts.file_writer, &_sparse_column_writer));
RETURN_IF_ERROR(_sparse_column_writer->init());

// convert root column data from engine format to storage layer format
converter->add_column_data_convertor(sparse_column);
RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
{ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, column_id));
auto [status, column] = converter->convert_column_data(column_id);
if (!status.ok()) {
return status;
}
RETURN_IF_ERROR(
_sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows));
++column_id;
converter->clear_source_content();

// get stastics
// todo: reuse the statics from collected stastics from compaction stage
std::unordered_map<std::string, size_t> sparse_data_paths_statistics;
const auto [sparse_data_paths, _] = ptr->get_sparse_data_paths_and_values();
for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
auto path = sparse_data_paths->get_data_at(i);
if (auto it = _statistics._sparse_column_non_null_size.find(path);
it != _statistics._sparse_column_non_null_size.end()) {
++it->second;
} else if (_statistics._sparse_column_non_null_size.size() <
VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) {
_statistics._sparse_column_non_null_size.emplace(path, 1);
}
}

sparse_writer_opts.meta->set_num_rows(num_rows);
return Status::OK();
}

void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
// TODO
}

Status VariantColumnWriterImpl::finalize() {
auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
RETURN_IF_ERROR(ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE));

// convert each subcolumns to storage format and add data to sub columns writers buffer
auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();

DCHECK(ptr->is_finalized());

if (ptr->is_null_root()) {
auto root_type = vectorized::make_nullable(
std::make_shared<vectorized::ColumnObject::MostCommonType>());
auto root_col = root_type->create_column();
root_col->insert_many_defaults(ptr->rows());
ptr->create_root(root_type, std::move(root_col));
}

size_t num_rows = _column->size();
int column_id = 0;

// convert root column data from engine format to storage layer format
RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(), num_rows, column_id));

// process and append each subcolumns to sub columns writers buffer
RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(), num_rows, column_id));

// process sparse column and append to sparse writer buffer
RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(), num_rows, column_id));

// set statistics info
_statistics.to_pb(_opts.meta->mutable_variant_statistics());

_is_finalized = true;
return Status::OK();
}
Expand Down Expand Up @@ -164,6 +236,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
size += column_writer->estimate_buffer_size();
}
size += _root_writer->estimate_buffer_size();
size += _sparse_column_writer->estimate_buffer_size();
return size;
}

Expand All @@ -172,21 +245,18 @@ Status VariantColumnWriterImpl::finish() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->finish());
RETURN_IF_ERROR(_sparse_column_writer->finish());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
_opts.meta->set_num_rows(_root_writer->get_next_rowid());
for (auto& suboptions : _subcolumn_opts) {
suboptions.meta->set_num_rows(_root_writer->get_next_rowid());
}
return Status::OK();
return Status::OK();
}
Status VariantColumnWriterImpl::write_data() {
if (!is_finalized()) {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_data());
RETURN_IF_ERROR(_sparse_column_writer->write_data());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
Expand All @@ -197,6 +267,7 @@ Status VariantColumnWriterImpl::write_ordinal_index() {
RETURN_IF_ERROR(finalize());
}
RETURN_IF_ERROR(_root_writer->write_ordinal_index());
RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
for (auto& column_writer : _subcolumn_writers) {
RETURN_IF_ERROR(column_writer->write_ordinal_index());
}
Expand Down Expand Up @@ -277,10 +348,6 @@ void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t col
for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
_init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
}
// add sparse column to footer
for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
_init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i));
}
};

Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const TabletColumn& column,
Expand Down
35 changes: 32 additions & 3 deletions be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,25 @@
#include "olap/tablet_schema.h"
#include "vec/columns/column.h"

namespace doris::segment_v2 {
namespace doris {

namespace vectorized {
class ColumnObject;
class OlapBlockDataConvertor;
} // namespace vectorized
namespace segment_v2 {

class ColumnWriter;
class ScalarColumnWriter;

struct VariantStatistics {
constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000;
std::vector<size_t> _subcolumns_non_null_size;
std::map<StringRef, size_t> _sparse_column_non_null_size;

void to_pb(VariantStatisticsPB* stats) const;
};

class VariantColumnWriterImpl {
public:
VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column);
Expand All @@ -54,15 +68,30 @@ class VariantColumnWriterImpl {
Status _create_column_writer(uint32_t cid, const TabletColumn& column,
const TabletColumn& parent_column,
const TabletSchemaSPtr& tablet_schema);
Status _process_root_column(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter, size_t num_rows,
int& column_id);
Status _process_sparse_column(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter, size_t num_rows,
int& column_id);
Status _process_subcolumns(vectorized::ColumnObject* ptr,
vectorized::OlapBlockDataConvertor* converter, size_t num_rows,
int& column_id);
// prepare a column for finalize
doris::vectorized::MutableColumnPtr _column;
doris::vectorized::MutableColumnPtr _null_column;
ColumnWriterOptions _opts;
const TabletColumn* _tablet_column = nullptr;
bool _is_finalized = false;
// for sparse column and root column
// for root column
std::unique_ptr<ColumnWriter> _root_writer;
// for sparse column
std::unique_ptr<ColumnWriter> _sparse_column_writer;
std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
std::vector<ColumnWriterOptions> _subcolumn_opts;

// staticstics which will be persisted in the footer
VariantStatistics _statistics;
};
} // namespace doris::segment_v2
} // namespace segment_v2
} // namespace doris
11 changes: 0 additions & 11 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1054,17 +1054,6 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
_flush_schema->append_column(tablet_column);
_olap_data_convertor->clear_source_content();
}
// sparse_columns
for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(
object_column.get_sparse_subcolumns())) {
TabletColumn sparse_tablet_column = generate_column_info(entry);
_flush_schema->mutable_column_by_uid(parent_column->unique_id())
.append_sparse_column(sparse_tablet_column);

// add sparse column to footer
auto* column_pb = _footer.mutable_columns(i);
_init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column);
}
}

// Update rowset schema, tablet's tablet schema will be updated when build Rowset
Expand Down
Loading

0 comments on commit 92125f3

Please sign in to comment.