Skip to content

Commit

Permalink
Merge branch 'master' into 20241125_fix_cgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Nov 25, 2024
2 parents defc14c + 9867ba3 commit c1b1509
Show file tree
Hide file tree
Showing 46 changed files with 1,430 additions and 392 deletions.
15 changes: 7 additions & 8 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ class MetaServiceProxy {
long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
config::meta_service_connection_age_base_minutes > 0) {
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
config::meta_service_connection_age_base_minutes,
config::meta_service_connection_age_base_minutes * 2);
deadline = now + duration_cast<milliseconds>(minutes(uni(rng))).count();
config::meta_service_connection_age_base_seconds,
config::meta_service_connection_age_base_seconds * 2);
deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
} else {
deadline = LONG_MAX;
}
Expand Down Expand Up @@ -610,8 +610,9 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet->tablet_id()
<< ", txn_id=" << txn_id << ", status=" << status;
LOG_EVERY_N(INFO, 20)
<< "delete bitmap not found in cache, will sync rowset to get. tablet_id= "
<< tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status;
return false;
}
}
Expand All @@ -630,8 +631,6 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
<< tablet->tablet_id();
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
*delete_bitmap = *new_delete_bitmap;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ DEFINE_Bool(meta_service_use_load_balancer, "false");
DEFINE_mInt32(meta_service_rpc_timeout_ms, "10000");
DEFINE_Bool(meta_service_connection_pooled, "true");
DEFINE_mInt64(meta_service_connection_pool_size, "20");
DEFINE_mInt32(meta_service_connection_age_base_minutes, "5");
DEFINE_mInt32(meta_service_connection_age_base_seconds, "30");
DEFINE_mInt32(meta_service_idle_connection_timeout_ms, "0");
DEFINE_mInt32(meta_service_rpc_retry_times, "200");
DEFINE_mInt32(meta_service_brpc_timeout_ms, "10000");
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static inline bool is_cloud_mode() {
// If meta services are deployed behind a load balancer, set this config to "host:port" of the load balancer.
// Here is a set of configs to configure the connection behaviors:
// - meta_service_connection_pooled: distribute the long connections to different RS of the VIP.
// - meta_service_connection_age_base_minutes: expire the connection after a random time during [base, 2*base],
// - meta_service_connection_age_base_seconds: expire the connection after a random time during [base, 2*base],
// so that the BE has a chance to connect to a new RS. (When you add a new RS, the BE will connect to it)
// - meta_service_idle_connection_timeout_ms: rebuild the idle connections after the timeout exceeds. Some LB
// vendors will reset the connection if it is idle for a long time.
Expand All @@ -50,7 +50,7 @@ DECLARE_mInt64(meta_service_connection_pool_size);
// has a chance to connect to a new RS. Set zero to disable it.
//
// Only works when meta_service_endpoint is set to a single host.
DECLARE_mInt32(meta_service_connection_age_base_minutes);
DECLARE_mInt32(meta_service_connection_age_base_seconds);
// Rebuild the idle connections after the timeout exceeds. Set zero to disable it.
//
// Only works when meta_service_endpoint is set to a single host.
Expand Down
5 changes: 3 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -959,8 +959,6 @@ DEFINE_Int32(doris_remote_scanner_thread_pool_thread_num, "48");
// number of s3 scanner thread pool queue size
DEFINE_Int32(doris_remote_scanner_thread_pool_queue_size, "102400");
DEFINE_mInt64(block_cache_wait_timeout_ms, "1000");
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");

// limit the queue of pending batches which will be sent by a single nodechannel
DEFINE_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
Expand Down Expand Up @@ -1054,6 +1052,9 @@ DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB
DEFINE_mInt64(cache_lock_long_tail_threshold, "1000");
DEFINE_Int64(file_cache_recycle_keys_size, "1000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
11 changes: 9 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,6 @@ DECLARE_mInt64(nodechannel_pending_queue_max_bytes);
// The batch size for sending data by brpc streaming client
DECLARE_mInt64(brpc_streaming_client_batch_bytes);
DECLARE_mInt64(block_cache_wait_timeout_ms);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);

DECLARE_Bool(enable_brpc_builtin_services);

Expand Down Expand Up @@ -1095,6 +1093,15 @@ DECLARE_Bool(enable_ttl_cache_evict_using_lru);
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk
DECLARE_mInt64(file_cache_error_log_limit_bytes);
DECLARE_mInt64(cache_lock_long_tail_threshold);
DECLARE_Int64(file_cache_recycle_keys_size);
// Base compaction may retrieve and produce some less frequently accessed data,
// potentially affecting the file cache hit rate.
// This configuration determines whether to retain the output within the file cache.
// Make your choice based on the following considerations:
// If your file cache is ample enough to accommodate all the data in your database,
// enable this option; otherwise, it is recommended to leave it disabled.
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1190,8 +1190,12 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
ctx.compaction_level = _engine.cumu_compaction_policy(compaction_policy)
->new_compaction_level(_input_rowsets);
}

