From 67c9184fc3b64cbe451a7b0fa5aca9b69cdc7716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Tue, 26 Nov 2024 13:02:14 +0000 Subject: [PATCH] polish --- core/config/watcher/PipelineConfigWatcher.cpp | 209 +++++++++--------- core/config/watcher/PipelineConfigWatcher.h | 1 + core/monitor/SelfMonitorServer.cpp | 9 +- core/monitor/SelfMonitorServer.h | 1 + .../flusher/local_file/FlusherLocalFile.h | 2 +- core/plugin/input/InputSelfMonitorMetric.cpp | 3 + 6 files changed, 122 insertions(+), 103 deletions(-) diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 4199611f7b..7c91068b72 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -40,108 +40,8 @@ pair PipelineConfigWatcher::CheckConfigDiff( // inner configs InsertInnerPipelines(pDiff, tDiff, configSet); // configs from file - for (const auto& dir : mSourceDir) { - error_code ec; - filesystem::file_status s = filesystem::status(dir, ec); - if (ec) { - LOG_WARNING(sLogger, - ("failed to get config dir path info", "skip current object")("dir path", dir.string())( - "error code", ec.value())("error msg", ec.message())); - continue; - } - if (!filesystem::exists(s)) { - LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())); - continue; - } - if (!filesystem::is_directory(s)) { - LOG_WARNING(sLogger, - ("config dir path is not a directory", "skip current object")("dir path", dir.string())); - continue; - } - for (auto const& entry : filesystem::directory_iterator(dir, ec)) { - // lock the dir if it is provided by config provider - unique_lock lock; - auto itr = mDirMutexMap.find(dir.string()); - if (itr != mDirMutexMap.end()) { - lock = unique_lock(*itr->second, defer_lock); - lock.lock(); - } + InsertPipelines(pDiff, tDiff, configSet); - const filesystem::path& path = entry.path(); - const string& configName = path.stem().string(); - const string& filepath = path.string(); - if (!filesystem::is_regular_file(entry.status(ec))) { - LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)); - continue; - } - if (configSet.find(configName) != configSet.end()) { - LOG_WARNING( - sLogger, - ("more than 1 config with the same name is found", "skip current config")("filepath", filepath)); - continue; - } - configSet.insert(configName); - - auto iter = mFileInfoMap.find(filepath); - uintmax_t size = filesystem::file_size(path, ec); - filesystem::file_time_type mTime = filesystem::last_write_time(path, ec); - if (iter == mFileInfoMap.end()) { - mFileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(); - if (!LoadConfigDetailFromFile(path, *detail)) { - continue; - } - if (!IsConfigEnabled(configName, *detail)) { - LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)); - continue; - } - if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) { - continue; - } - } else if (iter->second.first != size || iter->second.second != mTime) { - // for config currently running, we leave it untouched if new config is invalid - mFileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(); - if (!LoadConfigDetailFromFile(path, *detail)) { - continue; - } - if (!IsConfigEnabled(configName, *detail)) { - switch (GetConfigType(*detail)) { - case ConfigType::Pipeline: - if (mPipelineManager->FindConfigByName(configName)) { - pDiff.mRemoved.push_back(configName); - LOG_INFO(sLogger, - ("existing valid config modified and disabled", - "prepare to stop current running pipeline")("config", configName)); - } else { - LOG_INFO(sLogger, - ("existing invalid config modified and disabled", - "skip current object")("config", configName)); - } - break; - case ConfigType::Task: - if (mTaskPipelineManager->FindPipelineByName(configName)) { - tDiff.mRemoved.push_back(configName); - LOG_INFO(sLogger, - ("existing valid config modified and disabled", - "prepare to stop current running task")("config", configName)); - } else { - LOG_INFO(sLogger, - ("existing invalid config modified and disabled", - "skip current object")("config", configName)); - } - break; - } - continue; - } - if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) { - continue; - } - } else { - LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); - } - } - } for (const auto& name : mPipelineManager->GetAllConfigNames()) { if (configSet.find(name) == configSet.end()) { pDiff.mRemoved.push_back(name); @@ -266,6 +166,113 @@ void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff, } } +void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + std::unordered_set& configSet) { + for (const auto& dir : mSourceDir) { + error_code ec; + filesystem::file_status s = filesystem::status(dir, ec); + if (ec) { + LOG_WARNING(sLogger, + ("failed to get config dir path info", "skip current object")("dir path", dir.string())( + "error code", ec.value())("error msg", ec.message())); + continue; + } + if (!filesystem::exists(s)) { + LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())); + continue; + } + if (!filesystem::is_directory(s)) { + LOG_WARNING(sLogger, + ("config dir path is not a directory", "skip current object")("dir path", dir.string())); + continue; + } + for (auto const& entry : filesystem::directory_iterator(dir, ec)) { + // lock the dir if it is provided by config provider + unique_lock lock; + auto itr = mDirMutexMap.find(dir.string()); + if (itr != mDirMutexMap.end()) { + lock = unique_lock(*itr->second, defer_lock); + lock.lock(); + } + + const filesystem::path& path = entry.path(); + const string& configName = path.stem().string(); + const string& filepath = path.string(); + if (!filesystem::is_regular_file(entry.status(ec))) { + LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)); + continue; + } + if (configSet.find(configName) != configSet.end()) { + LOG_WARNING( + sLogger, + ("more than 1 config with the same name is found", "skip current config")("filepath", filepath)); + continue; + } + configSet.insert(configName); + + auto iter = mFileInfoMap.find(filepath); + uintmax_t size = filesystem::file_size(path, ec); + filesystem::file_time_type mTime = filesystem::last_write_time(path, ec); + if (iter == mFileInfoMap.end()) { + mFileInfoMap[filepath] = make_pair(size, mTime); + unique_ptr detail = make_unique(); + if (!LoadConfigDetailFromFile(path, *detail)) { + continue; + } + if (!IsConfigEnabled(configName, *detail)) { + LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)); + continue; + } + if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) { + continue; + } + } else if (iter->second.first != size || iter->second.second != mTime) { + // for config currently running, we leave it untouched if new config is invalid + mFileInfoMap[filepath] = make_pair(size, mTime); + unique_ptr detail = make_unique(); + if (!LoadConfigDetailFromFile(path, *detail)) { + continue; + } + if (!IsConfigEnabled(configName, *detail)) { + switch (GetConfigType(*detail)) { + case ConfigType::Pipeline: + if (mPipelineManager->FindConfigByName(configName)) { + pDiff.mRemoved.push_back(configName); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running pipeline")("config", configName)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", configName)); + } + break; + case ConfigType::Task: + if (mTaskPipelineManager->FindPipelineByName(configName)) { + tDiff.mRemoved.push_back(configName); + LOG_INFO(sLogger, + ("existing valid config modified and disabled", + "prepare to stop current running task")("config", configName)); + } else { + LOG_INFO(sLogger, + ("existing invalid config modified and disabled", + "skip current object")("config", configName)); + } + break; + } + continue; + } + if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) { + continue; + } + } else { + LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); + } + } + } +} + bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, unique_ptr&& configDetail, PipelineConfigDiff& pDiff, diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index 081c2e2044..28a7f00f97 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -47,6 +47,7 @@ class PipelineConfigWatcher : public ConfigWatcher { ~PipelineConfigWatcher() = default; void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); + void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); bool CheckAddedConfig(const std::string& configName, std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp index 0a6d30abbf..690c0beca5 100644 --- a/core/monitor/SelfMonitorServer.cpp +++ b/core/monitor/SelfMonitorServer.cpp @@ -79,6 +79,13 @@ void SelfMonitorServer::UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMe LOG_INFO(sLogger, ("self-monitor metrics pipeline", "updated")); } +void SelfMonitorServer::RemoveMetricPipeline() { + WriteLock lock(mMetricPipelineLock); + mMetricPipelineCtx = nullptr; + mSelfMonitorMetricRules = nullptr; + LOG_INFO(sLogger, ("self-monitor metrics pipeline", "removed")); +} + void SelfMonitorServer::SendMetrics() { ReadMetrics::GetInstance()->UpdateMetrics(); @@ -98,7 +105,7 @@ void SelfMonitorServer::SendMetrics() { shared_ptr pipeline = PipelineManager::GetInstance()->FindConfigByName(mMetricPipelineCtx->GetConfigName()); - if (pipeline != nullptr) { + if (pipeline.get() != nullptr) { if (pipelineEventGroup.GetEvents().size() > 0) { ProcessorRunner::GetInstance()->PushQueue( pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup)); diff --git a/core/monitor/SelfMonitorServer.h b/core/monitor/SelfMonitorServer.h index 9ce15a1a73..cea3448b0c 100644 --- a/core/monitor/SelfMonitorServer.h +++ b/core/monitor/SelfMonitorServer.h @@ -31,6 +31,7 @@ class SelfMonitorServer { void Stop(); void UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMetricRules* rules); + void RemoveMetricPipeline(); void UpdateAlarmPipeline(PipelineContext* ctx); // Todo private: SelfMonitorServer(); diff --git a/core/plugin/flusher/local_file/FlusherLocalFile.h b/core/plugin/flusher/local_file/FlusherLocalFile.h index 9fab539871..b9f891318b 100644 --- a/core/plugin/flusher/local_file/FlusherLocalFile.h +++ b/core/plugin/flusher/local_file/FlusherLocalFile.h @@ -43,7 +43,7 @@ class FlusherLocalFile : public Flusher { std::shared_ptr mFileWriter; std::string mFileName; - std::string mPattern = "[%Y-%m-%d %H:%M:%S.%f] %v"; + std::string mPattern = "%v"; uint32_t mMaxFileSize = 1024 * 1024 * 10; uint32_t mMaxFiles = 10; Batcher mBatcher; diff --git a/core/plugin/input/InputSelfMonitorMetric.cpp b/core/plugin/input/InputSelfMonitorMetric.cpp index 4939e093c8..3633ff8444 100644 --- a/core/plugin/input/InputSelfMonitorMetric.cpp +++ b/core/plugin/input/InputSelfMonitorMetric.cpp @@ -55,6 +55,9 @@ bool InputSelfMonitorMetric::Start() { } bool InputSelfMonitorMetric::Stop(bool isPipelineRemoving) { + if (isPipelineRemoving) { + SelfMonitorServer::GetInstance()->RemoveMetricPipeline(); + } return true; }