Skip to content

Commit

Permalink
feat: Enhance configuration handling, send crash alarms only after ap…
Browse files Browse the repository at this point in the history
…pconfig initialization and refactor SLS client manager (#1835)
  • Loading branch information
quzard authored Oct 25, 2024
1 parent 9a5211b commit 8493e0c
Show file tree
Hide file tree
Showing 22 changed files with 131 additions and 19,645 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ jobs:
run: make unittest_core

- name: Unit Test Coverage
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*sdk.*\" -e \".*observer.*\" -e \".*logger.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*protobuf.*\" -e \".*runner.*\""
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --gcov-ignore-errors=no_working_dir_found --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*sdk.*\" -e \".*observer.*\" -e \".*logger.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*protobuf.*\" -e \".*runner.*\""

- name: Setup Python3.10
uses: actions/setup-python@v5
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
.devcontainer/

### GoLand
.idea
**/.idea/**

### MacOS
.DS_Store
Expand Down Expand Up @@ -55,6 +55,7 @@ _deps
# Custom
/build/
core/build/
core/protobuf/config_server/*/*.pb.*
core/protobuf/*/*.pb.*
core/common/Version.cpp
!/Makefile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ message AgentAttributes {
bytes version = 1; // Agent's version
bytes ip = 2; // Agent's ip
bytes hostname = 3; // Agent's hostname
bytes hostid = 4; // Agent's hostid https://opentelemetry.io/docs/specs/semconv/attributes-registry/host/
map<string, bytes> extras = 100; // Agent's other attributes
// before 100 (inclusive) are reserved for future official fields
}
Expand Down
41 changes: 30 additions & 11 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,14 @@ string GetFileTagsDir() {
}
}

string GetPipelineConfigDir() {
if (BOOL_FLAG(logtail_mode)) {
return "config";
} else {
return "pipeline_config";
}
}

AppConfig::AppConfig() {
LOG_INFO(sLogger, ("AppConfig AppConfig", "success"));
SetIlogtailConfigJson("");
Expand Down Expand Up @@ -610,7 +618,7 @@ void AppConfig::LoadAppConfig(const std::string& ilogtailConfigFile) {
if (BOOL_FLAG(logtail_mode)) {
loadAppConfigLogtailMode(ilogtailConfigFile);
} else {
std::string confDir = GetAgentConfDir();
std::string confDir = GetAgentConfDir();
SetLoongcollectorConfDir(AbsolutePath(confDir, mProcessExecutionDir));
}
// 加载本地instanceconfig
Expand All @@ -628,7 +636,6 @@ void AppConfig::LoadAppConfig(const std::string& ilogtailConfigFile) {
}

void AppConfig::loadAppConfigLogtailMode(const std::string& ilogtailConfigFile) {

Json::Value confJson(Json::objectValue);
std::string newConfDir;

Expand Down Expand Up @@ -1355,8 +1362,11 @@ void AppConfig::InitEnvMapping(const std::string& envStr, std::map<std::string,
}
}
void AppConfig::SetConfigFlag(const std::string& flagName, const std::string& value) {
static set<string> sIgnoreFlagSet
= {"loongcollector_conf_dir", "loongcollector_log_dir", "loongcollector_data_dir", "loongcollector_run_dir", "logtail_mode"};
static set<string> sIgnoreFlagSet = {"loongcollector_conf_dir",
"loongcollector_log_dir",
"loongcollector_data_dir",
"loongcollector_run_dir",
"logtail_mode"};
if (sIgnoreFlagSet.find(flagName) != sIgnoreFlagSet.end()) {
return;
}
Expand Down Expand Up @@ -1403,7 +1413,7 @@ void AppConfig::ParseEnvToFlags() {
}
}
#endif
for (const auto & iter : envMapping) {
for (const auto& iter : envMapping) {
const std::string& key = iter.first;
const std::string& value = iter.second;
SetConfigFlag(key, value);
Expand Down Expand Up @@ -1480,8 +1490,8 @@ void AppConfig::ReadFlagsFromMap(const std::unordered_map<std::string, std::stri
* - 记录无法转换的值
*/
void AppConfig::RecurseParseJsonToFlags(const Json::Value& confJson, std::string prefix) {
const static unordered_set<string> sIgnoreKeySet = {"data_server_list"};
const static unordered_set<string> sForceKeySet = {"config_server_address_list"};
const static unordered_set<string> sIgnoreKeySet = {"data_server_list", "legacy_data_server_list"};
const static unordered_set<string> sForceKeySet = {"config_server_address_list", "config_server_list"};
for (auto name : confJson.getMemberNames()) {
auto jsonvalue = confJson[name];
string fullName;
Expand Down Expand Up @@ -1705,7 +1715,10 @@ void AppConfig::UpdateFileTags() {
return;
}

void AppConfig::MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson, std::unordered_map<std::string, std::string>& keyToConfigName, const std::string& configName) {
void AppConfig::MergeJson(Json::Value& mainConfJson,
const Json::Value& subConfJson,
std::unordered_map<std::string, std::string>& keyToConfigName,
const std::string& configName) {
for (const auto& subkey : subConfJson.getMemberNames()) {
mainConfJson[subkey] = subConfJson[subkey];
keyToConfigName[subkey] = configName;
Expand All @@ -1719,9 +1732,15 @@ void AppConfig::LoadInstanceConfig(const std::map<std::string, std::shared_ptr<I
mRemoteInstanceConfigKeyToConfigName.clear();
for (const auto& config : instanceConfig) {
if (EndWith(config.second->mDirName, AppConfig::sLocalConfigDir)) {
MergeJson(localInstanceConfig, config.second->GetConfig(), mLocalInstanceConfigKeyToConfigName, config.second->mDirName+"/"+config.second->mConfigName);
MergeJson(localInstanceConfig,
config.second->GetConfig(),
mLocalInstanceConfigKeyToConfigName,
config.second->mDirName + "/" + config.second->mConfigName);
} else {
MergeJson(remoteInstanceConfig, config.second->GetConfig(), mRemoteInstanceConfigKeyToConfigName, config.second->mDirName+"/"+config.second->mConfigName);
MergeJson(remoteInstanceConfig,
config.second->GetConfig(),
mRemoteInstanceConfigKeyToConfigName,
config.second->mDirName + "/" + config.second->mConfigName);
}
}
if (localInstanceConfig != mLocalInstanceConfig || mRemoteInstanceConfig != remoteInstanceConfig) {
Expand Down Expand Up @@ -1760,7 +1779,7 @@ void AppConfig::RegisterCallback(const std::string& key, std::function<bool()>*
mCallbacks[key] = callback;
}

template<typename T>
template <typename T>
T AppConfig::MergeConfig(const T& defaultValue,
const T& currentValue,
const std::string& name,
Expand Down
8 changes: 6 additions & 2 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ std::string GetProfileSnapshotDumpFileName();
std::string GetObserverEbpfHostPath();
std::string GetSendBufferFileNamePrefix();
std::string GetLegacyUserLocalConfigFilePath();
std::string GetExactlyOnceCheckpoint();
std::string GetExactlyOnceCheckpoint();
std::string GetPipelineConfigDir();

template <class T>
class DoubleBuffer {
Expand Down Expand Up @@ -224,7 +225,10 @@ class AppConfig {
*/
void CheckAndAdjustParameters();
void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson);
void MergeJson(Json::Value& mainConfJson, const Json::Value& subConfJson, std::unordered_map<std::string, std::string>& keyToConfigName, const std::string& configName);
void MergeJson(Json::Value& mainConfJson,
const Json::Value& subConfJson,
std::unordered_map<std::string, std::string>& keyToConfigName,
const std::string& configName);
/**
* @brief Load *.json from config.d dir
*
Expand Down
30 changes: 17 additions & 13 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ Application::Application() : mStartTime(time(nullptr)) {
}

void Application::Init() {
// get last crash info
string backTraceStr = GetCrashBackTrace();
if (!backTraceStr.empty()) {
LOG_ERROR(sLogger, ("last logtail crash stack", backTraceStr));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
}
if (BOOL_FLAG(ilogtail_disable_core)) {
InitCrashBackTrace();
}

// change working dir to ./${ILOGTAIL_VERSION}/
string processExecutionDir = GetProcessExecutionDir();
AppConfig::GetInstance()->SetProcessExecutionDir(processExecutionDir);
Expand Down Expand Up @@ -123,7 +113,21 @@ void Application::Init() {

// Initialize basic information: IP, hostname, etc.
LogFileProfiler::GetInstance();

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->LoadRegionConfig();
if (GlobalConf::Instance()->mStartWorkerStatus == "Crash") {
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM, "Logtail Restart");
}
// get last crash info
string backTraceStr = GetCrashBackTrace();
if (!backTraceStr.empty()) {
LOG_ERROR(sLogger, ("last logtail crash stack", backTraceStr));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
}
if (BOOL_FLAG(ilogtail_disable_core)) {
InitCrashBackTrace();
}
#endif
// override process related params if designated by user explicitly
const string& interface = AppConfig::GetInstance()->GetBindInterface();
const string& configIP = AppConfig::GetInstance()->GetConfigIP();
Expand Down Expand Up @@ -192,7 +196,7 @@ void Application::Init() {
}

void Application::Start() { // GCOVR_EXCL_START
LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogtailMonitor::GetInstance()->UpdateConstMetric("start_time", LogFileProfiler::mStartTime);

#if defined(__ENTERPRISE__) && defined(_MSC_VER)
Expand All @@ -206,7 +210,7 @@ void Application::Start() { // GCOVR_EXCL_START
{
// add local config dir
filesystem::path localConfigPath
= filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / "pipeline_config" / "local";
= filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetPipelineConfigDir() / "local";
error_code ec;
filesystem::create_directories(localConfigPath, ec);
if (ec) {
Expand Down
1 change: 1 addition & 0 deletions core/common/Constants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const std::string EXPIRE_DAY = "expire_day";
const std::string DEFAULT_CONTENT_KEY = "content";
const std::string DEFAULT_REG = "(.*)";

const std::string AGENT_NAME = "LoongCollector";
const std::string LOONGCOLLECTOR_CONFIG = "loongcollector_config.json";

} // namespace logtail
1 change: 1 addition & 0 deletions core/common/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ extern const std::string EXPIRE_DAY;
extern const std::string DEFAULT_CONTENT_KEY; //"content"
extern const std::string DEFAULT_REG; //"(.*)"

extern const std::string AGENT_NAME;
extern const std::string LOONGCOLLECTOR_CONFIG;

} // namespace logtail
4 changes: 4 additions & 0 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,10 @@ bool PipelineConfig::ReplaceEnvVar() {

bool LoadConfigDetailFromFile(const filesystem::path& filepath, Json::Value& detail) {
const string& ext = filepath.extension().string();
const string& configName = filepath.stem().string();
if (configName == "region_config") {
return false;
}
if (ext != ".yaml" && ext != ".yml" && ext != ".json") {
LOG_WARNING(sLogger, ("unsupported config file format", "skip current object")("filepath", filepath));
return false;
Expand Down
10 changes: 5 additions & 5 deletions core/config/common_provider/CommonConfigProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

#include "config/feedbacker/ConfigFeedbackable.h"
#include "config/provider/ConfigProvider.h"
#include "protobuf/config_server/v2/agent.pb.h"
#include "protobuf/config_server/v2/agentV2.pb.h"

namespace logtail {

Expand Down Expand Up @@ -74,7 +74,7 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable {
configserver::proto::v2::HeartbeatResponse&);

virtual bool FetchInstanceConfig(::configserver::proto::v2::HeartbeatResponse&,
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);

virtual bool FetchPipelineConfig(::configserver::proto::v2::HeartbeatResponse&,
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);
Expand All @@ -83,12 +83,12 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable {
virtual void FillAttributes(::configserver::proto::v2::AgentAttributes& attributes);
void UpdateRemotePipelineConfig(
const google::protobuf::RepeatedPtrField<configserver::proto::v2::ConfigDetail>& configs);
void
UpdateRemoteInstanceConfig(const google::protobuf::RepeatedPtrField<configserver::proto::v2::ConfigDetail>& configs);
void UpdateRemoteInstanceConfig(
const google::protobuf::RepeatedPtrField<configserver::proto::v2::ConfigDetail>& configs);

virtual bool
FetchInstanceConfigFromServer(::configserver::proto::v2::HeartbeatResponse&,
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);
virtual bool
FetchPipelineConfigFromServer(::configserver::proto::v2::HeartbeatResponse&,
::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>&);
Expand Down
2 changes: 1 addition & 1 deletion core/config/provider/ConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace logtail {
void ConfigProvider::Init(const string& dir) {
// default path: /etc/ilogtail/config/${dir}
mPipelineSourceDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir());
mPipelineSourceDir /= "pipeline_config";
mPipelineSourceDir /= GetPipelineConfigDir();
mPipelineSourceDir /= dir;

mInstanceSourceDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir());
Expand Down
3 changes: 3 additions & 0 deletions core/config/watcher/InstanceConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ InstanceConfigDiff InstanceConfigWatcher::CheckConfigDiff() {

const filesystem::path& path = entry.path();
const string& configName = path.stem().string();
if (configName == "region_config") {
continue;
}
const string& filepath = path.string();
if (!filesystem::is_regular_file(entry.status(ec))) {
LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath));
Expand Down
38 changes: 31 additions & 7 deletions core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,38 @@ macro(link_protobuf target_name)
endif ()
endmacro()
logtail_define(protobuf_BIN "Absolute path to protoc" "${DEPS_BINARY_ROOT}/protoc")
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls")
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})
set(PROTO_PUBLIC_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../protobuf_public/models")
set(PROTO_PUBLIC_OUTPUT_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/models")
set(PROTO_FILES log_event.proto metric_event.proto span_event.proto pipeline_event_group.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_PUBLIC_FILE_PATH} --cpp_out=${PROTO_PUBLIC_OUTPUT_PATH} ${PROTO_FILES})

function(compile_proto PROTO_PATH OUTPUT_PATH PROTO_FILES)
file(MAKE_DIRECTORY ${OUTPUT_PATH})
execute_process(COMMAND ${protobuf_BIN}
--proto_path=${PROTO_PATH}
--cpp_out=${OUTPUT_PATH}
${PROTO_FILES})
endfunction()

compile_proto(
"${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls"
"${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls"
"sls_logs.proto;logtail_buffer_meta.proto;metric.proto;checkpoint.proto"
)

compile_proto(
"${CMAKE_CURRENT_SOURCE_DIR}/../protobuf_public/models"
"${CMAKE_CURRENT_SOURCE_DIR}/protobuf/models"
"log_event.proto;metric_event.proto;span_event.proto;pipeline_event_group.proto"
)

compile_proto(
"${CMAKE_CURRENT_SOURCE_DIR}/../config_server/protocol/v1"
"${CMAKE_CURRENT_SOURCE_DIR}/protobuf/config_server/v1"
"agent.proto"
)

compile_proto(
"${CMAKE_CURRENT_SOURCE_DIR}/../config_server/protocol/v2"
"${CMAKE_CURRENT_SOURCE_DIR}/protobuf/config_server/v2"
"agentV2.proto"
)
# re2
macro(link_re2 target_name)
if (re2_${LINK_OPTION_SUFFIX})
Expand Down
3 changes: 2 additions & 1 deletion core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
#endif
mEndpoint = TrimString(mEndpoint);
if (!mEndpoint.empty()) {
SLSClientManager::GetInstance()->AddEndpointEntry(mRegion, StandardizeEndpoint(mEndpoint, mEndpoint));
SLSClientManager::GetInstance()->AddEndpointEntry(
mRegion, StandardizeEndpoint(mEndpoint, mEndpoint), false, SLSClientManager::EndpointSourceType::LOCAL);
}
}
#ifdef __ENTERPRISE__
Expand Down
26 changes: 19 additions & 7 deletions core/plugin/flusher/sls/SLSClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ using namespace std;

namespace logtail {

bool SLSClientManager::RegionEndpointsInfo::AddDefaultEndpoint(const std::string& endpoint) {
mDefaultEndpoint = endpoint;
bool SLSClientManager::RegionEndpointsInfo::AddDefaultEndpoint(const std::string& endpoint,
const EndpointSourceType& endpointType,
bool& isDefault) {
if (mDefaultEndpoint.empty()
|| (endpointType == EndpointSourceType::LOCAL && mDefaultEndpointType == EndpointSourceType::REMOTE)) {
mDefaultEndpoint = endpoint;
mDefaultEndpointType = endpointType;
isDefault = true;
}
return AddEndpoint(endpoint, true, false);
}

Expand Down Expand Up @@ -299,14 +306,19 @@ void SLSClientManager::CleanTimeoutClient() {
}
}

void SLSClientManager::AddEndpointEntry(const string& region, const string& endpoint, bool isDefault, bool isProxy) {
void SLSClientManager::AddEndpointEntry(const string& region,
const string& endpoint,
bool isProxy,
const EndpointSourceType& endpointType) {
lock_guard<mutex> lock(mRegionEndpointEntryMapLock);
RegionEndpointsInfo& info = mRegionEndpointEntryMap[region];
if (isDefault) {
if (info.AddDefaultEndpoint(endpoint)) {
if (!isProxy) {
bool isDefault = false;
if (info.AddDefaultEndpoint(endpoint, endpointType, isDefault)) {
LOG_INFO(sLogger,
("add default data server endpoint, region",
region)("endpoint", endpoint)("isProxy", "false")("#endpoint", info.mEndpointInfoMap.size()));
("add data server endpoint, region", region)("endpoint", endpoint)(
"isDefault", isDefault ? "yes" : "no")("isProxy", "false")("#endpoint",
info.mEndpointInfoMap.size()));
}
} else {
if (info.AddEndpoint(endpoint, true, isProxy)) {
Expand Down
Loading

0 comments on commit 8493e0c

Please sign in to comment.