From 5d3f0a267a89a5954312e17831832a20e6d0dd4d Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Mon, 9 Dec 2024 22:19:19 -0800 Subject: [PATCH] [opt](scan) unify the local and remote scan bytes stats for all scanners for 2.1 (#45167) pick part of #40493 TODO: not working with s3 reader --- .../schema_backend_active_tasks.cpp | 4 +- be/src/io/cache/block/block_file_segment.cpp | 4 +- be/src/io/cache/block/block_file_segment.h | 2 +- .../cache/block/cached_remote_file_reader.cpp | 9 ++-- .../cache/block/cached_remote_file_reader.h | 5 ++- be/src/io/fs/broker_file_reader.cpp | 5 ++- be/src/io/fs/broker_file_reader.h | 2 - be/src/io/fs/file_reader.h | 2 +- be/src/io/fs/hdfs_file_reader.cpp | 10 ++++- be/src/io/fs/hdfs_file_reader.h | 1 - be/src/io/fs/local_file_reader.cpp | 5 ++- be/src/io/fs/s3_file_reader.cpp | 8 +++- be/src/io/fs/s3_file_reader.h | 1 - be/src/io/io_common.h | 42 +++++++++++++++++++ be/src/runtime/query_statistics.cpp | 8 ++++ be/src/runtime/query_statistics.h | 13 ++++++ .../runtime/runtime_query_statistics_mgr.cpp | 37 ++++++++++------ be/src/vec/exec/format/orc/vorc_reader.h | 1 - be/src/vec/exec/scan/new_olap_scanner.cpp | 16 ++++++- be/src/vec/exec/scan/new_olap_scanner.h | 1 + be/src/vec/exec/scan/vfile_scanner.cpp | 15 +++++++ be/src/vec/exec/scan/vfile_scanner.h | 2 + be/src/vec/exec/scan/vscanner.cpp | 17 ++++---- be/src/vec/exec/scan/vscanner.h | 11 ++++- be/test/io/cache/file_block_cache_test.cpp | 14 +++---- .../apache/doris/catalog/InternalSchema.java | 6 +++ .../org/apache/doris/catalog/SchemaTable.java | 2 + .../org/apache/doris/plugin/AuditEvent.java | 14 +++++++ .../doris/plugin/audit/AuditLoader.java | 4 ++ .../org/apache/doris/qe/AuditLogHelper.java | 6 ++- .../WorkloadRuntimeStatusMgr.java | 5 +++ gensrc/proto/data.proto | 2 + gensrc/thrift/FrontendService.thrift | 2 + 33 files changed, 222 insertions(+), 54 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index 74e95f4203217c..a67b2600d2342a 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -34,6 +34,8 @@ std::vector SchemaBackendActiveTasksScanner::_s_tbls_ {"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false}, {"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false}, {"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, {"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, @@ -93,4 +95,4 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/io/cache/block/block_file_segment.cpp b/be/src/io/cache/block/block_file_segment.cpp index 564ac9d776f6a0..1d6d425593db8c 100644 --- a/be/src/io/cache/block/block_file_segment.cpp +++ b/be/src/io/cache/block/block_file_segment.cpp @@ -179,7 +179,7 @@ std::string FileBlock::get_path_in_local_cache() const { return _cache->get_path_in_local_cache(key(), offset(), _cache_type); } -Status FileBlock::read_at(Slice buffer, size_t read_offset) { +Status FileBlock::read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx) { Status st = Status::OK(); std::shared_ptr reader; if (!(reader = _cache_reader.lock())) { @@ -192,7 +192,7 @@ Status FileBlock::read_at(Slice buffer, size_t read_offset) { } } size_t bytes_reads = buffer.size; - RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads)); + RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads, io_ctx)); DCHECK(bytes_reads == buffer.size); return st; } diff --git a/be/src/io/cache/block/block_file_segment.h b/be/src/io/cache/block/block_file_segment.h index 67f4fc17a0d366..d1e341d42c6b64 100644 --- a/be/src/io/cache/block/block_file_segment.h +++ b/be/src/io/cache/block/block_file_segment.h @@ -110,7 +110,7 @@ class FileBlock { Status append(Slice data); // read data from cache file - Status read_at(Slice buffer, size_t read_offset); + Status read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx); // finish write, release the file writer Status finalize_write(); diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index bbd7516dfaaf01..f8fda4b028e677 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -112,7 +112,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result, bytes_read, io_ctx)); DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read); if (io_ctx->file_cache_stats) { - stats.bytes_read += bytes_req; _update_state(stats, io_ctx->file_cache_stats); } return Status::OK(); @@ -142,7 +141,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz break; } } - stats.bytes_read += bytes_req; size_t empty_start = 0; size_t empty_end = 0; if (!empty_segments.empty()) { @@ -224,8 +222,9 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz size_t file_offset = current_offset - left; { SCOPED_RAW_TIMER(&stats.local_read_timer); - RETURN_IF_ERROR(segment->read_at( - Slice(result.data + (current_offset - offset), read_size), file_offset)); + RETURN_IF_ERROR( + segment->read_at(Slice(result.data + (current_offset - offset), read_size), + file_offset, io_ctx)); } *bytes_read += read_size; current_offset = right + 1; @@ -280,10 +279,8 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats, } if (read_stats.hit_cache) { statis->num_local_io_total++; - statis->bytes_read_from_local += read_stats.bytes_read; } else { statis->num_remote_io_total++; - statis->bytes_read_from_remote += read_stats.bytes_read; } statis->remote_io_timer += read_stats.remote_read_timer; statis->local_io_timer += read_stats.local_read_timer; diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h index e4e280d95f64c6..af33e5d6f15b7b 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.h +++ b/be/src/io/cache/block/cached_remote_file_reader.h @@ -66,10 +66,11 @@ class CachedRemoteFileReader final : public FileReader { IFileCache::Key _cache_key; CloudFileCachePtr _cache; + // Used to record read/write timer and cache related metrics. + // These metrics will finally be saved in FileCacheStatistics. struct ReadStatistics { bool hit_cache = true; bool skip_cache = false; - int64_t bytes_read = 0; int64_t bytes_write_into_file_cache = 0; int64_t remote_read_timer = 0; int64_t local_read_timer = 0; @@ -82,4 +83,4 @@ class CachedRemoteFileReader final : public FileReader { }; } // namespace io -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp index 4d370cfb4d8387..bb0eac47e386b7 100644 --- a/be/src/io/fs/broker_file_reader.cpp +++ b/be/src/io/fs/broker_file_reader.cpp @@ -62,7 +62,7 @@ Status BrokerFileReader::close() { } Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); size_t bytes_req = result.size; char* to = result.data; @@ -76,6 +76,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes *bytes_read = data.size(); memcpy(to, data.data(), *bytes_read); + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h index 7acdcbcc0d578f..43a46c62331e35 100644 --- a/be/src/io/fs/broker_file_reader.h +++ b/be/src/io/fs/broker_file_reader.h @@ -34,8 +34,6 @@ namespace doris::io { -struct IOContext; - class BrokerFileReader : public FileReader { public: BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd, diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index 03828ef28ddf74..b41df4426ced24 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "io/fs/path.h" +#include "io/io_common.h" #include "util/profile_collector.h" #include "util/slice.h" @@ -32,7 +33,6 @@ namespace doris { namespace io { class FileSystem; -struct IOContext; enum class FileCachePolicy : uint8_t { NO_CACHE, diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 263276768bc0b8..ac75e2e722b9d7 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -84,7 +84,7 @@ Status HdfsFileReader::close() { #ifdef USE_HADOOP_HDFS Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > _handle->file_size()) { return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", @@ -121,6 +121,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r has_read += loop_read; } *bytes_read = has_read; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } @@ -128,7 +131,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r // The hedged read only support hdfsPread(). // TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead() Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > _handle->file_size()) { return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})", @@ -177,6 +180,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r has_read += loop_read; } *bytes_read = has_read; + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + } return Status::OK(); } #endif diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 6204859e6006b0..0f4a3f14019b6d 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -34,7 +34,6 @@ namespace doris { namespace io { -struct IOContext; class HdfsFileReader : public FileReader { public: diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index 93953eeddd9b63..c7abf2ad04788a 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -118,7 +118,7 @@ Status LocalFileReader::close() { } Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > _file_size) { return Status::InternalError( @@ -148,6 +148,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_ *bytes_read += res; } } + if (io_ctx && io_ctx->file_cache_stats) { + io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read; + } DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); return Status::OK(); } diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 005257c1312f6c..2d97252319dfcb 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -42,7 +42,6 @@ namespace doris { namespace io { -struct IOContext; bvar::Adder s3_file_reader_read_counter("s3_file_reader", "read_at"); bvar::Adder s3_file_reader_total("s3_file_reader", "total_num"); bvar::Adder s3_bytes_read_total("s3_file_reader", "bytes_read"); @@ -86,7 +85,7 @@ Status S3FileReader::close() { } Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, - const IOContext* /*io_ctx*/) { + const IOContext* io_ctx) { DCHECK(!closed()); if (offset > _file_size) { return Status::InternalError( @@ -154,6 +153,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping", _path.native(), retry_count, total_sleep_time); } + // ATTN: Do not open it, may casuing stack-use-after-scope. + // Will be refactored in future + // if (io_ctx && io_ctx->file_cache_stats) { + // io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req; + // } return Status::OK(); } return Status::InternalError("failed to read from s3, exceeded maximum retries"); diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h index c9e9656fe5860b..50a19712674050 100644 --- a/be/src/io/fs/s3_file_reader.h +++ b/be/src/io/fs/s3_file_reader.h @@ -35,7 +35,6 @@ namespace doris { class RuntimeProfile; namespace io { -struct IOContext; class S3FileReader final : public FileReader { public: diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h index 80a594473dc376..3bd92dc7c1a5b8 100644 --- a/be/src/io/io_common.h +++ b/be/src/io/io_common.h @@ -19,6 +19,8 @@ #include +#include + namespace doris { enum class ReaderType : uint8_t { @@ -45,6 +47,38 @@ struct FileCacheStatistics { int64_t write_cache_io_timer = 0; int64_t bytes_write_into_cache = 0; int64_t num_skip_cache_io_total = 0; + + void update(const FileCacheStatistics& other) { + num_local_io_total += other.num_local_io_total; + num_remote_io_total += other.num_remote_io_total; + local_io_timer += other.local_io_timer; + bytes_read_from_local += other.bytes_read_from_local; + bytes_read_from_remote += other.bytes_read_from_remote; + remote_io_timer += other.remote_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + write_cache_io_timer += other.write_cache_io_timer; + bytes_write_into_cache += other.bytes_write_into_cache; + num_skip_cache_io_total += other.num_skip_cache_io_total; + } + + void reset() { + num_local_io_total = 0; + num_remote_io_total = 0; + local_io_timer = 0; + bytes_read_from_local = 0; + bytes_read_from_remote = 0; + remote_io_timer = 0; + write_cache_io_timer = 0; + bytes_write_into_cache = 0; + num_skip_cache_io_total = 0; + } + + std::string debug_string() const { + std::stringstream ss; + ss << "bytes_read_from_local: " << bytes_read_from_local + << ", bytes_read_from_remote: " << bytes_read_from_remote; + return ss.str(); + } }; struct IOContext { @@ -60,6 +94,14 @@ struct IOContext { int64_t expiration_time = 0; const TUniqueId* query_id = nullptr; // Ref FileCacheStatistics* file_cache_stats = nullptr; // Ref + + std::string debug_string() const { + if (file_cache_stats != nullptr) { + return file_cache_stats->debug_string(); + } else { + return "no file cache stats"; + } + } }; } // namespace io diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index de950704180319..4f87da1196bb3e 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -32,6 +32,8 @@ void QueryStatistics::merge(const QueryStatistics& other) { cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed); shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed); shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed); + _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage; + _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage; int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed); if (other_peak_mem > this->max_peak_memory_bytes) { @@ -51,6 +53,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); statistics->set_returned_rows(returned_rows); statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); + statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); + statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); } void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { @@ -64,12 +68,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { current_used_memory_bytes.load(std::memory_order_relaxed)); statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed)); statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed)); + statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage); + statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { scan_rows = statistics.scan_rows(); scan_bytes = statistics.scan_bytes(); cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS; + _scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage(); + _scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage(); } void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index a9f6e192ec00d4..fcfbf48bb18ad9 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -44,6 +44,8 @@ class QueryStatistics { : scan_rows(0), scan_bytes(0), cpu_nanos(0), + _scan_bytes_from_local_storage(0), + _scan_bytes_from_remote_storage(0), returned_rows(0), max_peak_memory_bytes(0), current_used_memory_bytes(0), @@ -65,6 +67,13 @@ class QueryStatistics { this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed); } + void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) { + _scan_bytes_from_local_storage += scan_bytes_from_local_storage; + } + void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) { + _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage; + } + void add_shuffle_send_bytes(int64_t delta_bytes) { this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed); } @@ -95,6 +104,8 @@ class QueryStatistics { cpu_nanos.store(0, std::memory_order_relaxed); shuffle_send_bytes.store(0, std::memory_order_relaxed); shuffle_send_rows.store(0, std::memory_order_relaxed); + _scan_bytes_from_local_storage.store(0, std::memory_order_relaxed); + _scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed); returned_rows = 0; max_peak_memory_bytes.store(0, std::memory_order_relaxed); @@ -120,6 +131,8 @@ class QueryStatistics { std::atomic scan_rows; std::atomic scan_bytes; std::atomic cpu_nanos; + std::atomic _scan_bytes_from_local_storage; + std::atomic _scan_bytes_from_remote_storage; // number rows returned by query. // only set once by result sink when closing. int64_t returned_rows; diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 1d9bb34d09d7fe..104a22fb8b9ef8 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -225,28 +225,41 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; // block's schema come from SchemaBackendActiveTasksScanner::_s_tbls_columns + // before 2.1.7, there are 12 columns in "backend_active_tasks" table. + // after 2.1.8, 2 new columns added. + // check this to make it compatible with version before 2.1.7 + bool need_local_and_remote_bytes = (block->columns() > 12); for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + int col_idx = 0; TQueryStatistics tqs; qs_ctx_ptr->collect_query_statistics(&tqs); - SchemaScannerHelper::insert_int64_value(0, be_id, block); - SchemaScannerHelper::insert_string_value(1, qs_ctx_ptr->_fe_addr.hostname, block); - SchemaScannerHelper::insert_string_value(2, query_id, block); + SchemaScannerHelper::insert_int64_value(col_idx++, be_id, block); + SchemaScannerHelper::insert_string_value(col_idx++, qs_ctx_ptr->_fe_addr.hostname, block); + SchemaScannerHelper::insert_string_value(col_idx++, query_id, block); int64_t task_time = qs_ctx_ptr->_is_query_finished ? qs_ctx_ptr->_query_finish_time - qs_ctx_ptr->_query_start_time : MonotonicMillis() - qs_ctx_ptr->_query_start_time; - SchemaScannerHelper::insert_int64_value(3, task_time, block); - SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block); - SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block); - SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block); - SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes, block); - SchemaScannerHelper::insert_int64_value(8, tqs.current_used_memory_bytes, block); - SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes, block); - SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows, block); + SchemaScannerHelper::insert_int64_value(col_idx++, task_time, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.cpu_ms, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_rows, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes, block); + + if (need_local_and_remote_bytes) { + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_local_storage, + block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes_from_remote_storage, + block); + } + + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.max_peak_memory_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.current_used_memory_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_bytes, block); + SchemaScannerHelper::insert_int64_value(col_idx++, tqs.shuffle_send_rows, block); std::stringstream ss; ss << qs_ctx_ptr->_query_type; - SchemaScannerHelper::insert_string_value(11, ss.str(), block); + SchemaScannerHelper::insert_string_value(col_idx++, ss.str(), block); } } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 0807f4949e5850..b286b714ad98e0 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -667,7 +667,6 @@ class ORCFileInputStream : public orc::InputStream, public ProfileCollector { io::FileReaderSPtr& get_inner_reader() { return _inner_reader; } protected: - void _collect_profile_at_runtime() override {}; void _collect_profile_before_close() override; private: diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index aaf6fbdf3b4055..22360200d797ed 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -656,7 +656,6 @@ void NewOlapScanner::_collect_profile_before_close() { COUNTER_UPDATE(Parent->_total_segment_counter, stats.total_segment_number); // Update counters for NewOlapScanner - // Update counters from tablet reader's stats auto& stats = _tablet_reader->stats(); if (_parent) { @@ -678,4 +677,19 @@ void NewOlapScanner::_collect_profile_before_close() { tablet->query_scan_count->increment(1); } +void NewOlapScanner::_update_bytes_and_rows_read() { + VScanner::_update_bytes_and_rows_read(); + if (_query_statistics) { + auto& stats = _tablet_reader->stats(); + int64_t delta_local = stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local; + int64_t delta_remote = + stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote; + _query_statistics->add_scan_bytes_from_local_storage(delta_local); + _query_statistics->add_scan_bytes_from_remote_storage(delta_remote); + _query_statistics->add_scan_bytes(delta_local + delta_remote); + _bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local; + _bytes_read_from_remote = stats.file_cache_stats.bytes_read_from_remote; + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index cdadf8f7f49f9b..90d871734c334e 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -80,6 +80,7 @@ class NewOlapScanner : public VScanner { protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; void _collect_profile_before_close() override; + void _update_bytes_and_rows_read() override; private: void _update_realtime_counters(); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e3899d9698221c..331b49b2082fcf 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -1236,4 +1236,19 @@ void VFileScanner::_collect_profile_before_close() { } } +void VFileScanner::_update_bytes_and_rows_read() { + VScanner::_update_bytes_and_rows_read(); + if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) { + int64_t delta_local = + _io_ctx->file_cache_stats->bytes_read_from_local - _bytes_read_from_local; + int64_t delta_remote = + _io_ctx->file_cache_stats->bytes_read_from_remote - _bytes_read_from_remote; + _query_statistics->add_scan_bytes_from_local_storage(delta_local); + _query_statistics->add_scan_bytes_from_remote_storage(delta_remote); + _query_statistics->add_scan_bytes(delta_local + delta_remote); + _bytes_read_from_local = _io_ctx->file_cache_stats->bytes_read_from_local; + _bytes_read_from_remote = _io_ctx->file_cache_stats->bytes_read_from_remote; + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index cf1ea97f21b0f0..1c6d903a87fc43 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -97,6 +97,8 @@ class VFileScanner : public VScanner { void _collect_profile_before_close() override; + void _update_bytes_and_rows_read() override; + protected: const TFileScanRangeParams* _params = nullptr; std::shared_ptr _split_source; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 97a2ba8207a15f..58511e890d66b1 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -118,8 +118,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } - int64_t old_scan_rows = _num_rows_read; - int64_t old_scan_bytes = _num_byte_read; + _prev_num_rows_read = _num_rows_read; { do { // if step 2 filter all rows of block, and block will be reused to get next rows, @@ -138,7 +137,6 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { break; } _num_rows_read += block->rows(); - _num_byte_read += block->allocated_bytes(); } // 2. Filter the output block finally. @@ -153,10 +151,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } - if (_query_statistics) { - _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows); - _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes); - } + _update_bytes_and_rows_read(); if (state->is_cancelled()) { // TODO: Should return the specific ErrorStatus instead of just Cancelled. @@ -281,7 +276,6 @@ void VScanner::_collect_profile_before_close() { if (_parent) { COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); - COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read); } else { COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read); @@ -301,4 +295,11 @@ void VScanner::update_scan_cpu_timer() { } } +void VScanner::_update_bytes_and_rows_read() { + if (_query_statistics) { + _query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read); + _prev_num_rows_read = _num_rows_read; + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index e85c6082ca6a1c..03604621f0536d 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -135,6 +135,10 @@ class VScanner { void update_scan_cpu_timer(); + // update the bytes and rows read at each round in query statistics. + // so that we can get runtime statistics for each query. + virtual void _update_bytes_and_rows_read(); + RuntimeState* runtime_state() { return _state; } bool is_open() { return _is_open; } @@ -214,7 +218,12 @@ class VScanner { // num of rows read from scanner int64_t _num_rows_read = 0; - int64_t _num_byte_read = 0; + // save the current _num_rows_read before next round, + // so that we can get delta rows between each round. + int64_t _prev_num_rows_read = 0; + // bytes read from local and remote fs + int64_t _bytes_read_from_local = 0; + int64_t _bytes_read_from_remote = 0; // num of rows return from scanner, after filter block int64_t _num_rows_return = 0; diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp index 20f97aae6ad05d..092a343c1a55fc 100644 --- a/be/test/io/cache/file_block_cache_test.cpp +++ b/be/test/io/cache/file_block_cache_test.cpp @@ -852,7 +852,7 @@ TEST(LRUFileCache, fd_cache_remove) { assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(9); - static_cast(segments[0]->read_at(Slice(buffer.get(), 9), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 9), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); } { @@ -864,7 +864,7 @@ TEST(LRUFileCache, fd_cache_remove) { assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(1); - static_cast(segments[0]->read_at(Slice(buffer.get(), 1), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 1), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9))); } { @@ -877,7 +877,7 @@ TEST(LRUFileCache, fd_cache_remove) { io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(5); - static_cast(segments[0]->read_at(Slice(buffer.get(), 5), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 5), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10))); } { @@ -890,7 +890,7 @@ TEST(LRUFileCache, fd_cache_remove) { io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(10); - static_cast(segments[0]->read_at(Slice(buffer.get(), 10), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 10), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 15))); } EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); @@ -933,7 +933,7 @@ TEST(LRUFileCache, fd_cache_evict) { assert_range(2, segments[0], io::FileBlock::Range(0, 8), io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(9); - static_cast(segments[0]->read_at(Slice(buffer.get(), 9), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 9), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); } { @@ -945,7 +945,7 @@ TEST(LRUFileCache, fd_cache_evict) { assert_range(2, segments[0], io::FileBlock::Range(9, 9), io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(1); - static_cast(segments[0]->read_at(Slice(buffer.get(), 1), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 1), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 9))); } { @@ -958,7 +958,7 @@ TEST(LRUFileCache, fd_cache_evict) { io::FileBlock::State::DOWNLOADING); download(segments[0]); std::unique_ptr buffer = std::make_unique(5); - static_cast(segments[0]->read_at(Slice(buffer.get(), 5), 0)); + static_cast(segments[0]->read_at(Slice(buffer.get(), 5), 0, nullptr)); EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key, 10))); } EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index afe1f9af2da60d..cf827efbbdb12a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -82,6 +82,12 @@ public class InternalSchema { AUDIT_SCHEMA.add(new ColumnDef("return_rows", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes", TypeDef.create(PrimitiveType.BIGINT), true)); + AUDIT_SCHEMA + .add(new ColumnDef("scan_bytes_from_local_storage", TypeDef.create(PrimitiveType.BIGINT), + true)); + AUDIT_SCHEMA + .add(new ColumnDef("scan_bytes_from_remote_storage", TypeDef.create(PrimitiveType.BIGINT), + true)); AUDIT_SCHEMA.add(new ColumnDef("stmt_id", TypeDef.create(PrimitiveType.BIGINT), true)); AUDIT_SCHEMA.add(new ColumnDef("is_query", TypeDef.create(PrimitiveType.TINYINT), true)); AUDIT_SCHEMA.add(new ColumnDef("is_nereids", TypeDef.create(PrimitiveType.TINYINT), true)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 785343d78cb78b..8f12300faea57a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -455,6 +455,8 @@ public class SchemaTable extends Table { .column("TASK_CPU_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT)) .column("SCAN_ROWS", ScalarType.createType(PrimitiveType.BIGINT)) .column("SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("LOCAL_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("REMOTE_SCAN_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("BE_PEAK_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("CURRENT_USED_MEMORY_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) .column("SHUFFLE_SEND_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 7d64b600d8a9f1..5e9dd3f4d4cf3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -105,6 +105,10 @@ public enum EventType { // note: newly added fields should be always before fuzzyVariables @AuditField(value = "FuzzyVariables") public String fuzzyVariables = ""; + @AuditField(value = "scanBytesFromLocalStorage") + public long scanBytesFromLocalStorage = -1; + @AuditField(value = "scanBytesFromRemoteStorage") + public long scanBytesFromRemoteStorage = -1; public long pushToAuditLogQueueTime; @@ -249,6 +253,16 @@ public AuditEventBuilder setCommandType(String commandType) { return this; } + public AuditEventBuilder setScanBytesFromLocalStorage(long scanBytesFromLocalStorage) { + auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage; + return this; + } + + public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteStorage) { + auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage; + return this; + } + public AuditEvent build() { return this.auditEvent; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java index 55dbba9805e762..6aac0364bc0efb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java @@ -151,6 +151,10 @@ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { logBuffer.append(event.queryTime).append("\t"); logBuffer.append(event.scanBytes).append("\t"); logBuffer.append(event.scanRows).append("\t"); + logBuffer.append(event.shuffleSendBytes).append("\t"); + logBuffer.append(event.shuffleSendRows).append("\t"); + logBuffer.append(event.scanBytesFromLocalStorage).append("\t"); + logBuffer.append(event.scanBytesFromRemoteStorage).append("\t"); logBuffer.append(event.returnRows).append("\t"); logBuffer.append(event.shuffleSendRows).append("\t"); logBuffer.append(event.shuffleSendBytes).append("\t"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 031a01b32d2aa4..e73ddd7aa86888 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -226,7 +226,11 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme auditEventBuilder.setSqlDigest(sqlDigest); } } - auditEventBuilder.setIsQuery(true); + auditEventBuilder.setIsQuery(true) + .setScanBytesFromLocalStorage( + statistics == null ? 0 : statistics.getScanBytesFromLocalStorage()) + .setScanBytesFromRemoteStorage( + statistics == null ? 0 : statistics.getScanBytesFromRemoteStorage()); } else { auditEventBuilder.setIsQuery(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index e27cb4e0df278c..ce0703f23a82e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -84,6 +84,8 @@ protected void runAfterCatalogReady() { auditEvent.cpuTimeMs = queryStats.cpu_ms; auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; + auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage; + auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage; } boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true); if (!ret) { @@ -222,6 +224,8 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) { dst.max_peak_memory_bytes = src.max_peak_memory_bytes; } + dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage; + dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage; } private void queryAuditEventLogWriteLock() { @@ -232,3 +236,4 @@ private void queryAuditEventLogWriteUnlock() { queryAuditEventLock.writeLock().unlock(); } } + diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index e9ced523912e5d..755a3a042db8ab 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -35,6 +35,8 @@ message PQueryStatistics { optional int64 cpu_ms = 4; optional int64 max_peak_memory_bytes = 5; repeated PNodeStatistics nodes_statistics = 6; + optional int64 scan_bytes_from_local_storage = 7; + optional int64 scan_bytes_from_remote_storage = 8; } message PRowBatch { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4522fd08f68ebb..638edf1a7bb382 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -412,6 +412,8 @@ struct TQueryStatistics { 7: optional i64 workload_group_id 8: optional i64 shuffle_send_bytes 9: optional i64 shuffle_send_rows + 10: optional i64 scan_bytes_from_local_storage + 11: optional i64 scan_bytes_from_remote_storage } struct TReportWorkloadRuntimeStatusParams {