diff --git a/be/src/olap/check_primary_keys_executor.cpp b/be/src/olap/check_primary_keys_executor.cpp index d6c79c45b429556..1a95bee33a4e5b3 100644 --- a/be/src/olap/check_primary_keys_executor.cpp +++ b/be/src/olap/check_primary_keys_executor.cpp @@ -44,6 +44,27 @@ Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan }); } +Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + segment_v2::SegmentWriter* segment_writer, + std::vector* key_columns, + uint32_t row_pos) { + { + std::shared_lock rlock(_mutex); + RETURN_IF_ERROR(_status); + } + return _thread_token->submit_func([=, this]() { + auto st = tablet->check_primary_keys_consistency(read_plan, rsid_to_rowset, segment_writer, + key_columns, row_pos); + if (!st.ok()) { + std::lock_guard wlock(_mutex); + if (_status.ok()) { + _status = st; + } + } + }); +} + Status CheckPrimaryKeysToken::wait() { _thread_token->wait(); return _status; diff --git a/be/src/olap/check_primary_keys_executor.h b/be/src/olap/check_primary_keys_executor.h index 8d098fe554b1158..8a12b63b87bbcc6 100644 --- a/be/src/olap/check_primary_keys_executor.h +++ b/be/src/olap/check_primary_keys_executor.h @@ -24,8 +24,10 @@ #include "common/status.h" #include "olap/rowset/rowset.h" +#include "olap/rowset/segment_v2/segment_writer.h" #include "olap/tablet_meta.h" #include "util/threadpool.h" +#include "vec/olap/olap_data_convertor.h" namespace doris { @@ -39,6 +41,10 @@ class CheckPrimaryKeysToken { Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, const std::map* rsid_to_rowset, std::unordered_map* pk_entries, bool with_seq_col); + Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + segment_v2::SegmentWriter* segment_writer, + std::vector* key_columns, uint32_t row_pos); Status wait(); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index fece46468d9d059..d3c90815d71b866 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -394,8 +394,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* std::vector> segment_caches(specified_rowsets.size()); // locate rows in base data - std::unordered_map pk_entries; - int64_t num_rows_filtered = 0; for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { // block segment @@ -406,7 +404,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // here row_pos = 2, num_rows = 4. size_t delta_pos = block_pos - row_pos; size_t segment_pos = segment_start_pos + delta_pos; - std::string key = _full_encode_keys(key_columns, delta_pos); + std::string key = full_encode_keys(key_columns, delta_pos); if (have_input_seq_column) { _encode_seq_column(seq_column, delta_pos, &key); } @@ -464,10 +462,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* use_default_or_null_flag.emplace_back(false); _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); _tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid); - - if (config::enable_check_primary_keys) { - pk_entries.emplace(block_pos, key); - } } if (st.is()) { @@ -492,8 +486,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* auto mutable_full_columns = full_block.mutate_columns(); auto token = StorageEngine::instance()->check_primary_keys_executor()->create_token(); if (config::enable_check_primary_keys) { - RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, &pk_entries, - have_input_seq_column)); + RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, this, + &key_columns, row_pos)); } RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, segment_start_pos)); @@ -541,7 +535,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* _num_rows_written, row_pos, _primary_key_index_builder->num_rows()); } for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { - std::string key = _full_encode_keys(key_columns, block_pos - row_pos); + std::string key = full_encode_keys(key_columns, block_pos - row_pos); _encode_seq_column(seq_column, block_pos - row_pos, &key); RETURN_IF_ERROR(_primary_key_index_builder->add_item(key)); } @@ -747,7 +741,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po // create primary indexes std::string last_key; for (size_t pos = 0; pos < num_rows; pos++) { - std::string key = _full_encode_keys(key_columns, pos); + std::string key = full_encode_keys(key_columns, pos); if (_tablet_schema->has_sequence_col()) { _encode_seq_column(seq_column, pos, &key); } @@ -761,8 +755,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } else { // create short key indexes' // for min_max key - set_min_key(_full_encode_keys(key_columns, 0)); - set_max_key(_full_encode_keys(key_columns, num_rows - 1)); + set_min_key(full_encode_keys(key_columns, 0)); + set_max_key(full_encode_keys(key_columns, num_rows - 1)); key_columns.resize(_num_short_key_columns); for (const auto pos : short_key_pos) { @@ -788,7 +782,7 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) { return std::min(size_rows, count_rows); } -std::string SegmentWriter::_full_encode_keys( +std::string SegmentWriter::full_encode_keys( const std::vector& key_columns, size_t pos, bool null_first) { assert(_key_index_size.size() == _num_key_columns); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 9133712a8c4ca6b..b1885519b587386 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -131,6 +131,10 @@ class SegmentWriter { Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector& use_default_or_null_flag, bool has_default_or_nullable, const size_t& segment_start_pos); + // used for unique-key with merge on write and segment min_max key + std::string full_encode_keys( + const std::vector& key_columns, size_t pos, + bool null_first = true); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); @@ -150,9 +154,7 @@ class SegmentWriter { std::string _encode_keys(const std::vector& key_columns, size_t pos, bool null_first = true); // used for unique-key with merge on write and segment min_max key - std::string _full_encode_keys( - const std::vector& key_columns, size_t pos, - bool null_first = true); + // used for unique-key with merge on write void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos, string* encoded_keys); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5c7316dbdede11d..b2edb1d178580c9 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3803,7 +3803,6 @@ Status Tablet::check_primary_keys_consistency( const PartialUpdateReadPlan* read_plan, const std::map* rsid_to_rowset, std::unordered_map* pk_entries, bool with_seq_col) { - LOG(INFO) << "check_primary_keys_consistency"; size_t count = 0; for (auto& [rowset_id, segment_read_info] : *read_plan) { for (auto& [segment_id, rows_info] : segment_read_info) { @@ -3828,9 +3827,7 @@ Status Tablet::check_primary_keys_consistency( LOG(INFO) << fmt::format( "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", segment->rowset_id().to_string(), segment->id()); - LOG(INFO) << "[check_primary_keys_consistency] before load index"; RETURN_IF_ERROR(segment->load_index()); - LOG(INFO) << "[check_primary_keys_consistency] after load index"; auto pk_index = segment->get_primary_key_index(); std::unique_ptr iter; RETURN_IF_ERROR(pk_index->new_iterator(&iter)); @@ -3840,13 +3837,11 @@ Status Tablet::check_primary_keys_consistency( size_t idx = 0; for (auto [rowid, pos] : rows_info) { - LOG(INFO) << fmt::format("begin to fetch pk at pos {}", pos); RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); size_t num_read = 1; RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); CHECK(num_read == 1); std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); - LOG(INFO) << fmt::format("fetched previous pk at {}: {}", pos, prev_pk_entry); std::string cur_pk_entry = pk_entries->at(pos); Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); @@ -3878,14 +3873,80 @@ Status Tablet::check_primary_keys_consistency( "in read plan is {}", pk_entries->size(), count); } - LOG(INFO) << "[check_primary_keys_consistency] finish"; + return Status::OK(); +} + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + segment_v2::SegmentWriter* segment_writer, + std::vector* key_columns, uint32_t row_pos) { + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast(rowset_iter->second); + CHECK(rowset); + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + + segment_v2::SegmentSharedPtr segment = *it; + LOG(INFO) << fmt::format( + "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", + segment->rowset_id().to_string(), segment->id()); + RETURN_IF_ERROR(segment->load_index()); + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_index->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); + std::string cur_pk_entry = + segment_writer->full_encode_keys(*key_columns, pos - row_pos); + Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); + Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); + int result = 0; + // always ignore the seq col + if (tablet_schema->has_sequence_col()) { + auto seq_col_length = + tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1; + key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_col_length); + } + result = key1.compare(key2); + if (result != 0) { + LOG(WARNING) << fmt::format( + "check primary keys consistency failed, pk at pos {} in current " + "block is {}, but in previous conflict segment is {}!", + pos, key2.to_string(), key1.to_string()); + return Status::InternalError("check primary keys consistency failed"); + } + } + } + } return Status::OK(); } Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan, const std::map* rsid_to_rowset, std::unordered_map* pk_entries) { - LOG(INFO) << "fetch_pk_entries"; for (auto& [rowset_id, segment_read_info] : *read_plan) { for (auto& [segment_id, rows_info] : segment_read_info) { auto rowset_iter = rsid_to_rowset->find(rowset_id); @@ -3905,7 +3966,6 @@ Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan, } segment_v2::SegmentSharedPtr segment = *it; - LOG(WARNING) << "[pk check]read pk"; RETURN_IF_ERROR(segment->load_index()); auto pk_index = segment->get_primary_key_index(); std::unique_ptr iter; @@ -3916,19 +3976,15 @@ Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan, size_t idx = 0; for (auto [rowid, pos] : rows_info) { - LOG(INFO) << "begin to fetch pk at [rowset_id:" << segment->rowset_id() - << "][segment_id:" << segment->id() << "][row_id:" << rowid << "]"; RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); size_t num_read = 1; RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); CHECK(num_read == 1); std::string pk_entry = index_column->get_data_at(idx++).to_string(); - LOG(INFO) << "fetched pk: " << pk_entry; pk_entries->emplace(pos, pk_entry); } } } - LOG(WARNING) << "[pk check]finish fetch pk"; return Status::OK(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 87ea2018834162a..7616afb69488fdd 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -55,6 +55,7 @@ #include "util/metrics.h" #include "util/once.h" #include "util/slice.h" +#include "vec/olap/olap_data_convertor.h" namespace doris { @@ -75,7 +76,9 @@ class CalcDeleteBitmapToken; enum CompressKind : int; class RowsetBinlogMetasPB; class CheckPrimaryKeysToken; - +namespace segment_v2 { +class SegmentWriter; +} namespace io { class RemoteFileSystem; } // namespace io @@ -569,6 +572,11 @@ class Tablet : public BaseTablet { const std::map* rsid_to_rowset, std::unordered_map* pk_entries, bool with_seq_col); + Status check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + segment_v2::SegmentWriter* segment_writer, + std::vector* key_columns, uint32_t row_pos); private: Status _init_once_action();