Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 committed Nov 26, 2024
1 parent c1f8527 commit 67c9184
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 103 deletions.
209 changes: 108 additions & 101 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,108 +40,8 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
// inner configs
InsertInnerPipelines(pDiff, tDiff, configSet);
// configs from file
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to get config dir path info", "skip current object")("dir path", dir.string())(
"error code", ec.value())("error msg", ec.message()));
continue;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string()));
continue;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger,
("config dir path is not a directory", "skip current object")("dir path", dir.string()));
continue;
}
for (auto const& entry : filesystem::directory_iterator(dir, ec)) {
// lock the dir if it is provided by config provider
unique_lock<mutex> lock;
auto itr = mDirMutexMap.find(dir.string());
if (itr != mDirMutexMap.end()) {
lock = unique_lock<mutex>(*itr->second, defer_lock);
lock.lock();
}
InsertPipelines(pDiff, tDiff, configSet);

const filesystem::path& path = entry.path();
const string& configName = path.stem().string();
const string& filepath = path.string();
if (!filesystem::is_regular_file(entry.status(ec))) {
LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath));
continue;
}
if (configSet.find(configName) != configSet.end()) {
LOG_WARNING(
sLogger,
("more than 1 config with the same name is found", "skip current config")("filepath", filepath));
continue;
}
configSet.insert(configName);

auto iter = mFileInfoMap.find(filepath);
uintmax_t size = filesystem::file_size(path, ec);
filesystem::file_time_type mTime = filesystem::last_write_time(path, ec);
if (iter == mFileInfoMap.end()) {
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName));
continue;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
switch (GetConfigType(*detail)) {
case ConfigType::Pipeline:
if (mPipelineManager->FindConfigByName(configName)) {
pDiff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running pipeline")("config", configName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", configName));
}
break;
case ConfigType::Task:
if (mTaskPipelineManager->FindPipelineByName(configName)) {
tDiff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running task")("config", configName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", configName));
}
break;
}
continue;
}
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object"));
}
}
}
for (const auto& name : mPipelineManager->GetAllConfigNames()) {
if (configSet.find(name) == configSet.end()) {
pDiff.mRemoved.push_back(name);
Expand Down Expand Up @@ -266,6 +166,113 @@ void PipelineConfigWatcher::InsertInnerPipelines(PipelineConfigDiff& pDiff,
}
}

void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff,
std::unordered_set<std::string>& configSet) {
for (const auto& dir : mSourceDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to get config dir path info", "skip current object")("dir path", dir.string())(
"error code", ec.value())("error msg", ec.message()));
continue;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string()));
continue;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger,
("config dir path is not a directory", "skip current object")("dir path", dir.string()));
continue;
}
for (auto const& entry : filesystem::directory_iterator(dir, ec)) {
// lock the dir if it is provided by config provider
unique_lock<mutex> lock;
auto itr = mDirMutexMap.find(dir.string());
if (itr != mDirMutexMap.end()) {
lock = unique_lock<mutex>(*itr->second, defer_lock);
lock.lock();
}

const filesystem::path& path = entry.path();
const string& configName = path.stem().string();
const string& filepath = path.string();
if (!filesystem::is_regular_file(entry.status(ec))) {
LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath));
continue;
}
if (configSet.find(configName) != configSet.end()) {
LOG_WARNING(
sLogger,
("more than 1 config with the same name is found", "skip current config")("filepath", filepath));
continue;
}
configSet.insert(configName);

auto iter = mFileInfoMap.find(filepath);
uintmax_t size = filesystem::file_size(path, ec);
filesystem::file_time_type mTime = filesystem::last_write_time(path, ec);
if (iter == mFileInfoMap.end()) {
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName));
continue;
}
if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
switch (GetConfigType(*detail)) {
case ConfigType::Pipeline:
if (mPipelineManager->FindConfigByName(configName)) {
pDiff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running pipeline")("config", configName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", configName));
}
break;
case ConfigType::Task:
if (mTaskPipelineManager->FindPipelineByName(configName)) {
tDiff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running task")("config", configName));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled",
"skip current object")("config", configName));
}
break;
}
continue;
}
if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) {
continue;
}
} else {
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object"));
}
}
}
}

bool PipelineConfigWatcher::CheckAddedConfig(const string& configName,
unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
Expand Down
1 change: 1 addition & 0 deletions core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class PipelineConfigWatcher : public ConfigWatcher {
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
Expand Down
9 changes: 8 additions & 1 deletion core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ void SelfMonitorServer::UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMe
LOG_INFO(sLogger, ("self-monitor metrics pipeline", "updated"));
}

void SelfMonitorServer::RemoveMetricPipeline() {
WriteLock lock(mMetricPipelineLock);
mMetricPipelineCtx = nullptr;
mSelfMonitorMetricRules = nullptr;
LOG_INFO(sLogger, ("self-monitor metrics pipeline", "removed"));
}

void SelfMonitorServer::SendMetrics() {
ReadMetrics::GetInstance()->UpdateMetrics();

Expand All @@ -98,7 +105,7 @@ void SelfMonitorServer::SendMetrics() {

shared_ptr<Pipeline> pipeline
= PipelineManager::GetInstance()->FindConfigByName(mMetricPipelineCtx->GetConfigName());
if (pipeline != nullptr) {
if (pipeline.get() != nullptr) {
if (pipelineEventGroup.GetEvents().size() > 0) {
ProcessorRunner::GetInstance()->PushQueue(
pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup));
Expand Down
1 change: 1 addition & 0 deletions core/monitor/SelfMonitorServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class SelfMonitorServer {
void Stop();

void UpdateMetricPipeline(PipelineContext* ctx, SelfMonitorMetricRules* rules);
void RemoveMetricPipeline();
void UpdateAlarmPipeline(PipelineContext* ctx); // Todo
private:
SelfMonitorServer();
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/flusher/local_file/FlusherLocalFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FlusherLocalFile : public Flusher {

std::shared_ptr<spdlog::logger> mFileWriter;
std::string mFileName;
std::string mPattern = "[%Y-%m-%d %H:%M:%S.%f] %v";
std::string mPattern = "%v";
uint32_t mMaxFileSize = 1024 * 1024 * 10;
uint32_t mMaxFiles = 10;
Batcher<EventBatchStatus> mBatcher;
Expand Down
3 changes: 3 additions & 0 deletions core/plugin/input/InputSelfMonitorMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ bool InputSelfMonitorMetric::Start() {
}

bool InputSelfMonitorMetric::Stop(bool isPipelineRemoving) {
if (isPipelineRemoving) {
SelfMonitorServer::GetInstance()->RemoveMetricPipeline();
}
return true;
}

Expand Down

0 comments on commit 67c9184

Please sign in to comment.