Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 12, 2023
1 parent 4397b2f commit aa0986c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 30 deletions.
21 changes: 21 additions & 0 deletions be/src/olap/check_primary_keys_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan
});
}

// Queues an asynchronous primary-key consistency check on this token's thread token.
//
// Returns the already-recorded error without queuing anything if an earlier task
// stored a non-OK status; otherwise returns whatever submit_func() returns.
//
// All pointer arguments are copied into the task as raw pointers, so the objects they
// point to must remain valid until the queued task has run.
Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan,
                                     const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
                                     segment_v2::SegmentWriter* segment_writer,
                                     std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns,
                                     uint32_t row_pos) {
    {
        // Fail fast: a previous task already reported an error.
        std::shared_lock read_guard(_mutex);
        RETURN_IF_ERROR(_status);
    }

    auto check_task = [this, tablet, read_plan, rsid_to_rowset, segment_writer, key_columns,
                       row_pos]() {
        auto result = tablet->check_primary_keys_consistency(read_plan, rsid_to_rowset,
                                                             segment_writer, key_columns, row_pos);
        if (result.ok()) {
            return;
        }
        // Keep only the first failure; later failures must not overwrite it.
        std::lock_guard write_guard(_mutex);
        if (_status.ok()) {
            _status = result;
        }
    };
    return _thread_token->submit_func(check_task);
}

Status CheckPrimaryKeysToken::wait() {
_thread_token->wait();
return _status;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/check_primary_keys_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

#include "common/status.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/tablet_meta.h"
#include "util/threadpool.h"
#include "vec/olap/olap_data_convertor.h"

namespace doris {

Expand All @@ -39,6 +41,10 @@ class CheckPrimaryKeysToken {
Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
std::unordered_map<uint32_t, std::string>* pk_entries, bool with_seq_col);
// Overload that queues Tablet::check_primary_keys_consistency(read_plan, rsid_to_rowset,
// segment_writer, key_columns, row_pos) on this token's thread token. Instead of a
// pre-built pk_entries map it hands the task the segment writer and key columns, which the
// check uses to encode the current keys itself.
// Returns the stored error without queuing if an earlier task already failed.
// The raw pointers are copied into the queued task, so the pointed-to objects must stay
// valid until that task has run (presumably until wait() returns -- confirm at call sites).
Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
segment_v2::SegmentWriter* segment_writer,
std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns, uint32_t row_pos);

Status wait();

Expand Down
22 changes: 8 additions & 14 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

std::unordered_map<uint32_t, std::string> pk_entries;

int64_t num_rows_filtered = 0;
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
Expand All @@ -406,7 +404,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
// here row_pos = 2, num_rows = 4.
size_t delta_pos = block_pos - row_pos;
size_t segment_pos = segment_start_pos + delta_pos;
std::string key = _full_encode_keys(key_columns, delta_pos);
std::string key = full_encode_keys(key_columns, delta_pos);
if (have_input_seq_column) {
_encode_seq_column(seq_column, delta_pos, &key);
}
Expand Down Expand Up @@ -464,10 +462,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
_tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);

if (config::enable_check_primary_keys) {
pk_entries.emplace(block_pos, key);
}
}

if (st.is<KEY_ALREADY_EXISTS>()) {
Expand All @@ -492,8 +486,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
auto mutable_full_columns = full_block.mutate_columns();
auto token = StorageEngine::instance()->check_primary_keys_executor()->create_token();
if (config::enable_check_primary_keys) {
RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, &pk_entries,
have_input_seq_column));
RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, this,
&key_columns, row_pos));
}
RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos));
Expand Down Expand Up @@ -541,7 +535,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
_num_rows_written, row_pos, _primary_key_index_builder->num_rows());
}
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
std::string key = _full_encode_keys(key_columns, block_pos - row_pos);
std::string key = full_encode_keys(key_columns, block_pos - row_pos);
_encode_seq_column(seq_column, block_pos - row_pos, &key);
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
}
Expand Down Expand Up @@ -747,7 +741,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
// create primary indexes
std::string last_key;
for (size_t pos = 0; pos < num_rows; pos++) {
std::string key = _full_encode_keys(key_columns, pos);
std::string key = full_encode_keys(key_columns, pos);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
Expand All @@ -761,8 +755,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
} else {
// create short key indexes'
// for min_max key
set_min_key(_full_encode_keys(key_columns, 0));
set_max_key(_full_encode_keys(key_columns, num_rows - 1));
set_min_key(full_encode_keys(key_columns, 0));
set_max_key(full_encode_keys(key_columns, num_rows - 1));

key_columns.resize(_num_short_key_columns);
for (const auto pos : short_key_pos) {
Expand All @@ -788,7 +782,7 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
return std::min(size_rows, count_rows);
}

std::string SegmentWriter::_full_encode_keys(
std::string SegmentWriter::full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first) {
assert(_key_index_size.size() == _num_key_columns);
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class SegmentWriter {
Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag,
bool has_default_or_nullable, const size_t& segment_start_pos);
// used for unique-key with merge on write and segment min_max key
// Encodes the key columns of the row at index `pos` within `key_columns` into one string.
// Declared outside the private section (formerly the private _full_encode_keys) so that
// Tablet::check_primary_keys_consistency can call it to re-encode keys of the current block.
std::string full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first = true);

private:
DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
Expand All @@ -150,9 +154,7 @@ class SegmentWriter {
std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos, bool null_first = true);
// used for unique-key with merge on write and segment min_max key
std::string _full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos,
bool null_first = true);

