Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Oct 12, 2023
1 parent d6ff974 commit 4397b2f
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 12 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,10 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_String(doris_cgroup_cpu_path, "");
DEFINE_Bool(enable_cpu_hard_limit, "false");

// Gates the primary-key consistency check in
// SegmentWriter::append_block_with_partial_content: when true, the writer
// records the key of each located row and submits a check task to the
// CheckPrimaryKeysExecutor. Defaults to "true".
DEFINE_mBool(enable_check_primary_keys, "true");

// Passed to set_max_threads() when CheckPrimaryKeysExecutor::init() builds
// "TabletCheckPrimaryKeysThreadPool". Defaults to "16".
DEFINE_Int32(check_primary_keys_max_thread, "16");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,10 @@ DECLARE_mBool(exit_on_exception);
DECLARE_String(doris_cgroup_cpu_path);
DECLARE_Bool(enable_cpu_hard_limit);

// Whether partial-update segment writes submit a primary-key consistency
// check task (read in SegmentWriter::append_block_with_partial_content).
DECLARE_mBool(enable_check_primary_keys);

// Maximum thread count given to the thread pool built by
// CheckPrimaryKeysExecutor::init().
DECLARE_Int32(check_primary_keys_max_thread);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
64 changes: 64 additions & 0 deletions be/src/olap/check_primary_keys_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/check_primary_keys_executor.h"

#include "common/config.h"
#include "common/logging.h"
#include "olap/tablet.h"

namespace doris {
using namespace ErrorCode;

// Schedules one primary-key consistency check for `tablet` on this token.
//
// If an earlier task has already recorded a failure, that failure is returned
// and nothing new is scheduled. Otherwise the result of handing the task to
// the thread token is returned; the outcome of the check itself is only
// observable through `_status`.
//
// All pointer arguments are captured by value (as raw pointers) in the task,
// so the pointees must stay alive until the task has run.
Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan,
                                     const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
                                     std::unordered_map<uint32_t, std::string>* pk_entries,
                                     bool with_seq_col) {
    {
        // Readers only need shared access to inspect the recorded status.
        std::shared_lock rlock(_mutex);
        RETURN_IF_ERROR(_status);
    }

    auto check_task = [this, tablet, read_plan, rsid_to_rowset, pk_entries, with_seq_col]() {
        const auto result = tablet->check_primary_keys_consistency(read_plan, rsid_to_rowset,
                                                                   pk_entries, with_seq_col);
        if (result.ok()) {
            return;
        }
        // Keep only the first failure; later ones are dropped.
        std::lock_guard wlock(_mutex);
        if (!_status.ok()) {
            return;
        }
        _status = result;
    };
    return _thread_token->submit_func(check_task);
}

// Calls wait() on the underlying thread token and then returns `_status`,
// i.e. OK or the first failure recorded by a submitted task.
Status CheckPrimaryKeysToken::wait() {
    _thread_token->wait();
    // `_status` is read without taking `_mutex` here.
    // NOTE(review): presumably safe because every task has finished once
    // wait() returns -- confirm no other thread can call submit() concurrently.
    return _status;
}

void CheckPrimaryKeysExecutor::init() {
ThreadPoolBuilder("TabletCheckPrimaryKeysThreadPool")
.set_min_threads(1)
.set_max_threads(config::check_primary_keys_max_thread)
.build(&_thread_pool);
}

// Creates a new CheckPrimaryKeysToken backed by a fresh CONCURRENT-mode token
// obtained from `_thread_pool`. The pool is dereferenced unconditionally, so
// init() must have populated it beforehand.
std::unique_ptr<CheckPrimaryKeysToken> CheckPrimaryKeysExecutor::create_token() {
    auto pool_token = _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT);
    return std::make_unique<CheckPrimaryKeysToken>(std::move(pool_token));
}

} // namespace doris
68 changes: 68 additions & 0 deletions be/src/olap/check_primary_keys_executor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet_meta.h"
#include "util/threadpool.h"

