Skip to content

Commit

Permalink
json_healthcheck cache
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen committed Jun 13, 2024
1 parent 32c0c65 commit b9d24a2
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 38 deletions.
149 changes: 111 additions & 38 deletions ydb/core/viewer/json_healthcheck.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/actors/core/mon.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/grpc_services/db_metadata_cache.h>
#include <ydb/library/services/services.pb.h>
#include "viewer.h"
#include <ydb/core/viewer/json/json.h>
#include <ydb/core/health_check/health_check.h>
#include <ydb/core/util/proto_duration.h>
#include <library/cpp/monlib/encode/prometheus/prometheus.h>
#include <util/string/split.h>
#include "json_pipe_req.h"
#include "healthcheck_record.h"
#include <vector>

Expand All @@ -23,14 +25,20 @@ enum HealthCheckResponseFormat {
PROMETHEUS
};

class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
class TJsonHealthCheck : public TViewerPipeClient<TJsonHealthCheck> {
using TBase = TViewerPipeClient<TJsonHealthCheck>;
IViewer* Viewer;
static const bool WithRetry = false;
NMon::TEvHttpInfo::TPtr Event;
TJsonSettings JsonSettings;
ui32 Timeout = 0;
HealthCheckResponseFormat Format;
TString Database;
bool Cache = true;
bool MergeRecords = false;
std::optional<Ydb::Monitoring::SelfCheckResult> Result;
std::optional<TNodeId> SubscribedNodeId;
Ydb::Monitoring::StatusFlag::Status MinStatus = Ydb::Monitoring::StatusFlag::UNSPECIFIED;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand All @@ -42,8 +50,34 @@ class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
, Event(ev)
{}

void Bootstrap(const TActorContext& ctx) {
THolder<NHealthCheck::TEvSelfCheckRequest> MakeSelfCheckRequest() {
const auto& params(Event->Get()->Request.GetParams());
THolder<NHealthCheck::TEvSelfCheckRequest> request = MakeHolder<NHealthCheck::TEvSelfCheckRequest>();
request->Database = Database;
if (params.Has("verbose")) {
request->Request.set_return_verbose_status(FromStringWithDefault<bool>(params.Get("verbose"), false));
}
if (params.Has("max_level")) {
request->Request.set_maximum_level(FromStringWithDefault<ui32>(params.Get("max_level"), 0));
}
if (MinStatus != Ydb::Monitoring::StatusFlag::UNSPECIFIED) {
request->Request.set_minimum_status(MinStatus);
}
if (params.Has("merge_records")) {
request->Request.set_merge_records(MergeRecords);
}
SetDuration(TDuration::MilliSeconds(Timeout), *request->Request.mutable_operation_params()->mutable_operation_timeout());
return request;
}

void SendHealthCheckRequest() {
auto request = MakeSelfCheckRequest();
Send(NHealthCheck::MakeHealthCheckID(), request.Release());
}

void Bootstrap() {
const auto& params(Event->Get()->Request.GetParams());
InitConfig(params);

Format = HealthCheckResponseFormat::JSON;
if (params.Has("format")) {
Expand All @@ -68,43 +102,49 @@ class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false);
}
Database = params.Get("tenant");
Cache = FromStringWithDefault<bool>(params.Get("cache"), Cache);
MergeRecords = FromStringWithDefault<bool>(params.Get("merge_records"), MergeRecords);
Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
THolder<NHealthCheck::TEvSelfCheckRequest> request = MakeHolder<NHealthCheck::TEvSelfCheckRequest>();
request->Database = Database = params.Get("tenant");
request->Request.set_return_verbose_status(FromStringWithDefault<bool>(params.Get("verbose"), false));
request->Request.set_maximum_level(FromStringWithDefault<ui32>(params.Get("max_level"), 0));
request->Request.set_merge_records(FromStringWithDefault<bool>(params.Get("merge_records"), false));
SetDuration(TDuration::MilliSeconds(Timeout), *request->Request.mutable_operation_params()->mutable_operation_timeout());
if (params.Has("min_status")) {
Ydb::Monitoring::StatusFlag::Status minStatus;
if (Ydb::Monitoring::StatusFlag_Status_Parse(params.Get("min_status"), &minStatus)) {
request->Request.set_minimum_status(minStatus);
} else {
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
return PassAway();
}

if (params.Get("min_status") && !Ydb::Monitoring::StatusFlag_Status_Parse(params.Get("min_status"), &MinStatus)) {
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "The field 'min_status' cannot be parsed"), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
return PassAway();
}
if (AppData()->FeatureFlags.GetEnableDbMetadataCache() && Cache && Database && MergeRecords) {
RequestStateStorageMetadataCacheEndpointsLookup(Database);
} else {
SendHealthCheckRequest();
}
Send(NHealthCheck::MakeHealthCheckID(), request.Release());
Timeout += Timeout * 20 / 100; // we prefer to wait for more (+20%) verbose timeout status from HC
ctx.Schedule(TDuration::Seconds(Timeout), new TEvents::TEvWakeup());
Become(&TThis::StateRequestedInfo);
Become(&TThis::StateRequestedInfo, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
}

void PassAway() override {
if (SubscribedNodeId.has_value()) {
Send(TActivationContext::InterconnectProxy(SubscribedNodeId.value()), new TEvents::TEvUnsubscribe());
}
TBase::PassAway();
}

STFUNC(StateRequestedInfo) {
switch (ev->GetTypeRewrite()) {
HFunc(NHealthCheck::TEvSelfCheckResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(NHealthCheck::TEvSelfCheckResult, Handle);
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(NHealthCheck::TEvSelfCheckResultProto, Handle);
cFunc(TEvents::TSystem::Undelivered, SendHealthCheckRequest);
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
}
}

int GetIssueCount(const Ydb::Monitoring::IssueLog& issueLog) {
return issueLog.count() == 0 ? 1 : issueLog.count();
}

THolder<THashMap<TMetricRecord, ui32>> GetRecordCounters(NHealthCheck::TEvSelfCheckResult::TPtr& ev) {
THolder<THashMap<TMetricRecord, ui32>> GetRecordCounters() {
const auto *descriptor = Ydb::Monitoring::StatusFlag_Status_descriptor();
THashMap<TMetricRecord, ui32> recordCounters;
for (auto& log : ev->Get()->Result.issue_log()) {
for (auto& log : Result->issue_log()) {
TMetricRecord record {
.Database = log.location().database().name(),
.Message = log.message(),
Expand All @@ -123,15 +163,14 @@ class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
return MakeHolder<THashMap<TMetricRecord, ui32>>(recordCounters);
}

void HandleJSON(NHealthCheck::TEvSelfCheckResult::TPtr& ev, const TActorContext &ctx) {
void HandleJSON() {
TStringStream json;
TProtoToJson::ProtoToJson(json, ev->Get()->Result, JsonSettings);
ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get(), json.Str()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
Die(ctx);
TProtoToJson::ProtoToJson(json, *Result, JsonSettings);
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKJSON(Event->Get(), json.Str()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
}

void HandlePrometheus(NHealthCheck::TEvSelfCheckResult::TPtr& ev, const TActorContext &ctx) {
auto recordCounters = GetRecordCounters(ev);
void HandlePrometheus() {
auto recordCounters = GetRecordCounters();

TStringStream ss;
IMetricEncoderPtr encoder = EncoderPrometheus(&ss);
Expand Down Expand Up @@ -159,7 +198,7 @@ class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
}
}
const auto *descriptor = Ydb::Monitoring::SelfCheck_Result_descriptor();
auto result = descriptor->FindValueByNumber(ev->Get()->Result.self_check_result())->name();
auto result = descriptor->FindValueByNumber(Result->self_check_result())->name();
e->OnMetricBegin(EMetricType::IGAUGE);
{
e->OnLabelsBegin();
Expand All @@ -175,21 +214,50 @@ class TJsonHealthCheck : public TActorBootstrapped<TJsonHealthCheck> {
e->OnMetricEnd();
e->OnStreamEnd();

ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + ss.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
Die(ctx);
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + ss.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
}

void Handle(NHealthCheck::TEvSelfCheckResult::TPtr& ev, const TActorContext &ctx) {
if (Format == HealthCheckResponseFormat::JSON) {
HandleJSON(ev, ctx);
void ReplyAndPassAway() {
if (Result) {
if (Format == HealthCheckResponseFormat::JSON) {
HandleJSON();
} else {
HandlePrometheus();
}
}
PassAway();
}

void Handle(NHealthCheck::TEvSelfCheckResult::TPtr& ev) {
Result = std::move(ev->Get()->Result);
ReplyAndPassAway();
}

void Handle(TEvents::TEvUndelivered::TPtr&) {
SendHealthCheckRequest();
}

void Handle(NHealthCheck::TEvSelfCheckResultProto::TPtr& ev) {
Result = std::move(ev->Get()->Record);
NHealthCheck::RemoveUnrequestedEntries(*Result, MakeSelfCheckRequest().Release()->Request);
ReplyAndPassAway();
}

void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
auto activeNode = TDatabaseMetadataCache::PickActiveNode(ev->Get()->InfoEntries);
if (activeNode != 0) {
SubscribedNodeId = activeNode;
std::optional<TActorId> cache = MakeDatabaseMetadataCacheId(activeNode);
auto request = MakeHolder<NHealthCheck::TEvSelfCheckRequestProto>();
Send(*cache, request.Release());
} else {
HandlePrometheus(ev, ctx);
SendHealthCheckRequest();
}
}

void HandleTimeout(const TActorContext &ctx) {
void HandleTimeout() {
Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
Die(ctx);
PassAway();
}
};

Expand Down Expand Up @@ -224,6 +292,11 @@ struct TJsonRequestParameters<TJsonHealthCheck> {
description: path to database
required: false
type: string
- name: cache
in: query
description: use cache
required: false
type: boolean
- name: verbose
in: query
description: return verbose status
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/viewer/json_pipe_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/cms/console/console.h>
#include <ydb/core/base/hive.h>
#include <ydb/core/base/statestorage.h>
#include <ydb/core/grpc_services/db_metadata_cache.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
Expand Down Expand Up @@ -248,6 +249,16 @@ class TViewerPipeClient : public TActorBootstrapped<TDerived> {
++Requests;
}

void RequestStateStorageMetadataCacheEndpointsLookup(const TString& path) {
if (!AppData()->DomainsInfo->Domain) {
return;
}
TBase::RegisterWithSameMailbox(CreateBoardLookupActor(MakeDatabaseMetadataCacheBoardPath(path),
TBase::SelfId(),
EBoardLookupMode::Second));
++Requests;
}

std::vector<TNodeId> GetNodesFromBoardReply(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
std::vector<TNodeId> databaseNodes;
if (ev->Get()->Status == TEvStateStorage::TEvBoardInfo::EStatus::Ok) {
Expand Down

0 comments on commit b9d24a2

Please sign in to comment.