Skip to content

Commit

Permalink
fix: fix PreservedDirDepth not working with polling and wildcard path (
Browse files Browse the repository at this point in the history
…#1866)

* fix PreservedDirDepth not working with polling and wildcard path

* Add unittest and fix mem leaks

* fix register dir
  • Loading branch information
yyuuttaaoo authored Nov 25, 2024
1 parent 53ff484 commit 802d034
Show file tree
Hide file tree
Showing 45 changed files with 880 additions and 372 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ _deps
core/build/
core/protobuf/config_server/*/*.pb.*
core/protobuf/*/*.pb.*
core/log_pb/*.pb.*
core/common/Version.cpp
!/Makefile
# Enterprise
Expand Down
5 changes: 3 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat
option(WITHOUTGDB "Build Logtail without gdb")
option(WITHSPL "Build Logtail and UT with SPL" ON)
option(BUILD_LOGTAIL_UT "Build unit test for Logtail")
cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" ON "CMAKE_BUILD_TYPE STREQUAL Debug;NOT ANDROID" OFF)
set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH
set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH

Expand Down Expand Up @@ -61,8 +62,8 @@ if (UNIX)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -ggdb")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -ggdb")
endif ()
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -O2")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
string(REPLACE "-O3" "" CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE}")
Expand Down
19 changes: 13 additions & 6 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf
return false;
}

void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(filename);
if (it != mDirNameMap.end())
void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(dirname);
if (it != mDirNameMap.end()) {
mDirNameMap.erase(it);
}
auto parentpos = dirname.find_last_of(PATH_SEPARATOR);
if (parentpos != std::string::npos) {
auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos));
if (parentDirCheckpoint != mDirNameMap.end()) {
parentDirCheckpoint->second->mSubDir.erase(dirname);
}
}
}

bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) {
Expand Down Expand Up @@ -123,8 +131,7 @@ void CheckPointManager::LoadCheckPoint() {
Json::Value root;
ParseConfResult cptRes = ParseConfig(AppConfig::GetInstance()->GetCheckPointFilePath(), root);
// if new checkpoint file not exist, check old checkpoint file.
if (cptRes == CONFIG_NOT_EXIST
&& AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) {
if (cptRes == CONFIG_NOT_EXIST && AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) {
cptRes = ParseConfig(GetCheckPointFileName(), root);
}
if (cptRes != CONFIG_OK) {
Expand Down Expand Up @@ -408,7 +415,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
result["dir_check_point"] = dirJson;
result["version"] = Json::Value(Json::UInt(INT32_FLAG(check_point_version)));
fout << result.toStyledString();
if (!fout.good()) {
if (!fout) {
LOG_ERROR(sLogger, ("dump check point to file failed", checkPointFile));
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "dump check point to file failed");
fout.close();
Expand Down
2 changes: 1 addition & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CheckPointManager {
void AddCheckPoint(CheckPoint* checkPointPtr);
void AddDirCheckPoint(const std::string& dirname);
void DeleteCheckPoint(DevInode devInode, const std::string& configName);
void DeleteDirCheckPoint(const std::string& filename);
void DeleteDirCheckPoint(const std::string& dirname);
void LoadCheckPoint();
void LoadDirCheckPoint(const Json::Value& root);
void LoadFileCheckPoint(const Json::Value& root);
Expand Down
2 changes: 1 addition & 1 deletion core/common/links.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ macro(common_link target_name)
link_zlib(${target_name})
link_zstd(${target_name})
link_unwind(${target_name})
if (NOT ANDROID)
if (ENABLE_ADDRESS_SANITIZER)
link_asan(${target_name})
endif()
if (UNIX)
Expand Down
3 changes: 3 additions & 0 deletions core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ void CommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, (sName, "stopped successfully"));
Expand Down
3 changes: 3 additions & 0 deletions core/config/common_provider/LegacyCommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ void LegacyCommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("legacy common config provider", "stopped successfully"));
Expand Down
5 changes: 4 additions & 1 deletion core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,12 @@ endmacro()

# asan for debug
macro(link_asan target_name)
if(CMAKE_BUILD_TYPE MATCHES Debug)
if (UNIX)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
elseif(MSVC)
target_compile_options(${target_name} PUBLIC /fsanitize=address)
target_link_options(${target_name} PUBLIC /fsanitize=address)
endif()
endmacro()

Expand Down
122 changes: 83 additions & 39 deletions core/file_server/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot

ifstream is;
is.open(fullPath.c_str());
if (!is.good()) {
if (!is) { // https://horstmann.com/cpp/pitfalls.html
return CONFIG_NOT_EXIST;
}
std::string buffer;
try {
buffer.assign(std::istreambuf_iterator<char>(is), std::istreambuf_iterator<char>());
} catch (const std::ios_base::failure& e) {
return CONFIG_NOT_EXIST;
}
std::string buffer((std::istreambuf_iterator<char>(is)), (std::istreambuf_iterator<char>()));
if (!IsValidJson(buffer.c_str(), buffer.length())) {
return CONFIG_INVALID_FORMAT;
}
Expand Down Expand Up @@ -144,7 +149,7 @@ bool ConfigManager::RegisterHandlersRecursively(const std::string& path,
return result;

if (!config.first->IsDirectoryInBlacklist(path))
result = EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler);
result = EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler);

if (!result)
return result;
Expand Down Expand Up @@ -305,9 +310,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -381,9 +393,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(
item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -420,52 +439,57 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery
DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath);
if (registerStatus == GET_REGISTER_STATUS_ERROR)
return result;
// dir in config is valid by default, do not call pathValidator
result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler);
// if we come into a failure, do not try to register others, there must be something wrong!
if (!result)
return result;

if (config.first->mPreservedDirDepth < 0)
result = RegisterDescendants(
basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
int depth = config.first->mPreservedDirDepth;
result = RegisterHandlersWithinDepth(basePath, config, depth);
result = RegisterHandlersWithinDepth(basePath,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
return result;
}

bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) {
// TODO��A potential bug: FindBestMatch will test @object with filePattern, which has very
// TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very
// low possibility to match a sub directory name, so here will return false in most cases.
// e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir.
// Match(subdir, *.log) = false.
FileDiscoveryConfig config = FindBestMatch(source, object);
if (config.first && !config.first->IsDirectoryInBlacklist(source))
return EventDispatcher::GetInstance()->RegisterEventHandler(source.c_str(), config, mSharedHandler);
if (config.first && !config.first->IsDirectoryInBlacklist(source)) {
return EventDispatcher::GetInstance()->RegisterEventHandler(source, config, mSharedHandler);
}
return false;
}

bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) {
bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth) {
if (maxDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (depth <= 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false)
if (preservedDirDepth < 0) {
fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) {
return true;
}
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return true;
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (set<string>::iterator it = subdir.begin(); it != subdir.end(); it++) {
if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler))
RegisterHandlersWithinDepth(*it, config, depth - 1);
}
return true;
}
bool result = true;

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -479,30 +503,44 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler))) {
// break;// fail early, do not try to register others
return false;
}
if (maxDepth == 0) {
return true;
}

if (preservedDirDepth == 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1);
}
return true;
}
}
fsutil::Entry ent;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
result = false;
} else // sub dir will not be registered if parent dir fails
RegisterHandlersWithinDepth(item, config, depth - 1);
RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1);
}
}

return result;
return true;
}

// path not terminated by '/', path already registered
bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) {
if (withinDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (withinDepth <= 0) {
return true;
}

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -515,14 +553,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler)) {
// break;// fail early, do not try to register others
return false;
}
if (withinDepth == 0) {
return true;
}

fsutil::Entry ent;
bool result = true;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler);
if (result)
RegisterDescendants(item, config, withinDepth - 1);
RegisterDescendants(item, config, withinDepth - 1);
}
}
return result;
Expand Down
5 changes: 4 additions & 1 deletion core/file_server/ConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ class ConfigManager {
* @param path is the current dir that being registered
* @depth is the num of sub dir layers that should be registered
*/
bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth);
bool RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth);
bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth);
// bool CheckLogType(const std::string& logTypeStr, LogType& logType);
// 废弃
Expand Down
Loading

0 comments on commit 802d034

Please sign in to comment.