[Enhancement] support query lake compaction profile in FE (backport #50685) (#51275)

Signed-off-by: starrocks-xupeng <[email protected]>
starrocks-xupeng authored Sep 23, 2024
1 parent dab2855 commit 27d0310
Showing 19 changed files with 370 additions and 72 deletions.
@@ -38,7 +38,8 @@ SchemaScanner::ColumnDesc SchemaBeCloudNativeCompactionsScanner::_s_columns[] =
{"START_TIME", TYPE_DATETIME, sizeof(DateTimeValue), true},
{"FINISH_TIME", TYPE_DATETIME, sizeof(DateTimeValue), true},
{"PROGRESS", TYPE_INT, sizeof(int32_t), false},
{"STATUS", TYPE_VARCHAR, sizeof(StringValue), false}};
{"STATUS", TYPE_VARCHAR, sizeof(StringValue), false},
{"PROFILE", TYPE_VARCHAR, sizeof(StringValue), false}};

SchemaBeCloudNativeCompactionsScanner::SchemaBeCloudNativeCompactionsScanner()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}
@@ -68,37 +69,43 @@ Status SchemaBeCloudNativeCompactionsScanner::fill_chunk(ChunkPtr* chunk) {
for (; _cur_idx < end; _cur_idx++) {
auto& info = _infos[_cur_idx];
for (const auto& [slot_id, index] : slot_id_to_index_map) {
- if (slot_id < 1 || slot_id > 10) {
+ if (slot_id < 1 || slot_id > 11) {
return Status::InternalError(strings::Substitute("invalid slot id:$0", slot_id));
}
ColumnPtr column = (*chunk)->get_column_by_slot_id(slot_id);
switch (slot_id) {
case 1: {
- // be id
+ // BE_ID
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&_be_id);
break;
}
case 2: {
+ // TXN_ID
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.txn_id);
break;
}
case 3: {
+ // TABLET_ID
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.tablet_id);
break;
}
case 4: {
+ // VERSION
fill_column_with_slot<TYPE_BIGINT>(column.get(), (void*)&info.version);
break;
}
case 5: {
+ // SKIPPED
fill_column_with_slot<TYPE_BOOLEAN>(column.get(), (void*)&info.skipped);
break;
}
case 6: {
+ // RUNS
fill_column_with_slot<TYPE_INT>(column.get(), (void*)&info.runs);
break;
}
case 7: {
+ // START_TIME
if (info.start_time > 0) {
DateTimeValue ts;
ts.from_unixtime(info.start_time, _ctz);
@@ -109,6 +116,7 @@ Status SchemaBeCloudNativeCompactionsScanner::fill_chunk(ChunkPtr* chunk) {
break;
}
case 8: {
+ // FINISH_TIME
if (info.finish_time > 0) {
DateTimeValue ts;
ts.from_unixtime(info.finish_time, _ctz);
@@ -119,15 +127,23 @@ Status SchemaBeCloudNativeCompactionsScanner::fill_chunk(ChunkPtr* chunk) {
break;
}
case 9: {
+ // PROGRESS
fill_column_with_slot<TYPE_INT>(column.get(), (void*)&info.progress);
break;
}
case 10: {
+ // STATUS
auto s = info.status.message();
Slice v(s);
fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&v);
break;
}
+ case 11: {
+ // PROFILE
+ Slice v(info.profile.data(), info.profile.size());
+ fill_column_with_slot<TYPE_VARCHAR>(column.get(), (void*)&v);
+ break;
+ }
default:
break;
}
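A note on the Slice handling above: a Slice is only a pointer/length view, so its backing buffer must outlive it. That is why the pre-existing STATUS case binds the status message to a named local before wrapping it, while the new PROFILE case can wrap info.profile directly, since info is a reference into _infos and outlives the fill. A minimal standalone sketch of the pitfall (toy Slice type, not the StarRocks one):

#include <cstddef>
#include <iostream>
#include <string>

struct Slice {            // toy stand-in for starrocks::Slice: a non-owning view
    const char* data;
    size_t size;
    explicit Slice(const std::string& s) : data(s.data()), size(s.size()) {}
};

std::string message() { return "compaction ok"; }  // returns by value

int main() {
    // Safe: the named string 's' keeps the buffer alive while 'v' is used.
    auto s = message();
    Slice v(s);
    std::cout << std::string(v.data, v.size) << '\n';
    // Unsafe: Slice w(message()); would point into a temporary destroyed
    // at the end of the full expression, leaving 'w' dangling.
    return 0;
}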
15 changes: 15 additions & 0 deletions be/src/storage/lake/compaction_scheduler.cpp
@@ -99,6 +99,16 @@ void CompactionTaskCallback::finish_task(std::unique_ptr<CompactionTaskContext>&
_response->add_failed_tablets(context->tablet_id);
}

+ // process compact stat
+ auto compact_stat = _response->add_compact_stats();
+ compact_stat->set_tablet_id(context->tablet_id);
+ compact_stat->set_read_time_remote(context->stats->io_ns_remote);
+ compact_stat->set_read_bytes_remote(context->stats->io_bytes_read_remote);
+ compact_stat->set_read_time_local(context->stats->io_ns_local_disk);
+ compact_stat->set_read_bytes_local(context->stats->io_bytes_read_local_disk);
+ compact_stat->set_in_queue_time_sec(context->stats->in_queue_time_sec);
+ compact_stat->set_sub_task_count(_request->tablet_ids_size());

DCHECK(_request != nullptr);
_status.update(context->status);

@@ -192,6 +202,9 @@ void CompactionScheduler::list_tasks(std::vector<CompactionTaskInfo>* infos) {
// Load "finish_time" with memory_order_acquire and check its value before reading the "status" to avoid
// the race condition between this thread and the `CompactionScheduler::thread_task` threads.
info.finish_time = context->finish_time.load(std::memory_order_acquire);
+ if (info.runs > 0) {
+ info.profile = context->stats->to_json_stats();
+ }
if (info.finish_time > 0) {
info.status = context->status;
}
@@ -328,6 +341,8 @@ Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext>
const auto txn_id = context->txn_id;
const auto version = context->version;

+ int64_t in_queue_time_sec = start_time > context->enqueue_time_sec ? (start_time - context->enqueue_time_sec) : 0;
+ context->stats->in_queue_time_sec += in_queue_time_sec;
context->start_time.store(start_time, std::memory_order_relaxed);
context->runs.fetch_add(1, std::memory_order_relaxed);

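The new in-queue accounting in do_compaction subtracts the enqueue timestamp from the start time and clamps at zero, because ::time(nullptr) is wall-clock and can step backwards (e.g. under NTP adjustment). A compilable sketch of that guard (hypothetical helper name, not StarRocks code):

#include <cstdint>
#include <ctime>
#include <iostream>

// Clamp at zero so a backwards wall-clock step never yields a negative delay.
int64_t queue_delay_sec(int64_t enqueue_sec, int64_t start_sec) {
    return start_sec > enqueue_sec ? start_sec - enqueue_sec : 0;
}

int main() {
    int64_t enqueued = ::time(nullptr);
    std::cout << queue_delay_sec(enqueued, enqueued + 3) << '\n';  // 3
    std::cout << queue_delay_sec(enqueued, enqueued - 5) << '\n';  // 0, not -5
}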
4 changes: 4 additions & 0 deletions be/src/storage/lake/compaction_scheduler.h
@@ -106,6 +106,7 @@ struct CompactionTaskInfo {
int runs; // How many times the compaction task has been executed
int progress; // 0-100
bool skipped;
+ std::string profile; // detailed execution info, such as io stats
};

class CompactionScheduler {
@@ -333,14 +334,17 @@ inline void CompactionScheduler::WrapTaskQueues::put_by_txn_id(int64_t txn_id,
std::unique_ptr<CompactionTaskContext>& context) {
std::lock_guard<std::mutex> lock(_task_queues_mutex);
int idx = _task_queue_safe_index(txn_id);
+ context->enqueue_time_sec = ::time(nullptr);
_internal_task_queues[idx]->put(std::move(context));
}

inline void CompactionScheduler::WrapTaskQueues::put_by_txn_id(
int64_t txn_id, std::vector<std::unique_ptr<CompactionTaskContext>>& contexts) {
std::lock_guard<std::mutex> lock(_task_queues_mutex);
int idx = _task_queue_safe_index(txn_id);
+ int64_t now = ::time(nullptr);
for (auto& context : contexts) {
+ context->enqueue_time_sec = now;
_internal_task_queues[idx]->put(std::move(context));
}
}
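Both put_by_txn_id overloads stamp enqueue_time_sec while holding the queue mutex, before ownership moves into the queue; the batch overload reads the clock once for the whole batch. A compilable sketch of that stamp-then-move pattern (simplified hypothetical types, not the StarRocks classes):

#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

struct Context { int64_t enqueue_time_sec = 0; };

class Queues {
    std::mutex _mu;
    std::queue<std::unique_ptr<Context>> _q;
public:
    void put(std::vector<std::unique_ptr<Context>>& contexts) {
        std::lock_guard<std::mutex> lock(_mu);
        int64_t now = ::time(nullptr);          // one clock read for the batch
        for (auto& c : contexts) {
            c->enqueue_time_sec = now;          // stamp before the move
            _q.push(std::move(c));              // context is gone after this
        }
    }
};

int main() {
    Queues q;
    std::vector<std::unique_ptr<Context>> batch;
    batch.push_back(std::make_unique<Context>());
    q.put(batch);
}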
65 changes: 45 additions & 20 deletions be/src/storage/lake/compaction_task_context.cpp
@@ -23,35 +23,60 @@
namespace starrocks::lake {

static constexpr long TIME_UNIT_NS_PER_SECOND = 1000000000;
+ static constexpr long BYTES_UNIT_MB = 1048576;

- void CompactionTaskStats::accumulate(const OlapReaderStatistics& reader_stats) {
- io_ns += reader_stats.io_ns;
- io_ns_remote += reader_stats.io_ns_remote;
- io_ns_local_disk += reader_stats.io_ns_local_disk;
- segment_init_ns += reader_stats.segment_init_ns;
- column_iterator_init_ns += reader_stats.column_iterator_init_ns;
- io_count_local_disk += reader_stats.io_count_local_disk;
- io_count_remote += reader_stats.io_count_remote;
- compressed_bytes_read += reader_stats.compressed_bytes_read;
+ void CompactionTaskStats::collect(const OlapReaderStatistics& reader_stats) {
+ io_ns_remote = reader_stats.io_ns_remote;
+ io_ns_local_disk = reader_stats.io_ns_local_disk;
+ io_bytes_read_remote = reader_stats.compressed_bytes_read_remote;
+ io_bytes_read_local_disk = reader_stats.compressed_bytes_read_local_disk;
+ segment_init_ns = reader_stats.segment_init_ns;
+ column_iterator_init_ns = reader_stats.column_iterator_init_ns;
+ io_count_local_disk = reader_stats.io_count_local_disk;
+ io_count_remote = reader_stats.io_count_remote;
}

+ CompactionTaskStats CompactionTaskStats::operator+(const CompactionTaskStats& that) const {
+ CompactionTaskStats diff;
+ diff.io_ns_remote = io_ns_remote + that.io_ns_remote;
+ diff.io_ns_local_disk = io_ns_local_disk + that.io_ns_local_disk;
+ diff.io_bytes_read_remote = io_bytes_read_remote + that.io_bytes_read_remote;
+ diff.io_bytes_read_local_disk = io_bytes_read_local_disk + that.io_bytes_read_local_disk;
+ diff.segment_init_ns = segment_init_ns + that.segment_init_ns;
+ diff.column_iterator_init_ns = column_iterator_init_ns + that.column_iterator_init_ns;
+ diff.io_count_local_disk = io_count_local_disk + that.io_count_local_disk;
+ diff.io_count_remote = io_count_remote + that.io_count_remote;
+ return diff;
+ }

+ CompactionTaskStats CompactionTaskStats::operator-(const CompactionTaskStats& that) const {
+ CompactionTaskStats diff;
+ diff.io_ns_remote = io_ns_remote - that.io_ns_remote;
+ diff.io_ns_local_disk = io_ns_local_disk - that.io_ns_local_disk;
+ diff.io_bytes_read_remote = io_bytes_read_remote - that.io_bytes_read_remote;
+ diff.io_bytes_read_local_disk = io_bytes_read_local_disk - that.io_bytes_read_local_disk;
+ diff.segment_init_ns = segment_init_ns - that.segment_init_ns;
+ diff.column_iterator_init_ns = column_iterator_init_ns - that.column_iterator_init_ns;
+ diff.io_count_local_disk = io_count_local_disk - that.io_count_local_disk;
+ diff.io_count_remote = io_count_remote - that.io_count_remote;
+ return diff;
+ }

std::string CompactionTaskStats::to_json_stats() {
rapidjson::Document root;
root.SetObject();
auto& allocator = root.GetAllocator();
// add stats
root.AddMember("reader_total_time_second", rapidjson::Value(reader_time_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second", rapidjson::Value(io_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second_remote", rapidjson::Value(io_ns_remote / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("reader_io_second_local_disk", rapidjson::Value(io_ns_local_disk / TIME_UNIT_NS_PER_SECOND),
allocator);
root.AddMember("reader_io_count_remote", rapidjson::Value(io_count_remote), allocator);
root.AddMember("reader_io_count_local_disk", rapidjson::Value(io_count_local_disk), allocator);
root.AddMember("compressed_bytes_read", rapidjson::Value(compressed_bytes_read), allocator);
root.AddMember("segment_init_second", rapidjson::Value(segment_init_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("column_iterator_init_second", rapidjson::Value(column_iterator_init_ns / TIME_UNIT_NS_PER_SECOND),
root.AddMember("read_local_sec", rapidjson::Value(io_ns_local_disk / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("read_local_mb", rapidjson::Value(io_bytes_read_local_disk / BYTES_UNIT_MB), allocator);
root.AddMember("read_remote_sec", rapidjson::Value(io_ns_remote / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("read_remote_mb", rapidjson::Value(io_bytes_read_remote / BYTES_UNIT_MB), allocator);
root.AddMember("read_remote_count", rapidjson::Value(io_count_remote), allocator);
root.AddMember("read_local_count", rapidjson::Value(io_count_local_disk), allocator);
root.AddMember("segment_init_sec", rapidjson::Value(segment_init_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("column_iterator_init_sec", rapidjson::Value(column_iterator_init_ns / TIME_UNIT_NS_PER_SECOND),
allocator);
root.AddMember("segment_write_second", rapidjson::Value(segment_write_ns / TIME_UNIT_NS_PER_SECOND), allocator);
root.AddMember("in_queue_sec", rapidjson::Value(in_queue_time_sec), allocator);

rapidjson::StringBuffer strbuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
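to_json_stats() follows the standard rapidjson DOM pattern: build an object, AddMember with int64 values (units converted by integer division, mirroring TIME_UNIT_NS_PER_SECOND and BYTES_UNIT_MB above), then serialize through a Writer. A self-contained sketch of the same pattern (field subset and helper name are illustrative, not the StarRocks API):

#include <cstdint>
#include <iostream>
#include <string>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

std::string stats_to_json(int64_t io_ns_local, int64_t io_bytes_local) {
    constexpr int64_t kNsPerSec = 1000000000;
    constexpr int64_t kBytesPerMb = 1048576;
    rapidjson::Document root;
    root.SetObject();
    auto& alloc = root.GetAllocator();
    // Integer division converts ns -> s and bytes -> MiB, as above.
    root.AddMember("read_local_sec", rapidjson::Value(io_ns_local / kNsPerSec), alloc);
    root.AddMember("read_local_mb", rapidjson::Value(io_bytes_local / kBytesPerMb), alloc);
    rapidjson::StringBuffer buf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
    root.Accept(writer);
    return buf.GetString();
}

int main() {
    // Prints {"read_local_sec":2,"read_local_mb":64}
    std::cout << stats_to_json(2 * 1000000000LL, 64 * 1048576LL) << '\n';
}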
12 changes: 7 additions & 5 deletions be/src/storage/lake/compaction_task_context.h
@@ -40,18 +40,19 @@ class Progress {
};

struct CompactionTaskStats {
- int64_t io_ns = 0;
int64_t io_ns_remote = 0;
int64_t io_ns_local_disk = 0;
+ int64_t io_bytes_read_remote = 0;
+ int64_t io_bytes_read_local_disk = 0;
int64_t segment_init_ns = 0;
int64_t column_iterator_init_ns = 0;
int64_t io_count_local_disk = 0;
int64_t io_count_remote = 0;
- int64_t compressed_bytes_read = 0;
- int64_t reader_time_ns = 0;
- int64_t segment_write_ns = 0;
+ int64_t in_queue_time_sec = 0;

- void accumulate(const OlapReaderStatistics& reader_stats);
+ void collect(const OlapReaderStatistics& reader_stats);
+ CompactionTaskStats operator+(const CompactionTaskStats& that) const;
+ CompactionTaskStats operator-(const CompactionTaskStats& that) const;
std::string to_json_stats();
};

@@ -82,6 +83,7 @@ struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
bool is_checker;
Status status;
Progress progress;
+ int64_t enqueue_time_sec; // time point when put into queue
std::shared_ptr<CompactionTaskCallback> callback;
std::unique_ptr<CompactionTaskStats> stats = std::make_unique<CompactionTaskStats>();
};
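The header changes give CompactionTaskStats value semantics: collect() overwrites the struct with a fresh snapshot of the reader counters (rather than accumulating, as the removed accumulate() did), and operator+ / operator- presumably let callers merge snapshots or compute per-run deltas. A simplified sketch of that usage (hypothetical two-field struct, not the StarRocks definition):

#include <cstdint>
#include <iostream>

struct Stats {
    int64_t io_ns_remote = 0;
    int64_t io_count_remote = 0;
    Stats operator+(const Stats& o) const {
        return {io_ns_remote + o.io_ns_remote, io_count_remote + o.io_count_remote};
    }
    Stats operator-(const Stats& o) const {
        return {io_ns_remote - o.io_ns_remote, io_count_remote - o.io_count_remote};
    }
};

int main() {
    Stats before{100, 1};                 // snapshot before a run
    Stats after{350, 4};                  // snapshot after the run
    Stats delta = after - before;         // what this run contributed
    std::cout << delta.io_ns_remote << " " << delta.io_count_remote << '\n';  // 250 3
}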
5 changes: 0 additions & 5 deletions be/src/storage/lake/general_tablet_writer.cpp
@@ -43,7 +43,6 @@ Status HorizontalGeneralTabletWriter::open() {
}

Status HorizontalGeneralTabletWriter::write(const starrocks::Chunk& data, SegmentPB* segment) {
- SCOPED_RAW_TIMER(&_stats.segment_write_ns);
if (_seg_writer == nullptr || _seg_writer->estimate_segment_size() >= config::max_segment_file_size ||
_seg_writer->num_rows_written() + data.num_rows() >= INT32_MAX /*TODO: configurable*/) {
RETURN_IF_ERROR(flush_segment_writer(segment));
@@ -59,7 +58,6 @@ Status HorizontalGeneralTabletWriter::flush(SegmentPB* segment) {
}

Status HorizontalGeneralTabletWriter::finish(SegmentPB* segment) {
- SCOPED_RAW_TIMER(&_stats.segment_write_ns);
RETURN_IF_ERROR(flush_segment_writer(segment));
_finished = true;
return Status::OK();
@@ -135,7 +133,6 @@ Status VerticalGeneralTabletWriter::open() {

Status VerticalGeneralTabletWriter::write_columns(const Chunk& data, const std::vector<uint32_t>& column_indexes,
bool is_key) {
- SCOPED_RAW_TIMER(&_stats.segment_write_ns);
const size_t chunk_num_rows = data.num_rows();
if (_segment_writers.empty()) {
DCHECK(is_key);
@@ -207,7 +204,6 @@ Status VerticalGeneralTabletWriter::flush(SegmentPB* segment) {
}

Status VerticalGeneralTabletWriter::flush_columns() {
- SCOPED_RAW_TIMER(&_stats.segment_write_ns);
if (_segment_writers.empty()) {
return Status::OK();
}
@@ -223,7 +219,6 @@ Status VerticalGeneralTabletWriter::flush_columns() {
}

Status VerticalGeneralTabletWriter::finish(SegmentPB* segment) {
- SCOPED_RAW_TIMER(&_stats.segment_write_ns);
for (auto& segment_writer : _segment_writers) {
uint64_t segment_size = 0;
uint64_t footer_position = 0;
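These deletions drop SCOPED_RAW_TIMER from the write path, consistent with segment_write_second being removed from the JSON profile. For readers unfamiliar with the macro, it is an RAII timer: it adds the elapsed nanoseconds of the enclosing scope to the named counter when the scope exits. A minimal sketch of the idea (not the actual StarRocks utility, which may differ in detail):

#include <chrono>
#include <cstdint>
#include <iostream>

class ScopedRawTimer {
    int64_t* _sink;
    std::chrono::steady_clock::time_point _start;
public:
    explicit ScopedRawTimer(int64_t* sink)
            : _sink(sink), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        // Destructor adds the scope's elapsed time to the counter.
        *_sink += std::chrono::duration_cast<std::chrono::nanoseconds>(
                          std::chrono::steady_clock::now() - _start).count();
    }
};

int main() {
    int64_t write_ns = 0;
    {
        ScopedRawTimer timer(&write_ns);           // records on scope exit
        volatile int x = 0;
        for (int i = 0; i < 1000000; ++i) x += i;  // stand-in for segment I/O
    }
    std::cout << "write_ns=" << write_ns << '\n';
}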
15 changes: 4 additions & 11 deletions be/src/storage/lake/horizontal_compaction_task.cpp
@@ -63,7 +63,6 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
std::vector<uint64_t> rssid_rowids;
rssid_rowids.reserve(chunk_size);

- int64_t reader_time_ns = 0;
const bool enable_light_pk_compaction_publish = StorageEngine::instance()->enable_light_pk_compaction_publish();
while (true) {
if (UNLIKELY(StorageEngine::instance()->bg_worker_stopped())) {
@@ -76,7 +75,6 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
RETURN_IF_ERROR(tls_thread_status.mem_tracker()->check_mem_limit("Compaction"));
#endif
{
- SCOPED_RAW_TIMER(&reader_time_ns);
auto st = Status::OK();
if (tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && enable_light_pk_compaction_publish) {
st = reader.get_next(chunk.get(), &rssid_rowids);
@@ -100,21 +98,16 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
rssid_rowids.clear();

_context->progress.update(100 * reader.stats().raw_rows_read / total_num_rows);
+ VLOG_EVERY_N(3, 1000) << "Tablet: " << _tablet.id() << ", compaction progress: " << _context->progress.value();
+ _context->stats->collect(reader.stats());
}

- RETURN_IF_ERROR(writer->finish());

// Adjust the progress here for 2 reasons:
// 1. For primary key, due to the existence of the delete vector, the rows read may be less than "total_num_rows"
// 2. If the "total_num_rows" is 0, the progress will not be updated above
_context->progress.update(100);
+ RETURN_IF_ERROR(writer->finish());

- // add reader stats
- _context->stats->reader_time_ns += reader_time_ns;
- _context->stats->accumulate(reader.stats());

- // update writer stats
- _context->stats->segment_write_ns += writer->stats().segment_write_ns;
+ _context->stats->collect(reader.stats());

auto txn_log = std::make_shared<TxnLog>();
auto op_compaction = txn_log->mutable_op_compaction();
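The reshuffled epilogue now updates the progress to 100 before finishing the writer, for the two reasons in the comment: primary-key reads can return fewer rows than total_num_rows because of delete vectors, and a tablet with zero rows never enters the loop. In sketch form (hypothetical helper, not StarRocks code):

#include <cstdint>
#include <iostream>

int progress_pct(int64_t rows_read, int64_t total_rows) {
    // Guard the divide; with total_rows == 0 the in-loop update never runs.
    return total_rows > 0 ? static_cast<int>(100 * rows_read / total_rows) : 0;
}

int main() {
    std::cout << progress_pct(250, 1000) << '\n';  // 25
    std::cout << progress_pct(900, 1000) << '\n';  // 90; may never reach 100 in-loop
    // Hence the unconditional progress.update(100) once compaction completes.
}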
(Diffs for the remaining 12 changed files were not loaded.)
