diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 36779aa44d..47977e47a6 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -208,75 +208,57 @@ void FlusherSLS::SetDefaultRegion(const string& region) { sDefaultRegion = region; } -mutex FlusherSLS::sProjectRefCntMapLock; +mutex FlusherSLS::sProjectRegionMapLock; unordered_map FlusherSLS::sProjectRefCntMap; -mutex FlusherSLS::sRegionRefCntMapLock; unordered_map FlusherSLS::sRegionRefCntMap; -mutex FlusherSLS::sProjectRegionMapLock; unordered_map FlusherSLS::sProjectRegionMap; - string FlusherSLS::GetAllProjects() { string result; - lock_guard lock(sProjectRefCntMapLock); + lock_guard lock(sProjectRegionMapLock); for (auto iter = sProjectRefCntMap.cbegin(); iter != sProjectRefCntMap.cend(); ++iter) { result.append(iter->first).append(" "); } return result; } -void FlusherSLS::IncreaseProjectReferenceCnt(const string& project) { - lock_guard lock(sProjectRefCntMapLock); - ++sProjectRefCntMap[project]; -} - -void FlusherSLS::DecreaseProjectReferenceCnt(const string& project) { - lock_guard lock(sProjectRefCntMapLock); - auto iter = sProjectRefCntMap.find(project); - if (iter == sProjectRefCntMap.end()) { - // should not happen - return; - } - if (--iter->second == 0) { - sProjectRefCntMap.erase(iter); - } -} - bool FlusherSLS::IsRegionContainingConfig(const string& region) { - lock_guard lock(sRegionRefCntMapLock); + lock_guard lock(sProjectRegionMapLock); return sRegionRefCntMap.find(region) != sRegionRefCntMap.end(); } -void FlusherSLS::IncreaseRegionReferenceCnt(const string& region) { - lock_guard lock(sRegionRefCntMapLock); - ++sRegionRefCntMap[region]; -} - -void FlusherSLS::DecreaseRegionReferenceCnt(const string& region) { - lock_guard lock(sRegionRefCntMapLock); - auto iter = sRegionRefCntMap.find(region); - if (iter == sRegionRefCntMap.end()) { - // should not happen - return; - } - if (--iter->second == 0) { - sRegionRefCntMap.erase(iter); - } -} - std::string FlusherSLS::GetProjectRegion(const std::string& project) { - lock_guard lock(sProjectRefCntMapLock); - return sProjectRegionMap[project]; + lock_guard lock(sProjectRegionMapLock); + auto iter = sProjectRegionMap.find(project); + if (iter == sProjectRegionMap.end()) { + return ""; + } + return iter->second; } -void FlusherSLS::SetProjectRegion(const std::string& project, const std::string& region) { - lock_guard lock(sProjectRefCntMapLock); +void FlusherSLS::IncreaseProjectRegionReferenceCnt(const string& project, const string& region) { + lock_guard lock(sProjectRegionMapLock); + ++sProjectRefCntMap[project]; + ++sRegionRefCntMap[region]; sProjectRegionMap[project] = region; } -void FlusherSLS::RemoveProjectRegion(const std::string& project) { - lock_guard lock(sProjectRefCntMapLock); - sProjectRegionMap.erase(project); +void FlusherSLS::DecreaseProjectRegionReferenceCnt(const string& project, const string& region) { + lock_guard lock(sProjectRegionMapLock); + auto projectRefCnt = sProjectRefCntMap.find(project); + if (projectRefCnt != sProjectRefCntMap.end()) { + if (--projectRefCnt->second == 0) { + sProjectRefCntMap.erase(projectRefCnt); + sProjectRegionMap.erase(project); + } + } + + auto regionRefCnt = sRegionRefCntMap.find(region); + if (regionRefCnt != sRegionRefCntMap.end()) { + if (--regionRefCnt->second == 0) { + sRegionRefCntMap.erase(regionRefCnt); + } + } } mutex FlusherSLS::sRegionStatusLock; @@ -558,9 +540,7 @@ bool FlusherSLS::Start() { Flusher::Start(); InitResource(); - IncreaseProjectReferenceCnt(mProject); - IncreaseRegionReferenceCnt(mRegion); - SetProjectRegion(mProject, mRegion); + IncreaseProjectRegionReferenceCnt(mProject, mRegion); SLSClientManager::GetInstance()->IncreaseAliuidReferenceCntForRegion(mRegion, mAliuid); return true; } @@ -568,13 +548,12 @@ bool FlusherSLS::Start() { bool FlusherSLS::Stop(bool isPipelineRemoving) { Flusher::Stop(isPipelineRemoving); - DecreaseProjectReferenceCnt(mProject); - DecreaseRegionReferenceCnt(mRegion); - RemoveProjectRegion(mProject); + DecreaseProjectRegionReferenceCnt(mProject, mRegion); SLSClientManager::GetInstance()->DecreaseAliuidReferenceCntForRegion(mRegion, mAliuid); return true; } + bool FlusherSLS::Send(PipelineEventGroup&& g) { if (g.IsReplay()) { return SerializeAndPush(std::move(g)); diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 21336f20ef..6a71b0f526 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -91,12 +91,8 @@ class FlusherSLS : public HttpFlusher { static void InitResource(); - static void IncreaseProjectReferenceCnt(const std::string& project); - static void DecreaseProjectReferenceCnt(const std::string& project); - static void IncreaseRegionReferenceCnt(const std::string& region); - static void DecreaseRegionReferenceCnt(const std::string& region); - static void SetProjectRegion(const std::string& project, const std::string& region); - static void RemoveProjectRegion(const std::string& project); + static void IncreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region); + static void DecreaseProjectRegionReferenceCnt(const std::string& project, const std::string& region); static std::mutex sMux; static std::unordered_map> sProjectConcurrencyLimiterMap; @@ -106,11 +102,9 @@ class FlusherSLS : public HttpFlusher { static std::mutex sDefaultRegionLock; static std::string sDefaultRegion; - static std::mutex sProjectRefCntMapLock; + static std::mutex sProjectRegionMapLock; static std::unordered_map sProjectRefCntMap; - static std::mutex sRegionRefCntMapLock; static std::unordered_map sRegionRefCntMap; - static std::mutex sProjectRegionMapLock; static std::unordered_map sProjectRegionMap; // TODO: should be moved to enterprise config provider diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 451880ceb4..cecc538727 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -21,6 +21,9 @@ #include "config/ConfigDiff.h" #include "config/InstanceConfigManager.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/InstanceConfigWatcher.h" #include "config/watcher/PipelineConfigWatcher.h" #include "gmock/gmock.h" @@ -433,18 +436,22 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { // 处理 pipelineconfig auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty()); - APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mAdded.size()); - APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[0].mName, "config1"); - APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(1U + builtinPipelineCnt, pipelineConfigDiff.first.mAdded.size()); + APSARA_TEST_EQUAL(pipelineConfigDiff.first.mAdded[builtinPipelineCnt].mName, "config1"); + APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt); APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr); // 再次处理 pipelineconfig pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.first.mAdded.empty()); - APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1); + APSARA_TEST_EQUAL(PipelineManager::GetInstance()->GetAllConfigNames().size(), 1U + builtinPipelineCnt); APSARA_TEST_TRUE(PipelineManager::GetInstance()->FindConfigByName("config1").get() != nullptr); @@ -649,17 +656,21 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { // 处理pipelineConfigDiff auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(!pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_EQUAL(1U, pipelineConfigDiff.first.mRemoved.size()); APSARA_TEST_EQUAL(pipelineConfigDiff.first.mRemoved[0], "config1"); - APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size()); // 再次处理pipelineConfigDiff pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff.first); APSARA_TEST_TRUE(pipelineConfigDiff.first.IsEmpty()); APSARA_TEST_TRUE(pipelineConfigDiff.first.mRemoved.empty()); - APSARA_TEST_EQUAL(0U, PipelineManager::GetInstance()->GetAllConfigNames().size()); + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, PipelineManager::GetInstance()->GetAllConfigNames().size()); APSARA_TEST_TRUE(provider.mInstanceConfigInfoMap.empty()); // 处理instanceConfigDiff diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index 341f48f00d..73e910d2e8 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -20,6 +20,9 @@ #include "config/PipelineConfig.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/PipelineConfigWatcher.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineManager.h" @@ -268,7 +271,11 @@ class ConfigUpdateUnittest : public testing::Test { void ConfigUpdateUnittest::OnStartUp() const { auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); GenerateInitialConfigs(); diff --git a/core/unittest/config/ConfigWatcherUnittest.cpp b/core/unittest/config/ConfigWatcherUnittest.cpp index 8ed7327cbc..2da9c9dc9e 100644 --- a/core/unittest/config/ConfigWatcherUnittest.cpp +++ b/core/unittest/config/ConfigWatcherUnittest.cpp @@ -17,6 +17,9 @@ #include "config/ConfigDiff.h" #include "config/common_provider/CommonConfigProvider.h" +#ifdef __ENTERPRISE__ +#include "config/provider/EnterpriseConfigProvider.h" +#endif #include "config/watcher/InstanceConfigWatcher.h" #include "config/watcher/PipelineConfigWatcher.h" #include "pipeline/plugin/PluginRegistry.h" @@ -51,7 +54,11 @@ const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_co void ConfigWatcherUnittest::InvalidConfigDirFound() const { { auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); { ofstream fout("continuous_pipeline_config"); } @@ -83,7 +90,11 @@ void ConfigWatcherUnittest::InvalidConfigFileFound() const { fout << "[}"; } auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); - APSARA_TEST_EQUAL(0U, diff.first.mAdded.size()); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + APSARA_TEST_EQUAL(0U + builtinPipelineCnt, diff.first.mAdded.size()); APSARA_TEST_TRUE(diff.second.IsEmpty()); filesystem::remove_all(configDir); } @@ -132,8 +143,12 @@ void ConfigWatcherUnittest::DuplicateConfigs() const { } { ofstream fout("dir2/config.json"); } auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + size_t builtinPipelineCnt = 0; +#ifdef __ENTERPRISE__ + builtinPipelineCnt += EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif APSARA_TEST_FALSE(diff.first.IsEmpty()); - APSARA_TEST_EQUAL(1U, diff.first.mAdded.size()); + APSARA_TEST_EQUAL(1U + builtinPipelineCnt, diff.first.mAdded.size()); filesystem::remove_all("dir1"); filesystem::remove_all("dir2"); diff --git a/core/unittest/flusher/CMakeLists.txt b/core/unittest/flusher/CMakeLists.txt index 6b912780b9..76ac1afd6d 100644 --- a/core/unittest/flusher/CMakeLists.txt +++ b/core/unittest/flusher/CMakeLists.txt @@ -24,6 +24,8 @@ target_link_libraries(pack_id_manager_unittest ${UT_BASE_TARGET}) if (ENABLE_ENTERPRISE) add_executable(enterprise_sls_client_manager_unittest EnterpriseSLSClientManagerUnittest.cpp) target_link_libraries(enterprise_sls_client_manager_unittest ${UT_BASE_TARGET}) + add_executable(enterprise_flusher_sls_monitor_unittest EnterpriseFlusherSLSMonitorUnittest.cpp) + target_link_libraries(enterprise_flusher_sls_monitor_unittest ${UT_BASE_TARGET}) endif () include(GoogleTest) @@ -31,4 +33,5 @@ gtest_discover_tests(flusher_sls_unittest) gtest_discover_tests(pack_id_manager_unittest) if (ENABLE_ENTERPRISE) gtest_discover_tests(enterprise_sls_client_manager_unittest) + gtest_discover_tests(enterprise_flusher_sls_monitor_unittest) endif ()