diff --git a/core/constants/Constants.cpp b/core/constants/Constants.cpp index 9fcc30d73b..e7536b57f8 100644 --- a/core/constants/Constants.cpp +++ b/core/constants/Constants.cpp @@ -39,6 +39,7 @@ const char* SLS_EMPTY_STR_FOR_INDEX = "\01"; // profile project const std::string PROFILE_PROJECT = "profile_project"; const std::string PROFILE_PROJECT_REGION = "profile_project_region"; +const std::string PROFILE_LOGSTORE = "profile_logstore"; // global config const std::string GLOBAL_CONFIG_NODE = "config"; diff --git a/core/constants/Constants.h b/core/constants/Constants.h index 95bed2bec9..48ddacaab5 100644 --- a/core/constants/Constants.h +++ b/core/constants/Constants.h @@ -40,6 +40,7 @@ extern const char* SLS_EMPTY_STR_FOR_INDEX; // profile project extern const std::string PROFILE_PROJECT; extern const std::string PROFILE_PROJECT_REGION; +extern const std::string PROFILE_LOGSTORE; // global config extern const std::string GLOBAL_CONFIG_NODE; diff --git a/core/monitor/MetricManager.cpp b/core/monitor/MetricManager.cpp index 21aa50c68a..aae567c11f 100644 --- a/core/monitor/MetricManager.cpp +++ b/core/monitor/MetricManager.cpp @@ -28,9 +28,6 @@ using namespace std; namespace logtail { -const string METRIC_KEY_CATEGORY = "category"; -const string METRIC_KEY_LABEL = "label"; -const string METRIC_TOPIC_TYPE = "loongcollector_metric"; const string METRIC_EXPORT_TYPE_GO = "direct"; const string METRIC_EXPORT_TYPE_CPP = "cpp_provided"; diff --git a/core/monitor/MetricManager.h b/core/monitor/MetricManager.h index 3f28b9b477..ad5b743a8d 100644 --- a/core/monitor/MetricManager.h +++ b/core/monitor/MetricManager.h @@ -30,8 +30,6 @@ namespace logtail { -extern const std::string METRIC_TOPIC_TYPE; - class WriteMetrics { private: WriteMetrics() = default; diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp index 690c0beca5..d4a75e141a 100644 --- a/core/monitor/SelfMonitorServer.cpp +++ b/core/monitor/SelfMonitorServer.cpp @@ -100,7 +100,6 @@ void SelfMonitorServer::SendMetrics() { PipelineEventGroup pipelineEventGroup(std::make_shared()); pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr); - pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_TOPIC, METRIC_TOPIC_TYPE); ReadAsPipelineEventGroup(pipelineEventGroup); shared_ptr pipeline diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index bf0d7acbe6..469b2c06b1 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -28,6 +28,9 @@ #include "plugin/flusher/blackhole/FlusherBlackHole.h" #include "plugin/flusher/file/FlusherFile.h" #include "plugin/flusher/sls/FlusherSLS.h" +#ifdef __ENTERPRISE__ +#include "plugin/flusher/sls/EnterpriseFlusherSLSMonitor.h" +#endif #include "plugin/input/InputContainerStdio.h" #include "plugin/input/InputFile.h" #include "plugin/input/InputPrometheus.h" @@ -160,6 +163,9 @@ void PluginRegistry::LoadStaticPlugins() { RegisterFlusherCreator(new StaticFlusherCreator()); RegisterFlusherCreator(new StaticFlusherCreator()); RegisterFlusherCreator(new StaticFlusherCreator()); +#ifdef __ENTERPRISE__ + RegisterFlusherCreator(new StaticFlusherCreator()); +#endif } void PluginRegistry::LoadDynamicPlugins(const set& plugins) { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 3fcdb6fa6b..47977e47a6 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -208,56 +208,56 @@ void FlusherSLS::SetDefaultRegion(const string& region) { sDefaultRegion = region; } -mutex FlusherSLS::sProjectRefCntMapLock; +mutex FlusherSLS::sProjectRegionMapLock; unordered_map FlusherSLS::sProjectRefCntMap; -mutex FlusherSLS::sRegionRefCntMapLock; unordered_map FlusherSLS::sRegionRefCntMap; +unordered_map FlusherSLS::sProjectRegionMap; string FlusherSLS::GetAllProjects() { string result; - lock_guard lock(sProjectRefCntMapLock); + lock_guard lock(sProjectRegionMapLock); for (auto iter = sProjectRefCntMap.cbegin(); iter != sProjectRefCntMap.cend(); ++iter) { result.append(iter->first).append(" "); } return result; } -void FlusherSLS::IncreaseProjectReferenceCnt(const string& project) { - lock_guard lock(sProjectRefCntMapLock); - ++sProjectRefCntMap[project]; +bool FlusherSLS::IsRegionContainingConfig(const string& region) { + lock_guard lock(sProjectRegionMapLock); + return sRegionRefCntMap.find(region) != sRegionRefCntMap.end(); } -void FlusherSLS::DecreaseProjectReferenceCnt(const string& project) { - lock_guard lock(sProjectRefCntMapLock); - auto iter = sProjectRefCntMap.find(project); - if (iter == sProjectRefCntMap.end()) { - // should not happen - return; - } - if (--iter->second == 0) { - sProjectRefCntMap.erase(iter); +std::string FlusherSLS::GetProjectRegion(const std::string& project) { + lock_guard lock(sProjectRegionMapLock); + auto iter = sProjectRegionMap.find(project); + if (iter == sProjectRegionMap.end()) { + return ""; } + return iter->second; } -bool FlusherSLS::IsRegionContainingConfig(const string& region) { - lock_guard lock(sRegionRefCntMapLock); - return sRegionRefCntMap.find(region) != sRegionRefCntMap.end(); -} - -void FlusherSLS::IncreaseRegionReferenceCnt(const string& region) { - lock_guard lock(sRegionRefCntMapLock); +void FlusherSLS::IncreaseProjectRegionReferenceCnt(const string& project, const string& region) { + lock_guard lock(sProjectRegionMapLock); + ++sProjectRefCntMap[project]; ++sRegionRefCntMap[region]; + sProjectRegionMap[project] = region; } -void FlusherSLS::DecreaseRegionReferenceCnt(const string& region) { - lock_guard lock(sRegionRefCntMapLock); - auto iter = sRegionRefCntMap.find(region); - if (iter == sRegionRefCntMap.end()) { - // should not happen - return; +void FlusherSLS::DecreaseProjectRegionReferenceCnt(const string& project, const string& region) { + lock_guard lock(sProjectRegionMapLock); + auto projectRefCnt = sProjectRefCntMap.find(project); + if (projectRefCnt != sProjectRefCntMap.end()) { + if (--projectRefCnt->second == 0) { + sProjectRefCntMap.erase(projectRefCnt); + sProjectRegionMap.erase(project); + } } - if (--iter->second == 0) { - sRegionRefCntMap.erase(iter); + + auto regionRefCnt = sRegionRefCntMap.find(region); + if (regionRefCnt != sRegionRefCntMap.end()) { + if (--regionRefCnt->second == 0) { + sRegionRefCntMap.erase(regionRefCnt); + } } } @@ -540,8 +540,7 @@ bool FlusherSLS::Start() { Flusher::Start(); InitResource(); - IncreaseProjectReferenceCnt(mProject); - IncreaseRegionReferenceCnt(mRegion); + IncreaseProjectRegionReferenceCnt(mProject, mRegion); SLSClientManager::GetInstance()->IncreaseAliuidReferenceCntForRegion(mRegion, mAliuid); return true; } @@ -549,12 +548,12 @@ bool FlusherSLS::Start() { bool FlusherSLS::Stop(bool isPipelineRemoving) { Flusher::Stop(isPipelineRemoving); - DecreaseProjectReferenceCnt(mProject); - DecreaseRegionReferenceCnt(mRegion); + DecreaseProjectRegionReferenceCnt(mProject, mRegion); SLSClientManager::GetInstance()->DecreaseAliuidReferenceCntForRegion(mRegion, mAliuid); return true; } + bool FlusherSLS::Send(PipelineEventGroup&& g) { if (g.IsReplay()) { return SerializeAndPush(std::move(g)); diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index c874a71f1e..6a71b0f526 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -49,6 +49,7 @@ class FlusherSLS : public HttpFlusher { static void SetDefaultRegion(const std::string& region); static std::string GetAllProjects(); static bool IsRegionContainingConfig(const std::string& region); + static std::string GetProjectRegion(const std::string& project); // TODO: should be moved to enterprise config provider static bool GetRegionStatus(const std::string& region); @@ -90,10 +91,8 @@ class FlusherSLS : public HttpFlusher { static void InitResource(); - static void IncreaseProjectReferenceCnt(const std::string& project); - static void DecreaseProjectReferenceCnt(const std::string& project); - static void IncreaseRegionReferenceCnt(const std::string& region); - static void DecreaseRegionReferenceCnt(const std::string& region); + static void IncreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region); + static void DecreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region); static std::mutex sMux; static std::unordered_map> sProjectConcurrencyLimiterMap; @@ -103,10 +102,10 @@ class FlusherSLS : public HttpFlusher { static std::mutex sDefaultRegionLock; static std::string sDefaultRegion; - static std::mutex sProjectRefCntMapLock; + static std::mutex sProjectRegionMapLock; static std::unordered_map sProjectRefCntMap; - static std::mutex sRegionRefCntMapLock; static std::unordered_map sRegionRefCntMap; + static std::unordered_map sProjectRegionMap; // TODO: should be moved to enterprise config provider static std::mutex sRegionStatusLock; diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 451880ceb4..cecc538727 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -21,6 +21,9 @@ #include "config/ConfigDiff.h" #include "config/InstanceConfigManager.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/InstanceConfigWatcher.h" #include "config/watcher/PipelineConfigWatcher.h" #include "gmock/gmock.h" @@ -433,18 +436,22 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { // 处理 pipelineconfig auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty()); - APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mAdded.size()); - APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[0].mName, "config1"); - APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(1U + builtinPipelineCnt, pipelineConfigDiff.first.mAdded.size()); + APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[builtinPipelineCnt].mName, "config1"); + APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt); APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr); // 再次处理 pipelineconfig pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty()); - APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt); APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr); @@ -649,17 +656,21 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { // 处理pipelineConfigDiff auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size()); APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1"); - APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size()); // 再次处理pipelineConfigDiff pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty()); - APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size()); APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty()); // 处理instanceConfigDiff diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index 341f48f00d..73e910d2e8 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -20,6 +20,9 @@ #include "config/PipelineConfig.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/PipelineConfigWatcher.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineManager.h" @@ -268,7 +271,11 @@ class ConfigUpdateUnittest : public testing::Test { void ConfigUpdateUnittest::OnStartUp() const { auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); GenerateInitialConfigs(); diff --git a/core/unittest/config/ConfigWatcherUnittest.cpp b/core/unittest/config/ConfigWatcherUnittest.cpp index 8ed7327cbc..2da9c9dc9e 100644 --- a/core/unittest/config/ConfigWatcherUnittest.cpp +++ b/core/unittest/config/ConfigWatcherUnittest.cpp @@ -17,6 +17,9 @@ #include "config/ConfigDiff.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/InstanceConfigWatcher.h" #include "config/watcher/PipelineConfigWatcher.h" #include "pipeline/plugin/PluginRegistry.h" @@ -51,7 +54,11 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co void ConfigWatcherUnittest::InvalidConfigDirFound() const { { auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); { ofstream fout("continuous_pipeline_config"); } @@ -83,7 +90,11 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const { fout << "[}"; } auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); filesystem::remove_all(configDir); } @@ -132,8 +143,12 @@ void ConfigWatcherUnittest::DuplicateConfigs() const { } { ofstream fout("dir2/config.json"); } auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif APSARA_TEST_FALSE(diff.first.IsEmpty()); - APSARA_TEST_EQUAL(1U, diff.first.mAdded.size()); + APSARA_TEST_EQUAL(1U + builtinPipelineCnt, diff.first.mAdded.size()); filesystem::remove_all("dir1"); filesystem::remove_all("dir2"); diff --git a/core/unittest/flusher/CMakeLists.txt b/core/unittest/flusher/CMakeLists.txt index 6b912780b9..76ac1afd6d 100644 --- a/core/unittest/flusher/CMakeLists.txt +++ b/core/unittest/flusher/CMakeLists.txt @@ -24,6 +24,8 @@ target_link_libraries(pack_id_manager_unittest ${UT_BASE_TARGET}) if (ENABLE_ENTERPRISE) add_executable(enterprise_sls_client_manager_unittest EnterpriseSLSClientManagerUnittest.cpp) target_link_libraries(enterprise_sls_client_manager_unittest ${UT_BASE_TARGET}) + add_executable(enterprise_flusher_sls_monitor_unittest EnterpriseFlusherSLSMonitorUnittest.cpp) + target_link_libraries(enterprise_flusher_sls_monitor_unittest ${UT_BASE_TARGET}) endif () include(GoogleTest) @@ -31,4 +33,5 @@ gtest_discover_tests(flusher_sls_unittest) gtest_discover_tests(pack_id_manager_unittest) if (ENABLE_ENTERPRISE) gtest_discover_tests(enterprise_sls_client_manager_unittest) + gtest_discover_tests(enterprise_flusher_sls_monitor_unittest) endif ()