Skip to content

Commit

Permalink
Merge branch 'master' into rs_meta_query_4_cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Nov 18, 2024
2 parents 45539ab + 258dcee commit f601408
Show file tree
Hide file tree
Showing 295 changed files with 7,580 additions and 1,868 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/auto-cherry-pick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ on:
pull_request_target:
types:
- closed
- labeled
branches:
- master
permissions:
Expand All @@ -30,7 +31,7 @@ permissions:
jobs:
auto_cherry_pick:
runs-on: ubuntu-latest
if: ${{ (contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) && github.event.pull_request.merged == true }}
if: ${{(contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') || contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') ||github.event.label.name == 'dev/3.0.x' || github.event.label.name == 'dev/2.1.x') && github.event.pull_request.merged == true }}
steps:
- name: Checkout repository
uses: actions/checkout@v3
Expand All @@ -54,15 +55,15 @@ jobs:
echo "SHA matches: $calculated_sha"
fi
- name: Auto cherry-pick to branch-3.0
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') }}
if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/3.0.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/3.0.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
CONFLICT_LABEL: cherry-pick-conflict-in-3.0
run: |
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0
- name: Auto cherry-pick to branch-2.1
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') }}
if: ${{ ((github.event.action == 'labeled' && github.event.label.name == 'dev/2.1.x'))|| ((github.event_name == 'pull_request_target' && github.event.action == 'closed') && contains(github.event.pull_request.labels.*.name, 'dev/2.1.x')) }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO_NAME: ${{ github.repository }}
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,12 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset

if (config::enable_file_cache) {
for (const auto& rs : rowsets) {
if (rs.use_count() >= 1) {
LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
<< rs.use_count()
<< " references. File Cache won't be recycled when query is using it.";
continue;
}
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
// TODO: Segment::file_cache_key
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
Expand Down
7 changes: 3 additions & 4 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: {"path": "/path/to/file_cache", "total_size":53687091200, "normal_percent":85, "disposable_percent":10, "index_percent":5}
// format: {"path": "/path/to/file_cache", "total_size":53687091200, "ttl_percent":50, "normal_percent":40, "disposable_percent":5, "index_percent":5}
// format: [{"path": "xxx", "total_size":53687091200, "storage": "memory"}]
// Note1: storage is "disk" by default
// Note2: when the storage is "memory", the path is ignored. So you can set xxx to anything you like
Expand All @@ -1020,7 +1020,7 @@ DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "90");
DEFINE_mInt32(file_cache_enter_disk_resource_limit_mode_percent, "88");
DEFINE_mInt32(file_cache_exit_disk_resource_limit_mode_percent, "80");
DEFINE_mBool(enable_read_cache_file_directly, "false");
DEFINE_mBool(file_cache_enable_evict_from_other_queue_by_size, "true");
Expand Down Expand Up @@ -1301,8 +1301,6 @@ DEFINE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread, "64");
DEFINE_Int64(num_s3_file_upload_thread_pool_min_thread, "16");
// The max thread num for S3FileUploadThreadPool
DEFINE_Int64(num_s3_file_upload_thread_pool_max_thread, "64");
// The max ratio for ttl cache's size
DEFINE_mInt64(max_ttl_cache_ratio, "50");
// The maximum jvm heap usage ratio for hdfs write workload
DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
Expand Down Expand Up @@ -1370,6 +1368,7 @@ DEFINE_Int32(query_cache_size, "512");
DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
// Enable validation to check the correctness of table size.
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");

// clang-format off
#ifdef BE_TEST
Expand Down
5 changes: 2 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ DECLARE_Int32(pipeline_executor_size);
DECLARE_Bool(enable_file_cache);
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240, "ttl_percent":50, "normal_percent":40, "disposable_percent":5, "index_percent":5}]
// format: [{"path": "xxx", "total_size":53687091200, "storage": "memory"}]
// Note1: storage is "disk" by default
// Note2: when the storage is "memory", the path is ignored. So you can set xxx to anything you like
Expand Down Expand Up @@ -1382,8 +1382,6 @@ DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread);
DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread);
// The max thread num for S3FileUploadThreadPool
DECLARE_Int64(num_s3_file_upload_thread_pool_max_thread);
// The max ratio for ttl cache's size
DECLARE_mInt64(max_ttl_cache_ratio);
// The maximum jvm heap usage ratio for hdfs write workload
DECLARE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio);
// The sleep milliseconds duration when hdfs write exceeds the maximum usage
Expand Down Expand Up @@ -1450,6 +1448,7 @@ DECLARE_mInt32(check_score_rounds_num);

