Skip to content

Commit

Permalink
add mProcessFileInfoMap
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard committed Jul 8, 2024
1 parent bcf2965 commit 73ed36e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
56 changes: 33 additions & 23 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,32 @@ bool ReadFile(const string& filepath, string& content);
ConfigWatcher::ConfigWatcher() : mPipelineManager(PipelineManager::GetInstance()) {
}

bool ConfigWatcher::CheckDirectoryStatus(const std::filesystem::path& dir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to get config dir path info", "skip current object")("dir path", dir.string())(
"error code", ec.value())("error msg", ec.message()));
return false;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string()));
return false;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger, ("config dir path is not a directory", "skip current object")("dir path", dir.string()));
return false;
}
return true;
}

ConfigDiff ConfigWatcher::CheckConfigDiff() {
ConfigDiff diff;
unordered_set<string> configSet;
for (const auto& dir : mPipelineConfigDir) {
error_code ec;
filesystem::file_status s = filesystem::status(dir, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to get config dir path info", "skip current object")("dir path", dir.string())(
"error code", ec.value())("error msg", ec.message()));
continue;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string()));
continue;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger,
("config dir path is not a directory", "skip current object")("dir path", dir.string()));
if (!CheckDirectoryStatus(dir)) {
continue;
}
for (auto const& entry : filesystem::directory_iterator(dir, ec)) {
Expand All @@ -75,11 +82,11 @@ ConfigDiff ConfigWatcher::CheckConfigDiff() {
}
configSet.insert(configName);

auto iter = mFileInfoMap.find(filepath);
auto iter = mPipelineFileInfoMap.find(filepath);
uintmax_t size = filesystem::file_size(path, ec);
filesystem::file_time_type mTime = filesystem::last_write_time(path, ec);
if (iter == mFileInfoMap.end()) {
mFileInfoMap[filepath] = make_pair(size, mTime);
if (iter == mPipelineFileInfoMap.end()) {
mPipelineFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = unique_ptr<Json::Value>(new Json::Value());
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
Expand All @@ -105,7 +112,7 @@ ConfigDiff ConfigWatcher::CheckConfigDiff() {
("new config found and passed topology check", "prepare to build pipeline")("config", configName));
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
mFileInfoMap[filepath] = make_pair(size, mTime);
mPipelineFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = unique_ptr<Json::Value>(new Json::Value());
if (!LoadConfigDetailFromFile(path, *detail)) {
if (mPipelineManager->FindPipelineByName(configName)) {
Expand Down Expand Up @@ -187,10 +194,10 @@ ConfigDiff ConfigWatcher::CheckConfigDiff() {
("existing valid config is removed", "prepare to stop current running pipeline")("config", name));
}
}
for (const auto& item : mFileInfoMap) {
for (const auto& item : mPipelineFileInfoMap) {
string configName = filesystem::path(item.first).stem().string();
if (configSet.find(configName) == configSet.end()) {
mFileInfoMap.erase(item.first);
mPipelineFileInfoMap.erase(item.first);
}
}

Expand All @@ -213,15 +220,18 @@ void ConfigWatcher::AddPipelineSource(const string& dir, mutex* mux) {
}

void ConfigWatcher::AddProcessSource(const string& dir, mutex* mux) {
mPipelineConfigDir.emplace_back(dir);
mProcessConfigDir.emplace_back(dir);
if (mux != nullptr) {
mPipelineConfigDirMutexMap[dir] = mux;
mProcessConfigDirMutexMap[dir] = mux;
}
}

void ConfigWatcher::ClearEnvironment() {
mPipelineConfigDir.clear();
mFileInfoMap.clear();
mPipelineFileInfoMap.clear();

mProcessConfigDir.clear();
mProcessFileInfoMap.clear();
}

} // namespace logtail
6 changes: 5 additions & 1 deletion core/config/watcher/ConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ class ConfigWatcher {
std::vector<std::filesystem::path> mProcessConfigDir;
std::unordered_map<std::string, std::mutex*> mProcessConfigDirMutexMap;

std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mFileInfoMap;
std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mPipelineFileInfoMap;
const PipelineManager* mPipelineManager = nullptr;

std::map<std::string, std::pair<uintmax_t, std::filesystem::file_time_type>> mProcessFileInfoMap;

bool CheckDirectoryStatus(const std::filesystem::path& dir);
};

} // namespace logtail

0 comments on commit 73ed36e

Please sign in to comment.