Skip to content

Commit

Permalink
[refactor](partial-update) Split partial update infos from tablet sch…
Browse files Browse the repository at this point in the history
…ema (apache#25147)
  • Loading branch information
bobhan1 authored Oct 17, 2023
1 parent 4d12d88 commit 1514f78
Show file tree
Hide file tree
Showing 26 changed files with 188 additions and 150 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
it.rowset_ids.insert(_output_rowset->rowset_id());
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
it.partition_id, it.transaction_id, _tablet->tablet_id(),
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids);
_tablet->tablet_uid(), true, it.delete_bitmap, it.rowset_ids, nullptr);
}
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Status DeltaWriter::init() {
RETURN_IF_ERROR(_rowset_builder.init());
RETURN_IF_ERROR(
_memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(),
_rowset_builder.get_partial_update_info(),
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -121,10 +122,11 @@ Status DeltaWriterV2::init() {
context.rowset_type = RowsetTypePB::BETA_ROWSET;
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.data_dir = nullptr;
context.partial_update_info = _partial_update_info;

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema,
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down Expand Up @@ -221,8 +223,10 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,

_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns());
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode());
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable_writer.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -126,6 +127,8 @@ class DeltaWriterV2 {
MonotonicStopWatch _lock_watch;

std::vector<std::shared_ptr<LoadStreamStub>> _streams;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
20 changes: 9 additions & 11 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
Expand Down Expand Up @@ -77,8 +77,11 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_num_columns = _tablet_schema->num_columns();
if (_tablet_schema->is_partial_update()) {
_num_columns = _tablet_schema->partial_input_column_size();
if (partial_update_info != nullptr) {
_is_partial_update = partial_update_info->is_partial_update;
if (_is_partial_update) {
_num_columns = partial_update_info->partial_update_input_columns.size();
}
}
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -178,7 +181,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_init_agg_functions(&target_block);
}
if (_tablet_schema->has_sequence_col()) {
if (_tablet_schema->is_partial_update()) {
if (_is_partial_update) {
// for unique key partial update, sequence column index in block
// may be different with the index in `_tablet_schema`
for (size_t i = 0; i < cloneBlock.columns(); i++) {
Expand Down Expand Up @@ -417,8 +420,8 @@ void MemTable::shrink_memtable_by_agg() {

bool MemTable::need_flush() const {
auto max_size = config::write_buffer_size;
if (_tablet_schema->is_partial_update()) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
if (_is_partial_update) {
auto update_columns_size = _num_columns;
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
Expand All @@ -428,11 +431,6 @@ bool MemTable::need_flush() const {
bool MemTable::need_agg() const {
if (_keys_type == KeysType::AGG_KEYS) {
auto max_size = config::write_buffer_size_for_agg;
if (_tablet_schema->is_partial_update()) {
auto update_columns_size = _tablet_schema->partial_input_column_size();
max_size = max_size * update_columns_size / _tablet_schema->num_columns();
max_size = max_size > 1048576 ? max_size : 1048576;
}
return memory_usage() >= max_size;
}
return false;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
Expand Down Expand Up @@ -167,7 +169,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, const std::shared_ptr<MemTracker>& insert_mem_tracker,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
~MemTable();

Expand Down Expand Up @@ -202,6 +205,7 @@ class MemTable {
private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
const KeysType _keys_type;
const TabletSchema* _tablet_schema;

Expand Down
10 changes: 7 additions & 3 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
Expand All @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() {
}

Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema, bool unique_key_mow) {
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
_partial_update_info = partial_update_info;

_reset_mem_table();

Expand Down Expand Up @@ -195,8 +199,8 @@ void MemTableWriter::_reset_mem_table() {
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc,
_unique_key_mow, mem_table_insert_tracker,
mem_table_flush_tracker));
_unique_key_mow, _partial_update_info.get(),
mem_table_insert_tracker, mem_table_flush_tracker));

_segment_num++;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "olap/delta_writer_context.h"
#include "olap/memtable.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
Expand Down Expand Up @@ -67,6 +68,7 @@ class MemTableWriter {
~MemTableWriter();

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
Expand Down Expand Up @@ -141,6 +143,8 @@ class MemTableWriter {
int64_t _segment_num = 0;

MonotonicStopWatch _lock_watch;

std::shared_ptr<PartialUpdateInfo> _partial_update_info;
};

} // namespace doris
54 changes: 54 additions & 0 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "olap/tablet_schema.h"

namespace doris {

// Partial-update state of a load, kept separately from the tablet schema.
struct PartialUpdateInfo {
    // Rebuilds every field from the given schema and load parameters.
    //
    // tablet_schema:       schema whose columns are split, by ordinal, into
    //                      `update_cids` (name listed in partial_update_cols)
    //                      and `missing_cids` (every other column).
    // partial_update:      stored as-is in `is_partial_update`.
    // partial_update_cols: names of the columns supplied by the load.
    // is_strict_mode:      stored as-is in `is_strict_mode`.
    //
    // Calling init() again fully re-initializes the object: all derived state,
    // including `can_insert_new_rows_in_partial_update`, is reset before the
    // schema is scanned, so nothing leaks over from a previous call.
    void init(const TabletSchema& tablet_schema, bool partial_update,
              const std::set<std::string>& partial_update_cols, bool is_strict_mode) {
        is_partial_update = partial_update;
        partial_update_input_columns = partial_update_cols;
        this->is_strict_mode = is_strict_mode;

        missing_cids.clear();
        update_cids.clear();
        // Must be reset together with the cid lists; otherwise a `false`
        // computed by an earlier init() would stick forever.
        can_insert_new_rows_in_partial_update = true;

        const auto num_columns = tablet_schema.num_columns();
        for (uint32_t cid = 0; cid < num_columns; ++cid) {
            // Bind by reference: only the name and two flags are inspected,
            // so there is no reason to copy the column object.
            const auto& tablet_column = tablet_schema.column(cid);
            if (partial_update_input_columns.contains(tablet_column.name())) {
                update_cids.emplace_back(cid);
                continue;
            }
            missing_cids.emplace_back(cid);
            // A column absent from the load that has no default value and is
            // not nullable cannot be filled in for a brand-new row.
            if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
                can_insert_new_rows_in_partial_update = false;
            }
        }
    }

    bool is_partial_update {false};
    // Names of the columns provided by the load.
    std::set<std::string> partial_update_input_columns;
    // Ordinals of schema columns NOT listed in `partial_update_input_columns`.
    std::vector<uint32_t> missing_cids;
    // Ordinals of schema columns listed in `partial_update_input_columns`.
    std::vector<uint32_t> update_cids;
    // If a key does not exist in the old rowsets, the unmentioned columns are
    // filled with their default value or NULL to generate a new row; this is
    // only available in non-strict mode. False when at least one unmentioned
    // column has neither a default value nor is nullable.
    bool can_insert_new_rows_in_partial_update {true};
    bool is_strict_mode {false};
};
} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
_context.tablet_schema->is_partial_update()) {
(_context.partial_update_info && _context.partial_update_info->is_partial_update)) {
return Status::OK();
}
auto rowset = _build_tmp();
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ class BetaRowsetWriter : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
return _context.partial_update_info;
}

// True only when the context carries a PartialUpdateInfo and that info is
// flagged as a partial update; a null pointer yields false.
bool is_partial_update() override {
    const auto& info = _context.partial_update_info;
    if (!info) {
        return false;
    }
    return info->is_partial_update;
}

private:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int64_t segment_writer_ns() override { return _segment_writer_ns; }

// Returns a shared handle to the PartialUpdateInfo kept in `_context`.
std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
    auto partial_update_info = _context.partial_update_info;
    return partial_update_info;
}

// Reports whether `_context` holds a PartialUpdateInfo whose
// `is_partial_update` flag is set. No info attached means false.
bool is_partial_update() override {
    const auto& partial_update_info = _context.partial_update_info;
    return partial_update_info != nullptr ? partial_update_info->is_partial_update : false;
}

private:
RowsetWriterContext _context;

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class RowsetWriter {

virtual int64_t segment_writer_ns() { return 0; }

// Returns the PartialUpdateInfo used by this writer. Must be provided by
// every concrete writer.
// NOTE(review): the implementations visible in this change return the pointer
// stored in their RowsetWriterContext, which looks like it can be null —
// confirm before dereferencing the result.
virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;

// Reports whether this writer is performing a partial update. Must be
// provided by every concrete writer.
// NOTE(review): the implementations visible in this change return false when
// no PartialUpdateInfo is attached — confirm other implementers do the same.
virtual bool is_partial_update() = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "io/fs/file_system.h"
#include "olap/olap_define.h"
#include "olap/partial_update_info.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"

Expand Down Expand Up @@ -105,6 +106,10 @@ struct RowsetWriterContext {

// segcompaction for this RowsetWriter, disable it for some transient writers
bool enable_segcompaction = true;

std::shared_ptr<PartialUpdateInfo> partial_update_info;

bool is_transient_rowset_writer = false;
};

} // namespace doris
20 changes: 12 additions & 8 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);

DCHECK(_opts.rowset_ctx->partial_update_info);
// find missing column cids
std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
std::vector<uint32_t> missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
std::vector<uint32_t> including_cids = _opts.rowset_ctx->partial_update_info->update_cids;

// create full block and fill with input columns
auto full_block = _tablet_schema->create_block();
Expand Down Expand Up @@ -421,15 +422,15 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_tablet_schema->is_strict_mode()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
}

if (!_tablet_schema->can_insert_new_rows_in_partial_update()) {
if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) {
return Status::InternalError(
"the unmentioned columns should have default value or be nullable for "
"newly inserted rows in non-strict mode partial update");
Expand Down Expand Up @@ -492,7 +493,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}

// convert missing columns and send to column writer
auto cids_missing = _tablet_schema->get_missing_cids();
auto cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
cids_missing);
for (auto cid : cids_missing) {
Expand Down Expand Up @@ -545,8 +546,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
bool has_default_or_nullable,
const size_t& segment_start_pos) {
// create old value columns
auto old_value_block = _tablet_schema->create_missing_columns_block();
std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
std::vector<uint32_t> cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK(cids_missing.size() == old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
Expand Down Expand Up @@ -652,7 +653,10 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f

Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos,
size_t num_rows) {
if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) {
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
return Status::OK();
}
Expand Down
Loading

0 comments on commit 1514f78

Please sign in to comment.