Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
2

1

6

7

8

audit

audit

add ut

2

3

1

tmp fix
  • Loading branch information
morningman authored and zhiqiang-hhhh committed Nov 23, 2024
1 parent 362efda commit 03b0f2f
Show file tree
Hide file tree
Showing 39 changed files with 348 additions and 114 deletions.
14 changes: 6 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,28 +256,26 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
io::IOContext io_ctx;
io_ctx.expiration_time = expiration_time;
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(
*rowset_meta, seg_id),
.file_size = rs->rowset_meta()->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx,
.download_done {},
});

io::IOContext io_ctx2;
io_ctx2.expiration_time = expiration_time;
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx2,
.download_done {},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
Expand Down
14 changes: 6 additions & 8 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,15 @@ void CloudWarmUpManager::handle_jobs() {
}

wait->add_count();
io::IOContext io_ctx;
io_ctx.expiration_time = expiration_time;
_engine.file_cache_block_downloader().submit_download_task(
io::DownloadFileMeta {
.path = storage_resource.value()->remote_segment_path(*rs,
seg_id),
.file_size = rs->segment_file_size(seg_id),
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx,
.download_done =
[wait](Status st) {
if (!st) {
Expand All @@ -125,15 +124,14 @@ void CloudWarmUpManager::handle_jobs() {
},
});

io::IOContext io_ctx2;
io_ctx2.expiration_time = expiration_time;
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
.path = idx_path,
.file_size = -1,
.file_system = storage_resource.value()->fs,
.ctx =
{
.expiration_time = expiration_time,
},
.ctx = io_ctx2,
.download_done =
[wait](Status st) {
if (!st) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
Expand Down Expand Up @@ -93,4 +95,4 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc
return Status::OK();
}

} // namespace doris
} // namespace doris
9 changes: 4 additions & 5 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ void FileCacheBlockDownloader::download_file_cache_block(
}
};

IOContext io_ctx;
io_ctx.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX;
io_ctx.expiration_time = meta.expiration_time();
DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(*find_it->second,
meta.segment_id()),
Expand All @@ -186,11 +189,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.offset = meta.offset(),
.download_size = meta.size(),
.file_system = storage_resource.value()->fs,
.ctx =
{
.is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
},
.ctx = io_ctx,
.download_done = std::move(download_done),
};
download_segment_file(download_meta);
Expand Down
12 changes: 5 additions & 7 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
*bytes_read = 0;
return Status::OK();
}

ReadStatistics stats;
auto defer_func = [&](int*) {
if (io_ctx->file_cache_stats) {
Expand All @@ -131,7 +132,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
}
};
std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));
stats.bytes_read += bytes_req;

if (config::enable_read_cache_file_directly) {
// read directly
size_t need_read_size = bytes_req;
Expand All @@ -155,7 +156,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
SCOPED_RAW_TIMER(&stats.local_read_timer);
if (!iter->second
->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
file_offset)
file_offset, io_ctx)
.ok()) {
break;
}
Expand Down Expand Up @@ -289,7 +290,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
size_t file_offset = current_offset - left;
SCOPED_RAW_TIMER(&stats.local_read_timer);
st = block->read(Slice(result.data + (current_offset - offset), read_size),
file_offset);
file_offset, io_ctx);
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
Expand All @@ -300,7 +301,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
SCOPED_RAW_TIMER(&stats.remote_read_timer);
RETURN_IF_ERROR(_remote_file_reader->read_at(
current_offset, Slice(result.data + (current_offset - offset), read_size),
&bytes_read));
&bytes_read, io_ctx));
DCHECK(bytes_read == read_size);
}
}
Expand All @@ -318,10 +319,8 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
statis->remote_io_timer += read_stats.remote_read_timer;
statis->local_io_timer += read_stats.local_read_timer;
Expand All @@ -330,7 +329,6 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
statis->write_cache_io_timer += read_stats.local_write_timer;

g_skip_cache_num << read_stats.skip_cache;
g_skip_cache_sum << read_stats.skip_cache;
}

} // namespace doris::io
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ class CachedRemoteFileReader final : public FileReader {
std::shared_mutex _mtx;
std::map<size_t, FileBlockSPtr> _cache_file_readers;

// Used to record read/write timer and cache related metrics.
// These metrics will finally be saved in FileCacheStatistics.
struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ Status FileBlock::finalize() {
return st;
}

Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
Status FileBlock::read(Slice buffer, size_t read_offset, const IOContext* io_ctx) {
return _mgr->_storage->read(_key, read_offset, buffer, io_ctx);
}

Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_type) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class FileBlock {
[[nodiscard]] Status append(Slice data);

// read data from cache file
[[nodiscard]] Status read(Slice buffer, size_t read_offset);
[[nodiscard]] Status read(Slice buffer, size_t read_offset, const IOContext* io_ctx);

// finish write, release the file writer
[[nodiscard]] Status finalize();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class FileCacheStorage {
// finalize the block
virtual Status finalize(const FileCacheKey& key) = 0;
// read the block
virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result) = 0;
virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result,
const IOContext* io_ctx) = 0;
// remove the block
virtual Status remove(const FileCacheKey& key) = 0;
// change the block meta
Expand Down
5 changes: 3 additions & 2 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ Status FSFileCacheStorage::finalize(const FileCacheKey& key) {
return fs->rename(file_writer->path(), true_file);
}

Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) {
Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer,
const IOContext* io_ctx) {
AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset);
FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key);
if (!file_reader) {
Expand Down Expand Up @@ -184,7 +185,7 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl
FDCache::instance()->insert_file_reader(fd_key, file_reader);
}
size_t bytes_read = 0;
auto s = file_reader->read_at(value_offset, buffer, &bytes_read);
auto s = file_reader->read_at(value_offset, buffer, &bytes_read, io_ctx);
if (!s.ok()) {
LOG(WARNING) << "read file failed, file=" << file_reader->path()
<< ", error=" << s.to_string();
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class FSFileCacheStorage : public FileCacheStorage {
Status init(BlockFileCache* _mgr) override;
Status append(const FileCacheKey& key, const Slice& value) override;
Status finalize(const FileCacheKey& key) override;
Status read(const FileCacheKey& key, size_t value_offset, Slice buffer) override;
Status read(const FileCacheKey& key, size_t value_offset, Slice buffer,
const IOContext* io_ctx) override;
Status remove(const FileCacheKey& key) override;
Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) override;
Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t expiration) override;
Expand Down
6 changes: 5 additions & 1 deletion be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/fs/broker_file_system.h"
#include "io/io_common.h"
#include "util/doris_metrics.h"

namespace doris::io {
Expand Down Expand Up @@ -92,7 +93,7 @@ Status BrokerFileReader::close() {
}

Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
if (closed()) [[unlikely]] {
return Status::InternalError("read closed file: ", _path.native());
}
Expand Down Expand Up @@ -145,6 +146,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes

*bytes_read = response.data.size();
memcpy(to, response.data.data(), *bytes_read);
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

Expand Down
Loading

0 comments on commit 03b0f2f

Please sign in to comment.