// used for unique-key with merge on write
void _encode_seq_column(const vectorized::IOlapColumnDataAccessor* seq_column, size_t pos,
string* encoded_keys);
Expand Down
80 changes: 68 additions & 12 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3803,7 +3803,6 @@ Status Tablet::check_primary_keys_consistency(
const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
std::unordered_map<uint32_t, std::string>* pk_entries, bool with_seq_col) {
LOG(INFO) << "check_primary_keys_consistency";
size_t count = 0;
for (auto& [rowset_id, segment_read_info] : *read_plan) {
for (auto& [segment_id, rows_info] : segment_read_info) {
Expand All @@ -3828,9 +3827,7 @@ Status Tablet::check_primary_keys_consistency(
LOG(INFO) << fmt::format(
"[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]",
segment->rowset_id().to_string(), segment->id());
LOG(INFO) << "[check_primary_keys_consistency] before load index";
RETURN_IF_ERROR(segment->load_index());
LOG(INFO) << "[check_primary_keys_consistency] after load index";
auto pk_index = segment->get_primary_key_index();
std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
RETURN_IF_ERROR(pk_index->new_iterator(&iter));
Expand All @@ -3840,13 +3837,11 @@ Status Tablet::check_primary_keys_consistency(

size_t idx = 0;
for (auto [rowid, pos] : rows_info) {
LOG(INFO) << fmt::format("begin to fetch pk at pos {}", pos);
RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
size_t num_read = 1;
RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
CHECK(num_read == 1);
std::string prev_pk_entry = index_column->get_data_at(idx++).to_string();
LOG(INFO) << fmt::format("fetched previous pk at {}: {}", pos, prev_pk_entry);
std::string cur_pk_entry = pk_entries->at(pos);
Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size());
Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size());
Expand Down Expand Up @@ -3878,14 +3873,80 @@ Status Tablet::check_primary_keys_consistency(
"in read plan is {}",
pk_entries->size(), count);
}
LOG(INFO) << "[check_primary_keys_consistency] finish";
return Status::OK();
}

// Verifies that every row referenced by `read_plan` has the same primary key in the
// previously written segment as the row that conflicts with it in the current block.
//
// For each (rowset, segment) in the plan the segment's primary key index is loaded, the
// stored key at each `rowid` is read, and it is compared with the key obtained from
// segment_writer->full_encode_keys(*key_columns, pos - row_pos). When the rowset's schema
// has a sequence column, the trailing sequence part of the stored key is stripped before
// comparing, because the freshly encoded key does not contain it.
//
// Params:
//   read_plan       rowset id -> segment id -> list of (rowid, pos) pairs to verify.
//   rsid_to_rowset  lookup table for the rowsets named in `read_plan`; a missing entry
//                   is treated as a fatal invariant violation (CHECK).
//   segment_writer  writer used to encode the keys of the current block.
//   key_columns     key column accessors of the current block.
//   row_pos         offset subtracted from each plan `pos` to index into `key_columns`.
//
// Returns OK when all keys match; NotFound when a segment listed in the plan is not part
// of its rowset; InternalError on a key mismatch or on a malformed position/key length;
// or any error propagated from loading segments or reading the index.
//
// NOTE(review): the caller in SegmentWriter::append_block_with_partial_content appears to
// populate the read plan with segment positions (segment_start_pos + delta) while passing
// the block's row_pos here. Confirm that `pos - row_pos` really is the row's index inside
// `key_columns`; this function can only reject the case where it would be negative.
Status Tablet::check_primary_keys_consistency(
        const PartialUpdateReadPlan* read_plan,
        const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
        segment_v2::SegmentWriter* segment_writer,
        std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns, uint32_t row_pos) {
    for (auto& [rowset_id, segment_read_info] : *read_plan) {
        for (auto& [segment_id, rows_info] : segment_read_info) {
            auto rowset_iter = rsid_to_rowset->find(rowset_id);
            CHECK(rowset_iter != rsid_to_rowset->end());
            BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(rowset_iter->second);
            CHECK(rowset);
            const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();

            SegmentCacheHandle segment_cache;
            RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
            const auto& segments = segment_cache.get_segments();
            auto it = std::find_if(segments.cbegin(), segments.cend(),
                                   [segment_id](const segment_v2::SegmentSharedPtr& seg) {
                                       return seg->id() == segment_id;
                                   });
            if (it == segments.cend()) {
                return Status::NotFound(fmt::format("rowset {} 's segment not found, seg_id {}",
                                                    rowset->rowset_id().to_string(), segment_id));
            }

            segment_v2::SegmentSharedPtr segment = *it;
            LOG(INFO) << fmt::format(
                    "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]",
                    segment->rowset_id().to_string(), segment->id());
            RETURN_IF_ERROR(segment->load_index());
            auto pk_index = segment->get_primary_key_index();
            std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
            RETURN_IF_ERROR(pk_index->new_iterator(&iter));
            auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
                    pk_index->type_info()->type(), 1, 0);
            auto index_column = index_type->create_column();

            // Length of the sequence suffix appended to stored keys (value bytes plus one
            // marker byte). It depends only on the rowset schema, so compute it once.
            size_t seq_suffix_len = 0;
            if (tablet_schema->has_sequence_col()) {
                seq_suffix_len =
                        tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1;
            }

            // Each next_batch() appends one value to index_column, so `idx` tracks the
            // slot holding the value that was just read.
            size_t idx = 0;
            for (auto [rowid, pos] : rows_info) {
                // `pos - row_pos` is unsigned; reject the wrap-around instead of indexing
                // key_columns far out of range.
                if (pos < row_pos) {
                    return Status::InternalError(fmt::format(
                            "check primary keys consistency failed, pos {} in read plan is "
                            "smaller than row_pos {}",
                            pos, row_pos));
                }

                RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
                size_t num_read = 1;
                RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
                CHECK(num_read == 1);
                std::string prev_pk_entry = index_column->get_data_at(idx++).to_string();
                std::string cur_pk_entry =
                        segment_writer->full_encode_keys(*key_columns, pos - row_pos);

                // A stored entry shorter than the sequence suffix cannot be valid; without
                // this guard the subtraction below would wrap and produce a huge Slice.
                if (prev_pk_entry.size() < seq_suffix_len) {
                    return Status::InternalError(fmt::format(
                            "check primary keys consistency failed, stored pk entry of size {} "
                            "is shorter than the sequence column suffix of size {}",
                            prev_pk_entry.size(), seq_suffix_len));
                }

                // always ignore the seq col
                Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_suffix_len);
                Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size());
                if (key1.compare(key2) != 0) {
                    LOG(WARNING) << fmt::format(
                            "check primary keys consistency failed, pk at pos {} in current "
                            "block is {}, but in previous conflict segment is {}!",
                            pos, key2.to_string(), key1.to_string());
                    return Status::InternalError("check primary keys consistency failed");
                }
            }
        }
    }
    return Status::OK();
}

Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
std::unordered_map<uint32_t, std::string>* pk_entries) {
LOG(INFO) << "fetch_pk_entries";
for (auto& [rowset_id, segment_read_info] : *read_plan) {
for (auto& [segment_id, rows_info] : segment_read_info) {
auto rowset_iter = rsid_to_rowset->find(rowset_id);
Expand All @@ -3905,7 +3966,6 @@ Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan,
}
segment_v2::SegmentSharedPtr segment = *it;

LOG(WARNING) << "[pk check]read pk";
RETURN_IF_ERROR(segment->load_index());
auto pk_index = segment->get_primary_key_index();
std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
Expand All @@ -3916,19 +3976,15 @@ Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan,

size_t idx = 0;
for (auto [rowid, pos] : rows_info) {
LOG(INFO) << "begin to fetch pk at [rowset_id:" << segment->rowset_id()
<< "][segment_id:" << segment->id() << "][row_id:" << rowid << "]";
RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
size_t num_read = 1;
RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
CHECK(num_read == 1);
std::string pk_entry = index_column->get_data_at(idx++).to_string();
LOG(INFO) << "fetched pk: " << pk_entry;
pk_entries->emplace(pos, pk_entry);
}
}
}
LOG(WARNING) << "[pk check]finish fetch pk";
return Status::OK();
}