namespace doris {

class Tablet;

class CheckPrimaryKeysToken {
public:
explicit CheckPrimaryKeysToken(std::unique_ptr<ThreadPoolToken> thread_token)
: _thread_token(std::move(thread_token)), _status(Status::OK()) {}

Status submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan,
const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
std::unordered_map<uint32_t, std::string>* pk_entries, bool with_seq_col);

Status wait();

void cancel() { _thread_token->shutdown(); }

Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);

private:
std::unique_ptr<ThreadPoolToken> _thread_token;
std::shared_mutex _mutex;
Status _status;
};

class CheckPrimaryKeysExecutor {
public:
CheckPrimaryKeysExecutor() = default;
~CheckPrimaryKeysExecutor() { _thread_pool->shutdown(); }

void init();

std::unique_ptr<CheckPrimaryKeysToken> create_token();

private:
std::unique_ptr<ThreadPool> _thread_pool;
};

} // namespace doris
15 changes: 15 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/segment_loader.h"
#include "olap/short_key_index.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker.h"
Expand Down Expand Up @@ -393,6 +394,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

std::unordered_map<uint32_t, std::string> pk_entries;

int64_t num_rows_filtered = 0;
for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -461,6 +464,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
_tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);

if (config::enable_check_primary_keys) {
pk_entries.emplace(block_pos, key);
}
}

if (st.is<KEY_ALREADY_EXISTS>()) {
Expand All @@ -483,8 +490,16 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*

// read and fill block
auto mutable_full_columns = full_block.mutate_columns();
auto token = StorageEngine::instance()->check_primary_keys_executor()->create_token();
if (config::enable_check_primary_keys) {
RETURN_IF_ERROR(token->submit(_tablet.get(), &_rssid_to_rid, &_rsid_to_rowset, &pk_entries,
have_input_seq_column));
}
RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos));
if (config::enable_check_primary_keys) {
RETURN_IF_ERROR(token->wait());
}
// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_calc_delete_bitmap_executor(nullptr),
_check_primary_keys_executor(nullptr),
_default_rowset_type(BETA_ROWSET),
_heartbeat_flags(nullptr),
_stream_load_recorder(nullptr) {
Expand Down Expand Up @@ -184,6 +185,9 @@ Status StorageEngine::_open() {
_calc_delete_bitmap_executor.reset(new CalcDeleteBitmapExecutor());
_calc_delete_bitmap_executor->init();

_check_primary_keys_executor.reset(new CheckPrimaryKeysExecutor());
_check_primary_keys_executor->init();

_parse_default_rowset_type();

return Status::OK();
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "olap/calc_delete_bitmap_executor.h"
#include "olap/check_primary_keys_executor.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/olap_common.h"
#include "olap/options.h"
Expand Down Expand Up @@ -148,6 +149,9 @@ class StorageEngine {
// Returns a non-owning pointer to the executor held by
// `_calc_delete_bitmap_executor`. The member is initialised to nullptr in the
// constructor and only reset to a real instance in _open(), so this returns
// nullptr before then.
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
    return _calc_delete_bitmap_executor.get();
}
// Returns a non-owning pointer to the executor held by
// `_check_primary_keys_executor`. The member is initialised to nullptr in the
// constructor and only reset to a real instance in _open(), so this returns
// nullptr before then.
CheckPrimaryKeysExecutor* check_primary_keys_executor() {
    return _check_primary_keys_executor.get();
}

bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);

Expand Down Expand Up @@ -435,6 +439,7 @@ class StorageEngine {

std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
std::unique_ptr<CheckPrimaryKeysExecutor> _check_primary_keys_executor;

// Used to control the migration from segment_v1 to segment_v2; can be deleted in the future.
// Type of new loaded data
Expand Down
Loading

0 comments on commit 4397b2f

Please sign in to comment.