Skip to content

Commit

Permalink
Merge branch 'master' into show_load_profile
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 23, 2024
2 parents 3b81b6d + 362efda commit 77fa537
Show file tree
Hide file tree
Showing 48 changed files with 495 additions and 164 deletions.
12 changes: 12 additions & 0 deletions be/src/common/cast_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ void cast_set(T& a, U b) {
a = static_cast<T>(b);
}

template <typename T, typename U>
requires std::is_floating_point_v<T> and std::is_integral_v<U>
void cast_set(T& a, U b) {
a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
Expand All @@ -70,4 +76,10 @@ T cast_set(U b) {
return static_cast<T>(b);
}

template <typename T, typename U>
requires std::is_floating_point_v<T> and std::is_integral_v<U>
T cast_set(U b) {
return static_cast<T>(b);
}

} // namespace doris
6 changes: 4 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

DEFINE_Bool(enable_skip_tablet_compaction, "false");
DEFINE_Bool(enable_skip_tablet_compaction, "true");
DEFINE_mInt32(skip_tablet_compaction_second, "10");

// output rowset of cumulative compaction total disk size exceed this config size,
// this rowset will be given to base compaction, unit is m byte.
DEFINE_mInt64(compaction_promotion_size_mbytes, "1024");
Expand Down Expand Up @@ -454,7 +456,7 @@ DEFINE_mInt32(multi_get_max_threads, "10");
DEFINE_mInt64(total_permits_for_compaction_score, "10000");

// sleep interval in ms after generated compaction tasks
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "10");
DEFINE_mInt32(generate_compaction_tasks_interval_ms, "100");

// sleep interval in second after update replica infos
DEFINE_mInt32(update_replica_infos_interval_seconds, "60");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

