diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index c3ba8fa23ffffff..558e5e6efe8fe8a 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -538,7 +538,7 @@ Status DataDir::load() { Status commit_txn_status = _txn_manager->commit_txn( _meta, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), rowset_meta->tablet_uid(), rowset_meta->load_id(), - rowset, true); + rowset, true, false); if (!commit_txn_status && !commit_txn_status.is()) { LOG(WARNING) << "failed to add committed rowset: " << rowset_meta->rowset_id() << " to tablet: " << rowset_meta->tablet_id() diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 2f2ff7cc938375b..c0514f3ab438bc5 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -97,9 +97,9 @@ Status DeltaWriter::init() { return Status::OK(); } RETURN_IF_ERROR(_rowset_builder.init()); - RETURN_IF_ERROR( - _memtable_writer->init(_rowset_builder.rowset_writer(), _rowset_builder.tablet_schema(), - _rowset_builder.tablet()->enable_unique_key_merge_on_write())); + RETURN_IF_ERROR(_memtable_writer->init(_rowset_builder.rowset_writer(), + _rowset_builder.tablet_schema(), + _rowset_builder.get_partial_update_info())); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 0be2e1751ae615e..b52f8c73442f401 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -50,6 +50,7 @@ #include "olap/schema_change.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" @@ -124,7 +125,7 @@ Status DeltaWriterV2::init() { _rowset_writer = std::make_shared(_streams); RETURN_IF_ERROR(_rowset_writer->init(context)); - RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, + RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, _streams[0]->enable_unique_mow(_req.index_id))); ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer); _is_init = true; @@ -225,8 +226,12 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); + // _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + // table_schema_param->partial_update_input_columns()); + _partial_update_info = std::make_shared(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 741d939fa8d9770..358b74e71171ce0 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -127,6 +127,8 @@ class DeltaWriterV2 { MonotonicStopWatch _lock_watch; std::vector> _streams; + + std::shared_ptr _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 28092d31ecebba6..957f94a56aa5133 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -49,11 +49,12 @@ using namespace ErrorCode; MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, + bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info, const std::shared_ptr& insert_mem_tracker, const std::shared_ptr& flush_mem_tracker) : _tablet_id(tablet_id), _enable_unique_key_mow(enable_unique_key_mow), + _is_partial_update(partial_update_info.is_partial_update), _keys_type(tablet_schema->keys_type()), _tablet_schema(tablet_schema), _insert_mem_tracker(insert_mem_tracker), @@ -77,8 +78,8 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); _num_columns = _tablet_schema->num_columns(); - if (_tablet_schema->is_partial_update()) { - _num_columns = _tablet_schema->partial_input_column_size(); + if (_is_partial_update) { + _num_columns = partial_update_info.partial_update_input_columns.size(); } } void MemTable::_init_columns_offset_by_slot_descs(const std::vector* slot_descs, @@ -178,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vectorhas_sequence_col()) { - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { // for unique key partial update, sequence column index in block // may be different with the index in `_tablet_schema` for (size_t i = 0; i < cloneBlock.columns(); i++) { @@ -417,7 +418,7 @@ void MemTable::shrink_memtable_by_agg() { bool MemTable::need_flush() const { auto max_size = config::write_buffer_size; - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { auto update_columns_size = _tablet_schema->partial_input_column_size(); max_size = max_size * update_columns_size / _tablet_schema->num_columns(); max_size = max_size > 1048576 ? max_size : 1048576; @@ -428,7 +429,7 @@ bool MemTable::need_flush() const { bool MemTable::need_agg() const { if (_keys_type == KeysType::AGG_KEYS) { auto max_size = config::write_buffer_size_for_agg; - if (_tablet_schema->is_partial_update()) { + if (_is_partial_update) { auto update_columns_size = _tablet_schema->partial_input_column_size(); max_size = max_size * update_columns_size / _tablet_schema->num_columns(); max_size = max_size > 1048576 ? max_size : 1048576; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index ed9226c4a0c1013..2ac3973a8d3ce8d 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -31,6 +31,7 @@ #include "common/status.h" #include "gutil/integral_types.h" #include "olap/olap_common.h" +#include "olap/tablet_schema.h" #include "runtime/memory/mem_tracker.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/common/arena.h" @@ -42,6 +43,7 @@ class Schema; class SlotDescriptor; class TabletSchema; class TupleDescriptor; +struct PartialUpdateInfo; enum KeysType : int; // row pos in _input_mutable_block @@ -167,7 +169,8 @@ class MemTable { public: MemTable(int64_t tablet_id, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, const std::shared_ptr& insert_mem_tracker, + bool enable_unique_key_mow, const PartialUpdateInfo& partial_update_info, + const std::shared_ptr& insert_mem_tracker, const std::shared_ptr& flush_mem_tracker); ~MemTable(); @@ -202,6 +205,7 @@ class MemTable { private: int64_t _tablet_id; bool _enable_unique_key_mow = false; + bool _is_partial_update = false; const KeysType _keys_type; const TabletSchema* _tablet_schema; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 955a961f0a5af49..2fc7478ad8f876e 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -39,6 +39,7 @@ #include "olap/rowset/rowset_writer.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" +#include "olap/tablet_schema.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "service/backend_options.h" @@ -63,10 +64,13 @@ MemTableWriter::~MemTableWriter() { } Status MemTableWriter::init(std::shared_ptr rowset_writer, - TabletSchemaSPtr tablet_schema, bool unique_key_mow) { + TabletSchemaSPtr tablet_schema, + std::shared_ptr partial_update_info, + bool unique_key_mow) { _rowset_writer = rowset_writer; _tablet_schema = tablet_schema; _unique_key_mow = unique_key_mow; + _partial_update_info = partial_update_info; _reset_mem_table(); @@ -195,7 +199,7 @@ void MemTableWriter::_reset_mem_table() { _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema.get(), _req.slots, _req.tuple_desc, - _unique_key_mow, mem_table_insert_tracker, + _unique_key_mow, *_partial_update_info, mem_table_insert_tracker, mem_table_flush_tracker)); _segment_num++; diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index b374f10bdedfbca..d633c6164eb2833 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -67,6 +67,7 @@ class MemTableWriter { ~MemTableWriter(); Status init(std::shared_ptr rowset_writer, TabletSchemaSPtr tablet_schema, + std::shared_ptr partial_update_info, bool unique_key_mow = false); Status write(const vectorized::Block* block, const std::vector& row_idxs, @@ -141,6 +142,8 @@ class MemTableWriter { int64_t _segment_num = 0; MonotonicStopWatch _lock_watch; + + std::shared_ptr _partial_update_info; }; } // namespace doris diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 130d65e7ef448da..cfeb761b7c861ca 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -457,15 +457,21 @@ struct HashOfRowsetId { using RowsetIdUnorderedSet = std::unordered_set; class DeleteBitmap; +struct PartialUpdateInfo; // merge on write context struct MowContext { MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, - std::shared_ptr db) - : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {} + std::shared_ptr db, std::shared_ptr info) + : max_version(version), + txn_id(txnid), + rowset_ids(ids), + delete_bitmap(db), + partial_update_info(info) {} int64_t max_version; int64_t txn_id; const RowsetIdUnorderedSet& rowset_ids; std::shared_ptr delete_bitmap; + std::shared_ptr partial_update_info; }; // used in mow partial update diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index fc13b1692aab142..5890676cc7e87da 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -223,7 +223,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR del_preds.pop(); } Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn( - request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false); + request.partition_id, tablet, request.transaction_id, load_id, rowset_to_add, false, + false); if (!commit_status.ok() && !commit_status.is()) { res = std::move(commit_status); } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index d36d9d90e7e4a24..352cdd7bad1181b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -134,7 +134,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { SCOPED_RAW_TIMER(&_delete_bitmap_ns); if (!_context.tablet->enable_unique_key_merge_on_write() || - _context.tablet_schema->is_partial_update()) { + _context.mow_context->partial_update_info->is_partial_update) { return Status::OK(); } auto rowset = _build_tmp(); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 38cb6d24f9cfeae..7cb7d619028df6f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -131,6 +131,8 @@ class BetaRowsetWriter : public RowsetWriter { int64_t segment_writer_ns() override { return _segment_writer_ns; } + bool is_partial_update() override { return _context.is_partial_update; } + private: Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer); Status _check_segment_number_limit(); diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h index a9822722172b799..0b18b1a03e662a4 100644 --- a/be/src/olap/rowset/beta_rowset_writer_v2.h +++ b/be/src/olap/rowset/beta_rowset_writer_v2.h @@ -132,6 +132,8 @@ class BetaRowsetWriterV2 : public RowsetWriter { int64_t segment_writer_ns() override { return _segment_writer_ns; } + bool is_partial_update() override { return _context.is_partial_update; } + private: RowsetWriterContext _context; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 21637a2379e024d..26863da03c0c03c 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -151,6 +151,8 @@ class RowsetWriter { virtual int64_t segment_writer_ns() { return 0; } + virtual bool is_partial_update() = 0; + private: DISALLOW_COPY_AND_ASSIGN(RowsetWriter); }; diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 985efa0809ef7e3..68b5fb4437c4c5d 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -105,6 +105,9 @@ struct RowsetWriterContext { // segcompaction for this RowsetWriter, disable it for some transient writers bool enable_segcompaction = true; + // only used in publish phase, the flush phase and commit phase will use + // the partial update info in mow_context + bool is_partial_update = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 9508557d0ef25b4..95266eac11cfa50 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -334,8 +334,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); // find missing column cids - std::vector missing_cids = _tablet_schema->get_missing_cids(); - std::vector including_cids = _tablet_schema->get_update_cids(); + std::vector missing_cids = _mow_context->partial_update_info->missing_cids; + std::vector including_cids = _mow_context->partial_update_info->update_cids; // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); @@ -421,7 +421,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc, _mow_context->max_version, segment_caches, &rowset); if (st.is()) { - if (_tablet_schema->is_strict_mode()) { + if (_mow_context->partial_update_info->is_strict_mode) { ++num_rows_filtered; // delete the invalid newly inserted row _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, @@ -429,7 +429,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* segment_pos); } - if (!_tablet_schema->can_insert_new_rows_in_partial_update()) { + if (!_mow_context->partial_update_info->can_insert_new_rows_in_partial_update) { return Status::InternalError( "the unmentioned columns should have default value or be nullable for " "newly inserted rows in non-strict mode partial update"); @@ -492,7 +492,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } // convert missing columns and send to column writer - auto cids_missing = _tablet_schema->get_missing_cids(); + auto cids_missing = _mow_context->partial_update_info->missing_cids; _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows, cids_missing); for (auto cid : cids_missing) { @@ -546,7 +546,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f const size_t& segment_start_pos) { // create old value columns auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector cids_missing = _tablet_schema->get_missing_cids(); + std::vector cids_missing = _mow_context->partial_update_info->missing_cids; CHECK(cids_missing.size() == old_value_block.columns()); auto mutable_old_columns = old_value_block.mutate_columns(); bool has_row_column = _tablet_schema->store_row_column(); @@ -652,7 +652,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows) { - if (_tablet_schema->is_partial_update() && _opts.write_type == DataWriteType::TYPE_DIRECT) { + if (_mow_context->partial_update_info->is_partial_update && + _opts.write_type == DataWriteType::TYPE_DIRECT) { RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows)); return Status::OK(); } diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index e599d96a8fee84e..3150b60dc551f09 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -43,6 +43,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include "util/brpc_client_cache.h" #include "util/mem_info.h" @@ -131,7 +132,7 @@ Status RowsetBuilder::init() { } _delete_bitmap = std::make_shared(_tablet->tablet_id()); mow_context = std::make_shared(cur_max_version, _req.txn_id, _rowset_ids, - _delete_bitmap); + _delete_bitmap, _partial_update_info); } // check tablet version number @@ -226,7 +227,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { // For partial update, we need to fill in the entire row of data, during the calculation // of the delete bitmap. This operation is resource-intensive, and we need to minimize // the number of times it occurs. Therefore, we skip this operation here. - if (_rowset->tablet_schema()->is_partial_update()) { + if (_partial_update_info->is_partial_update) { return Status::OK(); } @@ -238,8 +239,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() { } Status RowsetBuilder::wait_calc_delete_bitmap() { - if (!_tablet->enable_unique_key_merge_on_write() || - _rowset->tablet_schema()->is_partial_update()) { + if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) { return Status::OK(); } std::lock_guard l(_lock); @@ -270,8 +270,9 @@ Status RowsetBuilder::commit_txn() { std::lock_guard l(_lock); SCOPED_TIMER(_commit_txn_timer); - Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, - _req.load_id, _rowset, false); + Status res = _storage_engine->txn_manager()->commit_txn( + _req.partition_id, _tablet, _req.txn_id, _req.load_id, _rowset, false, + _partial_update_info->is_partial_update); if (!res && !res.is()) { LOG(WARNING) << "Failed to commit txn: " << _req.txn_id @@ -324,9 +325,13 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id, _tablet_schema->set_table_id(table_schema_param->table_id()); // set partial update columns info - _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), - table_schema_param->partial_update_input_columns()); - _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); + // _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(), + // table_schema_param->partial_update_input_columns()); + // _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode()); + _partial_update_info = std::make_shared(); + _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), + table_schema_param->partial_update_input_columns(), + table_schema_param->is_strict_mode()); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 8bb94c20905651e..0ff0ca72d35630d 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -86,6 +86,10 @@ class RowsetBuilder { // For UT DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; } + std::shared_ptr get_partial_update_info() const { + return _partial_update_info; + } + private: void _garbage_collection(); @@ -113,6 +117,8 @@ class RowsetBuilder { // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + std::shared_ptr _partial_update_info; + RuntimeProfile* _profile = nullptr; RuntimeProfile::Counter* _build_rowset_timer = nullptr; RuntimeProfile::Counter* _submit_delete_bitmap_timer = nullptr; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index ecb1021beb5e258..95046e120e88c97 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1986,7 +1986,8 @@ Status Tablet::create_rowset_writer(RowsetWriterContext& context, // create a rowset writer with rowset_id and seg_id // after writer, merge this transient rowset with original rowset Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, - std::unique_ptr* rowset_writer) { + std::unique_ptr* rowset_writer, + bool is_partial_update) { RowsetWriterContext context; context.rowset_state = PREPARED; context.segments_overlap = OVERLAPPING; @@ -2000,6 +2001,7 @@ Status Tablet::create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, // get the shared_ptr from tablet_manager. context.tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id()); context.write_type = DataWriteType::TYPE_DIRECT; + context.is_partial_update = is_partial_update; RETURN_IF_ERROR( create_transient_rowset_writer(context, rowset_ptr->rowset_id(), rowset_writer)); (*rowset_writer)->set_segment_start_id(rowset_ptr->num_segments()); @@ -2899,7 +2901,7 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, auto rowset_id = rowset->rowset_id(); Version dummy_version(end_version + 1, end_version + 1); auto rowset_schema = rowset->tablet_schema(); - bool is_partial_update = rowset_schema->is_partial_update(); + bool is_partial_update = rowset_writer->is_partial_update(); // use for partial update PartialUpdateReadPlan read_plan_ori; PartialUpdateReadPlan read_plan_update; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 43993773542310a..c0151561a0832fb 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -340,7 +340,8 @@ class Tablet : public BaseTablet { std::unique_ptr* rowset_writer); Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr, - std::unique_ptr* rowset_writer); + std::unique_ptr* rowset_writer, + bool is_partial_update = false); Status create_transient_rowset_writer(RowsetWriterContext& context, const RowsetId& rowset_id, std::unique_ptr* rowset_writer); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 5e6ffa3ac777848..782753ef3dd69a0 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -418,4 +418,35 @@ bool operator!=(const TabletSchema& a, const TabletSchema& b); using TabletSchemaSPtr = std::shared_ptr; +struct PartialUpdateInfo { + void init(const TabletSchema& tablet_schema, bool is_partial_update, + const std::set& partial_update_input_columns, bool is_strict_mode) { + this->is_partial_update = is_partial_update; + this->partial_update_input_columns = partial_update_input_columns; + missing_cids.clear(); + update_cids.clear(); + for (auto i = 0; i < tablet_schema.num_columns(); ++i) { + auto tablet_column = tablet_schema.column(i); + if (partial_update_input_columns.count(tablet_column.name()) == 0) { + missing_cids.emplace_back(i); + if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { + can_insert_new_rows_in_partial_update = false; + } + } else { + update_cids.emplace_back(i); + } + } + this->is_strict_mode = is_strict_mode; + } + + bool is_partial_update {false}; + std::set partial_update_input_columns; + std::vector missing_cids; + std::vector update_cids; + // if key not exist in old rowset, use default value or null value for the unmentioned cols + // to generate a new row, only available in non-strict mode + bool can_insert_new_rows_in_partial_update {true}; + bool is_strict_mode {false}; +}; + } // namespace doris diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 2bdd7e5b268fa14..57dfb944b62242e 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -150,9 +150,11 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, bool is_recovery) { + const RowsetSharedPtr& rowset_ptr, bool is_recovery, + bool is_partial_update) { return commit_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, - tablet->tablet_id(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery); + tablet->tablet_id(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery, + is_partial_update); } Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, @@ -210,7 +212,8 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, bool is_recovery) { + const RowsetSharedPtr& rowset_ptr, bool is_recovery, + bool is_partial_update) { if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { LOG(FATAL) << "invalid commit req " << " partition_id=" << partition_id << " transaction_id=" << transaction_id @@ -292,6 +295,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { load_info.unique_key_merge_on_write = true; load_info.delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); + load_info.is_partial_update = is_partial_update; } } txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); @@ -356,7 +360,8 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, // update delete_bitmap if (tablet_txn_info.unique_key_merge_on_write) { std::unique_ptr rowset_writer; - static_cast(tablet->create_transient_rowset_writer(rowset, &rowset_writer)); + static_cast(tablet->create_transient_rowset_writer( + rowset, &rowset_writer, tablet_txn_info.is_partial_update)); int64_t t2 = MonotonicMicros(); RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids, @@ -364,7 +369,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, rowset_writer.get())); int64_t t3 = MonotonicMicros(); stats->calc_delete_bitmap_time_us = t3 - t2; - if (rowset->tablet_schema()->is_partial_update()) { + if (tablet_txn_info.is_partial_update) { // build rowset writer and merge transient rowset RETURN_IF_ERROR(rowset_writer->flush()); RowsetSharedPtr transient_rowset = rowset_writer->build(); diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index c311fed87994616..fc1dcbd5c05ce89 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -59,6 +59,7 @@ struct TabletTxnInfo { RowsetIdUnorderedSet rowset_ids; int64_t creation_time; bool ingest {false}; + bool is_partial_update {false}; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} @@ -120,7 +121,7 @@ class TxnManager { Status commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, bool is_recovery); + const RowsetSharedPtr& rowset_ptr, bool is_recovery, bool is_partial_update); Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id, const Version& version, @@ -135,7 +136,7 @@ class TxnManager { Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, - const RowsetSharedPtr& rowset_ptr, bool is_recovery); + const RowsetSharedPtr& rowset_ptr, bool is_recovery, bool is_partial_update); // remove a txn from txn manager // not persist rowset meta because diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 69c9e6608e745ba..32ee263864dfaa6 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -702,7 +702,7 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result, Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(), - rowset_meta->load_id(), rowset, false); + rowset_meta->load_id(), rowset, false, false); if (!commit_txn_status && !commit_txn_status.is()) { auto err_msg = fmt::format( "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, " diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 02ab1b4450a46a3..87b3de9b579482c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1475,7 +1475,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset( Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn( tablet->data_dir()->get_meta(), rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(), tablet->tablet_uid(), rowset_meta->load_id(), rowset, - false); + false, false); if (!commit_txn_status && !commit_txn_status.is()) { LOG(WARNING) << "failed to add committed rowset for slave replica. rowset_id=" << rowset_meta->rowset_id() << ", tablet_id=" << rowset_meta->tablet_id()