Skip to content

Commit

Permalink
[fix][store] Add time statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD authored and rock-git committed Oct 30, 2024
1 parent 92c5795 commit 958f383
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/common/tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ bvar::LatencyRecorder Tracker::raft_queue_wait_latency("dingo_tracker_raft_queue
bvar::LatencyRecorder Tracker::raft_apply_latency("dingo_tracker_raft_apply");
bvar::LatencyRecorder Tracker::read_store_latency("dingo_tracker_read_store");

bvar::LatencyRecorder Tracker::store_write_latency("dingo_tracker_store_write");
bvar::LatencyRecorder Tracker::vector_index_write_latency("dingo_tracker_vector_index_write");
bvar::LatencyRecorder Tracker::document_index_write_latency("dingo_tracker_document_index_write");

} // namespace dingodb
19 changes: 16 additions & 3 deletions src/common/tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,22 @@ class Tracker {
}
uint64_t RaftApplyTime() const { return metrics_.raft_apply_time_ns; }

void SetStoreWriteTime(uint64_t elapsed_time) { metrics_.store_write_time_ns = elapsed_time; }
void SetStoreWriteTime(uint64_t elapsed_time) {
metrics_.store_write_time_ns = elapsed_time;
store_write_latency << metrics_.store_write_time_ns / 1000;
}
uint64_t StoreWriteTime() const { return metrics_.store_write_time_ns; }

void SetVectorIndexWriteTime(uint64_t elapsed_time) { metrics_.vector_index_write_time_ns = elapsed_time; }
void SetVectorIndexWriteTime(uint64_t elapsed_time) {
metrics_.vector_index_write_time_ns = elapsed_time;
vector_index_write_latency << metrics_.vector_index_write_time_ns / 1000;
}
uint64_t VectorIndexwriteTime() const { return metrics_.vector_index_write_time_ns; }

void SetDocumentIndexWriteTime(uint64_t elapsed_time) { metrics_.document_index_write_time_ns = elapsed_time; }
void SetDocumentIndexWriteTime(uint64_t elapsed_time) {
metrics_.document_index_write_time_ns = elapsed_time;
document_index_write_latency << metrics_.document_index_write_time_ns / 1000;
}
uint64_t DocumentIndexwriteTime() const { return metrics_.document_index_write_time_ns; }