DECLARE_Bool(enable_skip_tablet_compaction);
DECLARE_mInt32(skip_tablet_compaction_second);
// output rowset of cumulative compaction total disk size exceed this config size,
// this rowset will be given to base compaction, unit is m byte.
DECLARE_mInt64(compaction_promotion_size_mbytes);
Expand Down
4 changes: 2 additions & 2 deletions be/src/http/action/calc_file_crc_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engin
// calculate the crc value of the files in the tablet
Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value,
int64_t* start_version, int64_t* end_version,
int32_t* rowset_count, int64_t* file_count) {
uint32_t* rowset_count, int64_t* file_count) {
uint64_t tablet_id = 0;
const auto& req_tablet_id = req->param(TABLET_ID_KEY);
if (req_tablet_id.empty()) {
Expand Down Expand Up @@ -110,7 +110,7 @@ void CalcFileCrcAction::handle(HttpRequest* req) {
uint32_t crc_value = 0;
int64_t start_version = 0;
int64_t end_version = 0;
int32_t rowset_count = 0;
uint32_t rowset_count = 0;
int64_t file_count = 0;

MonotonicStopWatch timer;
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/calc_file_crc_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class CalcFileCrcAction : public HttpHandlerWithAuth {

private:
Status _handle_calc_crc(HttpRequest* req, uint32_t* crc_value, int64_t* start_version,
int64_t* end_version, int32_t* rowset_count, int64_t* file_count);
int64_t* end_version, uint32_t* rowset_count, int64_t* file_count);

private:
BaseStorageEngine& _engine;
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mutex>
#include <ostream>

#include "common/cast_set.h"
#include "common/config.h"
#include "common/logging.h"
#include "olap/compaction.h"
Expand All @@ -35,6 +36,8 @@
#include "util/trace.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace ErrorCode;

BaseCompaction::BaseCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
Expand Down Expand Up @@ -184,7 +187,8 @@ Status BaseCompaction::pick_rowsets_to_compact() {
// set to 1 to void divide by zero
base_size = 1;
}
double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;
double cumulative_base_ratio =
cast_set<double>(cumulative_total_size) / cast_set<double>(base_size);

if (cumulative_base_ratio > min_data_ratio) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
Expand Down
32 changes: 21 additions & 11 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#include <fmt/format.h>
#include <rapidjson/prettywriter.h>

#include <cstdint>
#include <iterator>

#include "common/cast_set.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/calc_delete_bitmap_executor.h"
Expand All @@ -45,6 +49,8 @@
#include "vec/jsonb/serialize.h"

namespace doris {
#include "common/compile_check_begin.h"

using namespace ErrorCode;

namespace {
Expand Down Expand Up @@ -462,9 +468,9 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
RowLocation loc;

for (size_t i = 0; i < specified_rowsets.size(); i++) {
auto& rs = specified_rowsets[i];
auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
int num_segments = rs->num_segments();
const auto& rs = specified_rowsets[i];
const auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
int num_segments = cast_set<int>(rs->num_segments());
DCHECK_EQ(segments_key_bounds.size(), num_segments);
std::vector<uint32_t> picked_segments;
for (int i = num_segments - 1; i >= 0; i--) {
Expand Down Expand Up @@ -671,7 +677,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,

RowsetSharedPtr rowset_find;
auto st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc,
dummy_version.first - 1, segment_caches, &rowset_find);
cast_set<uint32_t>(dummy_version.first - 1), segment_caches,
&rowset_find);
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>();
// It's a defensive DCHECK, we need to exclude some common errors to avoid core-dump
// while stress test
Expand Down Expand Up @@ -1130,7 +1137,7 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
const signed char* delete_sign_column_data) {
if (skipped) {
if (delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index_old[idx]] != 0) {
delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0) {
if (tablet_column.has_default_value()) {
new_col->insert_from(default_value_col, 0);
} else if (tablet_column.is_nullable()) {
Expand Down Expand Up @@ -1300,7 +1307,8 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
for (const auto& rowset : *rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
value.SetString(version_str.c_str(),
cast_set<rapidjson::SizeType>(version_str.length()),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator());
}
Expand All @@ -1313,15 +1321,17 @@ Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
for (const auto& rowset : rowsets) {
rapidjson::Value value;
std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
value.SetString(version_str.c_str(),
cast_set<rapidjson::SizeType>(version_str.length()),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator());
}
}
for (const auto& missing_rowset_id : missing_ids) {
rapidjson::Value miss_value;
std::string rowset_id_str = missing_rowset_id.to_string();
miss_value.SetString(rowset_id_str.c_str(), rowset_id_str.length(),
miss_value.SetString(rowset_id_str.c_str(),
cast_set<rapidjson::SizeType>(rowset_id_str.length()),
missing_rowsets_arr.GetAllocator());
missing_rowsets_arr.PushBack(miss_value, missing_rowsets_arr.GetAllocator());
}
Expand Down Expand Up @@ -1725,7 +1735,7 @@ std::vector<RowsetSharedPtr> BaseTablet::get_snapshot_rowset(bool include_stale_
void BaseTablet::calc_consecutive_empty_rowsets(
std::vector<RowsetSharedPtr>* empty_rowsets,
const std::vector<RowsetSharedPtr>& candidate_rowsets, int limit) {
int len = candidate_rowsets.size();
int len = cast_set<int>(candidate_rowsets.size());
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
auto next_rowset = candidate_rowsets[i + 1];
Expand Down Expand Up @@ -1761,7 +1771,7 @@ void BaseTablet::calc_consecutive_empty_rowsets(
}

Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
int32_t* rowset_count, int64_t* file_count) {
uint32_t* rowset_count, int64_t* file_count) {
Version v(start_version, end_version);
std::vector<RowsetSharedPtr> rowsets;
traverse_rowsets([&rowsets, &v](const auto& rs) {
Expand All @@ -1771,7 +1781,7 @@ Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int
}
});
std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
*rowset_count = rowsets.size();
*rowset_count = cast_set<uint32_t>(rowsets.size());

*crc_value = 0;
*file_count = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class BaseTablet {
}

Status calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
int32_t* rowset_count, int64_t* file_count);
uint32_t* rowset_count, int64_t* file_count);

Status show_nested_index_file(std::string* json_meta);

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2542,10 +2542,10 @@ void Tablet::set_skip_compaction(bool skip, CompactionType compaction_type, int6

bool Tablet::should_skip_compaction(CompactionType compaction_type, int64_t now) {
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION && _skip_cumu_compaction &&
now < _skip_cumu_compaction_ts + 120) {
now < _skip_cumu_compaction_ts + config::skip_tablet_compaction_second) {
return true;
} else if (compaction_type == CompactionType::BASE_COMPACTION && _skip_base_compaction &&
now < _skip_base_compaction_ts + 120) {
now < _skip_base_compaction_ts + config::skip_tablet_compaction_second) {
return true;
}
return false;
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/functions/array/function_array_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ class FunctionArraySlice : public IFunction {
auto offset_column =
block.get_by_position(arguments[1]).column->convert_to_full_column_if_const();
ColumnPtr length_column = nullptr;
const ColumnInt64* length_column_int64 = nullptr;
if (arguments.size() > 2) {
length_column =
block.get_by_position(arguments[2]).column->convert_to_full_column_if_const();
length_column_int64 = assert_cast<const ColumnInt64*>(length_column.get());
}
// extract src array column
ColumnArrayExecutionData src;
Expand All @@ -94,8 +92,7 @@ class FunctionArraySlice : public IFunction {
ColumnArrayMutableData dst = create_mutable_data(src.nested_col, is_nullable);
dst.offsets_ptr->reserve(input_rows_count);
// execute
const auto* offset_column_int64 = assert_cast<const ColumnInt64*>(offset_column.get());
slice_array(dst, src, *offset_column_int64, length_column_int64);
slice_array(dst, src, *offset_column, length_column.get());
ColumnPtr res_column = assemble_column_array(dst);
block.replace_by_position(result, std::move(res_column));
return Status::OK();
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/functions/array/function_array_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ MutableColumnPtr assemble_column_array(ColumnArrayMutableData& data) {
}

void slice_array(ColumnArrayMutableData& dst, ColumnArrayExecutionData& src,
const ColumnInt64& offset_column, const ColumnInt64* length_column) {
const IColumn& offset_column, const IColumn* length_column) {
size_t cur = 0;
for (size_t row = 0; row < src.offsets_ptr->size(); ++row) {
size_t off = (*src.offsets_ptr)[row - 1];
size_t len = (*src.offsets_ptr)[row] - off;
Int64 start = offset_column.get_element(row);
Int64 start = offset_column.get_int(row);
if (len == 0 || start == 0) {
dst.offsets_ptr->push_back(cur);
continue;
Expand All @@ -98,7 +98,7 @@ void slice_array(ColumnArrayMutableData& dst, ColumnArrayExecutionData& src,
}
Int64 end;
if (length_column) {
Int64 size = length_column->get_element(row);
Int64 size = length_column->get_int(row);
end = std::max((Int64)off, std::min((Int64)(off + len), start + size));
} else {
end = off + len;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/functions/array/function_array_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ MutableColumnPtr assemble_column_array(ColumnArrayMutableData& data);

// array[offset:length]
void slice_array(ColumnArrayMutableData& dst, ColumnArrayExecutionData& src,
const ColumnInt64& offset_column, const ColumnInt64* length_column);
const IColumn& offset_column, const IColumn* length_column);

using ColumnArrayExecutionDatas = std::vector<ColumnArrayExecutionData>;
} // namespace doris::vectorized
42 changes: 34 additions & 8 deletions be/src/vec/functions/function_date_or_datetime_computation.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,34 @@ struct DateTimeOp {
}
}

// use for (const DateTime, ColumnNumber) -> other_type
static void constant_vector(const FromType1& from, PaddedPODArray<ToType>& vec_to,
NullMap& null_map, const IColumn& delta) {
size_t size = delta.size();
vec_to.resize(size);
null_map.resize_fill(size, false);

for (size_t i = 0; i < size; ++i) {
vec_to[i] = Transform::execute(from, delta.get_int(i),
reinterpret_cast<bool&>(null_map[i]));
}
}
static void constant_vector(const FromType1& from, PaddedPODArray<ToType>& vec_to,
const IColumn& delta) {
size_t size = delta.size();
vec_to.resize(size);
bool invalid = true;

for (size_t i = 0; i < size; ++i) {
vec_to[i] = Transform::execute(from, delta.get_int(i), invalid);

if (UNLIKELY(invalid)) {
throw Exception(ErrorCode::OUT_OF_BOUND, "Operation {} {} {} out of range",
Transform::name, from, delta.get_int(i));
}
}
}

static void constant_vector(const FromType1& from, PaddedPODArray<ToType>& vec_to,
NullMap& null_map, const PaddedPODArray<FromType2>& delta) {
size_t size = delta.size();
Expand Down Expand Up @@ -619,10 +647,9 @@ struct DateTimeAddIntervalImpl {
col_to->get_data(), null_map->get_data(),
delta_vec_column->get_data());
} else {
return Status::RuntimeError(
"Illegal column {} of first argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
Transform::name);
Op::constant_vector(sources_const->template get_value<FromType1>(),
col_to->get_data(), null_map->get_data(),
*not_nullable_column_ptr_arg1);
}
if (const auto* nullable_col = check_and_get_column<ColumnNullable>(
block.get_by_position(arguments[0]).column.get())) {
Expand Down Expand Up @@ -650,10 +677,9 @@ struct DateTimeAddIntervalImpl {
Op::constant_vector(sources_const->template get_value<FromType1>(),
col_to->get_data(), delta_vec_column->get_data());
} else {
return Status::RuntimeError(
"Illegal column {} of first argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
Transform::name);
Op::constant_vector(sources_const->template get_value<FromType1>(),
col_to->get_data(),
*block.get_by_position(arguments[1]).column);
}
block.replace_by_position(result, std::move(col_to));
}
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/sink/vtablet_block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
Expand Down Expand Up @@ -533,7 +532,6 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si
} else if (const auto* src_nullable_column =
check_and_get_column<vectorized::ColumnNullable>(src_column_ptr)) {
auto src_nested_column_ptr = src_nullable_column->get_nested_column_ptr();
const auto* src_int_64 = assert_cast<const ColumnInt64*>(src_nested_column_ptr.get());
const auto& null_map_data = src_nullable_column->get_null_map_data();
dst_values.reserve(rows);
for (size_t i = 0; i < rows; i++) {
Expand All @@ -547,7 +545,7 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si

for (size_t i = 0; i < rows; i++) {
dst_values.emplace_back((null_map_data[i] != 0) ? _auto_inc_id_allocator.next_id()
: src_int_64->get_element(i));
: src_nested_column_ptr->get_int(i));
}
} else {
return Status::OK();
Expand Down
Loading

0 comments on commit 77fa537

Please sign in to comment.