// MB
DECLARE_Int32(query_cache_size);
DECLARE_Bool(force_regenerate_rowsetid_on_start_error);

DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ class [[nodiscard]] Status {
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR)
#undef ERROR_CTOR

template <int code>
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/lzo_decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
ptr = get_uint32(ptr, &uncompressed_size);
left_input_len -= sizeof(uint32_t);
if (uncompressed_size == 0) {
*input_bytes_read += sizeof(uint32_t);
*stream_end = true;
return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/tablet_info.h"

#include <butil/logging.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Partitions_types.h>
Expand Down Expand Up @@ -180,6 +181,17 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
is_null_str);
if (it == std::end(slots_map)) {
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\npschema={}",
pcolumn_desc.name(), pcolumn_desc.type(), data_type_str, is_null_str,
keys, debug_string(), pschema.ShortDebugString());

return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
Expand Down Expand Up @@ -286,6 +298,18 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
is_null_str);
if (it == slots_map.end()) {
std::stringstream ss;
ss << tschema;
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\ntschema={}",
tcolumn_desc.column_name, tcolumn_desc.column_type.type, data_type_str,
is_null_str, keys, debug_string(), ss.str());
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
Expand Down
38 changes: 0 additions & 38 deletions be/src/exprs/bitmapfilter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ namespace doris {
// only used in Runtime Filter
class BitmapFilterFuncBase : public RuntimeFilterFuncBase {
public:
virtual void insert(const void* data) = 0;
virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 0;
virtual bool empty() = 0;
virtual Status assign(BitmapValue* bitmap_value) = 0;
virtual void light_copy(BitmapFilterFuncBase* other) { _not_in = other->_not_in; }
virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap,
uint16_t* offsets, int number) = 0;
virtual void find_batch(const char* data, const uint8* nullmap, size_t number,
Expand All @@ -58,8 +54,6 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {

~BitmapFilterFunc() override = default;

void insert(const void* data) override;

void insert_many(const std::vector<const BitmapValue*>& bitmaps) override;

uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets,
Expand All @@ -68,45 +62,21 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {
void find_batch(const char* data, const uint8* nullmap, size_t number,
uint8* results) const override;

bool empty() override { return _bitmap_value->empty(); }

Status assign(BitmapValue* bitmap_value) override {
*_bitmap_value = *bitmap_value;
return Status::OK();
}

void light_copy(BitmapFilterFuncBase* bitmapfilter_func) override;

size_t size() const override { return _bitmap_value->cardinality(); }

uint64_t max() { return _bitmap_value->max(nullptr); }

uint64_t min() { return _bitmap_value->min(nullptr); }

bool contains_any(CppType left, CppType right) {
if (right < 0) {
return false;
}
return _bitmap_value->contains_any(std::max(left, (CppType)0), right);
}

std::shared_ptr<BitmapValue> get_inner_bitmap() { return _bitmap_value; }

private:
std::shared_ptr<BitmapValue> _bitmap_value;

bool find(CppType data) const { return _not_in ^ (data >= 0 && _bitmap_value->contains(data)); }
};

template <PrimitiveType type>
void BitmapFilterFunc<type>::insert(const void* data) {
if (data == nullptr) {
return;
}

*_bitmap_value |= *reinterpret_cast<const BitmapValue*>(data);
}

template <PrimitiveType type>
void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*>& bitmaps) {
if (bitmaps.empty()) {
Expand Down Expand Up @@ -147,12 +117,4 @@ void BitmapFilterFunc<type>::find_batch(const char* data, const uint8* nullmap,
}
}

template <PrimitiveType type>
void BitmapFilterFunc<type>::light_copy(BitmapFilterFuncBase* bitmapfilter_func) {
BitmapFilterFuncBase::light_copy(bitmapfilter_func);
auto other_func = reinterpret_cast<BitmapFilterFunc*>(bitmapfilter_func);
_bitmap_value = other_func->_bitmap_value;
set_filter_id(bitmapfilter_func->get_filter_id());
}

} // namespace doris
24 changes: 10 additions & 14 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include "common/exception.h"
#include "common/status.h"
#include "exprs/hybrid_set.h"
#include "exprs/minmax_predicate.h"
#include "function_filter.h"
Expand Down Expand Up @@ -244,12 +246,9 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
int be_exec_version, const TabletColumn*) {
if constexpr (PT == TYPE_TINYINT || PT == TYPE_SMALLINT || PT == TYPE_INT ||
PT == TYPE_BIGINT) {
std::shared_ptr<BitmapFilterFuncBase> filter_olap;
filter_olap.reset(create_bitmap_filter(PT));
filter_olap->light_copy(filter.get());
return new BitmapFilterColumnPredicate<PT>(column_id, filter, be_exec_version);
} else {
return nullptr;
throw Exception(ErrorCode::INTERNAL_ERROR, "bitmap filter do not support type {}", PT);
}
}

Expand All @@ -266,17 +265,14 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<FunctionFilter>& filter, int,
const TabletColumn* column = nullptr) {
// currently only support like predicate
if constexpr (PT == TYPE_CHAR || PT == TYPE_VARCHAR || PT == TYPE_STRING) {
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id,
filter->_fn_ctx, filter->_string_param);
}
} else {
return nullptr;
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else if constexpr (PT == TYPE_VARCHAR || PT == TYPE_STRING) {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
}
throw Exception(ErrorCode::INTERNAL_ERROR, "function filter do not support type {}", PT);
}