void SetReadStoreTime() {
Expand All @@ -124,6 +133,10 @@ class Tracker {
static bvar::LatencyRecorder raft_apply_latency;
static bvar::LatencyRecorder read_store_latency;

static bvar::LatencyRecorder store_write_latency;
static bvar::LatencyRecorder vector_index_write_latency;
static bvar::LatencyRecorder document_index_write_latency;

private:
uint64_t start_time_;
uint64_t last_time_;
Expand Down
6 changes: 4 additions & 2 deletions src/handler/raft_apply_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1395,12 +1395,11 @@ int DocumentAddHandler::Handle(std::shared_ptr<Context> ctx, store::RegionPtr re
document_with_id.set_id(document.id());
document_with_ids.push_back(document_with_id);
}

auto start_time = Helper::TimestampNs();
auto status = request.is_update() ? document_index_wrapper->Upsert(document_with_ids)
: document_index_wrapper->Add(document_with_ids);
if (tracker) tracker->SetDocumentIndexWriteTime(Helper::TimestampNs() - start_time);
DINGO_LOG(DEBUG) << fmt::format("[raft.apply][region({})] upsert document, count: {} cost: {}us",
DINGO_LOG(DEBUG) << fmt::format("[raft.apply][region({})] upsert document, count: {} cost: {}ns",
document_index_id, document_with_ids.size(),
Helper::TimestampNs() - start_time);
if (status.ok()) {
Expand Down Expand Up @@ -1494,6 +1493,9 @@ int DocumentDeleteHandler::Handle(std::shared_ptr<Context> ctx, store::RegionPtr
auto start_time = Helper::TimestampNs();
auto status = document_index_wrapper->Delete(Helper::PbRepeatedToVector(request.ids()));
if (tracker) tracker->SetDocumentIndexWriteTime(Helper::TimestampNs() - start_time);
DINGO_LOG(DEBUG) << fmt::format("[raft.apply][region({})] delete document, count: {} cost: {}ns",
document_index_id, request.ids().size(),
Helper::TimestampNs() - start_time);
if (status.ok()) {
if (region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE && log_id != INT64_MAX) {
document_index_wrapper->SetApplyLogId(log_id);
Expand Down
7 changes: 6 additions & 1 deletion src/handler/raft_apply_handler_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void TxnHandler::HandleMultiCfPutAndDeleteRequest(std::shared_ptr<Context> ctx,
"[txn][region({})] HandleMultiCfPutAndDelete fail, term: {} apply_log_id: {}, error: {} request: {}.",
region->Id(), term_id, log_id, status.error_str(), request.ShortDebugString());
}
auto tracker = ctx ? ctx->Tracker() : nullptr;

// check if need to commit to vector index
{
Expand All @@ -119,6 +120,8 @@ void TxnHandler::HandleMultiCfPutAndDeleteRequest(std::shared_ptr<Context> ctx,
add_ctx->SetRegionId(region->Id());
add_ctx->SetCfName(Constant::kVectorDataCF);
add_ctx->SetRegionEpoch(region->Definition().epoch());
if (tracker) add_ctx->SetTracker(tracker);

pb::raft::Request raft_request_for_vector_add;
for (const auto &vector : vector_add.vectors()) {
auto *new_vector = raft_request_for_vector_add.mutable_vector_add()->add_vectors();
Expand Down Expand Up @@ -152,7 +155,7 @@ void TxnHandler::HandleMultiCfPutAndDeleteRequest(std::shared_ptr<Context> ctx,
del_ctx->SetRegionId(region->Id());
del_ctx->SetCfName(Constant::kVectorDataCF);
del_ctx->SetRegionEpoch(region->Definition().epoch());

if (tracker) del_ctx->SetTracker(tracker);
pb::raft::Request raft_request_for_vector_del;
for (const auto &id : vector_del.ids()) {
raft_request_for_vector_del.mutable_vector_delete()->add_ids(id);
Expand Down Expand Up @@ -187,6 +190,7 @@ void TxnHandler::HandleMultiCfPutAndDeleteRequest(std::shared_ptr<Context> ctx,
add_ctx->SetRegionId(region->Id());
add_ctx->SetCfName(Constant::kStoreDataCF);
add_ctx->SetRegionEpoch(region->Definition().epoch());
if (tracker) add_ctx->SetTracker(tracker);
pb::raft::Request raft_request_for_document_add;
for (const auto &document : document_add.documents()) {
auto *new_document = raft_request_for_document_add.mutable_document_add()->add_documents();
Expand Down Expand Up @@ -221,6 +225,7 @@ void TxnHandler::HandleMultiCfPutAndDeleteRequest(std::shared_ptr<Context> ctx,
del_ctx->SetRegionId(region->Id());
del_ctx->SetCfName(Constant::kStoreDataCF);
del_ctx->SetRegionEpoch(region->Definition().epoch());
if (tracker) del_ctx->SetTracker(tracker);
pb::raft::Request raft_request_for_document_del;
for (const auto &id : document_del.ids()) {
raft_request_for_document_del.mutable_document_delete()->add_ids(id);
Expand Down
4 changes: 4 additions & 0 deletions src/server/service_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ inline void SetPbMessageResponseInfo(google::protobuf::Message* message, Tracker
<< response_info_field->message_type()->full_name();
return;
}

pb::common::ResponseInfo* response_info =
dynamic_cast<pb::common::ResponseInfo*>(reflection->MutableMessage(message, response_info_field));
auto* time_info = response_info->mutable_time_info();
Expand All @@ -226,6 +227,9 @@ inline void SetPbMessageResponseInfo(google::protobuf::Message* message, Tracker
time_info->set_raft_commit_time_ns(tracker->RaftCommitTime());
time_info->set_raft_queue_wait_time_ns(tracker->RaftQueueWaitTime());
time_info->set_raft_apply_time_ns(tracker->RaftApplyTime());
time_info->set_store_write_time_ns(tracker->StoreWriteTime());
time_info->set_vector_index_write_time_ns(tracker->VectorIndexwriteTime());
time_info->set_document_index_write_time_ns(tracker->DocumentIndexwriteTime());
}

template <typename T, typename U, bool need_region>
Expand Down

0 comments on commit 958f383

Please sign in to comment.