ctx.write_file_cache = compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION;
// We presume that the data involved in cumulative compaction is sufficiently 'hot'
// and should always be retained in the cache.
// TODO(gavin): Ensure that the retention of hot data is implemented with precision.
ctx.write_file_cache = (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) ||
(config::enable_file_cache_keep_base_compaction_output &&
compaction_type() == ReaderType::READER_BASE_COMPACTION);
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical));
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
Expand Down
103 changes: 103 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <CLucene.h> // IWYU pragma: keep

#include <exception>
#include <memory>
#include <string>

#include "common/logging.h"

namespace lucene::store {
class Directory;
} // namespace lucene::store

namespace doris::segment_v2 {

// Stateless callable that releases a lucene::store::Directory by handing the
// pointer to CLucene's _CLDECDELETE macro instead of calling `delete` directly.
// Its shape (const operator() taking a raw pointer) lets it serve as a custom
// deleter type.
// NOTE(review): _CLDECDELETE presumably drops one reference and destroys the
// object once none remain -- confirm against the CLucene headers.
struct DirectoryDeleter {
    void operator()(lucene::store::Directory* ptr) const {
        _CLDECDELETE(ptr);
    }
};

// Error state shared between a try/catch region and the FINALLY* macros in this
// header. The macros expect a variable of this type named `error_context` to be
// in scope at their expansion site.
struct ErrorContext {
    // Human-readable description of what went wrong. finally_close() appends to
    // this string, so it can accumulate more than one message.
    std::string err_msg;
    // The captured exception, or null while no error has been recorded.
    // FINALLY returns an error Status and FINALLY_EXCEPTION re-throws when set.
    std::exception_ptr eptr;
};

// Satisfied by any pointer-like type T for which `t->close()` is a valid
// expression. The result type of close() is not constrained.
template <typename T>
concept HasClose = requires(T handle) { handle->close(); };

// Closes `resource` from a cleanup path without letting a close() failure escape.
//
// Does nothing when `resource` tests false. Otherwise calls resource->close();
// any exception thrown by that call is captured instead of propagated:
//   - error_context.eptr is overwritten with the in-flight exception,
//   - a description is appended to error_context.err_msg (including
//     CLuceneError::what() when the exception is a CLuceneError),
//   - the accumulated err_msg is logged at ERROR level.
template <typename PtrType>
    requires HasClose<PtrType>
void finally_close(PtrType& resource, ErrorContext& error_context) {
    if (!resource) {
        return;
    }
    try {
        resource->close();
        return; // closed cleanly, nothing to record
    } catch (CLuceneError& err) {
        error_context.eptr = std::current_exception();
        error_context.err_msg.append("Error occurred while closing resource: ")
                .append(err.what());
    } catch (...) {
        error_context.eptr = std::current_exception();
        error_context.err_msg.append("Error occurred while closing resource");
    }
    // Only reached when one of the handlers above recorded a failure.
    LOG(ERROR) << error_context.err_msg;
}

#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-macros"
#endif

// Closes `resource` through finally_close(), recording any failure in the
// variable named `error_context` that must be in scope at the expansion site
// (the static_assert exists only to force that name to be declared).
// Expands to a braced block, so no trailing semicolon is needed.
#define FINALLY_CLOSE(resource) \
    { \
        static_assert(sizeof(error_context) > 0, \
                      "error_context must be defined before using FINALLY_CLOSE macro!"); \
        finally_close(resource, error_context); \
    }

// Runs `finally_block`, then returns an error if one was recorded.
//
// Requires a variable named `error_context` in scope at the expansion site (the
// static_assert exists only to force that name to be declared). After
// `finally_block` has executed, a set error_context.eptr makes the ENCLOSING
// function return an INVERTED_INDEX_CLUCENE_ERROR Status carrying
// error_context.err_msg. Because the expansion contains a `return`, use it only
// in functions that can return that Status. Expands to a braced block, so no
// trailing semicolon is needed.
#define FINALLY(finally_block) \
{ \
static_assert(sizeof(error_context) > 0, \
"error_context must be defined before using FINALLY macro!"); \
finally_block; \
if (error_context.eptr) { \
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(error_context.err_msg); \
} \
}

// Runs `finally_block`, then re-throws the recorded exception, if any.
//
// Requires a variable named `error_context` in scope at the expansion site (the
// static_assert exists only to force that name to be declared). After
// `finally_block` has executed, a set error_context.eptr is re-thrown with
// std::rethrow_exception; otherwise execution continues past the macro.
// Expands to a braced block, so no trailing semicolon is needed.
#define FINALLY_EXCEPTION(finally_block) \
    { \
        static_assert(sizeof(error_context) > 0, \
                      "error_context must be defined before using FINALLY_EXCEPTION macro!"); \
        finally_block; \
        if (error_context.eptr) { \
            std::rethrow_exception(error_context.eptr); \
        } \
    }

#if defined(__clang__)
#pragma clang diagnostic pop
#endif

} // namespace doris::segment_v2
Loading

0 comments on commit c1b1509

Please sign in to comment.