Skip to content

Commit

Permalink
Merge branch 'master' into neredis-rebalance-disk
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Dec 16, 2024
2 parents 9f127d5 + 830b74c commit 070ccbc
Show file tree
Hide file tree
Showing 729 changed files with 39,063 additions and 3,900 deletions.
1 change: 0 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ github:
- COMPILE (DORIS_COMPILE)
- Need_2_Approval
- Cloud UT (Doris Cloud UT)
- performance (Doris Performance)

required_pull_request_reviews:
dismiss_stale_reviews: true
Expand Down
Binary file added aazcp.tar.gz
Binary file not shown.
1 change: 1 addition & 0 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "agent/workload_group_listener.h"

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/workload_group_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
#include <glog/logging.h>

#include "agent/topic_listener.h"
#include "runtime/exec_env.h"

namespace doris {

class ExecEnv;

class WorkloadGroupListener : public TopicListener {
public:
~WorkloadGroupListener() {}
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() {
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
update_cumulative_point();
if (!config::enable_sleep_between_delete_cumu_compaction) {
st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
"_last_delete_version.first not equal to -1");
}
}
return st;
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, start_resp.alter_version());
std::vector<uint32_t> cluster_key_idxes;
if (!_base_tablet_schema->cluster_key_uids().empty()) {
for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
}
reader_context.read_orderby_key_columns = &cluster_key_idxes;
reader_context.is_unique = false;
reader_context.sequence_id_idx = -1;
}

for (auto& split : rs_splits) {
RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
auto st = compaction->prepare_compact();
if (!st.ok()) {
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() &&
st.msg() != "_last_delete_version.first not equal to -1") {
// Backoff strategy if no suitable version
tablet->last_cumu_no_suitable_version_ms = now;
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ DEFINE_mInt32(download_low_speed_limit_kbps, "50");
// download low speed time(seconds)
DEFINE_mInt32(download_low_speed_time, "300");
// whether to download small files in batch
DEFINE_mBool(enable_batch_download, "false");
DEFINE_mBool(enable_batch_download, "true");

DEFINE_String(sys_log_dir, "");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
Expand Down Expand Up @@ -1211,7 +1211,7 @@ DEFINE_Bool(exit_on_exception, "false");
DEFINE_Bool(enable_flush_file_cache_async, "true");

// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_String(doris_cgroup_cpu_path, "");

DEFINE_mBool(enable_be_proc_monitor, "false");
DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
Expand Down Expand Up @@ -1402,6 +1402,7 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
// Enable validation to check the correctness of table size.
DEFINE_Bool(enable_table_size_correctness_check, "false");
DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false");

// clang-format off
#ifdef BE_TEST
Expand Down
4 changes: 3 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ DECLARE_mInt32(tablet_schema_cache_capacity);
DECLARE_mBool(exit_on_exception);

// cgroup
DECLARE_mString(doris_cgroup_cpu_path);
DECLARE_String(doris_cgroup_cpu_path);
DECLARE_mBool(enable_be_proc_monitor);
DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_Int32(workload_group_metrics_interval_ms);
Expand Down Expand Up @@ -1487,6 +1487,8 @@ DECLARE_Bool(force_regenerate_rowsetid_on_start_error);
DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
// Enable validation to check the correctness of table size.
DECLARE_Bool(enable_table_size_correctness_check);
// Enable sleep 5s between delete cumulative compaction.
DECLARE_mBool(enable_sleep_between_delete_cumu_compaction);

#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ void Daemon::calculate_metrics_thread() {
// update lst map
DorisMetrics::instance()->system_metrics()->get_network_traffic(
&lst_net_send_bytes, &lst_net_receive_bytes);

DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
}
update_rowsets_and_segments_num_metrics();
}
Expand Down
24 changes: 12 additions & 12 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t
auto* output_ptr = output;