Expand Down
10 changes: 9 additions & 1 deletion be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "util/metrics.h"
#include "util/once.h"
#include "util/slice.h"
#include "vec/olap/olap_data_convertor.h"

namespace doris {

Expand All @@ -75,7 +76,9 @@ class CalcDeleteBitmapToken;
enum CompressKind : int;
class RowsetBinlogMetasPB;
class CheckPrimaryKeysToken;

namespace segment_v2 {
class SegmentWriter;
}
namespace io {
class RemoteFileSystem;
} // namespace io
Expand Down Expand Up @@ -569,6 +572,11 @@ class Tablet : public BaseTablet {
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
std::unordered_map<uint32_t, std::string>* pk_entries,
bool with_seq_col);
// Overload that compares, for every (rowid, pos) in `read_plan`, the primary key stored in
// the referenced segment's primary key index with the key produced by
// segment_writer->full_encode_keys(*key_columns, pos - row_pos). The sequence-column
// suffix of the stored key is ignored when the rowset schema has a sequence column.
// Returns InternalError on the first mismatch and NotFound if a planned segment is missing
// from its rowset.
Status check_primary_keys_consistency(
const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
segment_v2::SegmentWriter* segment_writer,
std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns, uint32_t row_pos);

private:
Status _init_once_action();
Expand Down

0 comments on commit aa0986c

Please sign in to comment.