Skip to content

Commit

Permalink
add flusher_sls_monitor (#1944)
Browse files Browse the repository at this point in the history
* init

* polish
  • Loading branch information
Takuka0311 authored Dec 9, 2024
1 parent 1be7605 commit 4832d8f
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 56 deletions.
1 change: 1 addition & 0 deletions core/constants/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const char* SLS_EMPTY_STR_FOR_INDEX = "\01";
// profile project
const std::string PROFILE_PROJECT = "profile_project";
const std::string PROFILE_PROJECT_REGION = "profile_project_region";
const std::string PROFILE_LOGSTORE = "profile_logstore";

// global config
const std::string GLOBAL_CONFIG_NODE = "config";
Expand Down
1 change: 1 addition & 0 deletions core/constants/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern const char* SLS_EMPTY_STR_FOR_INDEX;
// profile project
extern const std::string PROFILE_PROJECT;
extern const std::string PROFILE_PROJECT_REGION;
extern const std::string PROFILE_LOGSTORE;

// global config
extern const std::string GLOBAL_CONFIG_NODE;
Expand Down
3 changes: 0 additions & 3 deletions core/monitor/MetricManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ using namespace std;

namespace logtail {

const string METRIC_KEY_CATEGORY = "category";
const string METRIC_KEY_LABEL = "label";
const string METRIC_TOPIC_TYPE = "loongcollector_metric";
const string METRIC_EXPORT_TYPE_GO = "direct";
const string METRIC_EXPORT_TYPE_CPP = "cpp_provided";

Expand Down
2 changes: 0 additions & 2 deletions core/monitor/MetricManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

namespace logtail {

extern const std::string METRIC_TOPIC_TYPE;

class WriteMetrics {
private:
WriteMetrics() = default;
Expand Down
1 change: 0 additions & 1 deletion core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ void SelfMonitorServer::SendMetrics() {

PipelineEventGroup pipelineEventGroup(std::make_shared<SourceBuffer>());
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr);
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_TOPIC, METRIC_TOPIC_TYPE);
ReadAsPipelineEventGroup(pipelineEventGroup);

shared_ptr<Pipeline> pipeline
Expand Down
6 changes: 6 additions & 0 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
#include "plugin/flusher/blackhole/FlusherBlackHole.h"
#include "plugin/flusher/file/FlusherFile.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#ifdef __ENTERPRISE__
#include "plugin/flusher/sls/EnterpriseFlusherSLSMonitor.h"
#endif
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"
#include "plugin/input/InputPrometheus.h"
Expand Down Expand Up @@ -160,6 +163,9 @@ void PluginRegistry::LoadStaticPlugins() {
RegisterFlusherCreator(new StaticFlusherCreator<FlusherSLS>());
RegisterFlusherCreator(new StaticFlusherCreator<FlusherBlackHole>());
RegisterFlusherCreator(new StaticFlusherCreator<FlusherFile>());
#ifdef __ENTERPRISE__
RegisterFlusherCreator(new StaticFlusherCreator<FlusherSLSMonitor>());
#endif
}

void PluginRegistry::LoadDynamicPlugins(const set<string>& plugins) {
Expand Down
67 changes: 33 additions & 34 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,56 +208,56 @@ void FlusherSLS::SetDefaultRegion(const string& region) {
sDefaultRegion = region;
}

mutex FlusherSLS::sProjectRefCntMapLock;
mutex FlusherSLS::sProjectRegionMapLock;
unordered_map<string, int32_t> FlusherSLS::sProjectRefCntMap;
mutex FlusherSLS::sRegionRefCntMapLock;
unordered_map<string, int32_t> FlusherSLS::sRegionRefCntMap;
unordered_map<string, string> FlusherSLS::sProjectRegionMap;

string FlusherSLS::GetAllProjects() {
string result;
lock_guard<mutex> lock(sProjectRefCntMapLock);
lock_guard<mutex> lock(sProjectRegionMapLock);
for (auto iter = sProjectRefCntMap.cbegin(); iter != sProjectRefCntMap.cend(); ++iter) {
result.append(iter->first).append(" ");
}
return result;
}

void FlusherSLS::IncreaseProjectReferenceCnt(const string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
++sProjectRefCntMap[project];
bool FlusherSLS::IsRegionContainingConfig(const string& region) {
lock_guard<mutex> lock(sProjectRegionMapLock);
return sRegionRefCntMap.find(region) != sRegionRefCntMap.end();
}

void FlusherSLS::DecreaseProjectReferenceCnt(const string& project) {
lock_guard<mutex> lock(sProjectRefCntMapLock);
auto iter = sProjectRefCntMap.find(project);
if (iter == sProjectRefCntMap.end()) {
// should not happen
return;
}
if (--iter->second == 0) {
sProjectRefCntMap.erase(iter);
std::string FlusherSLS::GetProjectRegion(const std::string& project) {
lock_guard<mutex> lock(sProjectRegionMapLock);
auto iter = sProjectRegionMap.find(project);
if (iter == sProjectRegionMap.end()) {
return "";
}
return iter->second;
}

bool FlusherSLS::IsRegionContainingConfig(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
return sRegionRefCntMap.find(region) != sRegionRefCntMap.end();
}

void FlusherSLS::IncreaseRegionReferenceCnt(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
void FlusherSLS::IncreaseProjectRegionReferenceCnt(const string& project, const string& region) {
lock_guard<mutex> lock(sProjectRegionMapLock);
++sProjectRefCntMap[project];
++sRegionRefCntMap[region];
sProjectRegionMap[project] = region;
}

void FlusherSLS::DecreaseRegionReferenceCnt(const string& region) {
lock_guard<mutex> lock(sRegionRefCntMapLock);
auto iter = sRegionRefCntMap.find(region);
if (iter == sRegionRefCntMap.end()) {
// should not happen
return;
void FlusherSLS::DecreaseProjectRegionReferenceCnt(const string& project, const string& region) {
lock_guard<mutex> lock(sProjectRegionMapLock);
auto projectRefCnt = sProjectRefCntMap.find(project);
if (projectRefCnt != sProjectRefCntMap.end()) {
if (--projectRefCnt->second == 0) {
sProjectRefCntMap.erase(projectRefCnt);
sProjectRegionMap.erase(project);
}
}
if (--iter->second == 0) {
sRegionRefCntMap.erase(iter);

auto regionRefCnt = sRegionRefCntMap.find(region);
if (regionRefCnt != sRegionRefCntMap.end()) {
if (--regionRefCnt->second == 0) {
sRegionRefCntMap.erase(regionRefCnt);
}
}
}

Expand Down Expand Up @@ -540,21 +540,20 @@ bool FlusherSLS::Start() {
Flusher::Start();
InitResource();

IncreaseProjectReferenceCnt(mProject);
IncreaseRegionReferenceCnt(mRegion);
IncreaseProjectRegionReferenceCnt(mProject, mRegion);
SLSClientManager::GetInstance()->IncreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}

bool FlusherSLS::Stop(bool isPipelineRemoving) {
Flusher::Stop(isPipelineRemoving);

DecreaseProjectReferenceCnt(mProject);
DecreaseRegionReferenceCnt(mRegion);
DecreaseProjectRegionReferenceCnt(mProject, mRegion);
SLSClientManager::GetInstance()->DecreaseAliuidReferenceCntForRegion(mRegion, mAliuid);
return true;
}


bool FlusherSLS::Send(PipelineEventGroup&& g) {
if (g.IsReplay()) {
return SerializeAndPush(std::move(g));
Expand Down
11 changes: 5 additions & 6 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class FlusherSLS : public HttpFlusher {
static void SetDefaultRegion(const std::string& region);
static std::string GetAllProjects();
static bool IsRegionContainingConfig(const std::string& region);
static std::string GetProjectRegion(const std::string& project);

// TODO: should be moved to enterprise config provider
static bool GetRegionStatus(const std::string& region);
Expand Down Expand Up @@ -90,10 +91,8 @@ class FlusherSLS : public HttpFlusher {

static void InitResource();

static void IncreaseProjectReferenceCnt(const std::string& project);
static void DecreaseProjectReferenceCnt(const std::string& project);
static void IncreaseRegionReferenceCnt(const std::string& region);
static void DecreaseRegionReferenceCnt(const std::string& region);
static void IncreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region);
static void DecreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region);

static std::mutex sMux;
static std::unordered_map<std::string, std::weak_ptr<ConcurrencyLimiter>> sProjectConcurrencyLimiterMap;
Expand All @@ -103,10 +102,10 @@ class FlusherSLS : public HttpFlusher {
static std::mutex sDefaultRegionLock;
static std::string sDefaultRegion;

static std::mutex sProjectRefCntMapLock;
static std::mutex sProjectRegionMapLock;
static std::unordered_map<std::string, int32_t> sProjectRefCntMap;
static std::mutex sRegionRefCntMapLock;
static std::unordered_map<std::string, int32_t> sRegionRefCntMap;
static std::unordered_map<std::string, std::string> sProjectRegionMap;

// TODO: should be moved to enterprise config provider
static std::mutex sRegionStatusLock;
Expand Down
23 changes: 17 additions & 6 deletions core/unittest/config/CommonConfigProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "gmock/gmock.h"
Expand Down Expand Up @@ -433,18 +436,22 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理 pipelineconfig
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[0].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_EQUAL(1U + builtinPipelineCnt, pipelineConfigDiff.first.mAdded.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[builtinPipelineCnt].mName, "config1");
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);
// 再次处理 pipelineconfig
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty());
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1);
APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt);
APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr);


Expand Down Expand Up @@ -649,17 +656,21 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() {

// 处理pipelineConfigDiff
auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size());
APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1");
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());
// 再次处理pipelineConfigDiff
pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first);
APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty());
APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty());
APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size());
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size());

APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty());
// 处理instanceConfigDiff
Expand Down
9 changes: 8 additions & 1 deletion core/unittest/config/ConfigUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

#include "config/PipelineConfig.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/PipelineConfigWatcher.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"
Expand Down Expand Up @@ -268,7 +271,11 @@ class ConfigUpdateUnittest : public testing::Test {

void ConfigUpdateUnittest::OnStartUp() const {
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

GenerateInitialConfigs();
Expand Down
21 changes: 18 additions & 3 deletions core/unittest/config/ConfigWatcherUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "config/ConfigDiff.h"
#include "config/common_provider/CommonConfigProvider.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
#include "config/watcher/InstanceConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "pipeline/plugin/PluginRegistry.h"
Expand Down Expand Up @@ -51,7 +54,11 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co
void ConfigWatcherUnittest::InvalidConfigDirFound() const {
{
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());

{ ofstream fout("continuous_pipeline_config"); }
Expand Down Expand Up @@ -83,7 +90,11 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const {
fout << "[}";
}
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
APSARA_TEST_EQUAL(0U, diff.first.mAdded.size());
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size());
APSARA_TEST_TRUE(diff.second.IsEmpty());
filesystem::remove_all(configDir);
}
Expand Down Expand Up @@ -132,8 +143,12 @@ void ConfigWatcherUnittest::DuplicateConfigs() const {
}
{ ofstream fout("dir2/config.json"); }
auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
size_t builtinPipelineCnt = 0;
#ifdef __ENTERPRISE__
builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size();
#endif
APSARA_TEST_FALSE(diff.first.IsEmpty());
APSARA_TEST_EQUAL(1U, diff.first.mAdded.size());
APSARA_TEST_EQUAL(1U + builtinPipelineCnt, diff.first.mAdded.size());

filesystem::remove_all("dir1");
filesystem::remove_all("dir2");
Expand Down
3 changes: 3 additions & 0 deletions core/unittest/flusher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ target_link_libraries(pack_id_manager_unittest ${UT_BASE_TARGET})
if (ENABLE_ENTERPRISE)
add_executable(enterprise_sls_client_manager_unittest EnterpriseSLSClientManagerUnittest.cpp)
target_link_libraries(enterprise_sls_client_manager_unittest ${UT_BASE_TARGET})
add_executable(enterprise_flusher_sls_monitor_unittest EnterpriseFlusherSLSMonitorUnittest.cpp)
target_link_libraries(enterprise_flusher_sls_monitor_unittest ${UT_BASE_TARGET})
endif ()

include(GoogleTest)
gtest_discover_tests(flusher_sls_unittest)
gtest_discover_tests(pack_id_manager_unittest)
if (ENABLE_ENTERPRISE)
gtest_discover_tests(enterprise_sls_client_manager_unittest)
gtest_discover_tests(enterprise_flusher_sls_monitor_unittest)
endif ()

0 comments on commit 4832d8f

Please sign in to comment.