From 4397b2f0eb92474bf8301c7d0bb655a4bd3df04a Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 24 Aug 2023 18:22:52 +0800 Subject: [PATCH] update --- be/src/common/config.cpp | 4 + be/src/common/config.h | 4 + be/src/olap/check_primary_keys_executor.cpp | 64 +++++++ be/src/olap/check_primary_keys_executor.h | 68 +++++++ .../olap/rowset/segment_v2/segment_writer.cpp | 15 ++ be/src/olap/storage_engine.cpp | 4 + be/src/olap/storage_engine.h | 5 + be/src/olap/tablet.cpp | 167 ++++++++++++++++-- be/src/olap/tablet.h | 9 + .../test_partial_update_parallel.groovy | 2 +- 10 files changed, 330 insertions(+), 12 deletions(-) create mode 100644 be/src/olap/check_primary_keys_executor.cpp create mode 100644 be/src/olap/check_primary_keys_executor.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a5ba48d337ff8ae..94e12f37f75dd5d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1102,6 +1102,10 @@ DEFINE_Bool(exit_on_exception, "false"); DEFINE_String(doris_cgroup_cpu_path, ""); DEFINE_Bool(enable_cpu_hard_limit, "false"); +DEFINE_mBool(enable_check_primary_keys, "true"); + +DEFINE_Int32(check_primary_keys_max_thread, "16"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 3010cf497632d17..613f9a4ec12de9b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1172,6 +1172,10 @@ DECLARE_mBool(exit_on_exception); DECLARE_String(doris_cgroup_cpu_path); DECLARE_Bool(enable_cpu_hard_limit); +DECLARE_mBool(enable_check_primary_keys); + +DECLARE_Int32(check_primary_keys_max_thread); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/check_primary_keys_executor.cpp b/be/src/olap/check_primary_keys_executor.cpp new file mode 100644 index 000000000000000..d6c79c45b429556 --- /dev/null +++ b/be/src/olap/check_primary_keys_executor.cpp @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/check_primary_keys_executor.h" + +#include "common/config.h" +#include "common/logging.h" +#include "olap/tablet.h" + +namespace doris { +using namespace ErrorCode; + +Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries, + bool with_seq_col) { + { + std::shared_lock rlock(_mutex); + RETURN_IF_ERROR(_status); + } + return _thread_token->submit_func([=, this]() { + auto st = tablet->check_primary_keys_consistency(read_plan, rsid_to_rowset, pk_entries, + with_seq_col); + if (!st.ok()) { + std::lock_guard wlock(_mutex); + if (_status.ok()) { + _status = st; + } + } + }); +} + +Status CheckPrimaryKeysToken::wait() { + _thread_token->wait(); + return _status; +} + +void CheckPrimaryKeysExecutor::init() { + ThreadPoolBuilder("TabletCheckPrimaryKeysThreadPool") + .set_min_threads(1) + .set_max_threads(config::check_primary_keys_max_thread) + .build(&_thread_pool); +} + +std::unique_ptr CheckPrimaryKeysExecutor::create_token() { + return std::make_unique( + _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT)); +} + +} // namespace doris diff --git a/be/src/olap/check_primary_keys_executor.h b/be/src/olap/check_primary_keys_executor.h new file mode 100644 index 000000000000000..8d098fe554b1158 --- /dev/null +++ b/be/src/olap/check_primary_keys_executor.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "olap/rowset/rowset.h" +#include "olap/tablet_meta.h" +#include "util/threadpool.h" + +namespace doris { + +class Tablet; + +class CheckPrimaryKeysToken { +public: + explicit CheckPrimaryKeysToken(std::unique_ptr thread_token) + : _thread_token(std::move(thread_token)), _status(Status::OK()) {} + + Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries, bool with_seq_col); + + Status wait(); + + void cancel() { _thread_token->shutdown(); } + + Status get_delete_bitmap(DeleteBitmapPtr res_bitmap); + +private: + std::unique_ptr _thread_token; + std::shared_mutex _mutex; + Status _status; +}; + +class CheckPrimaryKeysExecutor { +public: + CheckPrimaryKeysExecutor() = default; + ~CheckPrimaryKeysExecutor() { _thread_pool->shutdown(); } + + void init(); + + std::unique_ptr create_token(); + +private: + std::unique_ptr _thread_pool; +}; + +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 9508557d0ef25b4..fece46468d9d059 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -44,6 +44,7 @@ #include "olap/rowset/segment_v2/page_pointer.h" #include "olap/segment_loader.h" #include "olap/short_key_index.h" +#include "olap/storage_engine.h" #include "olap/tablet_schema.h" #include "olap/utils.h" #include "runtime/memory/mem_tracker.h" @@ -393,6 +394,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* std::vector> segment_caches(specified_rowsets.size()); // locate rows in base data + std::unordered_map pk_entries; + int64_t num_rows_filtered = 0; for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { // block segment @@ -461,6 +464,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* use_default_or_null_flag.emplace_back(false); _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); _tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid); + + if (config::enable_check_primary_keys) { + pk_entries.emplace(block_pos, key); + } } if (st.is()) { @@ -483,8 +490,16 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // read and fill block auto mutable_full_columns = full_block.mutate_columns(); + auto token = StorageEngine::instance()->check_primary_keys_executor()->create_token(); + if (config::enable_check_primary_keys) { + RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, &pk_entries, + have_input_seq_column)); + } RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, segment_start_pos)); + if (config::enable_check_primary_keys) { + RETURN_IF_ERROR(token->wait()); + } // row column should be filled here if (_tablet_schema->store_row_column()) { // convert block to row store format diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index f17a6de84146cd8..1451b836c6c2846 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -123,6 +123,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _calc_delete_bitmap_executor(nullptr), + _check_primary_keys_executor(nullptr), _default_rowset_type(BETA_ROWSET), _heartbeat_flags(nullptr), _stream_load_recorder(nullptr) { @@ -184,6 +185,9 @@ Status StorageEngine::_open() { _calc_delete_bitmap_executor.reset(new CalcDeleteBitmapExecutor()); _calc_delete_bitmap_executor->init(); + _check_primary_keys_executor.reset(new CheckPrimaryKeysExecutor()); + _check_primary_keys_executor->init(); + _parse_default_rowset_type(); return Status::OK(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6017354ef46bb48..cce9c4cc72b3f78 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -39,6 +39,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "olap/calc_delete_bitmap_executor.h" +#include "olap/check_primary_keys_executor.h" #include "olap/compaction_permit_limiter.h" #include "olap/olap_common.h" #include "olap/options.h" @@ -148,6 +149,9 @@ class StorageEngine { CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() { return _calc_delete_bitmap_executor.get(); } + CheckPrimaryKeysExecutor* check_primary_keys_executor() { + return _check_primary_keys_executor.get(); + } bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id); @@ -435,6 +439,7 @@ class StorageEngine { std::unique_ptr _memtable_flush_executor; std::unique_ptr _calc_delete_bitmap_executor; + std::unique_ptr _check_primary_keys_executor; // Used to control the migration from segment_v1 to segment_v2, can be deleted in futrue. // Type of new loaded data diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index ecb1021beb5e258..5c7316dbdede11d 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3112,13 +3112,24 @@ Status Tablet::generate_new_block_for_partial_update( auto update_cids = rowset_schema->get_update_cids(); std::map read_index_old; + std::unordered_map pk_entries; + if (config::enable_check_primary_keys) { + RETURN_IF_ERROR(fetch_pk_entries(&read_plan_update, &rsid_to_rowset, &pk_entries)); + } RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, old_block, &read_index_old)); std::map read_index_update; + auto token = StorageEngine::instance()->check_primary_keys_executor()->create_token(); + if (config::enable_check_primary_keys) { + RETURN_IF_ERROR(token->submit(this, &read_plan_update, &rsid_to_rowset, &pk_entries, + rowset_schema->has_sequence_col())); + } RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, rsid_to_rowset, update_block, &read_index_update)); - + if (config::enable_check_primary_keys) { + RETURN_IF_ERROR(token->wait()); + } // build full block CHECK(read_index_old.size() == read_index_update.size()); for (auto i = 0; i < missing_cids.size(); ++i) { @@ -3166,16 +3177,16 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, LOG(WARNING) << "failed to fetch value through row column"; return st; } - continue; - } - for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { - TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); - auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, - tablet_column, mutable_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value"; - return st; + } else { + for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { + TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); + auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, + tablet_column, mutable_columns[cid]); + // set read value to output block + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value"; + return st; + } } } } @@ -3787,4 +3798,138 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in } return Status::OK(); } + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries, bool with_seq_col) { + LOG(INFO) << "check_primary_keys_consistency"; + size_t count = 0; + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast(rowset_iter->second); + CHECK(rowset); + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + + segment_v2::SegmentSharedPtr segment = *it; + LOG(INFO) << fmt::format( + "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", + segment->rowset_id().to_string(), segment->id()); + LOG(INFO) << "[check_primary_keys_consistency] before load index"; + RETURN_IF_ERROR(segment->load_index()); + LOG(INFO) << "[check_primary_keys_consistency] after load index"; + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_index->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + LOG(INFO) << fmt::format("begin to fetch pk at pos {}", pos); + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); + LOG(INFO) << fmt::format("fetched previous pk at {}: {}", pos, prev_pk_entry); + std::string cur_pk_entry = pk_entries->at(pos); + Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); + Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); + int result = 0; + // always ignore the seq col + if (tablet_schema->has_sequence_col()) { + auto seq_col_length = + tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1; + key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_col_length); + if (with_seq_col) { + key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size() - seq_col_length); + } + } + result = key1.compare(key2); + if (result != 0) { + LOG(WARNING) << fmt::format( + "check primary keys consistency failed, pk at pos {} in current " + "block is {}, but in previous conflict segment is {}!", + pos, key2.to_string(), key1.to_string()); + return Status::InternalError("check primary keys consistency failed"); + } + } + count += rows_info.size(); + } + } + if (count != pk_entries->size()) { + return Status::InternalError( + "check primary keys consistency failed, pk_entries.size():{}, but number of keys " + "in read plan is {}", + pk_entries->size(), count); + } + LOG(INFO) << "[check_primary_keys_consistency] finish"; + return Status::OK(); +} + +Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries) { + LOG(INFO) << "fetch_pk_entries"; + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast(rowset_iter->second); + CHECK(rowset); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + segment_v2::SegmentSharedPtr segment = *it; + + LOG(WARNING) << "[pk check]read pk"; + RETURN_IF_ERROR(segment->load_index()); + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_column = vectorized::DataTypeFactory::instance() + .create_data_type(pk_index->type_info()->type(), 1, 0) + ->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + LOG(INFO) << "begin to fetch pk at [rowset_id:" << segment->rowset_id() + << "][segment_id:" << segment->id() << "][row_id:" << rowid << "]"; + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string pk_entry = index_column->get_data_at(idx++).to_string(); + LOG(INFO) << "fetched pk: " << pk_entry; + pk_entries->emplace(pos, pk_entry); + } + } + } + LOG(WARNING) << "[pk check]finish fetch pk"; + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 43993773542310a..87ea2018834162a 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -74,6 +74,7 @@ class TupleDescriptor; class CalcDeleteBitmapToken; enum CompressKind : int; class RowsetBinlogMetasPB; +class CheckPrimaryKeysToken; namespace io { class RemoteFileSystem; @@ -561,6 +562,14 @@ class Tablet : public BaseTablet { std::unique_ptr* column_iterator, OlapReaderStatistics* stats); + Status fetch_pk_entries(const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries); + Status check_primary_keys_consistency(const PartialUpdateReadPlan* read_plan, + const std::map* rsid_to_rowset, + std::unordered_map* pk_entries, + bool with_seq_col); + private: Status _init_once_action(); void _print_missed_versions(const std::vector& missed_versions) const; diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy index 19522e8064e122b..35eb7a8c921822d 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_parallel.groovy @@ -17,7 +17,7 @@ suite("test_primary_key_partial_update_parallel", "p0") { - def tableName = "test_primary_key_partial_update" + def tableName = "test_primary_key_partial_update_parallel" // create table sql """ DROP TABLE IF EXISTS ${tableName} """