Skip to content

Commit

Permalink
[opt](scan) unify the local and remote scan bytes stats for all scanners
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Dec 9, 2024
1 parent 99e51ae commit 16c4747
Show file tree
Hide file tree
Showing 32 changed files with 212 additions and 47 deletions.
4 changes: 3 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
Expand Down Expand Up @@ -93,4 +95,4 @@ Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc
return Status::OK();
}

} // namespace doris
} // namespace doris
4 changes: 2 additions & 2 deletions be/src/io/cache/block/block_file_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ std::string FileBlock::get_path_in_local_cache() const {
return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
}

Status FileBlock::read_at(Slice buffer, size_t read_offset) {
Status FileBlock::read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx) {
Status st = Status::OK();
std::shared_ptr<FileReader> reader;
if (!(reader = _cache_reader.lock())) {
Expand All @@ -192,7 +192,7 @@ Status FileBlock::read_at(Slice buffer, size_t read_offset) {
}
}
size_t bytes_reads = buffer.size;
RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads, io_ctx));
DCHECK(bytes_reads == buffer.size);
return st;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/cache/block/block_file_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class FileBlock {
Status append(Slice data);

// read data from cache file
Status read_at(Slice buffer, size_t read_offset);
Status read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx);

// finish write, release the file writer
Status finalize_write();
Expand Down
9 changes: 3 additions & 6 deletions be/src/io/cache/block/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result, bytes_read, io_ctx));
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
if (io_ctx->file_cache_stats) {
stats.bytes_read += bytes_req;
_update_state(stats, io_ctx->file_cache_stats);
}
return Status::OK();
Expand Down Expand Up @@ -142,7 +141,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
break;
}
}
stats.bytes_read += bytes_req;
size_t empty_start = 0;
size_t empty_end = 0;
if (!empty_segments.empty()) {
Expand Down Expand Up @@ -224,8 +222,9 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
size_t file_offset = current_offset - left;
{
SCOPED_RAW_TIMER(&stats.local_read_timer);
RETURN_IF_ERROR(segment->read_at(
Slice(result.data + (current_offset - offset), read_size), file_offset));
RETURN_IF_ERROR(
segment->read_at(Slice(result.data + (current_offset - offset), read_size),
file_offset, io_ctx));
}
*bytes_read += read_size;
current_offset = right + 1;
Expand Down Expand Up @@ -280,10 +279,8 @@ void CachedRemoteFileReader::_update_state(const ReadStatistics& read_stats,
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
statis->bytes_read_from_local += read_stats.bytes_read;
} else {
statis->num_remote_io_total++;
statis->bytes_read_from_remote += read_stats.bytes_read;
}
statis->remote_io_timer += read_stats.remote_read_timer;
statis->local_io_timer += read_stats.local_read_timer;
Expand Down
5 changes: 3 additions & 2 deletions be/src/io/cache/block/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ class CachedRemoteFileReader final : public FileReader {
IFileCache::Key _cache_key;
CloudFileCachePtr _cache;

// Used to record read/write timer and cache related metrics.
// These metrics will finally be saved in FileCacheStatistics.
struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
Expand All @@ -82,4 +83,4 @@ class CachedRemoteFileReader final : public FileReader {
};

} // namespace io
} // namespace doris
} // namespace doris
5 changes: 4 additions & 1 deletion be/src/io/fs/broker_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status BrokerFileReader::close() {
}

Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
size_t bytes_req = result.size;
char* to = result.data;
Expand All @@ -76,6 +76,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes

*bytes_read = data.size();
memcpy(to, data.data(), *bytes_read);
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/io/fs/broker_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

