diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 71ca504d3a010a..02dc50a491a466 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -124,6 +124,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
     _version = pschema.version();
     _is_partial_update = pschema.partial_update();
     _is_strict_mode = pschema.is_strict_mode();
+    _is_unique_key_replace_if_not_null = pschema.is_unique_key_replace_if_not_null();
 
     for (auto& col : pschema.partial_update_input_columns()) {
         _partial_update_input_columns.insert(col);
@@ -176,9 +177,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
     _table_id = tschema.table_id;
     _version = tschema.version;
     _is_partial_update = tschema.is_partial_update;
-    if (tschema.__isset.is_strict_mode) {
-        _is_strict_mode = tschema.is_strict_mode;
-    }
+    _is_strict_mode = tschema.__isset.is_strict_mode && tschema.is_strict_mode;
+    _is_unique_key_replace_if_not_null = tschema.__isset.is_unique_key_replace_if_not_null &&
+                                         tschema.is_unique_key_replace_if_not_null;
 
     for (auto& tcolumn : tschema.partial_update_input_columns) {
         _partial_update_input_columns.insert(tcolumn);
@@ -246,6 +247,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
     pschema->set_version(_version);
     pschema->set_partial_update(_is_partial_update);
     pschema->set_is_strict_mode(_is_strict_mode);
+    pschema->set_is_unique_key_replace_if_not_null(_is_unique_key_replace_if_not_null);
     for (auto col : _partial_update_input_columns) {
         *pschema->add_partial_update_input_columns() = col;
     }
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 3e6ab7b94be922..056f49786982ce 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -90,6 +90,7 @@ class OlapTableSchemaParam {
         return _partial_update_input_columns;
     }
     bool is_strict_mode() const { return _is_strict_mode; }
+    bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; }
     std::string debug_string() const;
 
 private:
@@ -104,6 +105,7 @@ class OlapTableSchemaParam {
     bool _is_partial_update = false;
     std::set<std::string> _partial_update_input_columns;
     bool _is_strict_mode = false;
+    bool _is_unique_key_replace_if_not_null = false;
 };
 
 using OlapTableIndexTablets = TOlapTableIndexTablets;
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp b/be/src/olap/calc_delete_bitmap_executor.cpp
index 51e77a362809a9..4675a1c576c8d2 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -30,10 +30,12 @@ namespace doris {
 using namespace ErrorCode;
 
-Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
-                                     const segment_v2::SegmentSharedPtr& cur_segment,
-                                     const std::vector<RowsetSharedPtr>& target_rowsets,
-                                     int64_t end_version, RowsetWriter* rowset_writer) {
+Status CalcDeleteBitmapToken::submit(
+        TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
+        const segment_v2::SegmentSharedPtr& cur_segment,
+        const std::vector<RowsetSharedPtr>& target_rowsets,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
+        int64_t end_version, RowsetWriter* rowset_writer) {
     {
         std::shared_lock rlock(_lock);
         RETURN_IF_ERROR(_status);
@@ -45,8 +47,9 @@ Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur
         _delete_bitmaps.push_back(bitmap);
     }
     return _thread_token->submit_func([=, this]() {
-        auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets,
-                                                     bitmap, end_version, rowset_writer);
+        auto st =
+                tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, target_rowsets, bitmap,
+                                                   indicator_maps, end_version, rowset_writer);
         if (!st.ok()) {
             LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
                          << tablet->tablet_id() << " rowset: " << cur_rowset->rowset_id()
diff --git a/be/src/olap/calc_delete_bitmap_executor.h b/be/src/olap/calc_delete_bitmap_executor.h
index d2c392a04d49dd..ada36a89d665ef 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -51,8 +51,9 @@ class CalcDeleteBitmapToken {
 
     Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
                   const segment_v2::SegmentSharedPtr& cur_segment,
-                  const std::vector<RowsetSharedPtr>& target_rowsets, int64_t end_version,
-                  RowsetWriter* rowset_writer);
+                  const std::vector<RowsetSharedPtr>& target_rowsets,
+                  std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
+                  int64_t end_version, RowsetWriter* rowset_writer);
 
     // wait all tasks in token to be completed.
     Status wait();
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 7feb857723b1a3..43636a7a15fc49 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -194,7 +194,7 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
     OlapStopWatch watch;
 
     RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets,
-                                                delete_bitmap, cur_version, nullptr,
+                                                delete_bitmap, nullptr, cur_version, nullptr,
                                                 rowset_writer));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 130d65e7ef448d..70f96869d5ac84 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -31,6 +31,7 @@
 #include
 #include
 #include
+#include <variant>
 
 #include "io/io_common.h"
 #include "olap/olap_define.h"
@@ -471,10 +472,21 @@ struct MowContext {
 // used in mow partial update
 struct RidAndPos {
     uint32_t rid;
-    // pos in block
-    size_t pos;
+    uint32_t pos; // pos in block
 };
-using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;
+struct ReadRowsInfo {
+    RidAndPos id_and_pos;
+    std::vector<uint32_t> cids; // cids for partial update columns
+};
+struct ReadColumnsInfo {
+    std::vector<RidAndPos> missing_column_rows;
+    std::map<uint32_t, std::vector<RidAndPos>> partial_update_rows;
+};
+
+using RowStoreReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<ReadRowsInfo>>>;
+using ColumnStoreReadPlan = std::map<RowsetId, std::map<uint32_t, ReadColumnsInfo>>;
+using PartialUpdateReadPlan = std::variant<ColumnStoreReadPlan, RowStoreReadPlan>;
+using IndicatorMaps = std::map<uint32_t, std::vector<uint32_t>>;
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 2019ea7c349b17..f8a5d28b318881 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -148,8 +148,8 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
     }
     OlapStopWatch watch;
     RETURN_IF_ERROR(_context.tablet->calc_delete_bitmap(
-            rowset, segments, specified_rowsets, _context.mow_context->delete_bitmap,
-            _context.mow_context->max_version, nullptr));
+            rowset, segments, specified_rowsets, _context.mow_context->delete_bitmap, nullptr,
+            _context.mow_context->max_version, nullptr, nullptr));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 38cb6d24f9cfea..8c2fc36b35e9b4 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -102,6 +102,11 @@ class BetaRowsetWriter : public RowsetWriter {
 
     int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }
 
+    // currently, a rowset contains at most one segment, so we just return the segment's indicator maps
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
+        return _segment_creator.get_indicator_maps();
+    }
+
     RowsetId rowset_id() override { return _context.rowset_id; }
 
     RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h b/be/src/olap/rowset/beta_rowset_writer_v2.h
index a9822722172b79..70670817c4a8d6 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -120,6 +120,11 @@ class BetaRowsetWriterV2 : public RowsetWriter {
         return Status::OK();
     }
 
+    // currently, a rowset contains at most one segment, so we just return the segment's indicator maps
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {
+        return _segment_creator.get_indicator_maps();
+    }
+
     Status add_segment(uint32_t segment_id, SegmentStatistics& segstat) override;
 
     int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 21637a2379e024..8f5d9e8666e597 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -131,6 +131,8 @@ class RowsetWriter {
 
     virtual int64_t num_rows_filtered() const = 0;
 
+    virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0;
+
     virtual RowsetId rowset_id() = 0;
 
     virtual RowsetTypePB type() const = 0;
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index 458e31992e737e..2934649865ef6b 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -145,6 +145,14 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
     segstat.index_size = writer->get_inverted_index_file_size();
     segstat.key_bounds = key_bounds;
 
+    if (!_indicator_maps) {
+        _indicator_maps.reset(new IndicatorMaps);
+    }
+    auto indicator_maps = writer->get_indicator_maps();
+    if (indicator_maps) {
+        _indicator_maps->merge(*indicator_maps);
+    }
+
     writer.reset();
 
     RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
diff --git a/be/src/olap/rowset/segment_creator.h b/be/src/olap/rowset/segment_creator.h
index 750aa1487240a3..8ea7407856c739 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -98,6 +98,8 @@ class SegmentFlusher {
 
     int64_t num_rows_filtered() const { return _num_rows_filtered; }
 
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return _indicator_maps; }
+
     Status close();
 
 public:
@@ -142,6 +144,8 @@ class SegmentFlusher {
     // written rows by add_block/add_row
     std::atomic<int64_t> _num_rows_written = 0;
     std::atomic<int64_t> _num_rows_filtered = 0;
+
+    std::shared_ptr<IndicatorMaps> _indicator_maps = nullptr;
 };
 
 class SegmentCreator {
@@ -166,6 +170,10 @@ class SegmentCreator {
 
     int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }
 
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const {
+        return _segment_flusher.get_indicator_maps();
+    }
+
     // Flush a block into a single segment, with pre-allocated segment_id.
     // Return the file size flushed to disk in "flush_size"
    // This method is thread-safe.
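
To make the flusher-side bookkeeping above concrete, here is a small standalone sketch (not part of the patch) of how per-segment indicator maps can be folded into the flusher-level map, mirroring `_indicator_maps->merge(*indicator_maps)` in `_flush_segment_writer`. `IndicatorMaps` maps a row position to the cids whose cells carry the null indicator; the names follow the patch, but the program itself is illustrative only.

// Illustrative only: merging per-segment indicator maps, as SegmentFlusher does.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <vector>

using IndicatorMaps = std::map<uint32_t, std::vector<uint32_t>>; // row pos -> cids

int main() {
    auto flusher_maps = std::make_shared<IndicatorMaps>();
    // Pretend one segment writer reported: row 1 must re-read cid 2,
    // row 4 must re-read cids 2 and 3.
    IndicatorMaps segment_maps {{1, {2}}, {4, {2, 3}}};
    // std::map::merge splices entries whose keys are absent from the target,
    // which is safe here because row positions are unique across segments.
    flusher_maps->merge(segment_maps);
    for (const auto& [pos, cids] : *flusher_maps) {
        std::cout << "row " << pos << " re-reads " << cids.size() << " column(s)\n";
    }
    return 0;
}
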
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 1bfac900d2b592..cd1f58f0911374 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -30,6 +30,7 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h" // LOG
+#include "common/status.h"
 #include "gutil/port.h"
 #include "io/fs/file_writer.h"
 #include "olap/data_dir.h"
@@ -51,11 +52,13 @@
 #include "util/crc32c.h"
 #include "util/faststring.h"
 #include "util/key_util.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/common/schema_util.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_factory.hpp"
 #include "vec/io/reader_buffer.h"
 #include "vec/jsonb/serialize.h"
 #include "vec/olap/olap_data_convertor.h"
@@ -323,7 +326,7 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
 Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block,
                                                         size_t row_pos, size_t num_rows) {
     if (block->columns() <= _tablet_schema->num_key_columns() ||
-        block->columns() >= _tablet_schema->num_columns()) {
+        block->columns() > _tablet_schema->num_columns()) {
         return Status::InternalError(
                 fmt::format("illegal partial update block columns: {}, num key columns: {}, total "
                             "schema columns: {}",
@@ -332,25 +335,69 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
 
-    // find missing column cids
+    // all the cells in missing_cids will be filled from the old rows
     std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
     std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+    std::vector<uint32_t> determistic_cids; // columns that don't need to be read from old rows
+    std::vector<uint32_t> point_read_cids;
 
     // create full block and fill with input columns
     auto full_block = _tablet_schema->create_block();
-    size_t input_id = 0;
-    for (auto i : including_cids) {
-        full_block.replace_by_position(i, block->get_by_position(input_id++).column);
+
+    // indicators used to apply partial updates for including columns;
+    // cells holding the `indicator` value should be read from the old rows.
+    // currently we treat null as the `indicator` value
+    IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
+
+    PartialUpdateReadPlan read_plan;
+    if (_tablet_schema->store_row_column()) {
+        read_plan = RowStoreReadPlan {};
+    }
+
+    bool is_unique_key_replace_if_not_null = _tablet_schema->is_unique_key_replace_if_not_null();
+    if (is_unique_key_replace_if_not_null) {
+        for (size_t i = 0; i < including_cids.size(); i++) {
+            uint32_t cid = including_cids[i];
+            vectorized::ColumnPtr col = block->get_by_position(i).column;
+            full_block.replace_by_position(cid, col);
+            if (i < _num_key_columns) {
+                // key columns never need to be read from old rows
+                determistic_cids.emplace_back(cid);
+                continue;
+            }
+
+            if (col->is_nullable() && col->has_null() &&
+                (!_tablet_schema->has_sequence_col() ||
+                 cid != _tablet_schema->sequence_col_idx())) {
+                // we don't read from old rows for the seq col if the including cids contain the sequence column
+                indicator_maps_vertical[cid] =
+                        assert_cast<const vectorized::ColumnNullable*>(col.get())
+                                ->get_null_map_data()
+                                .data();
+                point_read_cids.emplace_back(cid);
+            } else {
+                indicator_maps_vertical[cid] = nullptr;
+                determistic_cids.emplace_back(cid);
+            }
+        }
+
+        _calc_indicator_maps(row_pos, num_rows, indicator_maps_vertical);
+    } else {
+        determistic_cids = including_cids;
+        for (size_t i = 0; i < including_cids.size(); i++) {
+            full_block.replace_by_position(including_cids[i], block->get_by_position(i).column);
+        }
     }
+
     _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
-                                                                   including_cids);
+                                                                   determistic_cids);
 
     bool have_input_seq_column = false;
-    // write including columns
+    // write key columns
     std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
     vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
     size_t segment_start_pos;
-    for (auto cid : including_cids) {
+    for (auto cid : determistic_cids) {
         // here we get segment column row num before append data.
         segment_start_pos = _column_writers[cid]->get_next_rowid();
         // olap data convertor alway start from id = 0
@@ -459,7 +506,13 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
             // partial update should not contain invisible columns
             use_default_or_null_flag.emplace_back(false);
             _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
-            _tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
+
+            if (is_unique_key_replace_if_not_null && _indicator_maps->contains(block_pos)) {
+                _tablet->prepare_to_read(read_plan, loc, segment_pos,
+                                         _indicator_maps->at(block_pos));
+            } else {
+                _tablet->prepare_to_read(read_plan, loc, segment_pos, {});
+            }
         }
 
         if (st.is<KEY_NOT_FOUND>()) {
@@ -481,20 +534,31 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
 
     // read and fill block
-    auto mutable_full_columns = full_block.mutate_columns();
-    RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
-                                         has_default_or_nullable, segment_start_pos));
+    RETURN_IF_ERROR(fill_missing_columns(&full_block, read_plan, missing_cids, point_read_cids,
+                                         use_default_or_null_flag, has_default_or_nullable,
+                                         segment_start_pos, is_unique_key_replace_if_not_null));
     // row column should be filled here
     if (_tablet_schema->store_row_column()) {
         // convert block to row store format
         _serialize_block_to_row_column(full_block);
     }
 
-    // convert missing columns and send to column writer
-    auto cids_missing = _tablet_schema->get_missing_cids();
+    std::vector<uint32_t> remaining_cids;
+    if (is_unique_key_replace_if_not_null) {
+        remaining_cids.reserve(missing_cids.size() + point_read_cids.size());
+        for (uint32_t cid : missing_cids) {
+            remaining_cids.emplace_back(cid);
+        }
+        for (uint32_t cid : point_read_cids) {
+            remaining_cids.emplace_back(cid);
+        }
+    } else {
+        remaining_cids = missing_cids;
+    }
+
     _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
-                                                                   cids_missing);
-    for (auto cid : cids_missing) {
+                                                                   remaining_cids);
+    for (auto cid : remaining_cids) {
         auto converted_result = _olap_data_convertor->convert_column_data(cid);
         if (!converted_result.first.ok()) {
             return converted_result.first;
@@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
-                                           const std::vector<bool>& use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids,
-                                                                  cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
-                                                         mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3, v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |1 |1 |1 |1 |1
+// 2 |2 |2 |2 |2 |2 |2
+// 3 |3 |3 |3 |3 |3 |3
+// 4 |4 |4 |4 |4 |4 |4
+// 5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Character `?` means that the cell is filled
+// with the indicator value (currently we use null as the indicator value).
+// row_num k1|k2|v1|v2|v3
+// 1       1 |1 |10|10|10
+// 2       2 |2 |? |20|20
+// 3       3 |3 |30|30|?
+// 4       4 |4 |40|40|40
+// 5       5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns.
+// old_point_read_columns = [v1, v2, v3], the old values from the previous rows will be read into these columns
+// if the corresponding columns in the input block have cells with the indicator value.
+// Because columns are immutable, filled_including_value_columns will store the data merged from
+// the original input block and old_point_read_columns. After the insertion, the data in the table will be:
+// k1|k2|v1|v2|v3|v4|v5
+// 1 |1 |10|10|10|1 |1
+// 2 |2 |2 |20|20|2 |2
+// 3 |3 |30|30|3 |3 |3
+// 4 |4 |40|40|40|4 |4
+// 5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
+        const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns();
+    vectorized::Block old_point_read_block = _tablet_schema->create_block_by_cids(cids_point_read);
+    vectorized::MutableColumns old_point_read_columns = old_point_read_block.mutate_columns();
+    vectorized::MutableColumns filled_including_value_columns =
+            old_point_read_block.clone_empty_columns(); // used to hold data after being filled
+    // !!NOTE: columns in old_point_read_block may have different row nums!
+
+    // rowid in input block -> line num in old_full_read_block
+    std::map<uint32_t, uint32_t> missing_cols_read_index;
+    // partial update cid -> (rowid in input block -> line num in old_point_read_block)
+    std::map<uint32_t, std::map<uint32_t, uint32_t>> parital_update_cols_read_index;
+
+    if (is_unique_key_replace_if_not_null) {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(
+                _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read, &cids_point_read,
+                &old_full_read_block, &old_point_read_block, &missing_cols_read_index,
+                &parital_update_cols_read_index));
+    } else {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema, _rsid_to_rowset, read_plan,
+                                                      &cids_full_read, &old_full_read_block,
+                                                      &missing_cols_read_index));
+    }
 
     // build default value columns
-    auto default_value_block = old_value_block.clone_empty();
+    auto default_value_block = old_full_read_block.clone_empty();
     auto mutable_default_value_columns = default_value_block.mutate_columns();
     if (has_default_or_nullable) {
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            const auto& column = _tablet_schema->column(cids_missing[i]);
+        for (auto i = 0; i < cids_full_read.size(); ++i) {
+            const auto& column = _tablet_schema->column(cids_full_read[i]);
             if (column.has_default_value()) {
-                auto default_value = _tablet_schema->column(cids_missing[i]).default_value();
+                auto default_value = _tablet_schema->column(cids_full_read[i]).default_value();
                 vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                           default_value.size());
-                old_value_block.get_by_position(i).type->from_string(
+                old_full_read_block.get_by_position(i).type->from_string(
                         rb, mutable_default_value_columns[i].get());
             }
         }
     }
 
-    // fill all missing value from mutable_old_columns, need to consider default value and null value
+    // fill all missing value from old_value_columns, need to consider default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         if (use_default_or_null_flag[idx]) {
-            for (auto i = 0; i < cids_missing.size(); ++i) {
+            for (auto i = 0; i < cids_full_read.size(); ++i) {
                 // if the column has default value, fiil it with default value
                 // otherwise, if the column is nullable, fill it with null value
-                const auto& tablet_column = _tablet_schema->column(cids_missing[i]);
+                const auto& tablet_column = _tablet_schema->column(cids_full_read[i]);
                 if (tablet_column.has_default_value()) {
-                    mutable_full_columns[cids_missing[i]]->insert_from(
+                    full_columns[cids_full_read[i]]->insert_from(
                             *mutable_default_value_columns[i].get(), 0);
                 } else if (tablet_column.is_nullable()) {
                     auto nullable_column = assert_cast<vectorized::ColumnNullable*>(
-                            mutable_full_columns[cids_missing[i]].get());
+                            full_columns[cids_full_read[i]].get());
                     nullable_column->insert_null_elements(1);
                 } else {
                     // If the control flow reaches this branch, the column neither has default value
                     // nor is nullable. It means that the row's delete sign is marked, and the value
                     // columns are useless and won't be read. So we can just put arbitary values in the cells
-                    mutable_full_columns[cids_missing[i]]->insert_default();
+                    full_columns[cids_full_read[i]]->insert_default();
                 }
             }
             continue;
         }
-        auto pos_in_old_block = read_index[idx + segment_start_pos];
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            mutable_full_columns[cids_missing[i]]->insert_from(
-                    *old_value_block.get_columns_with_type_and_name()[i].column.get(),
-                    pos_in_old_block);
+
+        //TODO(bobhan1): fix me later(the wrong row pos)
+        // TODO(bobhan1): handle row store column here!!!
+        uint32_t pos_in_old_block = missing_cols_read_index[idx + segment_start_pos];
+        for (auto i = 0; i < cids_full_read.size(); ++i) {
+            uint32_t cid = cids_full_read[i];
+            if (full_block->get_by_position(cid).name != BeConsts::ROW_STORE_COL) {
+                full_columns[cid]->insert_from(
+                        *old_full_read_block.get_columns_with_type_and_name()[i].column.get(),
+                        pos_in_old_block);
+            }
+        }
+
+        if (is_unique_key_replace_if_not_null) {
+            //TODO(bobhan1): fix me later(the wrong row pos)
+            for (size_t i = 0; i < cids_point_read.size(); i++) {
+                uint32_t cid = cids_point_read[i];
+                if (parital_update_cols_read_index[cid].contains(idx + segment_start_pos)) {
+                    // cells with indicator value should be replaced with old values in previous rows
+                    uint32_t pos_in_old_block =
+                            parital_update_cols_read_index[cid][idx + segment_start_pos];
+                    filled_including_value_columns[i]->insert_from(*old_point_read_columns[i],
+                                                                   pos_in_old_block);
+                } else {
+                    filled_including_value_columns[i]->insert_from(*full_columns[cid],
+                                                                   idx + segment_start_pos);
+                }
+            }
+        }
+    }
+
+    if (is_unique_key_replace_if_not_null) {
+        for (size_t i = 0; i < cids_point_read.size(); i++) {
+            full_block->replace_by_position(cids_point_read[i],
+                                            std::move(filled_including_value_columns[i]));
         }
     }
     return Status::OK();
 }
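
The null maps collected in `append_block_with_partial_content` drive `_calc_indicator_maps`. Below is a minimal standalone sketch of that pivot from per-column null maps to per-row cid lists, assuming the same `IndicatorMapsVertical`/`IndicatorMaps` shapes as the patch; it is not the patch code itself.

#include <cstdint>
#include <map>
#include <vector>

using IndicatorMapsVertical = std::map<uint32_t, const uint8_t*>; // cid -> null map
using IndicatorMaps = std::map<uint32_t, std::vector<uint32_t>>;  // row pos -> cids

// For every row in [row_pos, row_pos + num_rows), record the cids whose
// null-map byte is set; those cells must be re-read from the old rows.
IndicatorMaps calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
                                  const IndicatorMapsVertical& vertical) {
    IndicatorMaps result;
    for (const auto& [cid, null_map] : vertical) {
        if (null_map == nullptr) {
            continue; // column is deterministic, never uses the indicator
        }
        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
            if (null_map[pos] != 0) {
                result[pos].emplace_back(cid);
            }
        }
    }
    return result;
}
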
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 9133712a8c4ca6..a8b0d03c7e0126 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -78,6 +78,8 @@ struct SegmentWriterOptions {
 
 using TabletSharedPtr = std::shared_ptr<Tablet>;
 
+using IndicatorMapsVertical = std::map<uint32_t, const uint8_t*>; // TODO(baohan)
+
 class SegmentWriter {
 public:
     explicit SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
@@ -128,9 +130,15 @@ class SegmentWriter {
     void clear();
 
     void set_mow_context(std::shared_ptr<MowContext> mow_context);
-    Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
+    Status fill_missing_columns(vectorized::Block* full_block,
+                                const PartialUpdateReadPlan& read_plan,
+                                const std::vector<uint32_t>& cids_full_read,
+                                const std::vector<uint32_t>& cids_point_read,
                                 const std::vector<bool>& use_default_or_null_flag,
-                                bool has_default_or_nullable, const size_t& segment_start_pos);
+                                bool has_default_or_nullable, const size_t& segment_start_pos,
+                                bool is_unique_key_replace_if_not_null);
+
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return _indicator_maps; }
 
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
@@ -162,6 +170,9 @@ class SegmentWriter {
     bool _should_create_writers_with_dynamic_block(size_t num_columns_in_block);
     void _serialize_block_to_row_column(vectorized::Block& block);
 
+    void _calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                              const IndicatorMapsVertical& indicator_maps_vertical);
+
 private:
     uint32_t _segment_id;
     TabletSchemaSPtr _tablet_schema;
@@ -206,9 +217,20 @@ class SegmentWriter {
     std::shared_ptr<MowContext> _mow_context;
     // group every rowset-segment row id to speed up reader
-    PartialUpdateReadPlan _rssid_to_rid;
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
 
+    // For partial update, during publish version we may generate a new block to handle
+    // the data loss problem due to concurrent updates. We need to re-calculate the values
+    // read from the old rows because there may be new rows with the same keys written successfully
+    // between the flush phase and the publish phase of the current write. Columns to read from
+    // the segments in old versions for this purpose include `missing_cols` and part of the
+    // `including_cols` if the property `enable_unique_key_replace_if_not_null` is turned on for all conflict rows.
+    // However, in publish version, the input block(s) has been transformed to segment(s) and we can't
+    // get the information about the locations of indicator values DIRECTLY. So we keep the information of
+    // the locations of indicator values of `including_cols` in `_indicator_maps` and pass it to the publish phase
+    // through TabletTxnInfo
+    std::shared_ptr<IndicatorMaps> _indicator_maps = nullptr;
+    // record row locations here, used when the memtable flushes
 };
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 1de133d3965d45..9e6ead7ee3dedd 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -277,9 +277,17 @@ Status RowsetBuilder::commit_txn() {
         return res;
     }
     if (_tablet->enable_unique_key_merge_on_write()) {
-        _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
-                _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->tablet_uid(), true,
-                _delete_bitmap, _rowset_ids);
+        if (_tablet_schema->is_partial_update()) {
+            auto indicator_maps = _rowset_writer->get_indicator_maps();
+            _storage_engine->txn_manager()->set_txn_related_delete_bitmap_and_indicator_maps(
+                    _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->tablet_uid(),
+                    true, _delete_bitmap, _rowset_ids,
+                    (!indicator_maps || indicator_maps->empty()) ? nullptr : indicator_maps);
+        } else {
+            _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
+                    _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->tablet_uid(),
+                    true, _delete_bitmap, _rowset_ids);
+        }
     }
 
     _is_committed = true;
@@ -325,6 +333,8 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
     _tablet_schema->set_partial_update_info(table_schema_param->is_partial_update(),
                                             table_schema_param->partial_update_input_columns());
     _tablet_schema->set_is_strict_mode(table_schema_param->is_strict_mode());
+    _tablet_schema->set_is_unique_key_replace_if_not_null(
+            table_schema_param->is_unique_key_replace_if_not_null());
 }
 
 } // namespace doris
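
The read plan that flows through the code below is a `std::variant` over the two shapes declared in olap_common.h. A standalone sketch of how the two alternatives are populated, mirroring the shape of `Tablet::prepare_to_read` (type names follow the patch; putting `ColumnStoreReadPlan` first makes the default-constructed variant the column-store plan, which matches the `if (store_row_column) read_plan = RowStoreReadPlan {};` pattern used in the patch):

#include <cstdint>
#include <map>
#include <string>
#include <variant>
#include <vector>

// Simplified stand-ins (RowsetId is a struct in Doris; a string keeps this short).
using RowsetId = std::string;
struct RidAndPos { uint32_t rid; uint32_t pos; };
struct ReadRowsInfo { RidAndPos id_and_pos; std::vector<uint32_t> cids; };
struct ReadColumnsInfo {
    std::vector<RidAndPos> missing_column_rows;
    std::map<uint32_t, std::vector<RidAndPos>> partial_update_rows;
};
using RowStoreReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<ReadRowsInfo>>>;
using ColumnStoreReadPlan = std::map<RowsetId, std::map<uint32_t, ReadColumnsInfo>>;
using PartialUpdateReadPlan = std::variant<ColumnStoreReadPlan, RowStoreReadPlan>;

// One entry per old row for the row-store plan; per-column row lists otherwise.
void prepare_to_read(PartialUpdateReadPlan& plan, const RowsetId& rsid, uint32_t segid,
                     uint32_t rid, uint32_t pos, const std::vector<uint32_t>& cids) {
    if (auto* row_plan = std::get_if<RowStoreReadPlan>(&plan)) {
        (*row_plan)[rsid][segid].push_back(ReadRowsInfo {{rid, pos}, cids});
    } else {
        auto& info = std::get<ColumnStoreReadPlan>(plan)[rsid][segid];
        info.missing_column_rows.push_back({rid, pos});
        for (uint32_t cid : cids) {
            info.partial_update_rows[cid].push_back({rid, pos});
        }
    }
}

int main() {
    PartialUpdateReadPlan plan; // holds ColumnStoreReadPlan by default
    prepare_to_read(plan, "rs_1", 0, 7, 0, {2, 3}); // row 7 re-reads cids 2 and 3
    plan = RowStoreReadPlan {};                     // switch for row-store tables
    prepare_to_read(plan, "rs_1", 0, 7, 0, {2, 3});
    return 0;
}
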
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6a5f1dc50997e4..07fbb1f09a8a58 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2669,15 +2669,16 @@ Status Tablet::_get_segment_column_iterator(
 }
 
 // fetch value by row column
-Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid,
-                                              const std::vector<uint32_t>& rowids,
-                                              const std::vector<uint32_t>& cids,
-                                              vectorized::Block& block) {
+Status Tablet::fetch_value_through_row_column(
+        RowsetSharedPtr input_rowset, uint32_t segid, const std::vector<uint32_t>& rids,
+        const std::vector<ReadRowsInfo>& rows_info, const std::vector<uint32_t>* cids_full_read,
+        const std::vector<uint32_t>* cids_point_read, vectorized::Block* block_full_read,
+        vectorized::Block* block_point_read, bool with_point_read) {
     MonotonicStopWatch watch;
     watch.start();
     Defer _defer([&]() {
         LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000
-                               << ", row_batch_size:" << rowids.size();
+                               << ", row_batch_size:" << rids.size();
     });
 
     BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset);
@@ -2691,20 +2692,40 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
                                               &column_iterator, &stats));
     // get and parse tuple row
     vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create();
-    RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr));
-    assert(column_ptr->size() == rowids.size());
+    RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(), column_ptr));
+    assert(column_ptr->size() == rids.size());
     auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get());
-    vectorized::DataTypeSerDeSPtrs serdes;
-    serdes.resize(cids.size());
-    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
-    for (int i = 0; i < cids.size(); ++i) {
-        const TabletColumn& column = tablet_schema->column(cids[i]);
+    vectorized::DataTypeSerDeSPtrs serdes_full_read;
+    serdes_full_read.resize(cids_full_read->size());
+
+    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx_full_read;
+
+    for (int i = 0; i < cids_full_read->size(); ++i) {
+        const TabletColumn& column = tablet_schema->column(cids_full_read->at(i));
         vectorized::DataTypePtr type =
                 vectorized::DataTypeFactory::instance().create_data_type(column);
-        col_uid_to_idx[column.unique_id()] = i;
-        serdes[i] = type->get_serde();
+        col_uid_to_idx_full_read[column.unique_id()] = i;
+        serdes_full_read[i] = type->get_serde();
+    }
+
+    if (with_point_read) {
+        vectorized::DataTypeSerDeSPtrs serdes_point_read;
+        serdes_point_read.resize(cids_point_read->size());
+        std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>> col_uid_to_idx_cid_point_read;
+        for (int i = 0; i < cids_point_read->size(); ++i) {
+            const TabletColumn& column = tablet_schema->column(cids_point_read->at(i));
+            vectorized::DataTypePtr type =
+                    vectorized::DataTypeFactory::instance().create_data_type(column);
+            col_uid_to_idx_cid_point_read[column.unique_id()] = {i, cids_point_read->at(i)};
+            serdes_point_read[i] = type->get_serde();
+        }
+        vectorized::JsonbSerializeUtil::jsonb_to_block(
+                serdes_full_read, serdes_point_read, *string_column, col_uid_to_idx_full_read,
+                col_uid_to_idx_cid_point_read, rows_info, *block_full_read, *block_point_read);
+    } else {
+        vectorized::JsonbSerializeUtil::jsonb_to_block(serdes_full_read, *string_column,
+                                                       col_uid_to_idx_full_read, *block_full_read);
     }
-    vectorized::JsonbSerializeUtil::jsonb_to_block(serdes, *string_column, col_uid_to_idx, block);
     return Status::OK();
 }
 
@@ -2880,19 +2901,24 @@ void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_b
                        row_pos_vec.data() + in_block.rows());
 }
 
-Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
-                                          const segment_v2::SegmentSharedPtr& seg,
-                                          const std::vector<RowsetSharedPtr>& specified_rowsets,
-                                          DeleteBitmapPtr delete_bitmap, int64_t end_version,
-                                          RowsetWriter* rowset_writer) {
+Status Tablet::calc_segment_delete_bitmap(
+        RowsetSharedPtr rowset, const segment_v2::SegmentSharedPtr& seg,
+        const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
+        int64_t end_version, RowsetWriter* rowset_writer) {
     OlapStopWatch watch;
     auto rowset_id = rowset->rowset_id();
     Version dummy_version(end_version + 1, end_version + 1);
     auto rowset_schema = rowset->tablet_schema();
     bool is_partial_update = rowset_schema->is_partial_update();
+    bool has_row_column = rowset_schema->store_row_column();
     // use for partial update
     PartialUpdateReadPlan read_plan_ori;
     PartialUpdateReadPlan read_plan_update;
+    if (has_row_column) {
+        read_plan_ori = RowStoreReadPlan {};
+        read_plan_update = RowStoreReadPlan {};
+    }
     std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset;
     rsid_to_rowset[rowset_id] = rowset;
@@ -2971,8 +2997,15 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 // So here we should read version 5's columns and build a new row, which is
                 // consists of version 6's update columns and version 5's origin columns
                 // here we build 2 read plan for ori values and update values
-                prepare_to_read(loc, pos, &read_plan_ori);
-                prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
+                if (indicator_maps && indicator_maps->contains(row_id)) {
+                    prepare_to_read(read_plan_ori, loc, pos, indicator_maps->at(row_id));
+                } else {
+                    prepare_to_read(read_plan_ori, loc, pos, {});
+                }
+                prepare_to_read(read_plan_update, RowLocation {rowset_id, seg->id(), row_id}, pos,
+                                {});
                 rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
                 ++pos;
                 // delete bitmap will be calculate when memtable flush and
@@ -3010,8 +3043,9 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     }
 
     if (pos > 0) {
-        RETURN_IF_ERROR(generate_new_block_for_partial_update(
-                rowset_schema, read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+        RETURN_IF_ERROR(generate_new_block_for_partial_update(rowset_schema, read_plan_ori,
+                                                              read_plan_update, rsid_to_rowset,
+                                                              indicator_maps, &block));
         sort_block(block, ordered_block);
         RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
     }
@@ -3027,11 +3061,11 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
 // user can get all delete bitmaps from that token.
 // if `token` is nullptr, the calculation will run in local, and user can get the result
 // delete bitmap from `delete_bitmap` directly.
-Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
-                                  const std::vector<segment_v2::SegmentSharedPtr>& segments,
-                                  const std::vector<RowsetSharedPtr>& specified_rowsets,
-                                  DeleteBitmapPtr delete_bitmap, int64_t end_version,
-                                  CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) {
+Status Tablet::calc_delete_bitmap(
+        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+        const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
+        int64_t end_version, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) {
     auto rowset_id = rowset->rowset_id();
     if (specified_rowsets.empty() || segments.empty()) {
         LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id()
@@ -3050,14 +3084,14 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
     for (size_t i = 0; i < segments.size(); i++) {
         auto& seg = segments[i];
         if (token != nullptr) {
-            RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets, end_version,
-                                          rowset_writer));
+            RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, specified_rowsets,
+                                          indicator_maps, end_version, rowset_writer));
         } else {
             DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
             seg_delete_bitmaps.push_back(seg_delete_bitmap);
             RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], specified_rowsets,
-                                                       seg_delete_bitmap, end_version,
-                                                       rowset_writer));
+                                                       seg_delete_bitmap, indicator_maps,
+                                                       end_version, rowset_writer));
         }
     }
 
@@ -3088,6 +3122,7 @@ Status Tablet::generate_new_block_for_partial_update(
         TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori,
         const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
         vectorized::Block* output_block) {
     // do partial update related works
     // 1. read columns by read plan
@@ -3095,103 +3130,270 @@ Status Tablet::generate_new_block_for_partial_update(
    // 3. write a new segment and modify rowset meta
    // 4. mark current keys deleted
     CHECK(output_block);
-    auto full_mutable_columns = output_block->mutate_columns();
-    auto old_block = rowset_schema->create_missing_columns_block();
-    auto missing_cids = rowset_schema->get_missing_cids();
-    auto update_block = rowset_schema->create_update_columns_block();
-    auto update_cids = rowset_schema->get_update_cids();
-
-    std::map<uint32_t, uint32_t> read_index_old;
-    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset,
-                                         old_block, &read_index_old));
-
-    std::map<uint32_t, uint32_t> read_index_update;
-    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
-                                         rsid_to_rowset, update_block, &read_index_update));
-
-    // build full block
-    CHECK(read_index_old.size() == read_index_update.size());
-    for (auto i = 0; i < missing_cids.size(); ++i) {
-        for (auto idx = 0; idx < read_index_old.size(); ++idx) {
-            full_mutable_columns[missing_cids[i]]->insert_from(
-                    *old_block.get_columns_with_type_and_name()[i].column.get(),
-                    read_index_old[idx]);
-        }
-    }
-    for (auto i = 0; i < update_cids.size(); ++i) {
-        for (auto idx = 0; idx < read_index_update.size(); ++idx) {
-            full_mutable_columns[update_cids[i]]->insert_from(
-                    *update_block.get_columns_with_type_and_name()[i].column.get(),
-                    read_index_update[idx]);
-        }
-    }
-    VLOG_DEBUG << "full block when publish: " << output_block->dump_data();
+    if (!indicator_maps) {
+        auto full_mutable_columns = output_block->mutate_columns();
+        auto old_block = rowset_schema->create_missing_columns_block();
+        auto missing_cids = rowset_schema->get_missing_cids();
+        auto update_block = rowset_schema->create_update_columns_block();
+        auto update_cids = rowset_schema->get_update_cids();
+
+        std::map<uint32_t, uint32_t> read_index_old;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori,
+                                             &missing_cids, &old_block, &read_index_old));
+
+        std::map<uint32_t, uint32_t> read_index_update;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update,
+                                             &update_cids, &update_block, &read_index_update));
+
+        // build full block
+        CHECK(read_index_old.size() == read_index_update.size());
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+                full_mutable_columns[missing_cids[i]]->insert_from(
+                        *old_block.get_columns_with_type_and_name()[i].column.get(),
+                        read_index_old[idx]);
+            }
+        }
+        for (auto i = 0; i < update_cids.size(); ++i) {
+            for (auto idx = 0; idx < read_index_update.size(); ++idx) {
+                full_mutable_columns[update_cids[i]]->insert_from(
+                        *update_block.get_columns_with_type_and_name()[i].column.get(),
+                        read_index_update[idx]);
+            }
+        }
+        VLOG_DEBUG << "full block when publish: " << output_block->dump_data();
+    } else {
+        std::unordered_set<uint32_t> cids_point_read;
+        for (const auto& [_, cids] : *indicator_maps) {
+            for (uint32_t cid : cids) {
+                cids_point_read.insert(cid);
+            }
+        }
+        std::vector<uint32_t> point_read_cids;
+        point_read_cids.reserve(cids_point_read.size());
+        for (uint32_t cid : cids_point_read) {
+            point_read_cids.emplace_back(cid);
+        }
+        auto full_mutable_columns = output_block->mutate_columns();
+        auto old_full_read_block = rowset_schema->create_missing_columns_block();
+        auto old_full_read_cids = rowset_schema->get_missing_cids();
+        auto point_read_block = rowset_schema->create_block_by_cids(point_read_cids);
+
+        std::map<uint32_t, uint32_t> full_read_index_old;
+        std::map<uint32_t, std::map<uint32_t, uint32_t>> point_read_index_old;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori,
+                                             &old_full_read_cids, &point_read_cids,
+                                             &old_full_read_block, &point_read_block,
+                                             &full_read_index_old, &point_read_index_old));
+
+        auto update_block = rowset_schema->create_update_columns_block();
+        auto update_cids = rowset_schema->get_update_cids();
+        std::map<uint32_t, uint32_t> read_index_update;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update,
+                                             &update_cids, &update_block, &read_index_update));
+        // construct the full block
+        CHECK(full_read_index_old.size() == read_index_update.size());
+
+        for (size_t i = 0; i < old_full_read_cids.size(); ++i) {
+            for (auto idx = 0; idx < full_read_index_old.size(); ++idx) {
+                full_mutable_columns[old_full_read_cids[i]]->insert_from(
+                        *old_full_read_block.get_by_position(i).column.get(),
+                        full_read_index_old[idx]);
+            }
+        }
+
+        for (size_t i = 0; i < update_cids.size(); i++) {
+            uint32_t cid = update_cids[i];
+            if (!cids_point_read.contains(cid)) {
+                for (auto idx = 0; idx < full_read_index_old.size(); ++idx) {
+                    full_mutable_columns[cid]->insert_from(
+                            *update_block.get_by_position(i).column.get(), read_index_update[idx]);
+                }
+            }
+        }
+
+        for (size_t i = 0; i < point_read_cids.size(); i++) {
+            uint32_t cid = point_read_cids[i];
+            for (uint32_t idx = 0; idx < full_read_index_old.size(); idx++) {
+                if (point_read_index_old[cid].contains(idx)) {
+                    full_mutable_columns[cid]->insert_from(
+                            *point_read_block.get_by_position(i).column.get(),
+                            point_read_index_old[cid][idx]);
+                } else {
+                    full_mutable_columns[cid]->insert_from(
+                            *update_block.get_by_position(i).column.get(), idx);
+                }
+            }
+        }
+    }
     return Status::OK();
 }
-
-// read columns by read plan
-// read_index: ori_pos-> block_idx
 Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema,
-                                    const std::vector<uint32_t> cids_to_read,
-                                    const PartialUpdateReadPlan& read_plan,
                                     const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
-                                    vectorized::Block& block,
-                                    std::map<uint32_t, uint32_t>* read_index) {
-    bool has_row_column = tablet_schema->store_row_column();
-    auto mutable_columns = block.mutate_columns();
-    size_t read_idx = 0;
-    for (auto rs_it : read_plan) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset_iter = rsid_to_rowset.find(rs_it.first);
-            CHECK(rowset_iter != rsid_to_rowset.end());
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                (*read_index)[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids,
-                                                         cids_to_read, block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
-                TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]);
-                auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids,
-                                                tablet_column, mutable_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value";
-                    return st;
-                }
-            }
-        }
-    }
-    return Status::OK();
-}
+                                    const PartialUpdateReadPlan& read_plan,
+                                    const std::vector<uint32_t>* cids_full_read,
+                                    vectorized::Block* block_full_read,
+                                    std::map<uint32_t, uint32_t>* full_read_index) {
+    auto full_read_columns = block_full_read->mutate_columns();
+    uint32_t read_idx1 = 0;
+
+    if (std::holds_alternative<RowStoreReadPlan>(read_plan)) {
+        const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan);
+        for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) {
+            auto rowset = rsid_to_rowset.at(rowset_id);
+            CHECK(rowset);
+            for (const auto& [segment_id, rows_info] : segment_read_infos) {
+                std::vector<uint32_t> rids;
+                for (const auto& [id_and_pos, cids] : rows_info) {
+                    // set read index for missing columns
+                    rids.emplace_back(id_and_pos.rid);
+                    (*full_read_index)[id_and_pos.pos] = read_idx1++;
+                }
+
+                auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info,
+                                                         cids_full_read, nullptr, block_full_read,
+                                                         nullptr, false);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value through row column";
+                    return st;
+                }
+            }
+        }
+    } else {
+        const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan);
+        for (const auto& [rowset_id, segment_read_infos] : column_store_read_plan) {
+            auto rowset = rsid_to_rowset.at(rowset_id);
+            CHECK(rowset);
+
+            for (const auto& [segment_id, columns_info] : segment_read_infos) {
+                std::vector<uint32_t> rids;
+                for (auto [rid, pos] : columns_info.missing_column_rows) {
+                    rids.emplace_back(rid);
+                    // set read index for missing columns
+                    (*full_read_index)[pos] = read_idx1++;
+                }
+
+                // read values for missing columns
+                for (size_t i = 0; i < cids_full_read->size(); ++i) {
+                    TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i));
+                    auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column,
+                                                    full_read_columns[i]);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "failed to fetch value by rowids";
+                        return st;
+                    }
+                }
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status Tablet::read_columns_by_plan(
+        TabletSchemaSPtr tablet_schema, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
+        const PartialUpdateReadPlan& read_plan, const std::vector<uint32_t>* cids_full_read,
+        const std::vector<uint32_t>* cids_point_read, vectorized::Block* block_full_read,
+        vectorized::Block* block_point_read, std::map<uint32_t, uint32_t>* full_read_index,
+        std::map<uint32_t, std::map<uint32_t, uint32_t>>* point_read_index) {
+    auto full_read_columns = block_full_read->mutate_columns();
+    auto point_read_columns = block_point_read->mutate_columns();
+
+    uint32_t read_idx1 = 0;
+    std::map<uint32_t, uint32_t> read_idx2;
+    for (uint32_t cid : *cids_point_read) {
+        read_idx2[cid] = 0;
+    }
+
+    if (std::holds_alternative<RowStoreReadPlan>(read_plan)) {
+        const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan);
+        for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) {
+            auto rowset = rsid_to_rowset.at(rowset_id);
+            CHECK(rowset);
+            for (const auto& [segment_id, rows_info] : segment_read_infos) {
+                std::vector<uint32_t> rids;
+                for (const auto& [id_and_pos, cids] : rows_info) {
+                    // set read index for missing columns
+                    rids.emplace_back(id_and_pos.rid);
+                    (*full_read_index)[id_and_pos.pos] = read_idx1++;
+                    for (const auto cid : cids) {
+                        // set read index for partial update columns
+                        (*point_read_index)[cid][id_and_pos.pos] = read_idx2[cid]++;
+                    }
+                }
+
+                auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info,
+                                                         cids_full_read, cids_point_read,
+                                                         block_full_read, block_point_read, true);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value through row column";
+                    return st;
+                }
+            }
+        }
+    } else {
+        const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan);
+        for (const auto& [rowset_id, segment_read_infos] : column_store_read_plan) {
+            auto rowset = rsid_to_rowset.at(rowset_id);
+            CHECK(rowset);
+
+            for (const auto& [segment_id, columns_info] : segment_read_infos) {
+                std::vector<uint32_t> rids;
+                for (auto [rid, pos] : columns_info.missing_column_rows) {
+                    rids.emplace_back(rid);
+                    // set read index for missing columns
+                    (*full_read_index)[pos] = read_idx1++;
+                }
+
+                // read values for missing columns
+                for (size_t i = 0; i < cids_full_read->size(); ++i) {
+                    TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i));
+                    auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column,
+                                                    full_read_columns[i]);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "failed to fetch value by rowids";
+                        return st;
+                    }
+                }
+                // read values for cells with indicator values in including columns
+                for (size_t i = 0; i < cids_point_read->size(); i++) {
+                    const auto& rows_info = columns_info.partial_update_rows;
+                    uint32_t cid = cids_point_read->at(i);
+                    if (!rows_info.empty() && rows_info.contains(cid)) {
+                        std::vector<uint32_t> rids;
+                        for (auto [rid, pos] : rows_info.at(cid)) {
+                            rids.emplace_back(rid);
+                            // set read index for partial update columns
+                            (*point_read_index)[cid][pos] = read_idx2[cid]++;
+                        }
+
+                        TabletColumn tablet_column = tablet_schema->column(cid);
+                        auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column,
+                                                        point_read_columns[i]);
+                        if (!st.ok()) {
+                            LOG(WARNING) << "failed to fetch value by rowids";
+                            return st;
+                        }
+                    }
+                }
+            }
+        }
+    }
+    return Status::OK();
+}
 
-void Tablet::prepare_to_read(const RowLocation& row_location, size_t pos,
-                             PartialUpdateReadPlan* read_plan) {
-    auto rs_it = read_plan->find(row_location.rowset_id);
-    if (rs_it == read_plan->end()) {
-        std::map<uint32_t, std::vector<RidAndPos>> segid_to_rid;
-        std::vector<RidAndPos> rid_pos;
-        rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
-        segid_to_rid.emplace(row_location.segment_id, rid_pos);
-        read_plan->emplace(row_location.rowset_id, segid_to_rid);
-        return;
-    }
-    auto seg_it = rs_it->second.find(row_location.segment_id);
-    if (seg_it == rs_it->second.end()) {
-        std::vector<RidAndPos> rid_pos;
-        rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
-        rs_it->second.emplace(row_location.segment_id, rid_pos);
-        return;
+void Tablet::prepare_to_read(PartialUpdateReadPlan& read_plan, const RowLocation& row_location,
+                             uint32_t pos, const std::vector<uint32_t>& partial_update_cids) {
+    if (std::holds_alternative<RowStoreReadPlan>(read_plan)) {
+        std::get<RowStoreReadPlan>(read_plan)[row_location.rowset_id][row_location.segment_id]
+                .emplace_back(RidAndPos {row_location.row_id, pos}, partial_update_cids);
+    } else {
+        auto& read_columns_info = std::get<ColumnStoreReadPlan>(
+                read_plan)[row_location.rowset_id][row_location.segment_id];
+        read_columns_info.missing_column_rows.emplace_back(row_location.row_id, pos);
+        for (uint32_t cid : partial_update_cids) {
+            read_columns_info.partial_update_rows[cid].emplace_back(row_location.row_id, pos);
+        }
     }
-    seg_it->second.emplace_back(RidAndPos {row_location.row_id, pos});
 }
 
 void Tablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur,
@@ -3228,7 +3430,7 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) {
     std::vector<RowsetSharedPtr> specified_rowsets = get_rowset_by_ids(&cur_rowset_ids);
     OlapStopWatch watch;
     auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
-    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
+    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, nullptr,
                                        cur_version - 1, token.get()));
     RETURN_IF_ERROR(token->wait());
     RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
@@ -3280,7 +3482,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
     }
 
-    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
+    RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, nullptr,
                                        cur_version, token, rowset_writer));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
@@ -3294,10 +3496,11 @@ Status Tablet::commit_phase_update_delete_bitmap(
     return Status::OK();
 }
 
-Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
-                                    const RowsetIdUnorderedSet& pre_rowset_ids,
-                                    DeleteBitmapPtr delete_bitmap, int64_t txn_id,
-                                    RowsetWriter* rowset_writer) {
+Status Tablet::update_delete_bitmap(
+        const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
+        DeleteBitmapPtr delete_bitmap,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps,
+        int64_t txn_id, RowsetWriter* rowset_writer) {
     SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
@@ -3333,7 +3536,8 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
     OlapStopWatch watch;
     auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
-                                       cur_version - 1, token.get(), rowset_writer));
+                                       indicator_maps, cur_version - 1, token.get(),
+                                       rowset_writer));
     RETURN_IF_ERROR(token->wait());
     RETURN_IF_ERROR(token->get_delete_bitmap(delete_bitmap));
     size_t total_rows = std::accumulate(
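
At publish time, `generate_new_block_for_partial_update` applies a per-row merge rule for each point-read column: a row whose cell carried the null indicator takes the value from the old row, otherwise it keeps the newly written value. A standalone, simplified sketch of that rule (a hypothetical helper, not in the PR; `point_read_index` plays the role of `point_read_index_old[cid]`):

#include <cstdint>
#include <map>
#include <optional>
#include <vector>

// new_values[idx] is empty exactly when row idx carried the null indicator
// for this column; point_read_index maps such rows to their position in the
// block of values read back from the old rows.
std::vector<int64_t> merge_column(const std::vector<std::optional<int64_t>>& new_values,
                                  const std::vector<int64_t>& old_values,
                                  const std::map<uint32_t, uint32_t>& point_read_index) {
    std::vector<int64_t> merged;
    merged.reserve(new_values.size());
    for (uint32_t idx = 0; idx < new_values.size(); idx++) {
        auto it = point_read_index.find(idx);
        if (it != point_read_index.end()) {
            merged.push_back(old_values[it->second]); // replace indicator with old value
        } else {
            merged.push_back(*new_values[idx]); // keep the value the user wrote
        }
    }
    return merged;
}
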
Also, if a compaction happens between commit_txn and // publish_txn, we should remove compaction input rowsets' delete_bitmap // and build newly generated rowset's delete_bitmap - Status calc_delete_bitmap(RowsetSharedPtr rowset, - const std::vector& segments, - const std::vector& specified_rowsets, - DeleteBitmapPtr delete_bitmap, int64_t version, - CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); + Status calc_delete_bitmap( + RowsetSharedPtr rowset, const std::vector& segments, + const std::vector& specified_rowsets, DeleteBitmapPtr delete_bitmap, + std::shared_ptr>> indicator_maps, + int64_t version, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); std::vector get_rowset_by_ids( const RowsetIdUnorderedSet* specified_rowset_ids); - Status calc_segment_delete_bitmap(RowsetSharedPtr rowset, - const segment_v2::SegmentSharedPtr& seg, - const std::vector& specified_rowsets, - DeleteBitmapPtr delete_bitmap, int64_t end_version, - RowsetWriter* rowset_writer); + Status calc_segment_delete_bitmap( + RowsetSharedPtr rowset, const segment_v2::SegmentSharedPtr& seg, + const std::vector& specified_rowsets, DeleteBitmapPtr delete_bitmap, + std::shared_ptr>> indicator_maps, + int64_t end_version, RowsetWriter* rowset_writer); Status calc_delete_bitmap_between_segments( RowsetSharedPtr rowset, const std::vector& segments, DeleteBitmapPtr delete_bitmap); Status read_columns_by_plan(TabletSchemaSPtr tablet_schema, - const std::vector cids_to_read, - const PartialUpdateReadPlan& read_plan, const std::map& rsid_to_rowset, - vectorized::Block& block, std::map* read_index); - void prepare_to_read(const RowLocation& row_location, size_t pos, - PartialUpdateReadPlan* read_plan); + const PartialUpdateReadPlan& read_plan, + const std::vector* cids_full_read, + vectorized::Block* block_full_read, + std::map* missing_cols_read_index); + // with point read + Status read_columns_by_plan( + TabletSchemaSPtr tablet_schema, + const std::map& rsid_to_rowset, + const PartialUpdateReadPlan& read_plan, const std::vector* cids_full_read, + const std::vector* cids_point_read, vectorized::Block* block_full_read, + vectorized::Block* block_point_read, + std::map* missing_cols_read_index, + std::map>* parital_update_cols_read_index); + void prepare_to_read(PartialUpdateReadPlan& read_plan, const RowLocation& row_location, + uint32_t pos, const std::vector& partial_update_cids); Status generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map& rsid_to_rowset, + std::shared_ptr>> indicator_maps, vectorized::Block* output_block); Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset); @@ -484,10 +497,11 @@ class Tablet : public BaseTablet { const std::vector& segments, int64_t txn_id, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); - Status update_delete_bitmap(const RowsetSharedPtr& rowset, - const RowsetIdUnorderedSet& pre_rowset_ids, - DeleteBitmapPtr delete_bitmap, int64_t txn_id, - RowsetWriter* rowset_writer = nullptr); + Status update_delete_bitmap( + const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids, + DeleteBitmapPtr delete_bitmap, + std::shared_ptr>> indicator_maps, + int64_t txn_id, RowsetWriter* rowset_writer = nullptr); void calc_compaction_output_rowset_delete_bitmap( const std::vector& input_rowsets, const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, @@ 
-765,6 +779,10 @@ inline bool Tablet::enable_unique_key_merge_on_write() const { return _tablet_meta->enable_unique_key_merge_on_write(); } +inline bool Tablet::enable_unique_key_replace_if_not_null() const { + return _tablet_meta->enable_unique_key_replace_if_not_null(); +} + // TODO(lingbin): Why other methods that need to get information from _tablet_meta // are not locked, here needs a comment to explain. inline size_t Tablet::tablet_footprint() { @@ -859,5 +877,4 @@ inline size_t Tablet::next_unique_id() const { inline size_t Tablet::row_size() const { return _schema->row_size(); } - } // namespace doris diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 614a69430c935c..b8db7a9539956d 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -67,7 +67,10 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl std::move(binlog_config), request.compaction_policy, request.time_series_compaction_goal_size_mbytes, request.time_series_compaction_file_count_threshold, - request.time_series_compaction_time_threshold_seconds); + request.time_series_compaction_time_threshold_seconds, + request.__isset.enable_unique_key_replace_if_not_null + ? request.enable_unique_key_replace_if_not_null + : false); return Status::OK(); } @@ -86,7 +89,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id std::optional binlog_config, std::string compaction_policy, int64_t time_series_compaction_goal_size_mbytes, int64_t time_series_compaction_file_count_threshold, - int64_t time_series_compaction_time_threshold_seconds) + int64_t time_series_compaction_time_threshold_seconds, + bool enable_unique_key_replace_if_not_null) : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap(tablet_id)) { @@ -106,6 +110,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id ? 
TabletTypePB::TABLET_TYPE_DISK : TabletTypePB::TABLET_TYPE_MEMORY); tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write); + tablet_meta_pb.set_enable_unique_key_replace_if_not_null(enable_unique_key_replace_if_not_null); tablet_meta_pb.set_storage_policy_id(storage_policy_id); tablet_meta_pb.set_compaction_policy(compaction_policy); tablet_meta_pb.set_time_series_compaction_goal_size_mbytes( @@ -524,6 +529,11 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write(); } + if (tablet_meta_pb.has_enable_unique_key_replace_if_not_null()) { + _enable_unique_key_replace_if_not_null = + tablet_meta_pb.enable_unique_key_replace_if_not_null(); + } + // init _rs_metas for (auto& it : tablet_meta_pb.rs_metas()) { RowsetMetaSharedPtr rs_meta(new RowsetMeta()); @@ -635,6 +645,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { } tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write); + tablet_meta_pb->set_enable_unique_key_replace_if_not_null( + _enable_unique_key_replace_if_not_null); if (_enable_unique_key_merge_on_write) { std::set stale_rs_ids; diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 42fec6489b2a14..c25506a94c98b8 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -111,7 +111,8 @@ class TabletMeta { std::string compaction_policy = "size_based", int64_t time_series_compaction_goal_size_mbytes = 1024, int64_t time_series_compaction_file_count_threshold = 2000, - int64_t time_series_compaction_time_threshold_seconds = 3600); + int64_t time_series_compaction_time_threshold_seconds = 3600, + bool enable_unique_key_replace_if_not_null = false); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; @@ -227,6 +228,10 @@ class TabletMeta { bool enable_unique_key_merge_on_write() const { return _enable_unique_key_merge_on_write; } + bool enable_unique_key_replace_if_not_null() const { + return _enable_unique_key_replace_if_not_null; + } + // TODO(Drogon): thread safety const BinlogConfig& binlog_config() const { return _binlog_config; } void set_binlog_config(BinlogConfig binlog_config) { @@ -297,6 +302,7 @@ class TabletMeta { // which can avoid the merging cost in read stage, and accelerate the aggregation // query performance significantly.
bool _enable_unique_key_merge_on_write = false; + bool _enable_unique_key_replace_if_not_null = false; std::shared_ptr _delete_bitmap; // binlog config diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 6f25b20e650eff..38f11df60e669b 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1104,18 +1104,16 @@ vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const { } vectorized::Block TabletSchema::create_missing_columns_block() { - vectorized::Block block; - for (const auto& cid : _missing_cids) { - auto col = _cols[cid]; - auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col); - block.insert({data_type->create_column(), data_type, col.name()}); - } - return block; + return create_block_by_cids(_missing_cids); } vectorized::Block TabletSchema::create_update_columns_block() { + return create_block_by_cids(_update_cids); +} + +vectorized::Block TabletSchema::create_block_by_cids(const std::vector& cids) { vectorized::Block block; - for (const auto& cid : _update_cids) { + for (const auto& cid : cids) { auto col = _cols[cid]; auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col); block.insert({data_type->create_column(), data_type, col.name()}); diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 72d0636f6e472a..393adcfcdf88f1 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -353,6 +353,7 @@ class TabletSchema { vectorized::Block create_missing_columns_block(); vectorized::Block create_update_columns_block(); + vectorized::Block create_block_by_cids(const std::vector& cids); void set_partial_update_info(bool is_partial_update, const std::set& partial_update_input_columns); bool is_partial_update() const { return _is_partial_update; } @@ -363,8 +364,12 @@ class TabletSchema { } void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode = is_strict_mode; } bool is_strict_mode() const { return _is_strict_mode; } - std::vector get_missing_cids() const { return _missing_cids; } - std::vector get_update_cids() const { return _update_cids; } + void set_is_unique_key_replace_if_not_null(bool is_unique_key_replace_if_not_null) { + _is_unique_key_replace_if_not_null = is_unique_key_replace_if_not_null; + } + bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; } + std::vector get_missing_cids() { return _missing_cids; } + std::vector get_update_cids() { return _update_cids; } private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); @@ -411,6 +416,7 @@ class TabletSchema { // to generate a new row, only available in non-strict mode bool _can_insert_new_rows_in_partial_update = true; bool _is_strict_mode = false; + bool _is_unique_key_replace_if_not_null = false; }; bool operator==(const TabletSchema& a, const TabletSchema& b); diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 0a3a0d27a85627..f9ac99b309ad59 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -207,6 +207,40 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, } } +void TxnManager::set_txn_related_delete_bitmap_and_indicator_maps( + TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, + TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + std::shared_ptr>> indicator_maps) { + pair key(partition_id, transaction_id); + 
TabletInfo tablet_info(tablet_id, tablet_uid); + + std::lock_guard txn_lock(_get_txn_lock(transaction_id)); + { + // locate this txn's entry for the tablet under the map lock + std::lock_guard wrlock(_get_txn_map_lock(transaction_id)); + txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); + auto it = txn_tablet_map.find(key); + if (it == txn_tablet_map.end()) { + LOG(WARNING) << "transaction_id: " << transaction_id + << " partition_id: " << partition_id << " may be cleared"; + return; + } + auto load_itr = it->second.find(tablet_info); + if (load_itr == it->second.end()) { + LOG(WARNING) << "transaction_id: " << transaction_id + << " partition_id: " << partition_id << " tablet_id: " << tablet_id + << " may be cleared"; + return; + } + TabletTxnInfo& load_info = load_itr->second; + load_info.unique_key_merge_on_write = unique_key_merge_on_write; + load_info.delete_bitmap = delete_bitmap; + load_info.rowset_ids = rowset_ids; + load_info.indicator_maps = indicator_maps; + } +} + Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, @@ -359,9 +393,9 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, tablet->create_transient_rowset_writer(rowset, &rowset_writer); int64_t t2 = MonotonicMicros(); - RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset, tablet_txn_info.rowset_ids, - tablet_txn_info.delete_bitmap, transaction_id, - rowset_writer.get())); + RETURN_IF_ERROR(tablet->update_delete_bitmap( + rowset, tablet_txn_info.rowset_ids, tablet_txn_info.delete_bitmap, + tablet_txn_info.indicator_maps, transaction_id, rowset_writer.get())); int64_t t3 = MonotonicMicros(); stats->calc_delete_bitmap_time_us = t3 - t2; if (rowset->tablet_schema()->is_partial_update()) { diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index c311fed8799461..91022ba0fb1fa1 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -59,6 +59,7 @@ struct TabletTxnInfo { RowsetIdUnorderedSet rowset_ids; int64_t creation_time; bool ingest {false}; + std::shared_ptr>> indicator_maps = nullptr; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} @@ -186,6 +187,11 @@ class TxnManager { bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids); + void set_txn_related_delete_bitmap_and_indicator_maps( + TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, + TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, + const RowsetIdUnorderedSet& rowset_ids, + std::shared_ptr>> indicator_maps); void get_all_commit_tablet_txn_info_by_tablet( const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp index c7b98095bbb65c..b280ce1566fe98 100644 --- a/be/src/vec/jsonb/serialize.cpp +++ b/be/src/vec/jsonb/serialize.cpp @@ -78,6 +78,27 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const ColumnString& jsonb_column, + const std::unordered_map& col_uid_to_idx_full_read, + const std::unordered_map>& + col_uid_to_idx_cid_point_read, + const std::vector& rows_info, Block& block_full_read, + Block& block_point_read) { + DCHECK(jsonb_column.size() == rows_info.size());
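+ // Each incoming row can name a different subset of partial-update columns
+ // (rows_info[i].cids), so a fresh per-row cid map is rebuilt on every
+ // iteration below before delegating to the single-row overload further down.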
+ for (int i = 0; i < jsonb_column.size(); ++i) { + StringRef jsonb_data = jsonb_column.get_data_at(i); + std::unordered_map point_read_cids; + for (uint32_t cid : rows_info[i].cids) { + point_read_cids.emplace(cid, false); + } + jsonb_to_block(serdes_full_read, serdes_point_read, jsonb_data.data, jsonb_data.size, + col_uid_to_idx_full_read, col_uid_to_idx_cid_point_read, point_read_cids, + block_full_read, block_point_read); + } +} + // single row void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, size_t size, @@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* data, size_t size, + const std::unordered_map& col_uid_to_idx_full_read, + const std::unordered_map>& + col_uid_to_idx_cid_point_read, + std::unordered_map& point_read_cids, Block& block_full_read, + Block& block_point_read) { + JsonbDocument& doc = *JsonbDocument::createDocument(data, size); + size_t full_read_filled_columns = 0; + for (auto it = doc->begin(); it != doc->end(); ++it) { + auto col_it = col_uid_to_idx_full_read.find(it->getKeyId()); + if (col_it != col_uid_to_idx_full_read.end()) { + MutableColumnPtr dst_column = + block_full_read.get_by_position(col_it->second).column->assume_mutable(); + serdes_full_read[col_it->second]->read_one_cell_from_jsonb(*dst_column, it->value()); + ++full_read_filled_columns; + } else { + auto it2 = col_uid_to_idx_cid_point_read.find(it->getKeyId()); + if (it2 != col_uid_to_idx_cid_point_read.end()) { + uint32_t cid = it2->second.second; + uint32_t idx = it2->second.first; + auto it3 = point_read_cids.find(cid); + if (it3 != point_read_cids.end()) { + MutableColumnPtr dst_column = + block_point_read.get_by_position(idx).column->assume_mutable(); + serdes_point_read[idx]->read_one_cell_from_jsonb(*dst_column, it->value()); + } + } + } + } + // __DORIS_ROW_STORE_COL__ column + CHECK(full_read_filled_columns + 1 == block_full_read.columns()) + << "full_read_filled_columns=" << full_read_filled_columns + << ", block_full_read.columns():" << block_full_read.columns(); +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/jsonb/serialize.h b/be/src/vec/jsonb/serialize.h index 725dbc07072475..8a3e4ca19f040f 100644 --- a/be/src/vec/jsonb/serialize.h +++ b/be/src/vec/jsonb/serialize.h @@ -40,13 +40,30 @@ class JsonbSerializeUtil { public: static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst, int num_cols, const DataTypeSerDeSPtrs& serdes); - // batch rows + // for batch rows static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const ColumnString& jsonb_column, const std::unordered_map& col_id_to_idx, Block& dst); - // single row + static void jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const ColumnString& jsonb_column, + const std::unordered_map& col_uid_to_idx_full_read, + const std::unordered_map>& + col_uid_to_idx_cid_point_read, + const std::vector& rows_info, Block& block_full_read, + Block& block_point_read); + + // for a single row static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, size_t size, const std::unordered_map& col_id_to_idx, Block& dst); + static void jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* 
data, size_t size, + const std::unordered_map& col_uid_to_idx_full_read, + const std::unordered_map>& + col_uid_to_idx_cid_point_read, + std::unordered_map& point_read_cids, Block& block_full_read, + Block& block_point_read); }; } // namespace doris::vectorized \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 8d4af3648bbd69..8a00fb8006d267 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -281,7 +281,9 @@ protected void runPendingJob() throws AlterCancelException { tabletType, null, tbl.getCompressionType(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(), + tbl.getEnableUniqueKeyMergeOnWrite(), + tbl.getEnableUniqueKeyReplaceIfNotNull(), + tbl.getStoragePolicy(), tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index b5a208cadecf30..62d7e0c72188a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -276,7 +276,9 @@ protected void runPendingJob() throws AlterCancelException { tbl.getPartitionInfo().getTabletType(partitionId), null, tbl.getCompressionType(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(), + tbl.getEnableUniqueKeyMergeOnWrite(), + tbl.getEnableUniqueKeyReplaceIfNotNull(), + tbl.getStoragePolicy(), tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index ad1a2f1e527384..ade540e4697427 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -387,7 +387,7 @@ public void analyze(Analyzer analyzer) throws UserException { boolean isInsertStrict = analyzer.getContext().getSessionVariable().getEnableInsertStrict() && !isFromDeleteOrUpdateStmt; sink.init(loadId, transactionId, db.getId(), timeoutSecond, - sendBatchParallelism, false, isInsertStrict); + sendBatchParallelism, false, isInsertStrict, false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 48db491366dd0e..cd2a30766995ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1039,7 +1039,9 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc localTbl.getPartitionInfo().getTabletType(restorePart.getId()), null, localTbl.getCompressionType(), - localTbl.getEnableUniqueKeyMergeOnWrite(), localTbl.getStoragePolicy(), + localTbl.getEnableUniqueKeyMergeOnWrite(), + localTbl.getEnableUniqueKeyReplaceIfNotNull(), + localTbl.getStoragePolicy(), localTbl.disableAutoCompaction(), localTbl.enableSingleReplicaCompaction(), localTbl.skipWriteIndexOnLoad(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 
10d5d542737245..92e7b4cc618ebf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2168,6 +2168,17 @@ public boolean getEnableUniqueKeyMergeOnWrite() { return tableProperty.getEnableUniqueKeyMergeOnWrite(); } + public void setEnableUniqueKeyReplaceIfNotNull(boolean enable) { + getOrCreatTableProperty().setEnableUniqueKeyReplaceIfNotNull(enable); + } + + public boolean getEnableUniqueKeyReplaceIfNotNull() { + if (tableProperty == null) { + return false; + } + return tableProperty.getEnableUniqueKeyReplaceIfNotNull(); + } + public boolean isDuplicateWithoutKey() { return getKeysType() == KeysType.DUP_KEYS && getKeysNum() == 0; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index ae8f85dc0c8280..b8e63780f6961c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -454,6 +454,15 @@ public boolean getEnableUniqueKeyMergeOnWrite() { PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false")); } + public void setEnableUniqueKeyReplaceIfNotNull(boolean enable) { + properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL, Boolean.toString(enable)); + } + + public boolean getEnableUniqueKeyReplaceIfNotNull() { + return Boolean.parseBoolean(properties.getOrDefault( + PropertyAnalyzer.ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL, "false")); + } + public void setSequenceMapCol(String colName) { properties.put(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "." + PropertyAnalyzer.PROPERTIES_SEQUENCE_COL, colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index d983c688f95896..710bab3c868a6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -160,6 +160,7 @@ public class PropertyAnalyzer { // For the detail design, see the [DISP-018](https://cwiki.apache.org/confluence/ // display/DORIS/DSIP-018%3A+Support+Merge-On-Write+implementation+for+UNIQUE+KEY+data+model) public static final String ENABLE_UNIQUE_KEY_MERGE_ON_WRITE = "enable_unique_key_merge_on_write"; + public static final String ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL = "enable_unique_key_replace_if_not_null"; private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class); private static final String COMMA_SEPARATOR = ","; private static final double MAX_FPP = 0.05; @@ -1113,6 +1114,24 @@ public static boolean analyzeUniqueKeyMergeOnWrite(Map propertie throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`"); } + public static boolean analyzeUniqueKeyReplaceIfNotNull(Map properties) throws AnalysisException { + if (properties == null || properties.isEmpty()) { + return false; + } + String value = properties.get(PropertyAnalyzer.ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL); + if (value == null) { + return false; + } + properties.remove(PropertyAnalyzer.ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL); + if (value.equals("true")) { + return true; + } else if (value.equals("false")) { + return false; + } + throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_REPLACE_IF_NOT_NULL + + " must be `true` or `false`"); + } + /** * Check 
the type property of the catalog props. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 2433967da3e801..3de72664e84fe9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1570,10 +1570,10 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet, olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(), singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(), - olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer, - olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), - olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(), - olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getEnableUniqueKeyReplaceIfNotNull(), + storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), + olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), + olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), olapTable.storeRowColumn(), @@ -1824,8 +1824,8 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc, Long versionInfo, Set bfColumns, double bfFpp, Set tabletIdSet, List indexes, boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType, - DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, - IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction, + DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, boolean enableUniqueKeyReplaceIfNotNull, + String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction, boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes, Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds, @@ -1891,8 +1891,8 @@ private Partition createPartitionWithIndices(String clusterName, long dbId, long CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType, - dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy, - disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, + dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, enableUniqueKeyReplaceIfNotNull, + storagePolicy, disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad, compactionPolicy, timeSeriesCompactionGoalSizeMbytes, timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds, storeRowColumn, binlogConfig); @@ -2167,6 +2167,16 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep } 
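+ // The block below mirrors the enable_unique_key_merge_on_write handling:
+ // the replace-if-not-null property is only analyzed for UNIQUE_KEYS tables,
+ // and analyzeUniqueKeyReplaceIfNotNull() defaults it to false when absent.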
olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); + boolean enableUniqueKeyReplaceIfNotNull = false; + if (keysType == KeysType.UNIQUE_KEYS) { + try { + enableUniqueKeyReplaceIfNotNull = PropertyAnalyzer.analyzeUniqueKeyReplaceIfNotNull(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + } + olapTable.setEnableUniqueKeyReplaceIfNotNull(enableUniqueKeyReplaceIfNotNull); + boolean enableSingleReplicaCompaction = false; try { enableSingleReplicaCompaction = PropertyAnalyzer.analyzeEnableSingleReplicaCompaction(properties); @@ -2458,10 +2468,11 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep partitionInfo.getDataProperty(partitionId).getStorageMedium(), partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType, - olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, - idGeneratorBuffer, olapTable.disableAutoCompaction(), - olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, - olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), + olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), + olapTable.getEnableUniqueKeyReplaceIfNotNull(), storagePolicy, idGeneratorBuffer, + olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), + skipWriteIndexOnLoad, olapTable.getCompactionPolicy(), + olapTable.getTimeSeriesCompactionGoalSizeMbytes(), olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), storeRowColumn, binlogConfigForTask, @@ -2528,7 +2539,8 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep dataProperty.getStorageMedium(), partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType, - olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, + olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), + olapTable.getEnableUniqueKeyReplaceIfNotNull(), storagePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(), @@ -2958,6 +2970,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti copiedTbl.isInMemory(), copiedTbl.getStorageFormat(), copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(), copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite(), + copiedTbl.getEnableUniqueKeyReplaceIfNotNull(), olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(), idGeneratorBuffer, olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index d14b5d6d7809f3..2577e059abfefe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -202,7 +202,8 @@ public void plan(TUniqueId loadId, List> 
fileStatusesLis List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode); + olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode, + table.getEnableUniqueKeyReplaceIfNotNull()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 2e6be68604160a..355d2f73996fb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -806,7 +806,9 @@ private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta olapTable.getPartitionInfo().getTabletType(partitionId), null, olapTable.getCompressionType(), - olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(), + olapTable.getEnableUniqueKeyMergeOnWrite(), + olapTable.getEnableUniqueKeyReplaceIfNotNull(), + olapTable.getStoragePolicy(), olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index f87b79308c32c3..e6affcb545ffda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -154,7 +154,8 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { ctx.getExecTimeout(), ctx.getSessionVariable().getSendBatchParallelism(), false, - isStrictMode); + isStrictMode, + false); sink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState( diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 4ef1aa9015aef4..293dd39931a8ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -105,6 +105,8 @@ public class OlapTableSink extends DataSink { private boolean isStrictMode = false; + private boolean isUniqueKeyReplaceIfNotNull = false; + public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, boolean singleReplicaLoad) { this.dstTable = dstTable; @@ -114,7 +116,8 @@ public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, - taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), + taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), + destTable.getEnableUniqueKeyReplaceIfNotNull()); 
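+ // The extra init argument threads the table-level replace-if-not-null flag
+ // into the sink; presumably this is what populates the new
+ // is_unique_key_replace_if_not_null field of TOlapTableSchemaParam
+ // (Descriptors.thrift, field 11) that the BE consumes.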
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); @@ -465,8 +466,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns List partitionIds = getAllPartitionIds(); OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, - taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), + taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), + destTable.getEnableUniqueKeyReplaceIfNotNull()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index abb6f6e23d1edc..0b14b8a9d4fcc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -97,6 +97,8 @@ public class CreateReplicaTask extends AgentTask { private boolean enableUniqueKeyMergeOnWrite; + private boolean enableUniqueKeyReplaceIfNotNull; + private boolean disableAutoCompaction; private boolean enableSingleReplicaCompaction; @@ -126,6 +128,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition DataSortInfo dataSortInfo, TCompressionType compressionType, boolean enableUniqueKeyMergeOnWrite, + boolean enableUniqueKeyReplaceIfNotNull, String storagePolicy, boolean disableAutoCompaction, boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad, @@ -160,6 +163,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition this.tabletType = tabletType; this.dataSortInfo = dataSortInfo; this.enableUniqueKeyMergeOnWrite = (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite); + this.enableUniqueKeyReplaceIfNotNull = (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyReplaceIfNotNull); if (storagePolicy != null && !storagePolicy.isEmpty()) { Optional policy = Env.getCurrentEnv().getPolicyMgr() .findPolicy(storagePolicy, PolicyTypeEnum.STORAGE); @@ -310,6 +314,7 @@ public TCreateTabletReq toThrift() { createTabletReq.setTabletType(tabletType); createTabletReq.setCompressionType(compressionType); createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite); + createTabletReq.setEnableUniqueKeyReplaceIfNotNull(enableUniqueKeyReplaceIfNotNull); createTabletReq.setCompactionPolicy(compactionPolicy); createTabletReq.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes); createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java index dc98026a00eb2c..44a0f0292877cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java @@ -107,7 +107,7 @@ public void testSinglePartition() throws UserException { new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM)); dstTable.getPartitionInfo().setIsMutable(partition.getId(), true); OlapTableSink sink = new 
OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -144,7 +144,7 @@ public void testRangePartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); try { sink.complete(null); } catch (UserException e) { @@ -169,7 +169,7 @@ public void testRangeUnknownPartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); sink.complete(null); LOG.info("sink is {}", sink.toThrift()); LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL)); @@ -206,7 +206,7 @@ public void testListPartition( }; OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false); - sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false); + sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false); try { sink.complete(null); } catch (UserException e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 8ef71047377025..127ec437bc099f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,7 +107,7 @@ public void setUp() throws AnalysisException { createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, false, null); + TCompressionType.LZ4F, false, false, "", false, false, false, "", 0, 0, 0, false, null); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index abebf8fde5e6a1..0e6b32fa4c00f8 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -67,5 +67,6 @@ message POlapTableSchemaParam { optional bool partial_update = 7; repeated string partial_update_input_columns = 8; optional bool is_strict_mode = 9 [default = false]; + optional bool is_unique_key_replace_if_not_null = 10 [default = false]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index fe5f76f6b4f18c..3d288d42b388b9 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -333,6 +333,7 @@ message TabletMetaPB { optional int64 time_series_compaction_goal_size_mbytes = 29 [default = 1024]; optional int64 time_series_compaction_file_count_threshold = 30 [default = 2000]; optional int64 time_series_compaction_time_threshold_seconds = 31 [default = 3600]; + optional bool enable_unique_key_replace_if_not_null = 32 [default = false]; } message OLAPRawDeltaHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 9f0755c7356915..cfb6d6edd39bf3 100644 --- 
a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -147,6 +147,7 @@ struct TCreateTabletReq { 23: optional i64 time_series_compaction_goal_size_mbytes = 1024 24: optional i64 time_series_compaction_file_count_threshold = 2000 25: optional i64 time_series_compaction_time_threshold_seconds = 3600 + 26: optional bool enable_unique_key_replace_if_not_null = false } struct TDropTabletReq { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index fa391febdaef22..d662d3f4ffc59d 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -230,6 +230,7 @@ struct TOlapTableSchemaParam { 8: optional bool is_partial_update 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false; + 11: optional bool is_unique_key_replace_if_not_null = false; } struct TTabletLocation { diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace1.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace1.csv new file mode 100644 index 00000000000000..df7154c3398e27 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace1.csv @@ -0,0 +1,4 @@ +4,444 +2,222 +3,333 +5,555 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace2.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace2.csv new file mode 100644 index 00000000000000..0036deb3556130 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace2.csv @@ -0,0 +1,4 @@ +1,10,10,10 +2,null,20,20 +3,30,30,null +5,50,null,50 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace3.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace3.csv new file mode 100644 index 00000000000000..86b4fc8ba15966 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace3.csv @@ -0,0 +1,3 @@ +1,12345,, +2,,23456, +3,,,34567 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace4.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace4.csv new file mode 100644 index 00000000000000..9e4ca40d006133 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace4.csv @@ -0,0 +1,4 @@ +1,,, +2,,, +3,,, +5,,, diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace5.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace5.csv new file mode 100644 index 00000000000000..32198a6a6cc70f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace5.csv @@ -0,0 +1,5 @@ +1,null,null,null +3,null,null,null +2,null,null,null +4,null,null,null +5,null,null,null diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace6.csv b/regression-test/data/unique_with_mow_p0/partial_update/replace6.csv new file mode 100644 index 00000000000000..350e46481a5979 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace6.csv @@ -0,0 +1,3 @@ +3,,-3,,, +4,,-4,,, +5,-50,,-5,,-50 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace7.json b/regression-test/data/unique_with_mow_p0/partial_update/replace7.json new file mode 100644 index 00000000000000..3ad36cbd1a7044 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace7.json @@ -0,0 +1,4 @@ +{"k": 2, "v5": 987654321} +{"k": 3, "v4": 987654321, "v5": 8888888} +{"k": 4, "v3": 987654321, "v4": 3333333, "v5": 4444444} +{"k": 5, "v2": 987654321, "v4": 2222222} \ No newline at 
end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/replace8.json b/regression-test/data/unique_with_mow_p0/partial_update/replace8.json new file mode 100644 index 00000000000000..0692da42674781 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/replace8.json @@ -0,0 +1,4 @@ +{"k": 5, "v1": 0} +{"k": 1, "v4": 0, "v5": 0} +{"k": 4, "v3": 0, "v1": 0, "v5": 0} +{"k": 2, "v2": 0, "v4": 0} \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.out new file mode 100644 index 00000000000000..a07d11f6c0c9f7 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.out @@ -0,0 +1,64 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 1 +2 2 2 2 2 2 +3 3 3 3 3 3 +4 4 4 4 4 4 +5 5 5 5 5 5 + +-- !sql -- +1 1 1 1 1 1 +2 222 2 2 2 2 +3 333 3 3 3 3 +4 444 4 4 4 4 +5 555 5 5 5 5 + +-- !sql -- +1 10 1 10 1 10 +2 222 2 20 2 20 +3 30 3 30 3 3 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 -3 30 3 34567 +4 444 -4 4 4 4 +5 -50 5 -5 5 -50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 987654321 +3 30 -3 30 987654321 8888888 +4 444 -4 987654321 3333333 4444444 +5 -50 987654321 -5 2222222 -50 + +-- !sql -- +1 12345 1 10 0 0 +2 222 0 23456 0 987654321 +3 30 -3 30 987654321 8888888 +4 0 -4 0 3333333 0 +5 0 987654321 -5 2222222 -50 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.out new file mode 100644 index 00000000000000..a07d11f6c0c9f7 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.out @@ -0,0 +1,64 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 1 +2 2 2 2 2 2 +3 3 3 3 3 3 +4 4 4 4 4 4 +5 5 5 5 5 5 + +-- !sql -- +1 1 1 1 1 1 +2 222 2 2 2 2 +3 333 3 3 3 3 +4 444 4 4 4 4 +5 555 5 5 5 5 + +-- !sql -- +1 10 1 10 1 10 +2 222 2 20 2 20 +3 30 3 30 3 3 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 3 30 3 34567 +4 444 4 4 4 4 +5 50 5 5 5 50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 20 +3 30 -3 30 3 34567 +4 444 -4 4 4 4 +5 -50 5 -5 5 -50 + +-- !sql -- +1 12345 1 10 1 10 +2 222 2 23456 2 987654321 +3 30 -3 30 987654321 8888888 +4 444 -4 987654321 3333333 4444444 +5 -50 987654321 -5 2222222 -50 + +-- !sql -- +1 12345 1 10 0 0 +2 222 0 23456 0 987654321 +3 30 -3 30 987654321 8888888 +4 0 -4 0 3333333 0 +5 0 987654321 -5 2222222 -50 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.groovy new file mode 100644 index 00000000000000..a499367f8fad37 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null.groovy @@ -0,0 +1,176 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
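+// Semantics under test: with enable_unique_key_replace_if_not_null set, a
+// partial-update load only overwrites a cell when the incoming value is
+// non-null; a null cell (an explicit 'null' token or an empty CSV field)
+// keeps the row's previous value. The expected .out blocks earlier in this
+// diff encode exactly this behavior.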
+ +suite("test_partial_update_replace_if_not_null", "p0") { + + def tableName = "test_partial_update_replace_if_not_null1" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `k` int(11) NULL, + `v1` BIGINT NULL, + `v2` BIGINT NULL, + `v3` BIGINT NULL, + `v4` BIGINT NULL, + `v5` BIGINT NULL + ) ENGINE = OLAP UNIQUE KEY(`k`) + COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "disable_auto_compaction" = "true", + "enable_unique_key_replace_if_not_null" = "true" + ); + """ + sql """insert into ${tableName} values(1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3),(4,4,4,4,4,4),(5,5,5,5,5,5);""" + qt_sql """select * from ${tableName} order by k;""" + + // test the normal partial update stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1' + set 'strict_mode', 'false' + + file 'replace1.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with explict null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace2.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with implicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace3.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // corner case: read old values for all columns with explicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace4.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // corner case: read old values for all columns with implicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace5.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // csv file, include all columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v2,v3,v4,v5' + set 'strict_mode', 'false' + + file 'replace6.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with json file + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'json' + set 'partial_columns', 'true' + set 'columns', 'k,v2,v3,v4,v5' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + + file 'replace7.json' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // json file, 
include all columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'json' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v2,v3,v4,v5' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + + file 'replace8.json' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + sql """ DROP TABLE IF EXISTS ${tableName};""" +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.groovy new file mode 100644 index 00000000000000..51638e2b0b3b64 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_replace_if_not_null_row_store.groovy @@ -0,0 +1,177 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_partial_update_replace_if_not_null_row_store", "p0") { + + def tableName = "test_partial_update_replace_if_not_null_row_store1" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `k` int(11) NULL, + `v1` BIGINT NULL, + `v2` BIGINT NULL, + `v3` BIGINT NULL, + `v4` BIGINT NULL, + `v5` BIGINT NULL + ) ENGINE = OLAP UNIQUE KEY(`k`) + COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "disable_auto_compaction" = "true", + "enable_unique_key_replace_if_not_null" = "true", + "store_row_column" = "true" + ); + """ + sql """insert into ${tableName} values(1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3),(4,4,4,4,4,4),(5,5,5,5,5,5);""" + qt_sql """select * from ${tableName} order by k;""" + + // test the normal partial update stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1' + set 'strict_mode', 'false' + + file 'replace1.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with explicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace2.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with implicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set
'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace3.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // corner case: read old values for all columns with explicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace4.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // corner case: read old values for all columns with implicit null values + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v3,v5' + set 'strict_mode', 'false' + + file 'replace5.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // csv file, include all columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v2,v3,v4,v5' + set 'strict_mode', 'false' + + file 'replace6.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // test the partial update stream load with json file + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'json' + set 'partial_columns', 'true' + set 'columns', 'k,v2,v3,v4,v5' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + + file 'replace7.json' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + // json file, include all columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'format', 'json' + set 'partial_columns', 'true' + set 'columns', 'k,v1,v2,v3,v4,v5' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + + file 'replace8.json' + time 10000 // limit inflight 10s + } + sql "sync" + qt_sql """select * from ${tableName} order by k;""" + + sql """ DROP TABLE IF EXISTS ${tableName};""" +}
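For readers skimming the regression expectations above, the cell-level merge rule they encode can be summarized in a small, self-contained sketch. This is illustrative only: Row and merge_partial_update are toy names, not BE types, and the sample values replay the k=2 row around the replace2.csv load.

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>

    // Toy model of one row: column name -> nullable BIGINT cell.
    using Row = std::map<std::string, std::optional<long>>;

    // Cell-level rule exercised by replace2.csv..replace8.json above: with
    // replace-if-not-null enabled, a null incoming cell keeps the old value;
    // otherwise partial update overwrites, null included.
    Row merge_partial_update(const Row& old_row, const Row& load_row,
                             bool replace_if_not_null) {
        Row merged = old_row;  // columns absent from the load keep old values
        for (const auto& [col, cell] : load_row) {
            if (cell.has_value() || !replace_if_not_null) {
                merged[col] = cell;
            }
        }
        return merged;
    }

    int main() {
        // Row k=2 before the replace2.csv load (columns loaded: k,v1,v3,v5).
        Row old_row = {{"v1", 222}, {"v2", 2}, {"v3", 2}, {"v4", 2}, {"v5", 2}};
        Row load_row = {{"v1", std::nullopt}, {"v3", 20}, {"v5", 20}};
        for (const auto& [col, cell] : merge_partial_update(old_row, load_row, true)) {
            std::cout << col << '=' << (cell ? std::to_string(*cell) : "null") << ' ';
        }
        // Prints v1=222 v2=2 v3=20 v4=2 v5=20, matching the expected .out row
        // "2 222 2 20 2 20".
        std::cout << '\n';
    }

With the flag off, the same function degrades to the plain overwrite semantics of an ordinary partial update.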