while (input_len > 0) {
//if faild , fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-lz4 decompress, input_len=$0", input_len));
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

//if faild, fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
Expand Down Expand Up @@ -609,15 +609,15 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
auto* output_ptr = output;

while (input_len > 0) {
//if faild , fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

if (input_len < sizeof(uint32_t)) {
return Status::InvalidArgument(strings::Substitute(
"fail to do hadoop-snappy decompress, input_len=$0", input_len));
*more_input_bytes = sizeof(uint32_t) - input_len;
break;
}

//if faild, fall back to large block begin
auto* large_block_input_ptr = input_ptr;
auto* large_block_output_ptr = output_ptr;

uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

input_ptr += sizeof(uint32_t);
Expand Down
15 changes: 13 additions & 2 deletions be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,19 @@ void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
segment_index_file_path =
tablet->get_segment_index_filepath(rowset_id, segment_index, segment_index_id);
auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
InvertedIndexStorageFormatPB::V1) {
// now CCR not support for variant + index v1
constexpr std::string_view index_suffix = "";
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
std::stoll(segment_index_id), index_suffix);
} else {
DCHECK(segment_index_id == "-1");
segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
}
is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
} catch (const std::exception& e) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ void StreamLoadAction::handle(HttpRequest* req) {
<< ctx->commit_and_publish_txn_cost_nanos / 1000000
<< ", number_total_rows=" << ctx->number_total_rows
<< ", number_loaded_rows=" << ctx->number_loaded_rows
<< ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes;
<< ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes
<< ", error_url=" << ctx->error_url;

// update statistics
streaming_load_requests_total->increment(1);
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void memory_info_handler(std::stringstream* output) {
auto* _opaque = static_cast<std::string*>(opaque);
_opaque->append(buf);
};
jemalloc_stats_print(write_cb, &tmp, "a");
malloc_stats_print(write_cb, &tmp, "a");
boost::replace_all(tmp, "\n", "<br>");
(*output) << tmp;
#else
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct FileCacheProfile {
struct FileCacheProfileReporter {
RuntimeProfile::Counter* num_local_io_total = nullptr;
RuntimeProfile::Counter* num_remote_io_total = nullptr;
RuntimeProfile::Counter* num_inverted_index_remote_io_total = nullptr;
RuntimeProfile::Counter* local_io_timer = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_cache = nullptr;
RuntimeProfile::Counter* bytes_scanned_from_remote = nullptr;
Expand All @@ -90,6 +91,8 @@ struct FileCacheProfileReporter {
cache_profile, 1);
num_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRemoteIOTotal", TUnit::UNIT,
cache_profile, 1);
num_inverted_index_remote_io_total = ADD_CHILD_COUNTER_WITH_LEVEL(
profile, "NumInvertedIndexRemoteIOTotal", TUnit::UNIT, cache_profile, 1);
local_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LocalIOUseTimer", cache_profile, 1);
remote_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "RemoteIOUseTimer", cache_profile, 1);
write_cache_io_timer =
Expand All @@ -107,6 +110,8 @@ struct FileCacheProfileReporter {
void update(const FileCacheStatistics* statistics) const {
COUNTER_UPDATE(num_local_io_total, statistics->num_local_io_total);
COUNTER_UPDATE(num_remote_io_total, statistics->num_remote_io_total);
COUNTER_UPDATE(num_inverted_index_remote_io_total,
statistics->num_inverted_index_remote_io_total);
COUNTER_UPDATE(local_io_timer, statistics->local_io_timer);
COUNTER_UPDATE(remote_io_timer, statistics->remote_io_timer);
COUNTER_UPDATE(write_cache_io_timer, statistics->write_cache_io_timer);
Expand Down
8 changes: 6 additions & 2 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
_update_state(stats, io_ctx->file_cache_stats);
_update_state(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
io::FileCacheProfile::instance().update(io_ctx->file_cache_stats);
}
};
Expand Down Expand Up @@ -312,14 +312,18 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}

void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
FileCacheStatistics* statis) const {
FileCacheStatistics* statis,
bool is_inverted_index) const {
if (statis == nullptr) {
return;
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
if (is_inverted_index) {
statis->num_inverted_index_remote_io_total++;
}
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class CachedRemoteFileReader final : public FileReader {
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state) const;
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
};

} // namespace doris::io
10 changes: 5 additions & 5 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
using namespace Aws::Http;
switch (err.GetResponseCode()) {
case HttpResponseCode::NOT_FOUND:
return Status::Error<NOT_FOUND, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
return Status::Error<NOT_FOUND, false>("{}: {} {} code=NOT_FOUND, type={}, request_id={}",
msg, err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
case HttpResponseCode::FORBIDDEN:
return Status::Error<PERMISSION_DENIED, false>("{}: {} {} type={}, request_id={}", msg,
err.GetExceptionName(), err.GetMessage(),
err.GetErrorType(), err.GetRequestId());
return Status::Error<PERMISSION_DENIED, false>(
"{}: {} {} code=FORBIDDEN, type={}, request_id={}", msg, err.GetExceptionName(),
err.GetMessage(), err.GetErrorType(), err.GetRequestId());
default:
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"{}: {} {} code={} type={}, request_id={}", msg, err.GetExceptionName(),
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace io {
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
int64_t num_inverted_index_remote_io_total = 0;
int64_t local_io_timer = 0;
int64_t bytes_read_from_local = 0;
int64_t bytes_read_from_remote = 0;
Expand All @@ -60,6 +61,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
bool is_inverted_index = false;
};

} // namespace io
Expand Down
Loading

0 comments on commit 070ccbc

Please sign in to comment.