Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prom instance config and drop metric processor #1988

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,13 @@ void AppConfig::RegisterCallback(const std::string& key, std::function<bool()>*
mCallbacks[key] = callback;
}

void AppConfig::UnregisterCallback(const std::string& key) {
if (mCallbacks.find(key) == mCallbacks.end()) {
return;
}
mCallbacks.erase(key);
}

template <typename T>
T AppConfig::MergeConfig(const T& defaultValue,
const T& currentValue,
Expand Down
2 changes: 2 additions & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ class AppConfig {

// 注册回调
void RegisterCallback(const std::string& key, std::function<bool()>* callback);
// 注销回调
void UnregisterCallback(const std::string& key);

// 合并配置
std::string Merge(Json::Value& localConf,
Expand Down
2 changes: 1 addition & 1 deletion core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void Application::Start() { // GCOVR_EXCL_START

// destruct event handlers here so that it will not block file reading task
ConfigManager::GetInstance()->DeleteHandlers();
PrometheusInputRunner::GetInstance()->CheckGC();
prom::PrometheusServer::GetInstance()->CheckGC();

this_thread::sleep_for(chrono::seconds(1));
}
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace logtail {

PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
prom::PrometheusServer::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
#endif
Expand Down
2 changes: 2 additions & 0 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "plugin/flusher/blackhole/FlusherBlackHole.h"
#include "plugin/flusher/file/FlusherFile.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/processor/inner/ProcessorPromDropMetricNative.h"
#ifdef __ENTERPRISE__
#include "plugin/flusher/sls/EnterpriseFlusherSLSMonitor.h"
#endif
Expand Down Expand Up @@ -154,6 +155,7 @@ void PluginRegistry::LoadStaticPlugins() {
RegisterProcessorCreator(new StaticProcessorCreator<ProcessorFilterNative>());
RegisterProcessorCreator(new StaticProcessorCreator<ProcessorPromParseMetricNative>());
RegisterProcessorCreator(new StaticProcessorCreator<ProcessorPromRelabelMetricNative>());
RegisterProcessorCreator(new StaticProcessorCreator<ProcessorPromDropMetricNative>());
#if defined(__linux__) && !defined(__ANDROID__) && !defined(__EXCLUDE_SPL__)
if (BOOL_FLAG(enable_processor_spl)) {
RegisterProcessorCreator(new StaticProcessorCreator<ProcessorSPL>());
Expand Down
20 changes: 15 additions & 5 deletions core/plugin/input/InputPrometheus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineContext.h"
#include "pipeline/plugin/instance/ProcessorInstance.h"
#include "plugin/processor/inner/ProcessorPromDropMetricNative.h"
#include "plugin/processor/inner/ProcessorPromParseMetricNative.h"
#include "plugin/processor/inner/ProcessorPromRelabelMetricNative.h"
#include "prometheus/Constants.h"
Expand All @@ -44,12 +45,12 @@ bool InputPrometheus::Init(const Json::Value& config, Json::Value&) {
string errorMsg;

// config["ScrapeConfig"]
if (!config.isMember(prometheus::SCRAPE_CONFIG) || !config[prometheus::SCRAPE_CONFIG].isObject()) {
if (!config.isMember(prom::SCRAPE_CONFIG) || !config[prom::SCRAPE_CONFIG].isObject()) {
errorMsg = "ScrapeConfig not found";
LOG_ERROR(sLogger, ("init prometheus input failed", errorMsg));
return false;
}
const Json::Value& scrapeConfig = config[prometheus::SCRAPE_CONFIG];
const Json::Value& scrapeConfig = config[prom::SCRAPE_CONFIG];

// build scrape job
mTargetSubscirber = make_unique<TargetSubscriberScheduler>();
Expand All @@ -65,20 +66,21 @@ bool InputPrometheus::Init(const Json::Value& config, Json::Value&) {
/// @brief register scrape job by PrometheusInputRunner
bool InputPrometheus::Start() {
LOG_INFO(sLogger, ("input config start", mJobName));
PrometheusInputRunner::GetInstance()->Init();
prom::PrometheusServer::GetInstance()->Init();

mTargetSubscirber->mQueueKey = mContext->GetProcessQueueKey();
auto defaultLabels = GetMetricsRecordRef()->GetLabels();

PrometheusInputRunner::GetInstance()->UpdateScrapeInput(std::move(mTargetSubscirber), *defaultLabels, mContext->GetProjectName());
prom::PrometheusServer::GetInstance()->UpdateScrapeInput(
std::move(mTargetSubscirber), *defaultLabels, mContext->GetProjectName());
return true;
}

/// @brief unregister scrape job by PrometheusInputRunner
bool InputPrometheus::Stop(bool) {
LOG_INFO(sLogger, ("input config stop", mJobName));

PrometheusInputRunner::GetInstance()->RemoveScrapeInput(mJobName);
prom::PrometheusServer::GetInstance()->RemoveScrapeInput(mJobName);
return true;
}

Expand All @@ -100,6 +102,14 @@ bool InputPrometheus::CreateInnerProcessors(const Json::Value& inputConfig) {
}
mInnerProcessors.emplace_back(std::move(processor));
}
{
processor = PluginRegistry::GetInstance()->CreateProcessor(ProcessorPromDropMetricNative::sName,
mContext->GetPipeline().GenNextPluginMeta(false));
if (!processor->Init(inputConfig, *mContext)) {
return false;
}
mInnerProcessors.emplace_back(std::move(processor));
}
return true;
}

Expand Down
50 changes: 50 additions & 0 deletions core/plugin/processor/inner/ProcessorPromDropMetricNative.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "plugin/processor/inner/ProcessorPromDropMetricNative.h"

#include <json/json.h>

#include "models/MetricEvent.h"
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "prometheus/PrometheusInputRunner.h"

using namespace std;
namespace logtail {

const string ProcessorPromDropMetricNative::sName = "processor_prom_drop_metric_native";

// only for inner processor
bool ProcessorPromDropMetricNative::Init(const Json::Value&) {
mGlobalConfig = prom::PrometheusServer::GetInstance()->GetGlobalConfig();
return true;
}

void ProcessorPromDropMetricNative::Process(PipelineEventGroup& eGroup) {
if (!mGlobalConfig->HasDropMetrics()) {
return;
}
EventsContainer& events = eGroup.MutableEvents();
size_t wIdx = 0;
for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) {
if (ProcessEvent(events[rIdx])) {
if (wIdx != rIdx) {
events[wIdx] = std::move(events[rIdx]);
}
++wIdx;
}
}
events.resize(wIdx);
}

bool ProcessorPromDropMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const {
return e.Is<MetricEvent>();
}

bool ProcessorPromDropMetricNative::ProcessEvent(const PipelineEventPtr& e) {
if (!IsSupportedEvent(e)) {
return false;
}
const auto& sourceEvent = e.Cast<MetricEvent>();
return !mGlobalConfig->IsDropped(sourceEvent.GetName());
}

} // namespace logtail
31 changes: 31 additions & 0 deletions core/plugin/processor/inner/ProcessorPromDropMetricNative.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <string>

#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "pipeline/plugin/interface/Processor.h"
#include "prometheus/component/GlobalConfig.h"

namespace logtail {
class ProcessorPromDropMetricNative : public Processor {
public:
static const std::string sName;

const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override;
void Process(PipelineEventGroup&) override;

protected:
bool IsSupportedEvent(const PipelineEventPtr&) const override;

private:
bool ProcessEvent(const PipelineEventPtr&);

std::shared_ptr<prom::GlobalConfig> mGlobalConfig;
#ifdef APSARA_UNIT_TEST_MAIN
friend class InputPrometheusUnittest;
#endif
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
auto& sourceEvent = e.Cast<RawEvent>();
std::unique_ptr<MetricEvent> metricEvent = eGroup.CreateMetricEvent(true);
if (parser.ParseLine(sourceEvent.GetContent(), *metricEvent)) {
metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName());
metricEvent->SetTag(string(prom::NAME), metricEvent->GetName());
newEvents.emplace_back(std::move(metricEvent), true, nullptr);
}
return true;
Expand Down
27 changes: 14 additions & 13 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "prometheus/Constants.h"
#include "prometheus/PrometheusInputRunner.h"

using namespace std;

Expand All @@ -42,7 +43,7 @@ bool ProcessorPromRelabelMetricNative::Init(const Json::Value& config) {
return false;
}

mLoongCollectorScraper = STRING_FLAG(_pod_name_);
mGlobalConfig = prom::PrometheusServer::GetInstance()->GetGlobalConfig();

return true;
}
Expand Down Expand Up @@ -96,7 +97,7 @@ bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e,
if (!mScrapeConfigPtr->mHonorLabels) {
// metric event labels is secondary
// if confiliction, then rename it exported_<label_name>
auto key = prometheus::EXPORTED_PREFIX + k.to_string();
auto key = prom::EXPORTED_PREFIX + k.to_string();
auto b = sourceEvent.GetSourceBuffer()->CopyString(key);
sourceEvent.SetTagNoCopy(StringView(b.data, b.size), sourceEvent.GetTag(k));
sourceEvent.SetTagNoCopy(k, v);
Expand All @@ -112,7 +113,7 @@ bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e,
return false;
}
// set metricEvent name
sourceEvent.SetNameNoCopy(sourceEvent.GetTag(prometheus::NAME));
sourceEvent.SetNameNoCopy(sourceEvent.GetTag(prom::NAME));

for (const auto& k : toDelete) {
sourceEvent.DelTag(k);
Expand All @@ -122,7 +123,7 @@ bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e,
}

// set metricEvent name
sourceEvent.SetTagNoCopy(prometheus::NAME, sourceEvent.GetName());
sourceEvent.SetTagNoCopy(prom::NAME, sourceEvent.GetName());

return true;
}
Expand Down Expand Up @@ -157,23 +158,23 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metric
auto scrapeDurationSeconds
= StringTo<double>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION).to_string());

AddMetric(metricGroup, prometheus::SCRAPE_DURATION_SECONDS, scrapeDurationSeconds, timestamp, nanoSec, targetTags);
AddMetric(metricGroup, prom::SCRAPE_DURATION_SECONDS, scrapeDurationSeconds, timestamp, nanoSec, targetTags);

auto scrapeResponseSize
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE).to_string());
AddMetric(metricGroup, prometheus::SCRAPE_RESPONSE_SIZE_BYTES, scrapeResponseSize, timestamp, nanoSec, targetTags);
AddMetric(metricGroup, prom::SCRAPE_RESPONSE_SIZE_BYTES, scrapeResponseSize, timestamp, nanoSec, targetTags);

if (mScrapeConfigPtr->mSampleLimit > 0) {
AddMetric(metricGroup,
prometheus::SCRAPE_SAMPLES_LIMIT,
prom::SCRAPE_SAMPLES_LIMIT,
mScrapeConfigPtr->mSampleLimit,
timestamp,
nanoSec,
targetTags);
}

AddMetric(metricGroup,
prometheus::SCRAPE_SAMPLES_POST_METRIC_RELABELING,
prom::SCRAPE_SAMPLES_POST_METRIC_RELABELING,
samplesPostMetricRelabel,
timestamp,
nanoSec,
Expand All @@ -182,10 +183,10 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metric
auto samplesScraped
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED).to_string());

