Commit 51d5fba

Merge branch 'master' into hudi_prune_partition

morningman authored Dec 4, 2024
2 parents 03d3780 + d22bd83 commit 51d5fba

Showing 500 changed files with 16,616 additions and 2,733 deletions.
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_base_compaction.cpp

@@ -125,6 +125,7 @@ Status CloudBaseCompaction::prepare_compact() {
         _input_row_num += rs->num_rows();
         _input_segments += rs->num_segments();
         _input_rowsets_data_size += rs->data_disk_size();
+        _input_rowsets_index_size += rs->index_disk_size();
         _input_rowsets_total_size += rs->total_disk_size();
     }
     LOG_INFO("start CloudBaseCompaction, tablet_id={}, range=[{}-{}]", _tablet->tablet_id(),
@@ -320,6 +321,10 @@ Status CloudBaseCompaction::modify_rowsets() {
     compaction_job->add_output_versions(_output_rowset->end_version());
     compaction_job->add_txn_id(_output_rowset->txn_id());
     compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
+    compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
+    compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
+    compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
+    compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
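Note: the same accounting pattern repeats in the cumulative and full compaction files below — per-rowset sizes are accumulated on the input side, then reported to the compaction job with index bytes and segment bytes broken out instead of only a combined total. A minimal standalone sketch of that bookkeeping, with a hypothetical `RowsetStats` standing in for Doris's rowset interface:

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the per-rowset sizes the compaction code reads.
struct RowsetStats {
    int64_t num_rows = 0;
    int64_t num_segments = 0;
    int64_t data_disk_size = 0;   // bytes of segment (data) files
    int64_t index_disk_size = 0;  // bytes of index files
    int64_t total_disk_size() const { return data_disk_size + index_disk_size; }
};

// Aggregate view of a compaction's inputs, mirroring the fields this commit adds.
struct CompactionInputStats {
    int64_t rows = 0;
    int64_t segments = 0;
    int64_t data_size = 0;
    int64_t index_size = 0;  // now tracked separately from data_size
    int64_t total_size = 0;

    void accumulate(const RowsetStats& rs) {
        rows += rs.num_rows;
        segments += rs.num_segments;
        data_size += rs.data_disk_size;
        index_size += rs.index_disk_size;
        total_size += rs.total_disk_size();
    }
};

// Usage: fold all input rowsets before reporting the compaction job.
CompactionInputStats summarize(const std::vector<RowsetStats>& inputs) {
    CompactionInputStats stats;
    for (const auto& rs : inputs) stats.accumulate(rs);
    return stats;
}
```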
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_cumulative_compaction.cpp

@@ -263,6 +263,10 @@ Status CloudCumulativeCompaction::modify_rowsets() {
     compaction_job->add_output_versions(_output_rowset->end_version());
     compaction_job->add_txn_id(_output_rowset->txn_id());
     compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
+    compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
+    compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
+    compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
+    compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

     DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.enable_spin_wait", {
         LOG(INFO) << "CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, start";
@@ -371,11 +375,9 @@ Status CloudCumulativeCompaction::modify_rowsets() {
 Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
     // agg previously rowset old version delete bitmap
     std::vector<RowsetSharedPtr> pre_rowsets {};
-    std::vector<std::string> pre_rowset_ids {};
     for (const auto& it : cloud_tablet()->rowset_map()) {
         if (it.first.second < _input_rowsets.front()->start_version()) {
             pre_rowsets.emplace_back(it.second);
-            pre_rowset_ids.emplace_back(it.second->rowset_id().to_string());
         }
     }
     std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator);

2 changes: 1 addition & 1 deletion be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp

@@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         }
     }
     auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
-    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+    LOG(INFO) << "finish calculate delete bitmap on tablet"
               << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
               << ", tablet_id=" << tablet->tablet_id()
               << ", get_tablet_time_us=" << get_tablet_time_us

4 changes: 4 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp

@@ -216,6 +216,10 @@ Status CloudFullCompaction::modify_rowsets() {
     compaction_job->add_output_versions(_output_rowset->end_version());
     compaction_job->add_txn_id(_output_rowset->txn_id());
     compaction_job->add_output_rowset_ids(_output_rowset->rowset_id().to_string());
+    compaction_job->set_index_size_input_rowsets(_input_rowsets_index_size);
+    compaction_job->set_segment_size_input_rowsets(_input_rowsets_data_size);
+    compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
+    compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());

     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&

6 changes: 6 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp

@@ -340,17 +340,23 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
         int64_t num_output_rows = 0;
         int64_t size_output_rowsets = 0;
         int64_t num_output_segments = 0;
+        int64_t index_size_output_rowsets = 0;
+        int64_t segment_size_output_rowsets = 0;
         for (auto& rs : _output_rowsets) {
             sc_job->add_txn_ids(rs->txn_id());
             sc_job->add_output_versions(rs->end_version());
             num_output_rows += rs->num_rows();
             size_output_rowsets += rs->total_disk_size();
             num_output_segments += rs->num_segments();
+            index_size_output_rowsets += rs->index_disk_size();
+            segment_size_output_rowsets += rs->data_disk_size();
         }
         sc_job->set_num_output_rows(num_output_rows);
         sc_job->set_size_output_rowsets(size_output_rowsets);
         sc_job->set_num_output_segments(num_output_segments);
         sc_job->set_num_output_rowsets(_output_rowsets.size());
+        sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
+        sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
     }
     _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
     sc_job->set_output_cumulative_point(_output_cumulative_point);

3 changes: 2 additions & 1 deletion be/src/cloud/cloud_tablet.cpp

@@ -775,7 +775,8 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
     }

     std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
-    if (config::enable_rowid_conversion_correctness_check) {
+    if (config::enable_rowid_conversion_correctness_check &&
+        tablet_schema()->cluster_key_idxes().empty()) {
         location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
         LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
     }

2 changes: 1 addition & 1 deletion be/src/clucene
Submodule clucene updated 642 files

3 changes: 3 additions & 0 deletions be/src/common/config.cpp

@@ -1166,6 +1166,9 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction. Default 50MB.
+DEFINE_mInt64(mow_primary_key_index_max_size_in_memory, "52428800");
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log
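The new `mow_primary_key_index_max_size_in_memory` knob bounds how much primary-key data a merge-on-write compaction buffers before cutting over to a new segment. A hedged sketch of how such a threshold is typically applied — the writer type below is hypothetical, not Doris's actual segment writer:

```cpp
#include <cstddef>
#include <string_view>

// Mirrors the default of 52428800 bytes (50MB).
constexpr size_t kPkIndexMaxBytesInMemory = 50 * 1024 * 1024;

// Hypothetical writer: once buffered primary keys would exceed the cap, the
// caller finishes the current segment and starts a new one, keeping the
// in-memory primary-key index bounded during compaction.
class BoundedPkSegmentWriter {
public:
    bool would_exceed_cap(std::string_view next_key) const {
        return _buffered_pk_bytes + next_key.size() > kPkIndexMaxBytesInMemory;
    }
    void append(std::string_view key) { _buffered_pk_bytes += key.size(); }
    void start_new_segment() { _buffered_pk_bytes = 0; }

private:
    size_t _buffered_pk_bytes = 0;
};
```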
3 changes: 3 additions & 0 deletions be/src/common/config.h

@@ -1236,6 +1236,9 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction.
+DECLARE_mInt64(mow_primary_key_index_max_size_in_memory);
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log

11 changes: 7 additions & 4 deletions be/src/common/daemon.cpp

@@ -500,15 +500,18 @@ void Daemon::cache_adjust_capacity_thread() {
 void Daemon::cache_prune_stale_thread() {
     int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
     while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
-        if (interval <= 0) {
-            LOG(WARNING) << "config of cache clean interval is illegal: [" << interval
-                         << "], force set to 3600 ";
-            interval = 3600;
+        if (config::cache_periodic_prune_stale_sweep_sec <= 0) {
+            LOG(WARNING) << "config of cache clean interval is: [" << interval
+                         << "], it means the cache prune stale thread is disabled, will wait 3s "
+                            "and check again.";
+            interval = 3;
+            continue;
         }
         if (config::disable_memory_gc) {
             continue;
         }
         CacheManager::instance()->for_each_cache_prune_stale();
+        interval = config::cache_periodic_prune_stale_sweep_sec;
     }
 }

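This rework changes the meaning of a non-positive `cache_periodic_prune_stale_sweep_sec`: instead of being silently forced to 3600s, it now disables pruning, and the loop re-reads the config every 3 seconds so pruning can be re-enabled at runtime. A minimal standalone version of the pattern, with config access simplified to an atomic:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

std::atomic<int32_t> g_prune_interval_sec{0}; // stands in for the mutable config
std::atomic<bool> g_stop{false};

void cache_prune_stale_loop() {
    while (!g_stop.load()) {
        int32_t interval = g_prune_interval_sec.load();
        if (interval <= 0) {
            // Disabled: poll every 3s so flipping the config takes effect
            // without restarting the daemon.
            std::this_thread::sleep_for(std::chrono::seconds(3));
            continue;
        }
        // ... prune stale cache entries here ...
        std::this_thread::sleep_for(std::chrono::seconds(interval));
    }
}
```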
4 changes: 0 additions & 4 deletions be/src/exec/schema_scanner.cpp

@@ -124,7 +124,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
         }
         SCOPED_ATTACH_TASK(state);
         _async_thread_running = true;
-        _finish_dependency->block();
         if (!_opened) {
             _data_block = vectorized::Block::create_unique();
             _init_block(_data_block.get());
@@ -140,9 +139,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
         _eos = eos;
         _async_thread_running = false;
         _dependency->set_ready();
-        if (eos) {
-            _finish_dependency->set_ready();
-        }
     }));
     return Status::OK();
 }

7 changes: 1 addition & 6 deletions be/src/exec/schema_scanner.h

@@ -106,11 +106,7 @@ class SchemaScanner {
     // factory function
     static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
     TSchemaTableType::type type() const { return _schema_table_type; }
-    void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
-                        std::shared_ptr<pipeline::Dependency> fin_dep) {
-        _dependency = dep;
-        _finish_dependency = fin_dep;
-    }
+    void set_dependency(std::shared_ptr<pipeline::Dependency> dep) { _dependency = dep; }
     Status get_next_block_async(RuntimeState* state);

 protected:
@@ -139,7 +135,6 @@ class SchemaScanner {
     RuntimeProfile::Counter* _fill_block_timer = nullptr;

     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
-    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

     std::unique_ptr<vectorized::Block> _data_block;
     AtomicStatus _scanner_status;

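Taken together with the `schema_scanner.cpp` hunks above, the scanner now signals through a single dependency; EOS is observed via scanner state rather than a dedicated finish dependency. A rough sketch of the simplified contract — the `Dependency` type here is a stand-in, not the actual pipeline class:

```cpp
#include <memory>
#include <utility>

// Stand-in for pipeline::Dependency: the scan operator blocks until ready.
struct Dependency {
    bool ready = false;
    void set_ready() { ready = true; }
};

class Scanner {
public:
    void set_dependency(std::shared_ptr<Dependency> dep) { _dependency = std::move(dep); }

    // Called from the async scan thread once a block is produced.
    void on_block_produced(bool eos) {
        _eos = eos;
        _dependency->set_ready(); // one signal now covers data-ready and EOS
    }

private:
    std::shared_ptr<Dependency> _dependency;
    bool _eos = false;
};
```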
16 changes: 14 additions & 2 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp

@@ -236,7 +236,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
         std::vector<int64_t> srcs(table_num);
         for (int i = 0; i < table_num; ++i) {
             const TTableStatus& tbl_status = _table_result.tables[i];
-            if (tbl_status.__isset.avg_row_length) {
+            if (tbl_status.__isset.data_length) {
                 srcs[i] = tbl_status.data_length;
                 datas[i] = srcs.data() + i;
             } else {
@@ -248,7 +248,19 @@
     // max_data_length
     { RETURN_IF_ERROR(fill_dest_column_for_range(block, 10, null_datas)); }
     // index_length
-    { RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, null_datas)); }
+    {
+        std::vector<int64_t> srcs(table_num);
+        for (int i = 0; i < table_num; ++i) {
+            const TTableStatus& tbl_status = _table_result.tables[i];
+            if (tbl_status.__isset.index_length) {
+                srcs[i] = tbl_status.index_length;
+                datas[i] = srcs.data() + i;
+            } else {
+                datas[i] = nullptr;
+            }
+        }
+        RETURN_IF_ERROR(fill_dest_column_for_range(block, 11, datas));
+    }
     // data_free
     { RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, null_datas)); }
     // auto_increment

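The hunk replaces the previously all-NULL `index_length` column with per-table values, using the same sparse-pointer convention as the neighboring columns: `datas[i]` points into `srcs` when the Thrift field is set and is `nullptr` (rendered as SQL NULL) otherwise. A small self-contained illustration of that convention — the function and input types are hypothetical:

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Each row supplies either a value (datas[i] -> &srcs[i]) or nullptr (NULL).
void fill_index_length_column(const std::vector<std::optional<int64_t>>& index_lengths,
                              std::vector<int64_t>& srcs, std::vector<void*>& datas) {
    const size_t n = index_lengths.size();
    srcs.resize(n);
    datas.resize(n);
    for (size_t i = 0; i < n; ++i) {
        if (index_lengths[i].has_value()) { // mirrors __isset.index_length
            srcs[i] = *index_lengths[i];
            datas[i] = &srcs[i];            // same as srcs.data() + i
        } else {
            datas[i] = nullptr;             // rendered as SQL NULL
        }
    }
}
```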
24 changes: 6 additions & 18 deletions be/src/exprs/runtime_filter.cpp

@@ -975,11 +975,10 @@ class RuntimePredicateWrapper {

 Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
                               const TQueryOptions* query_options, const RuntimeFilterRole role,
-                              int node_id, std::shared_ptr<IRuntimeFilter>* res,
-                              bool build_bf_exactly) {
+                              int node_id, std::shared_ptr<IRuntimeFilter>* res) {
     *res = std::make_shared<IRuntimeFilter>(state, desc);
     (*res)->set_role(role);
-    return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
+    return (*res)->init_with_desc(desc, query_options, node_id);
 }

 RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
@@ -1348,7 +1347,7 @@ std::string IRuntimeFilter::formatted_state() const {
 }

 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                                      int node_id, bool build_bf_exactly) {
+                                      int node_id) {
     // if node_id == -1 , it shouldn't be a consumer
     DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));

@@ -1370,21 +1369,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
     params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size
                                                    ? options->runtime_bloom_filter_max_size
                                                    : 0;
-    auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size;
-    // We build runtime filter by exact distinct count if all of 3 conditions are met:
-    // 1. Only 1 join key
-    // 2. Bloom filter
-    // 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join).
-    params.build_bf_exactly =
-            build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
-                                 _runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);

+    params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly;
     params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv;

-    if (!sync_filter_size) {
-        params.build_bf_exactly &= !_is_broadcast_join;
-    }
-
     if (desc->__isset.bloom_filter_size_bytes) {
         params.bloom_filter_size = desc->bloom_filter_size_bytes;
     }
@@ -1542,9 +1530,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_

 std::string IRuntimeFilter::debug_string() const {
     return fmt::format(
-            "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}"
+            "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
             "build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
-            "has_remote_target: {},error_msg: [{}]",
+            "has_remote_target: {}, error_msg: [{}]",
             _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
             _wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
             _dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target,

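The net effect of the hunks above: the backend no longer re-derives `build_bf_exactly` from the filter type, broadcast-join status, and `sync_filter_size`; the frontend plans the flag once and ships it in `TRuntimeFilterDesc`, so the backend only guards on `__isset`. A reduced sketch of the new resolution, with a hypothetical mirror of the Thrift fields:

```cpp
// Hypothetical mirror of the two Thrift descriptor fields involved.
struct FilterDesc {
    bool isset_build_bf_exactly = false; // __isset.build_bf_exactly
    bool build_bf_exactly = false;
};

// After this commit the BE trusts the planner's decision outright.
bool resolve_build_bf_exactly(const FilterDesc& desc) {
    return desc.isset_build_bf_exactly && desc.build_bf_exactly;
}
```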
10 changes: 5 additions & 5 deletions be/src/exprs/runtime_filter.h

@@ -213,8 +213,7 @@ class IRuntimeFilter {

     static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
                          const TQueryOptions* query_options, const RuntimeFilterRole role,
-                         int node_id, std::shared_ptr<IRuntimeFilter>* res,
-                         bool build_bf_exactly = false);
+                         int node_id, std::shared_ptr<IRuntimeFilter>* res);

     RuntimeFilterContextSPtr& get_shared_context_ref();

@@ -260,7 +259,7 @@ class IRuntimeFilter {

     // init filter with desc
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
-                          int node_id = -1, bool build_bf_exactly = false);
+                          int node_id = -1);

     // serialize _wrapper to protobuf
     Status serialize(PMergeFilterRequest* request, void** data, int* len);
@@ -355,8 +354,9 @@
                      const std::shared_ptr<pipeline::CountedFinishDependency>& dependency);

     int64_t get_synced_size() const {
-        if (_synced_size == -1) {
-            throw Status::InternalError("sync filter size meet error, filter: {}", debug_string());
+        if (_synced_size == -1 || !_dependency) {
+            throw Exception(doris::ErrorCode::INTERNAL_ERROR,
+                            "sync filter size meet error, filter: {}", debug_string());
         }
         return _synced_size;
     }

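Besides the extra `!_dependency` guard, this hunk fixes the throw itself: `Status` is a plain value type, so `throw Status::InternalError(...)` would sail past any handler that catches the codebase's exception type. A generic illustration of the pitfall — these are simplified stand-ins, not Doris's actual classes:

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

struct Status { std::string msg; };     // value type, not an exception

struct Exception : std::runtime_error { // integrates with normal catch sites
    using std::runtime_error::runtime_error;
};

int64_t get_synced_size(int64_t synced_size, bool has_dependency) {
    if (synced_size == -1 || !has_dependency) {
        // A thrown Status is only caught by catch (Status) or catch (...);
        // throwing a real exception type keeps error handling uniform.
        throw Exception("sync filter size meet error");
    }
    return synced_size;
}
```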
11 changes: 11 additions & 0 deletions be/src/http/http_handler_with_auth.cpp

@@ -35,6 +35,7 @@ HttpHandlerWithAuth::HttpHandlerWithAuth(ExecEnv* exec_env, TPrivilegeHier::type
         : _exec_env(exec_env), _hier(hier), _type(type) {}

 int HttpHandlerWithAuth::on_header(HttpRequest* req) {
+    // If the return value isn't 0, a reply must already have been sent; otherwise the request never gets a response.
     TCheckAuthRequest auth_request;
     TCheckAuthResult auth_result;
     AuthInfo auth_info;
@@ -83,13 +84,22 @@

 #ifndef BE_TEST
     TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
+    if (master_addr.hostname.empty() || master_addr.port == 0) {
+        LOG(WARNING) << "Master FE not found, can't auth API request: " << req->debug_string();
+        HttpChannel::send_error(req, HttpStatus::SERVICE_UNAVAILABLE);
+        return -1;
+    }
     {
         auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                 master_addr.hostname, master_addr.port,
                 [&auth_result, &auth_request](FrontendServiceConnection& client) {
                     client->checkAuth(auth_result, auth_request);
                 });
         if (!status) {
+            LOG(WARNING) << "CheckAuth RPC failed. FE ip: " << master_addr.hostname
+                         << ", FE port: " << master_addr.port << ", status: " << status.to_string()
+                         << ", request: " << req->debug_string();
+            HttpChannel::send_error(req, HttpStatus::SERVICE_UNAVAILABLE);
             return -1;
         }
     }
@@ -98,6 +108,7 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
         auth_result.status.status_code = TStatusCode::type::OK;
         auth_result.status.error_msgs.clear();
     } else {
+        HttpChannel::send_reply(req, HttpStatus::FORBIDDEN);
         return -1;
     }
 #endif

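All three hunks enforce the same contract: whenever `on_header` returns non-zero, an HTTP reply must already have been sent, otherwise the client connection hangs with no response. A condensed sketch of that guard pattern — the types below are stand-ins, not Doris's HTTP classes:

```cpp
#include <functional>
#include <string>

enum class HttpStatus { SERVICE_UNAVAILABLE = 503, FORBIDDEN = 403 };

struct MasterAddr {
    std::string hostname;
    int port = 0;
};

// Returns 0 on success; any non-zero return has already replied to the client.
int on_header(const MasterAddr& master, bool authed,
              const std::function<void(HttpStatus)>& send_error) {
    if (master.hostname.empty() || master.port == 0) {
        send_error(HttpStatus::SERVICE_UNAVAILABLE); // reply, then bail out
        return -1;
    }
    if (!authed) {
        send_error(HttpStatus::FORBIDDEN); // previously returned with no reply
        return -1;
    }
    return 0;
}
```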
4 changes: 0 additions & 4 deletions be/src/olap/base_tablet.cpp

@@ -1604,10 +1604,6 @@ Status BaseTablet::check_rowid_conversion(
         VLOG_DEBUG << "check_rowid_conversion, location_map is empty";
         return Status::OK();
     }
-    if (!tablet_schema()->cluster_key_idxes().empty()) {
-        VLOG_DEBUG << "skip check_rowid_conversion for mow tables with cluster keys";
-        return Status::OK();
-    }
     std::vector<segment_v2::SegmentSharedPtr> dst_segments;

     RETURN_IF_ERROR(
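Together with the `cloud_tablet.cpp` hunk earlier in this commit, this removal moves the cluster-key skip from check time to allocation time: rather than building the location map and bailing out inside `check_rowid_conversion()`, the caller never allocates it for merge-on-write tables with cluster keys. A compact sketch of the resulting flow, with simplified hypothetical types:

```cpp
#include <map>
#include <memory>
#include <string>
#include <vector>

using LocationMap = std::map<std::string, std::vector<int>>; // simplified

std::unique_ptr<LocationMap> maybe_make_location_map(bool check_enabled,
                                                     bool has_cluster_keys) {
    // Allocate only when the correctness check can actually run; cluster-key
    // tables skip it entirely, so they no longer pay for the map.
    if (check_enabled && !has_cluster_keys) {
        return std::make_unique<LocationMap>();
    }
    return nullptr;
}
```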
(Remaining changed files not shown.)