diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 2f2ff7cc938375b..a468a034121e78b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -97,9 +97,9 @@ Status DeltaWriter::init() { return Status::OK(); } RETURN_IF_ERROR(_rowset_builder.init()); - RETURN_IF_ERROR( - _memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), - _rowset_builder.tablet()->enable_unique_key_merge_on_write())); + RETURN_IF_ERROR(_memtable_writer->init(_rowset_builder.rowset_writer(), + _rowset_builder.tablet_schema(), + _rowset_builder.get_partial_udpate_info())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 0be2e1751ae615e..b52f8c73442f401 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -50,6 +50,7 @@ #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" @@ -124,7 +125,7 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, + RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; @@ -225,8 +226,12 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); + // _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + // table_schema_param->partial_update_input_columns()); + _partial_update_info = std::make_shared(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 741d939fa8d9770..358b74e71171ce0 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -127,6 +127,8 @@ class DeltaWriterV2 { MonotonicStopWatch _lock_watch; std::vector> _streams; + + std::shared_ptr _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 28092d31ecebba6..957f94a56aa5133 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -49,11 +49,12 @@ using namespace ErrorCode; MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, + bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info, const std::shared_ptr& insert_mem_tracker, const std::shared_ptr& flush_mem_tracker) : _tablet_id(tablet_id), _enable_unique_key_mow(enable_unique_key_mow), + _is_partial_update(partial_update_info.is_partial_update), _keys_type(tablet_schema->keys_type()), _tablet_schema(tablet_schema), _insert_mem_tracker(insert_mem_tracker), @@ -77,8 +78,8 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _num_columns = _tablet_schema->num_columns(); - if (_tablet_schema->is_partial_update()) { - _num_columns = _tablet_schema->partial_input_column_size(); + if (_is_partial_update) { + _num_columns = partial_update_info.partial_update_input_columns.size(); } } void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, @@ -178,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vectorhas_sequence_col()) { - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { // for unique key partial update, sequence column index in block // may be different with the index in `_tablet_schema` for (size_t i = 0; i < cloneBlock.columns(); i++) { @@ -417,7 +418,7 @@ void MemTable::shrink_memtable_by_agg() { bool MemTable::need_flush() const { auto max_size = config::write_buffer_size; - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { auto update_columns_size = _tablet_schema->partial_input_column_size(); max_size = max_size * update_columns_size / _tablet_schema->num_columns(); max_size = max_size > 1048576 ? max_size : 1048576; @@ -428,7 +429,7 @@ bool MemTable::need_flush() const { bool MemTable::need_agg() const { if (_keys_type == KeysType::AGG_KEYS) { auto max_size = config::write_buffer_size_for_agg; - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { auto update_columns_size = _tablet_schema->partial_input_column_size(); max_size = max_size * update_columns_size / _tablet_schema->num_columns(); max_size = max_size > 1048576 ? max_size : 1048576; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index ed9226c4a0c1013..2ac3973a8d3ce8d 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -31,6 +31,7 @@ #include "common/status.h" #include "gutil/integral_types.h" #include "olap/olap_common.h" +#include "olap/tablet_schema.h" #include "runtime/memory/mem_tracker.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/arena.h" @@ -42,6 +43,7 @@ class Schema; class SlotDescriptor; class TabletSchema; class TupleDescriptor; +struct PartialUpdateInfo; enum KeysType : int; // row pos in _input_mutable_block @@ -167,7 +169,8 @@ class MemTable { public: MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, const std::shared_ptr& insert_mem_tracker, + bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info, + const std::shared_ptr& insert_mem_tracker, const std::shared_ptr& flush_mem_tracker); ~MemTable(); @@ -202,6 +205,7 @@ class MemTable { private: int64_t _tablet_id; bool _enable_unique_key_mow = false; + bool _is_partial_update = false; const KeysType _keys_type; const TabletSchema* _tablet_schema; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 955a961f0a5af49..2fc7478ad8f876e 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -39,6 +39,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "service/backend_options.h" @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() { } Status MemTableWriter::init(std::shared_ptr rowset_writer, - TabletSchemaSPtr tablet_schema, bool unique_key_mow) { + TabletSchemaSPtr tablet_schema, + std::shared_ptr partial_update_info, + bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; + _partial_update_info = partial_update_info; _reset_mem_table(); @@ -195,7 +199,7 @@ void MemTableWriter::_reset_mem_table() { _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc, - _unique_key_mow, mem_table_insert_tracker, + _unique_key_mow, *_partial_update_info, mem_table_insert_tracker, mem_table_flush_tracker)); _segment_num++; diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index b374f10bdedfbca..d633c6164eb2833 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -67,6 +67,7 @@ class MemTableWriter { ~MemTableWriter(); Status init(std::shared_ptr rowset_writer, TabletSchemaSPtr tablet_schema, + std::shared_ptr partial_update_info, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector& row_idxs, @@ -141,6 +142,8 @@ class MemTableWriter { int64_t _segment_num = 0; MonotonicStopWatch _lock_watch; + + std::shared_ptr _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 985efa0809ef7e3..e59239e9dfb20bf 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -105,6 +105,8 @@ struct RowsetWriterContext { // segcompaction for this RowsetWriter, disable it for some transient writers bool enable_segcompaction = true; + + bool is_partial_update = false; }; } // namespace doris diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e599d96a8fee84e..1f06c33516fbded 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -43,6 +43,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include "util/brpc_client_cache.h" #include "util/mem_info.h" @@ -226,7 +227,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { // For partial update, we need to fill in the entire row of data, during the calculation // of the delete bitmap. This operation is resource-intensive, and we need to minimize // the number of times it occurs. Therefore, we skip this operation here. - if (_rowset->tablet_schema()->is_partial_update()) { + if (_partial_update_info->is_partial_update) { return Status::OK(); } @@ -238,8 +239,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { } Status RowsetBuilder::wait_calc_delete_bitmap() { - if (!_tablet->enable_unique_key_merge_on_write() || - _rowset->tablet_schema()->is_partial_update()) { + if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) { return Status::OK(); } std::lock_guard l(_lock); @@ -324,9 +324,13 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); - _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); + // _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + // table_schema_param->partial_update_input_columns()); + // _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); + _partial_update_info = std::make_shared(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 8bb94c20905651e..8938778799f1cf9 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -86,6 +86,10 @@ class RowsetBuilder { // For UT DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + std::shared_ptr get_partial_udpate_info() const { + return _partial_update_info; + } + private: void _garbage_collection(); @@ -113,6 +117,8 @@ class RowsetBuilder { // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + std::shared_ptr _partial_update_info; + RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _build_rowset_timer = nullptr; RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 5e6ffa3ac777848..782753ef3dd69a0 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -418,4 +418,35 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b); using TabletSchemaSPtr = std::shared_ptr; +struct PartialUpdateInfo { + void init(const TabletSchema& tablet_schema, bool is_partial_update, + const std::set& partial_update_input_columns, bool is_strict_mode) { + this->is_partial_update = is_partial_update; + this->partial_update_input_columns = partial_update_input_columns; + missing_cids.clear(); + update_cids.clear(); + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { + auto tablet_column = tablet_schema.column(i); + if (partial_update_input_columns.count(tablet_column.name()) == 0) { + missing_cids.emplace_back(i); + if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { + can_insert_new_rows_in_partial_update = false; + } + } else { + update_cids.emplace_back(i); + } + } + this->is_strict_mode = is_strict_mode; + } + + bool is_partial_update {false}; + std::set partial_update_input_columns; + std::vector missing_cids; + std::vector update_cids; + // if key not exist in old rowset, use default value or null value for the unmentioned cols + // to generate a new row, only available in non-strict mode + bool can_insert_new_rows_in_partial_update {true}; + bool is_strict_mode {false}; +}; + } // namespace doris