namespace doris::io {

struct IOContext;

class BrokerFileReader : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "common/status.h"
#include "io/fs/path.h"
#include "io/io_common.h"
#include "util/profile_collector.h"
#include "util/slice.h"

Expand All @@ -32,7 +33,6 @@ namespace doris {
namespace io {

class FileSystem;
struct IOContext;

enum class FileCachePolicy : uint8_t {
NO_CACHE,
Expand Down
10 changes: 8 additions & 2 deletions be/src/io/fs/hdfs_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status HdfsFileReader::close() {

#ifdef USE_HADOOP_HDFS
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
Expand Down Expand Up @@ -121,14 +121,17 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}

#else
// The hedged read only support hdfsPread().
// TODO: rethink here to see if there are some difference between hdfsPread() and hdfsRead()
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file size: {}, path: {})",
Expand Down Expand Up @@ -177,6 +180,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}
#endif
Expand Down
1 change: 0 additions & 1 deletion be/src/io/fs/hdfs_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

namespace doris {
namespace io {
struct IOContext;

class HdfsFileReader : public FileReader {
public:
Expand Down
5 changes: 4 additions & 1 deletion be/src/io/fs/local_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status LocalFileReader::close() {
}

Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
Expand Down Expand Up @@ -148,6 +148,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
*bytes_read += res;
}
}
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read;
}
DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

namespace doris {
namespace io {
struct IOContext;
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
Expand Down Expand Up @@ -86,7 +85,7 @@ Status S3FileReader::close() {
}

Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) {
const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
Expand Down Expand Up @@ -154,6 +153,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
_path.native(), retry_count, total_sleep_time);
}
if (io_ctx && io_ctx->file_cache_stats) {
io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
}
return Status::OK();
}
return Status::InternalError("failed to read from s3, exceeded maximum retries");
Expand Down
1 change: 0 additions & 1 deletion be/src/io/fs/s3_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace doris {
class RuntimeProfile;

namespace io {
struct IOContext;

class S3FileReader final : public FileReader {
public:
Expand Down
42 changes: 42 additions & 0 deletions be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <gen_cpp/Types_types.h>

#include <sstream>

namespace doris {

enum class ReaderType : uint8_t {
Expand All @@ -45,6 +47,38 @@ struct FileCacheStatistics {
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t num_skip_cache_io_total = 0;

// Accumulate another FileCacheStatistics into this one.
// Used when merging per-reader cache stats into a query-level aggregate.
void update(const FileCacheStatistics& other) {
    num_local_io_total += other.num_local_io_total;
    num_remote_io_total += other.num_remote_io_total;
    local_io_timer += other.local_io_timer;
    bytes_read_from_local += other.bytes_read_from_local;
    bytes_read_from_remote += other.bytes_read_from_remote;
    remote_io_timer += other.remote_io_timer;
    // Fixed: this line was previously duplicated, which double-counted
    // write_cache_io_timer on every merge.
    write_cache_io_timer += other.write_cache_io_timer;
    bytes_write_into_cache += other.bytes_write_into_cache;
    num_skip_cache_io_total += other.num_skip_cache_io_total;
}

// Zero every counter so this instance can be reused for a new accumulation.
// Fields are reset one by one on purpose: the struct may gain members that
// must not be touched here, so no aggregate re-initialization is used.
void reset() {
    num_local_io_total = num_remote_io_total = 0;
    local_io_timer = remote_io_timer = 0;
    bytes_read_from_local = bytes_read_from_remote = 0;
    write_cache_io_timer = 0;
    bytes_write_into_cache = 0;
    num_skip_cache_io_total = 0;
}

// One-line human-readable summary of the local/remote read byte counters,
// e.g. "bytes_read_from_local: 123, bytes_read_from_remote: 456".
std::string debug_string() const {
    std::string out = "bytes_read_from_local: ";
    out += std::to_string(bytes_read_from_local);
    out += ", bytes_read_from_remote: ";
    out += std::to_string(bytes_read_from_remote);
    return out;
}
};

struct IOContext {
Expand All @@ -60,6 +94,14 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref

// Summarize the attached file cache statistics.
// Safe to call when no FileCacheStatistics was ever attached to this context.
std::string debug_string() const {
    return file_cache_stats != nullptr ? file_cache_stats->debug_string()
                                       : std::string("no file cache stats");
}
};

} // namespace io
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ void QueryStatistics::merge(const QueryStatistics& other) {
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
_scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
_scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;

int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
Expand All @@ -51,6 +53,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
Expand All @@ -64,12 +68,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

// Restore counters from a protobuf snapshot (the inverse of to_pb()).
// Explicit store() is used on the atomic members; it is equivalent to the
// atomics' operator= (both are seq_cst stores).
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
    scan_rows.store(statistics.scan_rows());
    scan_bytes.store(statistics.scan_bytes());
    cpu_nanos.store(statistics.cpu_ms() * NANOS_PER_MILLIS);
    _scan_bytes_from_local_storage.store(statistics.scan_bytes_from_local_storage());
    _scan_bytes_from_remote_storage.store(statistics.scan_bytes_from_remote_storage());
}

void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
Expand Down
13 changes: 13 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class QueryStatistics {
: scan_rows(0),
scan_bytes(0),
cpu_nanos(0),
_scan_bytes_from_local_storage(0),
_scan_bytes_from_remote_storage(0),
returned_rows(0),
max_peak_memory_bytes(0),
current_used_memory_bytes(0),
Expand All @@ -65,6 +67,13 @@ class QueryStatistics {
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}

// Accumulate bytes scanned from local storage.
// Uses fetch_add with relaxed ordering: this is a pure counter, and the
// sibling updaters in this class (add_cpu_nanos, add_shuffle_send_bytes)
// already use relaxed fetch_add; the previous `+=` performed an
// unnecessarily strong seq_cst RMW.
void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
    _scan_bytes_from_local_storage.fetch_add(scan_bytes_from_local_storage,
                                             std::memory_order_relaxed);
}
// Accumulate bytes scanned from remote storage.
// Relaxed fetch_add for consistency with the other counter updaters in this
// class; `+=` on std::atomic is a seq_cst RMW, stronger than needed for a
// statistics counter that is only merged/read later.
void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
    _scan_bytes_from_remote_storage.fetch_add(scan_bytes_from_remote_storage,
                                              std::memory_order_relaxed);
}

void add_shuffle_send_bytes(int64_t delta_bytes) {
this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
}
Expand Down Expand Up @@ -95,6 +104,8 @@ class QueryStatistics {
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
_scan_bytes_from_local_storage.store(0, std::memory_order_relaxed);
_scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed);

returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
Expand All @@ -120,6 +131,8 @@ class QueryStatistics {
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
std::atomic<int64_t> _scan_bytes_from_local_storage;
std::atomic<int64_t> _scan_bytes_from_remote_storage;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
Expand Down
Loading

0 comments on commit 16c4747

Please sign in to comment.