From 03b0f2f67138a4dc647d0ca02c6c12674ac4175e Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 6 Sep 2024 23:43:02 +0800 Subject: [PATCH] [improvement](statistics) record local and remote scan bytes in query statistics, backend_active_tasks and audit log --- be/src/cloud/cloud_tablet.cpp | 14 ++-- be/src/cloud/cloud_warm_up_manager.cpp | 14 ++-- .../schema_backend_active_tasks.cpp | 4 +- .../io/cache/block_file_cache_downloader.cpp | 9 +-- be/src/io/cache/cached_remote_file_reader.cpp | 12 ++- be/src/io/cache/cached_remote_file_reader.h | 3 +- be/src/io/cache/file_block.cpp | 4 +- be/src/io/cache/file_block.h | 2 +- be/src/io/cache/file_cache_storage.h | 3 +- be/src/io/cache/fs_file_cache_storage.cpp | 5 +- be/src/io/cache/fs_file_cache_storage.h | 3 +- be/src/io/fs/broker_file_reader.cpp | 6 +- be/src/io/fs/buffered_reader.cpp | 71 +++++++++++++----- be/src/io/fs/buffered_reader.h | 29 ++++---- be/src/io/fs/file_reader.cpp | 2 +- be/src/io/fs/hdfs_file_reader.cpp | 5 +- be/src/io/fs/local_file_reader.cpp | 5 +- be/src/io/fs/s3_file_reader.cpp | 5 +- be/src/io/io_common.h | 73 +++++++++++++++++++ be/src/olap/base_tablet.cpp | 6 +- .../segment_v2/indexed_column_reader.cpp | 6 +- .../rowset/segment_v2/ordinal_page_index.cpp | 4 +- be/src/olap/rowset/segment_v2/segment.cpp | 11 ++- be/src/runtime/query_statistics.h | 2 + .../runtime/runtime_query_statistics_mgr.cpp | 35 ++++++--- be/src/vec/exec/format/generic_reader.h | 1 + be/src/vec/exec/format/orc/vorc_reader.h | 1 - be/src/vec/exec/scan/new_olap_scanner.cpp | 18 +++-- be/src/vec/exec/scan/new_olap_scanner.h | 2 + be/src/vec/exec/scan/vfile_scanner.cpp | 15 ++++ be/src/vec/exec/scan/vfile_scanner.h | 2 + be/src/vec/exec/scan/vscanner.cpp | 16 ++-- be/src/vec/exec/scan/vscanner.h | 12 ++- .../apache/doris/catalog/InternalSchema.java | 11 +++ .../org/apache/doris/catalog/SchemaTable.java | 2 + .../doris/plugin/audit/AuditLoader.java | 6 +- .../external_table_p0/hive/test_hive_orc.out | 6 ++ .../hive/test_hive_orc.groovy | 31 +++++++- .../test_backend_active_tasks.groovy | 6 +- 39 files changed, 348 insertions(+), 114 deletions(-) diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index c88b073e96494a..b6762cd637f5c6 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -256,28 +256,26 @@ void CloudTablet::add_rowsets(std::vector to_add, bool version_ ?
0 : rowset_meta->newest_write_timestamp() + _tablet_meta->ttl_seconds(); + io::IOContext io_ctx; + io_ctx.expiration_time = expiration_time; _engine.file_cache_block_downloader().submit_download_task( io::DownloadFileMeta { .path = storage_resource.value()->remote_segment_path( *rowset_meta, seg_id), .file_size = rs->rowset_meta()->segment_file_size(seg_id), .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx, .download_done {}, }); + io::IOContext io_ctx2; + io_ctx2.expiration_time = expiration_time; auto download_idx_file = [&](const io::Path& idx_path) { io::DownloadFileMeta meta { .path = idx_path, .file_size = -1, .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx2, .download_done {}, }; _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp index 06d6df11dc4cc3..19a1aeeabd60cc 100644 --- a/be/src/cloud/cloud_warm_up_manager.cpp +++ b/be/src/cloud/cloud_warm_up_manager.cpp @@ -106,16 +106,15 @@ void CloudWarmUpManager::handle_jobs() { } wait->add_count(); + io::IOContext io_ctx; + io_ctx.expiration_time = expiration_time; _engine.file_cache_block_downloader().submit_download_task( io::DownloadFileMeta { .path = storage_resource.value()->remote_segment_path(*rs, seg_id), .file_size = rs->segment_file_size(seg_id), .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx, .download_done = [wait](Status st) { if (!st) { @@ -125,15 +124,14 @@ void CloudWarmUpManager::handle_jobs() { }, }); + io::IOContext io_ctx2; + io_ctx2.expiration_time = expiration_time; auto download_idx_file = [&](const io::Path& idx_path) { io::DownloadFileMeta meta { .path = idx_path, .file_size = -1, .file_system = storage_resource.value()->fs, - .ctx = - { - .expiration_time = expiration_time, - }, + .ctx = io_ctx2, .download_done = [wait](Status st) { if (!st) { diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index 74e95f4203217c..a67b2600d2342a 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -34,6 +34,8 @@ std::vector SchemaBackendActiveTasksScanner::_s_tbls_ {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false}, {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false}, {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, @@ -93,4 +95,4 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp index 026f7e2a01741d..90909d8965ccb6 100644 --- a/be/src/io/cache/block_file_cache_downloader.cpp +++ b/be/src/io/cache/block_file_cache_downloader.cpp @@ -178,6 +178,9 @@ void FileCacheBlockDownloader::download_file_cache_block( } }; + IOContext io_ctx; + io_ctx.is_index_data = meta.cache_type() == 
::doris::FileCacheType::INDEX; + io_ctx.expiration_time = meta.expiration_time(); DownloadFileMeta download_meta { .path = storage_resource.value()->remote_segment_path(*find_it->second, meta.segment_id()), @@ -186,11 +189,7 @@ void FileCacheBlockDownloader::download_file_cache_block( .offset = meta.offset(), .download_size = meta.size(), .file_system = storage_resource.value()->fs, - .ctx = - { - .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX, - .expiration_time = meta.expiration_time(), - }, + .ctx = io_ctx, .download_done = std::move(download_done), }; download_segment_file(download_meta); diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index c9a273c5d368a6..736735b7a2b387 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -123,6 +123,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* *bytes_read = 0; return Status::OK(); } + ReadStatistics stats; auto defer_func = [&](int*) { if (io_ctx->file_cache_stats) { @@ -131,7 +132,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* } }; std::unique_ptr defer((int*)0x01, std::move(defer_func)); - stats.bytes_read += bytes_req; + if (config::enable_read_cache_file_directly) { // read directly size_t need_read_size = bytes_req; @@ -155,7 +156,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* SCOPED_RAW_TIMER(&stats.local_read_timer); if (!iter->second ->read(Slice(result.data + (cur_offset - offset), reserve_bytes), - file_offset) + file_offset, io_ctx) .ok()) { break; } @@ -289,7 +290,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* size_t file_offset = current_offset - left; SCOPED_RAW_TIMER(&stats.local_read_timer); st = block->read(Slice(result.data + (current_offset - offset), read_size), - file_offset); + file_offset, io_ctx); } if (!st || block_state != FileBlock::State::DOWNLOADED) { LOG(WARNING) << "Read data failed from file cache downloaded by others. 
err=" @@ -300,7 +301,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* SCOPED_RAW_TIMER(&stats.remote_read_timer); RETURN_IF_ERROR(_remote_file_reader->read_at( current_offset, Slice(result.data + (current_offset - offset), read_size), - &bytes_read)); + &bytes_read, io_ctx)); DCHECK(bytes_read == read_size); } } @@ -318,10 +319,8 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, } if (read_stats.hit_cache) { statis->num_local_io_total++; - statis->bytes_read_from_local += read_stats.bytes_read; } else { statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; } statis->remote_io_timer += read_stats.remote_read_timer; statis->local_io_timer += read_stats.local_read_timer; @@ -330,7 +329,6 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, statis->write_cache_io_timer += read_stats.local_write_timer; g_skip_cache_num << read_stats.skip_cache; - g_skip_cache_sum << read_stats.skip_cache; } } // namespace doris::io diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index b3efb83c0803c8..1f1d5006dea3b4 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -67,10 +67,11 @@ class CachedRemoteFileReader final : public FileReader { std::shared_mutex _mtx; std::map _cache_file_readers; + // Used to record read/write timer and cache related metrics. + // These metrics will finally be saved in FileCacheStatistics. struct ReadStatistics { bool hit_cache = true; bool skip_cache = false; - int64_t bytes_read = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; int64_t local_read_timer = 0; diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 44cad5520ead06..b6fe6c2dc74bee 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -157,8 +157,8 @@ Status FileBlock::finalize() { return st; } -Status FileBlock::read(Slice buffer, size_t read_offset) { - return _mgr->_storage->read(_key, read_offset, buffer); +Status FileBlock::read(Slice buffer, size_t read_offset, const IOContext* io_ctx) { + return _mgr->_storage->read(_key, read_offset, buffer, io_ctx); } Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_type) { diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index 3a4490d67a3f9d..3f3da230a159c7 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -95,7 +95,7 @@ class FileBlock { [[nodiscard]] Status append(Slice data); // read data from cache file - [[nodiscard]] Status read(Slice buffer, size_t read_offset); + [[nodiscard]] Status read(Slice buffer, size_t read_offset, const IOContext* io_ctx); // finish write, release the file writer [[nodiscard]] Status finalize(); diff --git a/be/src/io/cache/file_cache_storage.h b/be/src/io/cache/file_cache_storage.h index 024e701c6fa08b..c7a7f70546edd3 100644 --- a/be/src/io/cache/file_cache_storage.h +++ b/be/src/io/cache/file_cache_storage.h @@ -52,7 +52,8 @@ class FileCacheStorage { // finalize the block virtual Status finalize(const FileCacheKey& key) = 0; // read the block - virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result) = 0; + virtual Status read(const FileCacheKey& key, size_t value_offset, Slice result, + const IOContext* io_ctx) = 0; // remove the block virtual Status remove(const FileCacheKey& key) = 0; // change the block meta diff 
--git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index cf1cd41a537abc..52afb7abf1742c 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -152,7 +152,8 @@ Status FSFileCacheStorage::finalize(const FileCacheKey& key) { return fs->rename(file_writer->path(), true_file); } -Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) { +Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer, + const IOContext* io_ctx) { AccessKeyAndOffset fd_key = std::make_pair(key.hash, key.offset); FileReaderSPtr file_reader = FDCache::instance()->get_file_reader(fd_key); if (!file_reader) { @@ -184,7 +185,7 @@ Status FSFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Sl FDCache::instance()->insert_file_reader(fd_key, file_reader); } size_t bytes_read = 0; - auto s = file_reader->read_at(value_offset, buffer, &bytes_read); + auto s = file_reader->read_at(value_offset, buffer, &bytes_read, io_ctx); if (!s.ok()) { LOG(WARNING) << "read file failed, file=" << file_reader->path() << ", error=" << s.to_string(); diff --git a/be/src/io/cache/fs_file_cache_storage.h b/be/src/io/cache/fs_file_cache_storage.h index 8a97aa109ad741..c912a788bad8b4 100644 --- a/be/src/io/cache/fs_file_cache_storage.h +++ b/be/src/io/cache/fs_file_cache_storage.h @@ -63,7 +63,8 @@ class FSFileCacheStorage : public FileCacheStorage { Status init(BlockFileCache* _mgr) override; Status append(const FileCacheKey& key, const Slice& value) override; Status finalize(const FileCacheKey& key) override; - Status read(const FileCacheKey& key, size_t value_offset, Slice buffer) override; + Status read(const FileCacheKey& key, size_t value_offset, Slice buffer, + const IOContext* io_ctx) override; Status remove(const FileCacheKey& key) override; Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) override; Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t expiration) override; diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 102ea3e247778a..ef75404f7bfa58 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -32,6 +32,7 @@ #include "common/logging.h" #include "common/status.h" #include "io/fs/broker_file_system.h" +#include "io/io_common.h" #include "util/doris_metrics.h" namespace doris::io { @@ -92,7 +93,7 @@ Status BrokerFileReader::close() { } Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: ", _path.native()); } @@ -145,6 +146,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes *bytes_read = response.data.size(); memcpy(to, response.data.data(), *bytes_read); + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 7fd85caa43b6c0..a4ee07dc4b402d 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -394,7 +394,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz // there exists occasions where the buffer is already closed but // some prior tasks are still queued in thread pool, so we have to 
check whether the buffer is closed each time the condition variable is notified. -void PrefetchBuffer::reset_offset(size_t offset) { +void PrefetchBuffer::reset_offset(size_t offset, const IOContext* io_ctx) { { std::unique_lock lck {_lock}; if (!_prefetched.wait_for( @@ -418,10 +418,36 @@ void PrefetchBuffer::reset_offset(size_t offset) { } else { _exceed = false; } + + // The "io_ctx" in the input parameter belongs to the upper-level caller. + // Because PrefetchBuffer actually runs in another thread, + // its lifetime may differ from that of the caller. + // Therefore, before submitting PrefetchBuffer to the thread pool, + // we need to copy io_ctx into _owned_io_ctx to ensure that + // the lifetime of the IOContext is consistent with that of the PrefetchBuffer. + _update_and_reset_io_context(io_ctx); _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func( [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); }); } +void PrefetchBuffer::_update_and_reset_io_context(const IOContext* io_ctx) { + if (io_ctx) { + // If file_cache_stats is set in the caller's context, first merge the + // statistics accumulated in _owned_cache_stats into it, + // then reset _owned_cache_stats for the next prefetch round. + if (io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->update(_owned_cache_stats); + _owned_cache_stats.reset(); + } + // Copy io_ctx, redirecting its statistics pointer to the owned copy. + _owned_io_ctx = *io_ctx; + _owned_io_ctx.file_cache_stats = &_owned_cache_stats; + } +} + // only this function would run concurrently in another thread void PrefetchBuffer::prefetch_buffer() { { @@ -458,7 +484,9 @@ void PrefetchBuffer::prefetch_buffer() { { SCOPED_RAW_TIMER(&_statis.read_time); - s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx); + // Use the owned IOContext here; the caller's io_ctx may already be out of scope. + s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, &_owned_io_ctx); } if (UNLIKELY(s.ok() && buf_size != _len)) { // This indicates that the data size returned by S3 object storage is smaller than what we requested, @@ -545,15 +573,14 @@ size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const { return _size - remaining; } -Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, - size_t* bytes_read) { +Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, size_t* bytes_read, + const IOContext* io_ctx) { if (UNLIKELY(off >= _file_range.end_offset)) { - // Reader can read out of [start_offset, end_offset) by synchronous method. - return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx); + return _reader->read_at(off, Slice {out, buf_len}, bytes_read, io_ctx); } if (_exceed) { - reset_offset((off / _size) * _size); - return read_buffer(off, out, buf_len, bytes_read); + reset_offset((off / _size) * _size, io_ctx); + return read_buffer(off, out, buf_len, bytes_read, io_ctx); } auto start = std::chrono::steady_clock::now(); // The baseline time is calculated by dividing the size of each buffer by MB/s.
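The ownership rule described in the comments above is easier to see outside of diff form. Below is a minimal, self-contained sketch of the same pattern, using hypothetical names (Stats, Ctx, Prefetcher) rather than the actual Doris types: the async task writes only to object-owned statistics, and merging back into the caller's statistics happens on the caller's thread.

#include <cstdint>
#include <future>
#include <iostream>

struct Stats {
    int64_t bytes_read_from_remote = 0;
    void update(const Stats& other) { bytes_read_from_remote += other.bytes_read_from_remote; }
    void reset() { bytes_read_from_remote = 0; }
};

struct Ctx {
    Stats* stats = nullptr; // non-owning pointer into caller-owned memory
};

class Prefetcher {
public:
    // Runs on the caller's thread: merge stats accumulated by the previous
    // async task into the caller's context, then snapshot the context so the
    // next async task only ever touches memory owned by this object.
    void reset(const Ctx* caller_ctx) {
        if (caller_ctx && caller_ctx->stats) {
            caller_ctx->stats->update(_owned_stats);
            _owned_stats.reset();
        }
        _owned_ctx = caller_ctx ? *caller_ctx : Ctx {};
        _owned_ctx.stats = &_owned_stats;
        _task = std::async(std::launch::async, [this] { prefetch(); });
    }
    void wait() {
        if (_task.valid()) _task.get();
    }

private:
    void prefetch() { _owned_ctx.stats->bytes_read_from_remote += 4096; } // simulated remote read
    Stats _owned_stats;
    Ctx _owned_ctx;
    std::future<void> _task;
};

int main() {
    Stats caller_stats;
    Ctx caller_ctx {&caller_stats};
    Prefetcher p;
    p.reset(&caller_ctx); // first round: nothing to merge yet
    p.wait();
    p.reset(&caller_ctx); // merges the first round's 4096 bytes into caller_stats
    p.wait();
    std::cout << caller_stats.bytes_read_from_remote << "\n"; // prints 4096
}

The invariant this sketch preserves is that caller-owned memory is touched only from the caller's thread, so a queued prefetch task can safely outlive any single read call.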
@@ -584,8 +611,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, // only parquet does non-sequential reads: // it reads the end of the file first if (UNLIKELY(!contains(off))) { - reset_offset((off / _size) * _size); - return read_buffer(off, out, buf_len, bytes_read); + reset_offset((off / _size) * _size, io_ctx); + return read_buffer(off, out, buf_len, bytes_read, io_ctx); } if (UNLIKELY(0 == _len || _offset + _len < off)) { return Status::OK(); } @@ -602,9 +629,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, *bytes_read = read_len; _statis.request_io += 1; _statis.request_bytes += read_len; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += read_len; + } } if (off + *bytes_read == _offset + _len) { - reset_offset(_offset + _whole_buffer_size); + reset_offset(_offset + _whole_buffer_size, io_ctx); } return Status::OK(); } @@ -630,9 +660,8 @@ void PrefetchBuffer::_collect_profile_before_close() { // buffered reader PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, - PrefetchRange file_range, const IOContext* io_ctx, - int64_t buffer_size) - : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) { + PrefetchRange file_range, int64_t buffer_size) + : _reader(std::move(reader)), _file_range(file_range) { if (buffer_size == -1L) { buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024; } @@ -667,7 +696,7 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File // to make sure the buffer reader will start to read at right position. for (int i = 0; i < buffer_num; i++) { _pre_buffers.emplace_back(std::make_shared( - _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), _io_ctx, + _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(), sync_buffer)); } } @@ -692,11 +721,15 @@ Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* while (actual_bytes_read < nbytes && offset < size()) { size_t read_num = 0; auto buffer_pos = get_buffer_pos(offset); - RETURN_IF_ERROR( - _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read, - nbytes - actual_bytes_read, &read_num)); + RETURN_IF_ERROR(_pre_buffers[buffer_pos]->read_buffer( + offset, result.get_data() + actual_bytes_read, nbytes - actual_bytes_read, + &read_num, io_ctx)); actual_bytes_read += read_num; offset += read_num; } *bytes_read = actual_bytes_read; return Status::OK(); } @@ -862,7 +895,7 @@ Result DelegateReader::create_file_reader( if (is_thread_safe) { // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
return std::make_shared( - profile, std::move(reader), file_range, io_ctx); + profile, std::move(reader), file_range); } } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 67e07665fbfd9f..d30f035a0b59f7 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -414,13 +414,11 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED }; PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size, - io::FileReader* reader, const IOContext* io_ctx, - std::function sync_profile) + io::FileReader* reader, std::function sync_profile) : _file_range(file_range), _size(buffer_size), _whole_buffer_size(whole_buffer_size), _reader(reader), - _io_ctx(io_ctx), _buf(new char[buffer_size]), _sync_profile(std::move(sync_profile)) {} @@ -431,7 +429,6 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro _size(other._size), _whole_buffer_size(other._whole_buffer_size), _reader(other._reader), - _io_ctx(other._io_ctx), _buf(std::move(other._buf)), _sync_profile(std::move(other._sync_profile)) {} @@ -447,7 +444,12 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t _len {0}; size_t _whole_buffer_size; io::FileReader* _reader = nullptr; - const IOContext* _io_ctx = nullptr; + // PrefetchBuffer runs in a separate thread, so it MUST use a self-owned + // FileCacheStatistics and IOContext to avoid stack-use-after-scope errors. + // After a read finishes, the caller's FileCacheStatistics is updated from + // the statistics accumulated here (see _update_and_reset_io_context). + FileCacheStatistics _owned_cache_stats; + IOContext _owned_io_ctx; std::unique_ptr _buf; BufferStatus _buffer_status {BufferStatus::RESET}; std::mutex _lock; @@ -467,7 +469,7 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro // @brief: reset the start offset of this buffer to offset // @param: the new start offset for this buffer - void reset_offset(size_t offset); + void reset_offset(size_t offset, const IOContext* io_ctx); // @brief: start to fetch the content between [_offset, _offset + _size) void prefetch_buffer(); // @brief: used by BufferedReader to read the prefetched data // @param[off] the offset to read // @param[buf] buffer to put the actual content // @param[buf_len] maximum len trying to read // @param[bytes_read] actual bytes read - Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read); + Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read, + const IOContext* io_ctx); // @brief: shut down the buffer until the prior prefetching task is done void close(); // @brief: to detect whether this buffer contains off @@ -491,9 +494,9 @@ struct PrefetchBuffer : std::enable_shared_from_this, public Pro size_t merge_small_ranges(size_t off, int range_index) const; - void _collect_profile_at_runtime() override {} void _collect_profile_before_close() override; + + void _update_and_reset_io_context(const IOContext* io_ctx); }; constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB @@ -516,8 +519,7 @@ class PrefetchBufferedReader final : public io::FileReader { public: PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader, - PrefetchRange file_range, const IOContext* io_ctx = nullptr, - int64_t buffer_size = -1L); + PrefetchRange file_range, int64_t 
buffer_size = -1L); ~PrefetchBufferedReader() override; Status close() override; @@ -554,14 +556,15 @@ class PrefetchBufferedReader final : public io::FileReader { int64_t cur_pos = position + i * s_max_pre_buffer_size; int cur_buf_pos = get_buffer_pos(cur_pos); // reset would do all the prefetch work - _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos)); + // reset all buffer is done only once when initializing, + // no need to pass IOContext. + _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos), nullptr); } } io::FileReaderSPtr _reader; PrefetchRange _file_range; const std::vector* _random_access_ranges = nullptr; - const IOContext* _io_ctx = nullptr; std::vector> _pre_buffers; int64_t _whole_pre_buffer_size; bool _initialized = false; diff --git a/be/src/io/fs/file_reader.cpp b/be/src/io/fs/file_reader.cpp index 86596fd88f7020..85cc288b1cff5d 100644 --- a/be/src/io/fs/file_reader.cpp +++ b/be/src/io/fs/file_reader.cpp @@ -41,7 +41,7 @@ Status FileReader::read_at(size_t offset, Slice result, size_t* bytes_read, Result create_cached_file_reader(FileReaderSPtr raw_reader, const FileReaderOptions& opts) { switch (opts.cache_type) { - case io::FileCachePolicy::NO_CACHE: + case FileCachePolicy::NO_CACHE: return raw_reader; case FileCachePolicy::FILE_BLOCK_CACHE: return std::make_shared(std::move(raw_reader), opts); diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index d43cfae1c28228..831db6bec1f1d7 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -116,7 +116,7 @@ Status HdfsFileReader::close() { #ifdef USE_HADOOP_HDFS Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { if (closed()) [[unlikely]] { return Status::InternalError("read closed file: {}", _path.native()); } @@ -163,6 +163,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r *bytes_read = has_read; hdfs_bytes_read_total << *bytes_read; hdfs_bytes_per_read << *bytes_read; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index 4a41fa479d9808..a234f9467be2f7 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -119,7 +119,7 @@ Status LocalFileReader::close() { } Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl", Status::IOError("inject io error")); if (closed()) [[unlikely]] { @@ -163,6 +163,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_ *bytes_read += res; } } + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read; + } DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); return Status::OK(); } diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 86590d91632162..7d9860d3b69d5e 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -103,7 +103,7 @@ Status S3FileReader::close() { } Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > 
_file_size) { return Status::InternalError( @@ -168,6 +168,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping", _path.native(), retry_count, total_sleep_time); } + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } return Status::InternalError("failed to read from s3, exceeded maximum retries"); diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 80a594473dc376..1d885f5f0f9ddc 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -19,6 +19,8 @@ #include +#include + namespace doris { enum class ReaderType : uint8_t { @@ -45,6 +47,38 @@ struct FileCacheStatistics { int64_t write_cache_io_timer = 0; int64_t bytes_write_into_cache = 0; int64_t num_skip_cache_io_total = 0; + + void update(const FileCacheStatistics& other) { + num_local_io_total += other.num_local_io_total; + num_remote_io_total += other.num_remote_io_total; + local_io_timer += other.local_io_timer; + bytes_read_from_local += other.bytes_read_from_local; + bytes_read_from_remote += other.bytes_read_from_remote; + remote_io_timer += other.remote_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + bytes_write_into_cache += other.bytes_write_into_cache; + num_skip_cache_io_total += other.num_skip_cache_io_total; + } + + void reset() { + num_local_io_total = 0; + num_remote_io_total = 0; + local_io_timer = 0; + bytes_read_from_local = 0; + bytes_read_from_remote = 0; + remote_io_timer = 0; + write_cache_io_timer = 0; + bytes_write_into_cache = 0; + num_skip_cache_io_total = 0; + } + + std::string debug_string() const { + std::stringstream ss; + ss << "bytes_read_from_local: " << bytes_read_from_local + << ", bytes_read_from_remote: " << bytes_read_from_remote; + return ss.str(); + } }; struct IOContext { @@ -60,6 +94,45 @@ int64_t expiration_time = 0; const TUniqueId* query_id = nullptr; // Ref FileCacheStatistics* file_cache_stats = nullptr; // Ref + + IOContext() = default; + + // All members are scalars or non-owning "Ref" pointers, so the + // compiler-generated shallow copy is exactly the copy semantics needed + // when PrefetchBuffer snapshots the caller's context. + IOContext(const IOContext& other) = default; + IOContext& operator=(const IOContext& other) = default; + + std::string debug_string() const { + if (file_cache_stats != nullptr) { + return file_cache_stats->debug_string(); + } else { + return "no file cache stats"; + } + } }; } // namespace io diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 2e70e4586cc768..f107e0376c2837 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -82,12 +82,14 @@ Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t } segment_v2::SegmentSharedPtr segment = *it; 
RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator, nullptr)); + io::IOContext ctx; + ctx.reader_type = ReaderType::READER_QUERY; + ctx.file_cache_stats = &stats->file_cache_stats; segment_v2::ColumnIteratorOptions opt { .use_page_cache = !config::disable_storage_page_cache, .file_reader = segment->file_reader().get(), .stats = stats, - .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY, - .file_cache_stats = &stats->file_cache_stats}, + .io_ctx = ctx, }; RETURN_IF_ERROR((*column_iterator)->init(opt)); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 3028211f266157..abc59ff535847b 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -121,6 +121,9 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, OlapReaderStatistics* stats) const { OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; + ctx.file_cache_stats = &stats_ptr->file_cache_stats; PageReadOptions opts { .use_page_cache = _use_page_cache, .kept_in_memory = _kept_in_memory, @@ -131,8 +134,7 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, .codec = codec, .stats = stats_ptr, .encoding_info = _encoding_info, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, + .io_ctx = ctx, }; if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 9ee82bacdd73d2..9d1255090baa3c 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -88,6 +88,8 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, } // need to read index page OlapReaderStatistics tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; PageReadOptions opts { .use_page_cache = use_page_cache, .kept_in_memory = kept_in_memory, @@ -97,7 +99,7 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, // ordinal index page uses NO_COMPRESSION right now .codec = nullptr, .stats = &tmp_stats, - .io_ctx = io::IOContext {.is_index_data = true}, + .io_ctx = ctx, }; // read index page diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 0ad799683fc458..41465764c55406 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -393,7 +393,8 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) { uint8_t fixed_buf[12]; size_t bytes_read = 0; // TODO(plat1ko): Support session variable `enable_file_cache` - io::IOContext io_ctx {.is_index_data = true}; + io::IOContext io_ctx; + io_ctx.is_index_data = true; RETURN_IF_ERROR( _file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx)); DCHECK_EQ(bytes_read, 12); @@ -499,6 +500,8 @@ Status Segment::load_index() { } else { // read and parse short key index page OlapReaderStatistics tmp_stats; + io::IOContext ctx; + ctx.is_index_data = true; PageReadOptions opts { .use_page_cache = true, .type = INDEX_PAGE, @@ -507,7 +510,7 @@ Status Segment::load_index() { // short key index page uses NO_COMPRESSION for now .codec = nullptr, .stats = &tmp_stats, - 
.io_ctx = io::IOContext {.is_index_data = true}, + .io_ctx = ctx, }; Slice body; PageFooterPB footer; @@ -1106,11 +1109,13 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto std::unique_ptr& iterator_hint) { StorageReadOptions storage_read_opt; storage_read_opt.io_ctx.reader_type = ReaderType::READER_QUERY; + io::IOContext ctx; + ctx.reader_type = ReaderType::READER_QUERY; segment_v2::ColumnIteratorOptions opt { .use_page_cache = !config::disable_storage_page_cache, .file_reader = file_reader().get(), .stats = &stats, - .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY}, + .io_ctx = ctx, }; std::vector single_row_loc {row_id}; if (!slot->column_paths().empty()) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 0a19dfd46f0a08..ef455aac18587f 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -43,6 +43,8 @@ class QueryStatistics { : scan_rows(0), scan_bytes(0), cpu_nanos(0), + _scan_bytes_from_local_storage(0), + _scan_bytes_from_remote_storage(0), returned_rows(0), max_peak_memory_bytes(0), current_used_memory_bytes(0), diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 93d5256cad7525..f178ef684e28b7 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -521,28 +521,39 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; // block's schema comes from SchemaBackendActiveTasksScanner::_s_tbls_columns + // Before version 2.1.7, the "backend_active_tasks" table had 12 columns; + // 2.1.7 added the LOCAL_SCAN_BYTES and REMOTE_SCAN_BYTES columns. + // Check the column count so this stays compatible with FE versions older + // than 2.1.7 that still request the 12-column layout. bool need_local_and_remote_bytes = (block->columns() > 12); for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + int col_idx = 0; TQueryStatistics tqs; qs_ctx_ptr->collect_query_statistics(&tqs); - SchemaScannerHelper::insert_int64_value(0, be_id, block); - SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); - SchemaScannerHelper::insert_string_value(2, query_id, block); int64_t task_time = qs_ctx_ptr->_is_query_finished ? 
qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time : MonotonicMillis() - qs_ctx_ptr->_query_start_time; - SchemaScannerHelper::insert_int64_value(3, task_time, block); - SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block); - SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block); - SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block); - SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes, block); - SchemaScannerHelper::insert_int64_value(8, tqs.current_used_memory_bytes, block); - SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes, block); - SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, block); + SchemaScannerHelper::insert_int64_value(col_idx++, task_time, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.cpu_ms, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_rows, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes, block); + if (need_local_and_remote_bytes) { + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_local_storage, + block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_remote_storage, + block); + } + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.max_peak_memory_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.current_used_memory_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_rows, block); std::stringstream ss; ss << qs_ctx_ptr->_query_type; - SchemaScannerHelper::insert_string_value(11, ss.str(), block); + SchemaScannerHelper::insert_string_value(col_idx++, ss.str(), block); } } diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index e32928e4b95de4..cdec5085a7f5b5 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -34,6 +34,7 @@ class Block; class GenericReader : public ProfileCollector { public: GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} + void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { _push_down_agg_type = push_down_agg_type; } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 8c73957e79e4e0..09dcf4119eb6ee 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -682,7 +682,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { io::FileReaderSPtr& get_inner_reader() { return _inner_reader; } protected: - void _collect_profile_at_runtime() override {}; void _collect_profile_before_close() override; private: diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 4c0b30e440ecf5..e4138ec7045b46 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -559,7 +559,6 @@ void NewOlapScanner::_collect_profile_before_close() { VScanner::_collect_profile_before_close(); // Update counters for NewOlapScanner - // Update counters from tablet reader's stats auto& stats = _tablet_reader->stats(); auto* local_state = (pipeline::OlapScanLocalState*)_local_state; COUNTER_UPDATE(local_state->_io_timer, stats.io_ns); @@ -657,11 +656,20 @@ void NewOlapScanner::_collect_profile_before_close() { tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value()); 
tablet->query_scan_rows->increment(local_state->_scan_rows->value()); tablet->query_scan_count->increment(1); +} + +void NewOlapScanner::_update_bytes_and_rows_read() { + VScanner::_update_bytes_and_rows_read(); if (_query_statistics) { - _query_statistics->add_scan_bytes_from_local_storage( - stats.file_cache_stats.bytes_read_from_local); - _query_statistics->add_scan_bytes_from_remote_storage( - stats.file_cache_stats.bytes_read_from_remote); + auto& stats = _tablet_reader->stats(); + int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local; + int64_t delta_remote = + stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote; + _query_statistics->add_scan_bytes_from_local_storage(delta_local); + _query_statistics->add_scan_bytes_from_remote_storage(delta_remote); + _query_statistics->add_scan_bytes(delta_local + delta_remote); + _bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local; + _bytes_read_from_remote = stats.file_cache_stats.bytes_read_from_remote; } } diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index fd1246b120ba77..719f098f0733ca 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -81,6 +81,8 @@ class NewOlapScanner : public VScanner { Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; void _collect_profile_before_close() override; + void _update_bytes_and_rows_read() override; + private: void _update_realtime_counters(); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 9353887799207d..ba6288ddd24b70 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -1238,4 +1238,19 @@ void VFileScanner::_collect_profile_before_close() { } } +void VFileScanner::_update_bytes_and_rows_read() { + VScanner::_update_bytes_and_rows_read(); + if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) { + int64_t delta_local = + _io_ctx->file_cache_stats->bytes_read_from_local - _bytes_read_from_local; + int64_t delta_remote = + _io_ctx->file_cache_stats->bytes_read_from_remote - _bytes_read_from_remote; + _query_statistics->add_scan_bytes_from_local_storage(delta_local); + _query_statistics->add_scan_bytes_from_remote_storage(delta_remote); + _query_statistics->add_scan_bytes(delta_local + delta_remote); + _bytes_read_from_local = _io_ctx->file_cache_stats->bytes_read_from_local; + _bytes_read_from_remote = _io_ctx->file_cache_stats->bytes_read_from_remote; + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 86171d634ac693..4e7f7a2214a63d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -94,6 +94,8 @@ class VFileScanner : public VScanner { // fe will add skip_bitmap_col to _input_tuple_desc iff the target olaptable has skip_bitmap_col // and the current load is a flexible partial update bool _should_process_skip_bitmap_col() const { return _skip_bitmap_col_idx != -1; } + + void _update_bytes_and_rows_read() override; protected: const TFileScanRangeParams* _params = nullptr; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 97bf563db1fa58..21b1f5ea83cefc 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -103,8 +103,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } 
} - int64_t old_scan_rows = _num_rows_read; - int64_t old_scan_bytes = _num_byte_read; + _prev_num_rows_read = _num_rows_read; { do { // if step 2 filters all rows of the block, the block will be reused to get next rows, @@ -122,7 +121,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { break; } _num_rows_read += block->rows(); - _num_byte_read += block->allocated_bytes(); } // 2. Filter the output block finally. @@ -136,10 +134,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } - if (_query_statistics) { - _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows); - _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes); - } + _update_bytes_and_rows_read(); if (state->is_cancelled()) { // TODO: Should return the specific ErrorStatus instead of just Cancelled. @@ -266,4 +261,11 @@ void VScanner::update_scan_cpu_timer() { } } +void VScanner::_update_bytes_and_rows_read() { + if (_query_statistics) { + _query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read); + _prev_num_rows_read = _num_rows_read; + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 6c4f3294ce1bcc..935bb7801e8e91 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -164,6 +164,10 @@ class VScanner { _conjuncts.clear(); } + // Update the rows and bytes read in query statistics at the end of each + // round, so that runtime statistics are available while the query runs. + virtual void _update_bytes_and_rows_read(); + RuntimeState* _state = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; QueryStatistics* _query_statistics = nullptr; @@ -205,8 +209,12 @@ class VScanner { // num of rows read from scanner int64_t _num_rows_read = 0; - - int64_t _num_byte_read = 0; + // snapshot of _num_rows_read taken at the end of the previous round, + // used to compute the per-round delta. 
+ int64_t _prev_num_rows_read = 0; + // bytes read from local and remote fs + int64_t _bytes_read_from_local = 0; + int64_t _bytes_read_from_remote = 0; // num of rows return from scanner, after filter block int64_t _num_rows_return = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index a571334660a5fa..ab73dfa56d303c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -137,6 +137,14 @@ public class InternalSchema { AUDIT_SCHEMA .add(new ColumnDef("scan_bytes", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add(new ColumnDef("scan_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add(new ColumnDef("local_scan_bytes", + TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add(new ColumnDef("remote_scan_bytes", + TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add(new ColumnDef("shuffle_bytes", + TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add(new ColumnDef("shuffle_rows", + TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA .add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA @@ -168,6 +176,9 @@ public class InternalSchema { AUDIT_SCHEMA.add( new ColumnDef("compute_group", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); // Keep stmt as last column. So that in fe.audit.log, it will be easier to get sql string + AUDIT_SCHEMA.add( + new ColumnDef("cloud_cluster_name", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); + AUDIT_SCHEMA.add(new ColumnDef("stmt", TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 17942dd04a2102..4839a80c237e41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -456,6 +456,8 @@ public class SchemaTable extends Table { .column("TASK_CPU_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) .column("SCAN_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) .column("SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("LOCAL_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("REMOTE_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("BE_PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 644bddee58b95d..31869555f1e0ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -138,7 +138,7 @@ private void assembleAudit(AuditEvent event) { } private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { - // should be same order as InternalSchema.AUDIT_SCHEMA + // The order should 
be exactly the same as the table structure of InternalSchema.AUDIT_SCHEMA logBuffer.append(event.queryId).append("\t"); logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t"); logBuffer.append(event.clientIp).append("\t"); @@ -151,6 +151,10 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { logBuffer.append(event.queryTime).append("\t"); logBuffer.append(event.scanBytes).append("\t"); logBuffer.append(event.scanRows).append("\t"); + logBuffer.append(event.scanBytesFromLocalStorage).append("\t"); + logBuffer.append(event.scanBytesFromRemoteStorage).append("\t"); + logBuffer.append(event.shuffleSendBytes).append("\t"); + logBuffer.append(event.shuffleSendRows).append("\t"); logBuffer.append(event.returnRows).append("\t"); logBuffer.append(event.shuffleSendRows).append("\t"); logBuffer.append(event.shuffleSendBytes).append("\t"); diff --git a/regression-test/data/external_table_p0/hive/test_hive_orc.out b/regression-test/data/external_table_p0/hive/test_hive_orc.out index b34f276020c7cc..7a0b775aaaa59d 100644 --- a/regression-test/data/external_table_p0/hive/test_hive_orc.out +++ b/regression-test/data/external_table_p0/hive/test_hive_orc.out @@ -134,6 +134,9 @@ tablets tinyint_col 179 182 182 187 183 181 177 183 177 187 183 202 202 186 528 -- !predicate_pushdown8 -- 1533 +-- !audit_sql01 -- +3 1234.0000 123456789123.000000 12345678912345678912345678.000000000000 123456789 123456789123456789.000000000 987654321 + -- !select_top50 -- 4 55 999742610 400899305488827731 false 6.5976813E8 7.8723304616937395E17 \N base tennis pit vertical friday 2022-08-19T07:29:58 \N tablets smallint_col 2019-02-07 [7.53124931825377e+17] ["NbSSBtwzpxNSkkwga"] tablets smallint_col 2 49 999613702 105493714032727452 \N 6.3322381E8 9.8642324410240179E17 Unveil bright recruit participate. Suspect impression camera mathematical revelation. Fault live2 elbow debt west hydrogen current. how literary 2022-09-03T17:20:21 481707.1065 tablets boolean_col 2020-01-12 [] ["HoMrAnn", "wteEFvIwoZsVpVQdscMb", null, "zcGFmv", "kGEBBckbMtX", "hrEtCGFdPWZK"] tablets boolean_col @@ -321,6 +324,9 @@ tablets tinyint_col 179 182 182 187 183 181 177 183 177 187 183 202 202 186 528 -- !predicate_pushdown8 -- 1533 +-- !audit_sql01 -- +3 1234.0000 123456789123.000000 12345678912345678912345678.000000000000 123456789 123456789123456789.000000000 987654321 + -- !select_top50 -- 4 55 999742610 400899305488827731 false 6.5976813E8 7.8723304616937395E17 \N base tennis pit vertical friday 2022-08-19T07:29:58 \N tablets smallint_col 2019-02-07 [7.53124931825377e+17] ["NbSSBtwzpxNSkkwga"] tablets smallint_col 2 49 999613702 105493714032727452 \N 6.3322381E8 9.8642324410240179E17 Unveil bright recruit participate. Suspect impression camera mathematical revelation. Fault live2 elbow debt west hydrogen current. how literary 2022-09-03T17:20:21 481707.1065 tablets boolean_col 2020-01-12 [] ["HoMrAnn", "wteEFvIwoZsVpVQdscMb", null, "zcGFmv", "kGEBBckbMtX", "hrEtCGFdPWZK"] tablets boolean_col diff --git a/regression-test/suites/external_table_p0/hive/test_hive_orc.groovy b/regression-test/suites/external_table_p0/hive/test_hive_orc.groovy index 6457d2b3edd5d8..37a37d453c1deb 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_orc.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_orc.groovy @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. 
+import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_docker_hive") { // Ensure that all types are parsed correctly def select_top50 = { @@ -98,6 +101,17 @@ suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_dock return; } + // test audit log + try { + sql "set global enable_audit_plugin = true" + sql "set global audit_plugin_max_batch_interval_sec = 2" + } catch (Exception e) { + log.warn("skip this case, because " + e.getMessage()) + assertTrue(e.getMessage().toUpperCase().contains("ADMIN")) + return + } + + String uuid = UUID.randomUUID().toString(); for (String hivePrefix : ["hive2", "hive3"]) { try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") @@ -121,6 +135,9 @@ suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_dock string_col_dict_plain_mixed() predicate_pushdown() + // this is for testing audit log + order_qt_audit_sql01 """select * from orc_decimal_table where decimal_col2="123456789123.000000" and "a" != "${uuid}";""" + sql """drop catalog if exists ${catalog_name}""" // test old create-catalog syntax for compatibility @@ -132,9 +149,21 @@ suite("test_hive_orc", "all_types,p0,external,hive,external_docker,external_dock """ sql """use `${catalog_name}`.`default`""" select_top50() - sql """drop catalog if exists ${catalog_name}""" } finally { } } + + // wait for audit log + Awaitility.await().atMost(10, TimeUnit.SECONDS).with().pollDelay(3000, TimeUnit.MILLISECONDS).await().until(() -> { + def result01 = sql """select * from internal.__internal_schema.audit_log where stmt like "%${uuid}%" and (local_scan_bytes > 0 or remote_scan_bytes > 0)""" + if (result01.size() >= 2) { + return true; + } + return false; + }); + + sql "set global enable_audit_plugin = false" + sql "set global audit_plugin_max_sql_length = 4096" + sql "set global audit_plugin_max_batch_interval_sec = 60" } diff --git a/regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy b/regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy index f0d93fc43c967f..adcac7d1bbfea8 100644 --- a/regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy +++ b/regression-test/suites/query_p0/schema_table/test_backend_active_tasks.groovy @@ -22,19 +22,19 @@ suite("test_backend_active_tasks") { sql "set experimental_enable_pipeline_engine=false" sql "set experimental_enable_pipeline_x_engine=false" sql "select * from information_schema.backend_active_tasks" - sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS,LOCAL_SCAN_BYTES,REMOTE_SCAN_BYTES from information_schema.backend_active_tasks" // pipeline sql "set experimental_enable_pipeline_engine=true" sql "set experimental_enable_pipeline_x_engine=false" sql "select * from information_schema.backend_active_tasks" - sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS,LOCAL_SCAN_BYTES,REMOTE_SCAN_BYTES from information_schema.backend_active_tasks" // pipelinex sql "set experimental_enable_pipeline_engine=true" sql "set experimental_enable_pipeline_x_engine=true" sql "select * from information_schema.backend_active_tasks" - sql "select BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS from information_schema.backend_active_tasks" + sql "select 
BE_ID,FE_HOST,QUERY_ID,SCAN_ROWS,LOCAL_SCAN_BYTES,REMOTE_SCAN_BYTES from information_schema.backend_active_tasks" Thread.sleep(1000) } })
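A closing note on the accounting these regression tests verify: the scanners turn the cumulative counters kept by the cache layer into per-round deltas before adding them to QueryStatistics (see VScanner::_update_bytes_and_rows_read and its overrides above). Here is a minimal sketch of that delta pattern, with hypothetical names rather than the actual Doris classes.

#include <cstdint>
#include <iostream>

struct CumulativeStats { // grows monotonically over the whole scan
    int64_t bytes_local = 0;
    int64_t bytes_remote = 0;
};

class DeltaReporter {
public:
    // Add only what was newly read since the last call to the query totals.
    void report(const CumulativeStats& s, int64_t& total_local, int64_t& total_remote) {
        total_local += s.bytes_local - _prev_local;
        total_remote += s.bytes_remote - _prev_remote;
        _prev_local = s.bytes_local;   // remember the snapshot for next round
        _prev_remote = s.bytes_remote;
    }

private:
    int64_t _prev_local = 0;
    int64_t _prev_remote = 0;
};

int main() {
    CumulativeStats stats;
    DeltaReporter reporter;
    int64_t local = 0, remote = 0;
    stats.bytes_local = 100; stats.bytes_remote = 40; // after round 1
    reporter.report(stats, local, remote);
    stats.bytes_local = 250; stats.bytes_remote = 40; // after round 2
    reporter.report(stats, local, remote);
    std::cout << local << " " << remote << "\n";      // prints "250 40"
}

Keeping the snapshot inside the reporter means each round contributes only its own bytes, so flushing statistics repeatedly while the query is still running never double-counts.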