template <typename T>
Expand Down
23 changes: 15 additions & 8 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::publish(bool publish_local) {
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
DCHECK(is_producer());

auto send_to_remote = [&](IRuntimeFilter* filter) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return filter->push_to_remote(&addr);
return filter->push_to_remote(state, &addr);
};
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) {
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
Expand Down Expand Up @@ -1088,8 +1088,10 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context)
: Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {}
RuntimeFilterContextSPtr rf_context, std::weak_ptr<QueryContext> context)
: Base(req, callback, context),
_dependency(std::move(dependency)),
_rf_context(rf_context) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1133,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use
// a raw pointer in closure. Has to use the context's shared ptr.
auto closure =
SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context);
auto closure = SyncSizeClosure::create_unique(
request, callback, _dependency, _wrapper->_context,
state->query_options().ignore_runtime_filter_error ? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand All @@ -1157,7 +1161,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
return Status::OK();
}

Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
Expand All @@ -1170,7 +1174,10 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
create_unique(merge_filter_request, merge_filter_callback);
create_unique(merge_filter_request, merge_filter_callback,
state->query_options().ignore_runtime_filter_error
? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
void* data = nullptr;
int len = 0;

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class IRuntimeFilter {

// publish filter
// push filter to remote node or push down it to scan_node
Status publish(bool publish_local = false);
Status publish(RuntimeState* state, bool publish_local = false);

Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);

Expand Down Expand Up @@ -293,7 +293,7 @@ class IRuntimeFilter {
bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(const TNetworkAddress* addr);
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);

void init_profile(RuntimeProfile* parent_profile);

Expand Down
Loading

0 comments on commit f601408

Please sign in to comment.