Skip to content

Commit

Permalink
Merge branch 'master' into optimize_output
Browse files Browse the repository at this point in the history
  • Loading branch information
yagagagaga authored Dec 26, 2024
2 parents 2d7dda1 + f4bff84 commit 9c1142b
Show file tree
Hide file tree
Showing 348 changed files with 12,998 additions and 4,664 deletions.
17 changes: 7 additions & 10 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class MetaServiceProxy {
}

private:
static bool is_meta_service_endpoint_list() {
return config::meta_service_endpoint.find(',') != std::string::npos;
}

static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
Expand All @@ -154,9 +158,6 @@ class MetaServiceProxy {
if (config::meta_service_connection_pooled) {
num_proxies = config::meta_service_connection_pool_size;
}
if (config::meta_service_endpoint.find(',') != std::string::npos) {
is_meta_service_endpoint_list = true;
}
proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
});

Expand All @@ -175,7 +176,7 @@ class MetaServiceProxy {

const char* load_balancer_name = nullptr;
std::string endpoint;
if (is_meta_service_endpoint_list) {
if (is_meta_service_endpoint_list()) {
endpoint = fmt::format("list://{}", config::meta_service_endpoint);
load_balancer_name = "random";
} else {
Expand Down Expand Up @@ -215,7 +216,7 @@ class MetaServiceProxy {
bool is_idle_timeout(long now) {
auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
// idle timeout only works without list endpoint.
return !is_meta_service_endpoint_list && idle_timeout_ms > 0 &&
return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
_last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
}

Expand Down Expand Up @@ -243,7 +244,7 @@ class MetaServiceProxy {

long deadline = now;
// connection age only works without list endpoint.
if (!is_meta_service_endpoint_list &&
if (!is_meta_service_endpoint_list() &&
config::meta_service_connection_age_base_seconds > 0) {
std::default_random_engine rng(static_cast<uint32_t>(now));
std::uniform_int_distribution<> uni(
Expand All @@ -262,16 +263,12 @@ class MetaServiceProxy {
return Status::OK();
}

static std::atomic_bool is_meta_service_endpoint_list;

std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;
};

std::atomic_bool MetaServiceProxy::is_meta_service_endpoint_list = false;

template <typename T, typename... Ts>
struct is_any : std::disjunction<std::is_same<T, Ts>...> {};

Expand Down
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
return Status::InternalError<false>("injected update_tmp_rowset error.");
});
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}
Expand Down Expand Up @@ -870,9 +873,6 @@ Status CloudTablet::sync_meta() {
}
return st;
}
if (tablet_meta->tablet_state() != TABLET_RUNNING) { // impossible
return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,6 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {

for (auto& weak_tablet : weak_tablets) {
if (auto tablet = weak_tablet.lock()) {
if (tablet->tablet_state() != TABLET_RUNNING) {
continue;
}
int64_t last_sync_time = tablet->last_sync_time_s;
if (last_sync_time <= last_sync_time_bound) {
sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
Expand Down
9 changes: 8 additions & 1 deletion be/src/exec/schema_scanner/schema_views_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ Status SchemaViewsScanner::_fill_block_impl(vectorized::Block* block) {
std::vector<void*> datas(tables_num);

// catalog
{ RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, null_datas)); }
{
std::string catalog_name = _db_result.catalogs[_db_index - 1];
StringRef str = StringRef(catalog_name.c_str(), catalog_name.size());
for (int i = 0; i < tables_num; ++i) {
datas[i] = &str;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 0, datas));
}
// schema
{
std::string db_name = SchemaHelper::extract_db_name(_db_result.dbs[_db_index - 1]);
Expand Down
20 changes: 17 additions & 3 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/time.h"
#include "vec/common/sip_hash.h"
#include "vec/common/uint128.h"
Expand Down Expand Up @@ -770,20 +772,32 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o
CacheContext& context) {
FileBlock::Range range(offset, offset + size - 1);

SCOPED_CACHE_LOCK(_mutex);
ReadStatistics* stats = context.stats;
DCHECK(stats != nullptr);
MonotonicStopWatch sw;
sw.start();
std::lock_guard cache_lock(_mutex);
stats->lock_wait_timer += sw.elapsed_time();

if (auto iter = _key_to_time.find(hash);
context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) {
context.cache_type = FileCacheType::TTL;
context.expiration_time = iter->second;
}

/// Get all blocks which intersect with the given range.
auto file_blocks = get_impl(hash, context, range, cache_lock);
FileBlocks file_blocks;
{
SCOPED_RAW_TIMER(&stats->get_timer);
file_blocks = get_impl(hash, context, range, cache_lock);
}

if (file_blocks.empty()) {
SCOPED_RAW_TIMER(&stats->set_timer);
file_blocks = split_range_into_cells(hash, context, offset, size, FileBlock::State::EMPTY,
cache_lock);
} else {
SCOPED_RAW_TIMER(&stats->set_timer);
fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
}
DCHECK(!file_blocks.empty());
Expand Down Expand Up @@ -996,7 +1010,6 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext&
if (!_async_open_done) {
return try_reserve_during_async_load(size, cache_lock);
}

// use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
// directly eliminate 5 times the size of the space
if (_disk_resource_limit_mode) {
Expand Down Expand Up @@ -1055,6 +1068,7 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext&

if (cell->releasable()) {
auto& file_block = cell->file_block;

std::lock_guard block_lock(file_block->_mutex);
DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
to_evict.push_back(cell);
Expand Down
17 changes: 17 additions & 0 deletions be/src/io/cache/block_file_cache_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ struct FileCacheProfileReporter {
RuntimeProfile::Counter* write_cache_io_timer = nullptr;
RuntimeProfile::Counter* bytes_write_into_cache = nullptr;
RuntimeProfile::Counter* num_skip_cache_io_total = nullptr;
RuntimeProfile::Counter* read_cache_file_directly_timer = nullptr;
RuntimeProfile::Counter* cache_get_or_set_timer = nullptr;
RuntimeProfile::Counter* lock_wait_timer = nullptr;
RuntimeProfile::Counter* get_timer = nullptr;
RuntimeProfile::Counter* set_timer = nullptr;

FileCacheProfileReporter(RuntimeProfile* profile) {
static const char* cache_profile = "FileCache";
Expand All @@ -105,6 +110,13 @@ struct FileCacheProfileReporter {
TUnit::BYTES, cache_profile, 1);
bytes_scanned_from_remote = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "BytesScannedFromRemote",
TUnit::BYTES, cache_profile, 1);
read_cache_file_directly_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "ReadCacheFileDirectlyTimer", cache_profile, 1);
cache_get_or_set_timer =
ADD_CHILD_TIMER_WITH_LEVEL(profile, "CacheGetOrSetTimer", cache_profile, 1);
lock_wait_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "LockWaitTimer", cache_profile, 1);
get_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "GetTimer", cache_profile, 1);
set_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile, "SetTimer", cache_profile, 1);
}

void update(const FileCacheStatistics* statistics) const {
Expand All @@ -119,6 +131,11 @@ struct FileCacheProfileReporter {
COUNTER_UPDATE(num_skip_cache_io_total, statistics->num_skip_cache_io_total);
COUNTER_UPDATE(bytes_scanned_from_cache, statistics->bytes_read_from_local);
COUNTER_UPDATE(bytes_scanned_from_remote, statistics->bytes_read_from_remote);
COUNTER_UPDATE(read_cache_file_directly_timer, statistics->read_cache_file_directly_timer);
COUNTER_UPDATE(cache_get_or_set_timer, statistics->cache_get_or_set_timer);
COUNTER_UPDATE(lock_wait_timer, statistics->lock_wait_timer);
COUNTER_UPDATE(get_timer, statistics->get_timer);
COUNTER_UPDATE(set_timer, statistics->set_timer);
}
};

Expand Down
11 changes: 11 additions & 0 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
stats.bytes_read += bytes_req;
if (config::enable_read_cache_file_directly) {
// read directly
SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
size_t need_read_size = bytes_req;
std::shared_lock lock(_mtx);
if (!_cache_file_readers.empty()) {
Expand Down Expand Up @@ -174,8 +175,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
// read from cache or remote
auto [align_left, align_size] = s_align_size(offset, bytes_req, size());
CacheContext cache_context(io_ctx);
cache_context.stats = &stats;
MonotonicStopWatch sw;
sw.start();
FileBlocksHolder holder =
_cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
stats.cache_get_or_set_timer += sw.elapsed_time();
std::vector<FileBlockSPtr> empty_blocks;
for (auto& block : holder.file_blocks) {
switch (block->state()) {
Expand Down Expand Up @@ -333,6 +338,12 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
statis->write_cache_io_timer += read_stats.local_write_timer;

statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer;
statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer;
statis->lock_wait_timer += read_stats.lock_wait_timer;
statis->get_timer += read_stats.get_timer;
statis->set_timer += read_stats.set_timer;

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}
Expand Down
9 changes: 0 additions & 9 deletions be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@ class CachedRemoteFileReader final : public FileReader {
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;

struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
};
void _update_state(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;
};
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offs
ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
ctx.expiration_time = _expiration_time;
ctx.is_cold_data = _is_cold_data;
ReadStatistics stats;
ctx.stats = &stats;
auto holder = _cache->get_or_set(_cache_hash, offset, size, ctx);
return std::make_unique<FileBlocksHolder>(std::move(holder));
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ struct UInt128Wrapper {
bool operator==(const UInt128Wrapper& other) const { return value_ == other.value_; }
};

struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
int64_t local_write_timer = 0;
int64_t read_cache_file_directly_timer = 0;
int64_t cache_get_or_set_timer = 0;
int64_t lock_wait_timer = 0;
int64_t get_timer = 0;
int64_t set_timer = 0;
};

class BlockFileCache;
struct FileBlocksHolder;
using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
Expand Down Expand Up @@ -134,6 +149,7 @@ struct CacheContext {
FileCacheType cache_type;
int64_t expiration_time {0};
bool is_cold_data {false};
ReadStatistics* stats;
};

} // namespace doris::io
5 changes: 5 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ struct FileCacheStatistics {
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t num_skip_cache_io_total = 0;
int64_t read_cache_file_directly_timer = 0;
int64_t cache_get_or_set_timer = 0;
int64_t lock_wait_timer = 0;
int64_t get_timer = 0;
int64_t set_timer = 0;
};

struct IOContext {
Expand Down
Loading

0 comments on commit 9c1142b

Please sign in to comment.