AddMetric(metricGroup, prometheus::SCRAPE_SAMPLES_SCRAPED, samplesScraped, timestamp, nanoSec, targetTags);
AddMetric(metricGroup, prom::SCRAPE_SAMPLES_SCRAPED, samplesScraped, timestamp, nanoSec, targetTags);

AddMetric(metricGroup,
prometheus::SCRAPE_TIMEOUT_SECONDS,
prom::SCRAPE_TIMEOUT_SECONDS,
mScrapeConfigPtr->mScrapeTimeoutSeconds,
timestamp,
nanoSec,
Expand All @@ -196,12 +197,12 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metric

if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE)) {
auto scrapeState = metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE);
AddMetric(metricGroup, prometheus::SCRAPE_STATE, 1.0 * upState, timestamp, nanoSec, targetTags);
AddMetric(metricGroup, prom::SCRAPE_STATE, 1.0 * upState, timestamp, nanoSec, targetTags);
auto& last = metricGroup.MutableEvents()[metricGroup.GetEvents().size() - 1];
last.Cast<MetricEvent>().SetTag(METRIC_LABEL_KEY_STATUS, scrapeState);
}

AddMetric(metricGroup, prometheus::UP, 1.0 * upState, timestamp, nanoSec, targetTags);
AddMetric(metricGroup, prom::UP, 1.0 * upState, timestamp, nanoSec, targetTags);
}

void ProcessorPromRelabelMetricNative::AddMetric(PipelineEventGroup& metricGroup,
Expand All @@ -214,7 +215,7 @@ void ProcessorPromRelabelMetricNative::AddMetric(PipelineEventGroup& metricGroup
metricEvent->SetName(name);
metricEvent->SetValue<UntypedSingleValue>(value);
metricEvent->SetTimestamp(timestamp, nanoSec);
metricEvent->SetTag(prometheus::NAME, name);
metricEvent->SetTag(prom::NAME, name);
for (const auto& [k, v] : targetTags) {
metricEvent->SetTag(k, v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "pipeline/plugin/interface/Processor.h"
#include "prometheus/component/GlobalConfig.h"
#include "prometheus/schedulers/ScrapeConfig.h"

namespace logtail {
Expand Down Expand Up @@ -48,7 +49,7 @@ class ProcessorPromRelabelMetricNative : public Processor {
const GroupTags& targetTags);

std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;
std::string mLoongCollectorScraper;
std::shared_ptr<prom::GlobalConfig> mGlobalConfig;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorPromRelabelMetricNativeUnittest;
Expand Down
5 changes: 4 additions & 1 deletion core/prometheus/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#include <cstdint>
#include <string>

namespace logtail::prometheus {
namespace logtail::prom {

// app config
const std::string PROM_DROP_METRICS = "prom_drop_metrics_list";

// magic number for labels hash, from https://github.com/prometheus/common/blob/main/model/fnv.go#L19
const uint64_t PRIME64 = 1099511628211;
Expand Down
Loading
Loading